added support for multiple Bluetooth adapters

This commit is contained in:
Robert Martin
2020-03-31 00:18:38 +09:00
parent 26715f1be0
commit 44c684a224
5 changed files with 106 additions and 42 deletions
+46 -16
View File
@@ -1,6 +1,5 @@
import logging
import uuid
import dbus
from joycontrol import utils
@@ -8,45 +7,76 @@ from joycontrol import utils
logger = logging.getLogger(__name__)
HID_UUID = '00001124-0000-1000-8000-00805f9b34fb'
HID_PATH = '/bluez/switch/hid'
class HidDevice:
_HID_UUID = '00001124-0000-1000-8000-00805f9b34fb'
_HID_PATH = '/bluez/switch/hid'
def __init__(self):
self._uuid = str(uuid.uuid4())
# Setting up dbus to advertise the service record
def __init__(self, device_id=None):
bus = dbus.SystemBus()
obj = bus.get_object('org.bluez', '/org/bluez/hci0')
self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1')
self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties')
# Get Bluetooth adapter from dbus interface
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
for path, ifaces in manager.GetManagedObjects().items():
adapter_info = ifaces.get('org.bluez.Adapter1')
if adapter_info is None:
continue
elif device_id is None or device_id == adapter_info['Address'] or path.endswith(str(device_id)):
obj = bus.get_object('org.bluez', path)
self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1')
self.address = adapter_info['Address']
self._adapter_name = path.split('/')[-1]
self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties')
break
else:
raise ValueError(f'Adapter {device_id} not found.')
def get_address(self) -> str:
"""
:returns adapter Bluetooth address
"""
return self.address
def powered(self, boolean=True):
self.properties.Set(self.adapter.dbus_interface, 'Powered', boolean)
def discoverable(self, boolean=True):
"""
Make adapter discoverable, starts advertising.
"""
self.properties.Set(self.adapter.dbus_interface, 'Discoverable', boolean)
async def set_class(self, cls='0x002508'):
"""
Sets Bluetooth device class. Requires hciconfig system command.
:param cls: default 0x002508 (Gamepad/joystick device class)
"""
logger.info(f'setting device class to {cls}...')
await utils.run_system_command(f'hciconfig hci0 class {cls}')
await utils.run_system_command(f'hciconfig {self._adapter_name} class {cls}')
async def set_name(self, name: str):
"""
Set Bluetooth device name.
:param name: to set.
"""
logger.info(f'setting device name to {name}...')
await utils.run_system_command(f'hciconfig hci0 name "{name}"')
self.properties.Set(self.adapter.dbus_interface, 'Alias', name)
@staticmethod
def register_sdp_record(record_path):
_uuid = str(uuid.uuid4())
def register_sdp_record(self, record_path):
with open(record_path) as record:
opts = {
'ServiceRecord': record.read(),
'Role': 'server',
'Service': self._HID_UUID,
'Service': HID_UUID,
'RequireAuthentication': False,
'RequireAuthorization': False
}
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.ProfileManager1")
manager.RegisterProfile(self._HID_PATH, self._uuid, opts)
manager.RegisterProfile(HID_PATH, _uuid, opts)
return _uuid
+45 -14
View File
@@ -2,6 +2,7 @@ import asyncio
import logging
import socket
import dbus
import pkg_resources
from joycontrol import utils
@@ -21,38 +22,68 @@ async def _send_empty_input_reports(transport):
await asyncio.sleep(1)
async def create_hid_server(protocol_factory, ctl_psm, itr_psm, capture_file=None):
async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id=None, capture_file=None):
"""
:param protocol_factory: Factory function returning a ControllerProtocol instance
:param ctl_psm: hid control channel port
:param itr_psm: hid interrupt channel port
:param device_id: ID of the bluetooth adapter.
Integer matching the digit in the hci* notation (e.g. hci0, hci1, ...) or
Bluetooth mac address in string notation of the adapter (e.g. "FF:FF:FF:FF:FF:FF").
If None, choose any device.
Note: Selection of adapters may currently not work if the bluez "input" plugin is enabled.
:param capture_file: opened file to log incoming and outgoing messages
:returns transport for input reports and protocol which handles incoming output reports
"""
ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
# HACK: To avoid incompatibilities with the bluetooth "input" plugin, we need to restart Bluetooth here.
# The Switch does not connect to the sockets if we don't.
# For more info see: https://github.com/mart1nro/joycontrol/issues/8
logger.info('Restarting bluetooth service...')
await utils.run_system_command('systemctl restart bluetooth.service')
await asyncio.sleep(1)
ctl_sock.setblocking(False)
itr_sock.setblocking(False)
ctl_sock.bind((socket.BDADDR_ANY, ctl_psm))
itr_sock.bind((socket.BDADDR_ANY, itr_psm))
try:
hid = HidDevice(device_id=device_id)
ctl_sock.bind((hid.address, ctl_psm))
itr_sock.bind((hid.address, itr_psm))
except OSError as err:
logger.warning(err)
# If the ports are already taken, this probably means that the bluez "input" plugin is enabled.
logger.warning('Fallback: Restarting bluetooth due to incompatibilities with the bluez "input" plugin. '
'Disable the plugin to avoid issues. See https://github.com/mart1nro/joycontrol/issues/8.')
# HACK: To circumvent incompatibilities with the bluetooth "input" plugin, we need to restart Bluetooth here.
# The Switch does not connect to the sockets if we don't.
# For more info see: https://github.com/mart1nro/joycontrol/issues/8
logger.info('Restarting bluetooth service...')
await utils.run_system_command('systemctl restart bluetooth.service')
await asyncio.sleep(1)
hid = HidDevice(device_id=device_id)
ctl_sock.bind((socket.BDADDR_ANY, ctl_psm))
itr_sock.bind((socket.BDADDR_ANY, itr_psm))
ctl_sock.listen(1)
itr_sock.listen(1)
protocol = protocol_factory()
hid = HidDevice()
hid.powered(True)
# setting bluetooth adapter name and class to the device we wish to emulate
await hid.set_name(protocol.controller.device_name())
await hid.set_class()
logger.info('Advertising the Bluetooth SDP record...')
hid.register_sdp_record(PROFILE_PATH)
try:
HidDevice.register_sdp_record(PROFILE_PATH)
except dbus.exceptions.DBusException as dbus_err:
# Already registered (If multiple controllers are being emulated and this method is called consecutive times)
logger.debug(dbus_err)
# start advertising
hid.discoverable()
logger.info('Waiting for Switch to connect... Please open the "Change Grip/Order" menu.')
loop = asyncio.get_event_loop()
client_ctl, ctl_address = await loop.sock_accept(ctl_sock)
logger.info(f'Accepted connection at psm {ctl_psm} from {ctl_address}')
@@ -66,7 +97,7 @@ async def create_hid_server(protocol_factory, ctl_psm, itr_psm, capture_file=Non
transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, 50, capture_file=capture_file)
protocol.connection_made(transport)
# send some empty input reports until the switch decides to reply
# send some empty input reports until the Switch decides to reply
future = asyncio.ensure_future(_send_empty_input_reports(transport))
await protocol.wait_for_output_report()
future.cancel()