1# Copyright 2014 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import glob 16import json 17import logging 18import os 19import os.path 20import re 21import subprocess 22import sys 23import tempfile 24import time 25import types 26 27import camera_properties_utils 28import capture_request_utils 29import image_processing_utils 30import its_device_utils 31import its_session_utils 32import lighting_control_utils 33import numpy as np 34import yaml 35 36 37YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP'] 38CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml') 39TEST_KEY_TABLET = 'tablet' 40TEST_KEY_SENSOR_FUSION = 'sensor_fusion' 41ACTIVITY_START_WAIT = 1.5 # seconds 42MERGE_RESULTS_TIMEOUT = 3600 # seconds 43 44NUM_TRIES = 2 45RESULT_PASS = 'PASS' 46RESULT_FAIL = 'FAIL' 47RESULT_NOT_EXECUTED = 'NOT_EXECUTED' 48RESULT_KEY = 'result' 49METRICS_KEY = 'mpc_metrics' 50PERFORMANCE_KEY = 'performance_metrics' 51FEATURE_QUERY_KEY = 'feature_query_proto' 52SUMMARY_KEY = 'summary' 53RESULT_VALUES = (RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED) 54CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier' 55ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT' 56EXTRA_VERSION = 'camera.its.extra.VERSION' 57CURRENT_ITS_VERSION = '1.0' # version number to sync with CtsVerifier 58EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID' 59EXTRA_RESULTS = 'camera.its.extra.RESULTS' 60EXTRA_TABLET_NAME = 'camera.its.extra.TABLET_NAME' 
TIME_KEY_START = 'start'
TIME_KEY_END = 'end'
VALID_CONTROLLERS = ('arduino', 'canakit')
_FRONT_CAMERA_ID = '1'
# recover replaced '_' in scene def
_INT_STR_DICT = types.MappingProxyType({'11': '1_1', '12': '1_2'})
_MAIN_TESTBED = 0
_PROPERTIES_TO_MATCH = (
    'ro.product.model', 'ro.product.name', 'ro.build.display.id', 'ro.revision'
)

# Scenes that can be automated through tablet display
# Notes on scene names:
#   scene*_1/2/... are same scene split to load balance run times for scenes
#   scene*_a/b/... are similar scenes that share one or more tests
_TABLET_SCENES = (
    'scene0', 'scene1_1', 'scene1_2', 'scene1_3', 'scene2_a', 'scene2_b',
    'scene2_c', 'scene2_d', 'scene2_e', 'scene2_f', 'scene2_g', 'scene3',
    'scene4', 'scene6', 'scene6_tele', 'scene7', 'scene8', 'scene9',
    os.path.join('scene_extensions', 'scene_hdr'),
    os.path.join('scene_extensions', 'scene_low_light'),
    'scene_video',
)

# Scenes that use the 'sensor_fusion' test rig
_MOTION_SCENES = ('sensor_fusion', 'feature_combination',)

# Scenes that use lighting control
_FLASH_SCENES = ('scene_flash',)

# Scenes that use a checkerboard as chart
_CHECKERBOARD_SCENES = ('sensor_fusion', 'scene_flash', 'feature_combination',)

# Scenes that have to be run manually regardless of configuration
_MANUAL_SCENES = ('scene5',)

# Scene extensions
_EXTENSIONS_SCENES = (os.path.join('scene_extensions', 'scene_hdr'),
                      os.path.join('scene_extensions', 'scene_low_light'),
                      )

# All possible scenes
_ALL_SCENES = _TABLET_SCENES + _MANUAL_SCENES + _MOTION_SCENES + _FLASH_SCENES

# Scenes that are logically grouped and can be called as group
# scene6_tele is not grouped with scene6 because it requires extension rig
_GROUPED_SCENES = types.MappingProxyType({
    'scene1': ('scene1_1', 'scene1_2'),
    'scene2': ('scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e',
               'scene2_f', 'scene2_g')
})

# Scene requirements for manual testing.
_SCENE_REQ = types.MappingProxyType({
    'scene0': None,
    'scene1_1': 'A grey card covering at least the middle 30% of the scene',
    'scene1_2': 'A grey card covering at least the middle 30% of the scene',
    'scene1_3': 'A grey card covering at least the middle 30% of the scene, '
                'without a white border like scene1_1 or scene1_2',
    'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png',
    'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png',
    'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png',
    'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png',
    'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
    'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png',
    'scene2_g': 'The picture with 3 profile faces in tests/scene2_g/scene2_g.png',
    'scene3': 'The ISO12233 chart',
    'scene4': 'A test chart of a circle covering at least the middle 50% of '
              'the scene. See tests/scene4/scene4.png',
    'scene5': 'Capture images with a diffuser attached to the camera. See '
              'source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser '  # pylint: disable=line-too-long
              'for more details',
    'scene6': 'A grid of ArUco markers on a white background. '
              'See tests/scene6/scene6.png',
    'scene6_tele': 'A grid of ArUco markers on a white background. Identical '
                   'to scene6, but for tele cameras. '
                   'See tests/scene6_tele/scene6_tele.png',
    # The trailing spaces below keep the concatenated literals from running
    # together in the prompt shown to the operator.
    'scene7': 'The picture with 4 different colors, slanted edge and '
              '4 ArUco markers. See tests/scene7/scene7.png',
    'scene8': 'The picture with 4 faces in 4 different colors overlay. '
              'See tests/scene8/scene8.png',
    'scene9': 'A scene with high entropy consisting of random size and colored '
              'circles. See tests/scene9/scene9.png',
    # Use os.path to avoid confusion on other platforms
    os.path.join('scene_extensions', 'scene_hdr'): (
        'A tablet displayed scene with a face on the left '
        'and a low-contrast QR code on the right. '
        'See tests/scene_extensions/scene_hdr/scene_hdr.png'
    ),
    os.path.join('scene_extensions', 'scene_low_light'): (
        'A tablet displayed scene with a grid of squares of varying '
        'brightness. See '
        'tests/scene_extensions/scene_low_light/scene_low_light.png'
    ),
    'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of '
                     'in tests/sensor_fusion/checkerboard.pdf\n'
                     'See tests/sensor_fusion/SensorFusion.pdf for detailed '
                     'instructions.\nNote that this test will be skipped '
                     'on devices not supporting REALTIME camera timestamp.',
    'feature_combination': 'The same scene as sensor_fusion, '
                           'separated for easier testing.',
    'scene_flash': 'A checkerboard pattern chart with lights off.',
    'scene_video': 'A tablet displayed scene with a series of circles moving '
                   'at different simulated frame rates. '
                   'See tests/scene_video/scene_video.mp4',
})

