xref: /aosp_15_r20/cts/apps/CameraITS/tools/run_all_tests.py (revision b7c941bb3fa97aba169d73cee0bed2de8ac964bf)
1# Copyright 2014 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import glob
16import json
17import logging
18import os
19import os.path
20import re
21import subprocess
22import sys
23import tempfile
24import time
25import types
26
27import camera_properties_utils
28import capture_request_utils
29import image_processing_utils
30import its_device_utils
31import its_session_utils
32import lighting_control_utils
33import numpy as np
34import yaml
35
36
# Directory that holds config.yml, read from the CAMERA_ITS_TOP environment
# variable. Importing this module raises KeyError if the variable is not set.
YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP']
CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml')
# Substrings matched against the lower-cased testbed name from the config file.
TEST_KEY_TABLET = 'tablet'
TEST_KEY_SENSOR_FUSION = 'sensor_fusion'
ACTIVITY_START_WAIT = 1.5  # seconds
MERGE_RESULTS_TIMEOUT = 3600  # seconds

NUM_TRIES = 2
# Allowed per-scene result values and the keys of the per-scene results dict.
RESULT_PASS = 'PASS'
RESULT_FAIL = 'FAIL'
RESULT_NOT_EXECUTED = 'NOT_EXECUTED'
RESULT_KEY = 'result'
METRICS_KEY = 'mpc_metrics'
PERFORMANCE_KEY = 'performance_metrics'
FEATURE_QUERY_KEY = 'feature_query_proto'
SUMMARY_KEY = 'summary'
RESULT_VALUES = (RESULT_PASS, RESULT_FAIL, RESULT_NOT_EXECUTED)
# Package name, broadcast action and intent extras used by report_result().
CTS_VERIFIER_PACKAGE_NAME = 'com.android.cts.verifier'
ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT'
EXTRA_VERSION = 'camera.its.extra.VERSION'
CURRENT_ITS_VERSION = '1.0'  # version number to sync with CtsVerifier
EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID'
EXTRA_RESULTS = 'camera.its.extra.RESULTS'
EXTRA_TABLET_NAME = 'camera.its.extra.TABLET_NAME'
TIME_KEY_START = 'start'
TIME_KEY_END = 'end'
# Accepted values (compared lower-cased) of the 'rotator_cntl' test parameter.
VALID_CONTROLLERS = ('arduino', 'canakit')
# Camera id queried for (un)available cameras when testing a foldable device.
_FRONT_CAMERA_ID = '1'
# Recover the '_' that was dropped from scene names read from the config file
# (e.g. a scene value of '11' is mapped back to '1_1').
_INT_STR_DICT = types.MappingProxyType({'11': '1_1', '12': '1_2'})
_MAIN_TESTBED = 0
# Device properties compared by are_devices_similar().
_PROPERTIES_TO_MATCH = (
    'ro.product.model', 'ro.product.name', 'ro.build.display.id', 'ro.revision'
)
71
# Scenes that can be automated through tablet display
# Notes on scene names:
#   scene*_1/2/... are the same scene split to load balance run times
#   scene*_a/b/... are similar scenes that share one or more tests
_TABLET_SCENES = (
    'scene0', 'scene1_1', 'scene1_2', 'scene1_3', 'scene2_a', 'scene2_b',
    'scene2_c', 'scene2_d', 'scene2_e', 'scene2_f', 'scene2_g', 'scene3',
    'scene4', 'scene6', 'scene6_tele', 'scene7', 'scene8', 'scene9',
    os.path.join('scene_extensions', 'scene_hdr'),
    os.path.join('scene_extensions', 'scene_low_light'),
    'scene_video',
)

# Scenes that use the 'sensor_fusion' test rig
_MOTION_SCENES = ('sensor_fusion', 'feature_combination',)

# Scenes that use lighting control
_FLASH_SCENES = ('scene_flash',)

# Scenes that use a checkerboard as the chart
_CHECKERBOARD_SCENES = ('sensor_fusion', 'scene_flash', 'feature_combination',)

# Scenes that have to be run manually regardless of configuration
_MANUAL_SCENES = ('scene5',)

# Scene extensions (paths built with os.path.join, matching _TABLET_SCENES)
_EXTENSIONS_SCENES = (os.path.join('scene_extensions', 'scene_hdr'),
                      os.path.join('scene_extensions', 'scene_low_light'),
                      )

# All possible scenes
_ALL_SCENES = _TABLET_SCENES + _MANUAL_SCENES + _MOTION_SCENES + _FLASH_SCENES

