forked from mirror/joycontrol
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1511174bcc | |||
| 887d7c8687 | |||
| 2bec09f443 |
@@ -373,7 +373,7 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
input_report.set_misc()
|
input_report.set_misc()
|
||||||
|
|
||||||
input_report.set_ack(0x80)
|
input_report.set_ack(0x80)
|
||||||
input_report.reply_to_subcommand_id(0x03)
|
input_report.reply_to_subcommand_id(SubCommand.SET_INPUT_REPORT_MODE)
|
||||||
|
|
||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
|
|||||||
@@ -113,6 +113,33 @@ class InputReport:
|
|||||||
for i in range(14, 50):
|
for i in range(14, 50):
|
||||||
self.data[i] = 0x00
|
self.data[i] = 0x00
|
||||||
|
|
||||||
|
def _parse_16_bit_le_triplet(self, offset):
|
||||||
|
"""
|
||||||
|
:param offset: in the data list
|
||||||
|
:returns three int values each packed into two bytes in little endian
|
||||||
|
"""
|
||||||
|
return (self.data[offset] << 8) + self.data[offset + 1], \
|
||||||
|
(self.data[offset + 2] << 8) + self.data[offset + 3], \
|
||||||
|
(self.data[offset + 4] << 8) + self.data[offset + 5]
|
||||||
|
|
||||||
|
def get_imu_data(self):
|
||||||
|
if not 0x30 <= self.get_input_report_id() <= 0x33:
|
||||||
|
raise ValueError('No IMU data!')
|
||||||
|
|
||||||
|
acc, gyro = [], []
|
||||||
|
|
||||||
|
offset = 14
|
||||||
|
for i in range(3):
|
||||||
|
x, y, z = self._parse_16_bit_le_triplet(offset)
|
||||||
|
acc.append((x, y, z))
|
||||||
|
offset += 6
|
||||||
|
|
||||||
|
roll, pitch, yaw = self._parse_16_bit_le_triplet(offset)
|
||||||
|
gyro.append((roll, pitch, yaw))
|
||||||
|
offset += 6
|
||||||
|
|
||||||
|
return acc, gyro
|
||||||
|
|
||||||
def set_ir_nfc_data(self, data):
|
def set_ir_nfc_data(self, data):
|
||||||
if 50 + len(data) > len(self.data):
|
if 50 + len(data) > len(self.data):
|
||||||
raise ValueError('Too much data.')
|
raise ValueError('Too much data.')
|
||||||
|
|||||||
@@ -0,0 +1,78 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from joycontrol import logging_default as log
|
||||||
|
from joycontrol.hid import get_blt_hid_device, AsyncHID
|
||||||
|
from joycontrol.report import OutputReport, OutputReportID, SubCommand, InputReport
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def _main():
|
||||||
|
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
|
||||||
|
'or a Pro Controller over Bluetooth. '
|
||||||
|
'Note: The bluez "input" plugin needs to be enabled (default)')
|
||||||
|
|
||||||
|
controller = await get_blt_hid_device()
|
||||||
|
logger.info(f'Found controller "{controller}".')
|
||||||
|
|
||||||
|
timer = 0
|
||||||
|
|
||||||
|
with AsyncHID(path=controller['path'], loop=loop) as hid_controller:
|
||||||
|
# enable imu
|
||||||
|
output_report = OutputReport()
|
||||||
|
output_report.set_output_report_id(OutputReportID.SUB_COMMAND)
|
||||||
|
output_report.set_sub_command(SubCommand.ENABLE_6AXIS_SENSOR)
|
||||||
|
output_report.set_sub_command_data([0x01])
|
||||||
|
output_report.set_timer(timer)
|
||||||
|
timer += 1
|
||||||
|
|
||||||
|
await hid_controller.write(bytes(output_report)[1:])
|
||||||
|
|
||||||
|
# wait for ack
|
||||||
|
while True:
|
||||||
|
data = await hid_controller.read(50)
|
||||||
|
report = InputReport([0xA1] + list(data))
|
||||||
|
if report.get_input_report_id() == 0x21 and report.get_ack() == 0x80:
|
||||||
|
logger.info(f'Ack received {report.get_reply_to_subcommand_id()}')
|
||||||
|
break
|
||||||
|
|
||||||
|
# switch to 0x30 input report mode
|
||||||
|
output_report = OutputReport()
|
||||||
|
output_report.set_output_report_id(OutputReportID.SUB_COMMAND)
|
||||||
|
output_report.set_sub_command(SubCommand.SET_INPUT_REPORT_MODE)
|
||||||
|
output_report.set_sub_command_data([0x30])
|
||||||
|
output_report.set_timer(timer)
|
||||||
|
timer += 1
|
||||||
|
|
||||||
|
await hid_controller.write(bytes(output_report)[1:])
|
||||||
|
|
||||||
|
# wait for ack
|
||||||
|
while True:
|
||||||
|
data = await hid_controller.read(50)
|
||||||
|
report = InputReport([0xA1] + list(data))
|
||||||
|
if report.get_input_report_id() == 0x21 and report.get_ack() == 0x80:
|
||||||
|
logger.info(f'Ack received {report.get_reply_to_subcommand_id()}')
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await hid_controller.read(50)
|
||||||
|
report = InputReport([0xA1] + list(data))
|
||||||
|
acc, gyro = report.get_imu_data()
|
||||||
|
print(np.array(acc))
|
||||||
|
# print(report.data)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# setup logging
|
||||||
|
log.configure()
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.run_until_complete(
|
||||||
|
_main()
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user