# Made mutable to allow for test augmentation based on first API level
SUB_CAMERA_TESTS = {
    'scene0': (
        'test_jitter',
        'test_metadata',
        'test_request_capture_match',
        'test_sensor_events',
        'test_solid_color_test_pattern',
        'test_unified_timestamps',
    ),
    'scene1_1': (
        'test_burst_capture',
        'test_burst_sameness_manual',
        'test_exposure_x_iso',
        'test_linearity',
    ),
    'scene1_2': (
        'test_raw_exposure',
    ),
    'scene1_3': (
        'test_dng_noise_model',
        'test_raw_sensitivity',
        'test_yuv_plus_raw',
    ),
    'scene2_a': (
        'test_num_faces',
    ),
    'scene4': (
        'test_aspect_ratio_and_crop',
    ),
    'scene6_tele': (
        'test_zoom_tele',
    ),
    'scene_video': (
        'test_preview_frame_drop',
    ),
    'sensor_fusion': (
        'test_sensor_fusion',
    ),
}

_LIGHTING_CONTROL_TESTS = (
    'test_auto_flash.py',
    'test_preview_min_frame_rate.py',
    'test_led_snapshot.py',
    'test_night_extension.py',
    'test_low_light_boost_extension.py',
    'test_hdr_extension.py',
)

_EXTENSION_NAMES = (
    'hdr',
    'low_light',
)

_DST_SCENE_DIR = '/sdcard/Download/'
_SUB_CAMERA_LEVELS = 2
MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt'


def report_result(device_id, camera_id, tablet_name, results):
  """Sends a pass/fail result to the device, via an intent.

  Each scene entry that carries a summary file has that file pushed to the
  device, and its SUMMARY_KEY value is rewritten IN PLACE to the on-device
  path before the results are serialized into the broadcast.

  Args:
    device_id: The ID string of the device to report the results to.
    camera_id: The ID string of the camera for which to report pass/fail.
    tablet_name: The tablet name to identify model and build.
    results: a dictionary contains all ITS scenes as key and result/summary of
      current ITS run. See test_report_result unit test for an example.

  Raises:
    ValueError: if a scene has no result entry or an unknown result value.
  """
  adb = f'adb -s {device_id}'
  its_device_utils.start_its_test_activity(device_id)
  time.sleep(ACTIVITY_START_WAIT)

  # Validate/process results argument
  for scene, scene_result in results.items():
    if RESULT_KEY not in scene_result:
      raise ValueError(f'ITS result not found for {scene}')
    if scene_result[RESULT_KEY] not in RESULT_VALUES:
      # Report the offending value of THIS scene; indexing the top-level dict
      # with RESULT_KEY would raise KeyError and mask the real problem.
      raise ValueError(
          f'Unknown ITS result for {scene}: {scene_result[RESULT_KEY]}')
    if SUMMARY_KEY in scene_result:
      device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt'
      its_device_utils.run(
          f'{adb} push {scene_result[SUMMARY_KEY]} {device_summary_path}')
      scene_result[SUMMARY_KEY] = device_summary_path

  json_results = json.dumps(results)
  cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}"
         f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es "
         f"{EXTRA_TABLET_NAME} {tablet_name} --es "
         f"{EXTRA_RESULTS} '{json_results}'")
  its_device_utils.run(cmd)


def write_result(testbed_index, device_id, camera_id, results):
  """Writes results to a temporary file for merging.

  The file is created in the current working directory and is named
  'testbed_<testbed_index>_camera_<camera_id>.tmp'.

  Args:
    testbed_index: the index of a finished testbed.
    device_id: the ID string of the device that created results.
    camera_id: the ID string of the camera of the device.
    results: a dictionary that contains all ITS scenes as key
      and result/summary of current ITS run.
  """
  payload = {'device_id': device_id, 'results': results}
  file_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp'
  with open(file_name, 'w') as f:
    json.dump(payload, f)


