forked from mirror/joycontrol
added scripts
This commit is contained in:
@@ -1,9 +1,42 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
import hid
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncHID(hid.Device):
|
||||||
|
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._loop = loop
|
||||||
|
|
||||||
|
self._write_lock = asyncio.Lock()
|
||||||
|
self._read_lock = asyncio.Lock()
|
||||||
|
|
||||||
|
async def read(self, size, timeout=None):
|
||||||
|
async with self._read_lock:
|
||||||
|
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
|
||||||
|
|
||||||
|
async def write(self, data):
|
||||||
|
async with self._write_lock:
|
||||||
|
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_output(path=None, open_flags='wb', default=None):
|
||||||
|
"""
|
||||||
|
Context manager that open the file a path was given, otherwise returns default value.
|
||||||
|
"""
|
||||||
|
if path is not None:
|
||||||
|
file = open(path, open_flags)
|
||||||
|
yield file
|
||||||
|
file.close()
|
||||||
|
else:
|
||||||
|
yield default
|
||||||
|
|
||||||
|
|
||||||
def get_bit(value, n):
|
def get_bit(value, n):
|
||||||
return (value >> n & 1) != 0
|
return (value >> n & 1) != 0
|
||||||
|
|
||||||
|
|||||||
+2
-15
@@ -8,7 +8,7 @@ from contextlib import contextmanager
|
|||||||
|
|
||||||
from aioconsole import ainput
|
from aioconsole import ainput
|
||||||
|
|
||||||
from joycontrol import logging_default as log
|
from joycontrol import logging_default as log, utils
|
||||||
from joycontrol.command_line_interface import ControllerCLI
|
from joycontrol.command_line_interface import ControllerCLI
|
||||||
from joycontrol.controller import Controller
|
from joycontrol.controller import Controller
|
||||||
from joycontrol.controller_state import ControllerState, button_push
|
from joycontrol.controller_state import ControllerState, button_push
|
||||||
@@ -192,20 +192,7 @@ if __name__ == '__main__':
|
|||||||
with open(args.spi_flash, 'rb') as spi_flash_file:
|
with open(args.spi_flash, 'rb') as spi_flash_file:
|
||||||
spi_flash = FlashMemory(spi_flash_file.read())
|
spi_flash = FlashMemory(spi_flash_file.read())
|
||||||
|
|
||||||
# creates file if arg is given
|
with utils.get_output(path=args.log, default=None) as capture_file:
|
||||||
@contextmanager
|
|
||||||
def get_output(path=None):
|
|
||||||
"""
|
|
||||||
Opens file if path is given
|
|
||||||
"""
|
|
||||||
if path is not None:
|
|
||||||
file = open(path, 'wb')
|
|
||||||
yield file
|
|
||||||
file.close()
|
|
||||||
else:
|
|
||||||
yield None
|
|
||||||
|
|
||||||
with get_output(args.log) as capture_file:
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
loop.run_until_complete(
|
loop.run_until_complete(
|
||||||
_main(controller,
|
_main(controller,
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ from joycontrol.report import OutputReport, InputReport, SubCommand
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# TODO: Add Pro Controller
|
||||||
VENDOR_ID = 1406
|
VENDOR_ID = 1406
|
||||||
PRODUCT_ID_JL = 8198
|
PRODUCT_ID_JL = 8198
|
||||||
PRODUCT_ID_JR = 8199
|
PRODUCT_ID_JR = 8199
|
||||||
@@ -0,0 +1,68 @@
|
|||||||
|
import argparse
|
||||||
|
import struct
|
||||||
|
|
||||||
|
from joycontrol.report import InputReport, OutputReport, SubCommand
|
||||||
|
|
||||||
|
""" joycontrol capture parsing example.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
parse_capture.py <capture_file>
|
||||||
|
parse_capture.py -h | --help
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def _eof_read(file, size):
|
||||||
|
"""
|
||||||
|
Raises EOFError if end of file is reached.
|
||||||
|
"""
|
||||||
|
data = file.read(size)
|
||||||
|
if not data:
|
||||||
|
raise EOFError()
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('capture_file')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# list of time, report tuples
|
||||||
|
input_reports = []
|
||||||
|
output_reports = []
|
||||||
|
|
||||||
|
with open(args.capture_file, 'rb') as capture:
|
||||||
|
try:
|
||||||
|
start_time = None
|
||||||
|
while True:
|
||||||
|
# parse capture time
|
||||||
|
time = struct.unpack('d', _eof_read(capture, 8))[0]
|
||||||
|
if start_time is None:
|
||||||
|
start_time = time
|
||||||
|
|
||||||
|
# parse data size
|
||||||
|
size = struct.unpack('i', _eof_read(capture, 4))[0]
|
||||||
|
# parse data
|
||||||
|
data = list(_eof_read(capture, size))
|
||||||
|
|
||||||
|
if data[0] == 0xA1:
|
||||||
|
report = InputReport(data)
|
||||||
|
# normalise time
|
||||||
|
input_reports.append((time - start_time, report))
|
||||||
|
elif data[0] == 0xA2:
|
||||||
|
report = OutputReport(data)
|
||||||
|
# normalise time
|
||||||
|
output_reports.append((time - start_time, report))
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Unexpected data.')
|
||||||
|
|
||||||
|
# only interested in pairing
|
||||||
|
if isinstance(report, OutputReport) and report.get_sub_command() == SubCommand.SET_PLAYER_LIGHTS:
|
||||||
|
break
|
||||||
|
except EOFError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
print('Finished parsing reports.')
|
||||||
|
print('Input reports:', len(input_reports))
|
||||||
|
print('Output reports:', len(output_reports))
|
||||||
|
|
||||||
|
# Do some investigation...
|
||||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user