1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22
23from bumble.colors import color
24
25from bumble.device import Device
26from bumble.transport import open_transport_or_link
27from bumble.core import (
28    BT_HUMAN_INTERFACE_DEVICE_SERVICE,
29    BT_BR_EDR_TRANSPORT,
30)
31from bumble.hci import Address
32from bumble.hid import Host, Message
33from bumble.sdp import (
34    Client as SDP_Client,
35    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
36    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
37    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
38    SDP_ALL_ATTRIBUTES_RANGE,
39    SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
40    SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
41    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
42    SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
43)
44from hid_report_parser import ReportParser
45
46# -----------------------------------------------------------------------------
# SDP attribute IDs for Bluetooth HID devices.
# get_hid_device_sdp_record() compares each attribute ID of the peer's HID
# service record against these values to pick the label it prints.
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200  # [DEPRECATED]
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208  # [DEPRECATED]
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B  # [DEPRECATED]
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
68
69
70# -----------------------------------------------------------------------------
71
72
async def get_hid_device_sdp_record(connection):
    """Query the peer's SDP server and print its HID service record(s).

    Searches for records advertising BT_HUMAN_INTERFACE_DEVICE_SERVICE, fetches
    all attributes of every matching record and prints one labelled line (or a
    few lines, for structured attributes) per recognised attribute ID.
    Attribute IDs with no matching label are reported with a warning line.

    Args:
        connection: the connection object handed to ``SDP_Client``.

    Raises:
        Exception: if the search returns no HID service record. The SDP client
            is disconnected before the exception propagates.
    """
    # HID attributes whose value is printed unchanged, keyed by attribute ID.
    plain_labels = {
        SDP_HID_SERVICE_NAME_ATTRIBUTE_ID: '  Service Name: ',
        SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID: '  Service Description: ',
        SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID: '  Provider Name: ',
        SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID: '  HIDVirtualCable: ',
        SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID: '  HIDReconnectInitiate: ',
        SDP_HID_BATTERY_POWER_ATTRIBUTE_ID: '  HIDBatteryPower: ',
        SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID: '  HIDRemoteWake: ',
        SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID: '  HIDNormallyConnectable: ',
        SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID: '  HIDBootDevice: ',
    }
    # HID attributes whose value is printed through hex(), keyed by attribute ID.
    hex_labels = {
        SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID: '  Release Number: ',
        SDP_HID_PARSER_VERSION_ATTRIBUTE_ID: '  HID Parser Version: ',
        SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID: '  HIDDeviceSubclass: ',
        SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID: '  HIDCountryCode: ',
        SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID: '  HIDProfileVersion : ',
        SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID: '  HIDSupervisionTimeout: ',
        SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID: '  HIDSSRHostMaxLatency: ',
        SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID: '  HIDSSRHostMinTimeout: ',
    }

    # Connect to the SDP Server
    sdp_client = SDP_Client(connection)
    await sdp_client.connect()
    # The client object itself is always truthy, so testing it cannot detect a
    # failed connection; a failure is presumably reported by connect() raising
    # (NOTE(review): confirm against the SDP client implementation).
    print(color('Connected to SDP Server', 'blue'))

    try:
        # List BT HID Device service in the root browse group
        service_record_handles = await sdp_client.search_services(
            [BT_HUMAN_INTERFACE_DEVICE_SERVICE]
        )

        if not service_record_handles:
            raise Exception(
                color('BT HID Device service not found on peer device!!!!', 'red')
            )

        # For BT_HUMAN_INTERFACE_DEVICE_SERVICE service, get all its attributes
        for service_record_handle in service_record_handles:
            attributes = await sdp_client.get_attributes(
                service_record_handle, [SDP_ALL_ATTRIBUTES_RANGE]
            )
            print(color(f'SERVICE {service_record_handle:04X} attributes:', 'yellow'))
            print(color('SDP attributes for HID device', 'magenta'))
            for attribute in attributes:
                attribute_id = attribute.id

                # Structured attributes first: each one needs its own walk
                # through the nested element values.
                if attribute_id == SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID:
                    print(
                        color('  Service Record Handle : ', 'cyan'),
                        hex(attribute.value.value),
                    )

                elif attribute_id == SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
                    print(
                        color('  Service Class : ', 'cyan'),
                        attribute.value.value[0].value,
                    )

                elif attribute_id == SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID:
                    print(
                        color('  SDP Browse Group List : ', 'cyan'),
                        attribute.value.value[0].value,
                    )

                elif attribute_id == SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
                    protocols = attribute.value.value
                    print(
                        color('  BT_L2CAP_PROTOCOL_ID : ', 'cyan'),
                        protocols[0].value[0].value,
                    )
                    print(
                        color('  PSM for Bluetooth HID Control channel : ', 'cyan'),
                        hex(protocols[0].value[1].value),
                    )
                    print(
                        color('  BT_HIDP_PROTOCOL_ID : ', 'cyan'),
                        protocols[1].value[0].value,
                    )

                elif attribute_id == SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID:
                    language_base = attribute.value.value
                    print(
                        color('  Language : ', 'cyan'), hex(language_base[0].value)
                    )
                    print(
                        color('  Encoding : ', 'cyan'), hex(language_base[1].value)
                    )
                    print(
                        color('  PrimaryLanguageBaseID : ', 'cyan'),
                        hex(language_base[2].value),
                    )

                elif attribute_id == SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID:
                    profile = attribute.value.value[0].value
                    print(
                        color('  BT_HUMAN_INTERFACE_DEVICE_SERVICE ', 'cyan'),
                        profile[0].value,
                    )
                    print(
                        color('  HID Profileversion number : ', 'cyan'),
                        hex(profile[1].value),
                    )

                elif (
                    attribute_id == SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
                ):
                    # One extra nesting level compared with the primary
                    # protocol descriptor list.
                    protocols = attribute.value.value[0].value
                    print(
                        color('  BT_L2CAP_PROTOCOL_ID : ', 'cyan'),
                        protocols[0].value[0].value,
                    )
                    print(
                        color('  PSM for Bluetooth HID Interrupt channel : ', 'cyan'),
                        hex(protocols[0].value[1].value),
                    )
                    print(
                        color('  BT_HIDP_PROTOCOL_ID : ', 'cyan'),
                        protocols[1].value[0].value,
                    )

                elif attribute_id == SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID:
                    descriptor = attribute.value.value[0].value
                    print(
                        color('  HID Report Descriptor type: ', 'cyan'),
                        hex(descriptor[0].value),
                    )
                    print(
                        color('  HID Report DescriptorList: ', 'cyan'),
                        descriptor[1].value,
                    )

                elif attribute_id == SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID:
                    langid_base = attribute.value.value[0].value
                    print(
                        color('  HID LANGID Base Language: ', 'cyan'),
                        hex(langid_base[0].value),
                    )
                    print(
                        color('  HID LANGID Base Bluetooth String Offset: ', 'cyan'),
                        hex(langid_base[1].value),
                    )

                # Single-value HID attributes: label lookup only.
                elif attribute_id in plain_labels:
                    print(
                        color(plain_labels[attribute_id], 'cyan'),
                        attribute.value.value,
                    )

                elif attribute_id in hex_labels:
                    print(
                        color(hex_labels[attribute_id], 'cyan'),
                        hex(attribute.value.value),
                    )

                else:
                    print(
                        color(
                            f'  Warning: Attribute ID: {attribute.id} match not found.\n  Attribute Info: {attribute}',
                            'yellow',
                        )
                    )
    finally:
        # Always release the SDP channel, including when the search, the
        # attribute fetch or the decoding of a malformed record raises.
        await sdp_client.disconnect()
