1# Copyright 2021-2024 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import sys 21import os 22import websockets 23import json 24 25from bumble.core import AdvertisingData 26from bumble.device import ( 27 Device, 28 AdvertisingParameters, 29 AdvertisingEventProperties, 30 Connection, 31 Peer, 32) 33from bumble.hci import ( 34 CodecID, 35 CodingFormat, 36 OwnAddressType, 37) 38from bumble.profiles.ascs import AudioStreamControlService 39from bumble.profiles.bap import ( 40 CodecSpecificCapabilities, 41 ContextType, 42 AudioLocation, 43 SupportedSamplingFrequency, 44 SupportedFrameDuration, 45 UnicastServerAdvertisingData, 46) 47from bumble.profiles.mcp import ( 48 MediaControlServiceProxy, 49 GenericMediaControlServiceProxy, 50 MediaState, 51 MediaControlPointOpcode, 52) 53from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService 54from bumble.transport import open_transport_or_link 55 56from typing import Optional 57 58 59# ----------------------------------------------------------------------------- 60async def main() -> None: 61 if len(sys.argv) < 3: 62 print('Usage: run_mcp_client.py <config-file>' '<transport-spec-for-device>') 63 return 64 65 print('<<< connecting to HCI...') 66 async with await open_transport_or_link(sys.argv[2]) as hci_transport: 67 print('<<< connected') 68 69 device = Device.from_config_file_with_hci( 70 sys.argv[1], hci_transport.source, hci_transport.sink 71 ) 72 73 await device.power_on() 74 75 # Add "placeholder" services to enable Android LEA features. 76 device.add_service( 77 PublishedAudioCapabilitiesService( 78 supported_source_context=ContextType.PROHIBITED, 79 available_source_context=ContextType.PROHIBITED, 80 supported_sink_context=ContextType.MEDIA, 81 available_sink_context=ContextType.MEDIA, 82 sink_audio_locations=( 83 AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT 84 ), 85 sink_pac=[ 86 PacRecord( 87 coding_format=CodingFormat(CodecID.LC3), 88 codec_specific_capabilities=CodecSpecificCapabilities( 89 supported_sampling_frequencies=( 90 SupportedSamplingFrequency.FREQ_16000 91 | SupportedSamplingFrequency.FREQ_32000 92 | SupportedSamplingFrequency.FREQ_48000 93 ), 94 supported_frame_durations=( 95 SupportedFrameDuration.DURATION_10000_US_SUPPORTED 96 ), 97 supported_audio_channel_count=[1, 2], 98 min_octets_per_codec_frame=0, 99 max_octets_per_codec_frame=320, 100 supported_max_codec_frames_per_sdu=2, 101 ), 102 ), 103 ], 104 ) 105 ) 106 device.add_service(AudioStreamControlService(device, sink_ase_id=[1])) 107 108 ws: Optional[websockets.WebSocketServerProtocol] = None 109 mcp: Optional[MediaControlServiceProxy] = None 110 111 advertising_data = bytes( 112 AdvertisingData( 113 [ 114 ( 115 AdvertisingData.COMPLETE_LOCAL_NAME, 116 bytes('Bumble LE Audio', 'utf-8'), 117 ), 118 ( 119 AdvertisingData.FLAGS, 120 bytes([AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG]), 121 ), 122 ( 123 AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, 124 bytes(PublishedAudioCapabilitiesService.UUID), 125 ), 126 ] 127 ) 128 ) + bytes(UnicastServerAdvertisingData()) 129 130 await device.create_advertising_set( 131 advertising_parameters=AdvertisingParameters( 132 advertising_event_properties=AdvertisingEventProperties(), 133 own_address_type=OwnAddressType.RANDOM, 134 primary_advertising_interval_max=100, 135 primary_advertising_interval_min=100, 136 ), 137 advertising_data=advertising_data, 138 auto_restart=True, 139 ) 140 141 def on_media_state(media_state: MediaState) -> None: 142 if ws: 143 asyncio.create_task( 144 ws.send(json.dumps({'media_state': media_state.name})) 145 ) 146 147 def on_track_title(title: str) -> None: 148 if ws: 149 asyncio.create_task(ws.send(json.dumps({'title': title}))) 150 151 def on_track_duration(duration: int) -> None: 152 if ws: 153 asyncio.create_task(ws.send(json.dumps({'duration': duration}))) 154 155 def on_track_position(position: int) -> None: 156 if ws: 157 asyncio.create_task(ws.send(json.dumps({'position': position}))) 158 159 def on_connection(connection: Connection) -> None: 160 async def on_connection_async(): 161 async with Peer(connection) as peer: 162 nonlocal mcp 163 mcp = peer.create_service_proxy(MediaControlServiceProxy) 164 if not mcp: 165 mcp = peer.create_service_proxy(GenericMediaControlServiceProxy) 166 mcp.on('media_state', on_media_state) 167 mcp.on('track_title', on_track_title) 168 mcp.on('track_duration', on_track_duration) 169 mcp.on('track_position', on_track_position) 170 await mcp.subscribe_characteristics() 171 172 connection.abort_on('disconnection', on_connection_async()) 173 174 device.on('connection', on_connection) 175 176 async def serve(websocket: websockets.WebSocketServerProtocol, _path): 177 nonlocal ws 178 ws = websocket 179 async for message in websocket: 180 request = json.loads(message) 181 if mcp: 182 await mcp.write_control_point( 183 MediaControlPointOpcode(request['opcode']) 184 ) 185 ws = None 186 187 await websockets.serve(serve, 'localhost', 8989) 188 189 await hci_transport.source.terminated 190 191 192# ----------------------------------------------------------------------------- 193logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) 194asyncio.run(main()) 195