# Scenes that are logically grouped and can be called as a group; a group name
# given by the user is expanded to the listed scenes.
# scene6_tele is not grouped with scene6 because it requires extension rig
_GROUPED_SCENES = types.MappingProxyType({
        'scene1': ('scene1_1', 'scene1_2'),
        'scene2': ('scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e',
                   'scene2_f', 'scene2_g')
})
112
113# Scene requirements for manual testing.
114_SCENE_REQ = types.MappingProxyType({
115    'scene0': None,
116    'scene1_1': 'A grey card covering at least the middle 30% of the scene',
117    'scene1_2': 'A grey card covering at least the middle 30% of the scene',
118    'scene1_3': 'A grey card covering at least the middle 30% of the scene, '
119                'without a white border like scene1_1 or scene1_2',
120    'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png',
121    'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png',
122    'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png',
123    'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png',
124    'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
125    'scene2_f': 'The picture with 3 faces in tests/scene2_f/scene2_f.png',
126    'scene2_g': 'The picture with 3 profile faces in tests/scene2_g/scene2_g.png',
127    'scene3': 'The ISO12233 chart',
128    'scene4': 'A test chart of a circle covering at least the middle 50% of '
129              'the scene. See tests/scene4/scene4.png',
130    'scene5': 'Capture images with a diffuser attached to the camera. See '
131              'source.android.com/docs/compatibility/cts/camera-its-tests#scene5/diffuser '  # pylint: disable line-too-long
132              'for more details',
133    'scene6': 'A grid of ArUco markers on a white background. '
134              'See tests/scene6/scene6.png',
135    'scene6_tele': 'A grid of ArUco markers on a white background. Identical '
136                   'to scene6, but for tele cameras. '
137                   'See tests/scene6_tele/scene6_tele.png',
138    'scene7': 'The picture with 4 different colors, slanted edge and'
139              '4 ArUco markers. See tests/scene7/scene7.png',
140    'scene8': 'The picture with 4 faces in 4 different colors overlay.'
141              'See tests/scene8/scene8.png',
142    'scene9': 'A scene with high entropy consisting of random size and colored '
143              'circles. See tests/scene9/scene9.png',
144    # Use os.path to avoid confusion on other platforms
145    os.path.join('scene_extensions', 'scene_hdr'): (
146        'A tablet displayed scene with a face on the left '
147        'and a low-contrast QR code on the right. '
148        'See tests/scene_extensions/scene_hdr/scene_hdr.png'
149    ),
150    os.path.join('scene_extensions', 'scene_low_light'): (
151        'A tablet displayed scene with a grid of squares of varying '
152        'brightness. See '
153        'tests/scene_extensions/scene_low_light/scene_low_light.png'
154    ),
155    'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of '
156                     'in tests/sensor_fusion/checkerboard.pdf\n'
157                     'See tests/sensor_fusion/SensorFusion.pdf for detailed '
158                     'instructions.\nNote that this test will be skipped '
159                     'on devices not supporting REALTIME camera timestamp.',
160    'feature_combination': 'The same scene as sensor_fusion, '
161                           'separated for easier testing.',
162    'scene_flash': 'A checkerboard pattern chart with lights off.',
163    'scene_video': 'A tablet displayed scene with a series of circles moving '
164                   'at different simulated frame rates. '
165                   'See tests/scene_video/scene_video.mp4',
166})
167
# Scene name -> tuple of test names for sub-cameras. The keys are used as the
# candidate scenes when a camera id contains the sub-camera separator.
# Made mutable to allow for test augmentation based on first API level
# (see augment_sub_camera_tests()).
SUB_CAMERA_TESTS = {
    'scene0': (
        'test_jitter',
        'test_metadata',
        'test_request_capture_match',
        'test_sensor_events',
        'test_solid_color_test_pattern',
        'test_unified_timestamps',
    ),
    'scene1_1': (
        'test_burst_capture',
        'test_burst_sameness_manual',
        'test_exposure_x_iso',
        'test_linearity',
    ),
    'scene1_2': (
        'test_raw_exposure',
    ),
    'scene1_3': (
        'test_dng_noise_model',
        'test_raw_sensitivity',
        'test_yuv_plus_raw',
    ),
    'scene2_a': (
        'test_num_faces',
    ),
    'scene4': (
        'test_aspect_ratio_and_crop',
    ),
    'scene6_tele': (
        'test_zoom_tele',
    ),
    'scene_video': (
        'test_preview_frame_drop',
    ),
    'sensor_fusion': (
        'test_sensor_fusion',
    ),
}

# Test file names associated with lighting control.
# NOTE(review): usage is outside the visible part of this file -- confirm.
_LIGHTING_CONTROL_TESTS = (
    'test_auto_flash.py',
    'test_preview_min_frame_rate.py',
    'test_led_snapshot.py',
    'test_night_extension.py',
    'test_low_light_boost_extension.py',
    'test_hdr_extension.py',
    )

# Short extension names accepted on the command line; main() expands them to
# 'scene_extensions/scene_<name>'.
_EXTENSION_NAMES = (
    'hdr',
    'low_light',
)

_DST_SCENE_DIR = '/sdcard/Download/'
# Number of parts expected when a camera id is split on the sub-camera
# separator (logical id + hidden physical id).
_SUB_CAMERA_LEVELS = 2
MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt'
226
227
def report_result(device_id, camera_id, tablet_name, results):
  """Sends a pass/fail result to the device, via an intent.

  Starts the ITS test activity, validates every scene entry of results,
  pushes each per-scene summary file to the device and finally broadcasts the
  JSON-encoded results with the ACTION_ITS_RESULT intent.

  Note that results is modified in place: every SUMMARY_KEY value (a host
  path) is replaced by the on-device path the file was pushed to.

  Args:
   device_id: The ID string of the device to report the results to.
   camera_id: The ID string of the camera for which to report pass/fail.
   tablet_name: The tablet name to identify model and build.
   results: a dictionary contains all ITS scenes as key and result/summary of
            current ITS run. See test_report_result unit test for an example.

  Raises:
    ValueError: if a scene has no RESULT_KEY entry, or its value is not one
      of RESULT_VALUES.
  """
  adb = f'adb -s {device_id}'
  its_device_utils.start_its_test_activity(device_id)
  time.sleep(ACTIVITY_START_WAIT)

  # Validate/process results argument
  for scene, scene_results in results.items():
    if RESULT_KEY not in scene_results:
      raise ValueError(f'ITS result not found for {scene}')
    if scene_results[RESULT_KEY] not in RESULT_VALUES:
      # Report the offending per-scene value. Indexing the top-level dict
      # with RESULT_KEY here would raise KeyError instead of this error.
      raise ValueError(
          f'Unknown ITS result for {scene}: {scene_results[RESULT_KEY]}')
    if SUMMARY_KEY in scene_results:
      device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt'
      its_device_utils.run(
          f'{adb} push {scene_results[SUMMARY_KEY]} {device_summary_path}')
      # Only values are reassigned (no keys added/removed), which is safe
      # while iterating over the dictionary.
      scene_results[SUMMARY_KEY] = device_summary_path

  json_results = json.dumps(results)
  cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}"
         f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es "
         f"{EXTRA_TABLET_NAME} {tablet_name} --es "
         f"{EXTRA_RESULTS} \'{json_results}\'")
  its_device_utils.run(cmd)