def parse_testbeds(completed_testbeds):
  """Parses completed testbeds and yields device_id, camera_id, and results.

  Reads the 'testbed_<i>_camera_<id>.tmp' files found in the current working
  directory for every index in completed_testbeds.

  Args:
    completed_testbeds: an iterable of completed testbed indices.

  Yields:
    device_id: the device associated with the testbed.
    camera_id: one of the camera_ids associated with the testbed.
    results: the dictionary with scenes and result/summary of testbed's run.

  Raises:
    ValueError: if a file holds an empty device_id or empty results.
  """
  for i in completed_testbeds:
    for file_name in glob.glob(f'testbed_{i}_camera_*.tmp'):
      # The camera id is whatever sits between 'camera_' and '.tmp'.
      camera_id = file_name.split('camera_')[1].split('.tmp')[0]
      with open(file_name, 'r') as f:
        testbed_data = json.load(f)
      device_id = testbed_data['device_id']
      results = testbed_data['results']
      if not device_id or not results:
        raise ValueError(f'device_id or results for {file_name} not found.')
      yield device_id, camera_id, results


def get_device_property(device_id, property_name):
  """Get property of a given device.

  Args:
    device_id: the ID string of a device.
    property_name: the desired property string.

  Returns:
    The value of the property, as stripped `adb shell getprop` output.
  """
  property_cmd = f'adb -s {device_id} shell getprop {property_name}'
  raw_output = subprocess.check_output(
      property_cmd, stderr=subprocess.STDOUT, shell=True)
  return raw_output.decode('utf-8').strip()


def are_devices_similar(device_id_1, device_id_2):
  """Checks if key dimensions are the same between devices.

  Compares every property in _PROPERTIES_TO_MATCH and stops at the first
  mismatch, which is logged as an error.

  Args:
    device_id_1: the ID string of the _MAIN_TESTBED device.
    device_id_2: the ID string of another device.

  Returns:
    True if both devices share key dimensions.
  """
  for property_to_match in _PROPERTIES_TO_MATCH:
    property_value_1 = get_device_property(device_id_1, property_to_match)
    property_value_2 = get_device_property(device_id_2, property_to_match)
    if property_value_1 != property_value_2:
      logging.error('%s does not match %s for %s',
                    property_value_1, property_value_2, property_to_match)
      return False
  return True


def check_manual_scenes(device_id, camera_id, scene, out_path):
  """Halt run to change scenes.

  Prompts the operator to set up the scene, captures a check image into
  out_path and repeats until the operator answers 'y'.

  Args:
    device_id: id of device
    camera_id: id of camera
    scene: Name of the scene to copy image files.
    out_path: output file location
  """
  hidden_physical_id = None
  if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id:
    split_camera_ids = camera_id.split(its_session_utils.SUB_CAMERA_SEPARATOR)
    if len(split_camera_ids) == _SUB_CAMERA_LEVELS:
      camera_id = split_camera_ids[0]
      hidden_physical_id = split_camera_ids[1]

  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id,
      hidden_physical_id=hidden_physical_id) as cam:
    props = cam.get_camera_properties()
    props = cam.override_with_hidden_physical_camera_props(props)

    while True:
      input(f'\n Press <ENTER> after positioning camera {camera_id} with '
            f'{scene}.\n The scene setup should be: \n  {_SCENE_REQ[scene]}\n')
      # Converge 3A prior to capture
      if scene == 'scene5':
        cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props),
                  lock_awb=camera_properties_utils.awb_lock(props))
      else:
        cam.do_3a()
      req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props)
      logging.info('Capturing an image to check the test scene')
      cap = cam.do_capture(req, fmt)
      img = image_processing_utils.convert_capture_to_rgb_image(cap)
      img_name = os.path.join(out_path, f'test_{scene.replace("/", "_")}.jpg')
      logging.info('Please check scene setup in %s', img_name)
      image_processing_utils.write_image(img, img_name)
      choice = input(f'Is the image okay for ITS {scene}? (Y/N)').lower()
      if choice == 'y':
        break


def get_config_file_contents():
  """Read the config file contents from a YML file.

  Args:
    None

  Returns:
    config_file_contents: a dict read from config.yml
  """
  with open(CONFIG_FILE) as file:
    config_file_contents = yaml.safe_load(file)
  return config_file_contents


def get_test_params(config_file_contents):
  """Reads the config file parameters.

  Args:
    config_file_contents: dict read from config.yml file

  Returns:
    The 'TestParams' entry of the LAST testbed found in the config (earlier
    ones are overwritten), or None if the config holds no testbeds.
  """
  test_params = None
  for testbeds in config_file_contents.values():
    for datadict in testbeds:
      test_params = datadict.get('TestParams')
  return test_params


def get_device_serial_number(device, config_file_contents):
  """Returns the serial number of the device with label from the config file.

  The config file contains TestBeds dictionary which contains Controllers and
  Android Device dicts. The two devices used by the test per box are listed
  there with labels dut and tablet. Parse through the nested TestBeds dict to
  get the Android device details.

  Args:
    device: String device label as specified in config file: dut/tablet. Any
      value other than 'tablet' is treated as 'dut'.
    config_file_contents: dict read from config.yml file

  Returns:
    The serial number, as a string, of the last device carrying the label.

  Raises:
    ValueError: if no device with the requested label is configured.
  """
  tablet_device_id = None
  dut_device_id = None
  for testbeds in config_file_contents.values():
    for datadict in testbeds:
      android_device_contents = datadict.get('Controllers')
      for device_dict in android_device_contents.get('AndroidDevice'):
        for label in device_dict.values():
          if label == 'tablet':
            tablet_device_id = str(device_dict.get('serial'))
          if label == 'dut':
            dut_device_id = str(device_dict.get('serial'))

  if device == 'tablet':
    wanted_label, serial = 'tablet', tablet_device_id
  else:
    wanted_label, serial = 'dut', dut_device_id
  if serial is None:
    # Fail with an actionable message instead of an UnboundLocalError.
    raise ValueError(
        f'No AndroidDevice labeled {wanted_label!r} found in config file.')
  return serial


