forked from mirror/joycontrol
Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1511174bcc | |||
| 887d7c8687 | |||
| 2bec09f443 | |||
| be4df7ee57 | |||
| ab37bf5182 | |||
| d936717580 | |||
| fd7b41d018 | |||
| ea0396968b | |||
| 1d2123149f | |||
| a21fc76405 | |||
| 779a198ca8 |
@@ -0,0 +1,36 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
import hid
|
||||||
|
|
||||||
|
VENDOR_ID = 1406
|
||||||
|
PRODUCT_ID_JL = 8198
|
||||||
|
PRODUCT_ID_JR = 8199
|
||||||
|
PRODUCT_ID_PC = 8201
|
||||||
|
|
||||||
|
|
||||||
|
async def get_blt_hid_device():
|
||||||
|
while True:
|
||||||
|
for device in hid.enumerate(0, 0):
|
||||||
|
# looking for devices matching Nintendo's vendor id and JoyCon product id
|
||||||
|
if device['vendor_id'] == VENDOR_ID and device['product_id'] in (
|
||||||
|
PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC):
|
||||||
|
return device
|
||||||
|
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncHID(hid.Device):
|
||||||
|
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._loop = loop
|
||||||
|
|
||||||
|
self._write_lock = asyncio.Lock()
|
||||||
|
self._read_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
async def read(self, size, timeout=None):
|
||||||
|
async with self._read_lock:
|
||||||
|
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
|
||||||
|
|
||||||
|
async def write(self, data):
|
||||||
|
async with self._write_lock:
|
||||||
|
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
|
||||||
@@ -373,7 +373,7 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
input_report.set_misc()
|
input_report.set_misc()
|
||||||
|
|
||||||
input_report.set_ack(0x80)
|
input_report.set_ack(0x80)
|
||||||
input_report.reply_to_subcommand_id(0x03)
|
input_report.reply_to_subcommand_id(SubCommand.SET_INPUT_REPORT_MODE)
|
||||||
|
|
||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import math
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
from joycontrol.controller import Controller
|
from joycontrol.controller import Controller
|
||||||
@@ -112,6 +113,33 @@ class InputReport:
|
|||||||
for i in range(14, 50):
|
for i in range(14, 50):
|
||||||
self.data[i] = 0x00
|
self.data[i] = 0x00
|
||||||
|
|
||||||
|
def _parse_16_bit_le_triplet(self, offset):
|
||||||
|
"""
|
||||||
|
:param offset: in the data list
|
||||||
|
:returns three int values each packed into two bytes in little endian
|
||||||
|
"""
|
||||||
|
return (self.data[offset] << 8) + self.data[offset + 1], \
|
||||||
|
(self.data[offset + 2] << 8) + self.data[offset + 3], \
|
||||||
|
(self.data[offset + 4] << 8) + self.data[offset + 5]
|
||||||
|
|
||||||
|
def get_imu_data(self):
|
||||||
|
if not 0x30 <= self.get_input_report_id() <= 0x33:
|
||||||
|
raise ValueError('No IMU data!')
|
||||||
|
|
||||||
|
acc, gyro = [], []
|
||||||
|
|
||||||
|
offset = 14
|
||||||
|
for i in range(3):
|
||||||
|
x, y, z = self._parse_16_bit_le_triplet(offset)
|
||||||
|
acc.append((x, y, z))
|
||||||
|
offset += 6
|
||||||
|
|
||||||
|
roll, pitch, yaw = self._parse_16_bit_le_triplet(offset)
|
||||||
|
gyro.append((roll, pitch, yaw))
|
||||||
|
offset += 6
|
||||||
|
|
||||||
|
return acc, gyro
|
||||||
|
|
||||||
def set_ir_nfc_data(self, data):
|
def set_ir_nfc_data(self, data):
|
||||||
if 50 + len(data) > len(self.data):
|
if 50 + len(data) > len(self.data):
|
||||||
raise ValueError('Too much data.')
|
raise ValueError('Too much data.')
|
||||||
@@ -269,6 +297,69 @@ class OutputReport:
|
|||||||
def get_rumble_data(self):
|
def get_rumble_data(self):
|
||||||
return self.data[3:11]
|
return self.data[3:11]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _encode_rumble_data(freq, amp):
|
||||||
|
# TODO: Fix LA Byte 2
|
||||||
|
|
||||||
|
if not (40 <= freq <= 1253):
|
||||||
|
raise ValueError('Frequency must be in [40, 1253].')
|
||||||
|
|
||||||
|
if amp > 1.003:
|
||||||
|
raise ValueError('Amplitudes higher than 1.003 are not safe '
|
||||||
|
'for the integrity of the linear resonant actuators')
|
||||||
|
|
||||||
|
# Float frequency to hex conversion
|
||||||
|
encoded_hex_freq = int(round(math.log2(freq / 10) * 32))
|
||||||
|
|
||||||
|
# Convert to Joy-Con HF range. Range in big-endian: 0x0004-0x01FC with +0x0004 steps.
|
||||||
|
if freq <= 80:
|
||||||
|
hf = 0x00
|
||||||
|
else:
|
||||||
|
hf = (encoded_hex_freq - 0x60) * 4
|
||||||
|
|
||||||
|
# Convert to Joy-Con LF range. Range: 0x01-0x7F.
|
||||||
|
if freq >= 640:
|
||||||
|
lf = 0x00
|
||||||
|
else:
|
||||||
|
lf = encoded_hex_freq - 0x40
|
||||||
|
|
||||||
|
# Float amplitude to hex conversion
|
||||||
|
encoded_hex_amp = 0
|
||||||
|
if amp > 0.23:
|
||||||
|
encoded_hex_amp = int(round(math.log2(amp * 8.7) * 32))
|
||||||
|
elif amp > 0.12:
|
||||||
|
encoded_hex_amp = int(round(math.log2(amp * 17) * 16))
|
||||||
|
else:
|
||||||
|
# TBD
|
||||||
|
pass
|
||||||
|
|
||||||
|
hf_amp = encoded_hex_amp << 1
|
||||||
|
lf_amp = (encoded_hex_amp >> 1) + 0x40
|
||||||
|
|
||||||
|
return hf, hf_amp, lf, lf_amp
|
||||||
|
|
||||||
|
def set_left_rumble_data(self, freq, amp):
|
||||||
|
hf, hf_amp, lf, lf_amp = OutputReport._encode_rumble_data(freq, amp)
|
||||||
|
|
||||||
|
# Byte swapping
|
||||||
|
self.data[3] = hf & 0xFF
|
||||||
|
self.data[4] = hf_amp + ((hf >> 8) & 0xFF) # Add amp + 1st byte of frequency to amplitude byte
|
||||||
|
|
||||||
|
# Byte swapping
|
||||||
|
self.data[5] = lf + ((lf_amp >> 8) & 0xFF) # Add freq + 1st byte of LF amplitude to the frequency byte
|
||||||
|
self.data[6] = lf_amp & 0xFF
|
||||||
|
|
||||||
|
def set_right_rumble_data(self, freq, amp):
|
||||||
|
hf, hf_amp, lf, lf_amp = OutputReport._encode_rumble_data(freq, amp)
|
||||||
|
|
||||||
|
# Byte swapping
|
||||||
|
self.data[7] = hf & 0xFF
|
||||||
|
self.data[8] = hf_amp + ((hf >> 8) & 0xFF) # Add amp + 1st byte of frequency to amplitude byte
|
||||||
|
|
||||||
|
# Byte swapping
|
||||||
|
self.data[9] = lf + ((lf_amp >> 8) & 0xFF) # Add freq + 1st byte of LF amplitude to the frequency byte
|
||||||
|
self.data[10] = lf_amp & 0xFF
|
||||||
|
|
||||||
def get_sub_command(self):
|
def get_sub_command(self):
|
||||||
if len(self.data) < 12:
|
if len(self.data) < 12:
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -2,28 +2,9 @@ import asyncio
|
|||||||
import logging
|
import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
import hid
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class AsyncHID(hid.Device):
|
|
||||||
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self._loop = loop
|
|
||||||
|
|
||||||
self._write_lock = asyncio.Lock()
|
|
||||||
self._read_lock = asyncio.Lock()
|
|
||||||
|
|
||||||
async def read(self, size, timeout=None):
|
|
||||||
async with self._read_lock:
|
|
||||||
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
|
|
||||||
|
|
||||||
async def write(self, data):
|
|
||||||
async with self._write_lock:
|
|
||||||
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
|
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def get_output(path=None, open_flags='wb', default=None):
|
def get_output(path=None, open_flags='wb', default=None):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -0,0 +1,78 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from joycontrol import logging_default as log
|
||||||
|
from joycontrol.hid import get_blt_hid_device, AsyncHID
|
||||||
|
from joycontrol.report import OutputReport, OutputReportID, SubCommand, InputReport
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def _main():
|
||||||
|
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
|
||||||
|
'or a Pro Controller over Bluetooth. '
|
||||||
|
'Note: The bluez "input" plugin needs to be enabled (default)')
|
||||||
|
|
||||||
|
controller = await get_blt_hid_device()
|
||||||
|
logger.info(f'Found controller "{controller}".')
|
||||||
|
|
||||||
|
timer = 0
|
||||||
|
|
||||||
|
with AsyncHID(path=controller['path'], loop=loop) as hid_controller:
|
||||||
|
# enable imu
|
||||||
|
output_report = OutputReport()
|
||||||
|
output_report.set_output_report_id(OutputReportID.SUB_COMMAND)
|
||||||
|
output_report.set_sub_command(SubCommand.ENABLE_6AXIS_SENSOR)
|
||||||
|
output_report.set_sub_command_data([0x01])
|
||||||
|
output_report.set_timer(timer)
|
||||||
|
timer += 1
|
||||||
|
|
||||||
|
await hid_controller.write(bytes(output_report)[1:])
|
||||||
|
|
||||||
|
# wait for ack
|
||||||
|
while True:
|
||||||
|
data = await hid_controller.read(50)
|
||||||
|
report = InputReport([0xA1] + list(data))
|
||||||
|
if report.get_input_report_id() == 0x21 and report.get_ack() == 0x80:
|
||||||
|
logger.info(f'Ack received {report.get_reply_to_subcommand_id()}')
|
||||||
|
break
|
||||||
|
|
||||||
|
# switch to 0x30 input report mode
|
||||||
|
output_report = OutputReport()
|
||||||
|
output_report.set_output_report_id(OutputReportID.SUB_COMMAND)
|
||||||
|
output_report.set_sub_command(SubCommand.SET_INPUT_REPORT_MODE)
|
||||||
|
output_report.set_sub_command_data([0x30])
|
||||||
|
output_report.set_timer(timer)
|
||||||
|
timer += 1
|
||||||
|
|
||||||
|
await hid_controller.write(bytes(output_report)[1:])
|
||||||
|
|
||||||
|
# wait for ack
|
||||||
|
while True:
|
||||||
|
data = await hid_controller.read(50)
|
||||||
|
report = InputReport([0xA1] + list(data))
|
||||||
|
if report.get_input_report_id() == 0x21 and report.get_ack() == 0x80:
|
||||||
|
logger.info(f'Ack received {report.get_reply_to_subcommand_id()}')
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await hid_controller.read(50)
|
||||||
|
report = InputReport([0xA1] + list(data))
|
||||||
|
acc, gyro = report.get_imu_data()
|
||||||
|
print(np.array(acc))
|
||||||
|
# print(report.data)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# setup logging
|
||||||
|
log.configure()
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.run_until_complete(
|
||||||
|
_main()
|
||||||
|
)
|
||||||
@@ -0,0 +1,110 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
from joycontrol import logging_default as log
|
||||||
|
from joycontrol.hid import get_blt_hid_device, AsyncHID
|
||||||
|
from joycontrol.report import InputReport, OutputReport, OutputReportID, SubCommand
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
"""
|
||||||
|
Sends some vibration reports to a joycon. Only works with the right joycon atm.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
async def print_outputs(hid_device):
|
||||||
|
while True:
|
||||||
|
data = await hid_device.read(255)
|
||||||
|
# add byte for input report
|
||||||
|
data = b'\xa1' + data
|
||||||
|
|
||||||
|
input_report = InputReport(list(data))
|
||||||
|
vibrator_input = input_report.data[13]
|
||||||
|
# print(hex(vibrator_input))
|
||||||
|
|
||||||
|
|
||||||
|
async def send_vibration_report(hid_device):
|
||||||
|
reader = asyncio.ensure_future(print_outputs(hid_device))
|
||||||
|
|
||||||
|
CHANGE_INPUT_REPORT_MODE = [1, 8, 0, 0, 0, 0, 0, 1, 64, 64, 3, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
||||||
|
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
|
||||||
|
data = CHANGE_INPUT_REPORT_MODE
|
||||||
|
print('writing', data)
|
||||||
|
await hid_device.write(bytes(data))
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
report = OutputReport()
|
||||||
|
report.set_timer(1)
|
||||||
|
report.set_output_report_id(OutputReportID.SUB_COMMAND)
|
||||||
|
report.set_sub_command(SubCommand.ENABLE_VIBRATION)
|
||||||
|
report.set_sub_command_data([0x01])
|
||||||
|
data = bytes(report)[1:]
|
||||||
|
|
||||||
|
print('writing', data)
|
||||||
|
await hid_device.write(bytes(data))
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
scale = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25]
|
||||||
|
scale = [int(round(n)) for n in scale]
|
||||||
|
|
||||||
|
amp = 1
|
||||||
|
time = 2
|
||||||
|
while True:
|
||||||
|
rumble_report = OutputReport()
|
||||||
|
report.set_timer(time)
|
||||||
|
time += 1
|
||||||
|
rumble_report.set_output_report_id(OutputReportID.RUMBLE_ONLY)
|
||||||
|
# increase frequency
|
||||||
|
rumble_report.set_right_rumble_data(scale[time % len(scale)], amp)
|
||||||
|
data = bytes(rumble_report)[1:]
|
||||||
|
print('writing', data)
|
||||||
|
await hid_device.write(bytes(data))
|
||||||
|
|
||||||
|
await asyncio.sleep(.2)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await reader
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def _main(loop):
|
||||||
|
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
|
||||||
|
'or a Pro Controller over Bluetooth. '
|
||||||
|
'Note: The bluez "input" plugin needs to be enabled (default)')
|
||||||
|
|
||||||
|
controller = await get_blt_hid_device()
|
||||||
|
logger.info(f'Found controller "{controller}".')
|
||||||
|
|
||||||
|
with AsyncHID(path=controller['path'], loop=loop) as hid_controller:
|
||||||
|
await send_vibration_report(hid_controller)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# check if root
|
||||||
|
if os.geteuid() != 0:
|
||||||
|
raise PermissionError('Script must be run as root!')
|
||||||
|
|
||||||
|
# h = lambda bla: list(map(hex, bla))
|
||||||
|
# report = OutputReport()
|
||||||
|
# report.set_left_rumble_data(1253, 0.012)
|
||||||
|
# exit()
|
||||||
|
|
||||||
|
# setup logging
|
||||||
|
log.configure()
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
task = asyncio.ensure_future(_main(loop))
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(task)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
task.cancel()
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(task)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
loop.stop()
|
||||||
|
loop.close()
|
||||||
Reference in New Issue
Block a user