# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities for PyAuto."""

from __future__ import print_function

import logging
import os
import shutil
import socket
import sys
import tempfile
import unittest
import zipfile

try:  # Python 2 module names.
    import httplib
    import urlparse
except ImportError:  # Python 3 moved them; keep the same names in scope.
    import http.client as httplib
    import urllib.parse as urlparse


class ExistingPathReplacer(object):
    """Facilitates backing up a given path (file or dir).

    Often you want to manipulate a directory or file for testing but don't
    want to meddle with the existing contents. This class lets you make a
    backup, and reinstate the backup when done. A backup is made in an
    adjacent directory, so you need to make sure you have write permissions
    to the parent directory.

    Works seamlessly in cases where the requested path already exists, or
    not.

    Automatically reinstates the backed up path (if any) when object is
    deleted.
    """
    _path = ''
    _backup_dir = None  # dir to which existing content is backed up
    _backup_basename = ''
    # True only once __init__ has created the placeholder at |_path|.
    # __del__ must never delete a path this object did not create.
    _created = False

    def __init__(self, path, path_type='dir'):
        """Initialize the object, making backups if necessary.

        Any existing content at |path| is moved into a fresh temporary
        directory next to it, then an empty file or directory is created at
        |path|.

        Args:
          path: the requested path to file or directory
          path_type: path type. Options: 'file', 'dir'. Default: 'dir'

        Raises:
          AssertionError: if |path_type| is invalid, or if |path| exists but
              is not of the requested type.
        """
        assert path_type in ('file', 'dir'), (
            'Invalid path_type: %s' % path_type)
        self._path_type = path_type
        self._path = path
        if os.path.exists(self._path):
            if 'dir' == self._path_type:
                assert os.path.isdir(self._path), (
                    '%s is not a directory' % self._path)
            else:
                assert os.path.isfile(self._path), (
                    '%s is not a file' % self._path)
            # Take a backup.
            self._backup_basename = os.path.basename(self._path)
            self._backup_dir = tempfile.mkdtemp(
                dir=os.path.dirname(self._path),
                prefix='bkp-' + self._backup_basename)
            logging.info('Backing up %s in %s', self._path, self._backup_dir)
            shutil.move(self._path,
                        os.path.join(self._backup_dir, self._backup_basename))
        self._CreateRequestedPath()
        # Set last: if anything above raised, __del__ leaves |_path| alone.
        self._created = True

    def __del__(self):
        """Cleanup. Reinstate backup.

        Safe to run more than once and after a failed __init__: the
        placeholder is removed only if this object created it, and the
        backup is moved back only if it is actually present.
        """
        if self._created:
            self._CleanupRequestedPath()
            self._created = False
        if self._backup_dir:  # Reinstate, if backed up.
            from_path = os.path.join(self._backup_dir, self._backup_basename)
            # The backup is missing if __init__ failed before the move.
            if os.path.exists(from_path):
                logging.info('Reinstating backup from %s to %s',
                             from_path, self._path)
                shutil.move(from_path, self._path)
            self._RemoveBackupDir()
            self._backup_dir = None

    def _CreateRequestedPath(self):
        """Create an empty file or directory at the requested path."""
        # Create intermediate dirs if needed. dirname() is '' for a bare
        # relative name, and os.makedirs('') raises, so skip that case.
        parent = os.path.dirname(self._path)
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        if 'dir' == self._path_type:
            os.mkdir(self._path)
        else:
            open(self._path, 'w').close()

    def _CleanupRequestedPath(self):
        """Remove whatever currently exists at the requested path."""
        if os.path.exists(self._path):
            if os.path.isdir(self._path):
                shutil.rmtree(self._path, ignore_errors=True)
            else:
                os.remove(self._path)

    def _RemoveBackupDir(self):
        """Delete the temporary backup directory, if there is one."""
        if self._backup_dir and os.path.isdir(self._backup_dir):
            shutil.rmtree(self._backup_dir, ignore_errors=True)