260
261
def write_result(testbed_index, device_id, camera_id, results):
  """Dumps one testbed's results as JSON into a temporary file for merging.

  The file is created in the current working directory and is named
  'testbed_<testbed_index>_camera_<camera_id>.tmp'.

  Args:
    testbed_index: the index of a finished testbed.
    device_id: the ID string of the device that created results.
    camera_id: the ID string of the camera of the device.
    results: a dictionary that contains all ITS scenes as key
             and result/summary of current ITS run.
  """
  tmp_name = f'testbed_{testbed_index}_camera_{camera_id}.tmp'
  with open(tmp_name, 'w') as tmp_file:
    json.dump({'device_id': device_id, 'results': results}, tmp_file)
276
277
def parse_testbeds(completed_testbeds):
  """Yields the stored results of every camera of the completed testbeds.

  For each testbed index, all 'testbed_<index>_camera_*.tmp' files in the
  current working directory are loaded; the camera id is taken from the part
  of the file name between 'camera_' and '.tmp'.

  Args:
    completed_testbeds: an iterable of completed testbed indices.
  Yields:
    device_id: the device associated with the testbed.
    camera_id: one of the camera_ids associated with the testbed.
    results: the dictionary with scenes and result/summary of testbed's run.
  Raises:
    ValueError: if a file holds an empty device_id or empty results.
  """
  for testbed_index in completed_testbeds:
    pattern = f'testbed_{testbed_index}_camera_*.tmp'
    for tmp_path in glob.glob(pattern):
      camera_id = tmp_path.split('camera_')[1].split('.tmp')[0]
      with open(tmp_path, 'r') as tmp_file:
        payload = json.load(tmp_file)
        device_id, results = payload['device_id'], payload['results']
      if not (device_id and results):
        raise ValueError(f'device_id or results for {tmp_path} not found.')
      yield device_id, camera_id, results
300
301
def get_device_property(device_id, property_name):
  """Reads a system property from a device with 'adb shell getprop'.

  Args:
    device_id: the ID string of a device.
    property_name: the desired property string.
  Returns:
    The value of the property, decoded as UTF-8 with surrounding whitespace
    stripped.
  """
  getprop_output = subprocess.check_output(
      f'adb -s {device_id} shell getprop {property_name}',
      shell=True,
      stderr=subprocess.STDOUT)
  return getprop_output.decode('utf-8').strip()
315
316
def are_devices_similar(device_id_1, device_id_2):
  """Compares every property in _PROPERTIES_TO_MATCH between two devices.

  The comparison stops at the first property whose values differ; the
  mismatch is logged as an error.

  Args:
    device_id_1: the ID string of the _MAIN_TESTBED device.
    device_id_2: the ID string of another device.
  Returns:
    True if both devices share key dimensions.
  """
  for prop_name in _PROPERTIES_TO_MATCH:
    main_value = get_device_property(device_id_1, prop_name)
    other_value = get_device_property(device_id_2, prop_name)
    if main_value == other_value:
      continue
    logging.error('%s does not match %s for %s',
                  main_value, other_value, prop_name)
    return False
  return True
334
335
def check_manual_scenes(device_id, camera_id, scene, out_path):
  """Pauses the run until the operator confirms the manual scene setup.

  The operator is prompted to position the camera, then 3A is run and one
  image is captured and written to out_path for inspection. This repeats
  until the operator answers 'y' (case-insensitive) to the confirmation.

  Args:
    device_id: id of device
    camera_id: id of camera; when it consists of _SUB_CAMERA_LEVELS parts
      joined by the sub-camera separator, the second part is used as the
      hidden physical camera id.
    scene: Name of the scene to copy image files.
    out_path: output file location
  """
  separator = its_session_utils.SUB_CAMERA_SEPARATOR
  physical_id = None
  if separator in camera_id:
    id_parts = camera_id.split(separator)
    if len(id_parts) == _SUB_CAMERA_LEVELS:
      camera_id = id_parts[0]
      physical_id = id_parts[1]

  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id,
      hidden_physical_id=physical_id) as cam:
    props = cam.override_with_hidden_physical_camera_props(
        cam.get_camera_properties())

    scene_confirmed = False
    while not scene_confirmed:
      input(f'\n Press <ENTER> after positioning camera {camera_id} with '
            f'{scene}.\n The scene setup should be: \n  {_SCENE_REQ[scene]}\n')
      # Converge 3A prior to capture
      if scene != 'scene5':
        cam.do_3a()
      else:
        # scene5: no auto-focus; AE/AWB lock flags come from the camera props.
        cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props),
                  lock_awb=camera_properties_utils.awb_lock(props))
      req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props)
      logging.info('Capturing an image to check the test scene')
      rgb_image = image_processing_utils.convert_capture_to_rgb_image(
          cam.do_capture(req, fmt))
      img_name = os.path.join(out_path, f'test_{scene.replace("/", "_")}.jpg')
      logging.info('Please check scene setup in %s', img_name)
      image_processing_utils.write_image(rgb_image, img_name)
      answer = input(f'Is the image okay for ITS {scene}? (Y/N)')
      scene_confirmed = answer.lower() == 'y'
378
379
def get_config_file_contents():
  """Loads CONFIG_FILE (config.yml) with the safe YAML loader.

  Args:
    None

  Returns:
    config_file_contents: a dict read from config.yml
  """
  with open(CONFIG_FILE) as config_stream:
    return yaml.safe_load(config_stream)
