diff --git a/scripts/relay_joycon.py b/scripts/relay_joycon.py index cbd0522..37025a9 100644 --- a/scripts/relay_joycon.py +++ b/scripts/relay_joycon.py @@ -1 +1,151 @@ -import argparse import asyncio import logging import os import socket import struct import time import hid from joycontrol import logging_default as log, utils from joycontrol.device import HidDevice from joycontrol.server import PROFILE_PATH from joycontrol.utils import AsyncHID logger = logging.getLogger(__name__) VENDOR_ID = 1406 PRODUCT_ID_JL = 8198 PRODUCT_ID_JR = 8199 PRODUCT_ID_PC = 8201 class Relay: def __init__(self, capture_file=None): self._capture_file = capture_file async def relay_input(self, hid_device, client_itr): loop = asyncio.get_event_loop() while True: data = await hid_device.read(100) # add adding byte for input report data = b'\xa1' + data if self._capture_file is not None: # write data to log file current_time = struct.pack('d', time.time()) size = struct.pack('i', len(data)) self._capture_file.write(current_time + size + data) await loop.sock_sendall(client_itr, data) await asyncio.sleep(0) async def relay_output(self, hid_device, client_itr): loop = asyncio.get_event_loop() while True: data = await loop.sock_recv(client_itr, 50) if self._capture_file is not None: # write data to log file current_time = struct.pack('d', time.time()) size = struct.pack('i', len(data)) self._capture_file.write(current_time + size + data) # remove padding byte for output report (not required when using the hid driver) data = data[1:] await hid_device.write(data) await asyncio.sleep(0) async def _main(capture_file=None): # Creating l2cap sockets ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) # HACK: To circumvent incompatibilities with the bluetooth "input" plugin, we need to restart Bluetooth here. # The Switch does not connect to the sockets if we don't. # For more info see: https://github.com/mart1nro/joycontrol/issues/8 logger.info('Restarting bluetooth service...') await utils.run_system_command('systemctl restart bluetooth.service') await asyncio.sleep(1) logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), ' 'or a Pro Controller over Bluetooth. Note: The bluez "input" plugin needs to be enabled (default)') controller = None while controller is None: for device in hid.enumerate(0, 0): # looking for devices matching Nintendo's vendor id and JoyCon product id if device['vendor_id'] == VENDOR_ID and device['product_id'] in (PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC): controller = device break else: await asyncio.sleep(2) logger.info(f'Found controller "{controller}".') logger.info('Connecting with the Switch... Please open the "Change Grip/Order" menu.') ctl_sock.setblocking(False) itr_sock.setblocking(False) ctl_sock.bind((socket.BDADDR_ANY, 17)) itr_sock.bind((socket.BDADDR_ANY, 19)) ctl_sock.listen(1) itr_sock.listen(1) emulated_hid = HidDevice() # setting bluetooth adapter name and class to the device we wish to emulate await emulated_hid.set_name(controller['product_string']) await emulated_hid.set_class() logger.info('Advertising the Bluetooth SDP record...') emulated_hid.register_sdp_record(PROFILE_PATH) emulated_hid.discoverable() loop = asyncio.get_event_loop() client_ctl, ctl_address = await loop.sock_accept(ctl_sock) logger.info(f'Accepted connection at psm 17 from {ctl_address}') client_itr, itr_address = await loop.sock_accept(itr_sock) logger.info(f'Accepted connection at psm 19 from {itr_address}') assert ctl_address[0] == itr_address[0] # stop advertising emulated_hid.discoverable(False) relay = Relay(capture_file) try: with AsyncHID(path=controller['path'], loop=loop) as hid_controller: await asyncio.gather( asyncio.ensure_future(relay.relay_input(hid_controller, client_itr)), asyncio.ensure_future(relay.relay_output(hid_controller, client_itr)), ) finally: logger.info('Stopping communication...') client_itr.close() client_ctl.close() if __name__ == '__main__': # check if root if not os.geteuid() == 0: raise PermissionError('Script must be run as root!') parser = argparse.ArgumentParser() parser.add_argument('-l', '--log', help='log file path for capturing communication') args = parser.parse_args() # setup logging log.configure() with utils.get_output(args.log, default=None) as capture_file: loop = asyncio.get_event_loop() loop.run_until_complete( _main(capture_file=capture_file) ) \ No newline at end of file +import argparse +import asyncio +import logging +import os +import socket +import struct +import time + +import hid + +from joycontrol import logging_default as log, utils +from joycontrol.device import HidDevice +from joycontrol.server import PROFILE_PATH +from joycontrol.utils import AsyncHID + +logger = logging.getLogger(__name__) + +VENDOR_ID = 1406 +PRODUCT_ID_JL = 8198 +PRODUCT_ID_JR = 8199 +PRODUCT_ID_PC = 8201 + +class Relay: + def __init__(self, capture_file=None): + self._capture_file = capture_file + + async def relay_input(self, hid_device, client_itr): + loop = asyncio.get_event_loop() + + while True: + data = await hid_device.read(100) + # add adding byte for input report + data = b'\xa1' + data + + if self._capture_file is not None: + # write data to log file + current_time = struct.pack('d', time.time()) + size = struct.pack('i', len(data)) + self._capture_file.write(current_time + size + data) + + await loop.sock_sendall(client_itr, data) + await asyncio.sleep(0) + + async def relay_output(self, hid_device, client_itr): + loop = asyncio.get_event_loop() + + while True: + data = await loop.sock_recv(client_itr, 50) + + if self._capture_file is not None: + # write data to log file + current_time = struct.pack('d', time.time()) + size = struct.pack('i', len(data)) + self._capture_file.write(current_time + size + data) + + # remove padding byte for output report (not required when using the hid driver) + data = data[1:] + + await hid_device.write(data) + await asyncio.sleep(0) + + +async def _main(capture_file=None): + # Creating l2cap sockets + ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) + itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) + + # HACK: To circumvent incompatibilities with the bluetooth "input" plugin, we need to restart Bluetooth here. + # The Switch does not connect to the sockets if we don't. + # For more info see: https://github.com/mart1nro/joycontrol/issues/8 + logger.info('Restarting bluetooth service...') + await utils.run_system_command('systemctl restart bluetooth.service') + await asyncio.sleep(1) + + logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), ' + 'or a Pro Controller over Bluetooth. Note: The bluez "input" plugin needs to be enabled (default)') + + controller = None + while controller is None: + for device in hid.enumerate(0, 0): + # looking for devices matching Nintendo's vendor id and JoyCon product id + if device['vendor_id'] == VENDOR_ID and device['product_id'] in (PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC): + controller = device + break + else: + await asyncio.sleep(2) + + logger.info(f'Found controller "{controller}".') + + logger.info('Connecting with the Switch... Please open the "Change Grip/Order" menu.') + + ctl_sock.setblocking(False) + itr_sock.setblocking(False) + + ctl_sock.bind((socket.BDADDR_ANY, 17)) + itr_sock.bind((socket.BDADDR_ANY, 19)) + + ctl_sock.listen(1) + itr_sock.listen(1) + + emulated_hid = HidDevice() + # setting bluetooth adapter name and class to the device we wish to emulate + await emulated_hid.set_name(controller['product_string']) + await emulated_hid.set_class() + + logger.info('Advertising the Bluetooth SDP record...') + emulated_hid.register_sdp_record(PROFILE_PATH) + emulated_hid.discoverable() + + loop = asyncio.get_event_loop() + client_ctl, ctl_address = await loop.sock_accept(ctl_sock) + logger.info(f'Accepted connection at psm 17 from {ctl_address}') + client_itr, itr_address = await loop.sock_accept(itr_sock) + logger.info(f'Accepted connection at psm 19 from {itr_address}') + assert ctl_address[0] == itr_address[0] + + # stop advertising + emulated_hid.discoverable(False) + + relay = Relay(capture_file) + + try: + with AsyncHID(path=controller['path'], loop=loop) as hid_controller: + await asyncio.gather( + asyncio.ensure_future(relay.relay_input(hid_controller, client_itr)), + asyncio.ensure_future(relay.relay_output(hid_controller, client_itr)), + ) + finally: + logger.info('Stopping communication...') + client_itr.close() + client_ctl.close() + + +if __name__ == '__main__': + # check if root + if not os.geteuid() == 0: + raise PermissionError('Script must be run as root!') + + parser = argparse.ArgumentParser() + parser.add_argument('-l', '--log', help='log file path for capturing communication') + args = parser.parse_args() + + # setup logging + log.configure() + + with utils.get_output(args.log, default=None) as capture_file: + loop = asyncio.get_event_loop() + loop.run_until_complete( + _main(capture_file=capture_file) + ) +