# Forked from mirror/joycontrol.
# Repository viewer metadata: 191 lines, 6.4 KiB, Python.
import argparse
|
|
import asyncio
|
|
import inspect
|
|
import logging
|
|
import os
|
|
from contextlib import contextmanager
|
|
|
|
from aioconsole import ainput
|
|
from joycontrol import logging_default as log
|
|
from joycontrol.controller import Controller
|
|
from joycontrol.controller_state import button_push, ControllerState
|
|
from joycontrol.memory import FlashMemory
|
|
from joycontrol.protocol import controller_protocol_factory
|
|
from joycontrol.server import create_hid_server
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ControllerCLI:
    """
    Interactive command line interface for a controller state object.

    Every input line is split on "&&" into commands. A command is resolved,
    in this order, against the ``cmd_*`` coroutine methods of this class, the
    commands registered through ``add_command`` and the button names reported
    by ``controller_state.button_state.get_available_buttons()``.
    """

    # Stick directions that map directly onto a no-argument setter of the stick.
    _STICK_PRESETS = {
        'center': 'set_center',
        'up': 'set_up',
        'down': 'set_down',
        'left': 'set_left',
        'right': 'set_right',
    }

    # The annotation is a string so that it is not evaluated when the class
    # body is executed.
    def __init__(self, controller_state: 'ControllerState'):
        """
        :param controller_state: object exposing ``button_state``,
                                 ``l_stick_state``, ``r_stick_state`` and an
                                 awaitable ``send()``, as used by the commands
        """
        self.controller_state = controller_state
        # Additional commands: name -> callable returning an awaitable.
        self.commands = {}

    async def cmd_help(self):
        # NOTE: documented with a comment instead of a docstring on purpose.
        # The loop below prints the docstring of every cmd_* member, so a
        # docstring here would show up in the help output.
        print('Buttons can be used as commands: ', ', '.join(self.controller_state.button_state.get_available_buttons()))

        for name, fun in inspect.getmembers(self):
            if name.startswith('cmd_') and fun.__doc__:
                print(fun.__doc__)

        print('Commands can be chained using "&&"')
        print('Type "exit" to close.')

    @staticmethod
    def _parse_stick_value(value):
        """
        Convert the textual "value" argument of the stick command to an int.

        :param value: user supplied value or None if it was omitted
        :returns: the value as int
        :raises ValueError: if the value is missing or not an integer literal
        """
        if value is None:
            raise ValueError('Missing value')
        try:
            return int(value)
        except ValueError:
            # The new message already names the offending input.
            raise ValueError(f'Unexpected stick value "{value}"') from None

    @staticmethod
    def _set_stick(stick, direction, value):
        """
        Apply a direction (or an explicit axis value) to a stick object.

        :param stick: stick state object providing the set_*/get_* methods
        :param direction: 'center', 'up', 'down', 'left', 'right',
                          'h'/'horizontal' or 'v'/'vertical'
        :param value: axis value, required for the horizontal/vertical forms
        :returns: message describing the resulting stick position
        :raises ValueError: on an unknown direction or a bad/missing value
        """
        preset = ControllerCLI._STICK_PRESETS.get(direction)
        if preset is not None:
            getattr(stick, preset)()
        elif direction in ('h', 'horizontal'):
            stick.set_h(ControllerCLI._parse_stick_value(value))
        elif direction in ('v', 'vertical'):
            stick.set_v(ControllerCLI._parse_stick_value(value))
        else:
            raise ValueError(f'Unexpected argument "{direction}"')

        return f'{stick.__class__.__name__} was set to ({stick.get_h()}, {stick.get_v()}).'

    async def cmd_stick(self, side, direction, value=None):
        """
        stick - Command to set stick positions.
        :param side: 'l', 'left' for left control stick; 'r', 'right' for right control stick
        :param direction: 'center', 'up', 'down', 'left', 'right';
                          'h', 'horizontal' or 'v', 'vertical' to set the value directly to the "value" argument
        :param value: horizontal or vertical value
        """
        if side in ('l', 'left'):
            stick = self.controller_state.l_stick_state
        elif side in ('r', 'right'):
            stick = self.controller_state.r_stick_state
        else:
            raise ValueError('Value of side must be "l", "left" or "r", "right"')

        return ControllerCLI._set_stick(stick, direction, value)

    def add_command(self, name, command):
        """
        Register an additional command.

        :param name: word the user types to invoke the command
        :param command: callable invoked with the command arguments; its
                        result is awaited
        :raises ValueError: if a command with that name is already registered
        """
        if name in self.commands:
            raise ValueError(f'Command {name} already registered.')
        self.commands[name] = command

    @staticmethod
    def _parse_commands(user_input):
        """
        Split an input line into (command, arguments) pairs.

        Segments separated by "&&" that contain only whitespace are skipped,
        so a trailing "&&" or a blank line yields no command instead of
        failing during unpacking.

        :param user_input: raw line entered by the user
        :returns: list of ``(cmd, args)`` tuples, ``args`` being a list of str
        """
        parsed = []
        for segment in user_input.split('&&'):
            tokens = segment.split()
            if tokens:
                parsed.append((tokens[0], tokens[1:]))
        return parsed

    @staticmethod
    async def _run_command(handler, args):
        """
        Await ``handler(*args)`` and print its result if it is truthy.

        Any exception is printed instead of propagated: a faulty command
        (wrong argument count, invalid value, ...) must not end the session.
        """
        try:
            result = await handler(*args)
            if result:
                print(result)
        except Exception as e:
            print(e)

    async def run(self):
        """
        Read and execute commands until the user enters "exit".

        Buttons named on one input line are collected and pushed together
        after all other commands of that line have run; if the line named no
        button, the controller state is sent as is.
        """
        while True:
            user_input = await ainput(prompt='cmd >> ')
            commands = self._parse_commands(user_input) if user_input else []
            if not commands:
                # Nothing to do for empty or whitespace-only lines.
                continue

            buttons_to_push = []
            available_buttons = self.controller_state.button_state.get_available_buttons()

            for cmd, args in commands:
                if cmd == 'exit':
                    return

                if hasattr(self, f'cmd_{cmd}'):
                    await self._run_command(getattr(self, f'cmd_{cmd}'), args)
                elif cmd in self.commands:
                    await self._run_command(self.commands[cmd], args)
                elif cmd in available_buttons:
                    buttons_to_push.append(cmd)
                else:
                    print('command', cmd, 'not found, call help for help.')

            if buttons_to_push:
                await button_push(self.controller_state, *buttons_to_push)
            else:
                await self.controller_state.send()
