implemented 0x30 input report

This commit is contained in:
Robert Martin
2020-02-05 16:37:27 +09:00
parent 6f11ee0b22
commit 7d5a4f05fe
8 changed files with 291 additions and 176 deletions
-75
View File
@@ -1,75 +0,0 @@
from joycontrol import utils
class ButtonState:
"""
Utility class to set buttons in the input report
https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_notes.md
Byte 0 1 2 3 4 5 6 7
1 Y X B A SR SL R ZR
2 Minus Plus R Stick L Stick Home Capture
3 Down Up Right Left SR SL L ZL
"""
def __init__(self):
# 3 bytes
self._byte_1 = 0
self._byte_2 = 0
self._byte_3 = 0
# generating methods for each button
def button_method_factory(byte, bit):
def flip():
setattr(self, byte, utils.flip_bit(getattr(self, byte), bit))
def getter():
return utils.get_bit(getattr(self, byte), bit)
return flip, getter
# byte 1
self.y, self.y_is_set = button_method_factory('_byte_1', 0)
self.x, self.x_is_set = button_method_factory('_byte_1', 1)
self.b, self.b_is_set = button_method_factory('_byte_1', 2)
self.a, self.a_is_set = button_method_factory('_byte_1', 3)
self.right_sr, self.right_sr_is_set = button_method_factory('_byte_1', 4)
self.right_sl, self.right_sl_is_set = button_method_factory('_byte_1', 5)
self.r, self.r_is_set = button_method_factory('_byte_1', 6)
self.zr, self.zr_is_set = button_method_factory('_byte_1', 7)
# byte 2
self.minus, self.minus_is_set = button_method_factory('_byte_2', 0)
self.plus, self.plus_is_set = button_method_factory('_byte_2', 1)
self.r_stick, self.r_stick_is_set = button_method_factory('_byte_2', 2)
self.l_stick, self.l_stick_is_set = button_method_factory('_byte_2', 3)
self.home, self.home_is_set = button_method_factory('_byte_2', 4)
self.capture, self.capture_is_set = button_method_factory('_byte_2', 5)
# byte 3
self.down, self.down_is_set = button_method_factory('_byte_3', 0)
self.up, self.up_is_set = button_method_factory('_byte_3', 1)
self.right, self.right_is_set = button_method_factory('_byte_3', 2)
self.left, self.left_is_set = button_method_factory('_byte_3', 3)
self.left_sr, self.left_sr_is_set = button_method_factory('_byte_3', 4)
self.left_sl, self.left_sl_is_set = button_method_factory('_byte_3', 5)
self.l, self.l_is_set = button_method_factory('_byte_3', 6)
self.zl, self.zl_is_set = button_method_factory('_byte_3', 7)
"""
Example for generated methods: home button (byte_2, 4)
def home(self):
self.byte_2 = flip_bit(self.byte_2, 4)
def home_is_set(self):
return get_bit(self.byte_2, 4)
"""
def __iter__(self):
"""
@returns iterator of the button bytes
"""
yield self._byte_1
yield self._byte_2
yield self._byte_3
def clear(self):
self._byte_1 = self._byte_2 = self._byte_3 = 0
+88 -23
View File
@@ -1,40 +1,100 @@
import asyncio import asyncio
from joycontrol.button_state import ButtonState from joycontrol import utils
from joycontrol.protocol import ControllerProtocol
class ControllerState: class ControllerState:
def __init__(self, transport: asyncio.Transport, protocol: ControllerProtocol): def __init__(self, protocol):
super().__init__() self._protocol = protocol
self.transport = transport
self.protocol = protocol
self.input_report = self.protocol.get_button_input_report() self.button_state = None
self.stick_state = None
self.sig_is_send = asyncio.Event()
async def send(self): async def send(self):
await self.input_report.write(self.transport) self.sig_is_send.clear()
await self.sig_is_send.wait()
async def connect(self): async def connect(self):
""" """
Waits until the switch is paired with the controller and accepts button commands Waits until the switch is paired with the controller and accepts button commands
""" """
# TODO HACK: Hard to say for now. await self._protocol.sig_wait_player_lights.wait()
await self.protocol.wait_for_output_report()
# The switch sends data to our device, it shouldn't take long until the connection is fully established.
await asyncio.sleep(5)
def set_button_state(self, button_state: ButtonState):
"""
Sets the button status bytes in the input report
"""
self.input_report.set_button_status(button_state)
def set_stick_state(self): class ButtonState:
"""
Utility class to set buttons in the input report
https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_notes.md
Byte 0 1 2 3 4 5 6 7
1 Y X B A SR SL R ZR
2 Minus Plus R Stick L Stick Home Capture
3 Down Up Right Left SR SL L ZL
"""
def __init__(self):
# 3 bytes
self._byte_1 = 0
self._byte_2 = 0
self._byte_3 = 0
# generating methods for each button
def button_method_factory(byte, bit):
def flip():
setattr(self, byte, utils.flip_bit(getattr(self, byte), bit))
def getter():
return utils.get_bit(getattr(self, byte), bit)
return flip, getter
# byte 1
self.y, self.y_is_set = button_method_factory('_byte_1', 0)
self.x, self.x_is_set = button_method_factory('_byte_1', 1)
self.b, self.b_is_set = button_method_factory('_byte_1', 2)
self.a, self.a_is_set = button_method_factory('_byte_1', 3)
self.right_sr, self.right_sr_is_set = button_method_factory('_byte_1', 4)
self.right_sl, self.right_sl_is_set = button_method_factory('_byte_1', 5)
self.r, self.r_is_set = button_method_factory('_byte_1', 6)
self.zr, self.zr_is_set = button_method_factory('_byte_1', 7)
# byte 2
self.minus, self.minus_is_set = button_method_factory('_byte_2', 0)
self.plus, self.plus_is_set = button_method_factory('_byte_2', 1)
self.r_stick, self.r_stick_is_set = button_method_factory('_byte_2', 2)
self.l_stick, self.l_stick_is_set = button_method_factory('_byte_2', 3)
self.home, self.home_is_set = button_method_factory('_byte_2', 4)
self.capture, self.capture_is_set = button_method_factory('_byte_2', 5)
# byte 3
self.down, self.down_is_set = button_method_factory('_byte_3', 0)
self.up, self.up_is_set = button_method_factory('_byte_3', 1)
self.right, self.right_is_set = button_method_factory('_byte_3', 2)
self.left, self.left_is_set = button_method_factory('_byte_3', 3)
self.left_sr, self.left_sr_is_set = button_method_factory('_byte_3', 4)
self.left_sl, self.left_sl_is_set = button_method_factory('_byte_3', 5)
self.l, self.l_is_set = button_method_factory('_byte_3', 6)
self.zl, self.zl_is_set = button_method_factory('_byte_3', 7)
"""
Example for generated methods: home button (byte_2, 4)
def home(self):
self.byte_2 = flip_bit(self.byte_2, 4)
def home_is_set(self):
return get_bit(self.byte_2, 4)
"""
def __iter__(self):
""" """
TODO @returns iterator of the button bytes
""" """
raise NotImplementedError() yield self._byte_1
yield self._byte_2
yield self._byte_3
def clear(self):
self._byte_1 = self._byte_2 = self._byte_3 = 0
async def button_push(controller_state, button, sec=0.1): async def button_push(controller_state, button, sec=0.1):
@@ -44,7 +104,7 @@ async def button_push(controller_state, button, sec=0.1):
getattr(button_state, button)() getattr(button_state, button)()
# send report # send report
controller_state.set_button_state(button_state) controller_state.button_state = button_state
await controller_state.send() await controller_state.send()
await asyncio.sleep(sec) await asyncio.sleep(sec)
@@ -52,5 +112,10 @@ async def button_push(controller_state, button, sec=0.1):
getattr(button_state, button)() getattr(button_state, button)()
# send report # send report
controller_state.set_button_state(button_state) controller_state.button_state = button_state
await controller_state.send() await controller_state.send()
class StickState:
def __init__(self):
raise NotImplementedError()
+139 -32
View File
@@ -4,6 +4,7 @@ from asyncio import BaseTransport, BaseProtocol
from typing import Optional, Union, Tuple, Text from typing import Optional, Union, Tuple, Text
from joycontrol.controller import Controller from joycontrol.controller import Controller
from joycontrol.controller_state import ControllerState
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -21,15 +22,27 @@ class ControllerProtocol(BaseProtocol):
self.transport = None self.transport = None
# This must always be an 0x21 input report to be compatible with button events
self._button_input_report = InputReport()
self._button_input_report.set_input_report_id(0x21)
self._button_input_report.set_misc()
self._data_received = asyncio.Event() self._data_received = asyncio.Event()
def get_button_input_report(self): self._controller_state = ControllerState(self)
return self._button_input_report
self._pending_write = None
self._pending_input_report = None
self._0x30_input_report_sender = None
self.sig_wait_player_lights = asyncio.Event()
async def write(self, input_report: InputReport):
# set button and TODO: stick date
if self._controller_state.button_state is not None:
input_report.set_button_status(self._controller_state.button_state)
self._controller_state.sig_is_send.set()
await self.transport.write(input_report)
def get_controller_state(self):
return self._controller_state
async def wait_for_output_report(self): async def wait_for_output_report(self):
self._data_received.clear() self._data_received.clear()
@@ -45,6 +58,27 @@ class ControllerProtocol(BaseProtocol):
def error_received(self, exc: Exception) -> None: def error_received(self, exc: Exception) -> None:
raise NotImplementedError() raise NotImplementedError()
async def send_0x30_input_reports(self):
input_report = InputReport()
input_report.set_input_report_id(0x30)
input_report.set_misc()
while True:
# TODO: set sensor data
input_report.set_6axis_data()
await self.write(input_report)
"""
if self.controller == Controller.PRO_CONTROLLER:
# send state at 120Hz if Pro Controller
await asyncio.sleep(1 / 120)
else:
# send state at 60Hz
await asyncio.sleep(1 / 60)
"""
await asyncio.sleep(1 / 30)
async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None: async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None:
self._data_received.set() self._data_received.set()
@@ -87,6 +121,19 @@ class ControllerProtocol(BaseProtocol):
elif sub_command == SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME: elif sub_command == SubCommand.TRIGGER_BUTTONS_ELAPSED_TIME:
await self._command_trigger_buttons_elapsed_time(report) await self._command_trigger_buttons_elapsed_time(report)
elif sub_command == SubCommand.ENABLE_6AXIS_SENSOR:
await self._command_enable_6axis_sensor(report)
elif sub_command == SubCommand.ENABLE_VIBRATION:
await self._command_enable_vibration(report)
elif sub_command == SubCommand.SET_NFC_IR_MCU_CONFIG:
await self._command_set_nfc_ir_mcu_config(report)
elif sub_command == SubCommand.SET_PLAYER_LIGHTS:
await self._command_set_player_lights(report)
else: else:
logger.warning(f'Sub command 0x{sub_command.value:02x} not implemented - ignoring') logger.warning(f'Sub command 0x{sub_command.value:02x} not implemented - ignoring')
#elif output_report_id == OutputReportID.RUMBLE_ONLY: #elif output_report_id == OutputReportID.RUMBLE_ONLY:
@@ -95,48 +142,108 @@ class ControllerProtocol(BaseProtocol):
logger.warning(f'Output report {output_report_id} not implemented - ignoring') logger.warning(f'Output report {output_report_id} not implemented - ignoring')
async def _command_request_device_info(self, output_report): async def _command_request_device_info(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
address = self.transport.get_extra_info('sockname') address = self.transport.get_extra_info('sockname')
assert address is not None assert address is not None
bd_address = list(map(lambda x: int(x, 16), address[0].split(':'))) bd_address = list(map(lambda x: int(x, 16), address[0].split(':')))
self._button_input_report.set_misc() input_report.set_ack(0x82)
self._button_input_report.set_ack(0x82) input_report.sub_0x02_device_info(bd_address, controller=self.controller)
self._button_input_report.sub_0x02_device_info(bd_address, controller=self.controller)
await self._button_input_report.write(self.transport) await self.write(input_report)
async def _command_set_shipment_state(self, output_report): async def _command_set_shipment_state(self, output_report):
self._button_input_report.set_misc() input_report = InputReport()
self._button_input_report.set_ack(0x80) input_report.set_input_report_id(0x21)
self._button_input_report.sub_0x08_shipment() input_report.set_misc()
await self._button_input_report.write(self.transport) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x08)
await self.write(input_report)
async def _command_spi_flash_read(self, output_report): async def _command_spi_flash_read(self, output_report):
self._button_input_report.set_misc() input_report = InputReport()
self._button_input_report.set_ack(0x90) input_report.set_input_report_id(0x21)
self._button_input_report.sub_0x10_spi_flash_read(output_report) input_report.set_misc()
await self._button_input_report.write(self.transport) input_report.set_ack(0x90)
input_report.sub_0x10_spi_flash_read(output_report)
await self.write(input_report)
async def _command_set_input_report_mode(self, output_report): async def _command_set_input_report_mode(self, output_report):
self._button_input_report.set_misc() if output_report.data[12] == 0x30:
self._button_input_report.set_ack(0x80) logger.info('Setting input report mode to 0x30...')
self._button_input_report.sub_0x03_set_input_report_mode() # start sending 0x30 input reports
assert self._0x30_input_report_sender is None
self._0x30_input_report_sender = asyncio.ensure_future(self.send_0x30_input_reports())
await self._button_input_report.write(self.transport) input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x03)
await self.write(input_report)
else:
logger.error(f'input report mode {output_report.data[12]} not implemented - ignoring request')
async def _command_trigger_buttons_elapsed_time(self, output_report): async def _command_trigger_buttons_elapsed_time(self, output_report):
self._button_input_report.set_misc() input_report = InputReport()
self._button_input_report.set_ack(0x83) input_report.set_input_report_id(0x21)
self._button_input_report.sub_0x04_trigger_buttons_elapsed_time() input_report.set_misc()
await self._button_input_report.write(self.transport) input_report.set_ack(0x83)
input_report.sub_0x04_trigger_buttons_elapsed_time()
async def _enable_6axis_sensor(self, output_report): await self.write(input_report)
self._button_input_report.set_misc()
self._button_input_report.set_ack(0x80)
self._button_input_report.reply_to_subcommand_id(0x40) async def _command_enable_6axis_sensor(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
await self._button_input_report.write(self.transport) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x40)
await self.write(input_report)
async def _command_enable_vibration(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.ENABLE_VIBRATION.value)
await self.write(input_report)
async def _command_set_nfc_ir_mcu_config(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0xA0)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
for i in range(16, 51):
input_report.data[i] = 0xFF
await self.write(input_report)
async def _command_set_player_lights(self, output_report):
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_PLAYER_LIGHTS.value)
await self.write(input_report)
self.sig_wait_player_lights.set()
+39 -35
View File
@@ -1,7 +1,5 @@
import asyncio
from enum import Enum from enum import Enum
from joycontrol.button_state import ButtonState
from joycontrol.controller import Controller from joycontrol.controller import Controller
@@ -10,27 +8,35 @@ class InputReport:
Class to create Input Reports. Reference: Class to create Input Reports. Reference:
https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_notes.md https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering/blob/master/bluetooth_hid_notes.md
""" """
def __init__(self): def __init__(self, data=None):
self.data = [0x00] * 51 if data is None:
# all input reports are prepended with 0xA1 # TODO: not enough space for NFC/IR data input report
self.data[0] = 0xA1 self.data = [0x00] * 51
# all input reports are prepended with 0xA1
self.subcommand_is_set = False self.data[0] = 0xA1
else:
self.is_writing = None if data[0] != 0xA1:
raise ValueError('Input reports must start with 0xA1')
self.data = data
def clear_sub_command(self): def clear_sub_command(self):
"""
Clear sub command reply data of 0x21 input reports
"""
for i in range(14, 51): for i in range(14, 51):
self.data[i] = 0x00 self.data[i] = 0x00
self.subcommand_is_set = False
def set_input_report_id(self, _id): def set_input_report_id(self, _id):
""" """
:param _id: e.g. 0x21 Standard input reports used for sub command replies :param _id: e.g. 0x21 Standard input reports used for sub command replies
0x30 Input reports with IMU data instead of sub command replies
etc... (TODO) etc... (TODO)
""" """
self.data[1] = _id self.data[1] = _id
def get_input_report_id(self):
return self.data[1]
def set_timer(self, timer): def set_timer(self, timer):
""" """
Input report timer (0x00-0xFF), usually set by the transport Input report timer (0x00-0xFF), usually set by the transport
@@ -41,7 +47,7 @@ class InputReport:
# battery level + connection info # battery level + connection info
self.data[3] = 0x8E self.data[3] = 0x8E
def set_button_status(self, button_status: ButtonState): def set_button_status(self, button_status):
""" """
Sets the button status bytes Sets the button status bytes
""" """
@@ -72,7 +78,18 @@ class InputReport:
""" """
self.data[14] = ack self.data[14] = ack
def sub_0x02_device_info(self, mac, fm_version=(0x03, 0x48), controller=Controller.JOYCON_L): def set_6axis_data(self):
"""
Set accelerator and gyro of 0x30 input reports
"""
# HACK: Set all 0 for now
for i in range(14, 50):
self.data[i] = 0x00
def reply_to_subcommand_id(self, id_):
self.data[15] = id_
def sub_0x02_device_info(self, mac, fm_version=(0x04, 0x00), controller=Controller.JOYCON_L):
""" """
Sub command 0x02 request device info response. Sub command 0x02 request device info response.
@@ -96,38 +113,22 @@ class InputReport:
self.data[offset + 10] = 0x01 self.data[offset + 10] = 0x01
self.data[offset + 11] = 0x01 self.data[offset + 11] = 0x01
def reply_to_subcommand_id(self, id_):
self.subcommand_is_set = True
self.data[15] = id_
def sub_0x08_shipment(self):
self.reply_to_subcommand_id(0x08)
def sub_0x10_spi_flash_read(self, output_report): def sub_0x10_spi_flash_read(self, output_report):
self.reply_to_subcommand_id(0x10) self.reply_to_subcommand_id(0x10)
self.data[16:18] = output_report.data[12:14] self.data[16:18] = output_report.data[12:14]
def sub_0x03_set_input_report_mode(self):
self.reply_to_subcommand_id(0x03)
def sub_0x04_trigger_buttons_elapsed_time(self): def sub_0x04_trigger_buttons_elapsed_time(self):
self.reply_to_subcommand_id(0x04) self.reply_to_subcommand_id(0x04)
# TODO # TODO
blub = [0x00, 0xCC, 0x00, 0xEE, 0x00, 0xFF] blub = [0x00, 0xCC, 0x00, 0xEE, 0x00, 0xFF]
self.data[16:22] = blub self.data[16:22] = blub
async def write(self, transport):
if self.is_writing is None:
self.is_writing = asyncio.ensure_future(transport.write(self))
await self.is_writing
self.is_writing = None
def __bytes__(self): def __bytes__(self):
if self.subcommand_is_set: _id = self.get_input_report_id()
return bytes(self.data) if _id == 0x21:
return bytes(self.data[:51])
else: else:
return bytes(self.data[:15]) return bytes(self.data)
class SubCommand(Enum): class SubCommand(Enum):
@@ -136,7 +137,10 @@ class SubCommand(Enum):
TRIGGER_BUTTONS_ELAPSED_TIME = 0x04 TRIGGER_BUTTONS_ELAPSED_TIME = 0x04
SET_SHIPMENT_STATE = 0x08 SET_SHIPMENT_STATE = 0x08
SPI_FLASH_READ = 0x10 SPI_FLASH_READ = 0x10
SET_NFC_IR_MCU_CONFIG = 0x21
SET_PLAYER_LIGHTS = 0x30
ENABLE_6AXIS_SENSOR = 0x40 ENABLE_6AXIS_SENSOR = 0x40
ENABLE_VIBRATION = 0x48
class OutputReportID(Enum): class OutputReportID(Enum):
@@ -154,7 +158,7 @@ class OutputReport:
try: try:
return OutputReportID(self.data[1]) return OutputReportID(self.data[1])
except ValueError: except ValueError:
raise NotImplementedError(f'Output report id {self.data[1]}') raise NotImplementedError(f'Output report id {hex(self.data[1])} not implemented')
def get_timer(self): def get_timer(self):
return OutputReportID(self.data[2]) return OutputReportID(self.data[2])
@@ -168,7 +172,7 @@ class OutputReport:
try: try:
return SubCommand(self.data[11]) return SubCommand(self.data[11])
except ValueError: except ValueError:
raise NotImplementedError(f'Sub command id {self.data[11]}') raise NotImplementedError(f'Sub command id {hex(self.data[11])} not implemented')
def __bytes__(self): def __bytes__(self):
return bytes(self.data) return bytes(self.data)
+2 -2
View File
@@ -21,7 +21,7 @@ async def _send_empty_input_reports(transport):
await asyncio.sleep(1) await asyncio.sleep(1)
async def create_hid_server(protocol_factory, ctl_psm, itr_psm): async def create_hid_server(protocol_factory, ctl_psm, itr_psm, capture_file=None):
ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) ctl_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) itr_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP)
@@ -57,7 +57,7 @@ async def create_hid_server(protocol_factory, ctl_psm, itr_psm):
logger.info(f'Accepted connection at psm {itr_psm} from {itr_address}') logger.info(f'Accepted connection at psm {itr_psm} from {itr_address}')
assert ctl_address[0] == itr_address[0] assert ctl_address[0] == itr_address[0]
transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, 50) transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, 50, capture_file=capture_file)
protocol.connection_made(transport) protocol.connection_made(transport)
# send some empty input reports until the switch decides to reply # send some empty input reports until the switch decides to reply
+20 -6
View File
@@ -1,5 +1,7 @@
import asyncio import asyncio
import logging import logging
import struct
import time
from typing import Any from typing import Any
from joycontrol.report import InputReport from joycontrol.report import InputReport
@@ -8,7 +10,7 @@ logger = logging.getLogger(__name__)
class L2CAP_Transport(asyncio.Transport): class L2CAP_Transport(asyncio.Transport):
def __init__(self, loop, protocol, l2cap_socket, read_buffer_size) -> None: def __init__(self, loop, protocol, l2cap_socket, read_buffer_size, capture_file=None) -> None:
self._loop = loop self._loop = loop
self._protocol = protocol self._protocol = protocol
@@ -28,12 +30,21 @@ class L2CAP_Transport(asyncio.Transport):
self._input_report_timer = 0x00 self._input_report_timer = 0x00
self._capture_file = capture_file
async def _read(self): async def _read(self):
while True: while True:
await self._is_reading.wait() await self._is_reading.wait()
data = await self._loop.sock_recv(self._sock, self._read_buffer_size) data = await self._loop.sock_recv(self._sock, self._read_buffer_size)
logger.debug(f'received "{list(map(hex, list(data)))}"')
if self._capture_file is not None:
# write data to log file
_time = struct.pack('d', time.time())
size = struct.pack('i', len(data))
self._capture_file.write(_time + size + data)
#logger.debug(f'received "{list(data)}"')
await self._protocol.report_received(data, self._sock.getpeername()) await self._protocol.report_received(data, self._sock.getpeername())
def is_reading(self) -> bool: def is_reading(self) -> bool:
@@ -62,13 +73,16 @@ class L2CAP_Transport(asyncio.Transport):
data.set_timer(self._input_report_timer) data.set_timer(self._input_report_timer)
self._input_report_timer = (self._input_report_timer + 1) % 256 self._input_report_timer = (self._input_report_timer + 1) % 256
_bytes = bytes(data) _bytes = bytes(data)
if data.subcommand_is_set:
data.clear_sub_command()
else: else:
raise ValueError('data must be bytes or InputReport') raise ValueError('data must be bytes or InputReport')
logger.debug(f'sending "{_bytes}"') if self._capture_file is not None:
# write data to log file
_time = struct.pack('d', time.time())
size = struct.pack('i', len(_bytes))
self._capture_file.write(_time + size + _bytes)
#logger.debug(f'sending "{_bytes}"')
await self._loop.sock_sendall(self._sock, _bytes) await self._loop.sock_sendall(self._sock, _bytes)
def abort(self) -> None: def abort(self) -> None:
+1 -1
View File
@@ -69,7 +69,7 @@ async def test_controller_buttons(controller_state: ControllerState):
async def main(): async def main():
transport, protocol = await create_hid_server(controller_protocol_factory(Controller.PRO_CONTROLLER), 17, 19) transport, protocol = await create_hid_server(controller_protocol_factory(Controller.PRO_CONTROLLER), 17, 19)
await test_controller_buttons(ControllerState(transport, protocol)) await test_controller_buttons(protocol.get_controller_state())
logger.info('Stopping communication...') logger.info('Stopping communication...')
await transport.close() await transport.close()
+2 -2
View File
@@ -2,7 +2,7 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
setup(name='joycontrol', setup(name='joycontrol',
version='0.1', version='0.11',
author='Robert Martin', author='Robert Martin',
author_email='martinro@informatik.hu-berlin.de', author_email='martinro@informatik.hu-berlin.de',
description='Emulate Nintendo Switch Controllers over Bluetooth', description='Emulate Nintendo Switch Controllers over Bluetooth',
@@ -10,7 +10,7 @@ setup(name='joycontrol',
package_data={'joycontrol': ['profile/sdp_record_hid.xml']}, package_data={'joycontrol': ['profile/sdp_record_hid.xml']},
zip_safe=False, zip_safe=False,
install_requires=[ install_requires=[
# TODO 'hid'
] ]
) )