Files
joycontrol/protocol.py
T
2020-01-29 20:51:59 +09:00

132 lines
4.6 KiB
Python

import asyncio
import logging
from asyncio import BaseTransport, BaseProtocol
from typing import Optional, Union, Tuple, Text
from controller import Controller
from report import OutputReport, SubCommand, InputReport
logger = logging.getLogger(__name__)
def controller_protocol_factory(controller: Controller):
def create_controller_protocol():
return ControllerProtocol(controller)
return create_controller_protocol
class ControllerProtocol(BaseProtocol):
def __init__(self, controller: Controller):
self.controller = controller
self.transport = None
self._data_received = asyncio.Event()
async def wait_for_output_report(self):
self._data_received.clear()
await self._data_received.wait()
def connection_made(self, transport: BaseTransport) -> None:
logger.debug('Connection established.')
self.transport = transport
def connection_lost(self, exc: Optional[Exception]) -> None:
raise NotImplementedError()
def error_received(self, exc: Exception) -> None:
raise NotImplementedError()
async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None:
self._data_received.set()
try:
report = OutputReport(list(data))
except ValueError as v_err:
logger.warning(f'Report parsing error "{v_err}" - IGNORE')
return
# classify sub command
sc_byte, sub_command = report.get_sub_command()
logging.info(f'received output report - {sub_command}')
if sub_command == SubCommand.REQUEST_DEVICE_INFO:
await self._command_request_device_info(report)
elif sub_command == SubCommand.SET_SHIPMENT_STATE:
await self._command_set_shipment_state(report)
elif sub_command == SubCommand.SPI_FLASH_READ:
await self._command_spi_flash_read(report)
elif sub_command == SubCommand.SET_INPUT_REPORT_MODE:
await self._command_set_input_report_mode(report)
elif sub_command == SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME:
await self._command_trigger_buttons_elapsed_time(report)
elif sub_command == SubCommand.NOT_IMPLEMENTED:
logger.error(f'Sub command 0x{sc_byte:02x} not implemented - ignoring')
async def _command_request_device_info(self, output_report):
address = self.transport.get_extra_info('sockname')
assert address is not None
bd_address = list(map(lambda x: int(x, 16), address[0].split(':')))
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x82)
#input_report.set_button_status()
#input_report.set_left_analog_stick()
#input_report.set_right_analog_stick()
#input_report.set_vibrator_input()
input_report.sub_0x02_device_info(bd_address, controller=self.controller)
asyncio.ensure_future(self.transport.write(input_report))
async def _command_set_shipment_state(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.sub_0x08_shipment()
asyncio.ensure_future(self.transport.write(input_report))
async def _command_spi_flash_read(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x90)
input_report.sub_0x10_spi_flash_read(output_report)
asyncio.ensure_future(self.transport.write(input_report))
async def _command_set_input_report_mode(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.sub_0x03_set_input_report_mode()
asyncio.ensure_future(self.transport.write(input_report))
async def _command_trigger_buttons_elapsed_time(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x83)
input_report.sub_0x04_trigger_buttons_elapsed_time()
asyncio.ensure_future(self.transport.write(input_report))
async def _enable_6axis_sensor(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x40)
asyncio.ensure_future(self.transport.write(input_report))