Files
joycontrol/scripts/relay_joycon.py
T
2020-04-12 22:34:26 -04:00

152 lines
5.0 KiB
Python

import argparse
import asyncio
import logging
import os
import socket
import struct
import time
import hid
from joycontrol import logging_default as log, utils
from joycontrol.device import HidDevice
from joycontrol.server import PROFILE_PATH
from joycontrol.utils import AsyncHID
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# HID identifiers used to recognise supported controllers while enumerating
# devices. The vendor id is Nintendo's; the product ids presumably map to the
# left JoyCon (JL), right JoyCon (JR) and Pro Controller (PC).
VENDOR_ID = 0x057E      # 1406
PRODUCT_ID_JL = 0x2006  # 8198
PRODUCT_ID_JR = 0x2007  # 8199
PRODUCT_ID_PC = 0x2009  # 8201
class Relay:
    """Forward HID reports between a physical controller and a connected client.

    Input reports read from the HID device are sent over the interrupt
    socket; output reports received on the interrupt socket are written back
    to the HID device. Every relayed report is optionally appended to a
    binary capture file.
    """

    def __init__(self, capture_file=None):
        """
        :param capture_file: writable binary file object that receives a record
                             for every relayed report, or None to disable capturing
        """
        self._capture_file = capture_file

    def _capture(self, data):
        """Append one record for `data` to the capture file, if one is set.

        Record layout: timestamp packed as native double ('d'), payload length
        packed as native int ('i'), followed by the raw payload bytes.
        """
        if self._capture_file is None:
            return
        current_time = struct.pack('d', time.time())
        size = struct.pack('i', len(data))
        self._capture_file.write(current_time + size + data)

    async def relay_input(self, hid_device, client_itr):
        """Endlessly forward input reports from `hid_device` to `client_itr`.

        :param hid_device: object with an awaitable `read(size)` returning bytes
        :param client_itr: non-blocking socket the reports are sent to
        """
        loop = asyncio.get_event_loop()

        while True:
            data = await hid_device.read(100)
            # add padding byte for input report
            data = b'\xa1' + data

            self._capture(data)

            await loop.sock_sendall(client_itr, data)
            # give other tasks a chance to run between reports
            await asyncio.sleep(0)

    async def relay_output(self, hid_device, client_itr):
        """Endlessly forward output reports from `client_itr` to `hid_device`.

        :param hid_device: object with an awaitable `write(data)`
        :param client_itr: non-blocking socket the reports are received from
        :raises ConnectionError: when the peer has closed the socket
        """
        loop = asyncio.get_event_loop()

        while True:
            data = await loop.sock_recv(client_itr, 50)
            if not data:
                # An empty read means the peer closed the connection. Without
                # this check the loop would spin forever relaying empty reports.
                raise ConnectionError('Interrupt socket was closed by the peer.')

            self._capture(data)

            # remove padding byte for output report (not required when using the hid driver)
            data = data[1:]

            await hid_device.write(data)
            # give other tasks a chance to run between reports
            await asyncio.sleep(0)
async def _find_controller():
    """Poll the HID device list until a supported controller shows up.

    :returns: the `hid.enumerate` entry (dict) of the first device whose
              vendor and product id match one of the supported controllers
    """
    supported_products = (PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC)
    while True:
        for device in hid.enumerate(0, 0):
            # looking for devices matching Nintendo's vendor id and JoyCon product id
            if device['vendor_id'] == VENDOR_ID and device['product_id'] in supported_products:
                return device
        # nothing connected yet, check again after a short pause
        await asyncio.sleep(2)


async def _main(capture_file=None):
    """Relay a physical controller's HID traffic to a connecting Switch.

    Waits for a supported controller, advertises an emulated HID device with
    the controller's name, accepts the control (psm 17) and interrupt (psm 19)
    connections and then relays reports in both directions until one of the
    relay tasks fails.

    :param capture_file: optional writable binary file object handed to
                         `Relay` for capturing the relayed reports
    """
    # Creating l2cap sockets
    ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
    itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
    client_ctl = None
    client_itr = None

    try:
        # HACK: To circumvent incompatibilities with the bluetooth "input" plugin, we need to restart Bluetooth here.
        # The Switch does not connect to the sockets if we don't.
        # For more info see: https://github.com/mart1nro/joycontrol/issues/8
        logger.info('Restarting bluetooth service...')
        await utils.run_system_command('systemctl restart bluetooth.service')
        await asyncio.sleep(1)

        logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
                    'or a Pro Controller over Bluetooth. Note: The bluez "input" plugin needs to be enabled (default)')

        controller = await _find_controller()

        logger.info(f'Found controller "{controller}".')

        logger.info('Connecting with the Switch... Please open the "Change Grip/Order" menu.')

        ctl_sock.setblocking(False)
        itr_sock.setblocking(False)

        ctl_sock.bind((socket.BDADDR_ANY, 17))
        itr_sock.bind((socket.BDADDR_ANY, 19))

        ctl_sock.listen(1)
        itr_sock.listen(1)

        emulated_hid = HidDevice()
        # setting bluetooth adapter name and class to the device we wish to emulate
        await emulated_hid.set_name(controller['product_string'])
        await emulated_hid.set_class()

        logger.info('Advertising the Bluetooth SDP record...')
        emulated_hid.register_sdp_record(PROFILE_PATH)
        emulated_hid.discoverable()

        loop = asyncio.get_event_loop()
        client_ctl, ctl_address = await loop.sock_accept(ctl_sock)
        logger.info(f'Accepted connection at psm 17 from {ctl_address}')
        client_itr, itr_address = await loop.sock_accept(itr_sock)
        logger.info(f'Accepted connection at psm 19 from {itr_address}')
        # both channels must originate from the same remote device
        assert ctl_address[0] == itr_address[0]

        # stop advertising
        emulated_hid.discoverable(False)

        relay = Relay(capture_file)

        try:
            with AsyncHID(path=controller['path'], loop=loop) as hid_controller:
                await asyncio.gather(
                    asyncio.ensure_future(relay.relay_input(hid_controller, client_itr)),
                    asyncio.ensure_future(relay.relay_output(hid_controller, client_itr)),
                )
        finally:
            logger.info('Stopping communication...')
    finally:
        # Release every socket on all exit paths, including failures during
        # setup; the client sockets are None until their accept succeeded.
        for sock in (client_itr, client_ctl, itr_sock, ctl_sock):
            if sock is not None:
                sock.close()
if __name__ == '__main__':
    # refuse to start unless the effective user is root
    if os.geteuid() != 0:
        raise PermissionError('Script must be run as root!')

    # command line: optional path of a file that captures the relayed communication
    cli = argparse.ArgumentParser()
    cli.add_argument('-l', '--log', help='log file path for capturing communication')
    options = cli.parse_args()

    # setup logging
    log.configure()

    # run the relay until it terminates, capturing to the requested output (if any)
    with utils.get_output(options.log, default=None) as capture_file:
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(_main(capture_file=capture_file))