move hid stuff to extra module

This commit is contained in:
Robert Martin
2021-03-06 16:15:53 +01:00
parent ea0396968b
commit fd7b41d018
4 changed files with 40 additions and 33 deletions
+36
View File
@@ -0,0 +1,36 @@
import asyncio
import hid
VENDOR_ID = 1406
PRODUCT_ID_JL = 8198
PRODUCT_ID_JR = 8199
PRODUCT_ID_PC = 8201
async def get_blt_hid_device():
while True:
for device in hid.enumerate(0, 0):
# looking for devices matching Nintendo's vendor id and JoyCon product id
if device['vendor_id'] == VENDOR_ID and device['product_id'] in (
PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC):
return device
await asyncio.sleep(2)
class AsyncHID(hid.Device):
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
super().__init__(*args, **kwargs)
self._loop = loop
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, size, timeout=None):
async with self._read_lock:
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
async def write(self, data):
async with self._write_lock:
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
-19
View File
@@ -2,28 +2,9 @@ import asyncio
import logging
from contextlib import contextmanager
import hid
logger = logging.getLogger(__name__)
class AsyncHID(hid.Device):
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
super().__init__(*args, **kwargs)
self._loop = loop
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, size, timeout=None):
async with self._read_lock:
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
async def write(self, data):
async with self._write_lock:
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
@contextmanager
def get_output(path=None, open_flags='wb', default=None):
"""
View File
+4 -14
View File
@@ -2,11 +2,9 @@ import asyncio
import logging
import os
import hid
from joycontrol import logging_default as log
from joycontrol.hid import get_blt_hid_device, AsyncHID
from joycontrol.report import InputReport, OutputReport, OutputReportID, SubCommand
from joycontrol.utils import AsyncHID
logger = logging.getLogger(__name__)
@@ -75,19 +73,11 @@ async def send_vibration_report(hid_device):
async def _main(loop):
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), or a Pro Controller over Bluetooth. '
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
'or a Pro Controller over Bluetooth. '
'Note: The bluez "input" plugin needs to be enabled (default)')
controller = None
while controller is None:
for device in hid.enumerate(0, 0):
# looking for devices matching Nintendo's vendor id and JoyCon product id
if device['vendor_id'] == VENDOR_ID and device['product_id'] in (PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC):
controller = device
break
else:
await asyncio.sleep(2)
controller = await get_blt_hid_device()
logger.info(f'Found controller "{controller}".')
with AsyncHID(path=controller['path'], loop=loop) as hid_controller: