1#!/usr/bin/env python 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""Unit tests for apex_compression_tool.""" 18import hashlib 19import logging 20import os 21import shutil 22import stat 23import subprocess 24import tempfile 25import unittest 26from importlib import resources 27from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED 28 29import apex_manifest_pb2 30 31logger = logging.getLogger(__name__) 32 33TEST_APEX = 'com.android.example.apex' 34 35# In order to debug test failures, set DEBUG_TEST to True and run the test from 36# local workstation bypassing atest, e.g.: 37# $ m apex_compression_tool_test && \ 38# out/host/linux-x86/nativetest64/apex_compression_tool_test/\ 39# apex_compression_tool_test 40# 41# the test will print out the command used, and the temporary files used by the 42# test. 43DEBUG_TEST = False 44 45 46def run(args, verbose=None, **kwargs): 47 """Creates and returns a subprocess.Popen object. 48 49 Args: 50 args: The command represented as a list of strings. 51 verbose: Whether the commands should be shown. Default to the global 52 verbosity if unspecified. 53 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 54 stdin, etc. stdout and stderr will default to subprocess.PIPE and 55 subprocess.STDOUT respectively unless caller specifies any of them. 56 universal_newlines will default to True, as most of the users in 57 releasetools expect string output. 58 59 Returns: 60 A subprocess.Popen object. 61 """ 62 if 'stdout' not in kwargs and 'stderr' not in kwargs: 63 kwargs['stdout'] = subprocess.PIPE 64 kwargs['stderr'] = subprocess.STDOUT 65 if 'universal_newlines' not in kwargs: 66 kwargs['universal_newlines'] = True 67 if DEBUG_TEST: 68 print('\nRunning: \n%s\n' % ' '.join(args)) 69 # Don't log any if caller explicitly says so. 70 if verbose: 71 logger.info(' Running: \'%s\'', ' '.join(args)) 72 return subprocess.Popen(args, **kwargs) 73 74 75def run_and_check_output(args, verbose=None, **kwargs): 76 """Runs the given command and returns the output. 77 78 Args: 79 args: The command represented as a list of strings. 80 verbose: Whether the commands should be shown. Default to the global 81 verbosity if unspecified. 82 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 83 stdin, etc. stdout and stderr will default to subprocess.PIPE and 84 subprocess.STDOUT respectively unless caller specifies any of them. 85 86 Returns: 87 The output string. 88 89 Raises: 90 ExternalError: On non-zero exit from the command. 91 """ 92 93 proc = run(args, verbose=verbose, **kwargs) 94 output, _ = proc.communicate() 95 if output is None: 96 output = '' 97 # Don't log any if caller explicitly says so. 98 if verbose: 99 logger.info('%s', output.rstrip()) 100 if proc.returncode != 0: 101 raise RuntimeError( 102 "Failed to run command '{}' (exit code {}):\n{}".format( 103 args, proc.returncode, output)) 104 return output 105 106 107def get_current_dir(): 108 """Returns the current dir, relative to the script dir.""" 109 # The script dir is the one we want, which could be different from pwd. 110 current_dir = os.path.dirname(os.path.realpath(__file__)) 111 return current_dir 112 113 114def get_sha1sum(file_path): 115 h = hashlib.sha256() 116 117 with open(file_path, 'rb') as file: 118 while True: 119 # Reading is buffered, so we can read smaller chunks. 120 chunk = file.read(h.block_size) 121 if not chunk: 122 break 123 h.update(chunk) 124 125 return h.hexdigest() 126 127 128class ApexCompressionTest(unittest.TestCase): 129 def setUp(self): 130 self._to_cleanup = [] 131 self._get_host_tools() 132 133 def tearDown(self): 134 if not DEBUG_TEST: 135 for i in self._to_cleanup: 136 if os.path.isdir(i): 137 shutil.rmtree(i, ignore_errors=True) 138 else: 139 os.remove(i) 140 del self._to_cleanup[:] 141 else: 142 print('Cleanup: ' + str(self._to_cleanup)) 143 144 def _get_host_tools(self): 145 dir_name = tempfile.mkdtemp(prefix=self._testMethodName+"_host_tools_") 146 self._to_cleanup.append(dir_name) 147 for tool in ["avbtool", "conv_apex_manifest", "apex_compression_tool", "deapexer", "soong_zip"]: 148 with ( 149 resources.files("apex_compression_test").joinpath(tool).open('rb') as tool_resource, 150 open(os.path.join(dir_name, tool), 'wb') as f 151 ): 152 shutil.copyfileobj(tool_resource, f) 153 os.chmod(os.path.join(dir_name, tool), stat.S_IRUSR | stat.S_IXUSR) 154 os.environ['APEX_COMPRESSION_TOOL_PATH'] = dir_name 155 path = dir_name 156 if "PATH" in os.environ: 157 path += ":" + os.environ["PATH"] 158 os.environ["PATH"] = path 159 160 def _get_test_apex(self): 161 tmpdir = tempfile.mkdtemp() 162 self._to_cleanup.append(tmpdir) 163 apexPath = os.path.join(tmpdir, TEST_APEX + '.apex') 164 with ( 165 resources.files('apex_compression_test').joinpath(TEST_APEX + '.apex').open('rb') as f, 166 open(apexPath, 'wb') as f2, 167 ): 168 shutil.copyfileobj(f, f2) 169 170 return apexPath 171 172 def _get_container_files(self, apex_file_path): 173 dir_name = tempfile.mkdtemp( 174 prefix=self._testMethodName + '_container_files_') 175 self._to_cleanup.append(dir_name) 176 with ZipFile(apex_file_path, 'r') as zip_obj: 177 zip_obj.extractall(path=dir_name) 178 files = {} 179 for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey', 180 'apex_build_info.pb', 'apex_payload.img', 'apex_payload.zip', 181 'AndroidManifest.xml', 'original_apex']: 182 file_path = os.path.join(dir_name, i) 183 if os.path.exists(file_path): 184 files[i] = file_path 185 186 image_file = files.get('apex_payload.img', None) 187 if image_file is None: 188 image_file = files.get('apex_payload.zip', None) 189 else: 190 files['apex_payload'] = image_file 191 # Also retrieve the root digest of the image 192 avbtool_cmd = ['avbtool', 193 'print_partition_digests', '--image', files['apex_payload']] 194 # avbtool_cmd output has format "<name>: <value>" 195 files['digest'] = run_and_check_output( 196 avbtool_cmd, True).split(': ')[1].strip() 197 198 return files 199 200 def _get_manifest_string(self, manifest_path): 201 cmd = ['conv_apex_manifest'] 202 cmd.extend([ 203 'print', 204 manifest_path 205 ]) 206 return run_and_check_output(cmd, 'True') 207 208 # Mutates the manifest located at |manifest_path| 209 def _unset_original_apex_digest(self, manifest_path): 210 # Open the protobuf 211 with open(manifest_path, 'rb') as f: 212 pb = apex_manifest_pb2.ApexManifest() 213 pb.ParseFromString(f.read()) 214 pb.ClearField('capexMetadata') 215 with open(manifest_path, 'wb') as f: 216 f.write(pb.SerializeToString()) 217 218 def _compress_apex(self, uncompressed_apex_fp): 219 """Returns file path to compressed APEX""" 220 fd, compressed_apex_fp = tempfile.mkstemp( 221 prefix=self._testMethodName + '_compressed_', 222 suffix='.capex') 223 os.close(fd) 224 self._to_cleanup.append(compressed_apex_fp) 225 run_and_check_output([ 226 'apex_compression_tool', 227 'compress', 228 '--input', uncompressed_apex_fp, 229 '--output', compressed_apex_fp 230 ]) 231 return compressed_apex_fp 232 233 def _decompress_apex(self, compressed_apex_fp): 234 """Returns file path to decompressed APEX""" 235 decompressed_apex_fp = tempfile. \ 236 NamedTemporaryFile(prefix=self._testMethodName + '_decompressed_', 237 suffix='.apex').name 238 # Use deapexer to decompress 239 cmd = ['deapexer'] 240 cmd.extend([ 241 'decompress', 242 '--input', compressed_apex_fp, 243 '--output', decompressed_apex_fp 244 ]) 245 run_and_check_output(cmd, True) 246 247 self.assertTrue(os.path.exists(decompressed_apex_fp), 248 'Decompressed APEX does not exist') 249 self._to_cleanup.append(decompressed_apex_fp) 250 return decompressed_apex_fp 251 252 def _get_type(self, apex_file_path): 253 cmd = ['deapexer', 'info', '--print-type', apex_file_path] 254 return run_and_check_output(cmd, True).strip() 255 256 def test_compression(self): 257 uncompressed_apex_fp = self._get_test_apex() 258 # TODO(samiul): try compressing a compressed APEX 259 compressed_apex_fp = self._compress_apex(uncompressed_apex_fp) 260 261 # Verify output file has been created and is smaller than input file 262 uncompressed_file_size = os.path.getsize(uncompressed_apex_fp) 263 compressed_file_size = os.path.getsize(compressed_apex_fp) 264 self.assertGreater(compressed_file_size, 0, 'Compressed APEX is empty') 265 self.assertLess(compressed_file_size, uncompressed_file_size, 266 'Compressed APEX is not smaller than uncompressed APEX') 267 268 # Verify type of the apex is 'COMPRESSED' 269 self.assertEqual(self._get_type(compressed_apex_fp), 'COMPRESSED') 270 271 # Verify the contents of the compressed apex files 272 content_in_compressed_apex = self._get_container_files(compressed_apex_fp) 273 self.assertIsNotNone(content_in_compressed_apex['original_apex']) 274 content_in_uncompressed_apex = self._get_container_files( 275 uncompressed_apex_fp) 276 self.assertIsNotNone(content_in_uncompressed_apex['apex_payload']) 277 self.assertIsNotNone(content_in_uncompressed_apex['digest']) 278 279 # Verify that CAPEX manifest contains digest of original_apex 280 manifest_string = self._get_manifest_string( 281 content_in_compressed_apex['apex_manifest.pb']) 282 self.assertIn('originalApexDigest: "' 283 + content_in_uncompressed_apex['digest'] + '"', manifest_string) 284 285 for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey', 286 'apex_build_info.pb', 'AndroidManifest.xml']: 287 if i in content_in_uncompressed_apex: 288 if i == 'apex_manifest.pb': 289 # Get rid of originalApexDigest field, which should be the 290 # only difference 291 self._unset_original_apex_digest(content_in_compressed_apex[i]) 292 self.assertEqual(get_sha1sum(content_in_compressed_apex[i]), 293 get_sha1sum(content_in_uncompressed_apex[i])) 294 295 def test_decompression(self): 296 # setup: create compressed APEX 297 uncompressed_apex_fp = self._get_test_apex() 298 compressed_apex_fp = self._compress_apex(uncompressed_apex_fp) 299 300 # Decompress it 301 decompressed_apex_fp = self._decompress_apex(compressed_apex_fp) 302 303 # Verify type of the apex is 'UNCOMPRESSED' 304 self.assertEqual(self._get_type(decompressed_apex_fp), 'UNCOMPRESSED') 305 306 # Verify decompressed APEX is same as uncompressed APEX 307 self.assertEqual(get_sha1sum(uncompressed_apex_fp), 308 get_sha1sum(decompressed_apex_fp), 309 'Decompressed APEX is not same as uncompressed APEX') 310 311 # Try decompressing uncompressed APEX. It should not work. 312 with self.assertRaises(RuntimeError) as error: 313 self._decompress_apex(uncompressed_apex_fp) 314 315 self.assertIn(uncompressed_apex_fp 316 + ' is not a compressed APEX', str(error.exception)) 317 318 def test_only_original_apex_is_compressed(self): 319 uncompressed_apex_fp = self._get_test_apex() 320 compressed_apex_fp = self._compress_apex(uncompressed_apex_fp) 321 322 with ZipFile(compressed_apex_fp, 'r') as zip_obj: 323 self.assertEqual(zip_obj.getinfo('original_apex').compress_type, 324 ZIP_DEFLATED) 325 content_in_uncompressed_apex = self._get_container_files( 326 uncompressed_apex_fp) 327 for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey', 328 'apex_build_info.pb', 'AndroidManifest.xml']: 329 if i in content_in_uncompressed_apex: 330 self.assertEqual(zip_obj.getinfo(i).compress_type, ZIP_STORED) 331 332if __name__ == '__main__': 333 unittest.main(verbosity=2) 334