From 18a423aa5b92d1db085322d0d522ec29a9a0dc93 Mon Sep 17 00:00:00 2001 From: Robert Martin Date: Sat, 11 Apr 2020 16:34:16 +0900 Subject: [PATCH] added scripts --- joycontrol/utils.py | 33 +++++++++ run_controller_cli.py | 17 +---- .../dump_spi_flash.py | 2 +- scripts/parse_capture.py | 68 +++++++++++++++++++ scripts/relay_joycon.py | 1 + 5 files changed, 105 insertions(+), 16 deletions(-) rename dump_spi_flash.py => scripts/dump_spi_flash.py (99%) create mode 100644 scripts/parse_capture.py create mode 100644 scripts/relay_joycon.py diff --git a/joycontrol/utils.py b/joycontrol/utils.py index 77c0498..4eadc31 100644 --- a/joycontrol/utils.py +++ b/joycontrol/utils.py @@ -1,9 +1,42 @@ import asyncio import logging +from contextlib import contextmanager + +import hid logger = logging.getLogger(__name__) +class AsyncHID(hid.Device): + def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs): + super().__init__(*args, **kwargs) + self._loop = loop + + self._write_lock = asyncio.Lock() + self._read_lock = asyncio.Lock() + + async def read(self, size, timeout=None): + async with self._read_lock: + return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout) + + async def write(self, data): + async with self._write_lock: + return await self._loop.run_in_executor(None, hid.Device.write, self, data) + + +@contextmanager +def get_output(path=None, open_flags='wb', default=None): + """ + Context manager that open the file a path was given, otherwise returns default value. + """ + if path is not None: + file = open(path, open_flags) + yield file + file.close() + else: + yield default + + def get_bit(value, n): return (value >> n & 1) != 0 diff --git a/run_controller_cli.py b/run_controller_cli.py index 0221eee..6f517b2 100644 --- a/run_controller_cli.py +++ b/run_controller_cli.py @@ -8,7 +8,7 @@ from contextlib import contextmanager from aioconsole import ainput -from joycontrol import logging_default as log +from joycontrol import logging_default as log, utils from joycontrol.command_line_interface import ControllerCLI from joycontrol.controller import Controller from joycontrol.controller_state import ControllerState, button_push @@ -192,20 +192,7 @@ if __name__ == '__main__': with open(args.spi_flash, 'rb') as spi_flash_file: spi_flash = FlashMemory(spi_flash_file.read()) - # creates file if arg is given - @contextmanager - def get_output(path=None): - """ - Opens file if path is given - """ - if path is not None: - file = open(path, 'wb') - yield file - file.close() - else: - yield None - - with get_output(args.log) as capture_file: + with utils.get_output(path=args.log, default=None) as capture_file: loop = asyncio.get_event_loop() loop.run_until_complete( _main(controller, diff --git a/dump_spi_flash.py b/scripts/dump_spi_flash.py similarity index 99% rename from dump_spi_flash.py rename to scripts/dump_spi_flash.py index 7f54e7c..05c82b4 100644 --- a/dump_spi_flash.py +++ b/scripts/dump_spi_flash.py @@ -11,7 +11,7 @@ from joycontrol.report import OutputReport, InputReport, SubCommand logger = logging.getLogger(__name__) - +# TODO: Add Pro Controller VENDOR_ID = 1406 PRODUCT_ID_JL = 8198 PRODUCT_ID_JR = 8199 diff --git a/scripts/parse_capture.py b/scripts/parse_capture.py new file mode 100644 index 0000000..2b47613 --- /dev/null +++ b/scripts/parse_capture.py @@ -0,0 +1,68 @@ +import argparse +import struct + +from joycontrol.report import InputReport, OutputReport, SubCommand + +""" joycontrol capture parsing example. + +Usage: + parse_capture.py + parse_capture.py -h | --help +""" + + +def _eof_read(file, size): + """ + Raises EOFError if end of file is reached. + """ + data = file.read(size) + if not data: + raise EOFError() + return data + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('capture_file') + args = parser.parse_args() + + # list of time, report tuples + input_reports = [] + output_reports = [] + + with open(args.capture_file, 'rb') as capture: + try: + start_time = None + while True: + # parse capture time + time = struct.unpack('d', _eof_read(capture, 8))[0] + if start_time is None: + start_time = time + + # parse data size + size = struct.unpack('i', _eof_read(capture, 4))[0] + # parse data + data = list(_eof_read(capture, size)) + + if data[0] == 0xA1: + report = InputReport(data) + # normalise time + input_reports.append((time - start_time, report)) + elif data[0] == 0xA2: + report = OutputReport(data) + # normalise time + output_reports.append((time - start_time, report)) + else: + raise ValueError(f'Unexpected data.') + + # only interested in pairing + if isinstance(report, OutputReport) and report.get_sub_command() == SubCommand.SET_PLAYER_LIGHTS: + break + except EOFError: + pass + + print('Finished parsing reports.') + print('Input reports:', len(input_reports)) + print('Output reports:', len(output_reports)) + + # Do some investigation... diff --git a/scripts/relay_joycon.py b/scripts/relay_joycon.py new file mode 100644 index 0000000..30f5b45 --- /dev/null +++ b/scripts/relay_joycon.py @@ -0,0 +1 @@ +import argparse import asyncio import logging import os import socket import struct import time import hid from joycontrol import logging_default as log, utils from joycontrol.device import HidDevice from joycontrol.server import PROFILE_PATH from joycontrol.utils import AsyncHID logger = logging.getLogger(__name__) # TODO: Add Pro Controller VENDOR_ID = 1406 PRODUCT_ID_JL = 8198 PRODUCT_ID_JR = 8199 class Relay: def __init__(self, capture_file=None): self._capture_file = capture_file async def relay_input(self, hid_device, client_itr): loop = asyncio.get_event_loop() while True: data = await hid_device.read(100) # add adding byte for input report data = b'\xa1' + data if self._capture_file is not None: # write data to log file current_time = struct.pack('d', time.time()) size = struct.pack('i', len(data)) self._capture_file.write(current_time + size + data) await loop.sock_sendall(client_itr, data) await asyncio.sleep(0) async def relay_output(self, hid_device, client_itr): loop = asyncio.get_event_loop() while True: data = await loop.sock_recv(client_itr, 50) if self._capture_file is not None: # write data to log file current_time = struct.pack('d', time.time()) size = struct.pack('i', len(data)) self._capture_file.write(current_time + size + data) # remove padding byte for output report (not required when using the hid driver) data = data[1:] await hid_device.write(data) await asyncio.sleep(0) async def _main(capture_file=None): # Creating l2cap sockets ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) # HACK: To circumvent incompatibilities with the bluetooth "input" plugin, we need to restart Bluetooth here. # The Switch does not connect to the sockets if we don't. # For more info see: https://github.com/mart1nro/joycontrol/issues/8 logger.info('Restarting bluetooth service...') await utils.run_system_command('systemctl restart bluetooth.service') await asyncio.sleep(1) logger.info('Waiting for HID devices... Please connect JoyCon over bluetooth. ' 'Note: The bluez "input" plugin needs to be enabled (default)"') controller = None while controller is None: for device in hid.enumerate(0, 0): # looking for devices matching Nintendo's vendor id and JoyCon product id if device['vendor_id'] == VENDOR_ID and device['product_id'] in (PRODUCT_ID_JL, PRODUCT_ID_JR): controller = device break else: await asyncio.sleep(2) logger.info(f'Found controller "{controller}".') logger.info('Connecting with the Switch... Please open the "Change Grip/Order" menu.') ctl_sock.setblocking(False) itr_sock.setblocking(False) ctl_sock.bind((socket.BDADDR_ANY, 17)) itr_sock.bind((socket.BDADDR_ANY, 19)) ctl_sock.listen(1) itr_sock.listen(1) emulated_hid = HidDevice() # setting bluetooth adapter name and class to the device we wish to emulate await emulated_hid.set_name(controller['product_string']) await emulated_hid.set_class() logger.info('Advertising the Bluetooth SDP record...') emulated_hid.register_sdp_record(PROFILE_PATH) emulated_hid.discoverable() loop = asyncio.get_event_loop() client_ctl, ctl_address = await loop.sock_accept(ctl_sock) logger.info(f'Accepted connection at psm 17 from {ctl_address}') client_itr, itr_address = await loop.sock_accept(itr_sock) logger.info(f'Accepted connection at psm 19 from {itr_address}') assert ctl_address[0] == itr_address[0] # stop advertising emulated_hid.discoverable(False) relay = Relay(capture_file) try: with AsyncHID(path=controller['path'], loop=loop) as hid_controller: await asyncio.gather( asyncio.ensure_future(relay.relay_input(hid_controller, client_itr)), asyncio.ensure_future(relay.relay_output(hid_controller, client_itr)), ) finally: logger.info('Stopping communication...') client_itr.close() client_ctl.close() if __name__ == '__main__': # check if root if not os.geteuid() == 0: raise PermissionError('Script must be run as root!') parser = argparse.ArgumentParser() parser.add_argument('-l', '--log', help='log file path for capturing communication') args = parser.parse_args() # setup logging log.configure() with utils.get_output(args.log, default=None) as capture_file: loop = asyncio.get_event_loop() loop.run_until_complete( _main(capture_file=capture_file) ) \ No newline at end of file