def get_updated_yml_file(yml_file_contents):
  """Create a new yml file and write the testbed contents in it.

  This testbed file is per box and contains all the parameters and
  device id used by the mobly tests. The file is created inside
  YAML_FILE_DIR, whose mode is first set to 0o755.

  Args:
    yml_file_contents: Data to write in yml file.

  Returns:
    Base name (not the full path) of the newly created yml file.
  """
  os.chmod(YAML_FILE_DIR, 0o755)
  file_descriptor, new_yaml_file = tempfile.mkstemp(
      suffix='.yml', prefix='config_', dir=YAML_FILE_DIR)
  # mkstemp hands back an open descriptor; close it and reopen by name.
  os.close(file_descriptor)
  with open(new_yaml_file, 'w') as f:
    yaml.dump(yml_file_contents, stream=f, default_flow_style=False)
  new_yaml_file_name = os.path.basename(new_yaml_file)
  return new_yaml_file_name


def enable_external_storage(device_id):
  """Override apk mode to allow write to external storage.

  Args:
    device_id: Serial number of the device.
  """
  cmd = (f'adb -s {device_id} shell appops '
         'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow')
  its_device_utils.run(cmd)


def get_available_cameras(device_id, camera_id):
  """Get available camera devices in the current state.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera_id

  Returns:
    List of all the available camera_ids: the device's camera id array with
    the currently unavailable physical cameras of camera_id removed.
  """
  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id) as cam:
    # NOTE(review): props is not used below; the calls are kept so the
    # session interaction stays unchanged -- confirm whether they are needed.
    props = cam.get_camera_properties()
    props = cam.override_with_hidden_physical_camera_props(props)
    unavailable_physical_cameras = cam.get_unavailable_physical_cameras(
        camera_id)
    unavailable_physical_ids = unavailable_physical_cameras[
        'unavailablePhysicalCamerasArray']
    output = cam.get_camera_ids()
    all_camera_ids = output['cameraIdArray']
    # Concat camera_id, physical camera_id and sub camera separator
    # (hard-coded '.'; presumably equal to
    # its_session_utils.SUB_CAMERA_SEPARATOR -- TODO confirm).
    unavailable_physical_ids = [f'{camera_id}.{s}'
                                for s in unavailable_physical_ids]
    for i in unavailable_physical_ids:
      if i in all_camera_ids:
        all_camera_ids.remove(i)
    logging.debug('available camera ids: %s', all_camera_ids)
  return all_camera_ids


def get_unavailable_physical_cameras(device_id, camera_id):
  """Get unavailable physical cameras in the current state.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera device id

  Returns:
    List of all the unavailable camera_ids, each formatted as
    '<camera_id>.<physical_id>'.
  """
  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id) as cam:
    unavailable_physical_cameras = cam.get_unavailable_physical_cameras(
        camera_id)
    unavailable_physical_ids = unavailable_physical_cameras[
        'unavailablePhysicalCamerasArray']
    unavailable_physical_ids = [f'{camera_id}.{s}'
                                for s in unavailable_physical_ids]
    logging.debug('Unavailable physical camera ids: %s',
                  unavailable_physical_ids)
  return unavailable_physical_ids


def is_device_folded(device_id):
  """Returns True if the foldable device is in folded state.

  The device is considered folded when the output of
  `adb shell cmd device_state state` contains 'CLOSE'.

  Args:
    device_id: Serial number of the foldable device.

  Returns:
    True if the reported device state contains 'CLOSE', False otherwise.
  """
  cmd = f'adb -s {device_id} shell cmd device_state state'
  result = subprocess.getoutput(cmd)
  return 'CLOSE' in result


def augment_sub_camera_tests(first_api_level):
  """Adds certain tests to SUB_CAMERA_TESTS depending on first_api_level.

  Mutates the module-level SUB_CAMERA_TESTS dict.

  Args:
    first_api_level: First api level of the device.
  """
  if first_api_level >= its_session_utils.ANDROID15_API_LEVEL:
    logging.debug('Augmenting sub camera tests')
    SUB_CAMERA_TESTS['scene6'] = ('test_in_sensor_zoom',)


def main():
  """Run all the Camera ITS automated tests.

    Script should be run from the top-level CameraITS directory.

    Command line arguments:
        camera:  the camera(s) to be tested. Use comma to separate multiple
                 camera Ids. Ex: "camera=0,1" or "camera=1"
        scenes:  the test scene(s) to be executed. Use comma to separate
                 multiple scenes. Ex: "scenes=scene0,scene1_1" or
                 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X
                 where X is scene name minus 'scene')
  """
  logging.basicConfig(level=logging.INFO)
  # Make output directories to hold the generated files.
