import logging import os import re import subprocess import time import uuid from enum import Enum, auto from time import sleep import logging_default as log import bluetooth as blt import dbus logger = logging.getLogger(__name__) # Defining the ports for the control and interrupt sockets CTL_PSM = 17 ITR_PSM = 19 class HidDevice: _HID_UUID = '00001124-0000-1000-8000-00805f9b34fb' _HID_PATH = '/bluez/switch/hid' PRO_CONTROLLER = 'Pro Controller' JOYCON_R = 'Joy-Con (R)' JOYCON_L = 'Joy-Con (L)' def __init__(self): self._uuid = str(uuid.uuid4()) # Setting up dbus to advertise the service record bus = dbus.SystemBus() obj = bus.get_object('org.bluez', '/org/bluez/hci0') self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1') self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties') def discoverable(self, boolean=True): #self.properties.Set(self.adapter.dbus_interface, 'Powered', True) self.properties.Set(self.adapter.dbus_interface, 'Discoverable', boolean) def set_class(self, cls=0x002508): """ :param cls: default 0x002508 (Gamepad/joystick device class) """ logger.info(f'setting device class to {cls}...') subprocess.call(['hciconfig', 'hci0', 'class', str(cls)]) def set_name(self, name: str): logger.info(f'setting device name to {name}...') subprocess.call(['hciconfig', 'hci0', 'name', name]) def register_sdp_record(self, record_path): with open(record_path) as record: opts = { 'ServiceRecord': record.read(), 'Role': 'server', 'Service': self._HID_UUID, 'RequireAuthentication': False, 'RequireAuthorization': False } bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.ProfileManager1") manager.RegisterProfile(self._HID_PATH, self._uuid, opts) class InputReport: def __init__(self): self.m1 = [0xA1, 0x21, 0x05, 0x8E, 0x84, 0x00, 0x12, 0x01, 0x18, 0x80, 0x01, 0x18, 0x80, 0x80, 0x82, 0x02, 0x03, 0x48, 0x01, 0x02, 0xDC, 0xA6, 0x32, 0x71, 0x58, 0xBB, 0x01, 0x01] self.m2 = [0xA1, 0x21, 0x06, 0x8E, 0x84, 0x00, 0x12, 0x01, 0x18, 0x80, 0x01, 0x18, 0x80, 0x80, 0x80, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] def create(self, message): miss = 49 - len(message) message = message + miss * [0x00] assert len(message) == 49 return message class InputReport2: def __init__(self): self.data = [0x00] * 50 # all input reports are prepended with 0xA1 self.data[0] = 0xA1 def set(self, input_report_id, timer=0x00): self.data[1] = input_report_id self.data[2] = timer % 256 # battery level + connection info self.data[3] = 0x8E # Todo: Button status, analog stick data, vibrator input # ACK byte for subcmd reply self.data[14] = 0x82 # Reply-to subcommand ID self.data[14] = 0x02 def sub_0x2_device_info(self, mac, fm_version=(0x03, 0x48), controller=0x01): """ Sub command 0x02 request device info response. :param mac: Controller MAC address in Big Endian (6 Bytes) :param fm_version: TODO :param controller: 1=Left Joy-Con, 2=Right Joy-Con, 3=Pro Controller """ if len(fm_version) != 2: raise ValueError('Firmware version must consist of 2 bytes!') elif len(mac) != 6: raise ValueError('Bluetooth mac address must consist of 6 bytes!') # reply to sub command ID self.data[14] = 0x02 # sub command reply data offset = 15 self.data[offset: offset + 1] = fm_version self.data[offset + 2] = controller self.data[offset + 3] = 0x02 self.data[offset + 4: offset + 9] = mac self.data[offset + 10] = 0x01 self.data[offset + 11] = 0x01 def __bytes__(self): return bytes(self.data) class SubCommand(Enum): REQUEST_DEVICE_INFO = auto() NOT_IMPLEMENTED = auto() class OutputReport: def __init__(self, data): if data[0] != 0xA2: raise ValueError('Output reports must start with 0xA2') self.data = data def sub_command(self): if self.data[11] == 0x02: return SubCommand.REQUEST_DEVICE_INFO else: return None def __bytes__(self): return bytes(self.data) def get_bt_mac_address(dev=0): output = subprocess.check_output(['hciconfig', f'hci{dev}'], encoding='UTF-8') match = re.search(r'BD Address: (?P\w\w:\w\w:\w\w:\w\w:\w\w:\w\w)', output) if match: return list(map(lambda x: int(x, 16), match.group('mac').split(':'))) else: raise ValueError(f'BD Address not found in "{output}"') def pair_switch(client_itr, own_bd_mac_address): while True: in_report = InputReport2() reply = None while reply is None: # It seems like we have to initiate the conversation, so send and empty input report client_itr.send(bytes(in_report)) sleep(.1) try: reply = client_itr.recv(50) except blt.btcommon.BluetoothError as bt_err: print(bt_err) out_report = OutputReport(list(reply)) # DEVIVCE INFO REQUEST sub_command = out_report.sub_command() if sub_command is None: logger.error(f'No sub command found in "{reply}"') continue elif sub_command == SubCommand.REQUEST_DEVICE_INFO: # send device info device_info_in_report = InputReport2() device_info_in_report.sub_0x2_device_info(own_bd_mac_address) logger.info('Sending device info...') client_itr.send(bytes(device_info_in_report)) elif sub_command == SubCommand.NOT_IMPLEMENTED: logger.error(f'Sub command not implemented of "{reply}"') continue # awaiting Subcommand 0x08: Set shipment reply = None while reply is None: try: reply = client_itr.recv(50) except blt.btcommon.BluetoothError as bt_err: print(bt_err) sleep(.5) print(reply) return True return False if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='open bluetooth hid socket') # parser.add_argument('port', type=int, help='socket port') args = parser.parse_args() # setup logging log.configure() # check if root if not os.geteuid() == 0: raise PermissionError('Script must be run as root!') # Creating L2CAP sockets for control and interrupt channel ctl_sock = blt.BluetoothSocket(blt.L2CAP) itr_sock = blt.BluetoothSocket(blt.L2CAP) logger.info('Restarting bluetooth service...') os.system('systemctl restart bluetooth.service') time.sleep(1) logger.info(f'Binding control channel to {CTL_PSM}...') ctl_sock.bind(("", CTL_PSM)) logger.info(f'Binding interrupt channel to {ITR_PSM}...') itr_sock.bind(("", ITR_PSM)) logger.info('start listening on the server sockets') ctl_sock.listen(1) # Limit of 1 connection itr_sock.listen(1) hid = HidDevice() # setting bluetooth adapter name and class to the device we wish to emulate hid.set_name(HidDevice.JOYCON_L) hid.set_class() logger.info('Advertising the Bluetooth SDP record...') hid.register_sdp_record('profile/sdp_record_hid_pro.xml') hid.discoverable() logger.info('Waiting for connection...') client_ctl, address = ctl_sock.accept() logger.info(f'Accepted connection at {CTL_PSM} from {address}') client_itr, address = itr_sock.accept() client_itr.settimeout(0) logger.info(f'Accepted connection at {ITR_PSM} from {address}') """ data = [0] * 49 data[0] = 0xA1 hello = 0 ip = InputReport() for i in range(100): logger.info(f'sending data {data}...') client_itr.send(bytes(data)) try: print("received", client_itr.recv(49)) hello += 1 except blt.btcommon.BluetoothError as bt_err: print(bt_err) if hello == 1: data = ip.create(ip.m1) print("GO 1") elif hello == 2: data = ip.create(ip.m2) print("GO 2") sleep(0.5) """ pair_switch(client_itr, get_bt_mac_address()) itr_sock.close() ctl_sock.close()