1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22from bumble.colors import color
23from bumble.device import Device
24from bumble.hci import Address
25from bumble.transport import open_transport
26from bumble.profiles.battery_service import BatteryServiceProxy
27
28
29# -----------------------------------------------------------------------------
30async def main() -> None:
31    if len(sys.argv) != 3:
32        print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
33        print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
34        return
35
36    print('<<< connecting to HCI...')
37    async with await open_transport(sys.argv[1]) as hci_transport:
38        print('<<< connected')
39
40        # Create and start a device
41        device = Device.with_hci(
42            'Bumble',
43            Address('F0:F1:F2:F3:F4:F5'),
44            hci_transport.source,
45            hci_transport.sink,
46        )
47        await device.power_on()
48
49        # Connect to the peer
50        target_address = sys.argv[2]
51        print(f'=== Connecting to {target_address}...')
52        async with device.connect_as_gatt(target_address) as peer:
53            print(f'=== Connected to {peer}')
54            battery_service = peer.create_service_proxy(BatteryServiceProxy)
55
56            # Check that the service was found
57            if not battery_service:
58                print('!!! Service not found')
59                return
60
61            # Subscribe to and read the battery level
62            if battery_service.battery_level:
63                await battery_service.battery_level.subscribe(
64                    lambda value: print(
65                        f'{color("Battery Level Update:", "green")} {value}'
66                    )
67                )
68                value = await battery_service.battery_level.read_value()
69                print(f'{color("Initial Battery Level:", "green")} {value}')
70
71            await peer.sustain()
72
73
74# -----------------------------------------------------------------------------
75logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
76asyncio.run(main())
77