|
|
|
|
|
|
async def _main(controller, capture_file=None, spi_flash=None):
    """
    Start the HID server for the given controller and run the CLI on it.

    :param controller: controller type handed to controller_protocol_factory
    :param capture_file: optional file object forwarded to create_hid_server
    :param spi_flash: optional flash memory object forwarded to
                      controller_protocol_factory
    """
    factory = controller_protocol_factory(controller, spi_flash=spi_flash)
    # NOTE(review): 17 and 19 are presumably the L2CAP control/interrupt PSMs
    # of the Bluetooth HID profile - confirm against joycontrol.server.
    transport, protocol = await create_hid_server(factory, 17, 19, capture_file=capture_file)

    try:
        controller_state = protocol.get_controller_state()

        cli = ControllerCLI(controller_state)
        await cli.run()
    finally:
        # Close the transport even if the CLI ended with an exception, so the
        # connection is not left open.
        logger.info('Stopping communication...')
        await transport.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # check if root
    if os.geteuid() != 0:
        raise PermissionError('Script must be run as root!')

    # setup logging
    log.configure(console_level=logging.ERROR)

    parser = argparse.ArgumentParser()
    parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER')
    parser.add_argument('-l', '--log')
    parser.add_argument('--spi_flash')
    args = parser.parse_args()

    # Map the command line name onto the Controller member.
    controllers = {
        'JOYCON_R': Controller.JOYCON_R,
        'JOYCON_L': Controller.JOYCON_L,
        'PRO_CONTROLLER': Controller.PRO_CONTROLLER,
    }
    try:
        controller = controllers[args.controller]
    except KeyError:
        raise ValueError(f'Unknown controller "{args.controller}".') from None

    spi_flash = None
    if args.spi_flash:
        with open(args.spi_flash, 'rb') as spi_flash_file:
            spi_flash = FlashMemory(spi_flash_file.read())

    # creates file if arg is given
    @contextmanager
    def get_output(path=None):
        """
        Opens file if path is given

        Yields the file opened for binary writing, or None if no path was
        given. The file is closed when the with-block is left, including
        when it is left through an exception.
        """
        if path is None:
            yield None
            return

        with open(path, 'wb') as output:
            yield output

    with get_output(args.log) as capture_file:
        # Create the loop explicitly: asyncio.get_event_loop() without a
        # running loop is deprecated and fails on recent Python versions.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(_main(controller, capture_file=capture_file, spi_flash=spi_flash))
        finally:
            loop.close()
|
|
|
|
|
|
|