forked from mirror/joycontrol
improved 0x30 input report mode performance, better error handling
This commit is contained in:
@@ -0,0 +1,119 @@
|
|||||||
|
import inspect
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from aioconsole import ainput
|
||||||
|
|
||||||
|
from joycontrol.controller_state import button_push, ControllerState
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ControllerCLI:
|
||||||
|
def __init__(self, controller_state: ControllerState):
|
||||||
|
self.controller_state = controller_state
|
||||||
|
self.commands = {}
|
||||||
|
|
||||||
|
async def cmd_help(self):
|
||||||
|
print('Buttons can be used as commands: ', ', '.join(self.controller_state.button_state.get_available_buttons()))
|
||||||
|
|
||||||
|
for name, fun in inspect.getmembers(self):
|
||||||
|
if name.startswith('cmd_') and fun.__doc__:
|
||||||
|
print(fun.__doc__)
|
||||||
|
|
||||||
|
print('Commands can be chained using "&&"')
|
||||||
|
print('Type "exit" to close.')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _set_stick(stick, direction, value):
|
||||||
|
if direction == 'center':
|
||||||
|
stick.set_center()
|
||||||
|
elif direction == 'up':
|
||||||
|
stick.set_up()
|
||||||
|
elif direction == 'down':
|
||||||
|
stick.set_down()
|
||||||
|
elif direction == 'left':
|
||||||
|
stick.set_left()
|
||||||
|
elif direction == 'right':
|
||||||
|
stick.set_right()
|
||||||
|
elif direction in ('h', 'horizontal'):
|
||||||
|
if value is None:
|
||||||
|
raise ValueError(f'Missing value')
|
||||||
|
try:
|
||||||
|
val = int(value)
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError(f'Unexpected stick value "{value}"')
|
||||||
|
stick.set_h(val)
|
||||||
|
elif direction in ('v', 'vertical'):
|
||||||
|
if value is None:
|
||||||
|
raise ValueError(f'Missing value')
|
||||||
|
try:
|
||||||
|
val = int(value)
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError(f'Unexpected stick value "{value}"')
|
||||||
|
stick.set_v(val)
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Unexpected argument "{direction}"')
|
||||||
|
|
||||||
|
return f'{stick.__class__.__name__} was set to ({stick.get_h()}, {stick.get_v()}).'
|
||||||
|
|
||||||
|
async def cmd_stick(self, side, direction, value=None):
|
||||||
|
"""
|
||||||
|
stick - Command to set stick positions.
|
||||||
|
:param side: 'l', 'left' for left control stick; 'r', 'right' for right control stick
|
||||||
|
:param direction: 'center', 'up', 'down', 'left', 'right';
|
||||||
|
'h', 'horizontal' or 'v', 'vertical' to set the value directly to the "value" argument
|
||||||
|
:param value: horizontal or vertical value
|
||||||
|
"""
|
||||||
|
if side in ('l', 'left'):
|
||||||
|
stick = self.controller_state.l_stick_state
|
||||||
|
return ControllerCLI._set_stick(stick, direction, value)
|
||||||
|
elif side in ('r', 'right'):
|
||||||
|
stick = self.controller_state.r_stick_state
|
||||||
|
return ControllerCLI._set_stick(stick, direction, value)
|
||||||
|
else:
|
||||||
|
raise ValueError('Value of side must be "l", "left" or "r", "right"')
|
||||||
|
|
||||||
|
def add_command(self, name, command):
|
||||||
|
if name in self.commands:
|
||||||
|
raise ValueError(f'Command {name} already registered.')
|
||||||
|
self.commands[name] = command
|
||||||
|
|
||||||
|
async def run(self):
|
||||||
|
while True:
|
||||||
|
user_input = await ainput(prompt='cmd >> ')
|
||||||
|
if not user_input:
|
||||||
|
continue
|
||||||
|
|
||||||
|
buttons_to_push = []
|
||||||
|
|
||||||
|
for command in user_input.split('&&'):
|
||||||
|
cmd, *args = command.split()
|
||||||
|
|
||||||
|
if cmd == 'exit':
|
||||||
|
return
|
||||||
|
|
||||||
|
available_buttons = self.controller_state.button_state.get_available_buttons()
|
||||||
|
|
||||||
|
if hasattr(self, f'cmd_{cmd}'):
|
||||||
|
try:
|
||||||
|
result = await getattr(self, f'cmd_{cmd}')(*args)
|
||||||
|
if result:
|
||||||
|
print(result)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
elif cmd in self.commands:
|
||||||
|
try:
|
||||||
|
result = await self.commands[cmd](self, *args)
|
||||||
|
if result:
|
||||||
|
print(result)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
elif cmd in available_buttons:
|
||||||
|
buttons_to_push.append(cmd)
|
||||||
|
else:
|
||||||
|
print('command', cmd, 'not found, call help for help.')
|
||||||
|
|
||||||
|
if buttons_to_push:
|
||||||
|
await button_push(self.controller_state, *buttons_to_push)
|
||||||
|
else:
|
||||||
|
await self.controller_state.send()
|
||||||
+81
-24
@@ -72,26 +72,40 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
def error_received(self, exc: Exception) -> None:
|
def error_received(self, exc: Exception) -> None:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
async def send_0x30_input_reports(self):
|
async def input_report_mode_0x30(self):
|
||||||
|
if self.transport.is_reading():
|
||||||
|
raise ValueError('Transport must be paused in 0x30 input report mode')
|
||||||
|
|
||||||
input_report = InputReport()
|
input_report = InputReport()
|
||||||
input_report.set_input_report_id(0x30)
|
input_report.set_input_report_id(0x30)
|
||||||
input_report.set_misc()
|
input_report.set_misc()
|
||||||
|
|
||||||
|
reader = asyncio.ensure_future(self.transport.read())
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# TODO: set sensor data
|
|
||||||
input_report.set_6axis_data()
|
|
||||||
|
|
||||||
await self.write(input_report)
|
|
||||||
|
|
||||||
"""
|
|
||||||
if self.controller == Controller.PRO_CONTROLLER:
|
|
||||||
# send state at 120Hz if Pro Controller
|
|
||||||
await asyncio.sleep(1 / 120)
|
|
||||||
else:
|
|
||||||
# send state at 60Hz
|
# send state at 60Hz
|
||||||
await asyncio.sleep(1 / 60)
|
await asyncio.sleep(1 / 60)
|
||||||
"""
|
|
||||||
await asyncio.sleep(1 / 30)
|
reply_send = False
|
||||||
|
if reader.done():
|
||||||
|
data = await reader
|
||||||
|
reader = asyncio.ensure_future(self.transport.read())
|
||||||
|
|
||||||
|
try:
|
||||||
|
report = OutputReport(list(data))
|
||||||
|
output_report_id = report.get_output_report_id()
|
||||||
|
|
||||||
|
if output_report_id == OutputReportID.SUB_COMMAND:
|
||||||
|
reply_send = await self._reply_to_sub_command(report)
|
||||||
|
except ValueError as v_err:
|
||||||
|
logger.warning(f'Report parsing error "{v_err}" - IGNORE')
|
||||||
|
except NotImplementedError as err:
|
||||||
|
logger.warning(err)
|
||||||
|
|
||||||
|
if not reply_send:
|
||||||
|
# write 0x30 input report. TODO: set some sensor data
|
||||||
|
input_report.set_6axis_data()
|
||||||
|
await self.write(input_report)
|
||||||
|
|
||||||
async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None:
|
async def report_received(self, data: Union[bytes, Text], addr: Tuple[str, int]) -> None:
|
||||||
self._data_received.set()
|
self._data_received.set()
|
||||||
@@ -109,12 +123,19 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if output_report_id == OutputReportID.SUB_COMMAND:
|
if output_report_id == OutputReportID.SUB_COMMAND:
|
||||||
|
await self._reply_to_sub_command(report)
|
||||||
|
#elif output_report_id == OutputReportID.RUMBLE_ONLY:
|
||||||
|
# pass
|
||||||
|
else:
|
||||||
|
logger.warning(f'Output report {output_report_id} not implemented - ignoring')
|
||||||
|
|
||||||
|
async def _reply_to_sub_command(self, report):
|
||||||
# classify sub command
|
# classify sub command
|
||||||
try:
|
try:
|
||||||
sub_command = report.get_sub_command()
|
sub_command = report.get_sub_command()
|
||||||
except NotImplementedError as err:
|
except NotImplementedError as err:
|
||||||
logger.warning(err)
|
logger.warning(err)
|
||||||
return
|
return False
|
||||||
|
|
||||||
if sub_command is None:
|
if sub_command is None:
|
||||||
raise ValueError('Received output report does not contain a sub command')
|
raise ValueError('Received output report does not contain a sub command')
|
||||||
@@ -124,6 +145,7 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
sub_command_data = report.get_sub_command_data()
|
sub_command_data = report.get_sub_command_data()
|
||||||
assert sub_command_data is not None
|
assert sub_command_data is not None
|
||||||
|
|
||||||
|
try:
|
||||||
# answer to sub command
|
# answer to sub command
|
||||||
if sub_command == SubCommand.REQUEST_DEVICE_INFO:
|
if sub_command == SubCommand.REQUEST_DEVICE_INFO:
|
||||||
await self._command_request_device_info(sub_command_data)
|
await self._command_request_device_info(sub_command_data)
|
||||||
@@ -149,15 +171,18 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
elif sub_command == SubCommand.SET_NFC_IR_MCU_CONFIG:
|
elif sub_command == SubCommand.SET_NFC_IR_MCU_CONFIG:
|
||||||
await self._command_set_nfc_ir_mcu_config(sub_command_data)
|
await self._command_set_nfc_ir_mcu_config(sub_command_data)
|
||||||
|
|
||||||
|
elif sub_command == SubCommand.SET_NFC_IR_MCU_STATE:
|
||||||
|
await self._command_set_nfc_ir_mcu_state(sub_command_data)
|
||||||
|
|
||||||
elif sub_command == SubCommand.SET_PLAYER_LIGHTS:
|
elif sub_command == SubCommand.SET_PLAYER_LIGHTS:
|
||||||
await self._command_set_player_lights(sub_command_data)
|
await self._command_set_player_lights(sub_command_data)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.warning(f'Sub command 0x{sub_command.value:02x} not implemented - ignoring')
|
logger.warning(f'Sub command 0x{sub_command.value:02x} not implemented - ignoring')
|
||||||
#elif output_report_id == OutputReportID.RUMBLE_ONLY:
|
return False
|
||||||
# pass
|
except Exception as err:
|
||||||
else:
|
logger.error(f'Failed to answer {sub_command} - {err}')
|
||||||
logger.warning(f'Output report {output_report_id} not implemented - ignoring')
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
async def _command_request_device_info(self, sub_command_data):
|
async def _command_request_device_info(self, sub_command_data):
|
||||||
input_report = InputReport()
|
input_report = InputReport()
|
||||||
@@ -211,9 +236,6 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
async def _command_set_input_report_mode(self, sub_command_data):
|
async def _command_set_input_report_mode(self, sub_command_data):
|
||||||
if sub_command_data[0] == 0x30:
|
if sub_command_data[0] == 0x30:
|
||||||
logger.info('Setting input report mode to 0x30...')
|
logger.info('Setting input report mode to 0x30...')
|
||||||
# start sending 0x30 input reports
|
|
||||||
assert self._0x30_input_report_sender is None
|
|
||||||
self._0x30_input_report_sender = asyncio.ensure_future(self.send_0x30_input_reports())
|
|
||||||
|
|
||||||
input_report = InputReport()
|
input_report = InputReport()
|
||||||
input_report.set_input_report_id(0x21)
|
input_report.set_input_report_id(0x21)
|
||||||
@@ -223,6 +245,20 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
input_report.reply_to_subcommand_id(0x03)
|
input_report.reply_to_subcommand_id(0x03)
|
||||||
|
|
||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
|
# start sending 0x30 input reports
|
||||||
|
if self._0x30_input_report_sender is None:
|
||||||
|
self.transport.pause_reading()
|
||||||
|
self._0x30_input_report_sender = asyncio.ensure_future(self.input_report_mode_0x30())
|
||||||
|
|
||||||
|
# create callback to check for exceptions
|
||||||
|
def callback(future):
|
||||||
|
try:
|
||||||
|
future.result()
|
||||||
|
except Exception as err:
|
||||||
|
logger.exception(err)
|
||||||
|
|
||||||
|
self._0x30_input_report_sender.add_done_callback(callback)
|
||||||
else:
|
else:
|
||||||
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request')
|
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request')
|
||||||
|
|
||||||
@@ -264,8 +300,29 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
input_report.set_ack(0xA0)
|
input_report.set_ack(0xA0)
|
||||||
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
|
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
|
||||||
|
|
||||||
for i in range(16, 51):
|
# TODO
|
||||||
input_report.data[i] = 0xFF
|
data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200]
|
||||||
|
for i in range(len(data)):
|
||||||
|
input_report.data[16+i] = data[i]
|
||||||
|
|
||||||
|
await self.write(input_report)
|
||||||
|
|
||||||
|
async def _command_set_nfc_ir_mcu_state(self, sub_command_data):
|
||||||
|
input_report = InputReport()
|
||||||
|
input_report.set_input_report_id(0x21)
|
||||||
|
input_report.set_misc()
|
||||||
|
|
||||||
|
if sub_command_data[0] == 0x01:
|
||||||
|
# 0x01 = Resume
|
||||||
|
input_report.set_ack(0x80)
|
||||||
|
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
|
||||||
|
elif sub_command_data[0] == 0x00:
|
||||||
|
# 0x00 = Suspend
|
||||||
|
input_report.set_ack(0x80)
|
||||||
|
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
|
||||||
|
f'not implemented.')
|
||||||
|
|
||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
|
|||||||
@@ -178,6 +178,7 @@ class SubCommand(Enum):
|
|||||||
SET_SHIPMENT_STATE = 0x08
|
SET_SHIPMENT_STATE = 0x08
|
||||||
SPI_FLASH_READ = 0x10
|
SPI_FLASH_READ = 0x10
|
||||||
SET_NFC_IR_MCU_CONFIG = 0x21
|
SET_NFC_IR_MCU_CONFIG = 0x21
|
||||||
|
SET_NFC_IR_MCU_STATE = 0x22
|
||||||
SET_PLAYER_LIGHTS = 0x30
|
SET_PLAYER_LIGHTS = 0x30
|
||||||
ENABLE_6AXIS_SENSOR = 0x40
|
ENABLE_6AXIS_SENSOR = 0x40
|
||||||
ENABLE_VIBRATION = 0x48
|
ENABLE_VIBRATION = 0x48
|
||||||
|
|||||||
+29
-5
@@ -19,10 +19,20 @@ class L2CAP_Transport(asyncio.Transport):
|
|||||||
|
|
||||||
self._extra_info = {
|
self._extra_info = {
|
||||||
'peername': self._sock.getpeername(),
|
'peername': self._sock.getpeername(),
|
||||||
'sockname': self._sock.getsockname()
|
'sockname': self._sock.getsockname(),
|
||||||
|
'socket': self._sock
|
||||||
}
|
}
|
||||||
|
|
||||||
self._read_thread = asyncio.ensure_future(self._read())
|
self._read_thread = asyncio.ensure_future(self._reader())
|
||||||
|
|
||||||
|
# create callback to check for exceptions
|
||||||
|
def callback(future):
|
||||||
|
try:
|
||||||
|
future.result()
|
||||||
|
except Exception as err:
|
||||||
|
logger.exception(err)
|
||||||
|
|
||||||
|
self._read_thread.add_done_callback(callback)
|
||||||
|
|
||||||
self._is_closing = False
|
self._is_closing = False
|
||||||
self._is_reading = asyncio.Event()
|
self._is_reading = asyncio.Event()
|
||||||
@@ -32,10 +42,16 @@ class L2CAP_Transport(asyncio.Transport):
|
|||||||
|
|
||||||
self._capture_file = capture_file
|
self._capture_file = capture_file
|
||||||
|
|
||||||
async def _read(self):
|
async def _reader(self):
|
||||||
while True:
|
while True:
|
||||||
await self._is_reading.wait()
|
await self._is_reading.wait()
|
||||||
|
|
||||||
|
data = await self.read()
|
||||||
|
|
||||||
|
#logger.debug(f'received "{list(data)}"')
|
||||||
|
await self._protocol.report_received(data, self._sock.getpeername())
|
||||||
|
|
||||||
|
async def read(self):
|
||||||
data = await self._loop.sock_recv(self._sock, self._read_buffer_size)
|
data = await self._loop.sock_recv(self._sock, self._read_buffer_size)
|
||||||
|
|
||||||
if self._capture_file is not None:
|
if self._capture_file is not None:
|
||||||
@@ -44,16 +60,24 @@ class L2CAP_Transport(asyncio.Transport):
|
|||||||
size = struct.pack('i', len(data))
|
size = struct.pack('i', len(data))
|
||||||
self._capture_file.write(_time + size + data)
|
self._capture_file.write(_time + size + data)
|
||||||
|
|
||||||
#logger.debug(f'received "{list(data)}"')
|
return data
|
||||||
await self._protocol.report_received(data, self._sock.getpeername())
|
|
||||||
|
|
||||||
def is_reading(self) -> bool:
|
def is_reading(self) -> bool:
|
||||||
|
"""
|
||||||
|
:returns True if the reader is running
|
||||||
|
"""
|
||||||
return self._is_reading.is_set()
|
return self._is_reading.is_set()
|
||||||
|
|
||||||
def pause_reading(self) -> None:
|
def pause_reading(self) -> None:
|
||||||
|
"""
|
||||||
|
Pauses the reader
|
||||||
|
"""
|
||||||
self._is_reading.clear()
|
self._is_reading.clear()
|
||||||
|
|
||||||
def resume_reading(self) -> None:
|
def resume_reading(self) -> None:
|
||||||
|
"""
|
||||||
|
Resumes the reader
|
||||||
|
"""
|
||||||
self._is_reading.set()
|
self._is_reading.set()
|
||||||
|
|
||||||
def set_read_buffer_size(self, size):
|
def set_read_buffer_size(self, size):
|
||||||
|
|||||||
+3
-116
@@ -1,133 +1,19 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from aioconsole import ainput
|
|
||||||
from joycontrol import logging_default as log
|
from joycontrol import logging_default as log
|
||||||
|
from joycontrol.command_line_interface import ControllerCLI
|
||||||
from joycontrol.controller import Controller
|
from joycontrol.controller import Controller
|
||||||
from joycontrol.controller_state import button_push, ControllerState
|
|
||||||
from joycontrol.memory import FlashMemory
|
from joycontrol.memory import FlashMemory
|
||||||
from joycontrol.protocol import controller_protocol_factory
|
from joycontrol.protocol import controller_protocol_factory
|
||||||
from joycontrol.server import create_hid_server
|
from joycontrol.server import create_hid_server
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ControllerCLI:
|
|
||||||
def __init__(self, controller_state: ControllerState):
|
|
||||||
self.controller_state = controller_state
|
|
||||||
self.commands = {}
|
|
||||||
|
|
||||||
async def cmd_help(self):
|
|
||||||
print('Buttons can be used as commands: ', ', '.join(self.controller_state.button_state.get_available_buttons()))
|
|
||||||
|
|
||||||
for name, fun in inspect.getmembers(self):
|
|
||||||
if name.startswith('cmd_') and fun.__doc__:
|
|
||||||
print(fun.__doc__)
|
|
||||||
|
|
||||||
print('Commands can be chained using "&&"')
|
|
||||||
print('Type "exit" to close.')
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _set_stick(stick, direction, value):
|
|
||||||
if direction == 'center':
|
|
||||||
stick.set_center()
|
|
||||||
elif direction == 'up':
|
|
||||||
stick.set_up()
|
|
||||||
elif direction == 'down':
|
|
||||||
stick.set_down()
|
|
||||||
elif direction == 'left':
|
|
||||||
stick.set_left()
|
|
||||||
elif direction == 'right':
|
|
||||||
stick.set_right()
|
|
||||||
elif direction in ('h', 'horizontal'):
|
|
||||||
if value is None:
|
|
||||||
raise ValueError(f'Missing value')
|
|
||||||
try:
|
|
||||||
val = int(value)
|
|
||||||
except ValueError:
|
|
||||||
raise ValueError(f'Unexpected stick value "{value}"')
|
|
||||||
stick.set_h(val)
|
|
||||||
elif direction in ('v', 'vertical'):
|
|
||||||
if value is None:
|
|
||||||
raise ValueError(f'Missing value')
|
|
||||||
try:
|
|
||||||
val = int(value)
|
|
||||||
except ValueError:
|
|
||||||
raise ValueError(f'Unexpected stick value "{value}"')
|
|
||||||
stick.set_v(val)
|
|
||||||
else:
|
|
||||||
raise ValueError(f'Unexpected argument "{direction}"')
|
|
||||||
|
|
||||||
return f'{stick.__class__.__name__} was set to ({stick.get_h()}, {stick.get_v()}).'
|
|
||||||
|
|
||||||
async def cmd_stick(self, side, direction, value=None):
|
|
||||||
"""
|
|
||||||
stick - Command to set stick positions.
|
|
||||||
:param side: 'l', 'left' for left control stick; 'r', 'right' for right control stick
|
|
||||||
:param direction: 'center', 'up', 'down', 'left', 'right';
|
|
||||||
'h', 'horizontal' or 'v', 'vertical' to set the value directly to the "value" argument
|
|
||||||
:param value: horizontal or vertical value
|
|
||||||
"""
|
|
||||||
if side in ('l', 'left'):
|
|
||||||
stick = self.controller_state.l_stick_state
|
|
||||||
return ControllerCLI._set_stick(stick, direction, value)
|
|
||||||
elif side in ('r', 'right'):
|
|
||||||
stick = self.controller_state.r_stick_state
|
|
||||||
return ControllerCLI._set_stick(stick, direction, value)
|
|
||||||
else:
|
|
||||||
raise ValueError('Value of side must be "l", "left" or "r", "right"')
|
|
||||||
|
|
||||||
def add_command(self, name, command):
|
|
||||||
if name in self.commands:
|
|
||||||
raise ValueError(f'Command {name} already registered.')
|
|
||||||
self.commands[name] = command
|
|
||||||
|
|
||||||
async def run(self):
|
|
||||||
while True:
|
|
||||||
user_input = await ainput(prompt='cmd >> ')
|
|
||||||
if not user_input:
|
|
||||||
continue
|
|
||||||
|
|
||||||
buttons_to_push = []
|
|
||||||
|
|
||||||
for command in user_input.split('&&'):
|
|
||||||
cmd, *args = command.split()
|
|
||||||
|
|
||||||
if cmd == 'exit':
|
|
||||||
return
|
|
||||||
|
|
||||||
available_buttons = self.controller_state.button_state.get_available_buttons()
|
|
||||||
|
|
||||||
if hasattr(self, f'cmd_{cmd}'):
|
|
||||||
try:
|
|
||||||
result = await getattr(self, f'cmd_{cmd}')(*args)
|
|
||||||
if result:
|
|
||||||
print(result)
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
elif cmd in self.commands:
|
|
||||||
try:
|
|
||||||
result = await self.commands[cmd](*args)
|
|
||||||
if result:
|
|
||||||
print(result)
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
elif cmd in available_buttons:
|
|
||||||
buttons_to_push.append(cmd)
|
|
||||||
else:
|
|
||||||
print('command', cmd, 'not found, call help for help.')
|
|
||||||
|
|
||||||
if buttons_to_push:
|
|
||||||
await button_push(self.controller_state, *buttons_to_push)
|
|
||||||
else:
|
|
||||||
await self.controller_state.send()
|
|
||||||
|
|
||||||
|
|
||||||
async def _main(controller, capture_file=None, spi_flash=None):
|
async def _main(controller, capture_file=None, spi_flash=None):
|
||||||
factory = controller_protocol_factory(controller, spi_flash=spi_flash)
|
factory = controller_protocol_factory(controller, spi_flash=spi_flash)
|
||||||
transport, protocol = await create_hid_server(factory, 17, 19, capture_file=capture_file)
|
transport, protocol = await create_hid_server(factory, 17, 19, capture_file=capture_file)
|
||||||
@@ -147,7 +33,8 @@ if __name__ == '__main__':
|
|||||||
raise PermissionError('Script must be run as root!')
|
raise PermissionError('Script must be run as root!')
|
||||||
|
|
||||||
# setup logging
|
# setup logging
|
||||||
log.configure(console_level=logging.ERROR)
|
#log.configure(console_level=logging.ERROR)
|
||||||
|
log.configure()
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER')
|
parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER')
|
||||||
|
|||||||
Reference in New Issue
Block a user