1# Copyright 2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import asyncio
16import avatar
17import itertools
18import logging
19
20from avatar import BumblePandoraDevice
21from avatar import PandoraDevice
22from avatar import PandoraDevices
23from avatar import pandora
24from bumble.hci import HCI_CENTRAL_ROLE
25from bumble.hci import HCI_PERIPHERAL_ROLE
26from bumble.hci import HCI_Write_Default_Link_Policy_Settings_Command
27from bumble.keys import PairingKeys
28from bumble.pairing import PairingConfig
29from bumble.pairing import PairingDelegate
30from mobly import base_test
31from mobly import signals
32from mobly import test_runner
33from mobly.asserts import assert_equal  # type: ignore
34from mobly.asserts import assert_in  # type: ignore
35from mobly.asserts import assert_is_not_none  # type: ignore
36from mobly.asserts import fail  # type: ignore
37from pandora.host_pb2 import RANDOM
38from pandora.host_pb2 import RESOLVABLE_OR_PUBLIC
39from pandora.host_pb2 import Connection as PandoraConnection
40from pandora.host_pb2 import DataTypes
41from pandora.security_pb2 import LE_LEVEL2
42from pandora.security_pb2 import LEVEL2
43from pandora.security_pb2 import PairingEventAnswer
44from pandora.security_pb2 import SecureResponse
45from pandora.security_pb2 import WaitSecurityResponse
46from typing import Any, List, Literal, Optional, Tuple, Union
47
48DEFAULT_SMP_KEY_DISTRIBUTION = (
49    PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY
50    | PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY
51    | PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY
52)
53
54
55async def le_connect_with_rpa_and_encrypt(central: PandoraDevice, peripheral: PandoraDevice) -> None:
56    # Note: Android doesn't support own_address_type=RESOLVABLE_OR_PUBLIC(offloaded resolution)
57    # But own_address_type=RANDOM still set a public RPA generated in host
58    advertisement = peripheral.aio.host.Advertise(
59        legacy=True,
60        connectable=True,
61        own_address_type=RANDOM if peripheral.name == 'android' else RESOLVABLE_OR_PUBLIC,
62        data=DataTypes(manufacturer_specific_data=b'pause cafe'),
63    )
64
65    (cen_res, per_res) = await asyncio.gather(
66        central.aio.host.ConnectLE(
67            own_address_type=RANDOM if central.name == 'android' else RESOLVABLE_OR_PUBLIC,
68            public=peripheral.address,
69        ),
70        anext(aiter(advertisement)),  # pytype: disable=name-error
71    )
72
73    advertisement.cancel()
74    assert_equal(cen_res.result_variant(), 'connection')
75    cen_per = cen_res.connection
76    per_cen = per_res.connection
77    assert cen_per is not None and per_cen is not None
78
79    encryption = await peripheral.aio.security.Secure(connection=per_cen, le=LE_LEVEL2)
80    assert_equal(encryption.result_variant(), 'success')
81
82
83class SecurityTest(base_test.BaseTestClass):  # type: ignore[misc]
84    '''
85    This class aim to test SSP (Secure Simple Pairing) on Classic
86    Bluetooth devices.
87    '''
88
89    devices: Optional[PandoraDevices] = None
90
91    # pandora devices.
92    dut: PandoraDevice
93    ref: PandoraDevice
94
95    @avatar.asynchronous
96    async def setup_class(self) -> None:
97        self.devices = PandoraDevices(self)
98        self.dut, self.ref, *_ = self.devices
99
100        # Enable BR/EDR mode and SSP for Bumble devices.
101        for device in self.devices:
102            if isinstance(device, BumblePandoraDevice):
103                device.config.setdefault('address_resolution_offload', True)
104                device.config.setdefault('classic_enabled', True)
105                device.config.setdefault('classic_ssp_enabled', True)
106                device.config.setdefault(
107                    'server',
108                    {
109                        'io_capability': 'display_output_and_yes_no_input',
110                    },
111                )
112
113        await asyncio.gather(self.dut.reset(), self.ref.reset())
114
115    def teardown_class(self) -> None:
116        if self.devices:
117            self.devices.stop_all()
118
119    @avatar.parameterized(
120        *itertools.product(
121            ('outgoing_connection', 'incoming_connection'),
122            ('outgoing_pairing', 'incoming_pairing'),
123            (
124                'accept',
125                'reject',
126                'rejected',
127                'disconnect',
128                'disconnected',
129                'accept_ctkd',
130            ),
131            (
132                'against_default_io_cap',
133                'against_no_output_no_input',
134                'against_keyboard_only',
135                'against_display_only',
136                'against_display_yes_no',
137            ),
138            ('against_central', 'against_peripheral'),
139        )
140    )  # type: ignore[misc]
141    @avatar.asynchronous
142    async def test_ssp(
143        self,
144        connect: Union[Literal['outgoing_connection'], Literal['incoming_connection']],
145        pair: Union[Literal['outgoing_pairing'], Literal['incoming_pairing']],
146        variant: Union[
147            Literal['accept'],
148            Literal['reject'],
149            Literal['rejected'],
150            Literal['disconnect'],
151            Literal['disconnected'],
152            Literal['accept_ctkd'],
153        ],
154        ref_io_capability: Union[
155            Literal['against_default_io_cap'],
156            Literal['against_no_output_no_input'],
157            Literal['against_keyboard_only'],
158            Literal['against_display_only'],
159            Literal['against_display_yes_no'],
160        ],
161        ref_role: Union[
162            Literal['against_central'],
163            Literal['against_peripheral'],
164        ],
165    ) -> None:
166        if self.dut.name == 'android' and connect == 'outgoing_connection' and pair == 'incoming_pairing':
167            # TODO: do not skip when doing physical tests.
168            raise signals.TestSkip(
169                'TODO: Fix rootcanal when both side trigger authentication:\n'
170                + 'Android always trigger auth for outgoing connections.'
171            )
172
173        if self.dut.name == 'android' and 'disconnect' in variant:
174            raise signals.TestSkip(
175                'TODO: Fix AOSP pandora server for this variant:\n'
176                + '- Looks like `Disconnect`  never complete.\n'
177                + '- When disconnected the `Secure/WaitSecurity` never returns.'
178            )
179
180        if self.dut.name == 'android' and pair == 'outgoing_pairing' and ref_role == 'against_central':
181            raise signals.TestSkip(
182                'TODO: Fix PandoraSecurity server for android:\n'
183                + 'report the encryption state the with the bonding state'
184            )
185
186        if self.ref.name == 'android':
187            raise signals.TestSkip(
188                'TODO: (add bug number) Fix core stack:\n'
189                + 'BOND_BONDED event is triggered before the encryption changed'
190            )
191
192        if isinstance(self.ref, BumblePandoraDevice) and ref_io_capability == 'against_default_io_cap':
193            raise signals.TestSkip('Skip default IO cap for Bumble REF.')
194
195        if not isinstance(self.ref, BumblePandoraDevice) and ref_io_capability != 'against_default_io_cap':
196            raise signals.TestSkip('Unable to override IO capability on non Bumble device.')
197
198        # CTKD
199        if 'ctkd' in variant and ref_io_capability not in ('against_display_yes_no'):
200            raise signals.TestSkip('CTKD cases must be conducted under Security Level 4')
201
202        # Factory reset both DUT and REF devices.
203        await asyncio.gather(self.dut.reset(), self.ref.reset())
204
205        # Override REF IO capability if supported.
206        if isinstance(self.ref, BumblePandoraDevice):
207            io_capability = {
208                'against_no_output_no_input': PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT,
209                'against_keyboard_only': PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY,
210                'against_display_only': PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY,
211                'against_display_yes_no': PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
212            }[ref_io_capability]
213            self.ref.server_config.io_capability = io_capability
214            self.ref.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
215            self.ref.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
216            # Distribute Public identity address
217            self.ref.server_config.identity_address_type = PairingConfig.AddressType.PUBLIC
218            # Allow role switch
219            # TODO: Remove direct Bumble usage
220            await self.ref.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True)  # type: ignore
221
222        # Override DUT Bumble device capabilities.
223        if isinstance(self.dut, BumblePandoraDevice):
224            self.dut.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
225            self.dut.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION
226            # Distribute Public identity address
227            self.dut.server_config.identity_address_type = PairingConfig.AddressType.PUBLIC
228            # Allow role switch
229            # TODO: Remove direct Bumble usage
230            await self.dut.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True)  # type: ignore
231
232        # Pandora connection tokens
233        ref_dut: Optional[PandoraConnection] = None
234        dut_ref: Optional[PandoraConnection] = None
235        # Bumble connection
236        ref_dut_bumble = None
237        dut_ref_bumble = None
238        # CTKD async task
239        ctkd_task = None
240        need_ctkd = 'ctkd' in variant
241
242        # Connection/pairing task.
243        async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]:
244            nonlocal ref_dut
245            nonlocal dut_ref
246            nonlocal ref_dut_bumble
247            nonlocal dut_ref_bumble
248            nonlocal ctkd_task
249
250            # Make classic connection.
251            if connect == 'incoming_connection':
252                ref_dut, dut_ref = await pandora.connect(initiator=self.ref, acceptor=self.dut)
253            else:
254                dut_ref, ref_dut = await pandora.connect(initiator=self.dut, acceptor=self.ref)
255
256            # Retrieve Bumble connection
257            if isinstance(self.dut, BumblePandoraDevice):
258                dut_ref_bumble = pandora.get_raw_connection(self.dut, dut_ref)
259            # Role switch.
260            if isinstance(self.ref, BumblePandoraDevice):
261                ref_dut_bumble = pandora.get_raw_connection(self.ref, ref_dut)
262                if ref_dut_bumble is not None:
263                    role = {
264                        'against_central': HCI_CENTRAL_ROLE,
265                        'against_peripheral': HCI_PERIPHERAL_ROLE,
266                    }[ref_role]
267
268                    if ref_dut_bumble.role != role:
269                        self.ref.log.info(
270                            f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}"
271                        )
272                        await ref_dut_bumble.switch_role(role)
273
274            # TODO: Remove direct Bumble usage
275            async def wait_ctkd_keys() -> List[PairingKeys]:
276                futures: List[asyncio.Future[PairingKeys]] = []
277                if ref_dut_bumble is not None:
278                    ref_dut_fut = asyncio.get_event_loop().create_future()
279                    futures.append(ref_dut_fut)
280
281                    def on_pairing(keys: PairingKeys) -> None:
282                        ref_dut_fut.set_result(keys)
283
284                    ref_dut_bumble.on('pairing', on_pairing)
285                if dut_ref_bumble is not None:
286                    dut_ref_fut = asyncio.get_event_loop().create_future()
287                    futures.append(dut_ref_fut)
288
289                    def on_pairing(keys: PairingKeys) -> None:
290                        dut_ref_fut.set_result(keys)
291
292                    dut_ref_bumble.on('pairing', on_pairing)
293
294                return await asyncio.gather(*futures)
295
296            if need_ctkd:
297                # CTKD might be triggered by devices automatically, so CTKD listener must be started here
298                ctkd_task = asyncio.create_task(wait_ctkd_keys())
299
300            # Pairing.
301            if pair == 'incoming_pairing':
302                return await asyncio.gather(
303                    self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2),
304                    self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2),
305                )
306
307            return await asyncio.gather(
308                self.dut.aio.security.Secure(connection=dut_ref, classic=LEVEL2),
309                self.ref.aio.security.WaitSecurity(connection=ref_dut, classic=LEVEL2),
310            )
311
312        # Listen for pairing event on bot DUT and REF.
313        dut_pairing, ref_pairing = self.dut.aio.security.OnPairing(), self.ref.aio.security.OnPairing()
314
315        # Start connection/pairing.
316        connect_and_pair_task = asyncio.create_task(connect_and_pair())
317
318        shall_pass = variant == 'accept' or 'ctkd' in variant
319        try:
320            dut_pairing_fut = asyncio.create_task(anext(dut_pairing))
321            ref_pairing_fut = asyncio.create_task(anext(ref_pairing))
322
323            def on_done(_: Any) -> None:
324                if not dut_pairing_fut.done():
325                    dut_pairing_fut.cancel()
326                if not ref_pairing_fut.done():
327                    ref_pairing_fut.cancel()
328
329            connect_and_pair_task.add_done_callback(on_done)
330
331            ref_ev = await asyncio.wait_for(ref_pairing_fut, timeout=5.0)
332            self.ref.log.info(f'REF pairing event: {ref_ev.method_variant()}')
333
334            dut_ev_answer, ref_ev_answer = None, None
335            if not connect_and_pair_task.done():
336                dut_ev = await asyncio.wait_for(dut_pairing_fut, timeout=15.0)
337                self.dut.log.info(f'DUT pairing event: {dut_ev.method_variant()}')
338
339                if dut_ev.method_variant() in ('numeric_comparison', 'just_works'):
340                    assert_in(ref_ev.method_variant(), ('numeric_comparison', 'just_works'))
341
342                    confirm = True
343                    if (
344                        dut_ev.method_variant() == 'numeric_comparison'
345                        and ref_ev.method_variant() == 'numeric_comparison'
346                    ):
347                        confirm = ref_ev.numeric_comparison == dut_ev.numeric_comparison
348
349                    dut_ev_answer = PairingEventAnswer(event=dut_ev, confirm=False if variant == 'reject' else confirm)
350                    ref_ev_answer = PairingEventAnswer(
351                        event=ref_ev, confirm=False if variant == 'rejected' else confirm
352                    )
353
354                elif dut_ev.method_variant() == 'passkey_entry_notification':
355                    assert_equal(ref_ev.method_variant(), 'passkey_entry_request')
356                    assert_is_not_none(dut_ev.passkey_entry_notification)
357                    assert dut_ev.passkey_entry_notification is not None
358
359                    if variant == 'reject':
360                        # DUT cannot reject, pairing shall pass.
361                        shall_pass = True
362
363                    ref_ev_answer = PairingEventAnswer(
364                        event=ref_ev,
365                        passkey=None if variant == 'rejected' else dut_ev.passkey_entry_notification,
366                    )
367
368                elif dut_ev.method_variant() == 'passkey_entry_request':
369                    assert_equal(ref_ev.method_variant(), 'passkey_entry_notification')
370                    assert_is_not_none(ref_ev.passkey_entry_notification)
371
372                    if variant == 'rejected':
373                        # REF cannot reject, pairing shall pass.
374                        shall_pass = True
375
376                    assert ref_ev.passkey_entry_notification is not None
377                    dut_ev_answer = PairingEventAnswer(
378                        event=dut_ev,
379                        passkey=None if variant == 'reject' else ref_ev.passkey_entry_notification,
380                    )
381
382                else:
383                    fail("")
384
385                if variant == 'disconnect':
386                    # Disconnect:
387                    # - REF respond to pairing event if any.
388                    # - DUT trigger disconnect.
389                    if ref_ev_answer is not None:
390                        ref_pairing.send_nowait(ref_ev_answer)
391                    assert dut_ref is not None
392                    await self.dut.aio.host.Disconnect(connection=dut_ref)
393
394                elif variant == 'disconnected':
395                    # Disconnected:
396                    # - DUT respond to pairing event if any.
397                    # - REF trigger disconnect.
398                    if dut_ev_answer is not None:
399                        dut_pairing.send_nowait(dut_ev_answer)
400                    assert ref_dut is not None
401                    await self.ref.aio.host.Disconnect(connection=ref_dut)
402
403                else:
404                    # Otherwise:
405                    # - REF respond to pairing event if any.
406                    # - DUT respond to pairing event if any.
407                    if ref_ev_answer is not None:
408                        ref_pairing.send_nowait(ref_ev_answer)
409                    if dut_ev_answer is not None:
410                        dut_pairing.send_nowait(dut_ev_answer)
411
412        except (asyncio.CancelledError, asyncio.TimeoutError):
413            logging.error('Pairing timed-out or has been canceled.')
414
415        except AssertionError:
416            logging.exception('Pairing failed.')
417            if not connect_and_pair_task.done():
418                connect_and_pair_task.cancel()
419
420        finally:
421            try:
422                (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0)
423                logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}')
424
425                if shall_pass:
426                    assert_equal(secure.result_variant(), 'success')
427                    assert_equal(wait_security.result_variant(), 'success')
428                else:
429                    assert_in(
430                        secure.result_variant(),
431                        ('connection_died', 'pairing_failure', 'authentication_failure', 'not_reached'),
432                    )
433                    assert_in(
434                        wait_security.result_variant(),
435                        ('connection_died', 'pairing_failure', 'authentication_failure', 'not_reached'),
436                    )
437
438            finally:
439                dut_pairing.cancel()
440                ref_pairing.cancel()
441
442        if not need_ctkd:
443            return
444
445        ctkd_shall_pass = variant == 'accept_ctkd'
446
447        if variant == 'accept_ctkd':
448            # TODO: Remove direct Bumble usage
449            async def ctkd_over_bredr() -> None:
450                if ref_role == 'against_central':
451                    if ref_dut_bumble is not None:
452                        await ref_dut_bumble.pair()
453                else:
454                    if dut_ref_bumble is not None:
455                        await dut_ref_bumble.pair()
456                assert ctkd_task is not None
457                await ctkd_task
458
459            await ctkd_over_bredr()
460        else:
461            fail("Unsupported variant " + variant)
462
463        if ctkd_shall_pass:
464            # Try to connect with RPA(to verify IRK), and encrypt(to verify LTK)
465            await le_connect_with_rpa_and_encrypt(self.dut, self.ref)
466
467
468if __name__ == '__main__':
469    logging.basicConfig(level=logging.DEBUG)
470    test_runner.main()  # type: ignore
471