Merge pull request #29 from spacemeowx2/nfc-r3

feat: send amiibo report
This commit is contained in:
Robert Martin
2020-05-04 16:43:08 +02:00
committed by GitHub
9 changed files with 470 additions and 102 deletions
+50 -12
View File
@@ -1,5 +1,6 @@
import inspect import inspect
import logging import logging
import shlex
from aioconsole import ainput from aioconsole import ainput
@@ -37,15 +38,16 @@ def _print_doc(string):
print(line[prefix_i:] if line.strip() else line) print(line[prefix_i:] if line.strip() else line)
class ControllerCLI: class CLI:
def __init__(self, controller_state: ControllerState): def __init__(self):
self.controller_state = controller_state
self.commands = {} self.commands = {}
def add_command(self, name, command):
if name in self.commands:
raise ValueError(f'Command {name} already registered.')
self.commands[name] = command
async def cmd_help(self): async def cmd_help(self):
print('Button commands:')
print(', '.join(self.controller_state.button_state.get_available_buttons()))
print()
print('Commands:') print('Commands:')
for name, fun in inspect.getmembers(self): for name, fun in inspect.getmembers(self):
if name.startswith('cmd_') and fun.__doc__: if name.startswith('cmd_') and fun.__doc__:
@@ -58,6 +60,47 @@ class ControllerCLI:
print('Commands can be chained using "&&"') print('Commands can be chained using "&&"')
print('Type "exit" to close.') print('Type "exit" to close.')
async def run(self):
while True:
user_input = await ainput(prompt='cmd >> ')
if not user_input:
continue
for command in user_input.split('&&'):
cmd, *args = shlex.split(command)
if cmd == 'exit':
return
if hasattr(self, f'cmd_{cmd}'):
try:
result = await getattr(self, f'cmd_{cmd}')(*args)
if result:
print(result)
except Exception as e:
print(e)
elif cmd in self.commands:
try:
result = await self.commands[cmd](*args)
if result:
print(result)
except Exception as e:
print(e)
else:
print('command', cmd, 'not found, call help for help.')
class ControllerCLI(CLI):
def __init__(self, controller_state: ControllerState):
super().__init__()
self.controller_state = controller_state
async def cmd_help(self):
print('Button commands:')
print(', '.join(self.controller_state.button_state.get_available_buttons()))
print()
await super().cmd_help()
@staticmethod @staticmethod
def _set_stick(stick, direction, value): def _set_stick(stick, direction, value):
if direction == 'center': if direction == 'center':
@@ -108,11 +151,6 @@ class ControllerCLI:
else: else:
raise ValueError('Value of side must be "l", "left" or "r", "right"') raise ValueError('Value of side must be "l", "left" or "r", "right"')
def add_command(self, name, command):
if name in self.commands:
raise ValueError(f'Command {name} already registered.')
self.commands[name] = command
async def run(self): async def run(self):
while True: while True:
user_input = await ainput(prompt='cmd >> ') user_input = await ainput(prompt='cmd >> ')
@@ -122,7 +160,7 @@ class ControllerCLI:
buttons_to_push = [] buttons_to_push = []
for command in user_input.split('&&'): for command in user_input.split('&&'):
cmd, *args = command.split() cmd, *args = shlex.split(command)
if cmd == 'exit': if cmd == 'exit':
return return
+11
View File
@@ -18,3 +18,14 @@ class Controller(enum.Enum):
return 'Pro Controller' return 'Pro Controller'
else: else:
raise NotImplementedError() raise NotImplementedError()
@staticmethod
def from_arg(arg):
if arg == 'JOYCON_R':
return Controller.JOYCON_R
elif arg == 'JOYCON_L':
return Controller.JOYCON_L
elif arg == 'PRO_CONTROLLER':
return Controller.PRO_CONTROLLER
else:
raise ValueError(f'Unknown controller "{arg}".')
+7
View File
@@ -9,6 +9,7 @@ class ControllerState:
def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None): def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None):
self._protocol = protocol self._protocol = protocol
self._controller = controller self._controller = controller
self._nfc_content = None
self._spi_flash = spi_flash self._spi_flash = spi_flash
@@ -47,6 +48,12 @@ class ControllerState:
def get_flash_memory(self): def get_flash_memory(self):
return self._spi_flash return self._spi_flash
def set_nfc(self, nfc_content):
self._nfc_content = nfc_content
def get_nfc(self):
return self._nfc_content
async def send(self): async def send(self):
""" """
Invokes protocol.send_controller_state(). Returns after the controller state was send. Invokes protocol.send_controller_state(). Returns after the controller state was send.
+155
View File
@@ -0,0 +1,155 @@
import logging
from enum import Enum
from crc8 import crc8
logger = logging.getLogger(__name__)
class Action(Enum):
NON = 0
REQUEST_STATUS = 1
START_TAG_POLLING = 2
START_TAG_DISCOVERY = 3
READ_TAG = 4
READ_TAG_2 = 5
READ_FINISHED = 6
class McuState(Enum):
NOT_INITIALIZED = 0
IRC = 1
NFC = 2
STAND_BY = 3
BUSY = 4
def copyarray(dest, offset, src):
for i in range(len(src)):
dest[offset + i] = src[i]
class IrNfcMcu:
"""
TODO: cleanup
"""
def __init__(self):
self._fw_major = [0, 3]
self._fw_minor = [0, 5]
self._bytes = [0] * 313
self._action = Action.NON
self._state = McuState.NOT_INITIALIZED
self._nfc_content = None
def get_fw_major(self):
return self._fw_major
def get_fw_minor(self):
return self._fw_minor
def set_action(self, v):
self._action = v
def get_action(self):
return self._action
def set_state(self, v):
self._state = v
def get_state(self):
return self._state
def _get_state_byte(self):
if self.get_state() == McuState.NFC:
return 4
elif self.get_state() == McuState.BUSY:
return 6
elif self.get_state() == McuState.NOT_INITIALIZED:
return 1
elif self.get_state() == McuState.STAND_BY:
return 1
else:
return 0
def update_status(self):
self._bytes[0] = 1
self._bytes[1] = 0
self._bytes[2] = 0
self._bytes[3] = self._fw_major[0]
self._bytes[4] = self._fw_major[1]
self._bytes[5] = self._fw_minor[0]
self._bytes[6] = self._fw_minor[1]
self._bytes[7] = self._get_state_byte()
def update_nfc_report(self):
self._bytes = [0] * 313
if self.get_action() == Action.REQUEST_STATUS:
self.update_status()
elif self.get_action() == Action.NON:
self._bytes[0] = 0xff
elif self.get_action() == Action.START_TAG_DISCOVERY:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.START_TAG_POLLING:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
if self._nfc_content is not None:
data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07]
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
else:
logger.info('nfc content is none')
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.READ_TAG or self.get_action() == Action.READ_TAG_2:
self._bytes[0] = 0x3a
self._bytes[1] = 0
self._bytes[2] = 7
if self.get_action() == Action.READ_TAG:
data1 = bytes.fromhex('010001310200000001020007')
copyarray(self._bytes, 3, data1)
copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3])
copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8])
data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000')
copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2)
copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245])
self.set_action(Action.READ_TAG_2)
else:
data = bytes.fromhex('02000927')
copyarray(self._bytes, 3, data)
copyarray(self._bytes, 3 + len(data), self._nfc_content[245:])
self.set_action(Action.READ_FINISHED)
elif self.get_action() == Action.READ_FINISHED:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
data = bytes.fromhex('0931040000000101020007')
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
crc = crc8()
crc.update(bytes(self._bytes[:-1]))
self._bytes[-1] = ord(crc.digest())
def set_nfc(self, nfc_content):
self._nfc_content = nfc_content
def __bytes__(self):
return bytes(self._bytes)
+143 -42
View File
@@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import time
from asyncio import BaseTransport, BaseProtocol from asyncio import BaseTransport, BaseProtocol
from contextlib import suppress from contextlib import suppress
from typing import Optional, Union, Tuple, Text from typing import Optional, Union, Tuple, Text
@@ -10,6 +11,8 @@ from joycontrol.controller_state import ControllerState
from joycontrol.memory import FlashMemory from joycontrol.memory import FlashMemory
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
from joycontrol.transport import NotConnectedError from joycontrol.transport import NotConnectedError
from joycontrol.ir_nfc_mcu import IrNfcMcu, McuState, Action
from crc8 import crc8
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -39,6 +42,8 @@ class ControllerProtocol(BaseProtocol):
self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state = ControllerState(self, controller, spi_flash=spi_flash)
self._controller_state_sender = None self._controller_state_sender = None
self._mcu = IrNfcMcu()
# None = Just answer to sub commands # None = Just answer to sub commands
self._input_report_mode = None self._input_report_mode = None
@@ -51,7 +56,7 @@ class ControllerProtocol(BaseProtocol):
Raises NotConnected exception if the transport is not connected or the connection was lost. Raises NotConnected exception if the transport is not connected or the connection was lost.
""" """
# TODO: Call write directly if not in 0x30 input report mode # TODO: Call write directly if in continuously sending input report mode
if self.transport is None: if self.transport is None:
raise NotConnectedError('Transport not registered.') raise NotConnectedError('Transport not registered.')
@@ -120,30 +125,31 @@ class ControllerProtocol(BaseProtocol):
# TODO? # TODO?
raise NotImplementedError() raise NotImplementedError()
async def input_report_mode_0x30(self): async def input_report_mode_full(self):
""" """
Continuously sends 0x30 input reports containing the controller state. Continuously sends:
0x30 input reports containing the controller state OR
0x31 input reports containing the controller state and nfc data
""" """
if self.transport.is_reading(): if self.transport.is_reading():
raise ValueError('Transport must be paused in 0x30 input report mode') raise ValueError('Transport must be paused in full input report mode')
# send state at 66Hz
send_delay = 0.015
await asyncio.sleep(send_delay)
last_send_time = time.time()
input_report = InputReport() input_report = InputReport()
input_report.set_input_report_id(0x30)
input_report.set_vibrator_input() input_report.set_vibrator_input()
input_report.set_misc() input_report.set_misc()
if self._input_report_mode is None:
raise ValueError('Input report mode is not set.')
input_report.set_input_report_id(self._input_report_mode)
reader = asyncio.ensure_future(self.transport.read()) reader = asyncio.ensure_future(self.transport.read())
try: try:
while True: while True:
# TODO: improve timing
if self.controller == Controller.PRO_CONTROLLER:
# send state at 120Hz
await asyncio.sleep(1 / 120)
else:
# send state at 60Hz
await asyncio.sleep(1 / 60)
reply_send = False reply_send = False
if reader.done(): if reader.done():
data = await reader data = await reader
@@ -159,6 +165,12 @@ class ControllerProtocol(BaseProtocol):
pass pass
elif output_report_id == OutputReportID.SUB_COMMAND: elif output_report_id == OutputReportID.SUB_COMMAND:
reply_send = await self._reply_to_sub_command(report) reply_send = await self._reply_to_sub_command(report)
elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU:
# TODO: This does not reply anything
# reply_send = await self._reply_to_ir_nfc_mcu(report)
await self._reply_to_ir_nfc_mcu(report)
else:
logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE')
except ValueError as v_err: except ValueError as v_err:
logger.warning(f'Report parsing error "{v_err}" - IGNORE') logger.warning(f'Report parsing error "{v_err}" - IGNORE')
except NotImplementedError as err: except NotImplementedError as err:
@@ -172,8 +184,26 @@ class ControllerProtocol(BaseProtocol):
# TODO: set some sensor data # TODO: set some sensor data
input_report.set_6axis_data() input_report.set_6axis_data()
# set nfc data
if input_report.get_input_report_id() == 0x31:
self._mcu.set_nfc(self._controller_state.get_nfc())
self._mcu.update_nfc_report()
input_report.set_ir_nfc_data(bytes(self._mcu))
await self.write(input_report) await self.write(input_report)
# calculate delay
current_time = time.time()
time_delta = time.time() - last_send_time
sleep_time = send_delay - time_delta
last_send_time = current_time
if sleep_time < 0:
# logger.warning(f'Code is running {abs(sleep_time)} s too slow!')
sleep_time = 0
await asyncio.sleep(sleep_time)
except NotConnectedError as err: except NotConnectedError as err:
# Stop 0x30 input report mode if disconnected. # Stop 0x30 input report mode if disconnected.
logger.error(err) logger.error(err)
@@ -207,6 +237,50 @@ class ControllerProtocol(BaseProtocol):
else: else:
logger.warning(f'Output report {output_report_id} not implemented - ignoring') logger.warning(f'Output report {output_report_id} not implemented - ignoring')
async def _reply_to_ir_nfc_mcu(self, report):
"""
TODO: Cleanup
We aren't replying to anything here, do we need to?
"""
sub_command = report.data[11]
sub_command_data = report.data[12:]
# logging.info(f'received output report - Request MCU sub command {sub_command}')
if self._mcu.get_action() in (Action.READ_TAG, Action.READ_TAG_2, Action.READ_FINISHED):
return
# Request mcu state
if sub_command == 0x01:
# input_report = InputReport()
# input_report.set_input_report_id(0x21)
# input_report.set_misc()
# input_report.set_ack(0xA0)
# input_report.reply_to_subcommand_id(0x21)
self._mcu.set_action(Action.REQUEST_STATUS)
# input_report.set_mcu(self._mcu)
# await self.write(input_report)
# Send Start tag discovery
elif sub_command == 0x02:
# 0: Cancel all, 4: StartWaitingReceive
if sub_command_data[0] == 0x04:
self._mcu.set_action(Action.START_TAG_DISCOVERY)
# 1: Start polling
elif sub_command_data[0] == 0x01:
self._mcu.set_action(Action.START_TAG_POLLING)
# 2: stop polling
elif sub_command_data[0] == 0x02:
self._mcu.set_action(Action.NON)
elif sub_command_data[0] == 0x06:
self._mcu.set_action(Action.READ_TAG)
else:
logging.info(f'Unknown sub_command_data arg {sub_command_data}')
else:
logging.info(f'Unknown MCU sub command {sub_command}')
async def _reply_to_sub_command(self, report): async def _reply_to_sub_command(self, report):
# classify sub command # classify sub command
try: try:
@@ -316,35 +390,43 @@ class ControllerProtocol(BaseProtocol):
await self.write(input_report) await self.write(input_report)
async def _command_set_input_report_mode(self, sub_command_data): async def _command_set_input_report_mode(self, sub_command_data):
if sub_command_data[0] == 0x30: if self._input_report_mode == sub_command_data[0]:
logger.info('Setting input report mode to 0x30...') logger.warning(f'Already in input report mode {sub_command_data[0]} - ignoring request')
input_report = InputReport() # Start input report reader
input_report.set_input_report_id(0x21) if sub_command_data[0] in (0x30, 0x31):
input_report.set_misc() new_reader = asyncio.ensure_future(self.input_report_mode_full())
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x03)
await self.write(input_report)
# start sending 0x30 input reports
if self._input_report_mode != 0x30:
self._input_report_mode = 0x30
self.transport.pause_reading()
new_reader = asyncio.ensure_future(self.input_report_mode_0x30())
# We need to swap the reader in the future because this function was probably called by it
async def set_reader():
await self.transport.set_reader(new_reader)
self.transport.resume_reading()
asyncio.ensure_future(set_reader()).add_done_callback(
utils.create_error_check_callback()
)
else: else:
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request')
return
# Replace the currently running reader with the input report mode sender,
# which will also handle incoming requests in the future
self.transport.pause_reading()
# We need to replace the reader in the future because this function was probably called by it
async def set_reader():
await self.transport.set_reader(new_reader)
logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...')
self._input_report_mode = sub_command_data[0]
self.transport.resume_reading()
asyncio.ensure_future(set_reader()).add_done_callback(
utils.create_error_check_callback()
)
# Send acknowledgement
input_report = InputReport()
input_report.set_input_report_id(0x21)
input_report.set_misc()
input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(0x03)
await self.write(input_report)
async def _command_trigger_buttons_elapsed_time(self, sub_command_data): async def _command_trigger_buttons_elapsed_time(self, sub_command_data):
input_report = InputReport() input_report = InputReport()
@@ -392,10 +474,26 @@ class ControllerProtocol(BaseProtocol):
input_report.set_ack(0xA0) input_report.set_ack(0xA0)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
# TODO self._mcu.update_status()
data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200] data = list(bytes(self._mcu)[0:34])
crc = crc8()
crc.update(bytes(data[:-1]))
checksum = crc.digest()
data[-1] = ord(checksum)
for i in range(len(data)): for i in range(len(data)):
input_report.data[16 + i] = data[i] input_report.data[16+i] = data[i]
# Set MCU mode cmd
if sub_command_data[1] == 0:
if sub_command_data[2] == 0:
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[2] == 4:
self._mcu.set_state(McuState.NFC)
else:
logger.info(f"unknown mcu state {sub_command_data[2]}")
else:
logger.info(f"unknown mcu config command {sub_command_data}")
await self.write(input_report) await self.write(input_report)
@@ -408,10 +506,13 @@ class ControllerProtocol(BaseProtocol):
# 0x01 = Resume # 0x01 = Resume
input_report.set_ack(0x80) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_action(Action.NON)
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[0] == 0x00: elif sub_command_data[0] == 0x00:
# 0x00 = Suspend # 0x00 = Suspend
input_report.set_ack(0x80) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_state(McuState.STAND_BY)
else: else:
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
f'not implemented.') f'not implemented.')
+31 -3
View File
@@ -10,8 +10,7 @@ class InputReport:
""" """
def __init__(self, data=None): def __init__(self, data=None):
if not data: if not data:
# TODO: not enough space for NFC/IR data input report self.data = [0x00] * 364
self.data = [0x00] * 51
# all input reports are prepended with 0xA1 # all input reports are prepended with 0xA1
self.data[0] = 0xA1 self.data[0] = 0xA1
else: else:
@@ -113,6 +112,14 @@ class InputReport:
for i in range(14, 50): for i in range(14, 50):
self.data[i] = 0x00 self.data[i] = 0x00
def set_ir_nfc_data(self, data):
if 50 + len(data) > len(self.data):
raise ValueError('Too much data.')
# write to data
for i in range(len(data)):
self.data[50 + i] = data[i]
def reply_to_subcommand_id(self, _id): def reply_to_subcommand_id(self, _id):
if isinstance(_id, SubCommand): if isinstance(_id, SubCommand):
self.data[15] = _id.value self.data[15] = _id.value
@@ -195,8 +202,19 @@ class InputReport:
return bytes(self.data[:51]) return bytes(self.data[:51])
elif _id == 0x30: elif _id == 0x30:
return bytes(self.data[:14]) return bytes(self.data[:14])
elif _id == 0x31:
return bytes(self.data[:363])
else: else:
return bytes(self.data) return bytes(self.data[:51])
def __str__(self):
_id = f'Input {self.get_input_report_id():x}'
_info = ''
if self.get_input_report_id() == 0x21:
_info = self.get_reply_to_subcommand_id()
_bytes = ' '.join(f'{byte:x}' for byte in bytes(self))
return f'{_id} {_info}\n{_bytes}'
class SubCommand(Enum): class SubCommand(Enum):
@@ -215,6 +233,7 @@ class SubCommand(Enum):
class OutputReportID(Enum): class OutputReportID(Enum):
SUB_COMMAND = 0x01 SUB_COMMAND = 0x01
RUMBLE_ONLY = 0x10 RUMBLE_ONLY = 0x10
REQUEST_IR_NFC_MCU = 0x11
class OutputReport: class OutputReport:
@@ -298,3 +317,12 @@ class OutputReport:
def __bytes__(self): def __bytes__(self):
return bytes(self.data) return bytes(self.data)
def __str__(self):
_id = f'Output {self.get_output_report_id()}'
_info = ''
if self.get_output_report_id() == OutputReportID.SUB_COMMAND:
_info = self.get_sub_command()
_bytes = ' '.join(f'{byte:x}' for byte in bytes(self))
return f'{_id} {_info}\n{_bytes}'
+71 -44
View File
@@ -4,7 +4,6 @@ import argparse
import asyncio import asyncio
import logging import logging
import os import os
from contextlib import contextmanager
from aioconsole import ainput from aioconsole import ainput
@@ -133,31 +132,79 @@ async def test_controller_buttons(controller_state: ControllerState):
await button_push(controller_state, 'home') await button_push(controller_state, 'home')
async def _main(controller, reconnect_bt_addr=None, capture_file=None, spi_flash=None, device_id=None): async def set_amiibo(controller_state, file_path):
factory = controller_protocol_factory(controller, spi_flash=spi_flash) """
ctl_psm, itr_psm = 17, 19 Sets nfc content of the controller state to contents of the given file.
transport, protocol = await create_hid_server(factory, reconnect_bt_addr=reconnect_bt_addr, ctl_psm=ctl_psm, :param controller_state: Emulated controller state
itr_psm=itr_psm, capture_file=capture_file, device_id=device_id) :param file_path: Path to amiibo dump file
"""
loop = asyncio.get_event_loop()
controller_state = protocol.get_controller_state() with open(file_path, 'rb') as amiibo_file:
content = await loop.run_in_executor(None, amiibo_file.read)
controller_state.set_nfc(content)
# Create command line interface and add some extra commands
cli = ControllerCLI(controller_state)
# Wrap the script so we can pass the controller state. The doc string will be printed when calling 'help' async def _main(args):
async def _run_test_controller_buttons(): # parse the spi flash
""" spi_flash = None
test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons. if args.spi_flash:
""" with open(args.spi_flash, 'rb') as spi_flash_file:
await test_controller_buttons(controller_state) spi_flash = FlashMemory(spi_flash_file.read())
# add the script from above # Get controller name to emulate from arguments
cli.add_command('test_buttons', _run_test_controller_buttons) controller = Controller.from_arg(args.controller)
await cli.run() with utils.get_output(path=args.log, default=None) as capture_file:
factory = controller_protocol_factory(controller, spi_flash=spi_flash)
ctl_psm, itr_psm = 17, 19
transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr,
ctl_psm=ctl_psm,
itr_psm=itr_psm, capture_file=capture_file,
device_id=args.device_id)
logger.info('Stopping communication...') controller_state = protocol.get_controller_state()
await transport.close()
# Create command line interface and add some extra commands
cli = ControllerCLI(controller_state)
# Wrap the script so we can pass the controller state. The doc string will be printed when calling 'help'
async def _run_test_controller_buttons():
"""
test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons.
"""
await test_controller_buttons(controller_state)
# add the script from above
cli.add_command('test_buttons', _run_test_controller_buttons)
# Create amiibo command
async def amiibo(*args):
"""
amiibo - Sets amiibo content
Usage:
amiibo <file_name> Set controller state NFC content to file
amiibo remove Remove NFC content from controller state
"""
if controller_state.get_controller() == Controller.JOYCON_L:
raise ValueError('NFC content cannot be set for JOYCON_L')
elif not args:
raise ValueError('"amiibo" command requires amiibo dump file path as argument!')
elif args[0] == 'remove':
controller_state.set_nfc(None)
print('Removed nfc content.')
else:
await set_amiibo(controller_state, args[0])
# add the script from above
cli.add_command('amiibo', amiibo)
try:
await cli.run()
finally:
logger.info('Stopping communication...')
await transport.close()
if __name__ == '__main__': if __name__ == '__main__':
@@ -178,27 +225,7 @@ if __name__ == '__main__':
help='The Switch console Bluetooth address, for reconnecting as an already paired controller') help='The Switch console Bluetooth address, for reconnecting as an already paired controller')
args = parser.parse_args() args = parser.parse_args()
if args.controller == 'JOYCON_R': loop = asyncio.get_event_loop()
controller = Controller.JOYCON_R loop.run_until_complete(
elif args.controller == 'JOYCON_L': _main(args)
controller = Controller.JOYCON_L )
elif args.controller == 'PRO_CONTROLLER':
controller = Controller.PRO_CONTROLLER
else:
raise ValueError(f'Unknown controller "{args.controller}".')
spi_flash = None
if args.spi_flash:
with open(args.spi_flash, 'rb') as spi_flash_file:
spi_flash = FlashMemory(spi_flash_file.read())
with utils.get_output(path=args.log, default=None) as capture_file:
loop = asyncio.get_event_loop()
loop.run_until_complete(
_main(controller,
reconnect_bt_addr=args.reconnect_bt_addr,
capture_file=capture_file,
spi_flash=spi_flash,
device_id=args.device_id
)
)
+1
View File
@@ -20,6 +20,7 @@ PRODUCT_ID_JL = 8198
PRODUCT_ID_JR = 8199 PRODUCT_ID_JR = 8199
PRODUCT_ID_PC = 8201 PRODUCT_ID_PC = 8201
class Relay: class Relay:
def __init__(self, capture_file=None): def __init__(self, capture_file=None):
self._capture_file = capture_file self._capture_file = capture_file
+1 -1
View File
@@ -10,7 +10,7 @@ setup(name='joycontrol',
package_data={'joycontrol': ['profile/sdp_record_hid.xml']}, package_data={'joycontrol': ['profile/sdp_record_hid.xml']},
zip_safe=False, zip_safe=False,
install_requires=[ install_requires=[
'hid', 'aioconsole', 'dbus-python' 'hid', 'aioconsole', 'dbus-python', 'crc8'
] ]
) )