From be8dce71a0bcddd00b7b599bed3b8f495f7b87e8 Mon Sep 17 00:00:00 2001 From: spacemeowx2 Date: Thu, 23 Apr 2020 23:07:21 +0800 Subject: [PATCH 1/3] feat: send amiibo report --- joycontrol/controller_state.py | 5 ++ joycontrol/mcu.py | 154 +++++++++++++++++++++++++++++++++ joycontrol/protocol.py | 145 ++++++++++++++++++++++++------- joycontrol/report.py | 14 ++- run_amiibo_cli.py | 104 ++++++++++++++++++++++ setup.py | 2 +- 6 files changed, 387 insertions(+), 37 deletions(-) create mode 100644 joycontrol/mcu.py create mode 100644 run_amiibo_cli.py diff --git a/joycontrol/controller_state.py b/joycontrol/controller_state.py index 6902a22..aee833e 100644 --- a/joycontrol/controller_state.py +++ b/joycontrol/controller_state.py @@ -9,6 +9,7 @@ class ControllerState: def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None): self._protocol = protocol self._controller = controller + self._nfc_content = None self._spi_flash = spi_flash @@ -198,6 +199,10 @@ async def button_push(controller_state, *buttons, sec=0.1): await controller_state.send() +async def set_nfc(controller_state, nfc_content): + controller_state._nfc_content = nfc_content + + class _StickCalibration: def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center): self.h_center = h_center diff --git a/joycontrol/mcu.py b/joycontrol/mcu.py new file mode 100644 index 0000000..0f26e93 --- /dev/null +++ b/joycontrol/mcu.py @@ -0,0 +1,154 @@ +from enum import Enum +from crc8 import crc8 + + +class Action(Enum): + NON = 0 + REQUEST_STATUS = 1 + START_TAG_POLLING = 2 + START_TAG_DISCOVERY = 3 + READ_TAG = 4 + READ_TAG_2 = 5 + READ_FINISHED = 6 + + +class McuState(Enum): + NOT_INITIALIZED = 0 + IRC = 1 + NFC = 2 + STAND_BY = 3 + BUSY = 4 + + +def copyarray(dest, offset, src): + for i in range(len(src)): + dest[offset + i] = src[i] + +class Mcu: + def __init__(self): + self._fw_major = [0, 3] + self._fw_minor = [0, 5] + + self._bytes = [0] * 313 + + self._action = Action.NON + self._state = McuState.NOT_INITIALIZED + + self._nfc_content = None + + def get_fw_major(self): + return self._fw_major + + def get_fw_minor(self): + return self._fw_minor + + def set_action(self, v): + self._action = v + + def get_action(self): + return self._action + + def set_state(self, v): + self._state = v + + def get_state(self): + return self._state + + def _get_state_byte(self): + if self.get_state() == McuState.NFC: + return 4 + elif self.get_state() == McuState.BUSY: + return 6 + elif self.get_state() == McuState.NOT_INITIALIZED: + return 1 + elif self.get_state() == McuState.STAND_BY: + return 1 + else: + return 0 + + def update_status(self): + self._bytes[0] = 1 + self._bytes[1] = 0 + self._bytes[2] = 0 + self._bytes[3] = self._fw_major[0] + self._bytes[4] = self._fw_major[1] + self._bytes[5] = self._fw_minor[0] + self._bytes[6] = self._fw_minor[1] + self._bytes[7] = self._get_state_byte() + + def update_nfc_report(self): + self._bytes = [0] * 313 + if self.get_action() == Action.REQUEST_STATUS: + self._bytes[0] = 1 + self._bytes[1] = 0 + self._bytes[2] = 0 + self._bytes[3] = self._fw_major[0] + self._bytes[4] = self._fw_major[1] + self._bytes[5] = self._fw_minor[0] + self._bytes[6] = self._fw_minor[1] + self._bytes[7] = self._get_state_byte() + elif self.get_action() == Action.NON: + self._bytes[0] = 0xff + elif self.get_action() == Action.START_TAG_DISCOVERY: + self._bytes[0] = 0x2a + self._bytes[1] = 0 + self._bytes[2] = 5 + self._bytes[3] = 0 + self._bytes[4] = 0 + self._bytes[5] = 9 + self._bytes[6] = 0x31 + self._bytes[7] = 0 + elif self.get_action() == Action.START_TAG_POLLING: + self._bytes[0] = 0x2a + self._bytes[1] = 0 + self._bytes[2] = 5 + self._bytes[3] = 0 + self._bytes[4] = 0 + if not self._nfc_content is None: + data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07] + copyarray(self._bytes, 5, data) + copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) + copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) + else: + print('nfc content is none') + self._bytes[5] = 9 + self._bytes[6] = 0x31 + self._bytes[7] = 0 + elif self.get_action() == Action.READ_TAG or self.get_action() == Action.READ_TAG_2: + self._bytes[0] = 0x3a + self._bytes[1] = 0 + self._bytes[2] = 7 + if self.get_action() == Action.READ_TAG: + data1 = bytes.fromhex('010001310200000001020007') + copyarray(self._bytes, 3, data1) + copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3]) + copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8]) + data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000') + copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2) + copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245]) + self.set_action(Action.READ_TAG_2) + else: + data = bytes.fromhex('02000927') + copyarray(self._bytes, 3, data) + copyarray(self._bytes, 3 + len(data), self._nfc_content[245:]) + self.set_action(Action.READ_FINISHED) + elif self.get_action() == Action.READ_FINISHED: + self._bytes[0] = 0x2a + self._bytes[1] = 0 + self._bytes[2] = 5 + self._bytes[3] = 0 + self._bytes[4] = 0 + data = bytes.fromhex('0931040000000101020007') + copyarray(self._bytes, 5, data) + copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) + copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) + + crc = crc8() + crc.update(bytes(self._bytes[:-1])) + self._bytes[-1] = ord(crc.digest()) + + def set_nfc(self, nfc_content): + self._nfc_content = nfc_content + + def __bytes__(self): + return bytes(self._bytes) diff --git a/joycontrol/protocol.py b/joycontrol/protocol.py index 9606211..97b2537 100644 --- a/joycontrol/protocol.py +++ b/joycontrol/protocol.py @@ -10,6 +10,8 @@ from joycontrol.controller_state import ControllerState from joycontrol.memory import FlashMemory from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.transport import NotConnectedError +from joycontrol.mcu import Mcu, McuState, Action +from crc8 import crc8 logger = logging.getLogger(__name__) @@ -39,6 +41,8 @@ class ControllerProtocol(BaseProtocol): self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state_sender = None + self._mcu = Mcu() + # None = Just answer to sub commands self._input_report_mode = None @@ -89,6 +93,11 @@ class ControllerProtocol(BaseProtocol): input_report.set_timer(self._input_report_timer) self._input_report_timer = (self._input_report_timer + 1) % 0x100 + if input_report.get_input_report_id() == 0x31: + self._mcu.set_nfc(self._controller_state._nfc_content) + self._mcu.update_nfc_report() + input_report.set_mcu(self._mcu) + await self.transport.write(input_report) self._controller_state.sig_is_send.set() @@ -120,15 +129,14 @@ class ControllerProtocol(BaseProtocol): # TODO? raise NotImplementedError() - async def input_report_mode_0x30(self): + async def input_report_mode_full(self): """ - Continuously sends 0x30 input reports containing the controller state. + Continuously sends full input reports containing the controller state. """ if self.transport.is_reading(): - raise ValueError('Transport must be paused in 0x30 input report mode') + raise ValueError('Transport must be paused in full input report mode') input_report = InputReport() - input_report.set_input_report_id(0x30) input_report.set_vibrator_input() input_report.set_misc() @@ -136,6 +144,7 @@ class ControllerProtocol(BaseProtocol): try: while True: + input_report.set_input_report_id(self._input_report_mode) # TODO: improve timing if self.controller == Controller.PRO_CONTROLLER: # send state at 120Hz @@ -159,6 +168,10 @@ class ControllerProtocol(BaseProtocol): pass elif output_report_id == OutputReportID.SUB_COMMAND: reply_send = await self._reply_to_sub_command(report) + elif output_report_id == OutputReportID.REQUEST_MCU: + reply_send = await self._reply_to_mcu(report) + else: + logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE') except ValueError as v_err: logger.warning(f'Report parsing error "{v_err}" - IGNORE') except NotImplementedError as err: @@ -207,6 +220,47 @@ class ControllerProtocol(BaseProtocol): else: logger.warning(f'Output report {output_report_id} not implemented - ignoring') + async def _reply_to_mcu(self, report): + sub_command = report.data[11] + sub_command_data = report.data[12:] + + # logging.info(f'received output report - Request MCU sub command {sub_command}') + + if self._mcu.get_action() == Action.READ_TAG or self._mcu.get_action() == Action.READ_TAG_2 or self._mcu.get_action() == Action.READ_FINISHED: + return + + # Request mcu state + if sub_command == 0x01: + # input_report = InputReport() + # input_report.set_input_report_id(0x21) + # input_report.set_misc() + + # input_report.set_ack(0xA0) + # input_report.reply_to_subcommand_id(0x21) + + self._mcu.set_action(Action.REQUEST_STATUS) + # input_report.set_mcu(self._mcu) + + # await self.write(input_report) + # Send Start tag discovery + elif sub_command == 0x02: + # 0: Cancel all, 4: StartWaitingReceive + if sub_command_data[0] == 0x04: + self._mcu.set_action(Action.START_TAG_DISCOVERY) + # 1: Start polling + elif sub_command_data[0] == 0x01: + self._mcu.set_action(Action.START_TAG_POLLING) + # 2: stop polling + elif sub_command_data[0] == 0x02: + self._mcu.set_action(Action.NON) + elif sub_command_data[0] == 0x06: + self._mcu.set_action(Action.READ_TAG) + else: + logging.info(f'Unknown sub_command_data arg {sub_command_data}') + else: + logging.info(f'Unknown MCU sub command {sub_command}') + + async def _reply_to_sub_command(self, report): # classify sub command try: @@ -317,34 +371,40 @@ class ControllerProtocol(BaseProtocol): async def _command_set_input_report_mode(self, sub_command_data): if sub_command_data[0] == 0x30: - logger.info('Setting input report mode to 0x30...') - - input_report = InputReport() - input_report.set_input_report_id(0x21) - input_report.set_misc() - - input_report.set_ack(0x80) - input_report.reply_to_subcommand_id(0x03) - - await self.write(input_report) - - # start sending 0x30 input reports - if self._input_report_mode != 0x30: - self._input_report_mode = 0x30 - - self.transport.pause_reading() - new_reader = asyncio.ensure_future(self.input_report_mode_0x30()) - - # We need to swap the reader in the future because this function was probably called by it - async def set_reader(): - await self.transport.set_reader(new_reader) - self.transport.resume_reading() - - asyncio.ensure_future(set_reader()).add_done_callback( - utils.create_error_check_callback() - ) + pass + elif sub_command_data[0] == 0x31: + pass else: logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') + return + + logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...') + + input_report = InputReport() + input_report.set_input_report_id(0x21) + input_report.set_misc() + + input_report.set_ack(0x80) + input_report.reply_to_subcommand_id(0x03) + + await self.write(input_report) + + # start sending input reports + if self._input_report_mode is None: + + self.transport.pause_reading() + new_reader = asyncio.ensure_future(self.input_report_mode_full()) + + # We need to swap the reader in the future because this function was probably called by it + async def set_reader(): + await self.transport.set_reader(new_reader) + self.transport.resume_reading() + + asyncio.ensure_future(set_reader()).add_done_callback( + utils.create_error_check_callback() + ) + + self._input_report_mode = sub_command_data[0] async def _command_trigger_buttons_elapsed_time(self, sub_command_data): input_report = InputReport() @@ -392,10 +452,26 @@ class ControllerProtocol(BaseProtocol): input_report.set_ack(0xA0) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) - # TODO - data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200] + self._mcu.update_status() + data = list(bytes(self._mcu)[0:34]) + crc = crc8() + crc.update(bytes(data[:-1])) + checksum = crc.digest() + data[-1] = ord(checksum) + for i in range(len(data)): - input_report.data[16 + i] = data[i] + input_report.data[16+i] = data[i] + + # Set MCU mode cmd + if sub_command_data[1] == 0: + if sub_command_data[2] == 0: + self._mcu.set_state(McuState.STAND_BY) + elif sub_command_data[2] == 4: + self._mcu.set_state(McuState.NFC) + else: + logger.info(f"unknown mcu state {sub_command_data[2]}") + else: + logger.info(f"unknown mcu config command {sub_command_data}") await self.write(input_report) @@ -408,10 +484,13 @@ class ControllerProtocol(BaseProtocol): # 0x01 = Resume input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) + self._mcu.set_action(Action.NON) + self._mcu.set_state(McuState.STAND_BY) elif sub_command_data[0] == 0x00: # 0x00 = Suspend input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) + self._mcu.set_state(McuState.STAND_BY) else: raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' f'not implemented.') diff --git a/joycontrol/report.py b/joycontrol/report.py index c6db581..fed830e 100644 --- a/joycontrol/report.py +++ b/joycontrol/report.py @@ -10,8 +10,7 @@ class InputReport: """ def __init__(self, data=None): if not data: - # TODO: not enough space for NFC/IR data input report - self.data = [0x00] * 51 + self.data = [0x00] * 364 # all input reports are prepended with 0xA1 self.data[0] = 0xA1 else: @@ -113,6 +112,12 @@ class InputReport: for i in range(14, 50): self.data[i] = 0x00 + def set_mcu(self, data): + # write to data + data = bytes(data) + for i in range(len(data)): + self.data[50 + i] = data[i] + def reply_to_subcommand_id(self, _id): if isinstance(_id, SubCommand): self.data[15] = _id.value @@ -195,8 +200,10 @@ class InputReport: return bytes(self.data[:51]) elif _id == 0x30: return bytes(self.data[:14]) + elif _id == 0x31: + return bytes(self.data[:363]) else: - return bytes(self.data) + return bytes(self.data[:51]) class SubCommand(Enum): @@ -215,6 +222,7 @@ class SubCommand(Enum): class OutputReportID(Enum): SUB_COMMAND = 0x01 RUMBLE_ONLY = 0x10 + REQUEST_MCU = 0x11 class OutputReport: diff --git a/run_amiibo_cli.py b/run_amiibo_cli.py new file mode 100644 index 0000000..57a00e0 --- /dev/null +++ b/run_amiibo_cli.py @@ -0,0 +1,104 @@ +import argparse +import asyncio +import logging +import os +from contextlib import contextmanager + +from joycontrol import logging_default as log +from joycontrol.command_line_interface import ControllerCLI +from joycontrol.controller_state import ControllerState, button_push, set_nfc +from joycontrol.protocol import controller_protocol_factory, Controller +from joycontrol.server import create_hid_server + +logger = logging.getLogger(__name__) + + +async def _main(controller, reconnect_bt_addr=None, capture_file=None, spi_flash=None, device_id=None, amiibo=None): + factory = controller_protocol_factory(controller, spi_flash=spi_flash) + ctl_psm, itr_psm = 17, 19 + transport, protocol = await create_hid_server(factory, + reconnect_bt_addr=reconnect_bt_addr, + ctl_psm=ctl_psm, itr_psm=itr_psm, capture_file=capture_file, device_id=device_id) + + controller_state = protocol.get_controller_state() + if amiibo: + await set_nfc(controller_state, amiibo.read()) + + await controller_state.connect() + + async def amiibo(filename): + with open(filename, "rb") as amiibo_file: + content = amiibo_file.read() + await set_nfc(controller_state, content) + + async def remove_amiibo(): + await controller_state.set_nfc(None) + + cli = ControllerCLI(controller_state) + cli.add_command('amiibo', amiibo) + cli.add_command('remove_amiibo', remove_amiibo) + await cli.run() + + logger.info('Stopping communication...') + await transport.close() + + +if __name__ == '__main__': + # check if root + if not os.geteuid() == 0: + raise PermissionError('Script must be run as root!') + + # setup logging + log.configure() + + parser = argparse.ArgumentParser() + #parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER') + parser.add_argument('-l', '--log') + parser.add_argument('-d', '--device_id') + parser.add_argument('--spi_flash') + parser.add_argument('-r', '--reconnect_bt_addr', type=str, default=None, + help='The Switch console bluetooth address, for reconnecting as an already paired controller') + parser.add_argument('-a', '--amiibo', type=argparse.FileType('rb'), default=None, + help='The amiibo dump file') + args = parser.parse_args() + + """ + if args.controller == 'JOYCON_R': + controller = Controller.JOYCON_R + elif args.controller == 'JOYCON_L': + controller = Controller.JOYCON_L + elif args.controller == 'PRO_CONTROLLER': + controller = Controller.PRO_CONTROLLER + else: + raise ValueError(f'Unknown controller "{args.controller}".') + """ + controller = Controller.PRO_CONTROLLER + + spi_flash = None + if args.spi_flash: + with open(args.spi_flash, 'rb') as spi_flash_file: + spi_flash = spi_flash_file.read() + + # creates file if arg is given + @contextmanager + def get_output(path=None): + """ + Opens file if path is given + """ + if path is not None: + file = open(path, 'wb') + yield file + file.close() + else: + yield None + + with get_output(args.log) as capture_file: + loop = asyncio.get_event_loop() + loop.run_until_complete(_main( + controller, + reconnect_bt_addr=args.reconnect_bt_addr, + capture_file=capture_file, + spi_flash=spi_flash, + device_id=args.device_id, + amiibo=args.amiibo + )) diff --git a/setup.py b/setup.py index 1e0e4c9..bc1abf1 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ setup(name='joycontrol', package_data={'joycontrol': ['profile/sdp_record_hid.xml']}, zip_safe=False, install_requires=[ - 'hid', 'aioconsole', 'dbus-python' + 'hid', 'aioconsole', 'dbus-python', 'crc8' ] ) From c17ab2114074b86a3fc9505c17391016a99007c9 Mon Sep 17 00:00:00 2001 From: Robert Martin Date: Thu, 30 Apr 2020 20:44:07 +0200 Subject: [PATCH 2/3] merged run_amiibo_cli with run_controller_cli; some cleanups --- joycontrol/command_line_interface.py | 3 +- joycontrol/controller.py | 11 +++ joycontrol/controller_state.py | 7 +- joycontrol/protocol.py | 1 - run_amiibo_cli.py | 104 ------------------------ run_controller_cli.py | 115 +++++++++++++++++---------- 6 files changed, 87 insertions(+), 154 deletions(-) delete mode 100644 run_amiibo_cli.py diff --git a/joycontrol/command_line_interface.py b/joycontrol/command_line_interface.py index 8c4cad6..526726d 100644 --- a/joycontrol/command_line_interface.py +++ b/joycontrol/command_line_interface.py @@ -1,5 +1,6 @@ import inspect import logging +import shlex from aioconsole import ainput @@ -122,7 +123,7 @@ class ControllerCLI: buttons_to_push = [] for command in user_input.split('&&'): - cmd, *args = command.split() + cmd, *args = shlex.split(command) if cmd == 'exit': return diff --git a/joycontrol/controller.py b/joycontrol/controller.py index 33da3ae..3faecb4 100644 --- a/joycontrol/controller.py +++ b/joycontrol/controller.py @@ -18,3 +18,14 @@ class Controller(enum.Enum): return 'Pro Controller' else: raise NotImplementedError() + + @staticmethod + def from_arg(arg): + if arg == 'JOYCON_R': + return Controller.JOYCON_R + elif arg == 'JOYCON_L': + return Controller.JOYCON_L + elif arg == 'PRO_CONTROLLER': + return Controller.PRO_CONTROLLER + else: + raise ValueError(f'Unknown controller "{arg}".') diff --git a/joycontrol/controller_state.py b/joycontrol/controller_state.py index aee833e..997ed6b 100644 --- a/joycontrol/controller_state.py +++ b/joycontrol/controller_state.py @@ -48,6 +48,9 @@ class ControllerState: def get_flash_memory(self): return self._spi_flash + def set_nfc(self, nfc_content): + self._nfc_content = nfc_content + async def send(self): """ Invokes protocol.send_controller_state(). Returns after the controller state was send. @@ -199,10 +202,6 @@ async def button_push(controller_state, *buttons, sec=0.1): await controller_state.send() -async def set_nfc(controller_state, nfc_content): - controller_state._nfc_content = nfc_content - - class _StickCalibration: def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center): self.h_center = h_center diff --git a/joycontrol/protocol.py b/joycontrol/protocol.py index 97b2537..cecf0b3 100644 --- a/joycontrol/protocol.py +++ b/joycontrol/protocol.py @@ -260,7 +260,6 @@ class ControllerProtocol(BaseProtocol): else: logging.info(f'Unknown MCU sub command {sub_command}') - async def _reply_to_sub_command(self, report): # classify sub command try: diff --git a/run_amiibo_cli.py b/run_amiibo_cli.py deleted file mode 100644 index 57a00e0..0000000 --- a/run_amiibo_cli.py +++ /dev/null @@ -1,104 +0,0 @@ -import argparse -import asyncio -import logging -import os -from contextlib import contextmanager - -from joycontrol import logging_default as log -from joycontrol.command_line_interface import ControllerCLI -from joycontrol.controller_state import ControllerState, button_push, set_nfc -from joycontrol.protocol import controller_protocol_factory, Controller -from joycontrol.server import create_hid_server - -logger = logging.getLogger(__name__) - - -async def _main(controller, reconnect_bt_addr=None, capture_file=None, spi_flash=None, device_id=None, amiibo=None): - factory = controller_protocol_factory(controller, spi_flash=spi_flash) - ctl_psm, itr_psm = 17, 19 - transport, protocol = await create_hid_server(factory, - reconnect_bt_addr=reconnect_bt_addr, - ctl_psm=ctl_psm, itr_psm=itr_psm, capture_file=capture_file, device_id=device_id) - - controller_state = protocol.get_controller_state() - if amiibo: - await set_nfc(controller_state, amiibo.read()) - - await controller_state.connect() - - async def amiibo(filename): - with open(filename, "rb") as amiibo_file: - content = amiibo_file.read() - await set_nfc(controller_state, content) - - async def remove_amiibo(): - await controller_state.set_nfc(None) - - cli = ControllerCLI(controller_state) - cli.add_command('amiibo', amiibo) - cli.add_command('remove_amiibo', remove_amiibo) - await cli.run() - - logger.info('Stopping communication...') - await transport.close() - - -if __name__ == '__main__': - # check if root - if not os.geteuid() == 0: - raise PermissionError('Script must be run as root!') - - # setup logging - log.configure() - - parser = argparse.ArgumentParser() - #parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER') - parser.add_argument('-l', '--log') - parser.add_argument('-d', '--device_id') - parser.add_argument('--spi_flash') - parser.add_argument('-r', '--reconnect_bt_addr', type=str, default=None, - help='The Switch console bluetooth address, for reconnecting as an already paired controller') - parser.add_argument('-a', '--amiibo', type=argparse.FileType('rb'), default=None, - help='The amiibo dump file') - args = parser.parse_args() - - """ - if args.controller == 'JOYCON_R': - controller = Controller.JOYCON_R - elif args.controller == 'JOYCON_L': - controller = Controller.JOYCON_L - elif args.controller == 'PRO_CONTROLLER': - controller = Controller.PRO_CONTROLLER - else: - raise ValueError(f'Unknown controller "{args.controller}".') - """ - controller = Controller.PRO_CONTROLLER - - spi_flash = None - if args.spi_flash: - with open(args.spi_flash, 'rb') as spi_flash_file: - spi_flash = spi_flash_file.read() - - # creates file if arg is given - @contextmanager - def get_output(path=None): - """ - Opens file if path is given - """ - if path is not None: - file = open(path, 'wb') - yield file - file.close() - else: - yield None - - with get_output(args.log) as capture_file: - loop = asyncio.get_event_loop() - loop.run_until_complete(_main( - controller, - reconnect_bt_addr=args.reconnect_bt_addr, - capture_file=capture_file, - spi_flash=spi_flash, - device_id=args.device_id, - amiibo=args.amiibo - )) diff --git a/run_controller_cli.py b/run_controller_cli.py index 6f517b2..9204b6e 100644 --- a/run_controller_cli.py +++ b/run_controller_cli.py @@ -4,7 +4,6 @@ import argparse import asyncio import logging import os -from contextlib import contextmanager from aioconsole import ainput @@ -133,31 +132,79 @@ async def test_controller_buttons(controller_state: ControllerState): await button_push(controller_state, 'home') -async def _main(controller, reconnect_bt_addr=None, capture_file=None, spi_flash=None, device_id=None): - factory = controller_protocol_factory(controller, spi_flash=spi_flash) - ctl_psm, itr_psm = 17, 19 - transport, protocol = await create_hid_server(factory, reconnect_bt_addr=reconnect_bt_addr, ctl_psm=ctl_psm, - itr_psm=itr_psm, capture_file=capture_file, device_id=device_id) +async def set_amiibo(controller_state, file_path): + """ + Sets nfc content of the controller state to contents of the given file. + :param controller_state: Emulated controller state + :param file_path: Path to amiibo dump file + """ + loop = asyncio.get_event_loop() - controller_state = protocol.get_controller_state() + with open(file_path, 'rb') as amiibo_file: + content = await loop.run_in_executor(None, amiibo_file.read) + controller_state.set_nfc(content) - # Create command line interface and add some extra commands - cli = ControllerCLI(controller_state) - # Wrap the script so we can pass the controller state. The doc string will be printed when calling 'help' - async def _run_test_controller_buttons(): - """ - test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons. - """ - await test_controller_buttons(controller_state) +async def _main(args): + # parse the spi flash + spi_flash = None + if args.spi_flash: + with open(args.spi_flash, 'rb') as spi_flash_file: + spi_flash = FlashMemory(spi_flash_file.read()) - # add the script from above - cli.add_command('test_buttons', _run_test_controller_buttons) + # Get controller name to emulate from arguments + controller = Controller.from_arg(args.controller) - await cli.run() + with utils.get_output(path=args.log, default=None) as capture_file: + factory = controller_protocol_factory(controller, spi_flash=spi_flash) + ctl_psm, itr_psm = 17, 19 + transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr, + ctl_psm=ctl_psm, + itr_psm=itr_psm, capture_file=capture_file, + device_id=args.device_id) - logger.info('Stopping communication...') - await transport.close() + controller_state = protocol.get_controller_state() + + # Create command line interface and add some extra commands + cli = ControllerCLI(controller_state) + + # Wrap the script so we can pass the controller state. The doc string will be printed when calling 'help' + async def _run_test_controller_buttons(): + """ + test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons. + """ + await test_controller_buttons(controller_state) + + # add the script from above + cli.add_command('test_buttons', _run_test_controller_buttons) + + # Create amiibo command + async def amiibo(*args): + """ + amiibo - Sets amiibo content + + Usage: + amiibo Set controller state NFC content to file + amiibo remove Remove NFC content from controller state + """ + if controller_state.get_controller() == Controller.JOYCON_L: + raise ValueError('NFC content cannot be set for JOYCON_L') + elif not args: + raise ValueError('"amiibo" command requires amiibo dump file path as argument!') + elif args[0] == 'remove': + controller_state.set_nfc(None) + print('Removed nfc content.') + else: + await set_amiibo(controller_state, args[0]) + + # add the script from above + cli.add_command('amiibo', amiibo) + + try: + await cli.run() + finally: + logger.info('Stopping communication...') + await transport.close() if __name__ == '__main__': @@ -178,27 +225,7 @@ if __name__ == '__main__': help='The Switch console Bluetooth address, for reconnecting as an already paired controller') args = parser.parse_args() - if args.controller == 'JOYCON_R': - controller = Controller.JOYCON_R - elif args.controller == 'JOYCON_L': - controller = Controller.JOYCON_L - elif args.controller == 'PRO_CONTROLLER': - controller = Controller.PRO_CONTROLLER - else: - raise ValueError(f'Unknown controller "{args.controller}".') - - spi_flash = None - if args.spi_flash: - with open(args.spi_flash, 'rb') as spi_flash_file: - spi_flash = FlashMemory(spi_flash_file.read()) - - with utils.get_output(path=args.log, default=None) as capture_file: - loop = asyncio.get_event_loop() - loop.run_until_complete( - _main(controller, - reconnect_bt_addr=args.reconnect_bt_addr, - capture_file=capture_file, - spi_flash=spi_flash, - device_id=args.device_id - ) - ) + loop = asyncio.get_event_loop() + loop.run_until_complete( + _main(args) + ) From 3adf0b2878b2a9677644a88eda351e122f432095 Mon Sep 17 00:00:00 2001 From: Robert Martin Date: Mon, 4 May 2020 16:11:47 +0200 Subject: [PATCH 3/3] improved input report timing; some nfc cleanup --- joycontrol/command_line_interface.py | 59 +++++++++++--- joycontrol/controller_state.py | 3 + joycontrol/{mcu.py => ir_nfc_mcu.py} | 23 +++--- joycontrol/protocol.py | 111 ++++++++++++++++----------- joycontrol/report.py | 26 ++++++- scripts/relay_joycon.py | 1 + 6 files changed, 154 insertions(+), 69 deletions(-) rename joycontrol/{mcu.py => ir_nfc_mcu.py} (91%) diff --git a/joycontrol/command_line_interface.py b/joycontrol/command_line_interface.py index 526726d..fb5a870 100644 --- a/joycontrol/command_line_interface.py +++ b/joycontrol/command_line_interface.py @@ -38,15 +38,16 @@ def _print_doc(string): print(line[prefix_i:] if line.strip() else line) -class ControllerCLI: - def __init__(self, controller_state: ControllerState): - self.controller_state = controller_state +class CLI: + def __init__(self): self.commands = {} + def add_command(self, name, command): + if name in self.commands: + raise ValueError(f'Command {name} already registered.') + self.commands[name] = command + async def cmd_help(self): - print('Button commands:') - print(', '.join(self.controller_state.button_state.get_available_buttons())) - print() print('Commands:') for name, fun in inspect.getmembers(self): if name.startswith('cmd_') and fun.__doc__: @@ -59,6 +60,47 @@ class ControllerCLI: print('Commands can be chained using "&&"') print('Type "exit" to close.') + async def run(self): + while True: + user_input = await ainput(prompt='cmd >> ') + if not user_input: + continue + + for command in user_input.split('&&'): + cmd, *args = shlex.split(command) + + if cmd == 'exit': + return + + if hasattr(self, f'cmd_{cmd}'): + try: + result = await getattr(self, f'cmd_{cmd}')(*args) + if result: + print(result) + except Exception as e: + print(e) + elif cmd in self.commands: + try: + result = await self.commands[cmd](*args) + if result: + print(result) + except Exception as e: + print(e) + else: + print('command', cmd, 'not found, call help for help.') + + +class ControllerCLI(CLI): + def __init__(self, controller_state: ControllerState): + super().__init__() + self.controller_state = controller_state + + async def cmd_help(self): + print('Button commands:') + print(', '.join(self.controller_state.button_state.get_available_buttons())) + print() + await super().cmd_help() + @staticmethod def _set_stick(stick, direction, value): if direction == 'center': @@ -109,11 +151,6 @@ class ControllerCLI: else: raise ValueError('Value of side must be "l", "left" or "r", "right"') - def add_command(self, name, command): - if name in self.commands: - raise ValueError(f'Command {name} already registered.') - self.commands[name] = command - async def run(self): while True: user_input = await ainput(prompt='cmd >> ') diff --git a/joycontrol/controller_state.py b/joycontrol/controller_state.py index 997ed6b..dcc0460 100644 --- a/joycontrol/controller_state.py +++ b/joycontrol/controller_state.py @@ -51,6 +51,9 @@ class ControllerState: def set_nfc(self, nfc_content): self._nfc_content = nfc_content + def get_nfc(self): + return self._nfc_content + async def send(self): """ Invokes protocol.send_controller_state(). Returns after the controller state was send. diff --git a/joycontrol/mcu.py b/joycontrol/ir_nfc_mcu.py similarity index 91% rename from joycontrol/mcu.py rename to joycontrol/ir_nfc_mcu.py index 0f26e93..e618edd 100644 --- a/joycontrol/mcu.py +++ b/joycontrol/ir_nfc_mcu.py @@ -1,6 +1,9 @@ +import logging from enum import Enum from crc8 import crc8 +logger = logging.getLogger(__name__) + class Action(Enum): NON = 0 @@ -24,7 +27,12 @@ def copyarray(dest, offset, src): for i in range(len(src)): dest[offset + i] = src[i] -class Mcu: + +class IrNfcMcu: + """ + TODO: cleanup + """ + def __init__(self): self._fw_major = [0, 3] self._fw_minor = [0, 5] @@ -79,14 +87,7 @@ class Mcu: def update_nfc_report(self): self._bytes = [0] * 313 if self.get_action() == Action.REQUEST_STATUS: - self._bytes[0] = 1 - self._bytes[1] = 0 - self._bytes[2] = 0 - self._bytes[3] = self._fw_major[0] - self._bytes[4] = self._fw_major[1] - self._bytes[5] = self._fw_minor[0] - self._bytes[6] = self._fw_minor[1] - self._bytes[7] = self._get_state_byte() + self.update_status() elif self.get_action() == Action.NON: self._bytes[0] = 0xff elif self.get_action() == Action.START_TAG_DISCOVERY: @@ -104,13 +105,13 @@ class Mcu: self._bytes[2] = 5 self._bytes[3] = 0 self._bytes[4] = 0 - if not self._nfc_content is None: + if self._nfc_content is not None: data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07] copyarray(self._bytes, 5, data) copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) else: - print('nfc content is none') + logger.info('nfc content is none') self._bytes[5] = 9 self._bytes[6] = 0x31 self._bytes[7] = 0 diff --git a/joycontrol/protocol.py b/joycontrol/protocol.py index cecf0b3..7134f0d 100644 --- a/joycontrol/protocol.py +++ b/joycontrol/protocol.py @@ -1,5 +1,6 @@ import asyncio import logging +import time from asyncio import BaseTransport, BaseProtocol from contextlib import suppress from typing import Optional, Union, Tuple, Text @@ -10,7 +11,7 @@ from joycontrol.controller_state import ControllerState from joycontrol.memory import FlashMemory from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.transport import NotConnectedError -from joycontrol.mcu import Mcu, McuState, Action +from joycontrol.ir_nfc_mcu import IrNfcMcu, McuState, Action from crc8 import crc8 logger = logging.getLogger(__name__) @@ -41,7 +42,7 @@ class ControllerProtocol(BaseProtocol): self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state_sender = None - self._mcu = Mcu() + self._mcu = IrNfcMcu() # None = Just answer to sub commands self._input_report_mode = None @@ -55,7 +56,7 @@ class ControllerProtocol(BaseProtocol): Raises NotConnected exception if the transport is not connected or the connection was lost. """ - # TODO: Call write directly if not in 0x30 input report mode + # TODO: Call write directly if in continuously sending input report mode if self.transport is None: raise NotConnectedError('Transport not registered.') @@ -93,11 +94,6 @@ class ControllerProtocol(BaseProtocol): input_report.set_timer(self._input_report_timer) self._input_report_timer = (self._input_report_timer + 1) % 0x100 - if input_report.get_input_report_id() == 0x31: - self._mcu.set_nfc(self._controller_state._nfc_content) - self._mcu.update_nfc_report() - input_report.set_mcu(self._mcu) - await self.transport.write(input_report) self._controller_state.sig_is_send.set() @@ -131,28 +127,29 @@ class ControllerProtocol(BaseProtocol): async def input_report_mode_full(self): """ - Continuously sends full input reports containing the controller state. + Continuously sends: + 0x30 input reports containing the controller state OR + 0x31 input reports containing the controller state and nfc data """ if self.transport.is_reading(): raise ValueError('Transport must be paused in full input report mode') + # send state at 66Hz + send_delay = 0.015 + await asyncio.sleep(send_delay) + last_send_time = time.time() + input_report = InputReport() input_report.set_vibrator_input() input_report.set_misc() + if self._input_report_mode is None: + raise ValueError('Input report mode is not set.') + input_report.set_input_report_id(self._input_report_mode) reader = asyncio.ensure_future(self.transport.read()) try: while True: - input_report.set_input_report_id(self._input_report_mode) - # TODO: improve timing - if self.controller == Controller.PRO_CONTROLLER: - # send state at 120Hz - await asyncio.sleep(1 / 120) - else: - # send state at 60Hz - await asyncio.sleep(1 / 60) - reply_send = False if reader.done(): data = await reader @@ -168,8 +165,10 @@ class ControllerProtocol(BaseProtocol): pass elif output_report_id == OutputReportID.SUB_COMMAND: reply_send = await self._reply_to_sub_command(report) - elif output_report_id == OutputReportID.REQUEST_MCU: - reply_send = await self._reply_to_mcu(report) + elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU: + # TODO: This does not reply anything + # reply_send = await self._reply_to_ir_nfc_mcu(report) + await self._reply_to_ir_nfc_mcu(report) else: logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE') except ValueError as v_err: @@ -185,8 +184,26 @@ class ControllerProtocol(BaseProtocol): # TODO: set some sensor data input_report.set_6axis_data() + # set nfc data + if input_report.get_input_report_id() == 0x31: + self._mcu.set_nfc(self._controller_state.get_nfc()) + self._mcu.update_nfc_report() + input_report.set_ir_nfc_data(bytes(self._mcu)) + await self.write(input_report) + # calculate delay + current_time = time.time() + time_delta = time.time() - last_send_time + sleep_time = send_delay - time_delta + last_send_time = current_time + + if sleep_time < 0: + # logger.warning(f'Code is running {abs(sleep_time)} s too slow!') + sleep_time = 0 + + await asyncio.sleep(sleep_time) + except NotConnectedError as err: # Stop 0x30 input report mode if disconnected. logger.error(err) @@ -220,13 +237,17 @@ class ControllerProtocol(BaseProtocol): else: logger.warning(f'Output report {output_report_id} not implemented - ignoring') - async def _reply_to_mcu(self, report): + async def _reply_to_ir_nfc_mcu(self, report): + """ + TODO: Cleanup + We aren't replying to anything here, do we need to? + """ sub_command = report.data[11] sub_command_data = report.data[12:] # logging.info(f'received output report - Request MCU sub command {sub_command}') - if self._mcu.get_action() == Action.READ_TAG or self._mcu.get_action() == Action.READ_TAG_2 or self._mcu.get_action() == Action.READ_FINISHED: + if self._mcu.get_action() in (Action.READ_TAG, Action.READ_TAG_2, Action.READ_FINISHED): return # Request mcu state @@ -369,16 +390,35 @@ class ControllerProtocol(BaseProtocol): await self.write(input_report) async def _command_set_input_report_mode(self, sub_command_data): - if sub_command_data[0] == 0x30: - pass - elif sub_command_data[0] == 0x31: - pass + if self._input_report_mode == sub_command_data[0]: + logger.warning(f'Already in input report mode {sub_command_data[0]} - ignoring request') + + # Start input report reader + if sub_command_data[0] in (0x30, 0x31): + new_reader = asyncio.ensure_future(self.input_report_mode_full()) else: logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') return - logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...') + # Replace the currently running reader with the input report mode sender, + # which will also handle incoming requests in the future + self.transport.pause_reading() + + # We need to replace the reader in the future because this function was probably called by it + async def set_reader(): + await self.transport.set_reader(new_reader) + + logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...') + self._input_report_mode = sub_command_data[0] + + self.transport.resume_reading() + + asyncio.ensure_future(set_reader()).add_done_callback( + utils.create_error_check_callback() + ) + + # Send acknowledgement input_report = InputReport() input_report.set_input_report_id(0x21) input_report.set_misc() @@ -388,23 +428,6 @@ class ControllerProtocol(BaseProtocol): await self.write(input_report) - # start sending input reports - if self._input_report_mode is None: - - self.transport.pause_reading() - new_reader = asyncio.ensure_future(self.input_report_mode_full()) - - # We need to swap the reader in the future because this function was probably called by it - async def set_reader(): - await self.transport.set_reader(new_reader) - self.transport.resume_reading() - - asyncio.ensure_future(set_reader()).add_done_callback( - utils.create_error_check_callback() - ) - - self._input_report_mode = sub_command_data[0] - async def _command_trigger_buttons_elapsed_time(self, sub_command_data): input_report = InputReport() input_report.set_input_report_id(0x21) diff --git a/joycontrol/report.py b/joycontrol/report.py index fed830e..3218245 100644 --- a/joycontrol/report.py +++ b/joycontrol/report.py @@ -112,9 +112,11 @@ class InputReport: for i in range(14, 50): self.data[i] = 0x00 - def set_mcu(self, data): + def set_ir_nfc_data(self, data): + if 50 + len(data) > len(self.data): + raise ValueError('Too much data.') + # write to data - data = bytes(data) for i in range(len(data)): self.data[50 + i] = data[i] @@ -205,6 +207,15 @@ class InputReport: else: return bytes(self.data[:51]) + def __str__(self): + _id = f'Input {self.get_input_report_id():x}' + _info = '' + if self.get_input_report_id() == 0x21: + _info = self.get_reply_to_subcommand_id() + _bytes = ' '.join(f'{byte:x}' for byte in bytes(self)) + + return f'{_id} {_info}\n{_bytes}' + class SubCommand(Enum): REQUEST_DEVICE_INFO = 0x02 @@ -222,7 +233,7 @@ class SubCommand(Enum): class OutputReportID(Enum): SUB_COMMAND = 0x01 RUMBLE_ONLY = 0x10 - REQUEST_MCU = 0x11 + REQUEST_IR_NFC_MCU = 0x11 class OutputReport: @@ -306,3 +317,12 @@ class OutputReport: def __bytes__(self): return bytes(self.data) + + def __str__(self): + _id = f'Output {self.get_output_report_id()}' + _info = '' + if self.get_output_report_id() == OutputReportID.SUB_COMMAND: + _info = self.get_sub_command() + _bytes = ' '.join(f'{byte:x}' for byte in bytes(self)) + + return f'{_id} {_info}\n{_bytes}' diff --git a/scripts/relay_joycon.py b/scripts/relay_joycon.py index 14d6b4c..41b37e3 100644 --- a/scripts/relay_joycon.py +++ b/scripts/relay_joycon.py @@ -20,6 +20,7 @@ PRODUCT_ID_JL = 8198 PRODUCT_ID_JR = 8199 PRODUCT_ID_PC = 8201 + class Relay: def __init__(self, capture_file=None): self._capture_file = capture_file