xref: /aosp_15_r20/cts/apps/CameraITS/utils/preview_processing_utils.py (revision b7c941bb3fa97aba169d73cee0bed2de8ac964bf)
1# Copyright 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utility functions for verifying preview stabilization.
15"""
16
17import cv2
18import logging
19import os
20import threading
21import time
22
23import numpy as np
24
25import its_session_utils
26import sensor_fusion_utils
27import video_processing_utils
28
# Module-level tuning constants for preview recording and verification.
_AREA_720P_VIDEO = 1280 * 720  # pixel area of a 1280x720 frame
_ASPECT_RATIO_16_9 = 16/9  # determine if preview fmt > 16:9
_ASPECT_TOL = 0.01  # max abs aspect ratio difference treated as a match
_GREEN_TOL = 200  # 200 out of 255 Green value in RGB
_GREEN_PERCENT = 95  # min % of green pixels for a frame to count as green
_HIGH_RES_SIZE = '3840x2160'  # Resolution for 4K quality
_IMG_FORMAT = 'png'  # image format of frames extracted from recordings
_MIN_PHONE_MOVEMENT_ANGLE = 5  # degrees
_NATURAL_ORIENTATION_PORTRAIT = (90, 270)  # orientation in "normal position"
_NUM_ROTATIONS = 24  # rotation count handed to the rotation rig
_PREVIEW_DURATION = 400  # milliseconds
_PREVIEW_MAX_TESTED_AREA = 1920 * 1440  # default upper pixel-area cap
_PREVIEW_MIN_TESTED_AREA = 320 * 240  # lower pixel-area bound
_PREVIEW_STABILIZATION_FACTOR = 0.7  # 70% of gyro movement allowed
_RED_BLUE_TOL = 20  # 20 out of 255 Red or Blue value in RGB
_SKIP_INITIAL_FRAMES = 15  # leading frames dropped from zoom recordings
_START_FRAME = 30  # give 3A some frames to warm up
_VIDEO_DELAY_TIME = 5.5  # seconds
_VIDEO_DURATION = 5.5  # seconds
48
49
def get_720p_or_above_size(supported_preview_sizes):
  """Returns the smallest size above or equal to 720p in preview and video.

  If no supported size reaches 720P, returns the largest supported size.
  The result does not depend on the order of the input list.

  Args:
    supported_preview_sizes: list; preview sizes.
      e.g. ['1920x960', '1600x1200', '1920x1080']
  Returns:
    str; smallest size whose area is >= 720p, otherwise the largest size.
    Empty string if supported_preview_sizes is empty.
  """

  def size_to_area(size):
    dims = size.split('x')
    return int(dims[0]) * int(dims[1])

  smallest_area = float('inf')
  smallest_720p_or_above_size = ''
  largest_area = 0
  largest_supported_preview_size = ''
  for size in supported_preview_sizes:
    area = size_to_area(size)
    if smallest_area > area >= _AREA_720P_VIDEO:
      smallest_area = area
      smallest_720p_or_above_size = size
    # Track the overall largest size independently of the 720p candidate so
    # the fallback is correct for any input ordering.
    if area > largest_area:
      largest_area = area
      largest_supported_preview_size = size

  # Decide on whether a >= 720p candidate was actually found, not on a
  # partially tracked maximum.
  if smallest_720p_or_above_size:
    logging.debug('Smallest 720p or above size: %s',
                  smallest_720p_or_above_size)
    return smallest_720p_or_above_size

  logging.debug('Largest supported preview size: %s',
                largest_supported_preview_size)
  return largest_supported_preview_size
85
86
def collect_data(cam, tablet_device, preview_size, stabilize, rot_rig,
                 zoom_ratio=None, fps_range=None, hlg10=False, ois=False):
  """Records a preview video of the given size while the rig moves the device.

  Builds the preview output surface(s) for preview_size and delegates the
  actual capture to collect_data_with_surfaces.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    preview_size: str; preview stream resolution. ex. '1920x1080'
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    hlg10: boolean; whether to capture hlg10 output.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """

  surfaces = cam.preview_surface(preview_size, hlg10)
  recording = collect_data_with_surfaces(
      cam, tablet_device, surfaces, stabilize, rot_rig,
      zoom_ratio=zoom_ratio, fps_range=fps_range, ois=ois)
  return recording
112
113
def collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                               stabilize, rot_rig, zoom_ratio=None,
                               fps_range=None, ois=False):
  """Capture a new set of data from the device.

  Starts the rotation rig in a background thread, starts sensor event
  collection, waits _VIDEO_DELAY_TIME seconds, then records a preview video
  of _VIDEO_DURATION seconds. The rig thread is always joined before this
  function returns or raises.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    output_surfaces: list of dict; The list of output surfaces configured for
      the recording. Only the first surface is used for recording; the rest are
      configured, but not requested.
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """

  logging.debug('Starting sensor event collection')
  serial_port = None
  if rot_rig['cntl'].lower() == sensor_fusion_utils.ARDUINO_STRING.lower():
    # identify port
    serial_port = sensor_fusion_utils.serial_port_def(
        sensor_fusion_utils.ARDUINO_STRING)
    # send test cmd to Arduino until cmd returns properly
    sensor_fusion_utils.establish_serial_comm(serial_port)

  # Start camera vibration
  if tablet_device:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
  else:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
  rig_thread = threading.Thread(
      target=sensor_fusion_utils.rotation_rig,
      args=(
          rot_rig['cntl'],
          rot_rig['ch'],
          _NUM_ROTATIONS,
          sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
          servo_speed,
          sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
          serial_port,
      ),
  )
  rig_thread.start()

  if fps_range is None:
    min_fps = max_fps = None
  else:
    min_fps, max_fps = fps_range[0], fps_range[1]

  try:
    cam.start_sensor_events()
    # Allow time for rig to start moving
    time.sleep(_VIDEO_DELAY_TIME)

    # Record video and return recording object
    recording_obj = cam.do_preview_recording_multiple_surfaces(
        output_surfaces, _VIDEO_DURATION, stabilize, ois,
        zoom_ratio=zoom_ratio,
        ae_target_fps_min=min_fps, ae_target_fps_max=max_fps)

    logging.debug('Recorded output path: %s',
                  recording_obj['recordedOutputPath'])
    logging.debug('Tested quality: %s', recording_obj['quality'])
  finally:
    # Wait for vibration to stop, even when the recording failed, so the rig
    # thread is never left running behind a propagating exception.
    rig_thread.join()

  return recording_obj
182
183
def verify_preview_stabilization(recording_obj, gyro_events,
                                 test_name, log_path, facing, zoom_ratio=None):
  """Verify the returned recording is properly stabilized.

  Extracts all frames of the recording, derives the camera rotation from the
  frames and the device rotation from the gyro events, and checks that the
  maximum camera angle stays below a fraction of the maximum gyro angle.
  Extracted frame images are deleted when the check passes.

  Args:
    recording_obj: Camcorder recording object.
    gyro_events: Gyroscope events collected while recording.
    test_name: Name of the test.
    log_path: Path for the log file.
    facing: Facing of the camera device.
    zoom_ratio: Static zoom ratio. None if default zoom.

  Returns:
    A dictionary containing the maximum gyro angle, the maximum camera angle,
    and a failure message if the recorded video isn't properly stabilized.

  Raises:
    AssertionError: if no frames were extracted from the recording, or if the
      maximum gyro angle is below _MIN_PHONE_MOVEMENT_ANGLE degrees.
  """

  file_name = recording_obj['recordedOutputPath'].split('/')[-1]
  logging.debug('recorded file name: %s', file_name)
  video_size = recording_obj['videoSize']
  logging.debug('video size: %s', video_size)

  # Get all frames from the video
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, file_name, _IMG_FORMAT
  )
  logging.debug('Number of frames %d', len(file_list))
  if not file_list:
    raise AssertionError(f'No frames extracted from {file_name}.')

  frames = []
  for file in file_list:
    img_bgr = cv2.imread(os.path.join(log_path, file))
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    frames.append(img_rgb / 255)
  frame_h, frame_w, _ = frames[0].shape
  logging.debug('Frame size %d x %d', frame_w, frame_h)

  # Extract camera rotations
  if zoom_ratio:
    zoom_ratio_suffix = f'{zoom_ratio:.1f}'
  else:
    zoom_ratio_suffix = '1'
  file_name_stem = (
      f'{os.path.join(log_path, test_name)}_{video_size}_{zoom_ratio_suffix}x')
  cam_rots = sensor_fusion_utils.get_cam_rotations(
      frames[_START_FRAME:],
      facing,
      frame_h,
      file_name_stem,
      _START_FRAME,
      stabilized_video=True
  )
  sensor_fusion_utils.plot_camera_rotations(cam_rots, _START_FRAME,
                                            video_size, file_name_stem)
  max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
      cam_rots, 'Camera')

  # Extract gyro rotations
  sensor_fusion_utils.plot_gyro_events(
      gyro_events, f'{test_name}_{video_size}_{zoom_ratio_suffix}x',
      log_path)
  gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
      gyro_events, _VIDEO_DELAY_TIME)
  max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
      gyro_rots, 'Gyro')

  # Guard the ratio so that a motionless gyro trace reaches the explicit
  # movement check below instead of failing on a division by zero.
  if max_gyro_angle:
    angle_ratio = max_camera_angle / max_gyro_angle
  else:
    angle_ratio = float('nan')
  logging.debug(
      'Max deflection (degrees) %s: video: %.3f, gyro: %.3f ratio: %.4f',
      video_size, max_camera_angle, max_gyro_angle, angle_ratio)

  # Assert phone is moved enough during test
  if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
    raise AssertionError(
        f'Phone not moved enough! Movement: {max_gyro_angle}, '
        f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

  # Formats wider than 16:9 get a 10% larger allowance.
  w_x_h = video_size.split('x')
  if int(w_x_h[0])/int(w_x_h[1]) > _ASPECT_RATIO_16_9:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR * 1.1
  else:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR

  failure_msg = None
  if max_camera_angle >= max_gyro_angle * preview_stabilization_factor:
    failure_msg = (
        f'{video_size} preview not stabilized enough! '
        f'Max preview angle:  {max_camera_angle:.3f}, '
        f'Max gyro angle: {max_gyro_angle:.3f}, '
        f'ratio: {angle_ratio:.3f} '
        f'THRESH: {preview_stabilization_factor}.')
  # Delete saved frames if the format is a PASS
  else:
    for file in file_list:
      try:
        os.remove(os.path.join(log_path, file))
      except FileNotFoundError:
        logging.debug('File Not Found: %s', str(file))
    logging.debug('Format %s passes, frame images removed', video_size)

  return {'gyro': max_gyro_angle, 'cam': max_camera_angle,
          'failure': failure_msg}
284
285
def collect_preview_data_with_zoom(cam, preview_size, zoom_start,
                                   zoom_end, step_size, recording_duration_ms,
                                   padded_frames=False):
  """Records an unstabilized preview video while sweeping the zoom ratio.

  Args:
    cam: camera object.
    preview_size: str; preview resolution. ex. '1920x1080'.
    zoom_start: (float) is the starting zoom ratio during recording.
    zoom_end: (float) is the ending zoom ratio during recording.
    step_size: (float) is the step for zoom ratio during recording.
    recording_duration_ms: preview recording duration in ms.
    padded_frames: boolean; Whether to add additional frames at the beginning
      and end of recording to workaround issue with MediaRecorder.

  Returns:
    recording object as described by cam.do_preview_recording_with_dynamic_zoom.
  """
  zoom_sweep = (zoom_start, zoom_end, step_size, recording_duration_ms)
  recording = cam.do_preview_recording_with_dynamic_zoom(
      preview_size, stabilize=False, sweep_zoom=zoom_sweep,
      padded_frames=padded_frames)

  output_path = recording['recordedOutputPath']
  logging.debug('Recorded output path: %s', output_path)
  quality = recording['quality']
  logging.debug('Tested quality: %s', quality)
  return recording
315
316
def is_aspect_ratio_match(size_str, target_ratio):
  """Tells whether a 'WxH' size is within _ASPECT_TOL of target_ratio.

  Args:
    size_str: str; resolution formatted as '<width>x<height>'.
    target_ratio: number; width/height ratio to compare against.
  Returns:
    True if the size's width/height ratio differs from target_ratio by less
    than _ASPECT_TOL.
  """
  width_str, height_str = size_str.split('x')
  ratio_delta = int(width_str) / int(height_str) - target_ratio
  return -_ASPECT_TOL < ratio_delta < _ASPECT_TOL
321
322
def get_max_preview_test_size(cam, camera_id, aspect_ratio=None,
                              max_tested_area=_PREVIEW_MAX_TESTED_AREA):
  """Finds the max preview size to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the last supported preview
  size whose area lies within [_PREVIEW_MIN_TESTED_AREA, max_tested_area].

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    aspect_ratio: number; preferred width/height ratio, e.g. 4/3. It is
      compared numerically against each size. None to accept any ratio.
    max_tested_area: area of max preview resolution

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    AssertionError: if no supported size satisfies the area limits.
  """

  def resolution_to_area(size):
    dims = size.split('x')
    return int(dims[0]) * int(dims[1])

  supported_preview_sizes = cam.get_all_supported_preview_sizes(
      camera_id, filter_recordable=True)
  logging.debug('Resolutions supported by preview and MediaRecorder: %s',
                supported_preview_sizes)

  supported_preview_sizes = [
      size for size in supported_preview_sizes
      if resolution_to_area(size)
      >= video_processing_utils.LOWEST_RES_TESTED_AREA
      and (aspect_ratio is None or is_aspect_ratio_match(size, aspect_ratio))
  ]
  logging.debug('Supported preview resolutions: %s', supported_preview_sizes)

  if _HIGH_RES_SIZE in supported_preview_sizes:
    preview_test_size = _HIGH_RES_SIZE
  else:
    capped_supported_preview_sizes = [
        size
        for size in supported_preview_sizes
        if (_PREVIEW_MIN_TESTED_AREA
            <= resolution_to_area(size)
            <= max_tested_area)
    ]
    logging.debug('Capped preview resolutions: %s',
                  capped_supported_preview_sizes)
    if not capped_supported_preview_sizes:
      raise AssertionError(
          f'Camera {camera_id}: no supported preview size with area in '
          f'[{_PREVIEW_MIN_TESTED_AREA}, {max_tested_area}]. '
          f'Candidates: {supported_preview_sizes}')
    # NOTE(review): taking the last entry assumes the camera reports sizes
    # in ascending order -- confirm against get_all_supported_preview_sizes.
    preview_test_size = capped_supported_preview_sizes[-1]

  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size
376
377
def get_max_extension_preview_test_size(cam, camera_id, extension):
  """Finds the max preview size for an extension to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the last supported preview size
  whose area lies within
  [_PREVIEW_MIN_TESTED_AREA, _PREVIEW_MAX_TESTED_AREA].

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    extension: int; camera extension mode under test

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    AssertionError: if no supported size satisfies the area limits.
  """

  def resolution_to_area(size):
    dims = size.split('x')
    return int(dims[0]) * int(dims[1])

  supported_preview_sizes = (
      cam.get_supported_extension_preview_sizes(camera_id, extension))
  supported_preview_sizes = [size for size in supported_preview_sizes
                             if resolution_to_area(size)
                             >= video_processing_utils.LOWEST_RES_TESTED_AREA]
  logging.debug('Supported preview resolutions for extension %d: %s',
                extension, supported_preview_sizes)

  if _HIGH_RES_SIZE in supported_preview_sizes:
    preview_test_size = _HIGH_RES_SIZE
  else:
    capped_supported_preview_sizes = [
        size
        for size in supported_preview_sizes
        if (_PREVIEW_MIN_TESTED_AREA
            <= resolution_to_area(size)
            <= _PREVIEW_MAX_TESTED_AREA)
    ]
    logging.debug('Capped preview resolutions: %s',
                  capped_supported_preview_sizes)
    if not capped_supported_preview_sizes:
      raise AssertionError(
          f'Camera {camera_id}, extension {extension}: no supported preview '
          f'size with area in [{_PREVIEW_MIN_TESTED_AREA}, '
          f'{_PREVIEW_MAX_TESTED_AREA}]. '
          f'Candidates: {supported_preview_sizes}')
    # NOTE(review): taking the last entry assumes the camera reports sizes
    # in ascending order -- confirm against
    # get_supported_extension_preview_sizes.
    preview_test_size = capped_supported_preview_sizes[-1]

  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size
418
419
def mirror_preview_image_by_sensor_orientation(
    sensor_orientation, input_preview_img):
  """Mirrors a front camera preview image to match the camera capture.

  The image is flipped up/down when sensor_orientation is one of
  _NATURAL_ORIENTATION_PORTRAIT (90 or 270) and left/right for any other
  value.

  Args:
    sensor_orientation: integer; display orientation in natural position.
    input_preview_img: numpy array; image extracted from preview recording.
  Returns:
    output_preview_img: numpy array; flipped copy of the input image.
  """
  # np.flipud/np.fliplr hand back views, which opencv does not accept, so
  # each result is materialized with an explicit copy.
  if sensor_orientation not in _NATURAL_ORIENTATION_PORTRAIT:
    mirrored_img = np.fliplr(input_preview_img).copy()
    logging.debug(
        'Found sensor orientation %d, flipping left right', sensor_orientation)
    return mirrored_img

  mirrored_img = np.flipud(input_preview_img).copy()
  logging.debug(
      'Found sensor orientation %d, flipping up down', sensor_orientation)
  return mirrored_img
446
447
def is_image_green(image_path):
  """Checks if an image is mostly green.

  A pixel counts as green when its green value is above _GREEN_TOL and both
  its red and blue values are below _RED_BLUE_TOL. The image is considered
  green when at least _GREEN_PERCENT percent of its pixels are green.

  Args:
    image_path: str; The path to the image file.

  Returns:
    bool: True if mostly green, False otherwise.

  Raises:
    ValueError: if the image at image_path cannot be read.
  """

  image = cv2.imread(image_path)
  if image is None:
    # cv2.imread signals a missing or undecodable file by returning None;
    # fail with the offending path rather than an opaque subscript error.
    raise ValueError(f'Unable to read image: {image_path}')

  # cv2.imread yields channels in BGR order.
  blue = image[:, :, 0]
  green = image[:, :, 1]
  red = image[:, :, 2]
  green_pixels = ((green > _GREEN_TOL) &
                  (blue < _RED_BLUE_TOL) &
                  (red < _RED_BLUE_TOL)).sum()

  green_percentage = (green_pixels / (image.shape[0] * image.shape[1])) * 100

  return bool(green_percentage >= _GREEN_PERCENT)
473
474
def _trim_green_padding_frames(file_list, log_path):
  """Deletes leading/trailing green padding frames; returns camera frames."""
  num_frames = len(file_list)

  # Find index of the first non-green frame. Stays at num_frames when every
  # frame is green (or the list is empty).
  first_camera_frame_idx = num_frames
  for idx, file_name in enumerate(file_list):
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('First camera frame: %s', file_name)
      first_camera_frame_idx = idx
      break

  # Find index of the last non-green frame. Only frames after the first
  # camera frame are inspected: everything before it has already been
  # deleted from disk and must not be read again.
  last_camera_frame_idx = first_camera_frame_idx
  for idx in range(num_frames - 1, first_camera_frame_idx, -1):
    file_name = file_list[idx]
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('Last camera frame: %s', file_name)
      last_camera_frame_idx = idx
      break

  logging.debug('start idx = %d -- end idx = %d', first_camera_frame_idx,
                last_camera_frame_idx)
  return file_list[first_camera_frame_idx:last_camera_frame_idx+1]


def preview_over_zoom_range(dut, cam, preview_size, z_min, z_max, z_step_size,
                            log_path):
  """Captures a preview video from the device over zoom range.

  Captures camera preview frames at various zoom level in zoom range, pulls
  the recording from the device, extracts its frames, strips the green
  padding frames and drops the first _SKIP_INITIAL_FRAMES frames together
  with their capture results.

  Args:
    dut: device under test
    cam: camera object
    preview_size: str; preview resolution. ex. '1920x1080'
    z_min: minimum zoom for preview capture
    z_max: maximum zoom for preview capture
    z_step_size: zoom step size from min to max
    log_path: str; path for video file directory

  Returns:
    capture_results: total capture results of each frame
    file_list: file name for each frame

  Raises:
    AssertionError: if the number of capture results differs from the number
      of camera frames left after removing the green padding.
  """
  logging.debug('z_min : %.2f, z_max = %.2f, z_step_size = %.2f',
                z_min, z_max, z_step_size)

  # Converge 3A
  cam.do_3a()

  # recording preview
  # TODO: b/350821827 - encode time stamps in camera frames instead of
  #                     padded green frames
  # MediaRecorder on some devices drops the last few frames. To solve this
  # issue add green frames as padding at the end of recorded camera frames.
  # This way green buffer frames would be dropped by MediaRecorder instead of
  # actual frames. Later these green padded frames are removed.
  preview_rec_obj = collect_preview_data_with_zoom(
      cam, preview_size, z_min, z_max, z_step_size,
      _PREVIEW_DURATION, padded_frames=True)

  preview_file_name = its_session_utils.pull_file_from_dut(
      dut, preview_rec_obj['recordedOutputPath'], log_path)

  logging.debug('recorded video size : %s',
                str(preview_rec_obj['videoSize']))

  # Extract frames as png from mp4 preview recording
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, preview_file_name, _IMG_FORMAT
  )

  file_list = _trim_green_padding_frames(file_list, log_path)

  # Raise error if capture result and frame count doesn't match
  capture_results = preview_rec_obj['captureMetadata']
  extra_capture_result_count = len(capture_results) - len(file_list)
  logging.debug('Number of frames %d', len(file_list))
  if extra_capture_result_count != 0:
    its_session_utils.remove_frame_files(log_path)
    e_msg = (f'Number of CaptureResult ({len(capture_results)}) '
             f'vs number of Frames ({len(file_list)}) count mismatch.'
             ' Retry Test.')
    raise AssertionError(e_msg)

  # skip frames which might not have 3A converged
  capture_results = capture_results[_SKIP_INITIAL_FRAMES:]
  skipped_files = file_list[:_SKIP_INITIAL_FRAMES]
  file_list = file_list[_SKIP_INITIAL_FRAMES:]

  # delete skipped files
  for file_name in skipped_files:
    its_session_utils.remove_file(os.path.join(log_path, file_name))

  return capture_results, file_list
572