266
267
268# -----------------------------------------------------------------------------
async def get_stream_reader(pipe) -> asyncio.StreamReader:
    """Wrap a readable pipe in an ``asyncio.StreamReader``.

    Args:
        pipe: file-like object to read from (``main`` passes ``sys.stdin``);
            it is registered with the running event loop via
            ``connect_read_pipe``.

    Returns:
        A StreamReader that receives the data read from ``pipe``.
    """
    # This coroutine always executes inside a running loop, so ask for that
    # loop explicitly instead of going through get_event_loop().
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader(loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, pipe)
    return reader
275
276
277# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the HID host example.

    Expects ``<device-config> <transport-spec> <bluetooth-address>`` on the
    command line, optionally followed by ``test-mode``. With fewer arguments it
    prints a usage message and returns.

    Otherwise it opens the transport, creates a classic-enabled device and a
    HID host on it, connects to the peer, authenticates, enables encryption
    and prints the peer's HID SDP record. In ``test-mode`` it then runs an
    interactive menu read from stdin; in normal mode it connects the HID
    control and interrupt channels. Finally it waits for the transport source
    to terminate.
    """
    if len(sys.argv) < 4:
        print(
            'Usage: run_hid_host.py <device-config> <transport-spec> '
            '<bluetooth-address> [test-mode]'
        )

        print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P')
        return

    # The event loop only keeps weak references to tasks, so hold a strong
    # reference to every fire-and-forget task until it completes.
    background_tasks = set()

    def on_hid_control_data_cb(pdu: bytes):
        print(f'Received Control Data, PDU: {pdu.hex()}')

    def on_hid_interrupt_data_cb(pdu: bytes):
        # Check the length before indexing: a PDU with at most the header
        # octet carries no report (and an empty one has no header to read).
        if len(pdu) <= 1:
            print(color('Warning: No report received', 'yellow'))
            return
        report_type = pdu[0] & 0x0F  # low nibble of the first octet
        report_length = len(pdu) - 1
        report_id = pdu[1]

        if report_type != Message.ReportType.OTHER_REPORT:
            print(
                color(
                    f' Report type = {report_type}, Report length = {report_length}, Report id = {report_id}',
                    'blue',
                    None,
                    'bold',
                )
            )

        if (report_length <= 1) or (report_id == 0):
            return
        # Parse report over interrupt channel
        if report_type == Message.ReportType.INPUT_REPORT:
            ReportParser.parse_input_report(pdu[1:])  # type: ignore

    async def handle_virtual_cable_unplug():
        await hid_host.disconnect_interrupt_channel()
        await hid_host.disconnect_control_channel()
        try:
            await device.keystore.delete(target_address)  # type: ignore
        except KeyError:
            # Same handling as the menu's unpair options: a missing bond must
            # not prevent the connection from being torn down below.
            print('Device not found or Device already unpaired.')
        connection = hid_host.connection
        if connection is not None:
            await connection.disconnect()

    def on_hid_virtual_cable_unplug_cb():
        task = asyncio.create_task(handle_virtual_cable_unplug())
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    print('<<< connecting to HCI...')
    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
        print('<<< CONNECTED')

        # Create a device
        device = Device.from_config_file_with_hci(
            sys.argv[1], hci_transport.source, hci_transport.sink
        )
        device.classic_enabled = True

        # Create HID host and start it
        print('@@@ Starting HID Host...')
        hid_host = Host(device)

        # Register for HID data call back
        hid_host.on('interrupt_data', on_hid_interrupt_data_cb)
        hid_host.on('control_data', on_hid_control_data_cb)

        # Register for virtual cable unplug call back
        hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)

        await device.power_on()

        # Connect to a peer
        target_address = sys.argv[3]
        print(f'=== Connecting to {target_address}...')
        connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
        print(f'=== Connected to {connection.peer_address}!')

        # Request authentication
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated...')

        # Enable encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        await get_hid_device_sdp_record(connection)

        async def menu():
            reader = await get_stream_reader(sys.stdin)

            async def read_choice():
                # Next input line, stripped; None once stdin reaches EOF.
                line = await reader.readline()
                if not line:
                    return None
                return line.decode('utf-8').strip()

            async def delete_bonding():
                # Remove the stored keys for the peer (shared by options 12, 14).
                try:
                    await device.keystore.delete(target_address)
                    print("Unpair successful")
                except KeyError:
                    print('Device not found or Device already unpaired.')

            # Get Report sub-menu: choice -> arguments passed to get_report().
            # NOTE(review): the labels of options 3 and 5 announce report IDs
            # 0x0F and 0x05, but the calls send IDs 5 and 15 respectively.
            # Confirm which side is intended before changing either.
            get_report_requests = {
                '1': (1, 1, 0),
                '2': (1, 2, 0),
                '3': (1, 5, 0),
                '4': (2, 2, 0),
                '5': (3, 15, 0),
                '6': (1, 2, 3),
                '7': (2, 3, 2),
                '8': (3, 5, 3),
            }
            # Set Report sub-menu: choice -> (report type, payload).
            # The first payload octet is the report id.
            set_report_requests = {
                '1': (
                    1,
                    bytearray([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01]),
                ),
                '2': (2, bytearray([0x03, 0x01, 0x01])),
                '3': (3, bytearray([0x05, 0x01, 0x01, 0x01])),
            }
            # Set Protocol sub-menu: choice -> protocol mode.
            protocol_modes = {
                '0': Message.ProtocolMode.BOOT_PROTOCOL,
                '1': Message.ProtocolMode.REPORT_PROTOCOL,
            }
            # Interrupt-channel sub-menu: choice -> payload.
            interrupt_reports = {
                '1': bytearray([0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
                '2': bytearray([0x03, 0x00, 0x0D, 0xFD, 0x00, 0x00]),
            }

            while True:
                print(
                    "\n************************ HID Host Menu *****************************\n"
                )
                print(" 1. Connect Control Channel")
                print(" 2. Connect Interrupt Channel")
                print(" 3. Disconnect Control Channel")
                print(" 4. Disconnect Interrupt Channel")
                print(" 5. Get Report")
                print(" 6. Set Report")
                print(" 7. Set Protocol Mode")
                print(" 8. Get Protocol Mode")
                print(" 9. Send Report on Interrupt Channel")
                print("10. Suspend")
                print("11. Exit Suspend")
                print("12. Virtual Cable Unplug")
                print("13. Disconnect device")
                print("14. Delete Bonding")
                print("15. Re-connect to device")
                print("16. Exit")
                print("\nEnter your choice : \n")

                choice = await read_choice()
                if choice is None:
                    # stdin was closed: readline() would keep returning empty
                    # data and the menu would spin forever, so leave it.
                    print('Input closed, leaving menu.')
                    return

                if choice == '1':
                    await hid_host.connect_control_channel()

                elif choice == '2':
                    await hid_host.connect_interrupt_channel()

                elif choice == '3':
                    await hid_host.disconnect_control_channel()

                elif choice == '4':
                    await hid_host.disconnect_interrupt_channel()

                elif choice == '5':
                    print(" 1. Input Report with ID 0x01")
                    print(" 2. Input Report with ID 0x02")
                    print(" 3. Input Report with ID 0x0F - Invalid ReportId")
                    print(" 4. Output Report with ID 0x02")
                    print(" 5. Feature Report with ID 0x05 - Unsupported Request")
                    print(" 6. Input Report with ID 0x02, BufferSize 3")
                    print(" 7. Output Report with ID 0x03, BufferSize 2")
                    print(" 8. Feature Report with ID 0x05,  BufferSize 3")
                    choice1 = await read_choice()

                    if choice1 in get_report_requests:
                        hid_host.get_report(*get_report_requests[choice1])
                    else:
                        print('Incorrect option selected')

                elif choice == '6':
                    print(" 1. Report type 1 and Report id 0x01")
                    print(" 2. Report type 2 and Report id 0x03")
                    print(" 3. Report type 3 and Report id 0x05")
                    choice1 = await read_choice()

                    if choice1 in set_report_requests:
                        report_type, data = set_report_requests[choice1]
                        hid_host.set_report(report_type, data)
                    else:
                        print('Incorrect option selected')

                elif choice == '7':
                    print(" 0. Boot")
                    print(" 1. Report")
                    choice1 = await read_choice()

                    if choice1 in protocol_modes:
                        hid_host.set_protocol(protocol_modes[choice1])
                    else:
                        print('Incorrect option selected')

                elif choice == '8':
                    hid_host.get_protocol()

                elif choice == '9':
                    print(" 1. Report ID 0x01")
                    print(" 2. Report ID 0x03")
                    choice1 = await read_choice()

                    if choice1 in interrupt_reports:
                        hid_host.send_data(interrupt_reports[choice1])
                    else:
                        print('Incorrect option selected')

                elif choice == '10':
                    hid_host.suspend()

                elif choice == '11':
                    hid_host.exit_suspend()

                elif choice == '12':
                    hid_host.virtual_cable_unplug()
                    await delete_bonding()

                elif choice == '13':
                    peer_address = Address.from_string_for_transport(
                        target_address, transport=BT_BR_EDR_TRANSPORT
                    )
                    connection = device.find_connection_by_bd_addr(
                        peer_address, transport=BT_BR_EDR_TRANSPORT
                    )
                    if connection is not None:
                        await connection.disconnect()
                    else:
                        print("Already disconnected from device")

                elif choice == '14':
                    await delete_bonding()

                elif choice == '15':
                    connection = await device.connect(
                        target_address, transport=BT_BR_EDR_TRANSPORT
                    )
                    await connection.authenticate()
                    await connection.encrypt()

                elif choice == '16':
                    # sys.exit() with a string argument writes it to stderr
                    # and exits with status 1; a requested exit is a success,
                    # so print the message and exit with status 0.
                    print("Exit successful")
                    sys.exit(0)

                else:
                    print("Invalid option selected.")

        if (len(sys.argv) > 4) and (sys.argv[4] == 'test-mode'):
            # Enabling menu for testing
            await menu()
        else:
            # HID Connection
            # Control channel
            await hid_host.connect_control_channel()
            # Interrupt Channel
            await hid_host.connect_interrupt_channel()

        await hci_transport.source.wait_for_termination()
563
564
565# -----------------------------------------------------------------------------
566
# Script entry point. Guarded so that importing this module (from tooling or
# tests, for example) does not configure logging or start the event loop.
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL, defaulting to DEBUG.
    logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
    asyncio.run(main())
569