connection and command test

This commit is contained in:
Robert Martin
2020-01-23 17:59:10 +09:00
parent a5aad83a2d
commit 913848bcf2
4 changed files with 594 additions and 0 deletions
+134
View File
@@ -0,0 +1,134 @@
.idea
# PYTHON
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
+282
View File
@@ -0,0 +1,282 @@
import logging
import os
import re
import subprocess
import time
import uuid
from enum import Enum, auto
from time import sleep
import logging_default as log
import bluetooth as blt
import dbus
logger = logging.getLogger(__name__)
# Defining the ports for the control and interrupt sockets
CTL_PSM = 17
ITR_PSM = 19
class HidDevice:
_HID_UUID = '00001124-0000-1000-8000-00805f9b34fb'
_HID_PATH = '/bluez/switch/hid'
PRO_CONTROLLER = 'Pro Controller'
JOYCON_R = 'Joy-Con (R)'
JOYCON_L = 'Joy-Con (L)'
def __init__(self):
self._uuid = str(uuid.uuid4())
# Setting up dbus to advertise the service record
bus = dbus.SystemBus()
obj = bus.get_object('org.bluez', '/org/bluez/hci0')
self.adapter = dbus.Interface(obj, 'org.bluez.Adapter1')
self.properties = dbus.Interface(self.adapter, 'org.freedesktop.DBus.Properties')
def discoverable(self, boolean=True):
#self.properties.Set(self.adapter.dbus_interface, 'Powered', True)
self.properties.Set(self.adapter.dbus_interface, 'Discoverable', boolean)
def set_class(self, cls=0x002508):
"""
:param cls: default 0x002508 (Gamepad/joystick device class)
"""
logger.info(f'setting device class to {cls}...')
subprocess.call(['hciconfig', 'hci0', 'class', str(cls)])
def set_name(self, name: str):
logger.info(f'setting device name to {name}...')
subprocess.call(['hciconfig', 'hci0', 'name', name])
def register_sdp_record(self, record_path):
with open(record_path) as record:
opts = {
'ServiceRecord': record.read(),
'Role': 'server',
'Service': self._HID_UUID,
'RequireAuthentication': False,
'RequireAuthorization': False
}
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"), "org.bluez.ProfileManager1")
manager.RegisterProfile(self._HID_PATH, self._uuid, opts)
class InputReport:
def __init__(self):
self.m1 = [0xA1, 0x21, 0x05, 0x8E, 0x84, 0x00, 0x12, 0x01, 0x18, 0x80, 0x01, 0x18, 0x80, 0x80, 0x82, 0x02, 0x03, 0x48, 0x01, 0x02, 0xDC, 0xA6, 0x32, 0x71, 0x58, 0xBB, 0x01, 0x01]
self.m2 = [0xA1, 0x21, 0x06, 0x8E, 0x84, 0x00, 0x12, 0x01, 0x18, 0x80, 0x01, 0x18, 0x80, 0x80, 0x80, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
def create(self, message):
miss = 49 - len(message)
message = message + miss * [0x00]
assert len(message) == 49
return message
class InputReport2:
def __init__(self):
self.data = [0x00] * 50
# all input reports are prepended with 0xA1
self.data[0] = 0xA1
def set(self, input_report_id, timer=0x00):
self.data[1] = input_report_id
self.data[2] = timer % 256
# battery level + connection info
self.data[3] = 0x8E
# Todo: Button status, analog stick data, vibrator input
# ACK byte for subcmd reply
self.data[14] = 0x82
# Reply-to subcommand ID
self.data[14] = 0x02
def sub_0x2_device_info(self, mac, fm_version=(0x03, 0x48), controller=0x01):
"""
Sub command 0x02 request device info response.
:param mac: Controller MAC address in Big Endian (6 Bytes)
:param fm_version: TODO
:param controller: 1=Left Joy-Con, 2=Right Joy-Con, 3=Pro Controller
"""
if len(fm_version) != 2:
raise ValueError('Firmware version must consist of 2 bytes!')
elif len(mac) != 6:
raise ValueError('Bluetooth mac address must consist of 6 bytes!')
# reply to sub command ID
self.data[14] = 0x02
# sub command reply data
offset = 15
self.data[offset: offset + 1] = fm_version
self.data[offset + 2] = controller
self.data[offset + 3] = 0x02
self.data[offset + 4: offset + 9] = mac
self.data[offset + 10] = 0x01
self.data[offset + 11] = 0x01
def __bytes__(self):
return bytes(self.data)
class SubCommand(Enum):
REQUEST_DEVICE_INFO = auto()
NOT_IMPLEMENTED = auto()
class OutputReport:
def __init__(self, data):
if data[0] != 0xA2:
raise ValueError('Output reports must start with 0xA2')
self.data = data
def sub_command(self):
if self.data[11] == 0x02:
return SubCommand.REQUEST_DEVICE_INFO
else:
return None
def __bytes__(self):
return bytes(self.data)
def get_bt_mac_address(dev=0):
output = subprocess.check_output(['hciconfig', f'hci{dev}'], encoding='UTF-8')
match = re.search(r'BD Address: (?P<mac>\w\w:\w\w:\w\w:\w\w:\w\w:\w\w)', output)
if match:
return list(map(lambda x: int(x, 16), match.group('mac').split(':')))
else:
raise ValueError(f'BD Address not found in "{output}"')
def pair_switch(client_itr, own_bd_mac_address):
while True:
in_report = InputReport2()
reply = None
while reply is None:
# It seems like we have to initiate the conversation, so send and empty input report
client_itr.send(bytes(in_report))
sleep(.1)
try:
reply = client_itr.recv(50)
except blt.btcommon.BluetoothError as bt_err:
print(bt_err)
out_report = OutputReport(list(reply))
# DEVIVCE INFO REQUEST
sub_command = out_report.sub_command()
if sub_command is None:
logger.error(f'No sub command found in "{reply}"')
continue
elif sub_command == SubCommand.REQUEST_DEVICE_INFO:
# send device info
device_info_in_report = InputReport2()
device_info_in_report.sub_0x2_device_info(own_bd_mac_address)
logger.info('Sending device info...')
client_itr.send(bytes(device_info_in_report))
elif sub_command == SubCommand.NOT_IMPLEMENTED:
logger.error(f'Sub command not implemented of "{reply}"')
continue
# awaiting Subcommand 0x08: Set shipment
reply = None
while reply is None:
try:
reply = client_itr.recv(50)
except blt.btcommon.BluetoothError as bt_err:
print(bt_err)
sleep(.5)
print(reply)
return True
return False
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='open bluetooth hid socket')
# parser.add_argument('port', type=int, help='socket port')
args = parser.parse_args()
# setup logging
log.configure()
# check if root
if not os.geteuid() == 0:
raise PermissionError('Script must be run as root!')
# Creating L2CAP sockets for control and interrupt channel
ctl_sock = blt.BluetoothSocket(blt.L2CAP)
itr_sock = blt.BluetoothSocket(blt.L2CAP)
logger.info(f'Binding control channel to {CTL_PSM}...')
ctl_sock.bind(("", CTL_PSM))
logger.info(f'Binding interrupt channel to {ITR_PSM}...')
itr_sock.bind(("", ITR_PSM))
logger.info('start listening on the server sockets')
ctl_sock.listen(1) # Limit of 1 connection
itr_sock.listen(1)
logger.info('Restarting bluetooth service...')
os.system('systemctl restart bluetooth.service')
time.sleep(1)
hid = HidDevice()
# setting bluetooth adapter name and class to the device we wish to emulate
hid.set_name(HidDevice.JOYCON_L)
hid.set_class()
logger.info('Advertising the Bluetooth SDP record...')
hid.register_sdp_record('profile/sdp_record_hid_pro.xml')
hid.discoverable()
logger.info('Waiting for connection...')
client_ctl, address = ctl_sock.accept()
logger.info(f'Accepted connection at {CTL_PSM} from {address}')
client_itr, address = itr_sock.accept()
client_itr.settimeout(0)
logger.info(f'Accepted connection at {ITR_PSM} from {address}')
"""
data = [0] * 49
data[0] = 0xA1
hello = 0
ip = InputReport()
for i in range(100):
logger.info(f'sending data {data}...')
client_itr.send(bytes(data))
try:
print("received", client_itr.recv(49))
hello += 1
except blt.btcommon.BluetoothError as bt_err:
print(bt_err)
if hello == 1:
data = ip.create(ip.m1)
print("GO 1")
elif hello == 2:
data = ip.create(ip.m2)
print("GO 2")
sleep(0.5)
"""
pair_switch(client_itr, get_bt_mac_address())
itr_sock.close()
ctl_sock.close()
+64
View File
@@ -0,0 +1,64 @@
import logging
import datetime
def configure(console_level=logging.DEBUG, file_level=logging.DEBUG, logfile_name=None):
"""
Configures logging formatting
:param console_level: log level of console logger
:param file_level: log lever of file logger
:param logfile_name: name of logfile
"""
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"[%(asctime)s] %(name)s %(funcName)s::%(lineno)s %(levelname)s - %(message)s",
"%H:%M:%S"
)
# create console logger
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(console_level)
root_logger.addHandler(console_handler)
# create file logger
if logfile_name is not None:
today = datetime.datetime.now()
name_of_file = today.strftime(f'%Y-%m-%d_%H-%M_{logfile_name}.log')
file_handler = logging.FileHandler(name_of_file)
file_handler.setLevel(file_level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
if __name__ == "__main__":
# Run test output on stdout
configure()
logger = logging.getLogger("test")
def test():
logger.debug("debug msg")
logger.info("info msg")
logger.warning("warning msg")
def test2():
logger.error("error msg")
logger.critical("critical msg")
# test debug, info, warning
test()
# test error, critical
test2()
# test exceptions
try:
raise RuntimeError("It's a trap!")
except Exception as e:
logger.exception(e)
+114
View File
@@ -0,0 +1,114 @@
<?xml version="1.0" encoding="UTF-8" ?>
<record>
<attribute id="0x0001">
<sequence>
<uuid value="0x1124"/>
</sequence>
</attribute>
<attribute id="0x0004">
<sequence>
<sequence>
<uuid value="0x0100"/>
<uint16 value="0x0011"/>
</sequence>
<sequence>
<uuid value="0x0011"/>
</sequence>
</sequence>
</attribute>
<attribute id="0x0005">
<sequence>
<uuid value="0x1002"/>
</sequence>
</attribute>
<attribute id="0x0006">
<sequence>
<uint16 value="0x656e"/>
<uint16 value="0x006a"/>
<uint16 value="0x0100"/>
</sequence>
</attribute>
<attribute id="0x0009">
<sequence>
<sequence>
<uuid value="0x1124"/>
<uint16 value="0x0100"/>
</sequence>
</sequence>
</attribute>
<attribute id="0x000d">
<sequence>
<sequence>
<sequence>
<uuid value="0x0100"/>
<uint16 value="0x0013"/>
</sequence>
<sequence>
<uuid value="0x0011"/>
</sequence>
</sequence>
</sequence>
</attribute>
<attribute id="0x0100">
<text value="Wireless Gamepad"/>
</attribute>
<attribute id="0x0101">
<text value="Gamepad"/>
</attribute>
<attribute id="0x0102">
<text value="Nintendo"/>
</attribute>
<attribute id="0x0200">
<uint16 value="0x0100"/>
</attribute>
<attribute id="0x0201">
<uint16 value="0x0111"/>
</attribute>
<attribute id="0x0202">
<uint8 value="0x08"/>
</attribute>
<attribute id="0x0203">
<uint8 value="0x00"/>
</attribute>
<attribute id="0x0204">
<boolean value="true"/>
</attribute>
<attribute id="0x0205">
<boolean value="true"/>
</attribute>
<attribute id="0x0206">
<sequence>
<sequence>
<uint8 value="0x22"/>
<text encoding="hex"
value="050115000904a1018530050105091901290a150025017501950a5500650081020509190b290e150025017501950481027501950281030b01000100a1000b300001000b310001000b320001000b35000100150027ffff0000751095048102c00b39000100150025073500463b0165147504950181020509190f2912150025017501950481027508953481030600ff852109017508953f8103858109027508953f8103850109037508953f9183851009047508953f9183858009057508953f9183858209067508953f9183c0"/>
</sequence>
</sequence>
</attribute>
<attribute id="0x0207">
<sequence>
<sequence>
<uint16 value="0x0409"/>
<uint16 value="0x0100"/>
</sequence>
</sequence>
</attribute>
<attribute id="0x020b">
<uint16 value="0x0100"/>
</attribute>
<attribute id="0x020c">
<uint16 value="0x0c80"/>
</attribute>
<attribute id="0x020d">
<boolean value="false"/>
</attribute>
<attribute id="0x020e">
<boolean value="true"/>
</attribute>
<attribute id="0x020f">
<uint16 value="0x0640"/>
</attribute>
<attribute id="0x0210">
<uint16 value="0x0320"/>
</attribute>
</record>