1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import avatar 17import itertools 18import logging 19 20from avatar import BumblePandoraDevice 21from avatar import PandoraDevice 22from avatar import PandoraDevices 23from avatar import pandora 24from bumble.hci import HCI_CENTRAL_ROLE 25from bumble.hci import HCI_PERIPHERAL_ROLE 26from bumble.hci import HCI_Write_Default_Link_Policy_Settings_Command 27from bumble.keys import PairingKeys 28from bumble.pairing import PairingConfig 29from bumble.pairing import PairingDelegate 30from mobly import base_test 31from mobly import signals 32from mobly import test_runner 33from mobly.asserts import assert_equal # type: ignore 34from mobly.asserts import assert_in # type: ignore 35from mobly.asserts import assert_is_not_none # type: ignore 36from mobly.asserts import fail # type: ignore 37from pandora.host_pb2 import RANDOM 38from pandora.host_pb2 import RESOLVABLE_OR_PUBLIC 39from pandora.host_pb2 import Connection as PandoraConnection 40from pandora.host_pb2 import DataTypes 41from pandora.security_pb2 import LE_LEVEL2 42from pandora.security_pb2 import LEVEL2 43from pandora.security_pb2 import PairingEventAnswer 44from pandora.security_pb2 import SecureResponse 45from pandora.security_pb2 import WaitSecurityResponse 46from typing import Any, List, Literal, Optional, Tuple, Union 47 48DEFAULT_SMP_KEY_DISTRIBUTION = ( 49 PairingDelegate.KeyDistribution.DISTRIBUTE_ENCRYPTION_KEY 50 | PairingDelegate.KeyDistribution.DISTRIBUTE_IDENTITY_KEY 51 | PairingDelegate.KeyDistribution.DISTRIBUTE_SIGNING_KEY 52) 53 54 55async def le_connect_with_rpa_and_encrypt(central: PandoraDevice, peripheral: PandoraDevice) -> None: 56 # Note: Android doesn't support own_address_type=RESOLVABLE_OR_PUBLIC(offloaded resolution) 57 # But own_address_type=RANDOM still set a public RPA generated in host 58 advertisement = peripheral.aio.host.Advertise( 59 legacy=True, 60 connectable=True, 61 own_address_type=RANDOM if peripheral.name == 'android' else RESOLVABLE_OR_PUBLIC, 62 data=DataTypes(manufacturer_specific_data=b'pause cafe'), 63 ) 64 65 (cen_res, per_res) = await asyncio.gather( 66 central.aio.host.ConnectLE( 67 own_address_type=RANDOM if central.name == 'android' else RESOLVABLE_OR_PUBLIC, 68 public=peripheral.address, 69 ), 70 anext(aiter(advertisement)), # pytype: disable=name-error 71 ) 72 73 advertisement.cancel() 74 assert_equal(cen_res.result_variant(), 'connection') 75 cen_per = cen_res.connection 76 per_cen = per_res.connection 77 assert cen_per is not None and per_cen is not None 78 79 encryption = await peripheral.aio.security.Secure(connection=per_cen, le=LE_LEVEL2) 80 assert_equal(encryption.result_variant(), 'success') 81 82 83class SecurityTest(base_test.BaseTestClass): # type: ignore[misc] 84 ''' 85 This class aim to test SSP (Secure Simple Pairing) on Classic 86 Bluetooth devices. 87 ''' 88 89 devices: Optional[PandoraDevices] = None 90 91 # pandora devices. 92 dut: PandoraDevice 93 ref: PandoraDevice 94 95 @avatar.asynchronous 96 async def setup_class(self) -> None: 97 self.devices = PandoraDevices(self) 98 self.dut, self.ref, *_ = self.devices 99 100 # Enable BR/EDR mode and SSP for Bumble devices. 101 for device in self.devices: 102 if isinstance(device, BumblePandoraDevice): 103 device.config.setdefault('address_resolution_offload', True) 104 device.config.setdefault('classic_enabled', True) 105 device.config.setdefault('classic_ssp_enabled', True) 106 device.config.setdefault( 107 'server', 108 { 109 'io_capability': 'display_output_and_yes_no_input', 110 }, 111 ) 112 113 await asyncio.gather(self.dut.reset(), self.ref.reset()) 114 115 def teardown_class(self) -> None: 116 if self.devices: 117 self.devices.stop_all() 118 119 @avatar.parameterized( 120 *itertools.product( 121 ('outgoing_connection', 'incoming_connection'), 122 ('outgoing_pairing', 'incoming_pairing'), 123 ( 124 'accept', 125 'reject', 126 'rejected', 127 'disconnect', 128 'disconnected', 129 'accept_ctkd', 130 ), 131 ( 132 'against_default_io_cap', 133 'against_no_output_no_input', 134 'against_keyboard_only', 135 'against_display_only', 136 'against_display_yes_no', 137 ), 138 ('against_central', 'against_peripheral'), 139 ) 140 ) # type: ignore[misc] 141 @avatar.asynchronous 142 async def test_ssp( 143 self, 144 connect: Union[Literal['outgoing_connection'], Literal['incoming_connection']], 145 pair: Union[Literal['outgoing_pairing'], Literal['incoming_pairing']], 146 variant: Union[ 147 Literal['accept'], 148 Literal['reject'], 149 Literal['rejected'], 150 Literal['disconnect'], 151 Literal['disconnected'], 152 Literal['accept_ctkd'], 153 ], 154 ref_io_capability: Union[ 155 Literal['against_default_io_cap'], 156 Literal['against_no_output_no_input'], 157 Literal['against_keyboard_only'], 158 Literal['against_display_only'], 159 Literal['against_display_yes_no'], 160 ], 161 ref_role: Union[ 162 Literal['against_central'], 163 Literal['against_peripheral'], 164 ], 165 ) -> None: 166 if self.dut.name == 'android' and connect == 'outgoing_connection' and pair == 'incoming_pairing': 167 # TODO: do not skip when doing physical tests. 168 raise signals.TestSkip( 169 'TODO: Fix rootcanal when both side trigger authentication:\n' 170 + 'Android always trigger auth for outgoing connections.' 171 ) 172 173 if self.dut.name == 'android' and 'disconnect' in variant: 174 raise signals.TestSkip( 175 'TODO: Fix AOSP pandora server for this variant:\n' 176 + '- Looks like `Disconnect` never complete.\n' 177 + '- When disconnected the `Secure/WaitSecurity` never returns.' 178 ) 179 180 if self.dut.name == 'android' and pair == 'outgoing_pairing' and ref_role == 'against_central': 181 raise signals.TestSkip( 182 'TODO: Fix PandoraSecurity server for android:\n' 183 + 'report the encryption state the with the bonding state' 184 ) 185 186 if self.ref.name == 'android': 187 raise signals.TestSkip( 188 'TODO: (add bug number) Fix core stack:\n' 189 + 'BOND_BONDED event is triggered before the encryption changed' 190 ) 191 192 if isinstance(self.ref, BumblePandoraDevice) and ref_io_capability == 'against_default_io_cap': 193 raise signals.TestSkip('Skip default IO cap for Bumble REF.') 194 195 if not isinstance(self.ref, BumblePandoraDevice) and ref_io_capability != 'against_default_io_cap': 196 raise signals.TestSkip('Unable to override IO capability on non Bumble device.') 197 198 # CTKD 199 if 'ctkd' in variant and ref_io_capability not in ('against_display_yes_no'): 200 raise signals.TestSkip('CTKD cases must be conducted under Security Level 4') 201 202 # Factory reset both DUT and REF devices. 203 await asyncio.gather(self.dut.reset(), self.ref.reset()) 204 205 # Override REF IO capability if supported. 206 if isinstance(self.ref, BumblePandoraDevice): 207 io_capability = { 208 'against_no_output_no_input': PairingDelegate.IoCapability.NO_OUTPUT_NO_INPUT, 209 'against_keyboard_only': PairingDelegate.IoCapability.KEYBOARD_INPUT_ONLY, 210 'against_display_only': PairingDelegate.IoCapability.DISPLAY_OUTPUT_ONLY, 211 'against_display_yes_no': PairingDelegate.IoCapability.DISPLAY_OUTPUT_AND_YES_NO_INPUT, 212 }[ref_io_capability] 213 self.ref.server_config.io_capability = io_capability 214 self.ref.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION 215 self.ref.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION 216 # Distribute Public identity address 217 self.ref.server_config.identity_address_type = PairingConfig.AddressType.PUBLIC 218 # Allow role switch 219 # TODO: Remove direct Bumble usage 220 await self.ref.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True) # type: ignore 221 222 # Override DUT Bumble device capabilities. 223 if isinstance(self.dut, BumblePandoraDevice): 224 self.dut.server_config.smp_local_initiator_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION 225 self.dut.server_config.smp_local_responder_key_distribution = DEFAULT_SMP_KEY_DISTRIBUTION 226 # Distribute Public identity address 227 self.dut.server_config.identity_address_type = PairingConfig.AddressType.PUBLIC 228 # Allow role switch 229 # TODO: Remove direct Bumble usage 230 await self.dut.device.send_command(HCI_Write_Default_Link_Policy_Settings_Command(default_link_policy_settings=0x01), check_result=True) # type: ignore 231 232 # Pandora connection tokens 233 ref_dut: Optional[PandoraConnection] = None 234 dut_ref: Optional[PandoraConnection] = None 235 # Bumble connection 236 ref_dut_bumble = None 237 dut_ref_bumble = None 238 # CTKD async task 239 ctkd_task = None 240 need_ctkd = 'ctkd' in variant 241 242 # Connection/pairing task. 243 async def connect_and_pair() -> Tuple[SecureResponse, WaitSecurityResponse]: 244 nonlocal ref_dut 245 nonlocal dut_ref 246 nonlocal ref_dut_bumble 247 nonlocal dut_ref_bumble 248 nonlocal ctkd_task 249 250 # Make classic connection. 251 if connect == 'incoming_connection': 252 ref_dut, dut_ref = await pandora.connect(initiator=self.ref, acceptor=self.dut) 253 else: 254 dut_ref, ref_dut = await pandora.connect(initiator=self.dut, acceptor=self.ref) 255 256 # Retrieve Bumble connection 257 if isinstance(self.dut, BumblePandoraDevice): 258 dut_ref_bumble = pandora.get_raw_connection(self.dut, dut_ref) 259 # Role switch. 260 if isinstance(self.ref, BumblePandoraDevice): 261 ref_dut_bumble = pandora.get_raw_connection(self.ref, ref_dut) 262 if ref_dut_bumble is not None: 263 role = { 264 'against_central': HCI_CENTRAL_ROLE, 265 'against_peripheral': HCI_PERIPHERAL_ROLE, 266 }[ref_role] 267 268 if ref_dut_bumble.role != role: 269 self.ref.log.info( 270 f"Role switch to: {'`CENTRAL`' if role == HCI_CENTRAL_ROLE else '`PERIPHERAL`'}" 271 ) 272 await ref_dut_bumble.switch_role(role) 273 274 # TODO: Remove direct Bumble usage 275 async def wait_ctkd_keys() -> List[PairingKeys]: 276 futures: List[asyncio.Future[PairingKeys]] = [] 277 if ref_dut_bumble is not None: 278 ref_dut_fut = asyncio.get_event_loop().create_future() 279 futures.append(ref_dut_fut) 280 281 def on_pairing(keys: PairingKeys) -> None: 282 ref_dut_fut.set_result(keys) 283 284 ref_dut_bumble.on('pairing', on_pairing) 285 if dut_ref_bumble is not None: 286 dut_ref_fut = asyncio.get_event_loop().create_future() 287 futures.append(dut_ref_fut) 288 289 def on_pairing(keys: PairingKeys) -> None: 290 dut_ref_fut.set_result(keys) 291 292 dut_ref_bumble.on('pairing', on_pairing) 293 294 return await asyncio.gather(*futures) 295 296 if need_ctkd: 297 # CTKD might be triggered by devices automatically, so CTKD listener must be started here 298 ctkd_task = asyncio.create_task(wait_ctkd_keys()) 299 300 # Pairing. 301 if pair == 'incoming_pairing': 302 return await asyncio.gather( 303 self.ref.aio.security.Secure(connection=ref_dut, classic=LEVEL2), 304 self.dut.aio.security.WaitSecurity(connection=dut_ref, classic=LEVEL2), 305 ) 306 307 return await asyncio.gather( 308 self.dut.aio.security.Secure(connection=dut_ref, classic=LEVEL2), 309 self.ref.aio.security.WaitSecurity(connection=ref_dut, classic=LEVEL2), 310 ) 311 312 # Listen for pairing event on bot DUT and REF. 313 dut_pairing, ref_pairing = self.dut.aio.security.OnPairing(), self.ref.aio.security.OnPairing() 314 315 # Start connection/pairing. 316 connect_and_pair_task = asyncio.create_task(connect_and_pair()) 317 318 shall_pass = variant == 'accept' or 'ctkd' in variant 319 try: 320 dut_pairing_fut = asyncio.create_task(anext(dut_pairing)) 321 ref_pairing_fut = asyncio.create_task(anext(ref_pairing)) 322 323 def on_done(_: Any) -> None: 324 if not dut_pairing_fut.done(): 325 dut_pairing_fut.cancel() 326 if not ref_pairing_fut.done(): 327 ref_pairing_fut.cancel() 328 329 connect_and_pair_task.add_done_callback(on_done) 330 331 ref_ev = await asyncio.wait_for(ref_pairing_fut, timeout=5.0) 332 self.ref.log.info(f'REF pairing event: {ref_ev.method_variant()}') 333 334 dut_ev_answer, ref_ev_answer = None, None 335 if not connect_and_pair_task.done(): 336 dut_ev = await asyncio.wait_for(dut_pairing_fut, timeout=15.0) 337 self.dut.log.info(f'DUT pairing event: {dut_ev.method_variant()}') 338 339 if dut_ev.method_variant() in ('numeric_comparison', 'just_works'): 340 assert_in(ref_ev.method_variant(), ('numeric_comparison', 'just_works')) 341 342 confirm = True 343 if ( 344 dut_ev.method_variant() == 'numeric_comparison' 345 and ref_ev.method_variant() == 'numeric_comparison' 346 ): 347 confirm = ref_ev.numeric_comparison == dut_ev.numeric_comparison 348 349 dut_ev_answer = PairingEventAnswer(event=dut_ev, confirm=False if variant == 'reject' else confirm) 350 ref_ev_answer = PairingEventAnswer( 351 event=ref_ev, confirm=False if variant == 'rejected' else confirm 352 ) 353 354 elif dut_ev.method_variant() == 'passkey_entry_notification': 355 assert_equal(ref_ev.method_variant(), 'passkey_entry_request') 356 assert_is_not_none(dut_ev.passkey_entry_notification) 357 assert dut_ev.passkey_entry_notification is not None 358 359 if variant == 'reject': 360 # DUT cannot reject, pairing shall pass. 361 shall_pass = True 362 363 ref_ev_answer = PairingEventAnswer( 364 event=ref_ev, 365 passkey=None if variant == 'rejected' else dut_ev.passkey_entry_notification, 366 ) 367 368 elif dut_ev.method_variant() == 'passkey_entry_request': 369 assert_equal(ref_ev.method_variant(), 'passkey_entry_notification') 370 assert_is_not_none(ref_ev.passkey_entry_notification) 371 372 if variant == 'rejected': 373 # REF cannot reject, pairing shall pass. 374 shall_pass = True 375 376 assert ref_ev.passkey_entry_notification is not None 377 dut_ev_answer = PairingEventAnswer( 378 event=dut_ev, 379 passkey=None if variant == 'reject' else ref_ev.passkey_entry_notification, 380 ) 381 382 else: 383 fail("") 384 385 if variant == 'disconnect': 386 # Disconnect: 387 # - REF respond to pairing event if any. 388 # - DUT trigger disconnect. 389 if ref_ev_answer is not None: 390 ref_pairing.send_nowait(ref_ev_answer) 391 assert dut_ref is not None 392 await self.dut.aio.host.Disconnect(connection=dut_ref) 393 394 elif variant == 'disconnected': 395 # Disconnected: 396 # - DUT respond to pairing event if any. 397 # - REF trigger disconnect. 398 if dut_ev_answer is not None: 399 dut_pairing.send_nowait(dut_ev_answer) 400 assert ref_dut is not None 401 await self.ref.aio.host.Disconnect(connection=ref_dut) 402 403 else: 404 # Otherwise: 405 # - REF respond to pairing event if any. 406 # - DUT respond to pairing event if any. 407 if ref_ev_answer is not None: 408 ref_pairing.send_nowait(ref_ev_answer) 409 if dut_ev_answer is not None: 410 dut_pairing.send_nowait(dut_ev_answer) 411 412 except (asyncio.CancelledError, asyncio.TimeoutError): 413 logging.error('Pairing timed-out or has been canceled.') 414 415 except AssertionError: 416 logging.exception('Pairing failed.') 417 if not connect_and_pair_task.done(): 418 connect_and_pair_task.cancel() 419 420 finally: 421 try: 422 (secure, wait_security) = await asyncio.wait_for(connect_and_pair_task, 15.0) 423 logging.info(f'Pairing result: {secure.result_variant()}/{wait_security.result_variant()}') 424 425 if shall_pass: 426 assert_equal(secure.result_variant(), 'success') 427 assert_equal(wait_security.result_variant(), 'success') 428 else: 429 assert_in( 430 secure.result_variant(), 431 ('connection_died', 'pairing_failure', 'authentication_failure', 'not_reached'), 432 ) 433 assert_in( 434 wait_security.result_variant(), 435 ('connection_died', 'pairing_failure', 'authentication_failure', 'not_reached'), 436 ) 437 438 finally: 439 dut_pairing.cancel() 440 ref_pairing.cancel() 441 442 if not need_ctkd: 443 return 444 445 ctkd_shall_pass = variant == 'accept_ctkd' 446 447 if variant == 'accept_ctkd': 448 # TODO: Remove direct Bumble usage 449 async def ctkd_over_bredr() -> None: 450 if ref_role == 'against_central': 451 if ref_dut_bumble is not None: 452 await ref_dut_bumble.pair() 453 else: 454 if dut_ref_bumble is not None: 455 await dut_ref_bumble.pair() 456 assert ctkd_task is not None 457 await ctkd_task 458 459 await ctkd_over_bredr() 460 else: 461 fail("Unsupported variant " + variant) 462 463 if ctkd_shall_pass: 464 # Try to connect with RPA(to verify IRK), and encrypt(to verify LTK) 465 await le_connect_with_rpa_and_encrypt(self.dut, self.ref) 466 467 468if __name__ == '__main__': 469 logging.basicConfig(level=logging.DEBUG) 470 test_runner.main() # type: ignore 471