feat: send amiibo report

This commit is contained in:
spacemeowx2
2020-04-23 23:07:21 +08:00
parent e90499393b
commit be8dce71a0
6 changed files with 387 additions and 37 deletions
+5
View File
@@ -9,6 +9,7 @@ class ControllerState:
def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None): def __init__(self, protocol, controller: Controller, spi_flash: FlashMemory = None):
self._protocol = protocol self._protocol = protocol
self._controller = controller self._controller = controller
self._nfc_content = None
self._spi_flash = spi_flash self._spi_flash = spi_flash
@@ -198,6 +199,10 @@ async def button_push(controller_state, *buttons, sec=0.1):
await controller_state.send() await controller_state.send()
async def set_nfc(controller_state, nfc_content):
controller_state._nfc_content = nfc_content
class _StickCalibration: class _StickCalibration:
def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center): def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center):
self.h_center = h_center self.h_center = h_center
+154
View File
@@ -0,0 +1,154 @@
from enum import Enum
from crc8 import crc8
class Action(Enum):
NON = 0
REQUEST_STATUS = 1
START_TAG_POLLING = 2
START_TAG_DISCOVERY = 3
READ_TAG = 4
READ_TAG_2 = 5
READ_FINISHED = 6
class McuState(Enum):
NOT_INITIALIZED = 0
IRC = 1
NFC = 2
STAND_BY = 3
BUSY = 4
def copyarray(dest, offset, src):
for i in range(len(src)):
dest[offset + i] = src[i]
class Mcu:
def __init__(self):
self._fw_major = [0, 3]
self._fw_minor = [0, 5]
self._bytes = [0] * 313
self._action = Action.NON
self._state = McuState.NOT_INITIALIZED
self._nfc_content = None
def get_fw_major(self):
return self._fw_major
def get_fw_minor(self):
return self._fw_minor
def set_action(self, v):
self._action = v
def get_action(self):
return self._action
def set_state(self, v):
self._state = v
def get_state(self):
return self._state
def _get_state_byte(self):
if self.get_state() == McuState.NFC:
return 4
elif self.get_state() == McuState.BUSY:
return 6
elif self.get_state() == McuState.NOT_INITIALIZED:
return 1
elif self.get_state() == McuState.STAND_BY:
return 1
else:
return 0
def update_status(self):
self._bytes[0] = 1
self._bytes[1] = 0
self._bytes[2] = 0
self._bytes[3] = self._fw_major[0]
self._bytes[4] = self._fw_major[1]
self._bytes[5] = self._fw_minor[0]
self._bytes[6] = self._fw_minor[1]
self._bytes[7] = self._get_state_byte()
def update_nfc_report(self):
self._bytes = [0] * 313
if self.get_action() == Action.REQUEST_STATUS:
self._bytes[0] = 1
self._bytes[1] = 0
self._bytes[2] = 0
self._bytes[3] = self._fw_major[0]
self._bytes[4] = self._fw_major[1]
self._bytes[5] = self._fw_minor[0]
self._bytes[6] = self._fw_minor[1]
self._bytes[7] = self._get_state_byte()
elif self.get_action() == Action.NON:
self._bytes[0] = 0xff
elif self.get_action() == Action.START_TAG_DISCOVERY:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.START_TAG_POLLING:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
if not self._nfc_content is None:
data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07]
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
else:
print('nfc content is none')
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.READ_TAG or self.get_action() == Action.READ_TAG_2:
self._bytes[0] = 0x3a
self._bytes[1] = 0
self._bytes[2] = 7
if self.get_action() == Action.READ_TAG:
data1 = bytes.fromhex('010001310200000001020007')
copyarray(self._bytes, 3, data1)
copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3])
copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8])
data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000')
copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2)
copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245])
self.set_action(Action.READ_TAG_2)
else:
data = bytes.fromhex('02000927')
copyarray(self._bytes, 3, data)
copyarray(self._bytes, 3 + len(data), self._nfc_content[245:])
self.set_action(Action.READ_FINISHED)
elif self.get_action() == Action.READ_FINISHED:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
data = bytes.fromhex('0931040000000101020007')
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
crc = crc8()
crc.update(bytes(self._bytes[:-1]))
self._bytes[-1] = ord(crc.digest())
def set_nfc(self, nfc_content):
self._nfc_content = nfc_content
def __bytes__(self):
return bytes(self._bytes)
+92 -13
View File
@@ -10,6 +10,8 @@ from joycontrol.controller_state import ControllerState
from joycontrol.memory import FlashMemory from joycontrol.memory import FlashMemory
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
from joycontrol.transport import NotConnectedError from joycontrol.transport import NotConnectedError
from joycontrol.mcu import Mcu, McuState, Action
from crc8 import crc8
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -39,6 +41,8 @@ class ControllerProtocol(BaseProtocol):
self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state = ControllerState(self, controller, spi_flash=spi_flash)
self._controller_state_sender = None self._controller_state_sender = None
self._mcu = Mcu()
# None = Just answer to sub commands # None = Just answer to sub commands
self._input_report_mode = None self._input_report_mode = None
@@ -89,6 +93,11 @@ class ControllerProtocol(BaseProtocol):
input_report.set_timer(self._input_report_timer) input_report.set_timer(self._input_report_timer)
self._input_report_timer = (self._input_report_timer + 1) % 0x100 self._input_report_timer = (self._input_report_timer + 1) % 0x100
if input_report.get_input_report_id() == 0x31:
self._mcu.set_nfc(self._controller_state._nfc_content)
self._mcu.update_nfc_report()
input_report.set_mcu(self._mcu)
await self.transport.write(input_report) await self.transport.write(input_report)
self._controller_state.sig_is_send.set() self._controller_state.sig_is_send.set()
@@ -120,15 +129,14 @@ class ControllerProtocol(BaseProtocol):
# TODO? # TODO?
raise NotImplementedError() raise NotImplementedError()
async def input_report_mode_0x30(self): async def input_report_mode_full(self):
""" """
Continuously sends 0x30 input reports containing the controller state. Continuously sends full input reports containing the controller state.
""" """
if self.transport.is_reading(): if self.transport.is_reading():
raise ValueError('Transport must be paused in 0x30 input report mode') raise ValueError('Transport must be paused in full input report mode')
input_report = InputReport() input_report = InputReport()
input_report.set_input_report_id(0x30)
input_report.set_vibrator_input() input_report.set_vibrator_input()
input_report.set_misc() input_report.set_misc()
@@ -136,6 +144,7 @@ class ControllerProtocol(BaseProtocol):
try: try:
while True: while True:
input_report.set_input_report_id(self._input_report_mode)
# TODO: improve timing # TODO: improve timing
if self.controller == Controller.PRO_CONTROLLER: if self.controller == Controller.PRO_CONTROLLER:
# send state at 120Hz # send state at 120Hz
@@ -159,6 +168,10 @@ class ControllerProtocol(BaseProtocol):
pass pass
elif output_report_id == OutputReportID.SUB_COMMAND: elif output_report_id == OutputReportID.SUB_COMMAND:
reply_send = await self._reply_to_sub_command(report) reply_send = await self._reply_to_sub_command(report)
elif output_report_id == OutputReportID.REQUEST_MCU:
reply_send = await self._reply_to_mcu(report)
else:
logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE')
except ValueError as v_err: except ValueError as v_err:
logger.warning(f'Report parsing error "{v_err}" - IGNORE') logger.warning(f'Report parsing error "{v_err}" - IGNORE')
except NotImplementedError as err: except NotImplementedError as err:
@@ -207,6 +220,47 @@ class ControllerProtocol(BaseProtocol):
else: else:
logger.warning(f'Output report {output_report_id} not implemented - ignoring') logger.warning(f'Output report {output_report_id} not implemented - ignoring')
async def _reply_to_mcu(self, report):
sub_command = report.data[11]
sub_command_data = report.data[12:]
# logging.info(f'received output report - Request MCU sub command {sub_command}')
if self._mcu.get_action() == Action.READ_TAG or self._mcu.get_action() == Action.READ_TAG_2 or self._mcu.get_action() == Action.READ_FINISHED:
return
# Request mcu state
if sub_command == 0x01:
# input_report = InputReport()
# input_report.set_input_report_id(0x21)
# input_report.set_misc()
# input_report.set_ack(0xA0)
# input_report.reply_to_subcommand_id(0x21)
self._mcu.set_action(Action.REQUEST_STATUS)
# input_report.set_mcu(self._mcu)
# await self.write(input_report)
# Send Start tag discovery
elif sub_command == 0x02:
# 0: Cancel all, 4: StartWaitingReceive
if sub_command_data[0] == 0x04:
self._mcu.set_action(Action.START_TAG_DISCOVERY)
# 1: Start polling
elif sub_command_data[0] == 0x01:
self._mcu.set_action(Action.START_TAG_POLLING)
# 2: stop polling
elif sub_command_data[0] == 0x02:
self._mcu.set_action(Action.NON)
elif sub_command_data[0] == 0x06:
self._mcu.set_action(Action.READ_TAG)
else:
logging.info(f'Unknown sub_command_data arg {sub_command_data}')
else:
logging.info(f'Unknown MCU sub command {sub_command}')
async def _reply_to_sub_command(self, report): async def _reply_to_sub_command(self, report):
# classify sub command # classify sub command
try: try:
@@ -317,7 +371,14 @@ class ControllerProtocol(BaseProtocol):
async def _command_set_input_report_mode(self, sub_command_data): async def _command_set_input_report_mode(self, sub_command_data):
if sub_command_data[0] == 0x30: if sub_command_data[0] == 0x30:
logger.info('Setting input report mode to 0x30...') pass
elif sub_command_data[0] == 0x31:
pass
else:
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request')
return
logger.info(f'Setting input report mode to {hex(sub_command_data[0])}...')
input_report = InputReport() input_report = InputReport()
input_report.set_input_report_id(0x21) input_report.set_input_report_id(0x21)
@@ -328,12 +389,11 @@ class ControllerProtocol(BaseProtocol):
await self.write(input_report) await self.write(input_report)
# start sending 0x30 input reports # start sending input reports
if self._input_report_mode != 0x30: if self._input_report_mode is None:
self._input_report_mode = 0x30
self.transport.pause_reading() self.transport.pause_reading()
new_reader = asyncio.ensure_future(self.input_report_mode_0x30()) new_reader = asyncio.ensure_future(self.input_report_mode_full())
# We need to swap the reader in the future because this function was probably called by it # We need to swap the reader in the future because this function was probably called by it
async def set_reader(): async def set_reader():
@@ -343,8 +403,8 @@ class ControllerProtocol(BaseProtocol):
asyncio.ensure_future(set_reader()).add_done_callback( asyncio.ensure_future(set_reader()).add_done_callback(
utils.create_error_check_callback() utils.create_error_check_callback()
) )
else:
logger.error(f'input report mode {sub_command_data[0]} not implemented - ignoring request') self._input_report_mode = sub_command_data[0]
async def _command_trigger_buttons_elapsed_time(self, sub_command_data): async def _command_trigger_buttons_elapsed_time(self, sub_command_data):
input_report = InputReport() input_report = InputReport()
@@ -392,11 +452,27 @@ class ControllerProtocol(BaseProtocol):
input_report.set_ack(0xA0) input_report.set_ack(0xA0)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
# TODO self._mcu.update_status()
data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200] data = list(bytes(self._mcu)[0:34])
crc = crc8()
crc.update(bytes(data[:-1]))
checksum = crc.digest()
data[-1] = ord(checksum)
for i in range(len(data)): for i in range(len(data)):
input_report.data[16+i] = data[i] input_report.data[16+i] = data[i]
# Set MCU mode cmd
if sub_command_data[1] == 0:
if sub_command_data[2] == 0:
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[2] == 4:
self._mcu.set_state(McuState.NFC)
else:
logger.info(f"unknown mcu state {sub_command_data[2]}")
else:
logger.info(f"unknown mcu config command {sub_command_data}")
await self.write(input_report) await self.write(input_report)
async def _command_set_nfc_ir_mcu_state(self, sub_command_data): async def _command_set_nfc_ir_mcu_state(self, sub_command_data):
@@ -408,10 +484,13 @@ class ControllerProtocol(BaseProtocol):
# 0x01 = Resume # 0x01 = Resume
input_report.set_ack(0x80) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_action(Action.NON)
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[0] == 0x00: elif sub_command_data[0] == 0x00:
# 0x00 = Suspend # 0x00 = Suspend
input_report.set_ack(0x80) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_state(McuState.STAND_BY)
else: else:
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
f'not implemented.') f'not implemented.')
+11 -3
View File
@@ -10,8 +10,7 @@ class InputReport:
""" """
def __init__(self, data=None): def __init__(self, data=None):
if not data: if not data:
# TODO: not enough space for NFC/IR data input report self.data = [0x00] * 364
self.data = [0x00] * 51
# all input reports are prepended with 0xA1 # all input reports are prepended with 0xA1
self.data[0] = 0xA1 self.data[0] = 0xA1
else: else:
@@ -113,6 +112,12 @@ class InputReport:
for i in range(14, 50): for i in range(14, 50):
self.data[i] = 0x00 self.data[i] = 0x00
def set_mcu(self, data):
# write to data
data = bytes(data)
for i in range(len(data)):
self.data[50 + i] = data[i]
def reply_to_subcommand_id(self, _id): def reply_to_subcommand_id(self, _id):
if isinstance(_id, SubCommand): if isinstance(_id, SubCommand):
self.data[15] = _id.value self.data[15] = _id.value
@@ -195,8 +200,10 @@ class InputReport:
return bytes(self.data[:51]) return bytes(self.data[:51])
elif _id == 0x30: elif _id == 0x30:
return bytes(self.data[:14]) return bytes(self.data[:14])
elif _id == 0x31:
return bytes(self.data[:363])
else: else:
return bytes(self.data) return bytes(self.data[:51])
class SubCommand(Enum): class SubCommand(Enum):
@@ -215,6 +222,7 @@ class SubCommand(Enum):
class OutputReportID(Enum): class OutputReportID(Enum):
SUB_COMMAND = 0x01 SUB_COMMAND = 0x01
RUMBLE_ONLY = 0x10 RUMBLE_ONLY = 0x10
REQUEST_MCU = 0x11
class OutputReport: class OutputReport:
+104
View File
@@ -0,0 +1,104 @@
import argparse
import asyncio
import logging
import os
from contextlib import contextmanager
from joycontrol import logging_default as log
from joycontrol.command_line_interface import ControllerCLI
from joycontrol.controller_state import ControllerState, button_push, set_nfc
from joycontrol.protocol import controller_protocol_factory, Controller
from joycontrol.server import create_hid_server
logger = logging.getLogger(__name__)
async def _main(controller, reconnect_bt_addr=None, capture_file=None, spi_flash=None, device_id=None, amiibo=None):
factory = controller_protocol_factory(controller, spi_flash=spi_flash)
ctl_psm, itr_psm = 17, 19
transport, protocol = await create_hid_server(factory,
reconnect_bt_addr=reconnect_bt_addr,
ctl_psm=ctl_psm, itr_psm=itr_psm, capture_file=capture_file, device_id=device_id)
controller_state = protocol.get_controller_state()
if amiibo:
await set_nfc(controller_state, amiibo.read())
await controller_state.connect()
async def amiibo(filename):
with open(filename, "rb") as amiibo_file:
content = amiibo_file.read()
await set_nfc(controller_state, content)
async def remove_amiibo():
await controller_state.set_nfc(None)
cli = ControllerCLI(controller_state)
cli.add_command('amiibo', amiibo)
cli.add_command('remove_amiibo', remove_amiibo)
await cli.run()
logger.info('Stopping communication...')
await transport.close()
if __name__ == '__main__':
# check if root
if not os.geteuid() == 0:
raise PermissionError('Script must be run as root!')
# setup logging
log.configure()
parser = argparse.ArgumentParser()
#parser.add_argument('controller', help='JOYCON_R, JOYCON_L or PRO_CONTROLLER')
parser.add_argument('-l', '--log')
parser.add_argument('-d', '--device_id')
parser.add_argument('--spi_flash')
parser.add_argument('-r', '--reconnect_bt_addr', type=str, default=None,
help='The Switch console bluetooth address, for reconnecting as an already paired controller')
parser.add_argument('-a', '--amiibo', type=argparse.FileType('rb'), default=None,
help='The amiibo dump file')
args = parser.parse_args()
"""
if args.controller == 'JOYCON_R':
controller = Controller.JOYCON_R
elif args.controller == 'JOYCON_L':
controller = Controller.JOYCON_L
elif args.controller == 'PRO_CONTROLLER':
controller = Controller.PRO_CONTROLLER
else:
raise ValueError(f'Unknown controller "{args.controller}".')
"""
controller = Controller.PRO_CONTROLLER
spi_flash = None
if args.spi_flash:
with open(args.spi_flash, 'rb') as spi_flash_file:
spi_flash = spi_flash_file.read()
# creates file if arg is given
@contextmanager
def get_output(path=None):
"""
Opens file if path is given
"""
if path is not None:
file = open(path, 'wb')
yield file
file.close()
else:
yield None
with get_output(args.log) as capture_file:
loop = asyncio.get_event_loop()
loop.run_until_complete(_main(
controller,
reconnect_bt_addr=args.reconnect_bt_addr,
capture_file=capture_file,
spi_flash=spi_flash,
device_id=args.device_id,
amiibo=args.amiibo
))
+1 -1
View File
@@ -10,7 +10,7 @@ setup(name='joycontrol',
package_data={'joycontrol': ['profile/sdp_record_hid.xml']}, package_data={'joycontrol': ['profile/sdp_record_hid.xml']},
zip_safe=False, zip_safe=False,
install_requires=[ install_requires=[
'hid', 'aioconsole', 'dbus-python' 'hid', 'aioconsole', 'dbus-python', 'crc8'
] ]
) )