392
393
def get_test_params(config_file_contents):
  """Extracts the 'TestParams' entry from the config file contents.

  Every testbed dict under every top-level key is visited; the value of the
  last testbed visited wins (None if that testbed has no 'TestParams').

  Args:
    config_file_contents: dict read from config.yml file

  Returns:
    dict of test parameters, or None when there is no testbed at all.
  """
  params_per_testbed = [
      testbed.get('TestParams')
      for testbeds in config_file_contents.values()
      for testbed in testbeds
  ]
  return params_per_testbed[-1] if params_per_testbed else None
408
409
def get_device_serial_number(device, config_file_contents):
  """Returns the serial number of the device with label from the config file.

  The config file contains TestBeds dictionary which contains Controllers and
  Android Device dicts. The two devices used by the test per box are listed
  there with the labels dut and tablet. Parse through the nested TestBeds dict
  to get the Android device details. If several devices carry the requested
  label, the last one found wins.

  Args:
    device: String device label as specified in config file: 'tablet' selects
      the tablet, any other value selects the dut.
    config_file_contents: dict read from config.yml file

  Returns:
    The serial of the matching device, converted to str.

  Raises:
    ValueError: if no device with the requested label is in the config.
  """
  wanted_label = 'tablet' if device == 'tablet' else 'dut'
  serial = None
  for testbeds in config_file_contents.values():
    for testbed in testbeds:
      controllers = testbed.get('Controllers')
      for device_dict in controllers.get('AndroidDevice'):
        # A device matches when any of its values equals the wanted label.
        if wanted_label in device_dict.values():
          serial = str(device_dict.get('serial'))
  if serial is None:
    # Without this check a missing label surfaced as UnboundLocalError.
    raise ValueError(
        f'No AndroidDevice labeled {wanted_label!r} found in config file.')
  return serial
436
437
def get_updated_yml_file(yml_file_contents):
  """Writes the testbed contents to a freshly created yml file.

  This testbed file is per box and contains all the parameters and
  device id used by the mobly tests. The file is created in YAML_FILE_DIR
  (whose mode is first set to 0o755) with a unique 'config_*.yml' name.

  Args:
   yml_file_contents: Data to write in yml file.

  Returns:
    The base name (no directory) of the newly written yml file.
  """
  os.chmod(YAML_FILE_DIR, 0o755)
  yml_fd, yml_path = tempfile.mkstemp(
      dir=YAML_FILE_DIR, prefix='config_', suffix='.yml')
  # Write through the descriptor returned by mkstemp; it is closed when the
  # with-block exits.
  with os.fdopen(yml_fd, 'w') as yml_stream:
    yaml.dump(yml_file_contents, stream=yml_stream, default_flow_style=False)
  return os.path.basename(yml_path)
458
459
def enable_external_storage(device_id):
  """Grants com.android.cts.verifier the MANAGE_EXTERNAL_STORAGE app-op.

  Runs 'adb shell appops set ... allow' on the device so the apk is allowed
  to write to external storage.

  Args:
    device_id: Serial number of the device.

  """
  its_device_utils.run(
      f'adb -s {device_id} shell appops set com.android.cts.verifier '
      'MANAGE_EXTERNAL_STORAGE allow')
470
471
def get_available_cameras(device_id, camera_id):
  """Lists the camera ids that are available in the current device state.

  Opens an ITS session, asks for all camera ids and drops every
  '<camera_id>.<physical_id>' entry whose physical camera is reported as
  unavailable.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera_id

  Returns:
    List of all the available camera_ids.
  """
  with its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id) as cam:
    # Properties are queried as before; the result is not used further here.
    cam.override_with_hidden_physical_camera_props(cam.get_camera_properties())
    unavailable_physical = cam.get_unavailable_physical_cameras(
        camera_id)['unavailablePhysicalCamerasArray']
    all_camera_ids = cam.get_camera_ids()['cameraIdArray']
    for physical_id in unavailable_physical:
      # Concat camera_id, sub camera separator and physical camera_id
      qualified_id = f'{camera_id}.{physical_id}'
      if qualified_id in all_camera_ids:
        all_camera_ids.remove(qualified_id)
    logging.debug('available camera ids: %s', all_camera_ids)
  return all_camera_ids
501
502
def get_unavailable_physical_cameras(device_id, camera_id):
  """Lists the physical cameras that are unavailable in the current state.

  Each reported physical id is returned in the form
  '<camera_id>.<physical_id>'.

  Args:
    device_id: Serial number of the device.
    camera_id: Logical camera device id

  Returns:
    List of all the unavailable camera_ids.
  """
  with its_session_utils.ItsSession(
      device_id=device_id, camera_id=camera_id) as cam:
    reported = cam.get_unavailable_physical_cameras(camera_id)
    qualified_ids = []
    for physical_id in reported['unavailablePhysicalCamerasArray']:
      qualified_ids.append(f'{camera_id}.{physical_id}')
    logging.debug('Unavailable physical camera ids: %s', qualified_ids)
  return qualified_ids
525
526
def is_device_folded(device_id):
  """Returns True if the foldable device is in folded state.

  The state is detected by looking for 'CLOSE' in the output of
  'adb shell cmd device_state state'.

  Args:
    device_id: Serial number of the foldable device.
  """
  state_output = subprocess.getoutput(
      f'adb -s {device_id} shell cmd device_state state')
  return 'CLOSE' in state_output
538
539
def augment_sub_camera_tests(first_api_level):
  """Extends SUB_CAMERA_TESTS for devices with a new enough first API level.

  When first_api_level is at least ANDROID15_API_LEVEL, the 'scene6' entry
  of SUB_CAMERA_TESTS is set to ('test_in_sensor_zoom',).

  Args:
    first_api_level: First api level of the device.
  """
  if first_api_level < its_session_utils.ANDROID15_API_LEVEL:
    return
  logging.debug('Augmenting sub camera tests')
  SUB_CAMERA_TESTS['scene6'] = ('test_in_sensor_zoom',)
