# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for verifying preview stabilization."""

import cv2
import logging
import os
import threading
import time

import numpy as np

import its_session_utils
import sensor_fusion_utils
import video_processing_utils

_AREA_720P_VIDEO = 1280 * 720
_ASPECT_RATIO_16_9 = 16/9  # determine if preview fmt > 16:9
_ASPECT_TOL = 0.01
_GREEN_TOL = 200  # 200 out of 255 Green value in RGB
_GREEN_PERCENT = 95
_HIGH_RES_SIZE = '3840x2160'  # Resolution for 4K quality
_IMG_FORMAT = 'png'
_MIN_PHONE_MOVEMENT_ANGLE = 5  # degrees
_NATURAL_ORIENTATION_PORTRAIT = (90, 270)  # orientation in "normal position"
_NUM_ROTATIONS = 24
_PREVIEW_DURATION = 400  # milliseconds
_PREVIEW_MAX_TESTED_AREA = 1920 * 1440
_PREVIEW_MIN_TESTED_AREA = 320 * 240
_PREVIEW_STABILIZATION_FACTOR = 0.7  # 70% of gyro movement allowed
_RED_BLUE_TOL = 20  # 20 out of 255 Red or Blue value in RGB
_SKIP_INITIAL_FRAMES = 15
_START_FRAME = 30  # give 3A some frames to warm up
_VIDEO_DELAY_TIME = 5.5  # seconds
_VIDEO_DURATION = 5.5  # seconds


def _size_to_area(size):
  """Returns the pixel area of a 'WxH' resolution string, e.g. '1920x1080'."""
  dims = size.split('x')
  return int(dims[0]) * int(dims[1])


def get_720p_or_above_size(supported_preview_sizes):
  """Returns the smallest size above or equal to 720p in preview and video.

  If the largest preview size is under 720P, returns the largest value.

  Args:
    supported_preview_sizes: list; preview sizes.
      e.g. ['1920x960', '1600x1200', '1920x1080']
  Returns:
    smallest size >= 720p video format, or the largest supported size if
    every size is below 720p. Empty string if the list is empty.
  """
  smallest_720p_or_above_size = ''
  smallest_720p_or_above_area = float('inf')
  largest_supported_preview_size = ''
  largest_area = 0
  for size in supported_preview_sizes:
    area = _size_to_area(size)
    # Track the overall largest size independently of the 720p candidate so
    # that the result does not depend on the order of the input list.
    if area > largest_area:
      largest_area = area
      largest_supported_preview_size = size
    if smallest_720p_or_above_area > area >= _AREA_720P_VIDEO:
      smallest_720p_or_above_area = area
      smallest_720p_or_above_size = size

  if smallest_720p_or_above_size:
    logging.debug('Smallest 720p or above size: %s',
                  smallest_720p_or_above_size)
    return smallest_720p_or_above_size

  logging.debug('Largest supported preview size: %s',
                largest_supported_preview_size)
  return largest_supported_preview_size


def collect_data(cam, tablet_device, preview_size, stabilize, rot_rig,
                 zoom_ratio=None, fps_range=None, hlg10=False, ois=False):
  """Capture a new set of data from the device.

  Captures camera preview frames while the user is moving the device in
  the prescribed manner.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    preview_size: str; preview stream resolution. ex. '1920x1080'
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    hlg10: boolean; whether to capture hlg10 output.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """
  output_surfaces = cam.preview_surface(preview_size, hlg10)
  return collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                                    stabilize, rot_rig, zoom_ratio,
                                    fps_range, ois)


def collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                               stabilize, rot_rig, zoom_ratio=None,
                               fps_range=None, ois=False):
  """Capture a new set of data from the device.

  Captures camera preview frames while the user is moving the device in
  the prescribed manner.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    output_surfaces: list of dict; The list of output surfaces configured for
      the recording. Only the first surface is used for recording; the rest are
      configured, but not requested.
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """
  logging.debug('Starting sensor event collection')
  serial_port = None
  if rot_rig['cntl'].lower() == sensor_fusion_utils.ARDUINO_STRING.lower():
    # identify port
    serial_port = sensor_fusion_utils.serial_port_def(
        sensor_fusion_utils.ARDUINO_STRING)
    # send test cmd to Arduino until cmd returns properly
    sensor_fusion_utils.establish_serial_comm(serial_port)

  # Start camera vibration
  if tablet_device:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
  else:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
  rig_thread = threading.Thread(
      target=sensor_fusion_utils.rotation_rig,
      args=(
          rot_rig['cntl'],
          rot_rig['ch'],
          _NUM_ROTATIONS,
          sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
          servo_speed,
          sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
          serial_port,
      ),
  )
  rig_thread.start()

  try:
    cam.start_sensor_events()
    # Allow time for rig to start moving
    time.sleep(_VIDEO_DELAY_TIME)

    # Record video and return recording object
    min_fps = fps_range[0] if (fps_range is not None) else None
    max_fps = fps_range[1] if (fps_range is not None) else None
    recording_obj = cam.do_preview_recording_multiple_surfaces(
        output_surfaces, _VIDEO_DURATION, stabilize, ois,
        zoom_ratio=zoom_ratio,
        ae_target_fps_min=min_fps, ae_target_fps_max=max_fps)

    logging.debug('Recorded output path: %s',
                  recording_obj['recordedOutputPath'])
    logging.debug('Tested quality: %s', recording_obj['quality'])
  finally:
    # Wait for vibration to stop, even when the recording failed, so the rig
    # thread is never left running behind a propagating exception.
    rig_thread.join()

  return recording_obj


