diff --git a/joycontrol/hid.py b/joycontrol/hid.py new file mode 100644 index 0000000..446f215 --- /dev/null +++ b/joycontrol/hid.py @@ -0,0 +1,36 @@ +import asyncio + +import hid + +VENDOR_ID = 1406 +PRODUCT_ID_JL = 8198 +PRODUCT_ID_JR = 8199 +PRODUCT_ID_PC = 8201 + + +async def get_blt_hid_device(): + while True: + for device in hid.enumerate(0, 0): + # looking for devices matching Nintendo's vendor id and JoyCon product id + if device['vendor_id'] == VENDOR_ID and device['product_id'] in ( + PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC): + return device + + await asyncio.sleep(2) + + +class AsyncHID(hid.Device): + def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs): + super().__init__(*args, **kwargs) + self._loop = loop + + self._write_lock = asyncio.Lock() + self._read_lock = asyncio.Lock() + + async def read(self, size, timeout=None): + async with self._read_lock: + return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout) + + async def write(self, data): + async with self._write_lock: + return await self._loop.run_in_executor(None, hid.Device.write, self, data) \ No newline at end of file diff --git a/joycontrol/report.py b/joycontrol/report.py index 936ca0a..440464e 100644 --- a/joycontrol/report.py +++ b/joycontrol/report.py @@ -1,3 +1,4 @@ +import math from enum import Enum from joycontrol.controller import Controller @@ -296,6 +297,69 @@ class OutputReport: def get_rumble_data(self): return self.data[3:11] + @staticmethod + def _encode_rumble_data(freq, amp): + # TODO: Fix LA Byte 2 + + if not (40 <= freq <= 1253): + raise ValueError('Frequency must be in [40, 1253].') + + if amp > 1.003: + raise ValueError('Amplitudes higher than 1.003 are not safe ' + 'for the integrity of the linear resonant actuators') + + # Float frequency to hex conversion + encoded_hex_freq = int(round(math.log2(freq / 10) * 32)) + + # Convert to Joy-Con HF range. Range in big-endian: 0x0004-0x01FC with +0x0004 steps. + if freq <= 80: + hf = 0x00 + else: + hf = (encoded_hex_freq - 0x60) * 4 + + # Convert to Joy-Con LF range. Range: 0x01-0x7F. + if freq >= 640: + lf = 0x00 + else: + lf = encoded_hex_freq - 0x40 + + # Float amplitude to hex conversion + encoded_hex_amp = 0 + if amp > 0.23: + encoded_hex_amp = int(round(math.log2(amp * 8.7) * 32)) + elif amp > 0.12: + encoded_hex_amp = int(round(math.log2(amp * 17) * 16)) + else: + # TBD + pass + + hf_amp = encoded_hex_amp << 1 + lf_amp = (encoded_hex_amp >> 1) + 0x40 + + return hf, hf_amp, lf, lf_amp + + def set_left_rumble_data(self, freq, amp): + hf, hf_amp, lf, lf_amp = OutputReport._encode_rumble_data(freq, amp) + + # Byte swapping + self.data[3] = hf & 0xFF + self.data[4] = hf_amp + ((hf >> 8) & 0xFF) # Add amp + 1st byte of frequency to amplitude byte + + # Byte swapping + self.data[5] = lf + ((lf_amp >> 8) & 0xFF) # Add freq + 1st byte of LF amplitude to the frequency byte + self.data[6] = lf_amp & 0xFF + + def set_right_rumble_data(self, freq, amp): + hf, hf_amp, lf, lf_amp = OutputReport._encode_rumble_data(freq, amp) + + # Byte swapping + self.data[7] = hf & 0xFF + self.data[8] = hf_amp + ((hf >> 8) & 0xFF) # Add amp + 1st byte of frequency to amplitude byte + + # Byte swapping + self.data[9] = lf + ((lf_amp >> 8) & 0xFF) # Add freq + 1st byte of LF amplitude to the frequency byte + self.data[10] = lf_amp & 0xFF + def get_sub_command(self): if len(self.data) < 12: return None diff --git a/joycontrol/utils.py b/joycontrol/utils.py index 4eadc31..496cb02 100644 --- a/joycontrol/utils.py +++ b/joycontrol/utils.py @@ -2,28 +2,9 @@ import asyncio import logging from contextlib import contextmanager -import hid - logger = logging.getLogger(__name__) -class AsyncHID(hid.Device): - def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs): - super().__init__(*args, **kwargs) - self._loop = loop - - self._write_lock = asyncio.Lock() - self._read_lock = asyncio.Lock() - - async def read(self, size, timeout=None): - async with self._read_lock: - return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout) - - async def write(self, data): - async with self._write_lock: - return await self._loop.run_in_executor(None, hid.Device.write, self, data) - - @contextmanager def get_output(path=None, open_flags='wb', default=None): """ diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/joycon_rumble.py b/scripts/joycon_rumble.py new file mode 100644 index 0000000..72ad027 --- /dev/null +++ b/scripts/joycon_rumble.py @@ -0,0 +1,110 @@ +import asyncio +import logging +import os + +from joycontrol import logging_default as log +from joycontrol.hid import get_blt_hid_device, AsyncHID +from joycontrol.report import InputReport, OutputReport, OutputReportID, SubCommand + +logger = logging.getLogger(__name__) + +""" +Sends some vibration reports to a joycon. Only works with the right joycon atm. +""" + + +async def print_outputs(hid_device): + while True: + data = await hid_device.read(255) + # add byte for input report + data = b'\xa1' + data + + input_report = InputReport(list(data)) + vibrator_input = input_report.data[13] + # print(hex(vibrator_input)) + + +async def send_vibration_report(hid_device): + reader = asyncio.ensure_future(print_outputs(hid_device)) + + CHANGE_INPUT_REPORT_MODE = [1, 8, 0, 0, 0, 0, 0, 1, 64, 64, 3, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + data = CHANGE_INPUT_REPORT_MODE + print('writing', data) + await hid_device.write(bytes(data)) + await asyncio.sleep(0.1) + + report = OutputReport() + report.set_timer(1) + report.set_output_report_id(OutputReportID.SUB_COMMAND) + report.set_sub_command(SubCommand.ENABLE_VIBRATION) + report.set_sub_command_data([0x01]) + data = bytes(report)[1:] + + print('writing', data) + await hid_device.write(bytes(data)) + await asyncio.sleep(0.1) + + scale = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25] + scale = [int(round(n)) for n in scale] + + amp = 1 + time = 2 + while True: + rumble_report = OutputReport() + report.set_timer(time) + time += 1 + rumble_report.set_output_report_id(OutputReportID.RUMBLE_ONLY) + # increase frequency + rumble_report.set_right_rumble_data(scale[time % len(scale)], amp) + data = bytes(rumble_report)[1:] + print('writing', data) + await hid_device.write(bytes(data)) + + await asyncio.sleep(.2) + + try: + await reader + except KeyboardInterrupt: + pass + + +async def _main(loop): + logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), ' + 'or a Pro Controller over Bluetooth. ' + 'Note: The bluez "input" plugin needs to be enabled (default)') + + controller = await get_blt_hid_device() + logger.info(f'Found controller "{controller}".') + + with AsyncHID(path=controller['path'], loop=loop) as hid_controller: + await send_vibration_report(hid_controller) + + +if __name__ == '__main__': + # check if root + if os.geteuid() != 0: + raise PermissionError('Script must be run as root!') + + # h = lambda bla: list(map(hex, bla)) + # report = OutputReport() + # report.set_left_rumble_data(1253, 0.012) + # exit() + + # setup logging + log.configure() + + loop = asyncio.get_event_loop() + task = asyncio.ensure_future(_main(loop)) + + try: + loop.run_until_complete(task) + except KeyboardInterrupt: + task.cancel() + try: + loop.run_until_complete(task) + except asyncio.CancelledError: + pass + finally: + loop.stop() + loop.close()