# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Bluetooth Classic HID host example.

Connects to a peer over BR/EDR, authenticates and encrypts the link, prints
the peer's HID SDP record, then either opens the HID control and interrupt
channels or, when ``test-mode`` is given, runs an interactive test menu.

Usage:
    run_hid_host.py <device-config> <transport-spec> <bluetooth-address> [test-mode]
"""

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging

from bumble.colors import color

from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
    BT_HUMAN_INTERFACE_DEVICE_SERVICE,
    BT_BR_EDR_TRANSPORT,
)
from bumble.hci import Address
from bumble.hid import Host, Message
from bumble.sdp import (
    Client as SDP_Client,
    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_ALL_ATTRIBUTES_RANGE,
    SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
    SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
    SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
)
from hid_report_parser import ReportParser

# -----------------------------------------------------------------------------
# SDP attributes for Bluetooth HID devices
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200  # [DEPRECATED]
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208  # [DEPRECATED]
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B  # [DEPRECATED]
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210

# Attributes whose payload is a single value (``attribute.value.value``).
# Maps attribute ID -> (label to print, whether the value is shown via hex()).
_SCALAR_SDP_ATTRIBUTES = {
    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: (' Service Record Handle : ', True),
    SDP_HID_SERVICE_NAME_ATTRIBUTE_ID: (' Service Name: ', False),
    SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID: (' Service Description: ', False),
    SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID: (' Provider Name: ', False),
    SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID: (' Release Number: ', True),
    SDP_HID_PARSER_VERSION_ATTRIBUTE_ID: (' HID Parser Version: ', True),
    SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID: (' HIDDeviceSubclass: ', True),
    SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID: (' HIDCountryCode: ', True),
    SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID: (' HIDVirtualCable: ', False),
    SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID: (' HIDReconnectInitiate: ', False),
    SDP_HID_BATTERY_POWER_ATTRIBUTE_ID: (' HIDBatteryPower: ', False),
    SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID: (' HIDRemoteWake: ', False),
    SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID: (' HIDProfileVersion : ', True),
    SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID: (' HIDSupervisionTimeout: ', True),
    SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID: (' HIDNormallyConnectable: ', False),
    SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID: (' HIDBootDevice: ', False),
    SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID: (' HIDSSRHostMaxLatency: ', True),
    SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID: (' HIDSSRHostMinTimeout: ', True),
}


# -----------------------------------------------------------------------------
def _print_sdp_attribute(attribute):
    """Print one attribute of a HID service record in labelled form.

    Single-valued attributes are looked up in ``_SCALAR_SDP_ATTRIBUTES``;
    attributes carrying nested sequences are unpacked by position below.
    Anything else is reported with a warning line.
    """
    scalar_format = _SCALAR_SDP_ATTRIBUTES.get(attribute.id)
    if scalar_format is not None:
        label, as_hex = scalar_format
        value = attribute.value.value
        print(color(label, 'cyan'), hex(value) if as_hex else value)
        return

    if attribute.id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
        print(color(' Service Class : ', 'cyan'), attribute.value.value[0].value)

    elif attribute.id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID:
        print(
            color(' SDP Browse Group List : ', 'cyan'),
            attribute.value.value[0].value,
        )

    elif attribute.id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
        protocols = attribute.value.value
        print(
            color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'),
            protocols[0].value[0].value,
        )
        print(
            color(' PSM for Bluetooth HID Control channel : ', 'cyan'),
            hex(protocols[0].value[1].value),
        )
        print(
            color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'),
            protocols[1].value[0].value,
        )

    elif attribute.id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID:
        language_base = attribute.value.value
        print(color(' Lanugage : ', 'cyan'), hex(language_base[0].value))
        print(color(' Encoding : ', 'cyan'), hex(language_base[1].value))
        print(
            color(' PrimaryLanguageBaseID : ', 'cyan'),
            hex(language_base[2].value),
        )

    elif attribute.id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID:
        profile = attribute.value.value[0].value
        print(
            color(' BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'),
            profile[0].value,
        )
        print(
            color(' HID Profileversion number : ', 'cyan'),
            hex(profile[1].value),
        )

    elif attribute.id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
        # One extra level of nesting compared to the primary descriptor list.
        protocols = attribute.value.value[0].value
        print(
            color(' BT_L2CAP_PROTOCOL_ID : ', 'cyan'),
            protocols[0].value[0].value,
        )
        print(
            color(' PSM for Bluetooth HID Interrupt channel : ', 'cyan'),
            hex(protocols[0].value[1].value),
        )
        print(
            color(' BT_HIDP_PROTOCOL_ID : ', 'cyan'),
            protocols[1].value[0].value,
        )

    elif attribute.id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID:
        descriptor = attribute.value.value[0].value
        print(
            color(' HID Report Descriptor type: ', 'cyan'),
            hex(descriptor[0].value),
        )
        print(
            color(' HID Report DescriptorList: ', 'cyan'),
            descriptor[1].value,
        )

    elif attribute.id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID:
        langid_base = attribute.value.value[0].value
        print(
            color(' HID LANGID Base Language: ', 'cyan'),
            hex(langid_base[0].value),
        )
        print(
            color(' HID LANGID Base Bluetooth String Offset: ', 'cyan'),
            hex(langid_base[1].value),
        )

    else:
        print(
            color(
                f' Warning: Attribute ID: {attribute.id} match not found.\n Attribute Info: {attribute}',
                'yellow',
            )
        )


# -----------------------------------------------------------------------------
async def get_hid_device_sdp_record(connection):
    """Query the peer's SDP server for HID service records and print them.

    Args:
        connection: the established connection the SDP client is created on.

    Raises:
        Exception: if the peer exposes no Human Interface Device service.
    """
    # Connect to the SDP Server.  Nothing is re-checked after the await: the
    # previous ``if sdp_client:`` test was applied to an object that had just
    # been constructed, so its failure branch could not be reached.
    sdp_client = SDP_Client(connection)
    await sdp_client.connect()
    print(color('Connected to SDP Server', 'blue'))

    # The SDP session is released on every exit path, including errors raised
    # while fetching or printing attributes.
    try:
        # List BT HID Device service in the root browse group
        service_record_handles = await sdp_client.search_services(
            [BT_HUMAN_INTERFACE_DEVICE_SERVICE]
        )

        if not service_record_handles:
            raise Exception(
                color('BT HID Device service not found on peer device!!!!', 'red')
            )

        # For BT_HUMAN_INTERFACE_DEVICE_SERVICE service, get all its attributes
        for service_record_handle in service_record_handles:
            attributes = await sdp_client.get_attributes(
                service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
            )
            print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow'))
            print(color('SDP attributes for HID device', 'magenta'))
            for attribute in attributes:
                _print_sdp_attribute(attribute)
    finally:
        await sdp_client.disconnect()


# -----------------------------------------------------------------------------
async def get_stream_reader(pipe) -> asyncio.StreamReader:
    """Wrap a readable pipe (e.g. ``sys.stdin``) in an asyncio StreamReader."""
    # Always called from a coroutine, so the running loop is the right one.
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, pipe)
    return reader


# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the HID host example using the command-line arguments.

    ``sys.argv[1]`` is the device configuration file, ``sys.argv[2]`` the
    transport spec, ``sys.argv[3]`` the peer address and the optional
    ``sys.argv[4] == 'test-mode'`` selects the interactive menu.
    """
    if len(sys.argv) < 4:
        print(
            'Usage: run_hid_host.py <device-config> <transport-spec> '
            '<bluetooth-address> [test-mode]'
        )

        print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P')
        return

    # The event loop only keeps weak references to tasks; hold strong ones
    # here so a fire-and-forget task cannot be collected before it finishes.
    background_tasks = set()

    def on_hid_control_data_cb(pdu: bytes):
        print(f'Received Control Data, PDU: {pdu.hex()}')

    def on_hid_interrupt_data_cb(pdu: bytes):
        # A PDU needs a header byte plus at least a report-id byte; check the
        # length before indexing so an empty PDU cannot raise IndexError.
        if len(pdu) < 2:
            print(color('Warning: No report received', 'yellow'))
            return
        report_type = pdu[0] & 0x0F
        report = pdu[1:]
        report_length = len(report)
        report_id = report[0]

        if report_type != Message.ReportType.OTHER_REPORT:
            print(
                color(
                    f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}',
                    'blue',
                    None,
                    'bold',
                )
            )

        if (report_length <= 1) or (report_id == 0):
            return
        # Parse report over interrupt channel
        if report_type == Message.ReportType.INPUT_REPORT:
            ReportParser.parse_input_report(report)  # type: ignore

    async def handle_virtual_cable_unplug():
        await hid_host.disconnect_interrupt_channel()
        await hid_host.disconnect_control_channel()
        try:
            await device.keystore.delete(target_address)  # type: ignore
        except KeyError:
            # No stored bond for the peer; the link must still be torn down.
            print('Device not found or Device already unpaired.')
        connection = hid_host.connection
        if connection is not None:
            await connection.disconnect()

    def on_hid_virtual_cable_unplug_cb():
        task = asyncio.create_task(handle_virtual_cable_unplug())
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        print('<<< CONNECTED')

        # Create a device
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )
        device.classic_enabled = True

        # Create HID host and start it
        print('@@@ Starting HID Host...')
        hid_host = Host(device)

        # Register for HID data call back
        hid_host.on('interrupt_data', on_hid_interrupt_data_cb)
        hid_host.on('control_data', on_hid_control_data_cb)

        # Register for virtual cable unplug call back
        hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)

        await device.power_on()

        # Connect to a peer
        target_address = sys.argv[3]
        print(f'=== Connecting to {target_address}...')
        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
        print(f'=== Connected to {connection.peer_address}!')

        # Request authentication
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated...')

        # Enable encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        await get_hid_device_sdp_record(connection)

        async def menu():
            """Interactive test menu; returns when stdin reaches end of file."""
            reader = await get_stream_reader(sys.stdin)

            async def read_choice() -> str:
                line = await reader.readline()
                return line.decode('utf-8').strip()

            async def delete_bonding():
                try:
                    await device.keystore.delete(target_address)
                    print("Unpair successful")
                except KeyError:
                    print('Device not found or Device already unpaired.')

            menu_lines = (
                " 1. Connect Control Channel",
                " 2. Connect Interrupt Channel",
                " 3. Disconnect Control Channel",
                " 4. Disconnect Interrupt Channel",
                " 5. Get Report",
                " 6. Set Report",
                " 7. Set Protocol Mode",
                " 8. Get Protocol Mode",
                " 9. Send Report on Interrupt Channel",
                "10. Suspend",
                "11. Exit Suspend",
                "12. Virtual Cable Unplug",
                "13. Disconnect device",
                "14. Delete Bonding",
                "15. Re-connect to device",
                "16. Exit",
            )

            # Sub-menu option -> (report type, report id, buffer size).
            # Options 3 and 5 follow their menu labels (input report 0x0F,
            # feature report 0x05); the two IDs used to be swapped.
            get_report_requests = {
                '1': (1, 0x01, 0),
                '2': (1, 0x02, 0),
                '3': (1, 0x0F, 0),
                '4': (2, 0x02, 0),
                '5': (3, 0x05, 0),
                '6': (1, 0x02, 3),
                '7': (2, 0x03, 2),
                '8': (3, 0x05, 3),
            }

            # Sub-menu option -> (report type, payload).
            # The payload's first octet is the report id.
            set_report_requests = {
                '1': (1, (0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01)),
                '2': (2, (0x03, 0x01, 0x01)),
                '3': (3, (0x05, 0x01, 0x01, 0x01)),
            }

            protocol_modes = {
                '0': Message.ProtocolMode.BOOT_PROTOCOL,
                '1': Message.ProtocolMode.REPORT_PROTOCOL,
            }

            interrupt_reports = {
                '1': (0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00),
                '2': (0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00),
            }

            while True:
                print(
                    "\n************************ HID Host Menu *****************************\n"
                )
                for menu_line in menu_lines:
                    print(menu_line)
                print("\nEnter your choice : \n")

                raw_choice = await reader.readline()
                if not raw_choice:
                    # readline() returns b'' at end of file; without this
                    # check the loop would spin forever on a closed stdin.
                    print('End of input, leaving menu.')
                    return
                choice = raw_choice.decode('utf-8').strip()

                if choice == '1':
                    await hid_host.connect_control_channel()

                elif choice == '2':
                    await hid_host.connect_interrupt_channel()

                elif choice == '3':
                    await hid_host.disconnect_control_channel()

                elif choice == '4':
                    await hid_host.disconnect_interrupt_channel()

                elif choice == '5':
                    print(" 1. Input Report with ID 0x01")
                    print(" 2. Input Report with ID 0x02")
                    print(" 3. Input Report with ID 0x0F - Invalid ReportId")
                    print(" 4. Output Report with ID 0x02")
                    print(" 5. Feature Report with ID 0x05 - Unsupported Request")
                    print(" 6. Input Report with ID 0x02, BufferSize 3")
                    print(" 7. Output Report with ID 0x03, BufferSize 2")
                    print(" 8. Feature Report with ID 0x05, BufferSize 3")
                    request = get_report_requests.get(await read_choice())
                    if request is None:
                        print('Incorrect option selected')
                    else:
                        hid_host.get_report(*request)

                elif choice == '6':
                    print(" 1. Report type 1 and Report id 0x01")
                    print(" 2. Report type 2 and Report id 0x03")
                    print(" 3. Report type 3 and Report id 0x05")
                    request = set_report_requests.get(await read_choice())
                    if request is None:
                        print('Incorrect option selected')
                    else:
                        report_type, payload = request
                        hid_host.set_report(report_type, bytearray(payload))

                elif choice == '7':
                    print(" 0. Boot")
                    print(" 1. Report")
                    protocol_mode = protocol_modes.get(await read_choice())
                    if protocol_mode is None:
                        print('Incorrect option selected')
                    else:
                        hid_host.set_protocol(protocol_mode)

                elif choice == '8':
                    hid_host.get_protocol()

                elif choice == '9':
                    print(" 1. Report ID 0x01")
                    print(" 2. Report ID 0x03")
                    payload = interrupt_reports.get(await read_choice())
                    if payload is None:
                        print('Incorrect option selected')
                    else:
                        hid_host.send_data(bytearray(payload))

                elif choice == '10':
                    hid_host.suspend()

                elif choice == '11':
                    hid_host.exit_suspend()

                elif choice == '12':
                    hid_host.virtual_cable_unplug()
                    await delete_bonding()

                elif choice == '13':
                    peer_address = Address.from_string_for_transport(
                        target_address, transport=BT_BR_EDR_TRANSPORT
                    )
                    connection = device.find_connection_by_bd_addr(
                        peer_address, transport=BT_BR_EDR_TRANSPORT
                    )
                    if connection is not None:
                        await connection.disconnect()
                    else:
                        print("Already disconnected from device")

                elif choice == '14':
                    await delete_bonding()

                elif choice == '15':
                    connection = await device.connect(
                        target_address, transport=BT_BR_EDR_TRANSPORT
                    )
                    await connection.authenticate()
                    await connection.encrypt()

                elif choice == '16':
                    # NOTE: sys.exit() with a string writes it to stderr and
                    # exits with status 1; kept as-is for existing users.
                    sys.exit("Exit successful")

                else:
                    print("Invalid option selected.")

        if (len(sys.argv) > 4) and (sys.argv[4] == 'test-mode'):
            # Enabling menu for testing
            await menu()
        else:
            # HID Connection
            # Control channel
            await hid_host.connect_control_channel()
            # Interrupt Channel
            await hid_host.connect_interrupt_channel()

        await hci_transport.source.wait_for_termination()


# -----------------------------------------------------------------------------

logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())