Merge branch 'rumble_experiments' into imu_data_experiments

This commit is contained in:
Robert Martin
2021-03-07 13:31:27 +01:00
5 changed files with 210 additions and 19 deletions
+36
View File
@@ -0,0 +1,36 @@
import asyncio
import hid
VENDOR_ID = 1406
PRODUCT_ID_JL = 8198
PRODUCT_ID_JR = 8199
PRODUCT_ID_PC = 8201
async def get_blt_hid_device():
while True:
for device in hid.enumerate(0, 0):
# looking for devices matching Nintendo's vendor id and JoyCon product id
if device['vendor_id'] == VENDOR_ID and device['product_id'] in (
PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC):
return device
await asyncio.sleep(2)
class AsyncHID(hid.Device):
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
super().__init__(*args, **kwargs)
self._loop = loop
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, size, timeout=None):
async with self._read_lock:
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
async def write(self, data):
async with self._write_lock:
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
+64
View File
@@ -1,3 +1,4 @@
import math
from enum import Enum
from joycontrol.controller import Controller
@@ -296,6 +297,69 @@ class OutputReport:
def get_rumble_data(self):
return self.data[3:11]
@staticmethod
def _encode_rumble_data(freq, amp):
# TODO: Fix LA Byte 2
if not (40 <= freq <= 1253):
raise ValueError('Frequency must be in [40, 1253].')
if amp > 1.003:
raise ValueError('Amplitudes higher than 1.003 are not safe '
'for the integrity of the linear resonant actuators')
# Float frequency to hex conversion
encoded_hex_freq = int(round(math.log2(freq / 10) * 32))
# Convert to Joy-Con HF range. Range in big-endian: 0x0004-0x01FC with +0x0004 steps.
if freq <= 80:
hf = 0x00
else:
hf = (encoded_hex_freq - 0x60) * 4
# Convert to Joy-Con LF range. Range: 0x01-0x7F.
if freq >= 640:
lf = 0x00
else:
lf = encoded_hex_freq - 0x40
# Float amplitude to hex conversion
encoded_hex_amp = 0
if amp > 0.23:
encoded_hex_amp = int(round(math.log2(amp * 8.7) * 32))
elif amp > 0.12:
encoded_hex_amp = int(round(math.log2(amp * 17) * 16))
else:
# TBD
pass
hf_amp = encoded_hex_amp << 1
lf_amp = (encoded_hex_amp >> 1) + 0x40
return hf, hf_amp, lf, lf_amp
def set_left_rumble_data(self, freq, amp):
hf, hf_amp, lf, lf_amp = OutputReport._encode_rumble_data(freq, amp)
# Byte swapping
self.data[3] = hf & 0xFF
self.data[4] = hf_amp + ((hf >> 8) & 0xFF) # Add amp + 1st byte of frequency to amplitude byte
# Byte swapping
self.data[5] = lf + ((lf_amp >> 8) & 0xFF) # Add freq + 1st byte of LF amplitude to the frequency byte
self.data[6] = lf_amp & 0xFF
def set_right_rumble_data(self, freq, amp):
hf, hf_amp, lf, lf_amp = OutputReport._encode_rumble_data(freq, amp)
# Byte swapping
self.data[7] = hf & 0xFF
self.data[8] = hf_amp + ((hf >> 8) & 0xFF) # Add amp + 1st byte of frequency to amplitude byte
# Byte swapping
self.data[9] = lf + ((lf_amp >> 8) & 0xFF) # Add freq + 1st byte of LF amplitude to the frequency byte
self.data[10] = lf_amp & 0xFF
def get_sub_command(self):
if len(self.data) < 12:
return None
-19
View File
@@ -2,28 +2,9 @@ import asyncio
import logging
from contextlib import contextmanager
import hid
logger = logging.getLogger(__name__)
class AsyncHID(hid.Device):
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
super().__init__(*args, **kwargs)
self._loop = loop
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, size, timeout=None):
async with self._read_lock:
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
async def write(self, data):
async with self._write_lock:
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
@contextmanager
def get_output(path=None, open_flags='wb', default=None):
"""
View File
+110
View File
@@ -0,0 +1,110 @@
import asyncio
import logging
import os
from joycontrol import logging_default as log
from joycontrol.hid import get_blt_hid_device, AsyncHID
from joycontrol.report import InputReport, OutputReport, OutputReportID, SubCommand
logger = logging.getLogger(__name__)
"""
Sends some vibration reports to a joycon. Only works with the right joycon atm.
"""
async def print_outputs(hid_device):
while True:
data = await hid_device.read(255)
# add byte for input report
data = b'\xa1' + data
input_report = InputReport(list(data))
vibrator_input = input_report.data[13]
# print(hex(vibrator_input))
async def send_vibration_report(hid_device):
reader = asyncio.ensure_future(print_outputs(hid_device))
CHANGE_INPUT_REPORT_MODE = [1, 8, 0, 0, 0, 0, 0, 1, 64, 64, 3, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
data = CHANGE_INPUT_REPORT_MODE
print('writing', data)
await hid_device.write(bytes(data))
await asyncio.sleep(0.1)
report = OutputReport()
report.set_timer(1)
report.set_output_report_id(OutputReportID.SUB_COMMAND)
report.set_sub_command(SubCommand.ENABLE_VIBRATION)
report.set_sub_command_data([0x01])
data = bytes(report)[1:]
print('writing', data)
await hid_device.write(bytes(data))
await asyncio.sleep(0.1)
scale = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25]
scale = [int(round(n)) for n in scale]
amp = 1
time = 2
while True:
rumble_report = OutputReport()
report.set_timer(time)
time += 1
rumble_report.set_output_report_id(OutputReportID.RUMBLE_ONLY)
# increase frequency
rumble_report.set_right_rumble_data(scale[time % len(scale)], amp)
data = bytes(rumble_report)[1:]
print('writing', data)
await hid_device.write(bytes(data))
await asyncio.sleep(.2)
try:
await reader
except KeyboardInterrupt:
pass
async def _main(loop):
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
'or a Pro Controller over Bluetooth. '
'Note: The bluez "input" plugin needs to be enabled (default)')
controller = await get_blt_hid_device()
logger.info(f'Found controller "{controller}".')
with AsyncHID(path=controller['path'], loop=loop) as hid_controller:
await send_vibration_report(hid_controller)
if __name__ == '__main__':
# check if root
if os.geteuid() != 0:
raise PermissionError('Script must be run as root!')
# h = lambda bla: list(map(hex, bla))
# report = OutputReport()
# report.set_left_rumble_data(1253, 0.012)
# exit()
# setup logging
log.configure()
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(_main(loop))
try:
loop.run_until_complete(task)
except KeyboardInterrupt:
task.cancel()
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
finally:
loop.stop()
loop.close()