1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import dataclasses
20import logging
21import sys
22import os
23from bumble.core import BT_BR_EDR_TRANSPORT
24from bumble.device import Device, ScoLink
25from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
26from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
27
28from bumble.transport import open_transport_or_link
29
30
31# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the eSCO connection example between two devices.

    Command-line arguments (read from ``sys.argv``):
        1. path of the device configuration file, used for both devices
        2. transport spec for device 1
        3. transport spec for device 2

    When fewer than three arguments are supplied, a usage message is printed
    and the function returns without opening any transport.
    """
    # sys.argv[0] is the script name, so three user arguments means a length
    # of at least 4 (sys.argv[3] is read below).
    if len(sys.argv) < 4:
        print(
            'Usage: run_esco_connection.py <config-file> '
            '<transport-spec-for-device-1> <transport-spec-for-device-2>'
        )
        print(
            'example: run_esco_connection.py classic1.json '
            'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
        )
        return

    config_file, transport_spec_1, transport_spec_2 = sys.argv[1:4]

    print('<<< connecting to HCI...')
    # Open both HCI transports concurrently.
    hci_transports = await asyncio.gather(
        open_transport_or_link(transport_spec_1),
        open_transport_or_link(transport_spec_2),
    )
    print('<<< connected')

    # Build one device per transport, both from the same configuration file.
    devices = [
        Device.from_config_file_with_hci(
            config_file, hci_transport.source, hci_transport.sink
        )
        for hci_transport in hci_transports
    ]

    devices[0].classic_enabled = True
    devices[1].classic_enabled = True

    await asyncio.gather(*[device.power_on() for device in devices])

    # Device 0 accepts while device 1 initiates the BR/EDR connection; the
    # results are gathered in that order, so connections[0] belongs to
    # device 0.
    connections = await asyncio.gather(
        devices[0].accept(devices[1].public_address),
        devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
    )

    def on_sco(sco_link: ScoLink):
        # Hand the SCO link's disconnect() awaitable to the connection's
        # abort_on() for the 'disconnection' event.
        connections[0].abort_on('disconnection', sco_link.disconnect())

    # Register the handler before sending the setup command so the
    # 'sco_connection' event cannot be missed.
    devices[0].once('sco_connection', on_sco)

    # Request the synchronous connection from device 0 using the
    # ESCO_CVSD_S3 parameter set.
    await devices[0].send_command(
        HCI_Enhanced_Setup_Synchronous_Connection_Command(
            connection_handle=connections[0].handle,
            **ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
        )
    )

    # Keep running until both transport sources have terminated.
    await asyncio.gather(
        *[hci_transport.source.terminated for hci_transport in hci_transports]
    )
83
84# -----------------------------------------------------------------------------
# Take the log level name from the BUMBLE_LOGLEVEL environment variable
# (falling back to 'DEBUG'), upper-case it for logging, then run the example.
_LOG_LEVEL_NAME = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG')
logging.basicConfig(level=_LOG_LEVEL_NAME.upper())
asyncio.run(main())
87