def verify_preview_stabilization(recording_obj, gyro_events,
                                 test_name, log_path, facing, zoom_ratio=None):
  """Verify the returned recording is properly stabilized.

  Args:
    recording_obj: Camcorder recording object.
    gyro_events: Gyroscope events collected while recording.
    test_name: Name of the test.
    log_path: Path for the log file.
    facing: Facing of the camera device.
    zoom_ratio: Static zoom ratio. None if default zoom.

  Returns:
    A dictionary containing the maximum gyro angle ('gyro'), the maximum
    camera angle ('cam'), and a failure message ('failure') if the recorded
    video isn't properly stabilized; 'failure' is None on a pass.

  Raises:
    AssertionError: if the gyro reports less than _MIN_PHONE_MOVEMENT_ANGLE
      degrees of movement.
  """
  file_name = recording_obj['recordedOutputPath'].split('/')[-1]
  logging.debug('recorded file name: %s', file_name)
  video_size = recording_obj['videoSize']
  logging.debug('video size: %s', video_size)

  # Get all frames from the video
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, file_name, _IMG_FORMAT
  )
  frames = []

  logging.debug('Number of frames %d', len(file_list))
  for file in file_list:
    img_bgr = cv2.imread(os.path.join(log_path, file))
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    frames.append(img_rgb / 255)
  frame_h, frame_w, _ = frames[0].shape
  logging.debug('Frame size %d x %d', frame_w, frame_h)

  # Extract camera rotations
  if zoom_ratio:
    zoom_ratio_suffix = f'{zoom_ratio:.1f}'
  else:
    zoom_ratio_suffix = '1'
  file_name_stem = (
      f'{os.path.join(log_path, test_name)}_{video_size}_{zoom_ratio_suffix}x')
  cam_rots = sensor_fusion_utils.get_cam_rotations(
      frames[_START_FRAME:],
      facing,
      frame_h,
      file_name_stem,
      _START_FRAME,
      stabilized_video=True
  )
  sensor_fusion_utils.plot_camera_rotations(cam_rots, _START_FRAME,
                                            video_size, file_name_stem)
  max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
      cam_rots, 'Camera')

  # Extract gyro rotations
  sensor_fusion_utils.plot_gyro_events(
      gyro_events, f'{test_name}_{video_size}_{zoom_ratio_suffix}x',
      log_path)
  gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
      gyro_events, _VIDEO_DELAY_TIME)
  max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
      gyro_rots, 'Gyro')

  # Assert phone is moved enough during test. This must run before any
  # camera/gyro ratio is computed: a (near) zero gyro angle would otherwise
  # fail on the division instead of reporting the real problem.
  if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
    raise AssertionError(
        f'Phone not moved enough! Movement: {max_gyro_angle}, '
        f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

  camera_to_gyro_ratio = max_camera_angle / max_gyro_angle
  logging.debug(
      'Max deflection (degrees) %s: video: %.3f, gyro: %.3f ratio: %.4f',
      video_size, max_camera_angle, max_gyro_angle, camera_to_gyro_ratio)

  # Formats wider than 16:9 are allowed 10% more residual movement.
  w_x_h = video_size.split('x')
  if int(w_x_h[0])/int(w_x_h[1]) > _ASPECT_RATIO_16_9:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR * 1.1
  else:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR

  failure_msg = None
  if max_camera_angle >= max_gyro_angle * preview_stabilization_factor:
    failure_msg = (
        f'{video_size} preview not stabilized enough! '
        f'Max preview angle:  {max_camera_angle:.3f}, '
        f'Max gyro angle: {max_gyro_angle:.3f}, '
        f'ratio: {camera_to_gyro_ratio:.3f} '
        f'THRESH: {preview_stabilization_factor}.')
  # Delete saved frames if the format is a PASS
  else:
    for file in file_list:
      try:
        os.remove(os.path.join(log_path, file))
      except FileNotFoundError:
        logging.debug('File Not Found: %s', str(file))
    logging.debug('Format %s passes, frame images removed', video_size)

  return {'gyro': max_gyro_angle, 'cam': max_camera_angle,
          'failure': failure_msg}


def collect_preview_data_with_zoom(cam, preview_size, zoom_start,
                                   zoom_end, step_size, recording_duration_ms,
                                   padded_frames=False):
  """Captures a preview video from the device.

  Captures camera preview frames from the passed device.

  Args:
    cam: camera object.
    preview_size: str; preview resolution. ex. '1920x1080'.
    zoom_start: (float) is the starting zoom ratio during recording.
    zoom_end: (float) is the ending zoom ratio during recording.
    step_size: (float) is the step for zoom ratio during recording.
    recording_duration_ms: preview recording duration in ms.
    padded_frames: boolean; Whether to add additional frames at the beginning
      and end of recording to workaround issue with MediaRecorder.

  Returns:
    recording object as described by cam.do_preview_recording_with_dynamic_zoom.
  """
  recording_obj = cam.do_preview_recording_with_dynamic_zoom(
      preview_size,
      stabilize=False,
      sweep_zoom=(zoom_start, zoom_end, step_size, recording_duration_ms),
      padded_frames=padded_frames
  )
  logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
  logging.debug('Tested quality: %s', recording_obj['quality'])
  return recording_obj


def is_aspect_ratio_match(size_str, target_ratio):
  """Checks if a resolution string matches the target aspect ratio.

  Args:
    size_str: str; 'WxH' resolution. ex. '1920x1080'
    target_ratio: float; width / height ratio to compare against.

  Returns:
    True if width / height is within _ASPECT_TOL of target_ratio.
  """
  width, height = map(int, size_str.split('x'))
  return abs(width / height - target_ratio) < _ASPECT_TOL


def _select_preview_test_size(supported_preview_sizes, max_tested_area):
  """Picks the preview size to test from a list of candidate sizes.

  Args:
    supported_preview_sizes: list of str; candidate 'WxH' sizes. The last
      in-range entry is selected, so the list is presumably ordered by
      increasing area -- TODO confirm against the size-query implementation.
    max_tested_area: int; largest area allowed when _HIGH_RES_SIZE is not
      supported.

  Returns:
    _HIGH_RES_SIZE if it is a candidate, otherwise the last candidate whose
    area lies within [_PREVIEW_MIN_TESTED_AREA, max_tested_area].

  Raises:
    IndexError: if no candidate lies within the tested area range.
  """
  if _HIGH_RES_SIZE in supported_preview_sizes:
    return _HIGH_RES_SIZE

  capped_supported_preview_sizes = [
      size
      for size in supported_preview_sizes
      if _PREVIEW_MIN_TESTED_AREA <= _size_to_area(size) <= max_tested_area
  ]
  logging.debug('Capped preview resolutions: %s',
                capped_supported_preview_sizes)
  if not capped_supported_preview_sizes:
    raise IndexError(
        'No supported preview size with area between '
        f'{_PREVIEW_MIN_TESTED_AREA} and {max_tested_area} '
        f'in {supported_preview_sizes}')
  return capped_supported_preview_sizes[-1]


def get_max_preview_test_size(cam, camera_id, aspect_ratio=None,
                              max_tested_area=_PREVIEW_MAX_TESTED_AREA):
  """Finds the max preview size to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at max_tested_area.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    aspect_ratio: float; preferred width / height ratio. For example: 4/3.
      None to accept any aspect ratio.
    max_tested_area: area of max preview resolution

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if no supported size satisfies the area (and aspect ratio)
      constraints.
  """
  supported_preview_sizes = cam.get_all_supported_preview_sizes(
      camera_id, filter_recordable=True)
  logging.debug('Resolutions supported by preview and MediaRecorder: %s',
                supported_preview_sizes)

  supported_preview_sizes = [
      size for size in supported_preview_sizes
      if _size_to_area(size) >= video_processing_utils.LOWEST_RES_TESTED_AREA
      and (aspect_ratio is None or is_aspect_ratio_match(size, aspect_ratio))
  ]
  logging.debug('Supported preview resolutions: %s', supported_preview_sizes)

  preview_test_size = _select_preview_test_size(
      supported_preview_sizes, max_tested_area)
  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size


def get_max_extension_preview_test_size(cam, camera_id, extension):
  """Finds the max preview size for an extension to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at _PREVIEW_MAX_TESTED_AREA.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    extension: int; camera extension mode under test

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if no supported size satisfies the area constraints.
  """
  supported_preview_sizes = (
      cam.get_supported_extension_preview_sizes(camera_id, extension))
  supported_preview_sizes = [
      size for size in supported_preview_sizes
      if _size_to_area(size) >= video_processing_utils.LOWEST_RES_TESTED_AREA
  ]
  logging.debug('Supported preview resolutions for extension %d: %s',
                extension, supported_preview_sizes)

  preview_test_size = _select_preview_test_size(
      supported_preview_sizes, _PREVIEW_MAX_TESTED_AREA)
  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size


def mirror_preview_image_by_sensor_orientation(
    sensor_orientation, input_preview_img):
  """If testing front camera, mirror preview image to match camera capture.

  Preview are flipped on device's natural orientation, so for sensor
  orientation 90 or 270, it is up or down. Sensor orientation 0 or 180
  is left or right.

  Args:
    sensor_orientation: integer; display orientation in natural position.
    input_preview_img: numpy array; image extracted from preview recording.
  Returns:
    output_preview_img: numpy array; flipped according to natural orientation.
  """
  # Opencv expects a numpy array but np.flip generates a 'view' which
  # doesn't work with opencv. ndarray.copy forces copy instead of view.
  if sensor_orientation in _NATURAL_ORIENTATION_PORTRAIT:
    output_preview_img = np.flipud(input_preview_img).copy()
    logging.debug(
        'Found sensor orientation %d, flipping up down', sensor_orientation)
  else:
    output_preview_img = np.fliplr(input_preview_img).copy()
    logging.debug(
        'Found sensor orientation %d, flipping left right', sensor_orientation)

  return output_preview_img


def is_image_green(image_path):
  """Checks if an image is mostly green.

  Checks if an image is mostly green by ensuring green is dominant
  and red/blue values are low.

  Args:
    image_path: str; The path to the image file.

  Returns:
    bool: True if mostly green, False otherwise.

  Raises:
    ValueError: if the image cannot be read from image_path.
  """
  image = cv2.imread(image_path)
  # cv2.imread signals a missing or undecodable file by returning None
  # rather than raising, so fail here with a message naming the file.
  if image is None:
    raise ValueError(f'Unable to read image: {image_path}')

  # Channel 1 is green in both BGR and RGB order; channels 0 and 2 are
  # blue and red, which share the same tolerance.
  green_pixels = ((image[:, :, 1] > _GREEN_TOL) &
                  (image[:, :, 0] < _RED_BLUE_TOL) &
                  (image[:, :, 2] < _RED_BLUE_TOL)).sum()

  green_percentage = (green_pixels / (image.shape[0] * image.shape[1])) * 100

  return bool(green_percentage >= _GREEN_PERCENT)


def preview_over_zoom_range(dut, cam, preview_size, z_min, z_max, z_step_size,
                            log_path):
  """Captures a preview video from the device over zoom range.

  Captures camera preview frames at various zoom level in zoom range.

  Args:
    dut: device under test
    cam: camera object
    preview_size: str; preview resolution. ex. '1920x1080'
    z_min: minimum zoom for preview capture
    z_max: maximum zoom for preview capture
    z_step_size: zoom step size from min to max
    log_path: str; path for video file directory

  Returns:
    capture_results: total capture results of each frame
    file_list: file name for each frame

  Raises:
    AssertionError: if every extracted frame is green padding, or if the
      number of capture results does not match the number of camera frames.
  """
  logging.debug('z_min : %.2f, z_max = %.2f, z_step_size = %.2f',
                z_min, z_max, z_step_size)

  # Converge 3A
  cam.do_3a()

  # recording preview
  # TODO: b/350821827 - encode time stamps in camera frames instead of
  #                     padded green frames
  # MediaRecorder on some devices drop last few frames. To solve this issue
  # add green frames as padding at the end of recorded camera frames. This way
  # green buffer frames would be dropped by MediaRecorder instead of actual
  # frames. Later these green padded frames are removed.
  preview_rec_obj = collect_preview_data_with_zoom(
      cam, preview_size, z_min, z_max, z_step_size,
      _PREVIEW_DURATION, padded_frames=True)

  preview_file_name = its_session_utils.pull_file_from_dut(
      dut, preview_rec_obj['recordedOutputPath'], log_path)

  logging.debug('recorded video size : %s',
                str(preview_rec_obj['videoSize']))

  # Extract frames as png from mp4 preview recording
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, preview_file_name, _IMG_FORMAT
  )

  first_camera_frame_idx = 0
  last_camera_frame_idx = len(file_list)

  # Find index of the first-non green frame
  for (idx, file_name) in enumerate(file_list):
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('First camera frame: %s', file_name)
      first_camera_frame_idx = idx
      break
  else:
    # Loop finished without finding a camera frame. If frames were extracted
    # they were all green and have just been deleted, so the reverse scan
    # below would try to read files that no longer exist.
    if file_list:
      raise AssertionError(
          f'All {len(file_list)} extracted frames are green padding; '
          'no camera frames found. Retry Test.')

  # Find index of last non-green frame
  for (idx, file_name) in reversed(list(enumerate(file_list))):
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('Last camera frame: %s', file_name)
      last_camera_frame_idx = idx
      break

  logging.debug('start idx = %d -- end idx = %d', first_camera_frame_idx,
                last_camera_frame_idx)
  file_list = file_list[first_camera_frame_idx:last_camera_frame_idx+1]

  # Raise error if capture result and frame count doesn't match
  capture_results = preview_rec_obj['captureMetadata']
  extra_capture_result_count = len(capture_results) - len(file_list)
  logging.debug('Number of frames %d', len(file_list))
  if extra_capture_result_count != 0:
    its_session_utils.remove_frame_files(log_path)
    e_msg = (f'Number of CaptureResult ({len(capture_results)}) '
             f'vs number of Frames ({len(file_list)}) count mismatch.'
             ' Retry Test.')
    raise AssertionError(e_msg)

  # skip frames which might not have 3A converged
  capture_results = capture_results[_SKIP_INITIAL_FRAMES:]
  skipped_files = file_list[:_SKIP_INITIAL_FRAMES]
  file_list = file_list[_SKIP_INITIAL_FRAMES:]

  # delete skipped files
  for file_name in skipped_files:
    its_session_utils.remove_file(os.path.join(log_path, file_name))

  return capture_results, file_list