1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22from bumble.colors import color
23from bumble.device import Device
24from bumble.hci import Address
25from bumble.transport import open_transport
26from bumble.profiles.heart_rate_service import HeartRateServiceProxy
27
28
29# -----------------------------------------------------------------------------
async def main() -> None:
    """Run the heart rate client example.

    Expects exactly two command-line arguments: a transport spec and the
    Bluetooth address of the peer. With any other argument count, a usage
    message is printed and the coroutine returns without opening anything.

    Otherwise it opens the transport, powers on a local device, connects to
    the peer through ``device.connect_as_gatt``, prints the body sensor
    location when the service proxy exposes one, subscribes to the heart
    rate measurement when the service proxy exposes one, and finally awaits
    ``peer.sustain()``.
    """
    if len(sys.argv) != 3:
        print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
        print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
        return

    # The length check above guarantees exactly three entries.
    _, transport_spec, peer_address = sys.argv

    def on_heart_rate_measurement(value) -> None:
        # Handed to subscribe() below; prints every value it is called with.
        print(f'{color("Heart Rate Measurement:", "green")} {value}')

    print('<<< connecting to HCI...')
    async with await open_transport(transport_spec) as transport:
        print('<<< connected')

        # Build the local device on top of the transport and start it
        local_device = Device.with_hci(
            'Bumble',
            Address('F0:F1:F2:F3:F4:F5'),
            transport.source,
            transport.sink,
        )
        await local_device.power_on()

        # Establish the connection to the remote peer
        print(f'=== Connecting to {peer_address}...')
        async with local_device.connect_as_gatt(peer_address) as peer:
            print(f'=== Connected to {peer}')

            service = peer.create_service_proxy(HeartRateServiceProxy)

            # Nothing more to do if the peer has no heart rate service
            if not service:
                print('!!! Service not found')
                return

            # Body sensor location: read once and print, if present
            location_characteristic = service.body_sensor_location
            if location_characteristic:
                sensor_location = await location_characteristic.read_value()
                print(color('Sensor Location:', 'green'), sensor_location)

            # Heart rate measurement: subscribe, if present
            measurement_characteristic = service.heart_rate_measurement
            if measurement_characteristic:
                await measurement_characteristic.subscribe(on_heart_rate_measurement)

            await peer.sustain()
77
78# -----------------------------------------------------------------------------
# Logging verbosity is taken from the BUMBLE_LOGLEVEL environment variable
# (upper-cased before use); 'DEBUG' is used when the variable is not set.
logging.basicConfig(level=os.getenv('BUMBLE_LOGLEVEL', 'DEBUG').upper())
# Run the example's coroutine to completion.
asyncio.run(main())
81