1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import datetime
20import functools
21import logging
22import sys
23import os
24import io
25import struct
26import secrets
27
28from typing import Dict
29
30from bumble.core import AdvertisingData
31from bumble.device import Device
32from bumble.hci import (
33    CodecID,
34    CodingFormat,
35    HCI_IsoDataPacket,
36)
37from bumble.profiles.ascs import AseStateMachine, AudioStreamControlService
38from bumble.profiles.bap import (
39    UnicastServerAdvertisingData,
40    CodecSpecificConfiguration,
41    CodecSpecificCapabilities,
42    ContextType,
43    AudioLocation,
44    SupportedSamplingFrequency,
45    SupportedFrameDuration,
46)
47from bumble.profiles.cap import CommonAudioServiceService
48from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
49from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService
50from bumble.transport import open_transport_or_link
51
52
53def _sink_pac_record() -> PacRecord:
54    return PacRecord(
55        coding_format=CodingFormat(CodecID.LC3),
56        codec_specific_capabilities=CodecSpecificCapabilities(
57            supported_sampling_frequencies=(
58                SupportedSamplingFrequency.FREQ_8000
59                | SupportedSamplingFrequency.FREQ_16000
60                | SupportedSamplingFrequency.FREQ_24000
61                | SupportedSamplingFrequency.FREQ_32000
62                | SupportedSamplingFrequency.FREQ_48000
63            ),
64            supported_frame_durations=(
65                SupportedFrameDuration.DURATION_7500_US_SUPPORTED
66                | SupportedFrameDuration.DURATION_10000_US_SUPPORTED
67            ),
68            supported_audio_channel_count=[1, 2],
69            min_octets_per_codec_frame=26,
70            max_octets_per_codec_frame=240,
71            supported_max_codec_frames_per_sdu=2,
72        ),
73    )
74
75
76file_outputs: Dict[AseStateMachine, io.BufferedWriter] = {}
77
78
79# -----------------------------------------------------------------------------
80async def main() -> None:
81    if len(sys.argv) < 3:
82        print('Usage: run_cig_setup.py <config-file>' '<transport-spec-for-device>')
83        return
84
85    print('<<< connecting to HCI...')
86    async with await open_transport_or_link(sys.argv[2]) as hci_transport:
87        print('<<< connected')
88
89        device = Device.from_config_file_with_hci(
90            sys.argv[1], hci_transport.source, hci_transport.sink
91        )
92        device.cis_enabled = True
93
94        await device.power_on()
95
96        csis = CoordinatedSetIdentificationService(
97            set_identity_resolving_key=secrets.token_bytes(16),
98            set_identity_resolving_key_type=SirkType.PLAINTEXT,
99        )
100        device.add_service(CommonAudioServiceService(csis))
101        device.add_service(
102            PublishedAudioCapabilitiesService(
103                supported_source_context=ContextType.PROHIBITED,
104                available_source_context=ContextType.PROHIBITED,
105                supported_sink_context=ContextType(0xFF),  # All context types
106                available_sink_context=ContextType(0xFF),  # All context types
107                sink_audio_locations=(
108                    AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
109                ),
110                sink_pac=[_sink_pac_record()],
111            )
112        )
113
114        ascs = AudioStreamControlService(device, sink_ase_id=[1], source_ase_id=[2])
115        device.add_service(ascs)
116
117        advertising_data = (
118            bytes(
119                AdvertisingData(
120                    [
121                        (
122                            AdvertisingData.COMPLETE_LOCAL_NAME,
123                            bytes('Bumble LE Audio', 'utf-8'),
124                        ),
125                        (
126                            AdvertisingData.FLAGS,
127                            bytes(
128                                [
129                                    AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
130                                    | AdvertisingData.BR_EDR_HOST_FLAG
131                                    | AdvertisingData.BR_EDR_CONTROLLER_FLAG
132                                ]
133                            ),
134                        ),
135                        (
136                            AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
137                            bytes(PublishedAudioCapabilitiesService.UUID),
138                        ),
139                    ]
140                )
141            )
142            + csis.get_advertising_data()
143            + bytes(UnicastServerAdvertisingData())
144        )
145
146        def on_pdu(ase: AseStateMachine, pdu: HCI_IsoDataPacket):
147            # LC3 format: |frame_length(2)| + |frame(length)|.
148            sdu = b''
149            if pdu.iso_sdu_length:
150                sdu = struct.pack('<H', pdu.iso_sdu_length)
151            sdu += pdu.iso_sdu_fragment
152            file_outputs[ase].write(sdu)
153
154        def on_ase_state_change(
155            state: AseStateMachine.State,
156            ase: AseStateMachine,
157        ) -> None:
158            if state != AseStateMachine.State.STREAMING:
159                if file_output := file_outputs.pop(ase):
160                    file_output.close()
161            else:
162                file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
163                codec_configuration = ase.codec_specific_configuration
164                assert isinstance(codec_configuration, CodecSpecificConfiguration)
165                # Write a LC3 header.
166                file_output.write(
167                    bytes([0x1C, 0xCC])  # Header.
168                    + struct.pack(
169                        '<HHHHHHI',
170                        18,  # Header length.
171                        codec_configuration.sampling_frequency.hz
172                        // 100,  # Sampling Rate(/100Hz).
173                        0,  # Bitrate(unused).
174                        bin(codec_configuration.audio_channel_allocation).count(
175                            '1'
176                        ),  # Channels.
177                        codec_configuration.frame_duration.us
178                        // 10,  # Frame duration(/10us).
179                        0,  # RFU.
180                        0x0FFFFFFF,  # Frame counts.
181                    )
182                )
183                file_outputs[ase] = file_output
184                assert ase.cis_link
185                ase.cis_link.sink = functools.partial(on_pdu, ase)
186
187        for ase in ascs.ase_state_machines.values():
188            ase.on(
189                'state_change',
190                functools.partial(on_ase_state_change, ase=ase),
191            )
192
193        await device.create_advertising_set(
194            advertising_data=advertising_data,
195            auto_restart=True,
196        )
197
198        await hci_transport.source.terminated
199
200
201# -----------------------------------------------------------------------------
202logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
203asyncio.run(main())
204