def RemovePath(path):
    """Remove the given path (file or dir).

    Best effort: errors while removing a directory tree are ignored, and an
    OSError while removing a file (e.g. it does not exist) is swallowed.
    """
    if os.path.isdir(path):
        shutil.rmtree(path, ignore_errors=True)
        return
    try:
        os.remove(path)
    except OSError:
        pass


def UnzipFilenameToDir(filename, dir):
    """Unzip |filename| to directory |dir|.

    This works with as low as python2.4 (used on win).

    The process working directory is changed to |dir| while extracting and
    is restored afterwards, even if extraction fails.

    Args:
      filename: path to the zip archive.
      dir: destination directory; created if it does not exist.
    """
    zf = zipfile.ZipFile(filename)
    pushd = os.getcwd()
    try:
        if not os.path.isdir(dir):
            os.mkdir(dir)
        os.chdir(dir)
        # Extract files.
        # NOTE(review): member names are used as-is, so an archive with
        # absolute or '..' names can write outside |dir|. Only use on
        # trusted archives.
        for info in zf.infolist():
            name = info.filename
            if name.endswith('/'):  # dir
                if not os.path.isdir(name):
                    os.makedirs(name)
            else:  # file
                # dirname() is '' for top-level members; os.makedirs('')
                # would raise.
                parent = os.path.dirname(name)
                if parent and not os.path.isdir(parent):
                    os.makedirs(parent)
                out = open(name, 'wb')
                try:
                    out.write(zf.read(name))
                finally:
                    out.close()
            # Set permissions. Permission info in external_attr is shifted
            # 16 bits. Archives that carry no mode store 0 there; applying
            # it would make the entry inaccessible, so leave those alone.
            mode = info.external_attr >> 16
            if mode:
                os.chmod(name, mode)
    finally:
        os.chdir(pushd)
        zf.close()


def GetCurrentPlatform():
    """Get a string representation for the current platform.

    Returns:
      'mac', 'win' or 'linux'

    Raises:
      RuntimeError: if sys.platform is none of the recognized values.
    """
    if sys.platform == 'darwin':
        return 'mac'
    if sys.platform == 'win32':
        return 'win'
    if sys.platform.startswith('linux'):
        return 'linux'
    raise RuntimeError('Unknown platform')


def PrintPerfResult(graph_name, series_name, data_point, units,
                    show_on_waterfall=False):
    """Prints a line to stdout that is specially formatted for the perf bots.

    Args:
      graph_name: String name for the graph on which to plot the data.
      series_name: String name for the series (line on the graph) associated
                   with the data. This is also the string displayed on the
                   waterfall if |show_on_waterfall| is True.
      data_point: Numeric data value to plot on the graph for the current
                  build. This can be a single value or an array of values.
                  If an array, the graph will plot the average of the
                  values, along with error bars.
      units: The string unit of measurement for the given |data_point|.
      show_on_waterfall: Whether or not to display this result directly on
                         the buildbot waterfall itself (in the buildbot step
                         running this test on the waterfall page, not the
                         stdio page).
    """
    # A leading '*' marks the result for the waterfall. Any truthy value is
    # accepted, not just a bool.
    waterfall_indicator = '*' if show_on_waterfall else ''
    print('%sRESULT %s: %s= %s %s' %
          (waterfall_indicator, graph_name, series_name,
           str(data_point).replace(' ', ''), units))
    sys.stdout.flush()


def Shard(ilist, shard_index, num_shards):
    """Shard a given list and return the group at index |shard_index|.

    Args:
      ilist: input list
      shard_index: 0-based sharding index
      num_shards: shard count

    Returns:
      The slice of |ilist| belonging to the requested shard. Every shard gets
      len(ilist) // num_shards items; the last one also gets the remainder.
    """
    # Floor division: plain '/' yields a float on Python 3, which cannot be
    # used as a slice index.
    chunk_size = len(ilist) // num_shards
    chunk_start = shard_index * chunk_size
    if shard_index == num_shards - 1:
        # Exhaust the remainder in the last shard.
        chunk_end = len(ilist)
    else:
        chunk_end = chunk_start + chunk_size
    return ilist[chunk_start:chunk_end]


