forked from mirror/joycontrol
Merge branch 'master' into joycon_rumble
This commit is contained in:
@@ -7,7 +7,7 @@ Tested on Ubuntu 19.10, and with Raspberry Pi 3B+ and 4B Raspbian GNU/Linux 10 (
|
|||||||
Emulation of JOYCON_R, JOYCON_L and PRO_CONTROLLER. Able to send:
|
Emulation of JOYCON_R, JOYCON_L and PRO_CONTROLLER. Able to send:
|
||||||
- button commands
|
- button commands
|
||||||
- stick state
|
- stick state
|
||||||
- nfc data
|
- ~~nfc data~~ (removed, see [#80](https://github.com/mart1nro/joycontrol/issues/80))
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
- Install dependencies
|
- Install dependencies
|
||||||
@@ -24,7 +24,7 @@ Arch Linux Derivatives: Install the `hidapi` and `bluez-utils-compat`(AUR) packa
|
|||||||
```bash
|
```bash
|
||||||
sudo pip3 install .
|
sudo pip3 install .
|
||||||
```
|
```
|
||||||
- Disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8)
|
- Consider to disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8)
|
||||||
|
|
||||||
## Command line interface example
|
## Command line interface example
|
||||||
- Run the script
|
- Run the script
|
||||||
@@ -57,6 +57,9 @@ Call "help" to see a list of available commands.
|
|||||||
opening of the "Change Grip/Order" menu is not required.
|
opening of the "Change Grip/Order" menu is not required.
|
||||||
- ...
|
- ...
|
||||||
|
|
||||||
|
## Thanks
|
||||||
|
- Special thanks to https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering for reverse engineering of the joycon protocol
|
||||||
|
- Thanks to the growing number of contributers and users
|
||||||
|
|
||||||
## Resources
|
## Resources
|
||||||
|
|
||||||
|
|||||||
@@ -118,7 +118,7 @@ class ButtonState:
|
|||||||
self._available_buttons = {'y', 'x', 'b', 'a', 'sr', 'sl', 'r', 'zr',
|
self._available_buttons = {'y', 'x', 'b', 'a', 'sr', 'sl', 'r', 'zr',
|
||||||
'plus', 'r_stick', 'home'}
|
'plus', 'r_stick', 'home'}
|
||||||
elif self.controller == Controller.JOYCON_L:
|
elif self.controller == Controller.JOYCON_L:
|
||||||
self._available_buttons = {'plus', 'l_stick', 'capture',
|
self._available_buttons = {'minus', 'l_stick', 'capture',
|
||||||
'down', 'up', 'right', 'left', 'sr', 'sl', 'l', 'zl'}
|
'down', 'up', 'right', 'left', 'sr', 'sl', 'l', 'zl'}
|
||||||
|
|
||||||
# byte 1
|
# byte 1
|
||||||
@@ -171,13 +171,13 @@ class ButtonState:
|
|||||||
|
|
||||||
def get_available_buttons(self):
|
def get_available_buttons(self):
|
||||||
"""
|
"""
|
||||||
:returns set of valid buttons
|
:returns: set of valid buttons
|
||||||
"""
|
"""
|
||||||
return set(self._available_buttons)
|
return set(self._available_buttons)
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
"""
|
"""
|
||||||
:returns iterator over the button bytes
|
:returns: iterator over the button bytes
|
||||||
"""
|
"""
|
||||||
yield self._byte_1
|
yield self._byte_1
|
||||||
yield self._byte_2
|
yield self._byte_2
|
||||||
@@ -187,7 +187,12 @@ class ButtonState:
|
|||||||
self._byte_1 = self._byte_2 = self._byte_3 = 0
|
self._byte_1 = self._byte_2 = self._byte_3 = 0
|
||||||
|
|
||||||
|
|
||||||
async def button_push(controller_state, *buttons, sec=0.1):
|
async def button_press(controller_state, *buttons):
|
||||||
|
"""
|
||||||
|
Set given buttons in the controller state to the pressed down state and wait till send.
|
||||||
|
:param controller_state:
|
||||||
|
:param buttons: Buttons to press down (see ButtonState.get_available_buttons)
|
||||||
|
"""
|
||||||
if not buttons:
|
if not buttons:
|
||||||
raise ValueError('No Buttons were given.')
|
raise ValueError('No Buttons were given.')
|
||||||
|
|
||||||
@@ -195,20 +200,43 @@ async def button_push(controller_state, *buttons, sec=0.1):
|
|||||||
|
|
||||||
for button in buttons:
|
for button in buttons:
|
||||||
# push button
|
# push button
|
||||||
button_state.set_button(button)
|
button_state.set_button(button, pushed=True)
|
||||||
|
|
||||||
# send report
|
# wait until report is send
|
||||||
await controller_state.send()
|
await controller_state.send()
|
||||||
await asyncio.sleep(sec)
|
|
||||||
|
|
||||||
|
async def button_release(controller_state, *buttons):
|
||||||
|
"""
|
||||||
|
Set given buttons in the controller state to the unpressed state and wait till send.
|
||||||
|
:param controller_state:
|
||||||
|
:param buttons: Buttons to set to unpressed (see ButtonState.get_available_buttons)
|
||||||
|
"""
|
||||||
|
if not buttons:
|
||||||
|
raise ValueError('No Buttons were given.')
|
||||||
|
|
||||||
|
button_state = controller_state.button_state
|
||||||
|
|
||||||
for button in buttons:
|
for button in buttons:
|
||||||
# release button
|
# release button
|
||||||
button_state.set_button(button, pushed=False)
|
button_state.set_button(button, pushed=False)
|
||||||
|
|
||||||
# send report
|
# wait until report is send
|
||||||
await controller_state.send()
|
await controller_state.send()
|
||||||
|
|
||||||
|
|
||||||
|
async def button_push(controller_state, *buttons, sec=0.1):
|
||||||
|
"""
|
||||||
|
Shortly push the given buttons. Wait until the controller state is send.
|
||||||
|
:param controller_state:
|
||||||
|
:param buttons: Buttons to push (see ButtonState.get_available_buttons)
|
||||||
|
:param sec: Seconds to wait before releasing the button, default: 0.1
|
||||||
|
"""
|
||||||
|
await button_press(controller_state, *buttons)
|
||||||
|
await asyncio.sleep(sec)
|
||||||
|
await button_release(controller_state, *buttons)
|
||||||
|
|
||||||
|
|
||||||
class _StickCalibration:
|
class _StickCalibration:
|
||||||
def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center):
|
def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center):
|
||||||
self.h_center = h_center
|
self.h_center = h_center
|
||||||
|
|||||||
@@ -1,155 +0,0 @@
|
|||||||
import logging
|
|
||||||
from enum import Enum
|
|
||||||
from crc8 import crc8
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class Action(Enum):
|
|
||||||
NON = 0
|
|
||||||
REQUEST_STATUS = 1
|
|
||||||
START_TAG_POLLING = 2
|
|
||||||
START_TAG_DISCOVERY = 3
|
|
||||||
READ_TAG = 4
|
|
||||||
READ_TAG_2 = 5
|
|
||||||
READ_FINISHED = 6
|
|
||||||
|
|
||||||
|
|
||||||
class McuState(Enum):
|
|
||||||
NOT_INITIALIZED = 0
|
|
||||||
IRC = 1
|
|
||||||
NFC = 2
|
|
||||||
STAND_BY = 3
|
|
||||||
BUSY = 4
|
|
||||||
|
|
||||||
|
|
||||||
def copyarray(dest, offset, src):
|
|
||||||
for i in range(len(src)):
|
|
||||||
dest[offset + i] = src[i]
|
|
||||||
|
|
||||||
|
|
||||||
class IrNfcMcu:
|
|
||||||
"""
|
|
||||||
TODO: cleanup
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._fw_major = [0, 3]
|
|
||||||
self._fw_minor = [0, 5]
|
|
||||||
|
|
||||||
self._bytes = [0] * 313
|
|
||||||
|
|
||||||
self._action = Action.NON
|
|
||||||
self._state = McuState.NOT_INITIALIZED
|
|
||||||
|
|
||||||
self._nfc_content = None
|
|
||||||
|
|
||||||
def get_fw_major(self):
|
|
||||||
return self._fw_major
|
|
||||||
|
|
||||||
def get_fw_minor(self):
|
|
||||||
return self._fw_minor
|
|
||||||
|
|
||||||
def set_action(self, v):
|
|
||||||
self._action = v
|
|
||||||
|
|
||||||
def get_action(self):
|
|
||||||
return self._action
|
|
||||||
|
|
||||||
def set_state(self, v):
|
|
||||||
self._state = v
|
|
||||||
|
|
||||||
def get_state(self):
|
|
||||||
return self._state
|
|
||||||
|
|
||||||
def _get_state_byte(self):
|
|
||||||
if self.get_state() == McuState.NFC:
|
|
||||||
return 4
|
|
||||||
elif self.get_state() == McuState.BUSY:
|
|
||||||
return 6
|
|
||||||
elif self.get_state() == McuState.NOT_INITIALIZED:
|
|
||||||
return 1
|
|
||||||
elif self.get_state() == McuState.STAND_BY:
|
|
||||||
return 1
|
|
||||||
else:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def update_status(self):
|
|
||||||
self._bytes[0] = 1
|
|
||||||
self._bytes[1] = 0
|
|
||||||
self._bytes[2] = 0
|
|
||||||
self._bytes[3] = self._fw_major[0]
|
|
||||||
self._bytes[4] = self._fw_major[1]
|
|
||||||
self._bytes[5] = self._fw_minor[0]
|
|
||||||
self._bytes[6] = self._fw_minor[1]
|
|
||||||
self._bytes[7] = self._get_state_byte()
|
|
||||||
|
|
||||||
def update_nfc_report(self):
|
|
||||||
self._bytes = [0] * 313
|
|
||||||
if self.get_action() == Action.REQUEST_STATUS:
|
|
||||||
self.update_status()
|
|
||||||
elif self.get_action() == Action.NON:
|
|
||||||
self._bytes[0] = 0xff
|
|
||||||
elif self.get_action() == Action.START_TAG_DISCOVERY:
|
|
||||||
self._bytes[0] = 0x2a
|
|
||||||
self._bytes[1] = 0
|
|
||||||
self._bytes[2] = 5
|
|
||||||
self._bytes[3] = 0
|
|
||||||
self._bytes[4] = 0
|
|
||||||
self._bytes[5] = 9
|
|
||||||
self._bytes[6] = 0x31
|
|
||||||
self._bytes[7] = 0
|
|
||||||
elif self.get_action() == Action.START_TAG_POLLING:
|
|
||||||
self._bytes[0] = 0x2a
|
|
||||||
self._bytes[1] = 0
|
|
||||||
self._bytes[2] = 5
|
|
||||||
self._bytes[3] = 0
|
|
||||||
self._bytes[4] = 0
|
|
||||||
if self._nfc_content is not None:
|
|
||||||
data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07]
|
|
||||||
copyarray(self._bytes, 5, data)
|
|
||||||
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
|
|
||||||
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
|
|
||||||
else:
|
|
||||||
logger.info('nfc content is none')
|
|
||||||
self._bytes[5] = 9
|
|
||||||
self._bytes[6] = 0x31
|
|
||||||
self._bytes[7] = 0
|
|
||||||
elif self.get_action() in (Action.READ_TAG, Action.READ_TAG_2):
|
|
||||||
self._bytes[0] = 0x3a
|
|
||||||
self._bytes[1] = 0
|
|
||||||
self._bytes[2] = 7
|
|
||||||
if self.get_action() == Action.READ_TAG:
|
|
||||||
data1 = bytes.fromhex('010001310200000001020007')
|
|
||||||
copyarray(self._bytes, 3, data1)
|
|
||||||
copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3])
|
|
||||||
copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8])
|
|
||||||
data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000')
|
|
||||||
copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2)
|
|
||||||
copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245])
|
|
||||||
self.set_action(Action.READ_TAG_2)
|
|
||||||
else:
|
|
||||||
data = bytes.fromhex('02000927')
|
|
||||||
copyarray(self._bytes, 3, data)
|
|
||||||
copyarray(self._bytes, 3 + len(data), self._nfc_content[245:540])
|
|
||||||
self.set_action(Action.READ_FINISHED)
|
|
||||||
elif self.get_action() == Action.READ_FINISHED:
|
|
||||||
self._bytes[0] = 0x2a
|
|
||||||
self._bytes[1] = 0
|
|
||||||
self._bytes[2] = 5
|
|
||||||
self._bytes[3] = 0
|
|
||||||
self._bytes[4] = 0
|
|
||||||
data = bytes.fromhex('0931040000000101020007')
|
|
||||||
copyarray(self._bytes, 5, data)
|
|
||||||
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
|
|
||||||
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
|
|
||||||
|
|
||||||
crc = crc8()
|
|
||||||
crc.update(bytes(self._bytes[:-1]))
|
|
||||||
self._bytes[-1] = ord(crc.digest())
|
|
||||||
|
|
||||||
def set_nfc(self, nfc_content):
|
|
||||||
self._nfc_content = nfc_content
|
|
||||||
|
|
||||||
def __bytes__(self):
|
|
||||||
return bytes(self._bytes)
|
|
||||||
+8
-77
@@ -11,8 +11,6 @@ from joycontrol.controller_state import ControllerState
|
|||||||
from joycontrol.memory import FlashMemory
|
from joycontrol.memory import FlashMemory
|
||||||
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
|
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
|
||||||
from joycontrol.transport import NotConnectedError
|
from joycontrol.transport import NotConnectedError
|
||||||
from joycontrol.ir_nfc_mcu import IrNfcMcu, McuState, Action
|
|
||||||
from crc8 import crc8
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -42,8 +40,6 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
self._controller_state = ControllerState(self, controller, spi_flash=spi_flash)
|
self._controller_state = ControllerState(self, controller, spi_flash=spi_flash)
|
||||||
self._controller_state_sender = None
|
self._controller_state_sender = None
|
||||||
|
|
||||||
self._mcu = IrNfcMcu()
|
|
||||||
|
|
||||||
# None = Just answer to sub commands
|
# None = Just answer to sub commands
|
||||||
self._input_report_mode = None
|
self._input_report_mode = None
|
||||||
|
|
||||||
@@ -166,9 +162,8 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
elif output_report_id == OutputReportID.SUB_COMMAND:
|
elif output_report_id == OutputReportID.SUB_COMMAND:
|
||||||
reply_send = await self._reply_to_sub_command(report)
|
reply_send = await self._reply_to_sub_command(report)
|
||||||
elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU:
|
elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU:
|
||||||
# TODO: This does not reply anything
|
# TODO NFC
|
||||||
# reply_send = await self._reply_to_ir_nfc_mcu(report)
|
raise NotImplementedError('NFC communictation is not implemented.')
|
||||||
await self._reply_to_ir_nfc_mcu(report)
|
|
||||||
else:
|
else:
|
||||||
logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE')
|
logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE')
|
||||||
except ValueError as v_err:
|
except ValueError as v_err:
|
||||||
@@ -184,11 +179,9 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
# TODO: set some sensor data
|
# TODO: set some sensor data
|
||||||
input_report.set_6axis_data()
|
input_report.set_6axis_data()
|
||||||
|
|
||||||
# set nfc data
|
# TODO NFC - set nfc data
|
||||||
if input_report.get_input_report_id() == 0x31:
|
if input_report.get_input_report_id() == 0x31:
|
||||||
self._mcu.set_nfc(self._controller_state.get_nfc())
|
pass
|
||||||
self._mcu.update_nfc_report()
|
|
||||||
input_report.set_ir_nfc_data(bytes(self._mcu))
|
|
||||||
|
|
||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
@@ -237,50 +230,6 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
else:
|
else:
|
||||||
logger.warning(f'Output report {output_report_id} not implemented - ignoring')
|
logger.warning(f'Output report {output_report_id} not implemented - ignoring')
|
||||||
|
|
||||||
async def _reply_to_ir_nfc_mcu(self, report):
|
|
||||||
"""
|
|
||||||
TODO: Cleanup
|
|
||||||
We aren't replying to anything here, do we need to?
|
|
||||||
"""
|
|
||||||
sub_command = report.data[11]
|
|
||||||
sub_command_data = report.data[12:]
|
|
||||||
|
|
||||||
# logging.info(f'received output report - Request MCU sub command {sub_command}')
|
|
||||||
|
|
||||||
if self._mcu.get_action() in (Action.READ_TAG, Action.READ_TAG_2, Action.READ_FINISHED):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Request mcu state
|
|
||||||
if sub_command == 0x01:
|
|
||||||
# input_report = InputReport()
|
|
||||||
# input_report.set_input_report_id(0x21)
|
|
||||||
# input_report.set_misc()
|
|
||||||
|
|
||||||
# input_report.set_ack(0xA0)
|
|
||||||
# input_report.reply_to_subcommand_id(0x21)
|
|
||||||
|
|
||||||
self._mcu.set_action(Action.REQUEST_STATUS)
|
|
||||||
# input_report.set_mcu(self._mcu)
|
|
||||||
|
|
||||||
# await self.write(input_report)
|
|
||||||
# Send Start tag discovery
|
|
||||||
elif sub_command == 0x02:
|
|
||||||
# 0: Cancel all, 4: StartWaitingReceive
|
|
||||||
if sub_command_data[0] == 0x04:
|
|
||||||
self._mcu.set_action(Action.START_TAG_DISCOVERY)
|
|
||||||
# 1: Start polling
|
|
||||||
elif sub_command_data[0] == 0x01:
|
|
||||||
self._mcu.set_action(Action.START_TAG_POLLING)
|
|
||||||
# 2: stop polling
|
|
||||||
elif sub_command_data[0] == 0x02:
|
|
||||||
self._mcu.set_action(Action.NON)
|
|
||||||
elif sub_command_data[0] == 0x06:
|
|
||||||
self._mcu.set_action(Action.READ_TAG)
|
|
||||||
else:
|
|
||||||
logging.info(f'Unknown sub_command_data arg {sub_command_data}')
|
|
||||||
else:
|
|
||||||
logging.info(f'Unknown MCU sub command {sub_command}')
|
|
||||||
|
|
||||||
async def _reply_to_sub_command(self, report):
|
async def _reply_to_sub_command(self, report):
|
||||||
# classify sub command
|
# classify sub command
|
||||||
try:
|
try:
|
||||||
@@ -467,6 +416,7 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
async def _command_set_nfc_ir_mcu_config(self, sub_command_data):
|
async def _command_set_nfc_ir_mcu_config(self, sub_command_data):
|
||||||
|
# TODO NFC
|
||||||
input_report = InputReport()
|
input_report = InputReport()
|
||||||
input_report.set_input_report_id(0x21)
|
input_report.set_input_report_id(0x21)
|
||||||
input_report.set_misc()
|
input_report.set_misc()
|
||||||
@@ -474,30 +424,14 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
input_report.set_ack(0xA0)
|
input_report.set_ack(0xA0)
|
||||||
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
|
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
|
||||||
|
|
||||||
self._mcu.update_status()
|
data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200]
|
||||||
data = list(bytes(self._mcu)[0:34])
|
|
||||||
crc = crc8()
|
|
||||||
crc.update(bytes(data[:-1]))
|
|
||||||
checksum = crc.digest()
|
|
||||||
data[-1] = ord(checksum)
|
|
||||||
|
|
||||||
for i in range(len(data)):
|
for i in range(len(data)):
|
||||||
input_report.data[16+i] = data[i]
|
input_report.data[16 + i] = data[i]
|
||||||
|
|
||||||
# Set MCU mode cmd
|
|
||||||
if sub_command_data[1] == 0:
|
|
||||||
if sub_command_data[2] == 0:
|
|
||||||
self._mcu.set_state(McuState.STAND_BY)
|
|
||||||
elif sub_command_data[2] == 4:
|
|
||||||
self._mcu.set_state(McuState.NFC)
|
|
||||||
else:
|
|
||||||
logger.info(f"unknown mcu state {sub_command_data[2]}")
|
|
||||||
else:
|
|
||||||
logger.info(f"unknown mcu config command {sub_command_data}")
|
|
||||||
|
|
||||||
await self.write(input_report)
|
await self.write(input_report)
|
||||||
|
|
||||||
async def _command_set_nfc_ir_mcu_state(self, sub_command_data):
|
async def _command_set_nfc_ir_mcu_state(self, sub_command_data):
|
||||||
|
# TODO NFC
|
||||||
input_report = InputReport()
|
input_report = InputReport()
|
||||||
input_report.set_input_report_id(0x21)
|
input_report.set_input_report_id(0x21)
|
||||||
input_report.set_misc()
|
input_report.set_misc()
|
||||||
@@ -506,13 +440,10 @@ class ControllerProtocol(BaseProtocol):
|
|||||||
# 0x01 = Resume
|
# 0x01 = Resume
|
||||||
input_report.set_ack(0x80)
|
input_report.set_ack(0x80)
|
||||||
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
|
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
|
||||||
self._mcu.set_action(Action.NON)
|
|
||||||
self._mcu.set_state(McuState.STAND_BY)
|
|
||||||
elif sub_command_data[0] == 0x00:
|
elif sub_command_data[0] == 0x00:
|
||||||
# 0x00 = Suspend
|
# 0x00 = Suspend
|
||||||
input_report.set_ack(0x80)
|
input_report.set_ack(0x80)
|
||||||
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
|
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
|
||||||
self._mcu.set_state(McuState.STAND_BY)
|
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
|
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
|
||||||
f'not implemented.')
|
f'not implemented.')
|
||||||
|
|||||||
@@ -76,10 +76,9 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id=
|
|||||||
|
|
||||||
hid.powered(True)
|
hid.powered(True)
|
||||||
hid.pairable(True)
|
hid.pairable(True)
|
||||||
|
|
||||||
# setting bluetooth adapter name and class to the device we wish to emulate
|
# setting bluetooth adapter name to the device we wish to emulate
|
||||||
await hid.set_name(protocol.controller.device_name())
|
await hid.set_name(protocol.controller.device_name())
|
||||||
await hid.set_class()
|
|
||||||
|
|
||||||
logger.info('Advertising the Bluetooth SDP record...')
|
logger.info('Advertising the Bluetooth SDP record...')
|
||||||
try:
|
try:
|
||||||
@@ -88,6 +87,9 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id=
|
|||||||
# Already registered (If multiple controllers are being emulated and this method is called consecutive times)
|
# Already registered (If multiple controllers are being emulated and this method is called consecutive times)
|
||||||
logger.debug(dbus_err)
|
logger.debug(dbus_err)
|
||||||
|
|
||||||
|
# set the device class to "Gamepad/joystick"
|
||||||
|
await hid.set_class()
|
||||||
|
|
||||||
# start advertising
|
# start advertising
|
||||||
hid.discoverable()
|
hid.discoverable()
|
||||||
|
|
||||||
|
|||||||
Regular → Executable
+117
-66
@@ -10,7 +10,7 @@ from aioconsole import ainput
|
|||||||
from joycontrol import logging_default as log, utils
|
from joycontrol import logging_default as log, utils
|
||||||
from joycontrol.command_line_interface import ControllerCLI
|
from joycontrol.command_line_interface import ControllerCLI
|
||||||
from joycontrol.controller import Controller
|
from joycontrol.controller import Controller
|
||||||
from joycontrol.controller_state import ControllerState, button_push
|
from joycontrol.controller_state import ControllerState, button_push, button_press, button_release
|
||||||
from joycontrol.memory import FlashMemory
|
from joycontrol.memory import FlashMemory
|
||||||
from joycontrol.protocol import controller_protocol_factory
|
from joycontrol.protocol import controller_protocol_factory
|
||||||
from joycontrol.server import create_hid_server
|
from joycontrol.server import create_hid_server
|
||||||
@@ -136,25 +136,21 @@ async def test_controller_buttons(controller_state: ControllerState):
|
|||||||
await button_push(controller_state, 'home')
|
await button_push(controller_state, 'home')
|
||||||
|
|
||||||
|
|
||||||
async def set_nfc(controller_state, file_path):
|
def ensure_valid_button(controller_state, *buttons):
|
||||||
"""
|
"""
|
||||||
Sets nfc content of the controller state to contents of the given file.
|
Raise ValueError if any of the given buttons os not part of the controller state.
|
||||||
:param controller_state: Emulated controller state
|
:param controller_state:
|
||||||
:param file_path: Path to nfc dump file
|
:param buttons: Any number of buttons to check (see ButtonState.get_available_buttons)
|
||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
for button in buttons:
|
||||||
|
if button not in controller_state.button_state.get_available_buttons():
|
||||||
with open(file_path, 'rb') as nfc_file:
|
raise ValueError(f'Button {button} does not exist on {controller_state.get_controller()}')
|
||||||
content = await loop.run_in_executor(None, nfc_file.read)
|
|
||||||
controller_state.set_nfc(content)
|
|
||||||
|
|
||||||
|
|
||||||
async def mash_button(controller_state, button, interval):
|
async def mash_button(controller_state, button, interval):
|
||||||
# waits until controller is fully connected
|
# wait until controller is fully connected
|
||||||
await controller_state.connect()
|
await controller_state.connect()
|
||||||
|
ensure_valid_button(controller_state, button)
|
||||||
if button not in controller_state.button_state.get_available_buttons():
|
|
||||||
raise ValueError(f'Button {button} does not exist on {controller_state.get_controller()}')
|
|
||||||
|
|
||||||
user_input = asyncio.ensure_future(
|
user_input = asyncio.ensure_future(
|
||||||
ainput(prompt=f'Pressing the {button} button every {interval} seconds... Press <enter> to stop.')
|
ainput(prompt=f'Pressing the {button} button every {interval} seconds... Press <enter> to stop.')
|
||||||
@@ -168,6 +164,107 @@ async def mash_button(controller_state, button, interval):
|
|||||||
await user_input
|
await user_input
|
||||||
|
|
||||||
|
|
||||||
|
def _register_commands_with_controller_state(controller_state, cli):
|
||||||
|
"""
|
||||||
|
Commands registered here can use the given controller state.
|
||||||
|
The doc string of commands will be printed by the CLI when calling "help"
|
||||||
|
:param cli:
|
||||||
|
:param controller_state:
|
||||||
|
"""
|
||||||
|
async def test_buttons():
|
||||||
|
"""
|
||||||
|
test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons.
|
||||||
|
"""
|
||||||
|
await test_controller_buttons(controller_state)
|
||||||
|
|
||||||
|
cli.add_command(test_buttons.__name__, test_buttons)
|
||||||
|
|
||||||
|
# Mash a button command
|
||||||
|
async def mash(*args):
|
||||||
|
"""
|
||||||
|
mash - Mash a specified button at a set interval
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
mash <button> <interval>
|
||||||
|
"""
|
||||||
|
if not len(args) == 2:
|
||||||
|
raise ValueError('"mash_button" command requires a button and interval as arguments!')
|
||||||
|
|
||||||
|
button, interval = args
|
||||||
|
await mash_button(controller_state, button, interval)
|
||||||
|
|
||||||
|
cli.add_command(mash.__name__, mash)
|
||||||
|
|
||||||
|
# Hold a button command
|
||||||
|
async def hold(*args):
|
||||||
|
"""
|
||||||
|
hold - Press and hold specified buttons
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
hold <button>
|
||||||
|
|
||||||
|
Example:
|
||||||
|
hold a b
|
||||||
|
"""
|
||||||
|
if not args:
|
||||||
|
raise ValueError('"hold" command requires a button!')
|
||||||
|
|
||||||
|
ensure_valid_button(controller_state, *args)
|
||||||
|
|
||||||
|
# wait until controller is fully connected
|
||||||
|
await controller_state.connect()
|
||||||
|
await button_press(controller_state, *args)
|
||||||
|
|
||||||
|
cli.add_command(hold.__name__, hold)
|
||||||
|
|
||||||
|
# Release a button command
|
||||||
|
async def release(*args):
|
||||||
|
"""
|
||||||
|
release - Release specified buttons
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
release <button>
|
||||||
|
|
||||||
|
Example:
|
||||||
|
release a b
|
||||||
|
"""
|
||||||
|
if not args:
|
||||||
|
raise ValueError('"release" command requires a button!')
|
||||||
|
|
||||||
|
ensure_valid_button(controller_state, *args)
|
||||||
|
|
||||||
|
# wait until controller is fully connected
|
||||||
|
await controller_state.connect()
|
||||||
|
await button_release(controller_state, *args)
|
||||||
|
|
||||||
|
cli.add_command(release.__name__, release)
|
||||||
|
|
||||||
|
# Create nfc command
|
||||||
|
async def nfc(*args):
|
||||||
|
"""
|
||||||
|
nfc - Sets nfc content
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
nfc <file_name> Set controller state NFC content to file
|
||||||
|
nfc remove Remove NFC content from controller state
|
||||||
|
"""
|
||||||
|
logger.error('NFC Support was removed from joycontrol - see https://github.com/mart1nro/joycontrol/issues/80')
|
||||||
|
if controller_state.get_controller() == Controller.JOYCON_L:
|
||||||
|
raise ValueError('NFC content cannot be set for JOYCON_L')
|
||||||
|
elif not args:
|
||||||
|
raise ValueError('"nfc" command requires file path to an nfc dump as argument!')
|
||||||
|
elif args[0] == 'remove':
|
||||||
|
controller_state.set_nfc(None)
|
||||||
|
print('Removed nfc content.')
|
||||||
|
else:
|
||||||
|
_loop = asyncio.get_event_loop()
|
||||||
|
with open(args[0], 'rb') as nfc_file:
|
||||||
|
content = await _loop.run_in_executor(None, nfc_file.read)
|
||||||
|
controller_state.set_nfc(content)
|
||||||
|
|
||||||
|
cli.add_command(nfc.__name__, nfc)
|
||||||
|
|
||||||
|
|
||||||
async def _main(args):
|
async def _main(args):
|
||||||
# parse the spi flash
|
# parse the spi flash
|
||||||
if args.spi_flash:
|
if args.spi_flash:
|
||||||
@@ -181,6 +278,7 @@ async def _main(args):
|
|||||||
controller = Controller.from_arg(args.controller)
|
controller = Controller.from_arg(args.controller)
|
||||||
|
|
||||||
with utils.get_output(path=args.log, default=None) as capture_file:
|
with utils.get_output(path=args.log, default=None) as capture_file:
|
||||||
|
# prepare the the emulated controller
|
||||||
factory = controller_protocol_factory(controller, spi_flash=spi_flash)
|
factory = controller_protocol_factory(controller, spi_flash=spi_flash)
|
||||||
ctl_psm, itr_psm = 17, 19
|
ctl_psm, itr_psm = 17, 19
|
||||||
transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr,
|
transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr,
|
||||||
@@ -192,61 +290,14 @@ async def _main(args):
|
|||||||
|
|
||||||
# Create command line interface and add some extra commands
|
# Create command line interface and add some extra commands
|
||||||
cli = ControllerCLI(controller_state)
|
cli = ControllerCLI(controller_state)
|
||||||
|
_register_commands_with_controller_state(controller_state, cli)
|
||||||
|
cli.add_command('amiibo', ControllerCLI.deprecated('Command was removed - use "nfc" instead!'))
|
||||||
|
|
||||||
# Wrap the script so we can pass the controller state. The doc string will be printed when calling 'help'
|
# set default nfc content supplied by argument
|
||||||
async def _run_test_controller_buttons():
|
|
||||||
"""
|
|
||||||
test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons.
|
|
||||||
"""
|
|
||||||
await test_controller_buttons(controller_state)
|
|
||||||
|
|
||||||
# add the script from above
|
|
||||||
cli.add_command('test_buttons', _run_test_controller_buttons)
|
|
||||||
|
|
||||||
# Mash a button command
|
|
||||||
async def call_mash_button(*args):
|
|
||||||
"""
|
|
||||||
mash - Mash a specified button at a set interval
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
mash <button> <interval>
|
|
||||||
"""
|
|
||||||
if not len(args) == 2:
|
|
||||||
raise ValueError('"mash_button" command requires a button and interval as arguments!')
|
|
||||||
|
|
||||||
button, interval = args
|
|
||||||
await mash_button(controller_state, button, interval)
|
|
||||||
|
|
||||||
# add the script from above
|
|
||||||
cli.add_command('mash', call_mash_button)
|
|
||||||
|
|
||||||
# Create nfc command
|
|
||||||
async def nfc(*args):
|
|
||||||
"""
|
|
||||||
nfc - Sets nfc content
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
nfc <file_name> Set controller state NFC content to file
|
|
||||||
nfc remove Remove NFC content from controller state
|
|
||||||
"""
|
|
||||||
if controller_state.get_controller() == Controller.JOYCON_L:
|
|
||||||
raise ValueError('NFC content cannot be set for JOYCON_L')
|
|
||||||
elif not args:
|
|
||||||
raise ValueError('"nfc" command requires file path to an nfc dump as argument!')
|
|
||||||
elif args[0] == 'remove':
|
|
||||||
controller_state.set_nfc(None)
|
|
||||||
print('Removed nfc content.')
|
|
||||||
else:
|
|
||||||
await set_nfc(controller_state, args[0])
|
|
||||||
|
|
||||||
# add the script from above
|
|
||||||
cli.add_command('nfc', nfc)
|
|
||||||
cli.add_command('amiibo', ControllerCLI.deprecated('Command is deprecated - use "nfc" instead!'))
|
|
||||||
|
|
||||||
|
|
||||||
if args.nfc is not None:
|
if args.nfc is not None:
|
||||||
await nfc(args.nfc)
|
await cli.commands['nfc'](args.nfc)
|
||||||
|
|
||||||
|
# run the cli
|
||||||
try:
|
try:
|
||||||
await cli.run()
|
await cli.run()
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
from setuptools import setup, find_packages
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
setup(name='joycontrol',
|
setup(name='joycontrol',
|
||||||
version='0.14',
|
version='0.15',
|
||||||
author='Robert Martin',
|
author='Robert Martin',
|
||||||
author_email='martinro@informatik.hu-berlin.de',
|
author_email='martinro@informatik.hu-berlin.de',
|
||||||
description='Emulate Nintendo Switch Controllers over Bluetooth',
|
description='Emulate Nintendo Switch Controllers over Bluetooth',
|
||||||
@@ -10,7 +10,7 @@ setup(name='joycontrol',
|
|||||||
package_data={'joycontrol': ['profile/sdp_record_hid.xml']},
|
package_data={'joycontrol': ['profile/sdp_record_hid.xml']},
|
||||||
zip_safe=False,
|
zip_safe=False,
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'hid', 'aioconsole', 'dbus-python', 'crc8'
|
'hid', 'aioconsole', 'dbus-python'
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user