pro controller test

This commit is contained in:
Robert Martin
2020-01-29 13:55:26 +09:00
parent b482e61809
commit 07f539743d
3 changed files with 15 additions and 18 deletions
+1 -1
View File
@@ -80,7 +80,7 @@ class ControllerProtocol(BaseProtocol):
#input_report.set_left_analog_stick() #input_report.set_left_analog_stick()
#input_report.set_right_analog_stick() #input_report.set_right_analog_stick()
#input_report.set_vibrator_input() #input_report.set_vibrator_input()
input_report.sub_0x02_device_info(bd_address) input_report.sub_0x02_device_info(bd_address, controller=self.controller)
asyncio.ensure_future(self.transport.write(input_report)) asyncio.ensure_future(self.transport.write(input_report))
+7 -5
View File
@@ -31,9 +31,11 @@ async def create_hid_server(protocol_factory, ctl_psm, itr_psm):
ctl_sock.listen(1) ctl_sock.listen(1)
itr_sock.listen(1) itr_sock.listen(1)
protocol = protocol_factory()
hid = HidDevice() hid = HidDevice()
# setting bluetooth adapter name and class to the device we wish to emulate # setting bluetooth adapter name and class to the device we wish to emulate
await hid.set_name(Controller.JOYCON_L.device_name()) await hid.set_name(protocol.controller.device_name())
await hid.set_class() await hid.set_class()
logger.info('Advertising the Bluetooth SDP record...') logger.info('Advertising the Bluetooth SDP record...')
@@ -47,7 +49,6 @@ async def create_hid_server(protocol_factory, ctl_psm, itr_psm):
logger.info(f'Accepted connection at psm {itr_psm} from {itr_address}') logger.info(f'Accepted connection at psm {itr_psm} from {itr_address}')
assert ctl_address[0] == itr_address[0] assert ctl_address[0] == itr_address[0]
protocol = protocol_factory()
transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, 50) transport = L2CAP_Transport(asyncio.get_event_loop(), protocol, client_itr, 50)
protocol.connection_made(transport) protocol.connection_made(transport)
@@ -58,12 +59,12 @@ async def send_empty_input_reports(transport):
report = InputReport() report = InputReport()
while True: while True:
await transport.write(bytes(report)) await transport.write(report)
await asyncio.sleep(1) await asyncio.sleep(1)
async def main(): async def main():
transport, protocol = await create_hid_server(controller_protocol_factory(Controller.JOYCON_L), 17, 19) transport, protocol = await create_hid_server(controller_protocol_factory(Controller.PRO_CONTROLLER), 17, 19)
# send some empty input reports until the switch decides to reply # send some empty input reports until the switch decides to reply
future = asyncio.ensure_future(send_empty_input_reports(transport)) future = asyncio.ensure_future(send_empty_input_reports(transport))
@@ -74,8 +75,9 @@ async def main():
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
# stop communication after some time
await asyncio.sleep(60) await asyncio.sleep(60)
logger.info('Stopping communication...')
await transport.close() await transport.close()
+2 -7
View File
@@ -29,17 +29,12 @@ class L2CAP_Transport(asyncio.Transport):
self._input_report_timer = 0x00 self._input_report_timer = 0x00
async def _read(self): async def _read(self):
try:
while True: while True:
await self._is_reading.wait() await self._is_reading.wait()
data = await self._loop.sock_recv(self._sock, self._read_buffer_size) data = await self._loop.sock_recv(self._sock, self._read_buffer_size)
logger.debug(f'received "{data}') logger.debug(f'received "{data}"')
await self._protocol.report_received(data, self._sock.getpeername()) await self._protocol.report_received(data, self._sock.getpeername())
except asyncio.CancelledError:
# reading has been stopped
pass
def is_reading(self) -> bool: def is_reading(self) -> bool:
return self._is_reading.is_set() return self._is_reading.is_set()
@@ -76,7 +71,7 @@ class L2CAP_Transport(asyncio.Transport):
def abort(self) -> None: def abort(self) -> None:
super().abort() super().abort()
def get_extra_info(self, name: Any, default: Any = ...) -> Any: def get_extra_info(self, name: Any, default=None) -> Any:
return self._extra_info.get(name, default) return self._extra_info.get(name, default)
def is_closing(self) -> bool: def is_closing(self) -> bool: