forked from mirror/joycontrol
reorganization - created package
This commit is contained in:
@@ -0,0 +1,75 @@
|
||||
from joycontrol import utils
|
||||
|
||||
|
||||
class ButtonState:
|
||||
"""
|
||||
Utility class to set buttons in the input report
|
||||
https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_notes.md
|
||||
Byte 0 1 2 3 4 5 6 7
|
||||
1 Y X B A SR SL R ZR
|
||||
2 Minus Plus R Stick L Stick Home Capture
|
||||
3 Down Up Right Left SR SL L ZL
|
||||
"""
|
||||
def __init__(self):
|
||||
# 3 bytes
|
||||
self._byte_1 = 0
|
||||
self._byte_2 = 0
|
||||
self._byte_3 = 0
|
||||
|
||||
# generating methods for each button
|
||||
def button_method_factory(byte, bit):
|
||||
def flip():
|
||||
setattr(self, byte, utils.flip_bit(getattr(self, byte), bit))
|
||||
|
||||
def getter():
|
||||
return utils.get_bit(getattr(self, byte), bit)
|
||||
return flip, getter
|
||||
|
||||
# byte 1
|
||||
self.y, self.y_is_set = button_method_factory('_byte_1', 0)
|
||||
self.x, self.x_is_set = button_method_factory('_byte_1', 1)
|
||||
self.b, self.b_is_set = button_method_factory('_byte_1', 2)
|
||||
self.a, self.a_is_set = button_method_factory('_byte_1', 3)
|
||||
self.right_sr, self.right_sr_is_set = button_method_factory('_byte_1', 4)
|
||||
self.right_sl, self.right_sl_is_set = button_method_factory('_byte_1', 5)
|
||||
self.r, self.r_is_set = button_method_factory('_byte_1', 6)
|
||||
self.zr, self.zr_is_set = button_method_factory('_byte_1', 7)
|
||||
|
||||
# byte 2
|
||||
self.minus, self.minus_is_set = button_method_factory('_byte_2', 0)
|
||||
self.plus, self.plus_is_set = button_method_factory('_byte_2', 1)
|
||||
self.r_stick, self.r_stick_is_set = button_method_factory('_byte_2', 2)
|
||||
self.l_stick, self.l_stick_is_set = button_method_factory('_byte_2', 3)
|
||||
self.home, self.home_is_set = button_method_factory('_byte_2', 4)
|
||||
self.capture, self.capture_is_set = button_method_factory('_byte_2', 5)
|
||||
|
||||
# byte 3
|
||||
self.down, self.down_is_set = button_method_factory('_byte_3', 0)
|
||||
self.up, self.up_is_set = button_method_factory('_byte_3', 1)
|
||||
self.right, self.right_is_set = button_method_factory('_byte_3', 2)
|
||||
self.left, self.left_is_set = button_method_factory('_byte_3', 3)
|
||||
self.left_sr, self.left_sr_is_set = button_method_factory('_byte_3', 4)
|
||||
self.left_sl, self.left_sl_is_set = button_method_factory('_byte_3', 5)
|
||||
self.l, self.l_is_set = button_method_factory('_byte_3', 6)
|
||||
self.zl, self.zl_is_set = button_method_factory('_byte_3', 7)
|
||||
|
||||
"""
|
||||
Example for generated methods: home button (byte_2, 4)
|
||||
|
||||
def home(self):
|
||||
self.byte_2 = flip_bit(self.byte_2, 4)
|
||||
|
||||
def home_is_set(self):
|
||||
return get_bit(self.byte_2, 4)
|
||||
"""
|
||||
|
||||
def __iter__(self):
|
||||
"""
|
||||
@returns iterator of the button bytes
|
||||
"""
|
||||
yield self._byte_1
|
||||
yield self._byte_2
|
||||
yield self._byte_3
|
||||
|
||||
def clear(self):
|
||||
self._byte_1 = self._byte_2 = self._byte_3 = 0
|
||||
@@ -0,0 +1,20 @@
|
||||
import enum
|
||||
|
||||
|
||||
class Controller(enum.Enum):
|
||||
JOYCON_L = 0x01
|
||||
JOYCON_R = 0x02
|
||||
PRO_CONTROLLER = 0x03
|
||||
|
||||
def device_name(self):
|
||||
"""
|
||||
:returns corresponding bluetooth device name
|
||||
"""
|
||||
if self == Controller.JOYCON_L:
|
||||
return 'Joy-Con (L)'
|
||||
elif self == Controller.JOYCON_R:
|
||||
return 'Joy-Con (R)'
|
||||
elif self == Controller.PRO_CONTROLLER:
|
||||
return 'Pro Controller'
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
@@ -0,0 +1,38 @@
|
||||
import asyncio
|
||||
|
||||
from joycontrol.button_state import ButtonState
|
||||
from joycontrol.protocol import ControllerProtocol
|
||||
|
||||
|
||||
class ControllerState:
|
||||
def __init__(self, transport: asyncio.Transport, protocol: ControllerProtocol):
|
||||
super().__init__()
|
||||
self.transport = transport
|
||||
|
||||
self.protocol = protocol
|
||||
|
||||
async def send(self):
|
||||
await self.protocol.button_input_report.write(self.transport)
|
||||
|
||||
async def connect(self):
|
||||
"""
|
||||
Waits until the switch is paired with the controller and accepts button commands
|
||||
"""
|
||||
# TODO HACK: Hard to say for now.
|
||||
await self.protocol.wait_for_output_report()
|
||||
# The switch sends data to our device, it shouldn't take long until the connection is fully established.
|
||||
await asyncio.sleep(5)
|
||||
|
||||
def set_button_state(self, button_state: ButtonState):
|
||||
"""
|
||||
Sets the button status bytes in the input report
|
||||
"""
|
||||
self.protocol.button_input_report.set_button_status(button_state)
|
||||
|
||||
def set_stick_state(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import dbus
|
||||
|
||||
from joycontrol import utils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HidDevice:
|
||||
_HID_UUID = '00001124-0000-1000-8000-00805f9b34fb'
|
||||
_HID_PATH = '/bluez/switch/hid'
|
||||
|
||||
def __init__(self):
|
||||
self._uuid = str(uuid.uuid4())
|
||||
|
||||
# Setting up dbus to advertise the service record
|
||||
bus = dbus.SystemBus()
|
||||
obj = bus.get_object('org.bluez', '/org/bluez/hci0')
|
||||
self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1')
|
||||
self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties')
|
||||
|
||||
def discoverable(self, boolean=True):
|
||||
#self.properties.Set(self.adapter.dbus_interface, 'Powered', True)
|
||||
self.properties.Set(self.adapter.dbus_interface, 'Discoverable', boolean)
|
||||
|
||||
async def set_class(self, cls=0x002508):
|
||||
"""
|
||||
:param cls: default 0x002508 (Gamepad/joystick device class)
|
||||
"""
|
||||
logger.info(f'setting device class to {cls}...')
|
||||
await utils.run_system_command(f'hciconfig hci0 class {cls}')
|
||||
|
||||
async def set_name(self, name: str):
|
||||
logger.info(f'setting device name to {name}...')
|
||||
await utils.run_system_command(f'hciconfig hci0 name "{name}"')
|
||||
|
||||
def register_sdp_record(self, record_path):
|
||||
with open(record_path) as record:
|
||||
opts = {
|
||||
'ServiceRecord': record.read(),
|
||||
'Role': 'server',
|
||||
'Service': self._HID_UUID,
|
||||
'RequireAuthentication': False,
|
||||
'RequireAuthorization': False
|
||||
}
|
||||
bus = dbus.SystemBus()
|
||||
manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.ProfileManager1")
|
||||
manager.RegisterProfile(self._HID_PATH, self._uuid, opts)
|
||||
@@ -0,0 +1,64 @@
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
|
||||
def configure(console_level=logging.DEBUG, file_level=logging.DEBUG, logfile_name=None):
|
||||
"""
|
||||
Configures logging formatting
|
||||
|
||||
:param console_level: log level of console logger
|
||||
:param file_level: log lever of file logger
|
||||
:param logfile_name: name of logfile
|
||||
"""
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.DEBUG)
|
||||
|
||||
formatter = logging.Formatter(
|
||||
"[%(asctime)s] %(name)s %(funcName)s::%(lineno)s %(levelname)s - %(message)s",
|
||||
"%H:%M:%S"
|
||||
)
|
||||
|
||||
# create console logger
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(formatter)
|
||||
console_handler.setLevel(console_level)
|
||||
|
||||
root_logger.addHandler(console_handler)
|
||||
|
||||
# create file logger
|
||||
if logfile_name is not None:
|
||||
today = datetime.datetime.now()
|
||||
name_of_file = today.strftime(f'%Y-%m-%d_%H-%M_{logfile_name}.log')
|
||||
|
||||
file_handler = logging.FileHandler(name_of_file)
|
||||
file_handler.setLevel(file_level)
|
||||
file_handler.setFormatter(formatter)
|
||||
|
||||
root_logger.addHandler(file_handler)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run test output on stdout
|
||||
configure()
|
||||
|
||||
logger = logging.getLogger("test")
|
||||
|
||||
def test():
|
||||
logger.debug("debug msg")
|
||||
logger.info("info msg")
|
||||
logger.warning("warning msg")
|
||||
|
||||
def test2():
|
||||
logger.error("error msg")
|
||||
logger.critical("critical msg")
|
||||
|
||||
# test debug, info, warning
|
||||
test()
|
||||
# test error, critical
|
||||
test2()
|
||||
|
||||
# test exceptions
|
||||
try:
|
||||
raise RuntimeError("It's a trap!")
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
@@ -0,0 +1,114 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?>
|
||||
<record>
|
||||
<attribute id="0x0001">
|
||||
<sequence>
|
||||
<uuid value="0x1124"/>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x0004">
|
||||
<sequence>
|
||||
<sequence>
|
||||
<uuid value="0x0100"/>
|
||||
<uint16 value="0x0011"/>
|
||||
</sequence>
|
||||
<sequence>
|
||||
<uuid value="0x0011"/>
|
||||
</sequence>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x0005">
|
||||
<sequence>
|
||||
<uuid value="0x1002"/>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x0006">
|
||||
<sequence>
|
||||
<uint16 value="0x656e"/>
|
||||
<uint16 value="0x006a"/>
|
||||
<uint16 value="0x0100"/>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x0009">
|
||||
<sequence>
|
||||
<sequence>
|
||||
<uuid value="0x1124"/>
|
||||
<uint16 value="0x0100"/>
|
||||
</sequence>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x000d">
|
||||
<sequence>
|
||||
<sequence>
|
||||
<sequence>
|
||||
<uuid value="0x0100"/>
|
||||
<uint16 value="0x0013"/>
|
||||
</sequence>
|
||||
<sequence>
|
||||
<uuid value="0x0011"/>
|
||||
</sequence>
|
||||
</sequence>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x0100">
|
||||
<text value="Wireless Gamepad"/>
|
||||
</attribute>
|
||||
<attribute id="0x0101">
|
||||
<text value="Gamepad"/>
|
||||
</attribute>
|
||||
<attribute id="0x0102">
|
||||
<text value="Nintendo"/>
|
||||
</attribute>
|
||||
<attribute id="0x0200">
|
||||
<uint16 value="0x0100"/>
|
||||
</attribute>
|
||||
<attribute id="0x0201">
|
||||
<uint16 value="0x0111"/>
|
||||
</attribute>
|
||||
<attribute id="0x0202">
|
||||
<uint8 value="0x08"/>
|
||||
</attribute>
|
||||
<attribute id="0x0203">
|
||||
<uint8 value="0x00"/>
|
||||
</attribute>
|
||||
<attribute id="0x0204">
|
||||
<boolean value="true"/>
|
||||
</attribute>
|
||||
<attribute id="0x0205">
|
||||
<boolean value="true"/>
|
||||
</attribute>
|
||||
<attribute id="0x0206">
|
||||
<sequence>
|
||||
<sequence>
|
||||
<uint8 value="0x22"/>
|
||||
<text encoding="hex"
|
||||
value="050115000904a1018530050105091901290a150025017501950a5500650081020509190b290e150025017501950481027501950281030b01000100a1000b300001000b310001000b320001000b35000100150027ffff0000751095048102c00b39000100150025073500463b0165147504950181020509190f2912150025017501950481027508953481030600ff852109017508953f8103858109027508953f8103850109037508953f9183851009047508953f9183858009057508953f9183858209067508953f9183c0"/>
|
||||
</sequence>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x0207">
|
||||
<sequence>
|
||||
<sequence>
|
||||
<uint16 value="0x0409"/>
|
||||
<uint16 value="0x0100"/>
|
||||
</sequence>
|
||||
</sequence>
|
||||
</attribute>
|
||||
<attribute id="0x020b">
|
||||
<uint16 value="0x0100"/>
|
||||
</attribute>
|
||||
<attribute id="0x020c">
|
||||
<uint16 value="0x0c80"/>
|
||||
</attribute>
|
||||
<attribute id="0x020d">
|
||||
<boolean value="false"/>
|
||||
</attribute>
|
||||
<attribute id="0x020e">
|
||||
<boolean value="true"/>
|
||||
</attribute>
|
||||
<attribute id="0x020f">
|
||||
<uint16 value="0x0640"/>
|
||||
</attribute>
|
||||
<attribute id="0x0210">
|
||||
<uint16 value="0x0320"/>
|
||||
</attribute>
|
||||
</record>
|
||||
@@ -0,0 +1,122 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from asyncio import BaseTransport, BaseProtocol
|
||||
from typing import Optional, Union, Tuple, Text
|
||||
|
||||
from joycontrol.controller import Controller
|
||||
from joycontrol.report import OutputReport, SubCommand, InputReport
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def controller_protocol_factory(controller: Controller):
|
||||
def create_controller_protocol():
|
||||
return ControllerProtocol(controller)
|
||||
return create_controller_protocol
|
||||
|
||||
|
||||
class ControllerProtocol(BaseProtocol):
|
||||
def __init__(self, controller: Controller):
|
||||
self.controller = controller
|
||||
|
||||
self.transport = None
|
||||
|
||||
# This must always be an 0x21 input report to be compatible with button events
|
||||
self.button_input_report = InputReport()
|
||||
self.button_input_report.set_input_report_id(0x21)
|
||||
self.button_input_report.set_misc()
|
||||
|
||||
self._data_received = asyncio.Event()
|
||||
|
||||
async def wait_for_output_report(self):
|
||||
self._data_received.clear()
|
||||
await self._data_received.wait()
|
||||
|
||||
def connection_made(self, transport: BaseTransport) -> None:
|
||||
logger.debug('Connection established.')
|
||||
self.transport = transport
|
||||
|
||||
def connection_lost(self, exc: Optional[Exception]) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
def error_received(self, exc: Exception) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None:
|
||||
self._data_received.set()
|
||||
|
||||
try:
|
||||
report = OutputReport(list(data))
|
||||
except ValueError as v_err:
|
||||
logger.warning(f'Report parsing error "{v_err}" - IGNORE')
|
||||
return
|
||||
|
||||
# classify sub command
|
||||
sc_byte, sub_command = report.get_sub_command()
|
||||
logging.info(f'received output report - {sub_command}')
|
||||
if sub_command is None:
|
||||
logger.warning(f'Received output report does not contain a sub command')
|
||||
elif sub_command == SubCommand.REQUEST_DEVICE_INFO:
|
||||
await self._command_request_device_info(report)
|
||||
|
||||
elif sub_command == SubCommand.SET_SHIPMENT_STATE:
|
||||
await self._command_set_shipment_state(report)
|
||||
|
||||
elif sub_command == SubCommand.SPI_FLASH_READ:
|
||||
await self._command_spi_flash_read(report)
|
||||
|
||||
elif sub_command == SubCommand.SET_INPUT_REPORT_MODE:
|
||||
await self._command_set_input_report_mode(report)
|
||||
|
||||
elif sub_command == SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME:
|
||||
await self._command_trigger_buttons_elapsed_time(report)
|
||||
|
||||
elif sub_command == SubCommand.NOT_IMPLEMENTED:
|
||||
logger.warning(f'Sub command 0x{sc_byte:02x} not implemented - ignoring')
|
||||
|
||||
async def _command_request_device_info(self, output_report):
|
||||
address = self.transport.get_extra_info('sockname')
|
||||
assert address is not None
|
||||
bd_address = list(map(lambda x: int(x, 16), address[0].split(':')))
|
||||
|
||||
self.button_input_report.set_misc()
|
||||
self.button_input_report.set_ack(0x82)
|
||||
self.button_input_report.sub_0x02_device_info(bd_address, controller=self.controller)
|
||||
|
||||
await self.button_input_report.write(self.transport)
|
||||
|
||||
async def _command_set_shipment_state(self, output_report):
|
||||
self.button_input_report.set_misc()
|
||||
self.button_input_report.set_ack(0x80)
|
||||
self.button_input_report.sub_0x08_shipment()
|
||||
|
||||
await self.button_input_report.write(self.transport)
|
||||
|
||||
async def _command_spi_flash_read(self, output_report):
|
||||
self.button_input_report.set_misc()
|
||||
self.button_input_report.set_ack(0x90)
|
||||
self.button_input_report.sub_0x10_spi_flash_read(output_report)
|
||||
|
||||
await self.button_input_report.write(self.transport)
|
||||
|
||||
async def _command_set_input_report_mode(self, output_report):
|
||||
self.button_input_report.set_misc()
|
||||
self.button_input_report.set_ack(0x80)
|
||||
self.button_input_report.sub_0x03_set_input_report_mode()
|
||||
|
||||
await self.button_input_report.write(self.transport)
|
||||
|
||||
async def _command_trigger_buttons_elapsed_time(self, output_report):
|
||||
self.button_input_report.set_misc()
|
||||
self.button_input_report.set_ack(0x83)
|
||||
self.button_input_report.sub_0x04_trigger_buttons_elapsed_time()
|
||||
|
||||
await self.button_input_report.write(self.transport)
|
||||
|
||||
async def _enable_6axis_sensor(self, output_report):
|
||||
self.button_input_report.set_misc()
|
||||
self.button_input_report.set_ack(0x80)
|
||||
|
||||
self.button_input_report.reply_to_subcommand_id(0x40)
|
||||
|
||||
await self.button_input_report.write(self.transport)
|
||||
@@ -0,0 +1,158 @@
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
|
||||
from joycontrol.button_state import ButtonState
|
||||
from joycontrol.controller import Controller
|
||||
|
||||
|
||||
class InputReport:
|
||||
"""
|
||||
Class to create Input Reports. Reference:
|
||||
https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_notes.md
|
||||
"""
|
||||
def __init__(self):
|
||||
self.data = [0x00] * 51
|
||||
# all input reports are prepended with 0xA1
|
||||
self.data[0] = 0xA1
|
||||
|
||||
self.subcommand_is_set = False
|
||||
|
||||
self.is_writing = None
|
||||
|
||||
def clear_sub_command(self):
|
||||
for i in range(14, 51):
|
||||
self.data[i] = 0x00
|
||||
self.subcommand_is_set = False
|
||||
|
||||
def set_input_report_id(self, _id):
|
||||
"""
|
||||
:param _id: e.g. 0x21 Standard input reports used for sub command replies
|
||||
etc... (TODO)
|
||||
"""
|
||||
self.data[1] = _id
|
||||
|
||||
def set_timer(self, timer):
|
||||
"""
|
||||
Input report timer (0x00-0xFF), usually set by the transport
|
||||
"""
|
||||
self.data[2] = timer % 256
|
||||
|
||||
def set_misc(self):
|
||||
# battery level + connection info
|
||||
self.data[3] = 0x8E
|
||||
|
||||
def set_button_status(self, button_status: ButtonState):
|
||||
"""
|
||||
Sets the button status bytes
|
||||
"""
|
||||
self.data[4:7] = iter(button_status)
|
||||
|
||||
def set_left_analog_stick(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
self.data[7:10] = [0x01, 0x18, 0x80]
|
||||
|
||||
def set_right_analog_stick(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
self.data[10:13] = [0x01, 0x18, 0x80]
|
||||
|
||||
def set_vibrator_input(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
self.data[13] = 0x80
|
||||
|
||||
def set_ack(self, ack):
|
||||
"""
|
||||
ACK byte for subcmd reply
|
||||
TODO
|
||||
"""
|
||||
self.data[14] = ack
|
||||
|
||||
def sub_0x02_device_info(self, mac, fm_version=(0x03, 0x48), controller=Controller.JOYCON_L):
|
||||
"""
|
||||
Sub command 0x02 request device info response.
|
||||
|
||||
:param mac: Controller MAC address in Big Endian (6 Bytes)
|
||||
:param fm_version: TODO
|
||||
:param controller: 1=Left Joy-Con, 2=Right Joy-Con, 3=Pro Controller
|
||||
"""
|
||||
if len(fm_version) != 2:
|
||||
raise ValueError('Firmware version must consist of 2 bytes!')
|
||||
elif len(mac) != 6:
|
||||
raise ValueError('Bluetooth mac address must consist of 6 bytes!')
|
||||
|
||||
self.reply_to_subcommand_id(0x02)
|
||||
|
||||
# sub command reply data
|
||||
offset = 16
|
||||
self.data[offset: offset + 2] = fm_version
|
||||
self.data[offset + 2] = controller.value
|
||||
self.data[offset + 3] = 0x02
|
||||
self.data[offset + 4: offset + 10] = mac
|
||||
self.data[offset + 10] = 0x01
|
||||
self.data[offset + 11] = 0x01
|
||||
|
||||
def reply_to_subcommand_id(self, id_):
|
||||
self.subcommand_is_set = True
|
||||
self.data[15] = id_
|
||||
|
||||
def sub_0x08_shipment(self):
|
||||
self.reply_to_subcommand_id(0x08)
|
||||
|
||||
def sub_0x10_spi_flash_read(self, output_report):
|
||||
self.reply_to_subcommand_id(0x10)
|
||||
self.data[16:18] = output_report.data[12:14]
|
||||
|
||||
def sub_0x03_set_input_report_mode(self):
|
||||
self.reply_to_subcommand_id(0x03)
|
||||
|
||||
def sub_0x04_trigger_buttons_elapsed_time(self):
|
||||
self.reply_to_subcommand_id(0x04)
|
||||
|
||||
# TODO
|
||||
blub = [0x00, 0xCC, 0x00, 0xEE, 0x00, 0xFF]
|
||||
self.data[16:22] = blub
|
||||
|
||||
async def write(self, transport):
|
||||
if self.is_writing is None:
|
||||
self.is_writing = asyncio.ensure_future(transport.write(self))
|
||||
await self.is_writing
|
||||
self.is_writing = None
|
||||
|
||||
def __bytes__(self):
|
||||
if self.subcommand_is_set:
|
||||
return bytes(self.data)
|
||||
else:
|
||||
return bytes(self.data[:15])
|
||||
|
||||
|
||||
class SubCommand(Enum):
|
||||
REQUEST_DEVICE_INFO = 0x02
|
||||
SET_INPUT_REPORT_MODE = 0x03
|
||||
TRIGGER_BUTTONS_ELAPSED_TIME = 0x04
|
||||
SET_SHIPMENT_STATE = 0x08
|
||||
SPI_FLASH_READ = 0x10
|
||||
ENABLE_6AXIS_SENSOR = 0x40
|
||||
NOT_IMPLEMENTED = 0xFF
|
||||
|
||||
|
||||
class OutputReport:
|
||||
def __init__(self, data):
|
||||
if data[0] != 0xA2:
|
||||
raise ValueError('Output reports must start with 0xA2')
|
||||
self.data = data
|
||||
|
||||
def get_sub_command(self):
|
||||
if len(self.data) < 12:
|
||||
return None, None
|
||||
try:
|
||||
return self.data[11], SubCommand(self.data[11])
|
||||
except ValueError:
|
||||
return self.data[11], SubCommand.NOT_IMPLEMENTED
|
||||
|
||||
def __bytes__(self):
|
||||
return bytes(self.data)
|
||||
@@ -0,0 +1,72 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
|
||||
import joycontrol
|
||||
from joycontrol import utils
|
||||
from joycontrol.device import HidDevice
|
||||
from joycontrol.report import InputReport
|
||||
from joycontrol.transport import L2CAP_Transport
|
||||
|
||||
PROFILE_PATH = os.path.join(os.path.dirname(joycontrol.__file__), 'profile/sdp_record_hid.xml')
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _send_empty_input_reports(transport):
|
||||
report = InputReport()
|
||||
|
||||
while True:
|
||||
await transport.write(report)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def create_hid_server(protocol_factory, ctl_psm, itr_psm):
|
||||
ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
|
||||
itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
|
||||
|
||||
# for some reason we need to restart bluetooth here, the Switch does not connect to the sockets if we don't...
|
||||
logger.info('Restarting bluetooth service...')
|
||||
await utils.run_system_command('systemctl restart bluetooth.service')
|
||||
await asyncio.sleep(1)
|
||||
|
||||
ctl_sock.setblocking(False)
|
||||
itr_sock.setblocking(False)
|
||||
|
||||
ctl_sock.bind((socket.BDADDR_ANY, ctl_psm))
|
||||
itr_sock.bind((socket.BDADDR_ANY, itr_psm))
|
||||
|
||||
ctl_sock.listen(1)
|
||||
itr_sock.listen(1)
|
||||
|
||||
protocol = protocol_factory()
|
||||
|
||||
hid = HidDevice()
|
||||
# setting bluetooth adapter name and class to the device we wish to emulate
|
||||
await hid.set_name(protocol.controller.device_name())
|
||||
await hid.set_class()
|
||||
|
||||
logger.info('Advertising the Bluetooth SDP record...')
|
||||
hid.register_sdp_record(PROFILE_PATH)
|
||||
hid.discoverable()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
client_ctl, ctl_address = await loop.sock_accept(ctl_sock)
|
||||
logger.info(f'Accepted connection at psm {ctl_psm} from {ctl_address}')
|
||||
client_itr, itr_address = await loop.sock_accept(itr_sock)
|
||||
logger.info(f'Accepted connection at psm {itr_psm} from {itr_address}')
|
||||
assert ctl_address[0] == itr_address[0]
|
||||
|
||||
transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, 50)
|
||||
protocol.connection_made(transport)
|
||||
|
||||
# send some empty input reports until the switch decides to reply
|
||||
future = asyncio.ensure_future(_send_empty_input_reports(transport))
|
||||
await protocol.wait_for_output_report()
|
||||
future.cancel()
|
||||
try:
|
||||
await future
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
return transport, protocol
|
||||
@@ -0,0 +1,100 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from joycontrol.report import InputReport
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class L2CAP_Transport(asyncio.Transport):
|
||||
def __init__(self, loop, protocol, l2cap_socket, read_buffer_size) -> None:
|
||||
self._loop = loop
|
||||
self._protocol = protocol
|
||||
|
||||
self._sock = l2cap_socket
|
||||
self._read_buffer_size = read_buffer_size
|
||||
|
||||
self._extra_info = {
|
||||
'peername': self._sock.getpeername(),
|
||||
'sockname': self._sock.getsockname()
|
||||
}
|
||||
|
||||
self._read_thread = asyncio.ensure_future(self._read())
|
||||
|
||||
self._is_closing = False
|
||||
self._is_reading = asyncio.Event()
|
||||
self._is_reading.set()
|
||||
|
||||
self._input_report_timer = 0x00
|
||||
|
||||
async def _read(self):
|
||||
while True:
|
||||
await self._is_reading.wait()
|
||||
|
||||
data = await self._loop.sock_recv(self._sock, self._read_buffer_size)
|
||||
logger.debug(f'received "{list(map(hex, list(data)))}"')
|
||||
await self._protocol.report_received(data, self._sock.getpeername())
|
||||
|
||||
def is_reading(self) -> bool:
|
||||
return self._is_reading.is_set()
|
||||
|
||||
def pause_reading(self) -> None:
|
||||
self._is_reading.clear()
|
||||
|
||||
def resume_reading(self) -> None:
|
||||
self._is_reading.set()
|
||||
|
||||
def set_read_buffer_size(self, size):
|
||||
self._read_buffer_size = size
|
||||
|
||||
def set_write_buffer_limits(self, high: int = ..., low: int = ...) -> None:
|
||||
super().set_write_buffer_limits(high, low)
|
||||
|
||||
def get_write_buffer_size(self) -> int:
|
||||
return super().get_write_buffer_size()
|
||||
|
||||
async def write(self, data: Any) -> None:
|
||||
if isinstance(data, bytes):
|
||||
_bytes = data
|
||||
elif isinstance(data, InputReport):
|
||||
# set timer byte of input report
|
||||
data.set_timer(self._input_report_timer)
|
||||
self._input_report_timer = (self._input_report_timer + 1) % 256
|
||||
_bytes = bytes(data)
|
||||
|
||||
if data.subcommand_is_set:
|
||||
data.clear_sub_command()
|
||||
else:
|
||||
raise ValueError('data must be bytes or InputReport')
|
||||
|
||||
logger.debug(f'sending "{_bytes}"')
|
||||
await self._loop.sock_sendall(self._sock, _bytes)
|
||||
|
||||
def abort(self) -> None:
|
||||
super().abort()
|
||||
|
||||
def get_extra_info(self, name: Any, default=None) -> Any:
|
||||
return self._extra_info.get(name, default)
|
||||
|
||||
def is_closing(self) -> bool:
|
||||
return self._is_closing
|
||||
|
||||
async def close(self):
|
||||
"""
|
||||
Stops socket reader and closes socket
|
||||
"""
|
||||
self._is_closing = True
|
||||
self._read_thread.cancel()
|
||||
# wait for reader to cancel
|
||||
try:
|
||||
await self._read_thread
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._sock.close()
|
||||
|
||||
def set_protocol(self, protocol: asyncio.BaseProtocol) -> None:
|
||||
self._protocol = protocol
|
||||
|
||||
def get_protocol(self) -> asyncio.BaseProtocol:
|
||||
return self._protocol
|
||||
@@ -0,0 +1,42 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_bit(value, n):
|
||||
return (value >> n & 1) != 0
|
||||
|
||||
|
||||
def flip_bit(value, n):
|
||||
return value ^ (1 << n)
|
||||
|
||||
|
||||
async def run_system_command(cmd):
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE)
|
||||
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
logger.debug(f'[{cmd!r} exited with {proc.returncode}]')
|
||||
if stdout:
|
||||
logger.debug(f'[stdout]\n{stdout.decode()}')
|
||||
if stderr:
|
||||
logger.debug(f'[stderr]\n{stderr.decode()}')
|
||||
|
||||
return proc.returncode, stdout, stderr
|
||||
|
||||
"""
|
||||
async def get_bt_mac_address(dev=0):
|
||||
ret, stdout, stderr = await run_system_command(f'hciconfig hci{dev}')
|
||||
# TODO: Process error handling
|
||||
|
||||
match = re.search(r'BD Address: (?P<mac>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w)', stdout.decode('UTF-8'))
|
||||
if match:
|
||||
return list(map(lambda x: int(x, 16), match.group('mac').split(':')))
|
||||
else:
|
||||
raise ValueError(f'BD Address not found in "{stdout}"')
|
||||
"""
|
||||
Reference in New Issue
Block a user