566 topdir = tempfile.mkdtemp(prefix='CameraITS_') 567 try: 568 subprocess.call(['chmod', 'g+rx', topdir]) 569 except OSError as e: 570 logging.info(repr(e)) 571 572 scenes = [] 573 camera_id_combos = [] 574 testbed_index = None 575 num_testbeds = None 576 # Override camera, scenes and testbed with cmd line values if available 577 for s in list(sys.argv[1:]): 578 if 'scenes=' in s: 579 scenes = s.split('=')[1].split(',') 580 elif 'camera=' in s: 581 camera_id_combos = s.split('=')[1].split(',') 582 elif 'testbed_index=' in s: 583 testbed_index = int(s.split('=')[1]) 584 elif 'num_testbeds=' in s: 585 num_testbeds = int(s.split('=')[1]) 586 else: 587 raise ValueError(f'Unknown argument {s}') 588 if testbed_index is None and num_testbeds is not None: 589 raise ValueError( 590 'testbed_index must be specified if num_testbeds is specified.') 591 if (testbed_index is not None and num_testbeds is not None and 592 testbed_index >= num_testbeds): 593 raise ValueError('testbed_index must be less than num_testbeds. 
' 594 'testbed_index starts at 0.') 595 596 # Prepend 'scene' if not specified at cmd line 597 for i, s in enumerate(scenes): 598 if (not s.startswith('scene') and 599 not s.startswith(('checkerboard', 'sensor_fusion', 600 'flash', 'feature_combination', '<scene-name>'))): 601 scenes[i] = f'scene{s}' 602 if s.startswith('flash') or s.startswith('extensions'): 603 scenes[i] = f'scene_{s}' 604 # Handle scene_extensions 605 if any(s.startswith(extension) for extension in _EXTENSION_NAMES): 606 scenes[i] = f'scene_extensions/scene_{s}' 607 if (any(s.startswith('scene_' + extension) 608 for extension in _EXTENSION_NAMES)): 609 scenes[i] = f'scene_extensions/{s}' 610 611 # Read config file and extract relevant TestBed 612 config_file_contents = get_config_file_contents() 613 if testbed_index is None: 614 for i in config_file_contents['TestBeds']: 615 if scenes in ( 616 ['sensor_fusion'], ['checkerboard'], ['scene_flash'], 617 ['feature_combination'] 618 ): 619 if TEST_KEY_SENSOR_FUSION not in i['Name'].lower(): 620 config_file_contents['TestBeds'].remove(i) 621 else: 622 if TEST_KEY_SENSOR_FUSION in i['Name'].lower(): 623 config_file_contents['TestBeds'].remove(i) 624 else: 625 config_file_contents = { 626 'TestBeds': [config_file_contents['TestBeds'][testbed_index]] 627 } 628 629 # Get test parameters from config file 630 test_params_content = get_test_params(config_file_contents) 631 if not camera_id_combos: 632 camera_id_combos = str(test_params_content['camera']).split(',') 633 if not scenes: 634 scenes = str(test_params_content['scene']).split(',') 635 scenes = [_INT_STR_DICT.get(n, n) for n in scenes] # recover '1_1' & '1_2' 636 637 device_id = get_device_serial_number('dut', config_file_contents) 638 # Enable external storage on DUT to send summary report to CtsVerifier.apk 639 enable_external_storage(device_id) 640 641 # Add to SUB_CAMERA_TESTS depending on first_api_level 642 augment_sub_camera_tests(its_session_utils.get_first_api_level(device_id)) 643 644 # 
Verify that CTS Verifier is installed 645 its_session_utils.check_apk_installed(device_id, CTS_VERIFIER_PACKAGE_NAME) 646 # Check whether the dut is foldable or not 647 testing_foldable_device = True if test_params_content[ 648 'foldable_device'] == 'True' else False 649 available_camera_ids_to_test_foldable = [] 650 if testing_foldable_device: 651 logging.debug('Testing foldable device.') 652 # Check the state of foldable device. True if device is folded, 653 # false if the device is opened. 654 device_folded = is_device_folded(device_id) 655 # list of available camera_ids to be tested in device state 656 available_camera_ids_to_test_foldable = get_available_cameras( 657 device_id, _FRONT_CAMERA_ID) 658 659 config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower() 660 logging.info('Saving %s output files to: %s', config_file_test_key, topdir) 661 if TEST_KEY_TABLET in config_file_test_key: 662 tablet_id = get_device_serial_number('tablet', config_file_contents) 663 tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.product.device' 664 raw_output = subprocess.check_output( 665 tablet_name_cmd, stderr=subprocess.STDOUT, shell=True) 666 tablet_name = str(raw_output.decode('utf-8')).strip() 667 logging.debug('Tablet name: %s', tablet_name) 668 brightness = test_params_content['brightness'] 669 its_session_utils.validate_tablet(tablet_name, brightness, tablet_id) 670 else: 671 tablet_id = None 672 tablet_name = "sensor_fusion" 673 674 testing_sensor_fusion_with_controller = False 675 if TEST_KEY_SENSOR_FUSION in config_file_test_key: 676 if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS: 677 testing_sensor_fusion_with_controller = True 678 679 testing_flash_with_controller = False 680 if (test_params_content.get('lighting_cntl', 'None').lower() == 'arduino' and 681 'manual' not in config_file_test_key): 682 testing_flash_with_controller = True 683 684 # Expand GROUPED_SCENES and remove any duplicates 685 scenes = 
[_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes] 686 scenes = np.hstack(scenes).tolist() 687 scenes = sorted(set(scenes), key=scenes.index) 688 # List of scenes to be executed in folded state will have '_folded' 689 # prefix. This will help distinguish the test results from folded vs 690 # open device state for front camera_ids. 691 folded_device_scenes = [] 692 for scene in scenes: 693 folded_device_scenes.append(f'{scene}_folded') 694 695 logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s', 696 device_id, camera_id_combos, scenes) 697 698 # Determine if manual run 699 if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES): 700 auto_scene_switch = True 701 else: 702 auto_scene_switch = False 703 logging.info('Manual, checkerboard scenes, or scene5 testing.') 704 705 folded_prompted = False 706 opened_prompted = False 707 for camera_id in camera_id_combos: 708 test_params_content['camera'] = camera_id 709 results = {} 710 unav_cameras = [] 711 # Get the list of unavailable cameras in current device state. 712 # These camera_ids should not be tested in current device state. 713 if testing_foldable_device: 714 unav_cameras = get_unavailable_physical_cameras( 715 device_id, _FRONT_CAMERA_ID) 716 717 if testing_foldable_device: 718 device_state = 'folded' if device_folded else 'opened' 719 720 testing_folded_front_camera = (testing_foldable_device and 721 device_folded and 722 _FRONT_CAMERA_ID in camera_id) 723 724 # Raise an assertion error if there is any camera unavailable in 725 # current device state. Usually scenes with suffix 'folded' will 726 # be executed in folded state. 
727 if (testing_foldable_device and 728 _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras): 729 raise AssertionError( 730 f'Camera {camera_id} is unavailable in device state {device_state}' 731 f' and cannot be tested with device {device_state}!') 732 733 if (testing_folded_front_camera and camera_id not in unav_cameras 734 and not folded_prompted): 735 folded_prompted = True 736 input('\nYou are testing a foldable device in folded state. ' 737 'Please make sure the device is folded and press <ENTER> ' 738 'after positioning properly.\n') 739 740 if (testing_foldable_device and 741 not device_folded and _FRONT_CAMERA_ID in camera_id and 742 camera_id not in unav_cameras and not opened_prompted): 743 opened_prompted = True 744 input('\nYou are testing a foldable device in opened state. ' 745 'Please make sure the device is unfolded and press <ENTER> ' 746 'after positioning properly.\n') 747 748 # Run through all scenes if user does not supply one and config file doesn't 749 # have specific scene name listed. 
750 if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id: 751 possible_scenes = list(SUB_CAMERA_TESTS.keys()) 752 if auto_scene_switch: 753 possible_scenes.remove('sensor_fusion') 754 else: 755 if 'checkerboard' in scenes: 756 possible_scenes = _CHECKERBOARD_SCENES 757 elif 'scene_flash' in scenes: 758 possible_scenes = _FLASH_SCENES 759 elif 'scene_extensions' in scenes: 760 possible_scenes = _EXTENSIONS_SCENES 761 else: 762 possible_scenes = _TABLET_SCENES if auto_scene_switch else _ALL_SCENES 763 764 if ('<scene-name>' in scenes or 'checkerboard' in scenes or 765 'scene_extensions' in scenes): 766 per_camera_scenes = possible_scenes 767 else: 768 # Validate user input scene names 769 per_camera_scenes = [] 770 for s in scenes: 771 if s in possible_scenes: 772 per_camera_scenes.append(s) 773 if not per_camera_scenes: 774 raise ValueError('No valid scene specified for this camera.') 775 776 # Folded state scenes will have 'folded' suffix only for 777 # front cameras since rear cameras are common in both folded 778 # and unfolded state. 
779 foldable_per_camera_scenes = [] 780 if testing_folded_front_camera: 781 if camera_id not in available_camera_ids_to_test_foldable: 782 raise AssertionError(f'camera {camera_id} is not available.') 783 for s in per_camera_scenes: 784 foldable_per_camera_scenes.append(f'{s}_folded') 785 786 if foldable_per_camera_scenes: 787 per_camera_scenes = foldable_per_camera_scenes 788 789 logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes) 790 791 if testing_folded_front_camera: 792 all_scenes = [f'{s}_folded' for s in _ALL_SCENES] 793 else: 794 all_scenes = _ALL_SCENES 795 796 for s in all_scenes: 797 results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED} 798 799 # assert device folded testing scenes with suffix 'folded' 800 if testing_foldable_device and 'folded' in s: 801 if not device_folded: 802 raise AssertionError('Device should be folded during' 803 ' testing scenes with suffix "folded"') 804 805 # A subdir in topdir will be created for each camera_id. All scene test 806 # output logs for each camera id will be stored in this subdir. 
807 # This output log path is a mobly param : LogPath 808 camera_id_str = ( 809 camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_') 810 ) 811 mobly_output_logs_path = os.path.join(topdir, f'cam_id_{camera_id_str}') 812 os.mkdir(mobly_output_logs_path) 813 tot_pass = 0 814 for s in per_camera_scenes: 815 results[s]['TEST_STATUS'] = [] 816 results[s][METRICS_KEY] = [] 817 results[s][PERFORMANCE_KEY] = [] 818 results[s][FEATURE_QUERY_KEY] = [] 819 820 # unit is millisecond for execution time record in CtsVerifier 821 scene_start_time = int(round(time.time() * 1000)) 822 scene_test_summary = f'Cam{camera_id} {s}' + '\n' 823 mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s) 824 825 # Since test directories do not have 'folded' in the name, we need 826 # to remove that suffix for the path of the scenes to be loaded 827 # on the tablets 828 testing_scene = s 829 if 'folded' in s: 830 testing_scene = s.split('_folded')[0] 831 test_params_content['scene'] = testing_scene 832 test_params_content['scene_with_suffix'] = s 833 834 if auto_scene_switch: 835 # Copy scene images onto the tablet 836 if 'scene0' not in testing_scene: 837 its_session_utils.copy_scenes_to_tablet(testing_scene, tablet_id) 838 else: 839 # Check manual scenes for correctness 840 if ('scene0' not in testing_scene and 841 not testing_sensor_fusion_with_controller): 842 check_manual_scenes(device_id, camera_id, testing_scene, 843 mobly_output_logs_path) 844 845 scene_test_list = [] 846 config_file_contents['TestBeds'][0]['TestParams'] = test_params_content 847 # Add the MoblyParams to config.yml file with the path to store camera_id 848 # test results. This is a separate dict other than TestBeds. 
849 mobly_params_dict = { 850 'MoblyParams': { 851 'LogPath': mobly_scene_output_logs_path 852 } 853 } 854 config_file_contents.update(mobly_params_dict) 855 logging.debug('Final config file contents: %s', config_file_contents) 856 new_yml_file_name = get_updated_yml_file(config_file_contents) 857 logging.info('Using %s as temporary config yml file', new_yml_file_name) 858 if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1: 859 scene_dir = os.listdir( 860 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene)) 861 for file_name in scene_dir: 862 if file_name.endswith('.py') and 'test' in file_name: 863 scene_test_list.append(file_name) 864 else: # sub-camera 865 if SUB_CAMERA_TESTS.get(testing_scene): 866 scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[ 867 testing_scene]] 868 else: 869 scene_test_list = [] 870 scene_test_list.sort() 871 872 # Run tests for scene 873 logging.info('Running tests for %s with camera %s', 874 testing_scene, camera_id) 875 num_pass = 0 876 num_skip = 0 877 num_not_mandated_fail = 0 878 num_fail = 0 879 for test in scene_test_list: 880 # Handle repeated test 881 if 'tests/' in test: 882 cmd = [ 883 'python3', 884 os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c', 885 f'{new_yml_file_name}' 886 ] 887 else: 888 cmd = [ 889 'python3', 890 os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', 891 testing_scene, test), 892 '-c', 893 f'{new_yml_file_name}' 894 ] 895 return_string = '' 896 for num_try in range(NUM_TRIES): 897 # Saves to mobly test summary file 898 # print only messages for manual lighting control testing 899 output = subprocess.Popen( 900 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT 901 ) 902 with output.stdout, open( 903 os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'wb' 904 ) as file: 905 for line in iter(output.stdout.readline, b''): 906 out = line.decode('utf-8').strip() 907 if '<ENTER>' in out: print(out) 908 file.write(line) 909 output.wait() 910 911 # Parse mobly 
logs to determine PASS/FAIL(*)/SKIP & socket FAILs 912 with open( 913 os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file: 914 test_code = output.returncode 915 test_skipped = False 916 test_not_yet_mandated = False 917 test_mpc_req = '' 918 perf_test_metrics = '' 919 hdr_mpc_req = '' 920 feature_query_proto = '' 921 content = file.read() 922 923 # Find media performance class logging 924 lines = content.splitlines() 925 for one_line in lines: 926 # regular expression pattern must match 927 # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in 928 # ItsTestActivity.java. 929 mpc_string_match = re.search( 930 '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)', 931 one_line) 932 if mpc_string_match: 933 test_mpc_req = one_line 934 break 935 936 for one_line in lines: 937 # regular expression pattern must match in ItsTestActivity.java. 938 gainmap_string_match = re.search('^has_gainmap:', one_line) 939 if gainmap_string_match: 940 hdr_mpc_req = one_line 941 break 942 943 # Find feature combination query proto 944 for one_line in lines: 945 # regular expression pattern must match in ItsTestActivity.java. 946 feature_comb_query_string_match = re.search( 947 '^feature_query_proto:(.*)', one_line 948 ) 949 if feature_comb_query_string_match: 950 feature_query_proto = feature_comb_query_string_match.group(1) 951 break 952 953 # Find performance metrics logging 954 for one_line in lines: 955 # regular expression pattern must match in ItsTestActivity.java. 
956 perf_metrics_string_match = re.search( 957 '^test.*:', 958 one_line) 959 if perf_metrics_string_match: 960 perf_test_metrics = one_line 961 # each test can add multiple metrics 962 results[s][PERFORMANCE_KEY].append(perf_test_metrics) 963 964 if 'Test skipped' in content: 965 return_string = 'SKIP ' 966 num_skip += 1 967 test_skipped = True 968 break 969 970 if its_session_utils.NOT_YET_MANDATED_MESSAGE in content: 971 return_string = 'FAIL*' 972 num_not_mandated_fail += 1 973 test_not_yet_mandated = True 974 break 975 976 if test_code == 0 and not test_skipped: 977 return_string = 'PASS ' 978 num_pass += 1 979 break 980 981 if test_code == 1 and not test_not_yet_mandated: 982 return_string = 'FAIL ' 983 if 'Problem with socket' in content and num_try != NUM_TRIES-1: 984 logging.info('Retry %s/%s', s, test) 985 else: 986 num_fail += 1 987 break 988 os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE)) 989 status_prefix = '' 990 if testbed_index is not None: 991 status_prefix = config_file_test_key + ':' 992 logging.info('%s%s %s/%s', status_prefix, return_string, s, test) 993 test_name = test.split('/')[-1].split('.')[0] 994 results[s]['TEST_STATUS'].append({ 995 'test': test_name, 996 'status': return_string.strip()}) 997 if test_mpc_req: 998 results[s][METRICS_KEY].append(test_mpc_req) 999 if hdr_mpc_req: 1000 results[s][METRICS_KEY].append(hdr_mpc_req) 1001 if feature_query_proto: 1002 results[s][FEATURE_QUERY_KEY].append(feature_query_proto) 1003 1004 msg_short = f'{return_string} {test}' 1005 scene_test_summary += msg_short + '\n' 1006 if (test in _LIGHTING_CONTROL_TESTS and 1007 not testing_flash_with_controller): 1008 print('Turn lights ON in rig and press <ENTER> to continue.') 1009 1010 # unit is millisecond for execution time record in CtsVerifier 1011 scene_end_time = int(round(time.time() * 1000)) 1012 skip_string = '' 1013 tot_tests = len(scene_test_list) 1014 tot_tests_run = tot_tests - num_skip 1015 if tot_tests_run != 0: 1016 
tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run 1017 else: 1018 tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0 1019 tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%' 1020 if num_skip > 0: 1021 skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped" 1022 test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} ' 1023 f'tests passed ({tests_passed_ratio_format}){skip_string}') 1024 logging.info(test_result) 1025 if num_not_mandated_fail > 0: 1026 logging.info('(*) %s not_yet_mandated tests failed', 1027 num_not_mandated_fail) 1028 1029 tot_pass += num_pass 1030 logging.info('scene tests: %s, Total tests passed: %s', tot_tests, 1031 tot_pass) 1032 if tot_tests > 0: 1033 logging.info('%s compatibility score: %.f/100\n', 1034 s, 100 * num_pass / tot_tests) 1035 scene_test_summary_path = os.path.join(mobly_scene_output_logs_path, 1036 'scene_test_summary.txt') 1037 with open(scene_test_summary_path, 'w') as f: 1038 f.write(scene_test_summary) 1039 results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL) 1040 results[s][SUMMARY_KEY] = scene_test_summary_path 1041 results[s][TIME_KEY_START] = scene_start_time 1042 results[s][TIME_KEY_END] = scene_end_time 1043 else: 1044 logging.info('%s compatibility score: 0/100\n') 1045 1046 # Delete temporary yml file after scene run. 
1047 new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name) 1048 os.remove(new_yaml_file_path) 1049 1050 # Log results per camera 1051 if num_testbeds is None or testbed_index == _MAIN_TESTBED: 1052 logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id) 1053 logging.info('ITS results to CtsVerifier: %s', results) 1054 report_result(device_id, camera_id, tablet_name, results) 1055 else: 1056 write_result(testbed_index, device_id, camera_id, results) 1057 1058 logging.info('Test execution completed.') 1059 1060 # Power down tablet 1061 if tablet_id: 1062 cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER' 1063 subprocess.Popen(cmd.split()) 1064 1065 # establish connection with lighting controller 1066 lighting_cntl = test_params_content.get('lighting_cntl', 'None') 1067 lighting_ch = test_params_content.get('lighting_ch', 'None') 1068 arduino_serial_port = lighting_control_utils.lighting_control( 1069 lighting_cntl, lighting_ch) 1070 1071 # turn OFF lights 1072 lighting_control_utils.set_lighting_state( 1073 arduino_serial_port, lighting_ch, 'OFF') 1074 1075 if num_testbeds is not None: 1076 if testbed_index == _MAIN_TESTBED: 1077 logging.info('Waiting for all testbeds to finish.') 1078 start = time.time() 1079 completed_testbeds = set() 1080 while time.time() < start + MERGE_RESULTS_TIMEOUT: 1081 for i in range(num_testbeds): 1082 if os.path.isfile(f'testbed_{i}_completed.tmp'): 1083 start = time.time() 1084 completed_testbeds.add(i) 1085 # Already reported _MAIN_TESTBED's results. 
1086 if len(completed_testbeds) == num_testbeds - 1: 1087 logging.info('All testbeds completed, merging results.') 1088 for parsed_id, parsed_camera, parsed_results in ( 1089 parse_testbeds(completed_testbeds)): 1090 logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s', 1091 parsed_id, parsed_camera, parsed_results) 1092 if not are_devices_similar(device_id, parsed_id): 1093 logging.error('Device %s and device %s are not the same ' 1094 'model/type/build/revision.', 1095 device_id, parsed_id) 1096 return 1097 report_result(device_id, parsed_camera, tablet_name, parsed_results) 1098 for temp_file in glob.glob('testbed_*.tmp'): 1099 os.remove(temp_file) 1100 break 1101 else: 1102 logging.error('No testbeds finished in the last %d seconds, ' 1103 'but still expected data. ' 1104 'Completed testbed indices: %s, ' 1105 'expected number of testbeds: %d', 1106 MERGE_RESULTS_TIMEOUT, list(completed_testbeds), 1107 num_testbeds) 1108 else: 1109 with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _: 1110 pass 1111 1112if __name__ == '__main__': 1113 main() 1114