xref: /aosp_15_r20/system/apex/tools/apex_compression_test.py (revision 33f3758387333dbd2962d7edbd98681940d895da)
1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apex_compression_tool."""
18import hashlib
19import logging
20import os
21import shutil
22import stat
23import subprocess
24import tempfile
25import unittest
26from importlib import resources
27from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED
28
29import apex_manifest_pb2
30
logger = logging.getLogger(__name__)

# Base name (without the '.apex' extension) of the test APEX that is bundled
# with this test as a package resource.
TEST_APEX = 'com.android.example.apex'

# In order to debug test failures, set DEBUG_TEST to True and run the test from
# a local workstation, bypassing atest, e.g.:
# $ m apex_compression_tool_test && \
#   out/host/linux-x86/nativetest64/apex_compression_tool_test/\
#   apex_compression_tool_test
#
# With DEBUG_TEST enabled, every command is printed before it is run, and the
# temporary files and directories created by a test are kept (their paths are
# printed instead of being deleted) so that they can be inspected.
DEBUG_TEST = False
44
45
def run(args, verbose=None, **kwargs):
  """Starts a command and hands back the running process.

  Args:
    args: The command represented as a list of strings.
    verbose: When truthy, the command line is logged at INFO level. Nothing is
        logged when it is left unspecified (None) or falsy.
    kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
        stdin, etc. If the caller specifies neither stdout nor stderr, they are
        set to subprocess.PIPE and subprocess.STDOUT respectively; if the
        caller specifies either one, both are left exactly as given.
        universal_newlines defaults to True, so output is handled as text.

  Returns:
    A subprocess.Popen object.
  """
  caller_chose_streams = 'stdout' in kwargs or 'stderr' in kwargs
  if not caller_chose_streams:
    # Merge stderr into the captured stdout so callers get a single stream.
    kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  kwargs.setdefault('universal_newlines', True)

  # Debug mode echoes every command, independently of |verbose|.
  if DEBUG_TEST:
    print('\nRunning: \n%s\n' % ' '.join(args))
  # Only log when the caller asked for it.
  if verbose:
    logger.info("  Running: '%s'", ' '.join(args))

  return subprocess.Popen(args, **kwargs)
73
74
def run_and_check_output(args, verbose=None, **kwargs):
  """Runs a command to completion and returns what it printed.

  Args:
    args: The command represented as a list of strings.
    verbose: When truthy, the command line and its output are logged at INFO
        level. Nothing is logged when it is left unspecified or falsy.
    kwargs: Any additional args to be passed on to run() and from there to
        subprocess.Popen(), such as env, stdin, etc.

  Returns:
    The captured output; an empty string if nothing was captured.

  Raises:
    RuntimeError: On non-zero exit from the command. The message carries the
        command, the exit code and the captured output.
  """
  process = run(args, verbose=verbose, **kwargs)
  captured = process.communicate()[0]
  # communicate() yields None for stdout when it was not piped.
  captured = '' if captured is None else captured

  # Only log when the caller asked for it.
  if verbose:
    logger.info('%s', captured.rstrip())

  if process.returncode == 0:
    return captured

  raise RuntimeError(
      "Failed to run command '{}' (exit code {}):\n{}".format(
          args, process.returncode, captured))
105
106
def get_current_dir():
  """Returns the absolute directory that contains this script.

  Symlinks are resolved first, so the result is the script's real location,
  which may differ from the process working directory.
  """
  return os.path.dirname(os.path.realpath(__file__))
112
113
def get_sha1sum(file_path):
  """Returns the hex digest of the file's contents.

  Despite the name, the digest is SHA-256 (the name is kept so that existing
  callers keep working). The result is only compared against other values
  produced by this same function.

  Args:
    file_path: Path of the file to hash.

  Returns:
    The SHA-256 digest of the file contents as a lowercase hex string.
  """
  # Read in sizeable chunks: iterating in hash-block-sized (64-byte) steps
  # costs tens of thousands of Python-level iterations per megabyte while
  # producing the very same digest.
  chunk_size = 64 * 1024
  h = hashlib.sha256()

  with open(file_path, 'rb') as file:
    while chunk := file.read(chunk_size):
      h.update(chunk)

  return h.hexdigest()
126
127
class ApexCompressionTest(unittest.TestCase):
  """Round-trip tests for compressing and decompressing an APEX.

  Each test unpacks the bundled host tools into a temporary directory that is
  put at the front of PATH, then drives them through subprocesses. Temporary
  files and directories are tracked in self._to_cleanup and removed in
  tearDown(), unless DEBUG_TEST is set.
  """

  def setUp(self):
    """Prepares the cleanup list and makes the host tools runnable."""
    self._to_cleanup = []
    self._get_host_tools()

  def tearDown(self):
    """Deletes everything the test registered, or lists it in debug mode."""
    if DEBUG_TEST:
      # Keep the artifacts around so that they can be inspected.
      print('Cleanup: ' + str(self._to_cleanup))
      return

    for path in self._to_cleanup:
      if os.path.isdir(path):
        shutil.rmtree(path, ignore_errors=True)
      else:
        try:
          os.remove(path)
        except FileNotFoundError:
          # Cleanup is best effort (as for directories above); a file that is
          # already gone must not stop the remaining entries from being
          # removed.
          pass
    self._to_cleanup.clear()

  @staticmethod
  def _restore_env(saved):
    """Puts back environment variables; a saved value of None means unset."""
    for name, value in saved.items():
      if value is None:
        os.environ.pop(name, None)
      else:
        os.environ[name] = value

  def _get_host_tools(self):
    """Copies the bundled host tools to a temp dir and exposes them via PATH.

    The tools are read from the 'apex_compression_test' package resources and
    made executable. APEX_COMPRESSION_TOOL_PATH is pointed at the directory
    and the directory is prepended to PATH. Both variables are restored to
    their previous values once the test is over, so that directories of
    earlier (already deleted) tests do not pile up in PATH.
    """
    dir_name = tempfile.mkdtemp(prefix=self._testMethodName + '_host_tools_')
    self._to_cleanup.append(dir_name)
    package = resources.files('apex_compression_test')
    for tool in ['avbtool', 'conv_apex_manifest', 'apex_compression_tool',
                 'deapexer', 'soong_zip']:
      tool_path = os.path.join(dir_name, tool)
      with (
        package.joinpath(tool).open('rb') as tool_resource,
        open(tool_path, 'wb') as f
      ):
        shutil.copyfileobj(tool_resource, f)
      os.chmod(tool_path, stat.S_IRUSR | stat.S_IXUSR)

    # Remember the current values before touching them; addCleanup callbacks
    # run even when the test itself fails.
    saved_env = {name: os.environ.get(name)
                 for name in ('APEX_COMPRESSION_TOOL_PATH', 'PATH')}
    self.addCleanup(self._restore_env, saved_env)

    os.environ['APEX_COMPRESSION_TOOL_PATH'] = dir_name
    path = dir_name
    if 'PATH' in os.environ:
      path += os.pathsep + os.environ['PATH']
    os.environ['PATH'] = path

  def _get_test_apex(self):
    """Copies the bundled test APEX to a temp dir and returns its path."""
    tmpdir = tempfile.mkdtemp()
    self._to_cleanup.append(tmpdir)
    apexPath = os.path.join(tmpdir, TEST_APEX + '.apex')
    with (
      resources.files('apex_compression_test').joinpath(TEST_APEX + '.apex').open('rb') as f,
      open(apexPath, 'wb') as f2,
    ):
      shutil.copyfileobj(f, f2)

    return apexPath

  def _get_container_files(self, apex_file_path):
    """Extracts an APEX container and maps known entry names to their paths.

    Args:
      apex_file_path: Path of the (compressed or uncompressed) APEX zip.

    Returns:
      A dict with one item per well-known entry found in the container,
      keyed by entry name and holding the path of the extracted file. If the
      container has an 'apex_payload.img', the dict additionally has
      'apex_payload' (the same path) and 'digest' (the value printed by
      `avbtool print_partition_digests` for that image). A zip payload
      ('apex_payload.zip') is only listed under its own name.
    """
    dir_name = tempfile.mkdtemp(
        prefix=self._testMethodName + '_container_files_')
    self._to_cleanup.append(dir_name)
    with ZipFile(apex_file_path, 'r') as zip_obj:
      zip_obj.extractall(path=dir_name)
    files = {}
    for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey',
              'apex_build_info.pb', 'apex_payload.img', 'apex_payload.zip',
              'AndroidManifest.xml', 'original_apex']:
      file_path = os.path.join(dir_name, i)
      if os.path.exists(file_path):
        files[i] = file_path

    image_file = files.get('apex_payload.img')
    if image_file is not None:
      files['apex_payload'] = image_file
      # Also retrieve the root digest of the image
      avbtool_cmd = ['avbtool',
                     'print_partition_digests', '--image', image_file]
      # avbtool_cmd output has format "<name>: <value>"
      files['digest'] = run_and_check_output(
          avbtool_cmd, True).split(': ')[1].strip()

    return files

  def _get_manifest_string(self, manifest_path):
    """Returns the manifest as printed by `conv_apex_manifest print`."""
    cmd = ['conv_apex_manifest', 'print', manifest_path]
    return run_and_check_output(cmd, True)

  # Mutates the manifest located at |manifest_path|
  def _unset_original_apex_digest(self, manifest_path):
    """Rewrites the manifest in place with its capexMetadata field cleared."""
    # Open the protobuf
    with open(manifest_path, 'rb') as f:
      pb = apex_manifest_pb2.ApexManifest()
      pb.ParseFromString(f.read())
    pb.ClearField('capexMetadata')
    with open(manifest_path, 'wb') as f:
      f.write(pb.SerializeToString())

  def _compress_apex(self, uncompressed_apex_fp):
    """Returns file path to compressed APEX"""
    fd, compressed_apex_fp = tempfile.mkstemp(
        prefix=self._testMethodName + '_compressed_',
        suffix='.capex')
    os.close(fd)
    self._to_cleanup.append(compressed_apex_fp)
    run_and_check_output([
        'apex_compression_tool',
        'compress',
        '--input', uncompressed_apex_fp,
        '--output', compressed_apex_fp
    ])
    return compressed_apex_fp

  def _decompress_apex(self, compressed_apex_fp):
    """Returns file path to decompressed APEX

    Raises:
      RuntimeError: If deapexer exits with a non-zero status.
    """
    # The output lives in a fresh private directory. Unlike taking the name
    # of a NamedTemporaryFile, this neither depends on when that object gets
    # garbage collected nor can another process grab the name in between.
    # The output path does not exist beforehand, in case the tool refuses to
    # overwrite an existing file.
    out_dir = tempfile.mkdtemp(prefix=self._testMethodName + '_decompressed_')
    # Registered up front so that it is removed even if decompression fails.
    self._to_cleanup.append(out_dir)
    decompressed_apex_fp = os.path.join(out_dir, 'decompressed.apex')

    # Use deapexer to decompress
    cmd = [
        'deapexer',
        'decompress',
        '--input', compressed_apex_fp,
        '--output', decompressed_apex_fp
    ]
    run_and_check_output(cmd, True)

    self.assertTrue(os.path.exists(decompressed_apex_fp),
                    'Decompressed APEX does not exist')
    return decompressed_apex_fp

  def _get_type(self, apex_file_path):
    """Returns the type reported by `deapexer info --print-type`."""
    cmd = ['deapexer', 'info', '--print-type', apex_file_path]
    return run_and_check_output(cmd, True).strip()

  def test_compression(self):
    """Compressing yields a smaller COMPRESSED APEX with matching metadata."""
    uncompressed_apex_fp = self._get_test_apex()
    # TODO(samiul): try compressing a compressed APEX
    compressed_apex_fp = self._compress_apex(uncompressed_apex_fp)

    # Verify output file has been created and is smaller than input file
    uncompressed_file_size = os.path.getsize(uncompressed_apex_fp)
    compressed_file_size = os.path.getsize(compressed_apex_fp)
    self.assertGreater(compressed_file_size, 0, 'Compressed APEX is empty')
    self.assertLess(compressed_file_size, uncompressed_file_size,
                    'Compressed APEX is not smaller than uncompressed APEX')

    # Verify type of the apex is 'COMPRESSED'
    self.assertEqual(self._get_type(compressed_apex_fp), 'COMPRESSED')

    # Verify the contents of the compressed apex files. Membership checks are
    # used so that a missing entry is reported as a test failure instead of
    # surfacing as a KeyError.
    content_in_compressed_apex = self._get_container_files(compressed_apex_fp)
    self.assertIn('original_apex', content_in_compressed_apex)
    self.assertIn('apex_manifest.pb', content_in_compressed_apex)
    content_in_uncompressed_apex = self._get_container_files(
        uncompressed_apex_fp)
    self.assertIn('apex_payload', content_in_uncompressed_apex)
    self.assertIn('digest', content_in_uncompressed_apex)

    # Verify that CAPEX manifest contains digest of original_apex
    manifest_string = self._get_manifest_string(
        content_in_compressed_apex['apex_manifest.pb'])
    self.assertIn('originalApexDigest: "'
                  + content_in_uncompressed_apex['digest'] + '"',
                  manifest_string)

    for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey',
              'apex_build_info.pb', 'AndroidManifest.xml']:
      if i in content_in_uncompressed_apex:
        self.assertIn(i, content_in_compressed_apex)
        if i == 'apex_manifest.pb':
          # Get rid of originalApexDigest field, which should be the
          # only difference
          self._unset_original_apex_digest(content_in_compressed_apex[i])
        self.assertEqual(get_sha1sum(content_in_compressed_apex[i]),
                         get_sha1sum(content_in_uncompressed_apex[i]))

  def test_decompression(self):
    """Decompressing a compressed APEX restores the original file exactly."""
    # setup: create compressed APEX
    uncompressed_apex_fp = self._get_test_apex()
    compressed_apex_fp = self._compress_apex(uncompressed_apex_fp)

    # Decompress it
    decompressed_apex_fp = self._decompress_apex(compressed_apex_fp)

    # Verify type of the apex is 'UNCOMPRESSED'
    self.assertEqual(self._get_type(decompressed_apex_fp), 'UNCOMPRESSED')

    # Verify decompressed APEX is same as uncompressed APEX
    self.assertEqual(get_sha1sum(uncompressed_apex_fp),
                     get_sha1sum(decompressed_apex_fp),
                     'Decompressed APEX is not same as uncompressed APEX')

    # Try decompressing uncompressed APEX. It should not work.
    with self.assertRaises(RuntimeError) as error:
      self._decompress_apex(uncompressed_apex_fp)

    self.assertIn(uncompressed_apex_fp
                  + ' is not a compressed APEX', str(error.exception))

  def test_only_original_apex_is_compressed(self):
    """Only original_apex is deflated; the other entries are stored as is."""
    uncompressed_apex_fp = self._get_test_apex()
    compressed_apex_fp = self._compress_apex(uncompressed_apex_fp)

    with ZipFile(compressed_apex_fp, 'r') as zip_obj:
      self.assertIn('original_apex', zip_obj.namelist())
      self.assertEqual(zip_obj.getinfo('original_apex').compress_type,
                       ZIP_DEFLATED)
      content_in_uncompressed_apex = self._get_container_files(
          uncompressed_apex_fp)
      for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey',
                'apex_build_info.pb', 'AndroidManifest.xml']:
        if i in content_in_uncompressed_apex:
          self.assertIn(i, zip_obj.namelist())
          self.assertEqual(zip_obj.getinfo(i).compress_type, ZIP_STORED)
331
# Allows running this file directly as a script; verbosity=2 makes unittest
# report each test by name together with its result.
if __name__ == '__main__':
  unittest.main(verbosity=2)
334