def WaitForDomElement(pyauto, driver, xpath):
    """Wait for the UI element to appear.

    Args:
      pyauto: an instance of pyauto.PyUITest.
      driver: an instance of chrome driver or a web element.
      xpath: the xpath of the element to wait for.

    Returns:
      The element if it is found.

    Raises:
      NoSuchElementException if it is not found (presumably raised by the
      driver's find_element_by_xpath once the wait gives up).
    """
    pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0)
    return driver.find_element_by_xpath(xpath)


def DoesUrlExist(url):
    """Determines whether a resource exists at the given URL.

    Sends a HEAD request over plain HTTP and follows 301/302 redirects.

    Args:
      url: URL to be verified.

    Returns:
      True if url exists, otherwise False.
    """
    parsed = urlparse.urlparse(url)
    # Bind before the try: the constructor itself can raise (e.g. on a
    # malformed port), and the finally clause must not mask that error.
    conn = None
    try:
        conn = httplib.HTTPConnection(parsed.netloc)
        conn.request('HEAD', parsed.path)
        response = conn.getresponse()
    except (socket.gaierror, socket.error):
        return False
    finally:
        if conn is not None:
            conn.close()
    # Follow both permanent (301) and temporary (302) redirects.
    if response.status == 302 or response.status == 301:
        location = response.getheader('location')
        if not location:
            # A redirect that does not say where to go cannot be followed.
            return False
        return DoesUrlExist(location)
    return response.status == 200


# unittest._TextTestResult is a private alias that newer Python versions no
# longer provide; prefer the public class and fall back for old interpreters.
_TextTestResultBase = (getattr(unittest, 'TextTestResult', None) or
                       unittest._TextTestResult)


class _GTestTextTestResult(_TextTestResultBase):
    """A test result class that can print formatted text results to a stream.

    Results printed in conformance with gtest output format, like:
    [ RUN        ] autofill.AutofillTest.testAutofillInvalid: "test desc."
    [         OK ] autofill.AutofillTest.testAutofillInvalid
    [ RUN        ] autofill.AutofillTest.testFillProfile: "test desc."
    [         OK ] autofill.AutofillTest.testFillProfile
    [ RUN        ] autofill.AutofillTest.testFillProfileComplexCharacters: "Test."
    [         OK ] autofill.AutofillTest.testFillProfileComplexCharacters
    """

    def __init__(self, stream, descriptions, verbosity):
        _TextTestResultBase.__init__(self, stream, descriptions, verbosity)

    def _GetTestURI(self, test):
        """Return 'module.Class.method' for |test|."""
        # Formatted locally instead of relying on the private
        # unittest._strclass helper.
        cls = test.__class__
        return '%s.%s.%s' % (cls.__module__, cls.__name__,
                             test._testMethodName)

    def getDescription(self, test):
        """Return the test URI followed by its quoted short description."""
        return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())

    # The overrides below call unittest.TestResult directly, skipping the
    # text result's own per-test output, and write one gtest-style line.

    def startTest(self, test):
        unittest.TestResult.startTest(self, test)
        self.stream.writeln('[ RUN        ] %s' % self.getDescription(test))

    def addSuccess(self, test):
        unittest.TestResult.addSuccess(self, test)
        self.stream.writeln('[         OK ] %s' % self._GetTestURI(test))

    def addError(self, test, err):
        unittest.TestResult.addError(self, test, err)
        self.stream.writeln('[      ERROR ] %s' % self._GetTestURI(test))

    def addFailure(self, test, err):
        unittest.TestResult.addFailure(self, test, err)
        self.stream.writeln('[     FAILED ] %s' % self._GetTestURI(test))


class GTestTextTestRunner(unittest.TextTestRunner):
    """Test Runner for displaying test results in textual format.

    Results are displayed in conformance with gtest output.
    """

    def __init__(self, verbosity=1):
        unittest.TextTestRunner.__init__(self,
                                         stream=sys.stderr,
                                         verbosity=verbosity)

    def _makeResult(self):
        return _GTestTextTestResult(self.stream, self.descriptions,
                                    self.verbosity)