Merge branch 'joycon_rumble' into rumble_experiments

This commit is contained in:
Robert Martin
2021-03-06 21:29:10 +01:00
12 changed files with 251 additions and 369 deletions
+5 -2
View File
@@ -7,7 +7,7 @@ Tested on Ubuntu 19.10, and with Raspberry Pi 3B+ and 4B Raspbian GNU/Linux 10 (
Emulation of JOYCON_R, JOYCON_L and PRO_CONTROLLER. Able to send: Emulation of JOYCON_R, JOYCON_L and PRO_CONTROLLER. Able to send:
- button commands - button commands
- stick state - stick state
- nfc data - ~~nfc data~~ (removed, see [#80](https://github.com/mart1nro/joycontrol/issues/80))
## Installation ## Installation
- Install dependencies - Install dependencies
@@ -24,7 +24,7 @@ Arch Linux Derivatives: Install the `hidapi` and `bluez-utils-compat`(AUR) packa
```bash ```bash
sudo pip3 install . sudo pip3 install .
``` ```
- Disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8) - Consider to disable the bluez "input" plugin, see [#8](https://github.com/mart1nro/joycontrol/issues/8)
## Command line interface example ## Command line interface example
- Run the script - Run the script
@@ -57,6 +57,9 @@ Call "help" to see a list of available commands.
opening of the "Change Grip/Order" menu is not required. opening of the "Change Grip/Order" menu is not required.
- ... - ...
## Thanks
- Special thanks to https://github.com/dekuNukem/Nintendo_Switch_Reverse_Engineering for reverse engineering of the joycon protocol
- Thanks to the growing number of contributers and users
## Resources ## Resources
+36 -8
View File
@@ -118,7 +118,7 @@ class ButtonState:
self._available_buttons = {'y', 'x', 'b', 'a', 'sr', 'sl', 'r', 'zr', self._available_buttons = {'y', 'x', 'b', 'a', 'sr', 'sl', 'r', 'zr',
'plus', 'r_stick', 'home'} 'plus', 'r_stick', 'home'}
elif self.controller == Controller.JOYCON_L: elif self.controller == Controller.JOYCON_L:
self._available_buttons = {'plus', 'l_stick', 'capture', self._available_buttons = {'minus', 'l_stick', 'capture',
'down', 'up', 'right', 'left', 'sr', 'sl', 'l', 'zl'} 'down', 'up', 'right', 'left', 'sr', 'sl', 'l', 'zl'}
# byte 1 # byte 1
@@ -171,13 +171,13 @@ class ButtonState:
def get_available_buttons(self): def get_available_buttons(self):
""" """
:returns set of valid buttons :returns: set of valid buttons
""" """
return set(self._available_buttons) return set(self._available_buttons)
def __iter__(self): def __iter__(self):
""" """
:returns iterator over the button bytes :returns: iterator over the button bytes
""" """
yield self._byte_1 yield self._byte_1
yield self._byte_2 yield self._byte_2
@@ -187,7 +187,12 @@ class ButtonState:
self._byte_1 = self._byte_2 = self._byte_3 = 0 self._byte_1 = self._byte_2 = self._byte_3 = 0
async def button_push(controller_state, *buttons, sec=0.1): async def button_press(controller_state, *buttons):
"""
Set given buttons in the controller state to the pressed down state and wait till send.
:param controller_state:
:param buttons: Buttons to press down (see ButtonState.get_available_buttons)
"""
if not buttons: if not buttons:
raise ValueError('No Buttons were given.') raise ValueError('No Buttons were given.')
@@ -195,20 +200,43 @@ async def button_push(controller_state, *buttons, sec=0.1):
for button in buttons: for button in buttons:
# push button # push button
button_state.set_button(button) button_state.set_button(button, pushed=True)
# send report # wait until report is send
await controller_state.send() await controller_state.send()
await asyncio.sleep(sec)
async def button_release(controller_state, *buttons):
"""
Set given buttons in the controller state to the unpressed state and wait till send.
:param controller_state:
:param buttons: Buttons to set to unpressed (see ButtonState.get_available_buttons)
"""
if not buttons:
raise ValueError('No Buttons were given.')
button_state = controller_state.button_state
for button in buttons: for button in buttons:
# release button # release button
button_state.set_button(button, pushed=False) button_state.set_button(button, pushed=False)
# send report # wait until report is send
await controller_state.send() await controller_state.send()
async def button_push(controller_state, *buttons, sec=0.1):
"""
Shortly push the given buttons. Wait until the controller state is send.
:param controller_state:
:param buttons: Buttons to push (see ButtonState.get_available_buttons)
:param sec: Seconds to wait before releasing the button, default: 0.1
"""
await button_press(controller_state, *buttons)
await asyncio.sleep(sec)
await button_release(controller_state, *buttons)
class _StickCalibration: class _StickCalibration:
def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center): def __init__(self, h_center, v_center, h_max_above_center, v_max_above_center, h_max_below_center, v_max_below_center):
self.h_center = h_center self.h_center = h_center
+36
View File
@@ -0,0 +1,36 @@
import asyncio
import hid
VENDOR_ID = 1406
PRODUCT_ID_JL = 8198
PRODUCT_ID_JR = 8199
PRODUCT_ID_PC = 8201
async def get_blt_hid_device():
while True:
for device in hid.enumerate(0, 0):
# looking for devices matching Nintendo's vendor id and JoyCon product id
if device['vendor_id'] == VENDOR_ID and device['product_id'] in (
PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC):
return device
await asyncio.sleep(2)
class AsyncHID(hid.Device):
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
super().__init__(*args, **kwargs)
self._loop = loop
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, size, timeout=None):
async with self._read_lock:
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
async def write(self, data):
async with self._write_lock:
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
-155
View File
@@ -1,155 +0,0 @@
import logging
from enum import Enum
from crc8 import crc8
logger = logging.getLogger(__name__)
class Action(Enum):
NON = 0
REQUEST_STATUS = 1
START_TAG_POLLING = 2
START_TAG_DISCOVERY = 3
READ_TAG = 4
READ_TAG_2 = 5
READ_FINISHED = 6
class McuState(Enum):
NOT_INITIALIZED = 0
IRC = 1
NFC = 2
STAND_BY = 3
BUSY = 4
def copyarray(dest, offset, src):
for i in range(len(src)):
dest[offset + i] = src[i]
class IrNfcMcu:
"""
TODO: cleanup
"""
def __init__(self):
self._fw_major = [0, 3]
self._fw_minor = [0, 5]
self._bytes = [0] * 313
self._action = Action.NON
self._state = McuState.NOT_INITIALIZED
self._nfc_content = None
def get_fw_major(self):
return self._fw_major
def get_fw_minor(self):
return self._fw_minor
def set_action(self, v):
self._action = v
def get_action(self):
return self._action
def set_state(self, v):
self._state = v
def get_state(self):
return self._state
def _get_state_byte(self):
if self.get_state() == McuState.NFC:
return 4
elif self.get_state() == McuState.BUSY:
return 6
elif self.get_state() == McuState.NOT_INITIALIZED:
return 1
elif self.get_state() == McuState.STAND_BY:
return 1
else:
return 0
def update_status(self):
self._bytes[0] = 1
self._bytes[1] = 0
self._bytes[2] = 0
self._bytes[3] = self._fw_major[0]
self._bytes[4] = self._fw_major[1]
self._bytes[5] = self._fw_minor[0]
self._bytes[6] = self._fw_minor[1]
self._bytes[7] = self._get_state_byte()
def update_nfc_report(self):
self._bytes = [0] * 313
if self.get_action() == Action.REQUEST_STATUS:
self.update_status()
elif self.get_action() == Action.NON:
self._bytes[0] = 0xff
elif self.get_action() == Action.START_TAG_DISCOVERY:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() == Action.START_TAG_POLLING:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
if self._nfc_content is not None:
data = [0x09, 0x31, 0x09, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x07]
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
else:
logger.info('nfc content is none')
self._bytes[5] = 9
self._bytes[6] = 0x31
self._bytes[7] = 0
elif self.get_action() in (Action.READ_TAG, Action.READ_TAG_2):
self._bytes[0] = 0x3a
self._bytes[1] = 0
self._bytes[2] = 7
if self.get_action() == Action.READ_TAG:
data1 = bytes.fromhex('010001310200000001020007')
copyarray(self._bytes, 3, data1)
copyarray(self._bytes, 3 + len(data1), self._nfc_content[0:3])
copyarray(self._bytes, 3 + len(data1) + 3, self._nfc_content[4:8])
data2 = bytes.fromhex('000000007DFDF0793651ABD7466E39C191BABEB856CEEDF1CE44CC75EAFB27094D087AE803003B3C7778860000')
copyarray(self._bytes, 3 + len(data1) + 3 + 4, data2)
copyarray(self._bytes, 3 + len(data1) + 3 + 4 + len(data2), self._nfc_content[0:245])
self.set_action(Action.READ_TAG_2)
else:
data = bytes.fromhex('02000927')
copyarray(self._bytes, 3, data)
copyarray(self._bytes, 3 + len(data), self._nfc_content[245:540])
self.set_action(Action.READ_FINISHED)
elif self.get_action() == Action.READ_FINISHED:
self._bytes[0] = 0x2a
self._bytes[1] = 0
self._bytes[2] = 5
self._bytes[3] = 0
self._bytes[4] = 0
data = bytes.fromhex('0931040000000101020007')
copyarray(self._bytes, 5, data)
copyarray(self._bytes, 5 + len(data), self._nfc_content[0:3])
copyarray(self._bytes, 5 + len(data) + 3, self._nfc_content[4:8])
crc = crc8()
crc.update(bytes(self._bytes[:-1]))
self._bytes[-1] = ord(crc.digest())
def set_nfc(self, nfc_content):
self._nfc_content = nfc_content
def __bytes__(self):
return bytes(self._bytes)
+8 -77
View File
@@ -11,8 +11,6 @@ from joycontrol.controller_state import ControllerState
from joycontrol.memory import FlashMemory from joycontrol.memory import FlashMemory
from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID from joycontrol.report import OutputReport, SubCommand, InputReport, OutputReportID
from joycontrol.transport import NotConnectedError from joycontrol.transport import NotConnectedError
from joycontrol.ir_nfc_mcu import IrNfcMcu, McuState, Action
from crc8 import crc8
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -42,8 +40,6 @@ class ControllerProtocol(BaseProtocol):
self._controller_state = ControllerState(self, controller, spi_flash=spi_flash) self._controller_state = ControllerState(self, controller, spi_flash=spi_flash)
self._controller_state_sender = None self._controller_state_sender = None
self._mcu = IrNfcMcu()
# None = Just answer to sub commands # None = Just answer to sub commands
self._input_report_mode = None self._input_report_mode = None
@@ -166,9 +162,8 @@ class ControllerProtocol(BaseProtocol):
elif output_report_id == OutputReportID.SUB_COMMAND: elif output_report_id == OutputReportID.SUB_COMMAND:
reply_send = await self._reply_to_sub_command(report) reply_send = await self._reply_to_sub_command(report)
elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU: elif output_report_id == OutputReportID.REQUEST_IR_NFC_MCU:
# TODO: This does not reply anything # TODO NFC
# reply_send = await self._reply_to_ir_nfc_mcu(report) raise NotImplementedError('NFC communictation is not implemented.')
await self._reply_to_ir_nfc_mcu(report)
else: else:
logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE') logger.warning(f'Report unknown output report "{output_report_id}" - IGNORE')
except ValueError as v_err: except ValueError as v_err:
@@ -184,11 +179,9 @@ class ControllerProtocol(BaseProtocol):
# TODO: set some sensor data # TODO: set some sensor data
input_report.set_6axis_data() input_report.set_6axis_data()
# set nfc data # TODO NFC - set nfc data
if input_report.get_input_report_id() == 0x31: if input_report.get_input_report_id() == 0x31:
self._mcu.set_nfc(self._controller_state.get_nfc()) pass
self._mcu.update_nfc_report()
input_report.set_ir_nfc_data(bytes(self._mcu))
await self.write(input_report) await self.write(input_report)
@@ -237,50 +230,6 @@ class ControllerProtocol(BaseProtocol):
else: else:
logger.warning(f'Output report {output_report_id} not implemented - ignoring') logger.warning(f'Output report {output_report_id} not implemented - ignoring')
async def _reply_to_ir_nfc_mcu(self, report):
"""
TODO: Cleanup
We aren't replying to anything here, do we need to?
"""
sub_command = report.data[11]
sub_command_data = report.data[12:]
# logging.info(f'received output report - Request MCU sub command {sub_command}')
if self._mcu.get_action() in (Action.READ_TAG, Action.READ_TAG_2, Action.READ_FINISHED):
return
# Request mcu state
if sub_command == 0x01:
# input_report = InputReport()
# input_report.set_input_report_id(0x21)
# input_report.set_misc()
# input_report.set_ack(0xA0)
# input_report.reply_to_subcommand_id(0x21)
self._mcu.set_action(Action.REQUEST_STATUS)
# input_report.set_mcu(self._mcu)
# await self.write(input_report)
# Send Start tag discovery
elif sub_command == 0x02:
# 0: Cancel all, 4: StartWaitingReceive
if sub_command_data[0] == 0x04:
self._mcu.set_action(Action.START_TAG_DISCOVERY)
# 1: Start polling
elif sub_command_data[0] == 0x01:
self._mcu.set_action(Action.START_TAG_POLLING)
# 2: stop polling
elif sub_command_data[0] == 0x02:
self._mcu.set_action(Action.NON)
elif sub_command_data[0] == 0x06:
self._mcu.set_action(Action.READ_TAG)
else:
logging.info(f'Unknown sub_command_data arg {sub_command_data}')
else:
logging.info(f'Unknown MCU sub command {sub_command}')
async def _reply_to_sub_command(self, report): async def _reply_to_sub_command(self, report):
# classify sub command # classify sub command
try: try:
@@ -467,6 +416,7 @@ class ControllerProtocol(BaseProtocol):
await self.write(input_report) await self.write(input_report)
async def _command_set_nfc_ir_mcu_config(self, sub_command_data): async def _command_set_nfc_ir_mcu_config(self, sub_command_data):
# TODO NFC
input_report = InputReport() input_report = InputReport()
input_report.set_input_report_id(0x21) input_report.set_input_report_id(0x21)
input_report.set_misc() input_report.set_misc()
@@ -474,30 +424,14 @@ class ControllerProtocol(BaseProtocol):
input_report.set_ack(0xA0) input_report.set_ack(0xA0)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_CONFIG.value)
self._mcu.update_status() data = [1, 0, 255, 0, 8, 0, 27, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 200]
data = list(bytes(self._mcu)[0:34])
crc = crc8()
crc.update(bytes(data[:-1]))
checksum = crc.digest()
data[-1] = ord(checksum)
for i in range(len(data)): for i in range(len(data)):
input_report.data[16+i] = data[i] input_report.data[16 + i] = data[i]
# Set MCU mode cmd
if sub_command_data[1] == 0:
if sub_command_data[2] == 0:
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[2] == 4:
self._mcu.set_state(McuState.NFC)
else:
logger.info(f"unknown mcu state {sub_command_data[2]}")
else:
logger.info(f"unknown mcu config command {sub_command_data}")
await self.write(input_report) await self.write(input_report)
async def _command_set_nfc_ir_mcu_state(self, sub_command_data): async def _command_set_nfc_ir_mcu_state(self, sub_command_data):
# TODO NFC
input_report = InputReport() input_report = InputReport()
input_report.set_input_report_id(0x21) input_report.set_input_report_id(0x21)
input_report.set_misc() input_report.set_misc()
@@ -506,13 +440,10 @@ class ControllerProtocol(BaseProtocol):
# 0x01 = Resume # 0x01 = Resume
input_report.set_ack(0x80) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_action(Action.NON)
self._mcu.set_state(McuState.STAND_BY)
elif sub_command_data[0] == 0x00: elif sub_command_data[0] == 0x00:
# 0x00 = Suspend # 0x00 = Suspend
input_report.set_ack(0x80) input_report.set_ack(0x80)
input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value) input_report.reply_to_subcommand_id(SubCommand.SET_NFC_IR_MCU_STATE.value)
self._mcu.set_state(McuState.STAND_BY)
else: else:
raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} ' raise NotImplementedError(f'Argument {sub_command_data[0]} of {SubCommand.SET_NFC_IR_MCU_STATE} '
f'not implemented.') f'not implemented.')
+17 -4
View File
@@ -272,16 +272,29 @@ class OutputReport:
@staticmethod @staticmethod
def _encode_rumble_data(freq, amp): def _encode_rumble_data(freq, amp):
if not 0 <= freq <= 1252: # TODO: Fix LA Byte 2
raise ValueError('Frequency must be in [0, 1252].')
if not (40 <= freq <= 1253):
raise ValueError('Frequency must be in [40, 1253].')
if amp > 1.003:
raise ValueError('Amplitudes higher than 1.003 are not safe '
'for the integrity of the linear resonant actuators')
# Float frequency to hex conversion # Float frequency to hex conversion
encoded_hex_freq = int(round(math.log2(freq / 10) * 32)) encoded_hex_freq = int(round(math.log2(freq / 10) * 32))
# Convert to Joy-Con HF range. Range in big-endian: 0x0004-0x01FC with +0x0004 steps. # Convert to Joy-Con HF range. Range in big-endian: 0x0004-0x01FC with +0x0004 steps.
hf = (encoded_hex_freq - 0x60) * 4 if freq <= 80:
hf = 0x00
else:
hf = (encoded_hex_freq - 0x60) * 4
# Convert to Joy-Con LF range. Range: 0x01-0x7F. # Convert to Joy-Con LF range. Range: 0x01-0x7F.
lf = encoded_hex_freq - 0x40 if freq >= 640:
lf = 0x00
else:
lf = encoded_hex_freq - 0x40
# Float amplitude to hex conversion # Float amplitude to hex conversion
encoded_hex_amp = 0 encoded_hex_amp = 0
+5 -3
View File
@@ -76,10 +76,9 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id=
hid.powered(True) hid.powered(True)
hid.pairable(True) hid.pairable(True)
# setting bluetooth adapter name and class to the device we wish to emulate # setting bluetooth adapter name to the device we wish to emulate
await hid.set_name(protocol.controller.device_name()) await hid.set_name(protocol.controller.device_name())
await hid.set_class()
logger.info('Advertising the Bluetooth SDP record...') logger.info('Advertising the Bluetooth SDP record...')
try: try:
@@ -88,6 +87,9 @@ async def create_hid_server(protocol_factory, ctl_psm=17, itr_psm=19, device_id=
# Already registered (If multiple controllers are being emulated and this method is called consecutive times) # Already registered (If multiple controllers are being emulated and this method is called consecutive times)
logger.debug(dbus_err) logger.debug(dbus_err)
# set the device class to "Gamepad/joystick"
await hid.set_class()
# start advertising # start advertising
hid.discoverable() hid.discoverable()
-19
View File
@@ -2,28 +2,9 @@ import asyncio
import logging import logging
from contextlib import contextmanager from contextlib import contextmanager
import hid
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AsyncHID(hid.Device):
def __init__(self, *args, loop=asyncio.get_event_loop(), **kwargs):
super().__init__(*args, **kwargs)
self._loop = loop
self._write_lock = asyncio.Lock()
self._read_lock = asyncio.Lock()
async def read(self, size, timeout=None):
async with self._read_lock:
return await self._loop.run_in_executor(None, hid.Device.read, self, size, timeout)
async def write(self, data):
async with self._write_lock:
return await self._loop.run_in_executor(None, hid.Device.write, self, data)
@contextmanager @contextmanager
def get_output(path=None, open_flags='wb', default=None): def get_output(path=None, open_flags='wb', default=None):
""" """
Regular → Executable
+117 -66
View File
@@ -10,7 +10,7 @@ from aioconsole import ainput
from joycontrol import logging_default as log, utils from joycontrol import logging_default as log, utils
from joycontrol.command_line_interface import ControllerCLI from joycontrol.command_line_interface import ControllerCLI
from joycontrol.controller import Controller from joycontrol.controller import Controller
from joycontrol.controller_state import ControllerState, button_push from joycontrol.controller_state import ControllerState, button_push, button_press, button_release
from joycontrol.memory import FlashMemory from joycontrol.memory import FlashMemory
from joycontrol.protocol import controller_protocol_factory from joycontrol.protocol import controller_protocol_factory
from joycontrol.server import create_hid_server from joycontrol.server import create_hid_server
@@ -136,25 +136,21 @@ async def test_controller_buttons(controller_state: ControllerState):
await button_push(controller_state, 'home') await button_push(controller_state, 'home')
async def set_nfc(controller_state, file_path): def ensure_valid_button(controller_state, *buttons):
""" """
Sets nfc content of the controller state to contents of the given file. Raise ValueError if any of the given buttons os not part of the controller state.
:param controller_state: Emulated controller state :param controller_state:
:param file_path: Path to nfc dump file :param buttons: Any number of buttons to check (see ButtonState.get_available_buttons)
""" """
loop = asyncio.get_event_loop() for button in buttons:
if button not in controller_state.button_state.get_available_buttons():
with open(file_path, 'rb') as nfc_file: raise ValueError(f'Button {button} does not exist on {controller_state.get_controller()}')
content = await loop.run_in_executor(None, nfc_file.read)
controller_state.set_nfc(content)
async def mash_button(controller_state, button, interval): async def mash_button(controller_state, button, interval):
# waits until controller is fully connected # wait until controller is fully connected
await controller_state.connect() await controller_state.connect()
ensure_valid_button(controller_state, button)
if button not in controller_state.button_state.get_available_buttons():
raise ValueError(f'Button {button} does not exist on {controller_state.get_controller()}')
user_input = asyncio.ensure_future( user_input = asyncio.ensure_future(
ainput(prompt=f'Pressing the {button} button every {interval} seconds... Press <enter> to stop.') ainput(prompt=f'Pressing the {button} button every {interval} seconds... Press <enter> to stop.')
@@ -168,6 +164,107 @@ async def mash_button(controller_state, button, interval):
await user_input await user_input
def _register_commands_with_controller_state(controller_state, cli):
"""
Commands registered here can use the given controller state.
The doc string of commands will be printed by the CLI when calling "help"
:param cli:
:param controller_state:
"""
async def test_buttons():
"""
test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons.
"""
await test_controller_buttons(controller_state)
cli.add_command(test_buttons.__name__, test_buttons)
# Mash a button command
async def mash(*args):
"""
mash - Mash a specified button at a set interval
Usage:
mash <button> <interval>
"""
if not len(args) == 2:
raise ValueError('"mash_button" command requires a button and interval as arguments!')
button, interval = args
await mash_button(controller_state, button, interval)
cli.add_command(mash.__name__, mash)
# Hold a button command
async def hold(*args):
"""
hold - Press and hold specified buttons
Usage:
hold <button>
Example:
hold a b
"""
if not args:
raise ValueError('"hold" command requires a button!')
ensure_valid_button(controller_state, *args)
# wait until controller is fully connected
await controller_state.connect()
await button_press(controller_state, *args)
cli.add_command(hold.__name__, hold)
# Release a button command
async def release(*args):
"""
release - Release specified buttons
Usage:
release <button>
Example:
release a b
"""
if not args:
raise ValueError('"release" command requires a button!')
ensure_valid_button(controller_state, *args)
# wait until controller is fully connected
await controller_state.connect()
await button_release(controller_state, *args)
cli.add_command(release.__name__, release)
# Create nfc command
async def nfc(*args):
"""
nfc - Sets nfc content
Usage:
nfc <file_name> Set controller state NFC content to file
nfc remove Remove NFC content from controller state
"""
logger.error('NFC Support was removed from joycontrol - see https://github.com/mart1nro/joycontrol/issues/80')
if controller_state.get_controller() == Controller.JOYCON_L:
raise ValueError('NFC content cannot be set for JOYCON_L')
elif not args:
raise ValueError('"nfc" command requires file path to an nfc dump as argument!')
elif args[0] == 'remove':
controller_state.set_nfc(None)
print('Removed nfc content.')
else:
_loop = asyncio.get_event_loop()
with open(args[0], 'rb') as nfc_file:
content = await _loop.run_in_executor(None, nfc_file.read)
controller_state.set_nfc(content)
cli.add_command(nfc.__name__, nfc)
async def _main(args): async def _main(args):
# parse the spi flash # parse the spi flash
if args.spi_flash: if args.spi_flash:
@@ -181,6 +278,7 @@ async def _main(args):
controller = Controller.from_arg(args.controller) controller = Controller.from_arg(args.controller)
with utils.get_output(path=args.log, default=None) as capture_file: with utils.get_output(path=args.log, default=None) as capture_file:
# prepare the the emulated controller
factory = controller_protocol_factory(controller, spi_flash=spi_flash) factory = controller_protocol_factory(controller, spi_flash=spi_flash)
ctl_psm, itr_psm = 17, 19 ctl_psm, itr_psm = 17, 19
transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr, transport, protocol = await create_hid_server(factory, reconnect_bt_addr=args.reconnect_bt_addr,
@@ -192,61 +290,14 @@ async def _main(args):
# Create command line interface and add some extra commands # Create command line interface and add some extra commands
cli = ControllerCLI(controller_state) cli = ControllerCLI(controller_state)
_register_commands_with_controller_state(controller_state, cli)
cli.add_command('amiibo', ControllerCLI.deprecated('Command was removed - use "nfc" instead!'))
# Wrap the script so we can pass the controller state. The doc string will be printed when calling 'help' # set default nfc content supplied by argument
async def _run_test_controller_buttons():
"""
test_buttons - Navigates to the "Test Controller Buttons" menu and presses all buttons.
"""
await test_controller_buttons(controller_state)
# add the script from above
cli.add_command('test_buttons', _run_test_controller_buttons)
# Mash a button command
async def call_mash_button(*args):
"""
mash - Mash a specified button at a set interval
Usage:
mash <button> <interval>
"""
if not len(args) == 2:
raise ValueError('"mash_button" command requires a button and interval as arguments!')
button, interval = args
await mash_button(controller_state, button, interval)
# add the script from above
cli.add_command('mash', call_mash_button)
# Create nfc command
async def nfc(*args):
"""
nfc - Sets nfc content
Usage:
nfc <file_name> Set controller state NFC content to file
nfc remove Remove NFC content from controller state
"""
if controller_state.get_controller() == Controller.JOYCON_L:
raise ValueError('NFC content cannot be set for JOYCON_L')
elif not args:
raise ValueError('"nfc" command requires file path to an nfc dump as argument!')
elif args[0] == 'remove':
controller_state.set_nfc(None)
print('Removed nfc content.')
else:
await set_nfc(controller_state, args[0])
# add the script from above
cli.add_command('nfc', nfc)
cli.add_command('amiibo', ControllerCLI.deprecated('Command is deprecated - use "nfc" instead!'))
if args.nfc is not None: if args.nfc is not None:
await nfc(args.nfc) await cli.commands['nfc'](args.nfc)
# run the cli
try: try:
await cli.run() await cli.run()
finally: finally:
View File
+25 -33
View File
@@ -2,24 +2,17 @@ import asyncio
import logging import logging
import os import os
import hid
from joycontrol import logging_default as log from joycontrol import logging_default as log
from joycontrol.hid import get_blt_hid_device, AsyncHID
from joycontrol.report import InputReport, OutputReport, OutputReportID, SubCommand from joycontrol.report import InputReport, OutputReport, OutputReportID, SubCommand
from joycontrol.utils import AsyncHID
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
VENDOR_ID = 1406
PRODUCT_ID_JL = 8198
PRODUCT_ID_JR = 8199
PRODUCT_ID_PC = 8201
""" """
Sends some vibration reports to a joycon. Only works with the right joycon atm. Sends some vibration reports to a joycon. Only works with the right joycon atm.
""" """
async def print_outputs(hid_device): async def print_outputs(hid_device):
while True: while True:
data = await hid_device.read(255) data = await hid_device.read(255)
@@ -52,21 +45,23 @@ async def send_vibration_report(hid_device):
await hid_device.write(bytes(data)) await hid_device.write(bytes(data))
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
scale = [261.63, 293.66, 329.63, 349.23, 392.00, 440.00, 493.88, 523.25]
scale = [int(round(n)) for n in scale]
amp = 1
time = 2 time = 2
while True: while True:
for i in range(10): rumble_report = OutputReport()
rumble_report = OutputReport() report.set_timer(time)
report.set_timer(time) time += 1
time += 1 rumble_report.set_output_report_id(OutputReportID.RUMBLE_ONLY)
rumble_report.set_output_report_id(OutputReportID.RUMBLE_ONLY) # increase frequency
# increase frequency rumble_report.set_right_rumble_data(scale[time % len(scale)], amp)
rumble_report.set_right_rumble_data(100 + i * 100, 1) data = bytes(rumble_report)[1:]
data = bytes(rumble_report)[1:] print('writing', data)
print('writing', data) await hid_device.write(bytes(data))
await hid_device.write(bytes(data))
await asyncio.sleep(.5) await asyncio.sleep(.2)
break
try: try:
await reader await reader
@@ -75,19 +70,11 @@ async def send_vibration_report(hid_device):
async def _main(loop): async def _main(loop):
logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), or a Pro Controller over Bluetooth. ' logger.info('Waiting for HID devices... Please connect one JoyCon (left OR right), '
'or a Pro Controller over Bluetooth. '
'Note: The bluez "input" plugin needs to be enabled (default)') 'Note: The bluez "input" plugin needs to be enabled (default)')
controller = None controller = await get_blt_hid_device()
while controller is None:
for device in hid.enumerate(0, 0):
# looking for devices matching Nintendo's vendor id and JoyCon product id
if device['vendor_id'] == VENDOR_ID and device['product_id'] in (PRODUCT_ID_JL, PRODUCT_ID_JR, PRODUCT_ID_PC):
controller = device
break
else:
await asyncio.sleep(2)
logger.info(f'Found controller "{controller}".') logger.info(f'Found controller "{controller}".')
with AsyncHID(path=controller['path'], loop=loop) as hid_controller: with AsyncHID(path=controller['path'], loop=loop) as hid_controller:
@@ -96,9 +83,14 @@ async def _main(loop):
if __name__ == '__main__': if __name__ == '__main__':
# check if root # check if root
if not os.geteuid() == 0: if os.geteuid() != 0:
raise PermissionError('Script must be run as root!') raise PermissionError('Script must be run as root!')
# h = lambda bla: list(map(hex, bla))
# report = OutputReport()
# report.set_left_rumble_data(1253, 0.012)
# exit()
# setup logging # setup logging
log.configure() log.configure()
+2 -2
View File
@@ -2,7 +2,7 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
setup(name='joycontrol', setup(name='joycontrol',
version='0.14', version='0.15',
author='Robert Martin', author='Robert Martin',
author_email='martinro@informatik.hu-berlin.de', author_email='martinro@informatik.hu-berlin.de',
description='Emulate Nintendo Switch Controllers over Bluetooth', description='Emulate Nintendo Switch Controllers over Bluetooth',
@@ -10,7 +10,7 @@ setup(name='joycontrol',
package_data={'joycontrol': ['profile/sdp_record_hid.xml']}, package_data={'joycontrol': ['profile/sdp_record_hid.xml']},
zip_safe=False, zip_safe=False,
install_requires=[ install_requires=[
'hid', 'aioconsole', 'dbus-python', 'crc8' 'hid', 'aioconsole', 'dbus-python'
] ]
) )