diff --git a/README.md b/README.md index 0c88ad6..6c8256d 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Tested on Ubuntu 19.10, and with Raspberry Pi 3B+ and 4B Raspbian GNU/Linux 10 ( Emulation of JOYCON_R, JOYCON_L and PRO_CONTROLLER. Able to send: - button commands - stick state -- nfc data +- ~~nfc data~~ (removed, see [#80](https://github.com/mart1nro/joycontrol/issues/80)) ## Installation - Install dependencies @@ -24,7 +24,7 @@ Arch Linux Derivatives: Install the `hidapi` and `bluez-utils-compat`(AUR) packa ```bash sudo pip3 install . ``` -- Disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8) +- Consider to disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8) ## Command line interface example - Run the script @@ -57,6 +57,9 @@ Call "help" to see a list of available commands. opening of the "Change Grip/Order" menu is not required. - ... +## Thanks +- Special thanks to https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering for reverse engineering of the joycon protocol +- Thanks to the growing number of contributers and users ## Resources diff --git a/joycontrol/controller_state.py b/joycontrol/controller_state.py index 8dadd55..000d5f8 100644 --- a/joycontrol/controller_state.py +++ b/joycontrol/controller_state.py @@ -118,7 +118,7 @@ class ButtonState: self._available_buttons = {'y', 'x', 'b', 'a', 'sr', 'sl', 'r', 'zr', 'plus', 'r_stick', 'home'} elif self.controller == Controller.JOYCON_L: - self._available_buttons = {'plus', 'l_stick', 'capture', + self._available_buttons = {'minus', 'l_stick', 'capture', 'down', 'up', 'right', 'left', 'sr', 'sl', 'l', 'zl'} # byte 1 @@ -171,13 +171,13 @@ class ButtonState: def get_available_buttons(self): """ - :returns set of valid buttons + :returns: set of valid buttons """ return set(self._available_buttons) def __iter__(self): """ - :returns iterator over the button bytes + :returns: iterator over the button bytes """ yield self._byte_1 yield self._byte_2 @@ -187,7 +187,12 @@ class ButtonState: self._byte_1 = self._byte_2 = self._byte_3 = 0 -async def button_push(controller_state, *buttons, sec=0.1): +async def button_press(controller_state, *buttons): + """ + Set given buttons in the controller state to the pressed down state and wait till send. + :param controller_state: + :param buttons: Buttons to press down (see ButtonState.get_available_buttons) + """ if not buttons: raise ValueError('No Buttons were given.') @@ -195,20 +200,43 @@ async def button_push(controller_state, *buttons, sec=0.1): for button in buttons: # push button - button_state.set_button(button) + button_state.set_button(button, pushed=True) - # send report + # wait until report is send await controller_state.send() - await asyncio.sleep(sec) + + +async def button_release(controller_state, *buttons): + """ + Set given buttons in the controller state to the unpressed state and wait till send. + :param controller_state: + :param buttons: Buttons to set to unpressed (see ButtonState.get_available_buttons) + """ + if not buttons: + raise ValueError('No Buttons were given.') + + button_state = controller_state.button_state for button in buttons: # release button button_state.set_button(button, pushed=False) - # send report + # wait until report is send await controller_state.send() +async def button_push(controller_state, *buttons, sec=0.1): + """ + Shortly push the given buttons. Wait until the controller state is send. + :param controller_state: + :param buttons: Buttons to push (see ButtonState.get_available_buttons) + :param sec: Seconds to wait before releasing the button, default: 0.1 + """ + await button_press(controller_state, *buttons) + await asyncio.sleep(sec) + await button_release(controller_state, *buttons) + + class _StickCalibration: def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center): self.h_center = h_center diff --git a/joycontrol/hid.py b/joycontrol/hid.py new file mode 100644 index 0000000..446f215 --- /dev/null +++ b/joycontrol/hid.py @@ -0,0 +1,36 @@ +import asyncio + +import hid + +VENDOR_ID = 1406 +PRODUCT_ID_JL = 8198 +PRODUCT_ID_JR = 8199 +PRODUCT_ID_PC = 8201 + + +async def get_blt_hid_device(): + while True: + for device in hid.enumerate(0, 0): + # looking for devices matching Nintendo's vendor id and JoyCon product id + if device['vendor_id'] == VENDOR_ID and device['product_id'] in ( + PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC): + return device + + await asyncio.sleep(2) + + +class AsyncHID(hid.Device): + def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs): + super().__init__(*args, **kwargs) + self._loop = loop + + self._write_lock = asyncio.Lock() + self._read_lock = asyncio.Lock() + + async def read(self, size, timeout=None): + async with self._read_lock: + return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout) + + async def write(self, data): + async with self._write_lock: + return await self._loop.run_in_executor(None, hid.Device.write, self, data) \ No newline at end of file diff --git a/joycontrol/ir_nfc_mcu.py b/joycontrol/ir_nfc_mcu.py deleted file mode 100644 index 50b387b..0000000 --- a/joycontrol/ir_nfc_mcu.py +++ /dev/null @@ -1,155 +0,0 @@ -import logging -from enum import Enum -from crc8 import crc8 - -logger = logging.getLogger(__name__) - - -class Action(Enum): - NON = 0 - REQUEST_STATUS = 1 - START_TAG_POLLING = 2 - START_TAG_DISCOVERY = 3 - READ_TAG = 4 - READ_TAG_2 = 5 - READ_FINISHED = 6 - - -class McuState(Enum): - NOT_INITIALIZED = 0 - IRC = 1 - NFC = 2 - STAND_BY = 3 - BUSY = 4 - - -def copyarray(dest, offset, src): - for i in range(len(src)): - dest[offset + i] = src[i] - - -class IrNfcMcu: - """ - TODO: cleanup - """ - - def __init__(self): - self._fw_major = [0, 3] - self._fw_minor = [0, 5] - - self._bytes = [0] * 313 - - self._action = Action.NON - self._state = McuState.NOT_INITIALIZED - - self._nfc_content = None - - def get_fw_major(self): - return self._fw_major - - def get_fw_minor(self): - return self._fw_minor - - def set_action(self, v): - self._action = v - - def get_action(self): - return self._action - - def set_state(self, v): - self._state = v - - def get_state(self): - return self._state - - def _get_state_byte(self): - if self.get_state() == McuState.NFC: - return 4 - elif self.get_state() == McuState.BUSY: - return 6 - elif self.get_state() == McuState.NOT_INITIALIZED: - return 1 - elif self.get_state() == McuState.STAND_BY: - return 1 - else: - return 0 - - def update_status(self): - self._bytes[0] = 1 - self._bytes[1] = 0 - self._bytes[2] = 0 - self._bytes[3] = self._fw_major[0] - self._bytes[4] = self._fw_major[1] - self._bytes[5] = self._fw_minor[0] - self._bytes[6] = self._fw_minor[1] - self._bytes[7] = self._get_state_byte() - - def update_nfc_report(self): - self._bytes = [0] * 313 - if self.get_action() == Action.REQUEST_STATUS: - self.update_status() - elif self.get_action() == Action.NON: - self._bytes[0] = 0xff - elif self.get_action() == Action.START_TAG_DISCOVERY: - self._bytes[0] = 0x2a - self._bytes[1] = 0 - self._bytes[2] = 5 - self._bytes[3] = 0 - self._bytes[4] = 0 - self._bytes[5] = 9 - self._bytes[6] = 0x31 - self._bytes[7] = 0 - elif self.get_action() == Action.START_TAG_POLLING: - self._bytes[0] = 0x2a - self._bytes[1] = 0 - self._bytes[2] = 5 - self._bytes[3] = 0 - self._bytes[4] = 0 - if self._nfc_content is not None: - data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07] - copyarray(self._bytes, 5, data) - copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) - copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) - else: - logger.info('nfc content is none') - self._bytes[5] = 9 - self._bytes[6] = 0x31 - self._bytes[7] = 0 - elif self.get_action() in (Action.READ_TAG, Action.READ_TAG_2): - self._bytes[0] = 0x3a - self._bytes[1] = 0 - self._bytes[2] = 7 - if self.get_action() == Action.READ_TAG: - data1 = bytes.fromhex('010001310200000001020007') - copyarray(self._bytes, 3, data1) - copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3]) - copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8]) - data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000') - copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2) - copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245]) - self.set_action(Action.READ_TAG_2) - else: - data = bytes.fromhex('02000927') - copyarray(self._bytes, 3, data) - copyarray(self._bytes, 3 + len(data), self._nfc_content[245:540]) - self.set_action(Action.READ_FINISHED) - elif self.get_action() == Action.READ_FINISHED: - self._bytes[0] = 0x2a - self._bytes[1] = 0 - self._bytes[2] = 5 - self._bytes[3] = 0 - self._bytes[4] = 0 - data = bytes.fromhex('0931040000000101020007') - copyarray(self._bytes, 5, data) - copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3]) - copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8]) - - crc = crc8() - crc.update(bytes(self._bytes[:-1])) - self._bytes[-1] = ord(crc.digest()) - - def set_nfc(self, nfc_content): - self._nfc_content = nfc_content - - def __bytes__(self): - return bytes(self._bytes) diff --git a/joycontrol/protocol.py b/joycontrol/protocol.py index 7134f0d..f83a9d9 100644 --- a/joycontrol/protocol.py +++ b/joycontrol/protocol.py @@ -11,8 +11,6 @@ from joycontrol.controller_state import ControllerState from joycontrol.memory import FlashMemory from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.transport import NotConnectedError -from joycontrol.ir_nfc_mcu import IrNfcMcu, McuState, Action -from crc8 import crc8 logger = logging.getLogger(__name__) @@ -42,8 +40,6 @@ class ControllerProtocol(BaseProtocol): self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state_sender = None - self._mcu = IrNfcMcu() - # None = Just answer to sub commands self._input_report_mode = None @@ -166,9 +162,8 @@ class ControllerProtocol(BaseProtocol): elif output_report_id == OutputReportID.SUB_COMMAND: reply_send = await self._reply_to_sub_command(report) elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU: - # TODO: This does not reply anything - # reply_send = await self._reply_to_ir_nfc_mcu(report) - await self._reply_to_ir_nfc_mcu(report) + # TODO NFC + raise NotImplementedError('NFC communictation is not implemented.') else: logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE') except ValueError as v_err: @@ -184,11 +179,9 @@ class ControllerProtocol(BaseProtocol): # TODO: set some sensor data input_report.set_6axis_data() - # set nfc data + # TODO NFC - set nfc data if input_report.get_input_report_id() == 0x31: - self._mcu.set_nfc(self._controller_state.get_nfc()) - self._mcu.update_nfc_report() - input_report.set_ir_nfc_data(bytes(self._mcu)) + pass await self.write(input_report) @@ -237,50 +230,6 @@ class ControllerProtocol(BaseProtocol): else: logger.warning(f'Output report {output_report_id} not implemented - ignoring') - async def _reply_to_ir_nfc_mcu(self, report): - """ - TODO: Cleanup - We aren't replying to anything here, do we need to? - """ - sub_command = report.data[11] - sub_command_data = report.data[12:] - - # logging.info(f'received output report - Request MCU sub command {sub_command}') - - if self._mcu.get_action() in (Action.READ_TAG, Action.READ_TAG_2, Action.READ_FINISHED): - return - - # Request mcu state - if sub_command == 0x01: - # input_report = InputReport() - # input_report.set_input_report_id(0x21) - # input_report.set_misc() - - # input_report.set_ack(0xA0) - # input_report.reply_to_subcommand_id(0x21) - - self._mcu.set_action(Action.REQUEST_STATUS) - # input_report.set_mcu(self._mcu) - - # await self.write(input_report) - # Send Start tag discovery - elif sub_command == 0x02: - # 0: Cancel all, 4: StartWaitingReceive - if sub_command_data[0] == 0x04: - self._mcu.set_action(Action.START_TAG_DISCOVERY) - # 1: Start polling - elif sub_command_data[0] == 0x01: - self._mcu.set_action(Action.START_TAG_POLLING) - # 2: stop polling - elif sub_command_data[0] == 0x02: - self._mcu.set_action(Action.NON) - elif sub_command_data[0] == 0x06: - self._mcu.set_action(Action.READ_TAG) - else: - logging.info(f'Unknown sub_command_data arg {sub_command_data}') - else: - logging.info(f'Unknown MCU sub command {sub_command}') - async def _reply_to_sub_command(self, report): # classify sub command try: @@ -467,6 +416,7 @@ class ControllerProtocol(BaseProtocol): await self.write(input_report) async def _command_set_nfc_ir_mcu_config(self, sub_command_data): + # TODO NFC input_report = InputReport() input_report.set_input_report_id(0x21) input_report.set_misc() @@ -474,30 +424,14 @@ class ControllerProtocol(BaseProtocol): input_report.set_ack(0xA0) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) - self._mcu.update_status() - data = list(bytes(self._mcu)[0:34]) - crc = crc8() - crc.update(bytes(data[:-1])) - checksum = crc.digest() - data[-1] = ord(checksum) - + data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200] for i in range(len(data)): - input_report.data[16+i] = data[i] - - # Set MCU mode cmd - if sub_command_data[1] == 0: - if sub_command_data[2] == 0: - self._mcu.set_state(McuState.STAND_BY) - elif sub_command_data[2] == 4: - self._mcu.set_state(McuState.NFC) - else: - logger.info(f"unknown mcu state {sub_command_data[2]}") - else: - logger.info(f"unknown mcu config command {sub_command_data}") + input_report.data[16 + i] = data[i] await self.write(input_report) async def _command_set_nfc_ir_mcu_state(self, sub_command_data): + # TODO NFC input_report = InputReport() input_report.set_input_report_id(0x21) input_report.set_misc() @@ -506,13 +440,10 @@ class ControllerProtocol(BaseProtocol): # 0x01 = Resume input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) - self._mcu.set_action(Action.NON) - self._mcu.set_state(McuState.STAND_BY) elif sub_command_data[0] == 0x00: # 0x00 = Suspend input_report.set_ack(0x80) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) - self._mcu.set_state(McuState.STAND_BY) else: raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' f'not implemented.') diff --git a/joycontrol/report.py b/joycontrol/report.py index 849ac68..7b0e8e6 100644 --- a/joycontrol/report.py +++ b/joycontrol/report.py @@ -272,16 +272,29 @@ class OutputReport: @staticmethod def _encode_rumble_data(freq, amp): - if not 0 <= freq <= 1252: - raise ValueError('Frequency must be in [0, 1252].') + # TODO: Fix LA Byte 2 + + if not (40 <= freq <= 1253): + raise ValueError('Frequency must be in [40, 1253].') + + if amp > 1.003: + raise ValueError('Amplitudes higher than 1.003 are not safe ' + 'for the integrity of the linear resonant actuators') # Float frequency to hex conversion encoded_hex_freq = int(round(math.log2(freq / 10) * 32)) # Convert to Joy-Con HF range. Range in big-endian: 0x0004-0x01FC with +0x0004 steps. - hf = (encoded_hex_freq - 0x60) * 4 + if freq <= 80: + hf = 0x00 + else: + hf = (encoded_hex_freq - 0x60) * 4 + # Convert to Joy-Con LF range. Range: 0x01-0x7F. - lf = encoded_hex_freq - 0x40 + if freq >= 640: + lf = 0x00 + else: + lf = encoded_hex_freq - 0x40 # Float amplitude to hex conversion encoded_hex_amp = 0 diff --git a/joycontrol/server.py b/joycontrol/server.py index 7d7a212..eb3e19a 100644 --- a/joycontrol/server.py +++ b/joycontrol/server.py @@ -76,10 +76,9 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= hid.powered(True) hid.pairable(True) - - # setting bluetooth adapter name and class to the device we wish to emulate + + # setting bluetooth adapter name to the device we wish to emulate await hid.set_name(protocol.controller.device_name()) - await hid.set_class() logger.info('Advertising the Bluetooth SDP record...') try: @@ -88,6 +87,9 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id= # Already registered (If multiple controllers are being emulated and this method is called consecutive times) logger.debug(dbus_err) + # set the device class to "Gamepad/joystick" + await hid.set_class() + # start advertising hid.discoverable() diff --git a/joycontrol/utils.py b/joycontrol/utils.py index 4eadc31..496cb02 100644 --- a/joycontrol/utils.py +++ b/joycontrol/utils.py @@ -2,28 +2,9 @@ import asyncio import logging from contextlib import contextmanager -import hid - logger = logging.getLogger(__name__) -class AsyncHID(hid.Device): - def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs): - super().__init__(*args, **kwargs) - self._loop = loop - - self._write_lock = asyncio.Lock() - self._read_lock = asyncio.Lock() - - async def read(self, size, timeout=None): - async with self._read_lock: - return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout) - - async def write(self, data): - async with self._write_lock: - return await self._loop.run_in_executor(None, hid.Device.write, self, data) - - @contextmanager def get_output(path=None, open_flags='wb', default=None): """ diff --git a/run_controller_cli.py b/run_controller_cli.py old mode 100644 new mode 100755 index c90e506..c84597b --- a/run_controller_cli.py +++ b/run_controller_cli.py @@ -10,7 +10,7 @@ from aioconsole import ainput from joycontrol import logging_default as log, utils from joycontrol.command_line_interface import ControllerCLI from joycontrol.controller import Controller -from joycontrol.controller_state import ControllerState, button_push +from joycontrol.controller_state import ControllerState, button_push, button_press, button_release from joycontrol.memory import FlashMemory from joycontrol.protocol import controller_protocol_factory from joycontrol.server import create_hid_server @@ -136,25 +136,21 @@ async def test_controller_buttons(controller_state: ControllerState): await button_push(controller_state, 'home') -async def set_nfc(controller_state, file_path): +def ensure_valid_button(controller_state, *buttons): """ - Sets nfc content of the controller state to contents of the given file. - :param controller_state: Emulated controller state - :param file_path: Path to nfc dump file + Raise ValueError if any of the given buttons os not part of the controller state. + :param controller_state: + :param buttons: Any number of buttons to check (see ButtonState.get_available_buttons) """ - loop = asyncio.get_event_loop() - - with open(file_path, 'rb') as nfc_file: - content = await loop.run_in_executor(None, nfc_file.read) - controller_state.set_nfc(content) + for button in buttons: + if button not in controller_state.button_state.get_available_buttons(): + raise ValueError(f'Button {button} does not exist on {controller_state.get_controller()}') async def mash_button(controller_state, button, interval): - # waits until controller is fully connected + # wait until controller is fully connected await controller_state.connect() - - if button not in controller_state.button_state.get_available_buttons(): - raise ValueError(f'Button {button} does not exist on {controller_state.get_controller()}') + ensure_valid_button(controller_state, button) user_input = asyncio.ensure_future( ainput(prompt=f'Pressing the {button} button every {interval} seconds... Press to stop.') @@ -168,6 +164,107 @@ async def mash_button(controller_state, button, interval): await user_input +def _register_commands_with_controller_state(controller_state, cli): + """ + Commands registered here can use the given controller state. + The doc string of commands will be printed by the CLI when calling "help" + :param cli: + :param controller_state: + """ + async def test_buttons(): + """ + test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons. + """ + await test_controller_buttons(controller_state) + + cli.add_command(test_buttons.__name__, test_buttons) + + # Mash a button command + async def mash(*args): + """ + mash - Mash a specified button at a set interval + + Usage: + mash