549
550
551def main():
552  """Run all the Camera ITS automated tests.
553
554    Script should be run from the top-level CameraITS directory.
555
556    Command line arguments:
557        camera:  the camera(s) to be tested. Use comma to separate multiple
558                 camera Ids. Ex: "camera=0,1" or "camera=1"
559        scenes:  the test scene(s) to be executed. Use comma to separate
560                 multiple scenes. Ex: "scenes=scene0,scene1_1" or
561                 "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X
562                 where X is scene name minus 'scene')
563  """
564  logging.basicConfig(level=logging.INFO)
565  # Make output directories to hold the generated files.
566  topdir = tempfile.mkdtemp(prefix='CameraITS_')
567  try:
568    subprocess.call(['chmod', 'g+rx', topdir])
569  except OSError as e:
570    logging.info(repr(e))
571
572  scenes = []
573  camera_id_combos = []
574  testbed_index = None
575  num_testbeds = None
576  # Override camera, scenes and testbed with cmd line values if available
577  for s in list(sys.argv[1:]):
578    if 'scenes=' in s:
579      scenes = s.split('=')[1].split(',')
580    elif 'camera=' in s:
581      camera_id_combos = s.split('=')[1].split(',')
582    elif 'testbed_index=' in s:
583      testbed_index = int(s.split('=')[1])
584    elif 'num_testbeds=' in s:
585      num_testbeds = int(s.split('=')[1])
586    else:
587      raise ValueError(f'Unknown argument {s}')
588  if testbed_index is None and num_testbeds is not None:
589    raise ValueError(
590        'testbed_index must be specified if num_testbeds is specified.')
591  if (testbed_index is not None and num_testbeds is not None and
592      testbed_index >= num_testbeds):
593    raise ValueError('testbed_index must be less than num_testbeds. '
594                     'testbed_index starts at 0.')
595
596  # Prepend 'scene' if not specified at cmd line
597  for i, s in enumerate(scenes):
598    if (not s.startswith('scene') and
599        not s.startswith(('checkerboard', 'sensor_fusion',
600                          'flash', 'feature_combination', '<scene-name>'))):
601      scenes[i] = f'scene{s}'
602    if s.startswith('flash') or s.startswith('extensions'):
603      scenes[i] = f'scene_{s}'
604    # Handle scene_extensions
605    if any(s.startswith(extension) for extension in _EXTENSION_NAMES):
606      scenes[i] = f'scene_extensions/scene_{s}'
607    if (any(s.startswith('scene_' + extension)
608            for extension in _EXTENSION_NAMES)):
609      scenes[i] = f'scene_extensions/{s}'
610
611  # Read config file and extract relevant TestBed
612  config_file_contents = get_config_file_contents()
613  if testbed_index is None:
614    for i in config_file_contents['TestBeds']:
615      if scenes in (
616          ['sensor_fusion'], ['checkerboard'], ['scene_flash'],
617          ['feature_combination']
618      ):
619        if TEST_KEY_SENSOR_FUSION not in i['Name'].lower():
620          config_file_contents['TestBeds'].remove(i)
621      else:
622        if TEST_KEY_SENSOR_FUSION in i['Name'].lower():
623          config_file_contents['TestBeds'].remove(i)
624  else:
625    config_file_contents = {
626        'TestBeds': [config_file_contents['TestBeds'][testbed_index]]
627    }
628
629  # Get test parameters from config file
630  test_params_content = get_test_params(config_file_contents)
631  if not camera_id_combos:
632    camera_id_combos = str(test_params_content['camera']).split(',')
633  if not scenes:
634    scenes = str(test_params_content['scene']).split(',')
635    scenes = [_INT_STR_DICT.get(n, n) for n in scenes]  # recover '1_1' & '1_2'
636
637  device_id = get_device_serial_number('dut', config_file_contents)
638  # Enable external storage on DUT to send summary report to CtsVerifier.apk
639  enable_external_storage(device_id)
640
641  # Add to SUB_CAMERA_TESTS depending on first_api_level
642  augment_sub_camera_tests(its_session_utils.get_first_api_level(device_id))
643
644  # Verify that CTS Verifier is installed
645  its_session_utils.check_apk_installed(device_id, CTS_VERIFIER_PACKAGE_NAME)
646  # Check whether the dut is foldable or not
647  testing_foldable_device = True if test_params_content[
648      'foldable_device'] == 'True' else False
649  available_camera_ids_to_test_foldable = []
650  if testing_foldable_device:
651    logging.debug('Testing foldable device.')
652    # Check the state of foldable device. True if device is folded,
653    # false if the device is opened.
654    device_folded = is_device_folded(device_id)
655    # list of available camera_ids to be tested in device state
656    available_camera_ids_to_test_foldable = get_available_cameras(
657        device_id, _FRONT_CAMERA_ID)
658
659  config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower()
660  logging.info('Saving %s output files to: %s', config_file_test_key, topdir)
661  if TEST_KEY_TABLET in config_file_test_key:
662    tablet_id = get_device_serial_number('tablet', config_file_contents)
663    tablet_name_cmd = f'adb -s {tablet_id} shell getprop ro.product.device'
664    raw_output = subprocess.check_output(
665        tablet_name_cmd, stderr=subprocess.STDOUT, shell=True)
666    tablet_name = str(raw_output.decode('utf-8')).strip()
667    logging.debug('Tablet name: %s', tablet_name)
668    brightness = test_params_content['brightness']
669    its_session_utils.validate_tablet(tablet_name, brightness, tablet_id)
670  else:
671    tablet_id = None
672    tablet_name = "sensor_fusion"
673
674  testing_sensor_fusion_with_controller = False
675  if TEST_KEY_SENSOR_FUSION in config_file_test_key:
676    if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS:
677      testing_sensor_fusion_with_controller = True
678
679  testing_flash_with_controller = False
680  if (test_params_content.get('lighting_cntl', 'None').lower() == 'arduino' and
681      'manual' not in config_file_test_key):
682    testing_flash_with_controller = True
683
684  # Expand GROUPED_SCENES and remove any duplicates
685  scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes]
686  scenes = np.hstack(scenes).tolist()
687  scenes = sorted(set(scenes), key=scenes.index)
688  # List of scenes to be executed in folded state will have '_folded'
689  # prefix. This will help distinguish the test results from folded vs
690  # open device state for front camera_ids.
691  folded_device_scenes = []
692  for scene in scenes:
693    folded_device_scenes.append(f'{scene}_folded')
694
695  logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s',
696               device_id, camera_id_combos, scenes)
697
698  # Determine if manual run
699  if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES):
700    auto_scene_switch = True
701  else:
702    auto_scene_switch = False
703    logging.info('Manual, checkerboard scenes, or scene5 testing.')
704
705  folded_prompted = False
706  opened_prompted = False
707  for camera_id in camera_id_combos:
708    test_params_content['camera'] = camera_id
709    results = {}
710    unav_cameras = []
711    # Get the list of unavailable cameras in current device state.
712    # These camera_ids should not be tested in current device state.
713    if testing_foldable_device:
714      unav_cameras = get_unavailable_physical_cameras(
715          device_id, _FRONT_CAMERA_ID)
716
717    if testing_foldable_device:
718      device_state = 'folded' if device_folded else 'opened'
719
720    testing_folded_front_camera = (testing_foldable_device and
721                                   device_folded and
722                                   _FRONT_CAMERA_ID in camera_id)
723
724    # Raise an assertion error if there is any camera unavailable in
725    # current device state. Usually scenes with suffix 'folded' will
726    # be executed in folded state.
727    if (testing_foldable_device and
728        _FRONT_CAMERA_ID in camera_id and camera_id in unav_cameras):
729      raise AssertionError(
730          f'Camera {camera_id} is unavailable in device state {device_state}'
731          f' and cannot be tested with device {device_state}!')
732
733    if (testing_folded_front_camera and camera_id not in unav_cameras
734        and not folded_prompted):
735      folded_prompted = True
736      input('\nYou are testing a foldable device in folded state. '
737            'Please make sure the device is folded and press <ENTER> '
738            'after positioning properly.\n')
739
740    if (testing_foldable_device and
741        not device_folded and _FRONT_CAMERA_ID in camera_id and
742        camera_id not in unav_cameras and not opened_prompted):
743      opened_prompted = True
744      input('\nYou are testing a foldable device in opened state. '
745            'Please make sure the device is unfolded and press <ENTER> '
746            'after positioning properly.\n')
747
748    # Run through all scenes if user does not supply one and config file doesn't
749    # have specific scene name listed.
750    if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id:
751      possible_scenes = list(SUB_CAMERA_TESTS.keys())
752      if auto_scene_switch:
753        possible_scenes.remove('sensor_fusion')
754    else:
755      if 'checkerboard' in scenes:
756        possible_scenes = _CHECKERBOARD_SCENES
757      elif 'scene_flash' in scenes:
758        possible_scenes = _FLASH_SCENES
759      elif 'scene_extensions' in scenes:
760        possible_scenes = _EXTENSIONS_SCENES
761      else:
762        possible_scenes = _TABLET_SCENES if auto_scene_switch else _ALL_SCENES
763
764    if ('<scene-name>' in scenes or 'checkerboard' in scenes or
765        'scene_extensions' in scenes):
766      per_camera_scenes = possible_scenes
767    else:
768      # Validate user input scene names
769      per_camera_scenes = []
770      for s in scenes:
771        if s in possible_scenes:
772          per_camera_scenes.append(s)
773      if not per_camera_scenes:
774        raise ValueError('No valid scene specified for this camera.')
775
776    # Folded state scenes will have 'folded' suffix only for
777    # front cameras since rear cameras are common in both folded
778    # and unfolded state.
779    foldable_per_camera_scenes = []
780    if testing_folded_front_camera:
781      if camera_id not in available_camera_ids_to_test_foldable:
782        raise AssertionError(f'camera {camera_id} is not available.')
783      for s in per_camera_scenes:
784        foldable_per_camera_scenes.append(f'{s}_folded')
785
786    if foldable_per_camera_scenes:
787      per_camera_scenes = foldable_per_camera_scenes
788
789    logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes)
790
791    if testing_folded_front_camera:
792      all_scenes = [f'{s}_folded' for s in _ALL_SCENES]
793    else:
794      all_scenes = _ALL_SCENES
795
796    for s in all_scenes:
797      results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED}
798
799      # assert device folded testing scenes with suffix 'folded'
800      if testing_foldable_device and 'folded' in s:
801        if not device_folded:
802          raise AssertionError('Device should be folded during'
803                               ' testing scenes with suffix "folded"')
804
805    # A subdir in topdir will be created for each camera_id. All scene test
806    # output logs for each camera id will be stored in this subdir.
807    # This output log path is a mobly param : LogPath
808    camera_id_str = (
809        camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_')
810    )
811    mobly_output_logs_path = os.path.join(topdir, f'cam_id_{camera_id_str}')
812    os.mkdir(mobly_output_logs_path)
813    tot_pass = 0
814    for s in per_camera_scenes:
815      results[s]['TEST_STATUS'] = []
816      results[s][METRICS_KEY] = []
817      results[s][PERFORMANCE_KEY] = []
818      results[s][FEATURE_QUERY_KEY] = []
819
820      # unit is millisecond for execution time record in CtsVerifier
821      scene_start_time = int(round(time.time() * 1000))
822      scene_test_summary = f'Cam{camera_id} {s}' + '\n'
823      mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s)
824
825      # Since test directories do not have 'folded' in the name, we need
826      # to remove that suffix for the path of the scenes to be loaded
827      # on the tablets
828      testing_scene = s
829      if 'folded' in s:
830        testing_scene = s.split('_folded')[0]
831      test_params_content['scene'] = testing_scene
832      test_params_content['scene_with_suffix'] = s
833
834      if auto_scene_switch:
835        # Copy scene images onto the tablet
836        if 'scene0' not in testing_scene:
837          its_session_utils.copy_scenes_to_tablet(testing_scene, tablet_id)
838      else:
839        # Check manual scenes for correctness
840        if ('scene0' not in testing_scene and
841            not testing_sensor_fusion_with_controller):
842          check_manual_scenes(device_id, camera_id, testing_scene,
843                              mobly_output_logs_path)
844
845      scene_test_list = []
846      config_file_contents['TestBeds'][0]['TestParams'] = test_params_content
847      # Add the MoblyParams to config.yml file with the path to store camera_id
848      # test results. This is a separate dict other than TestBeds.
849      mobly_params_dict = {
850          'MoblyParams': {
851              'LogPath': mobly_scene_output_logs_path
852          }
853      }
854      config_file_contents.update(mobly_params_dict)
855      logging.debug('Final config file contents: %s', config_file_contents)
856      new_yml_file_name = get_updated_yml_file(config_file_contents)
857      logging.info('Using %s as temporary config yml file', new_yml_file_name)
858      if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1:
859        scene_dir = os.listdir(
860            os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', testing_scene))
861        for file_name in scene_dir:
862          if file_name.endswith('.py') and 'test' in file_name:
863            scene_test_list.append(file_name)
864      else:  # sub-camera
865        if SUB_CAMERA_TESTS.get(testing_scene):
866          scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[
867              testing_scene]]
868        else:
869          scene_test_list = []
870      scene_test_list.sort()
871
872      # Run tests for scene
873      logging.info('Running tests for %s with camera %s',
874                   testing_scene, camera_id)
875      num_pass = 0
876      num_skip = 0
877      num_not_mandated_fail = 0
878      num_fail = 0
879      for test in scene_test_list:
880        # Handle repeated test
881        if 'tests/' in test:
882          cmd = [
883              'python3',
884              os.path.join(os.environ['CAMERA_ITS_TOP'], test), '-c',
885              f'{new_yml_file_name}'
886          ]
887        else:
888          cmd = [
889              'python3',
890              os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests',
891                           testing_scene, test),
892              '-c',
893              f'{new_yml_file_name}'
894          ]
895        return_string = ''
896        for num_try in range(NUM_TRIES):
897          # Saves to mobly test summary file
898          # print only messages for manual lighting control testing
899          output = subprocess.Popen(
900              cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
901          )
902          with output.stdout, open(
903              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'wb'
904          ) as file:
905            for line in iter(output.stdout.readline, b''):
906              out = line.decode('utf-8').strip()
907              if '<ENTER>' in out: print(out)
908              file.write(line)
909          output.wait()
910
911          # Parse mobly logs to determine PASS/FAIL(*)/SKIP & socket FAILs
912          with open(
913              os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE), 'r') as file:
914            test_code = output.returncode
915            test_skipped = False
916            test_not_yet_mandated = False
917            test_mpc_req = ''
918            perf_test_metrics = ''
919            hdr_mpc_req = ''
920            feature_query_proto = ''
921            content = file.read()
922
923            # Find media performance class logging
924            lines = content.splitlines()
925            for one_line in lines:
926              # regular expression pattern must match
927              # MPC12_CAMERA_LAUNCH_PATTERN or MPC12_JPEG_CAPTURE_PATTERN in
928              # ItsTestActivity.java.
929              mpc_string_match = re.search(
930                  '^(1080p_jpeg_capture_time_ms:|camera_launch_time_ms:)',
931                  one_line)
932              if mpc_string_match:
933                test_mpc_req = one_line
934                break
935
936            for one_line in lines:
937              # regular expression pattern must match in ItsTestActivity.java.
938              gainmap_string_match = re.search('^has_gainmap:', one_line)
939              if gainmap_string_match:
940                hdr_mpc_req = one_line
941                break
942
943            # Find feature combination query proto
944            for one_line in lines:
945              # regular expression pattern must match in ItsTestActivity.java.
946              feature_comb_query_string_match = re.search(
947                  '^feature_query_proto:(.*)', one_line
948              )
949              if feature_comb_query_string_match:
950                feature_query_proto = feature_comb_query_string_match.group(1)
951                break
952
953            # Find performance metrics logging
954            for one_line in lines:
955              # regular expression pattern must match in ItsTestActivity.java.
956              perf_metrics_string_match = re.search(
957                  '^test.*:',
958                  one_line)
959              if perf_metrics_string_match:
960                perf_test_metrics = one_line
961                # each test can add multiple metrics
962                results[s][PERFORMANCE_KEY].append(perf_test_metrics)
963
964            if 'Test skipped' in content:
965              return_string = 'SKIP '
966              num_skip += 1
967              test_skipped = True
968              break
969
970            if its_session_utils.NOT_YET_MANDATED_MESSAGE in content:
971              return_string = 'FAIL*'
972              num_not_mandated_fail += 1
973              test_not_yet_mandated = True
974              break
975
976            if test_code == 0 and not test_skipped:
977              return_string = 'PASS '
978              num_pass += 1
979              break
980
981            if test_code == 1 and not test_not_yet_mandated:
982              return_string = 'FAIL '
983              if 'Problem with socket' in content and num_try != NUM_TRIES-1:
984                logging.info('Retry %s/%s', s, test)
985              else:
986                num_fail += 1
987                break
988            os.remove(os.path.join(topdir, MOBLY_TEST_SUMMARY_TXT_FILE))
989        status_prefix = ''
990        if testbed_index is not None:
991          status_prefix = config_file_test_key + ':'
992        logging.info('%s%s %s/%s', status_prefix, return_string, s, test)
993        test_name = test.split('/')[-1].split('.')[0]
994        results[s]['TEST_STATUS'].append({
995            'test': test_name,
996            'status': return_string.strip()})
997        if test_mpc_req:
998          results[s][METRICS_KEY].append(test_mpc_req)
999        if hdr_mpc_req:
1000          results[s][METRICS_KEY].append(hdr_mpc_req)
1001        if feature_query_proto:
1002          results[s][FEATURE_QUERY_KEY].append(feature_query_proto)
1003
1004        msg_short = f'{return_string} {test}'
1005        scene_test_summary += msg_short + '\n'
1006        if (test in _LIGHTING_CONTROL_TESTS and
1007            not testing_flash_with_controller):
1008          print('Turn lights ON in rig and press <ENTER> to continue.')
1009
1010      # unit is millisecond for execution time record in CtsVerifier
1011      scene_end_time = int(round(time.time() * 1000))
1012      skip_string = ''
1013      tot_tests = len(scene_test_list)
1014      tot_tests_run = tot_tests - num_skip
1015      if tot_tests_run != 0:
1016        tests_passed_ratio = (num_pass + num_not_mandated_fail) / tot_tests_run
1017      else:
1018        tests_passed_ratio = (num_pass + num_not_mandated_fail) / 100.0
1019      tests_passed_ratio_format = f'{(100 * tests_passed_ratio):.1f}%'
1020      if num_skip > 0:
1021        skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped"
1022      test_result = (f'{num_pass + num_not_mandated_fail} / {tot_tests_run} '
1023                     f'tests passed ({tests_passed_ratio_format}){skip_string}')
1024      logging.info(test_result)
1025      if num_not_mandated_fail > 0:
1026        logging.info('(*) %s not_yet_mandated tests failed',
1027                     num_not_mandated_fail)
1028
1029      tot_pass += num_pass
1030      logging.info('scene tests: %s, Total tests passed: %s', tot_tests,
1031                   tot_pass)
1032      if tot_tests > 0:
1033        logging.info('%s compatibility score: %.f/100\n',
1034                     s, 100 * num_pass / tot_tests)
1035        scene_test_summary_path = os.path.join(mobly_scene_output_logs_path,
1036                                               'scene_test_summary.txt')
1037        with open(scene_test_summary_path, 'w') as f:
1038          f.write(scene_test_summary)
1039        results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL)
1040        results[s][SUMMARY_KEY] = scene_test_summary_path
1041        results[s][TIME_KEY_START] = scene_start_time
1042        results[s][TIME_KEY_END] = scene_end_time
1043      else:
1044        logging.info('%s compatibility score: 0/100\n')
1045
1046      # Delete temporary yml file after scene run.
1047      new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name)
1048      os.remove(new_yaml_file_path)
1049
1050    # Log results per camera
1051    if num_testbeds is None or testbed_index == _MAIN_TESTBED:
1052      logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id)
1053      logging.info('ITS results to CtsVerifier: %s', results)
1054      report_result(device_id, camera_id, tablet_name, results)
1055    else:
1056      write_result(testbed_index, device_id, camera_id, results)
1057
1058  logging.info('Test execution completed.')
1059
1060  # Power down tablet
1061  if tablet_id:
1062    cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER'
1063    subprocess.Popen(cmd.split())
1064
1065  # establish connection with lighting controller
1066  lighting_cntl = test_params_content.get('lighting_cntl', 'None')
1067  lighting_ch = test_params_content.get('lighting_ch', 'None')
1068  arduino_serial_port = lighting_control_utils.lighting_control(
1069      lighting_cntl, lighting_ch)
1070
1071  # turn OFF lights
1072  lighting_control_utils.set_lighting_state(
1073      arduino_serial_port, lighting_ch, 'OFF')
1074
1075  if num_testbeds is not None:
1076    if testbed_index == _MAIN_TESTBED:
1077      logging.info('Waiting for all testbeds to finish.')
1078      start = time.time()
1079      completed_testbeds = set()
1080      while time.time() < start + MERGE_RESULTS_TIMEOUT:
1081        for i in range(num_testbeds):
1082          if os.path.isfile(f'testbed_{i}_completed.tmp'):
1083            start = time.time()
1084            completed_testbeds.add(i)
1085        # Already reported _MAIN_TESTBED's results.
1086        if len(completed_testbeds) == num_testbeds - 1:
1087          logging.info('All testbeds completed, merging results.')
1088          for parsed_id, parsed_camera, parsed_results in (
1089              parse_testbeds(completed_testbeds)):
1090            logging.debug('Parsed id: %s, parsed cam: %s, parsed results: %s',
1091                          parsed_id, parsed_camera, parsed_results)
1092            if not are_devices_similar(device_id, parsed_id):
1093              logging.error('Device %s and device %s are not the same '
1094                            'model/type/build/revision.',
1095                            device_id, parsed_id)
1096              return
1097            report_result(device_id, parsed_camera, tablet_name, parsed_results)
1098          for temp_file in glob.glob('testbed_*.tmp'):
1099            os.remove(temp_file)
1100          break
1101      else:
1102        logging.error('No testbeds finished in the last %d seconds, '
1103                      'but still expected data. '
1104                      'Completed testbed indices: %s, '
1105                      'expected number of testbeds: %d',
1106                      MERGE_RESULTS_TIMEOUT, list(completed_testbeds),
1107                      num_testbeds)
1108    else:
1109      with open(f'testbed_{testbed_index}_completed.tmp', 'w') as _:
1110        pass
1111
1112if __name__ == '__main__':  # Script guard: true only when this file is executed directly, not when imported.
1113  main()  # Invoke main(), defined earlier in this file, with no arguments.
1114