From b2187e297fe6e941ef86ba92b3202c04c87d6506 Mon Sep 17 00:00:00 2001 From: Robert Martin Date: Wed, 29 Jan 2020 00:38:39 +0900 Subject: [PATCH] cleanup --- connect_test.py | 284 ----------------------------------------- run_and_pair_switch.py | 3 +- 2 files changed, 1 insertion(+), 286 deletions(-) delete mode 100644 connect_test.py diff --git a/connect_test.py b/connect_test.py deleted file mode 100644 index b437c7f..0000000 --- a/connect_test.py +++ /dev/null @@ -1,284 +0,0 @@ -import logging -import os -import re -import subprocess -import time -import uuid -from enum import Enum, auto -from time import sleep -import logging_default as log - -import bluetooth as blt -import dbus - -logger = logging.getLogger(__name__) - -# Defining the ports for the control and interrupt sockets -CTL_PSM = 17 -ITR_PSM = 19 - - -class HidDevice: - _HID_UUID = '00001124-0000-1000-8000-00805f9b34fb' - _HID_PATH = '/bluez/switch/hid' - - PRO_CONTROLLER = 'Pro Controller' - JOYCON_R = 'Joy-Con (R)' - JOYCON_L = 'Joy-Con (L)' - - def __init__(self): - self._uuid = str(uuid.uuid4()) - - # Setting up dbus to advertise the service record - bus = dbus.SystemBus() - obj = bus.get_object('org.bluez', '/org/bluez/hci0') - self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1') - self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties') - - def discoverable(self, boolean=True): - #self.properties.Set(self.adapter.dbus_interface, 'Powered', True) - self.properties.Set(self.adapter.dbus_interface, 'Discoverable', boolean) - - def set_class(self, cls=0x002508): - """ - :param cls: default 0x002508 (Gamepad/joystick device class) - """ - logger.info(f'setting device class to {cls}...') - subprocess.call(['hciconfig', 'hci0', 'class', str(cls)]) - - def set_name(self, name: str): - logger.info(f'setting device name to {name}...') - subprocess.call(['hciconfig', 'hci0', 'name', name]) - - def register_sdp_record(self, record_path): - with open(record_path) as record: - opts = { - 'ServiceRecord': record.read(), - 'Role': 'server', - 'Service': self._HID_UUID, - 'RequireAuthentication': False, - 'RequireAuthorization': False - } - bus = dbus.SystemBus() - manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.ProfileManager1") - manager.RegisterProfile(self._HID_PATH, self._uuid, opts) - - -class InputReport: - def __init__(self): - self.m1 = [0xA1, 0x21, 0x05, 0x8E, 0x84, 0x00, 0x12, 0x01, 0x18, 0x80, 0x01, 0x18, 0x80, 0x80, 0x82, 0x02, 0x03, 0x48, 0x01, 0x02, 0xDC, 0xA6, 0x32, 0x71, 0x58, 0xBB, 0x01, 0x01] - self.m2 = [0xA1, 0x21, 0x06, 0x8E, 0x84, 0x00, 0x12, 0x01, 0x18, 0x80, 0x01, 0x18, 0x80, 0x80, 0x80, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00] - - def create(self, message): - miss = 49 - len(message) - message = message + miss * [0x00] - assert len(message) == 49 - return message - - -class InputReport2: - def __init__(self): - self.data = [0x00] * 50 - # all input reports are prepended with 0xA1 - self.data[0] = 0xA1 - - def set(self, input_report_id, timer=0x00): - self.data[1] = input_report_id - self.data[2] = timer % 256 - # battery level + connection info - self.data[3] = 0x8E - - # Todo: Button status, analog stick data, vibrator input - - # ACK byte for subcmd reply - self.data[14] = 0x82 - - # Reply-to subcommand ID - self.data[14] = 0x02 - - def sub_0x2_device_info(self, mac, fm_version=(0x03, 0x48), controller=0x01): - """ - Sub command 0x02 request device info response. - - :param mac: Controller MAC address in Big Endian (6 Bytes) - :param fm_version: TODO - :param controller: 1=Left Joy-Con, 2=Right Joy-Con, 3=Pro Controller - """ - if len(fm_version) != 2: - raise ValueError('Firmware version must consist of 2 bytes!') - elif len(mac) != 6: - raise ValueError('Bluetooth mac address must consist of 6 bytes!') - - # reply to sub command ID - self.data[14] = 0x02 - - # sub command reply data - offset = 15 - self.data[offset: offset + 1] = fm_version - self.data[offset + 2] = controller - self.data[offset + 3] = 0x02 - self.data[offset + 4: offset + 9] = mac - self.data[offset + 10] = 0x01 - self.data[offset + 11] = 0x01 - - def __bytes__(self): - return bytes(self.data) - - -class SubCommand(Enum): - REQUEST_DEVICE_INFO = auto() - NOT_IMPLEMENTED = auto() - - -class OutputReport: - def __init__(self, data): - if data[0] != 0xA2: - raise ValueError('Output reports must start with 0xA2') - self.data = data - - def sub_command(self): - if self.data[11] == 0x02: - return SubCommand.REQUEST_DEVICE_INFO - else: - return None - - def __bytes__(self): - return bytes(self.data) - - -def get_bt_mac_address(dev=0): - output = subprocess.check_output(['hciconfig', f'hci{dev}'], encoding='UTF-8') - match = re.search(r'BD Address: (?P\w\w:\w\w:\w\w:\w\w:\w\w:\w\w)', output) - if match: - return list(map(lambda x: int(x, 16), match.group('mac').split(':'))) - else: - raise ValueError(f'BD Address not found in "{output}"') - - -def pair_switch(client_itr, own_bd_mac_address): - while True: - in_report = InputReport2() - - reply = None - while reply is None: - # It seems like we have to initiate the conversation, so send and empty input report - client_itr.send(bytes(in_report)) - sleep(.1) - try: - reply = client_itr.recv(50) - except blt.btcommon.BluetoothError as bt_err: - print(bt_err) - - out_report = OutputReport(list(reply)) - - # DEVIVCE INFO REQUEST - sub_command = out_report.sub_command() - if sub_command is None: - logger.error(f'No sub command found in "{reply}"') - continue - elif sub_command == SubCommand.REQUEST_DEVICE_INFO: - # send device info - device_info_in_report = InputReport2() - device_info_in_report.sub_0x2_device_info(own_bd_mac_address) - - logger.info('Sending device info...') - client_itr.send(bytes(device_info_in_report)) - elif sub_command == SubCommand.NOT_IMPLEMENTED: - logger.error(f'Sub command not implemented of "{reply}"') - continue - - # awaiting Subcommand 0x08: Set shipment - reply = None - while reply is None: - try: - reply = client_itr.recv(50) - except blt.btcommon.BluetoothError as bt_err: - print(bt_err) - sleep(.5) - - print(reply) - - return True - return False - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description='open bluetooth hid socket') - # parser.add_argument('port', type=int, help='socket port') - args = parser.parse_args() - - # setup logging - log.configure() - - # check if root - if not os.geteuid() == 0: - raise PermissionError('Script must be run as root!') - - # Creating L2CAP sockets for control and interrupt channel - ctl_sock = blt.BluetoothSocket(blt.L2CAP) - itr_sock = blt.BluetoothSocket(blt.L2CAP) - - logger.info('Restarting bluetooth service...') - os.system('systemctl restart bluetooth.service') - time.sleep(1) - - logger.info(f'Binding control channel to {CTL_PSM}...') - ctl_sock.bind(("", CTL_PSM)) - logger.info(f'Binding interrupt channel to {ITR_PSM}...') - itr_sock.bind(("", ITR_PSM)) - - logger.info('start listening on the server sockets') - ctl_sock.listen(1) # Limit of 1 connection - itr_sock.listen(1) - - - - hid = HidDevice() - # setting bluetooth adapter name and class to the device we wish to emulate - hid.set_name(HidDevice.JOYCON_L) - hid.set_class() - - logger.info('Advertising the Bluetooth SDP record...') - hid.register_sdp_record('profile/sdp_record_hid_pro.xml') - hid.discoverable() - - logger.info('Waiting for connection...') - client_ctl, address = ctl_sock.accept() - logger.info(f'Accepted connection at {CTL_PSM} from {address}') - client_itr, address = itr_sock.accept() - client_itr.settimeout(0) - logger.info(f'Accepted connection at {ITR_PSM} from {address}') - - """ - data = [0] * 49 - data[0] = 0xA1 - - hello = 0 - ip = InputReport() - - for i in range(100): - logger.info(f'sending data {data}...') - client_itr.send(bytes(data)) - - try: - print("received", client_itr.recv(49)) - hello += 1 - - except blt.btcommon.BluetoothError as bt_err: - print(bt_err) - - if hello == 1: - data = ip.create(ip.m1) - print("GO 1") - elif hello == 2: - data = ip.create(ip.m2) - print("GO 2") - - sleep(0.5) - """ - pair_switch(client_itr, get_bt_mac_address()) - - itr_sock.close() - ctl_sock.close() diff --git a/run_and_pair_switch.py b/run_and_pair_switch.py index 8e4444f..f222f18 100644 --- a/run_and_pair_switch.py +++ b/run_and_pair_switch.py @@ -64,10 +64,9 @@ async def send_empty_input_reports(transport): async def main(): transport, protocol = await create_hid_server(controller_protocol_factory(Controller.JOYCON_L), 17, 19) + # send some empty input reports until the switch decides to reply future = asyncio.ensure_future(send_empty_input_reports(transport)) - await protocol.wait_for_output_report() - future.cancel() try: await future