xref: /aosp_15_r20/external/autotest/server/cros/ap_configurators/pyauto_utils.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Utilities for PyAuto."""
6
7from __future__ import print_function
8
9import httplib
10import logging
11import os
12import shutil
13import socket
14import sys
15import tempfile
16import unittest
17import urlparse
18import zipfile
19
20
21class ExistingPathReplacer(object):
22    """Facilitates backing up a given path (file or dir)..
23
24  Often you want to manipulate a directory or file for testing but don't want to
25  meddle with the existing contents.  This class lets you make a backup, and
26  reinstate the backup when done.  A backup is made in an adjacent directory,
27  so you need to make sure you have write permissions to the parent directory.
28
29  Works seemlessly in cases where the requested path already exists, or not.
30
31  Automatically reinstates the backed up path (if any) when object is deleted.
32  """
33    _path = ''
34    _backup_dir = None  # dir to which existing content is backed up
35    _backup_basename = ''
36
37    def __init__(self, path, path_type='dir'):
38        """Initialize the object, making backups if necessary.
39
40    Args:
41      path: the requested path to file or directory
42      path_type: path type. Options: 'file', 'dir'. Default: 'dir'
43    """
44        assert path_type in ('file',
45                             'dir'), 'Invalid path_type: %s' % path_type
46        self._path_type = path_type
47        self._path = path
48        if os.path.exists(self._path):
49            if 'dir' == self._path_type:
50                assert os.path.isdir(
51                        self._path), '%s is not a directory' % self._path
52            else:
53                assert os.path.isfile(
54                        self._path), '%s is not a file' % self._path
55            # take a backup
56            self._backup_basename = os.path.basename(self._path)
57            self._backup_dir = tempfile.mkdtemp(
58                    dir=os.path.dirname(self._path),
59                    prefix='bkp-' + self._backup_basename)
60            logging.info('Backing up %s in %s' %
61                         (self._path, self._backup_dir))
62            shutil.move(self._path,
63                        os.path.join(self._backup_dir, self._backup_basename))
64        self._CreateRequestedPath()
65
66    def __del__(self):
67        """Cleanup. Reinstate backup."""
68        self._CleanupRequestedPath()
69        if self._backup_dir:  # Reinstate, if backed up.
70            from_path = os.path.join(self._backup_dir, self._backup_basename)
71            logging.info('Reinstating backup from %s to %s' %
72                         (from_path, self._path))
73            shutil.move(from_path, self._path)
74        self._RemoveBackupDir()
75
76    def _CreateRequestedPath(self):
77        # Create intermediate dirs if needed.
78        if not os.path.exists(os.path.dirname(self._path)):
79            os.makedirs(os.path.dirname(self._path))
80        if 'dir' == self._path_type:
81            os.mkdir(self._path)
82        else:
83            open(self._path, 'w').close()
84
85    def _CleanupRequestedPath(self):
86        if os.path.exists(self._path):
87            if os.path.isdir(self._path):
88                shutil.rmtree(self._path, ignore_errors=True)
89            else:
90                os.remove(self._path)
91
92    def _RemoveBackupDir(self):
93        if self._backup_dir and os.path.isdir(self._backup_dir):
94            shutil.rmtree(self._backup_dir, ignore_errors=True)
95
96
97def RemovePath(path):
98    """Remove the given path (file or dir)."""
99    if os.path.isdir(path):
100        shutil.rmtree(path, ignore_errors=True)
101        return
102    try:
103        os.remove(path)
104    except OSError:
105        pass
106
107
108def UnzipFilenameToDir(filename, dir):
109    """Unzip |filename| to directory |dir|.
110
111  This works with as low as python2.4 (used on win).
112  """
113    zf = zipfile.ZipFile(filename)
114    pushd = os.getcwd()
115    if not os.path.isdir(dir):
116        os.mkdir(dir)
117    os.chdir(dir)
118    # Extract files.
119    for info in zf.infolist():
120        name = info.filename
121        if name.endswith('/'):  # dir
122            if not os.path.isdir(name):
123                os.makedirs(name)
124        else:  # file
125            dir = os.path.dirname(name)
126            if not os.path.isdir(dir):
127                os.makedirs(dir)
128            out = open(name, 'wb')
129            out.write(zf.read(name))
130            out.close()
131        # Set permissions. Permission info in external_attr is shifted 16 bits.
132        os.chmod(name, info.external_attr >> 16)
133    os.chdir(pushd)
134
135
136def GetCurrentPlatform():
137    """Get a string representation for the current platform.
138
139  Returns:
140    'mac', 'win' or 'linux'
141  """
142    if sys.platform == 'darwin':
143        return 'mac'
144    if sys.platform == 'win32':
145        return 'win'
146    if sys.platform.startswith('linux'):
147        return 'linux'
148    raise RuntimeError('Unknown platform')
149
150
151def PrintPerfResult(graph_name, series_name, data_point, units,
152                    show_on_waterfall=False):
153    """Prints a line to stdout that is specially formatted for the perf bots.
154
155  Args:
156    graph_name: String name for the graph on which to plot the data.
157    series_name: String name for the series (line on the graph) associated with
158                 the data.  This is also the string displayed on the waterfall
159                 if |show_on_waterfall| is True.
160    data_point: Numeric data value to plot on the graph for the current build.
161                This can be a single value or an array of values.  If an array,
162                the graph will plot the average of the values, along with error
163                bars.
164    units: The string unit of measurement for the given |data_point|.
165    show_on_waterfall: Whether or not to display this result directly on the
166                       buildbot waterfall itself (in the buildbot step running
167                       this test on the waterfall page, not the stdio page).
168  """
169    waterfall_indicator = ['', '*'][show_on_waterfall]
170    print('%sRESULT %s: %s= %s %s' %
171          (waterfall_indicator, graph_name, series_name,
172           str(data_point).replace(' ', ''), units))
173    sys.stdout.flush()
174
175
176def Shard(ilist, shard_index, num_shards):
177    """Shard a given list and return the group at index |shard_index|.
178
179  Args:
180    ilist: input list
181    shard_index: 0-based sharding index
182    num_shards: shard count
183  """
184    chunk_size = len(ilist) / num_shards
185    chunk_start = shard_index * chunk_size
186    if shard_index == num_shards - 1:  # Exhaust the remainder in the last shard.
187        chunk_end = len(ilist)
188    else:
189        chunk_end = chunk_start + chunk_size
190    return ilist[chunk_start:chunk_end]
191
192
193def WaitForDomElement(pyauto, driver, xpath):
194    """Wait for the UI element to appear.
195
196  Args:
197    pyauto: an instance of pyauto.PyUITest.
198    driver: an instance of chrome driver or a web element.
199    xpath: the xpath of the element to wait for.
200
201  Returns:
202    The element if it is found.
203    NoSuchElementException if it is not found.
204  """
205    pyauto.WaitUntil(lambda: len(driver.find_elements_by_xpath(xpath)) > 0)
206    return driver.find_element_by_xpath(xpath)
207
208
209def DoesUrlExist(url):
210    """Determines whether a resource exists at the given URL.
211
212  Args:
213    url: URL to be verified.
214
215  Returns:
216    True if url exists, otherwise False.
217  """
218    parsed = urlparse.urlparse(url)
219    try:
220        conn = httplib.HTTPConnection(parsed.netloc)
221        conn.request('HEAD', parsed.path)
222        response = conn.getresponse()
223    except (socket.gaierror, socket.error):
224        return False
225    finally:
226        conn.close()
227    # Follow both permanent (301) and temporary (302) redirects.
228    if response.status == 302 or response.status == 301:
229        return DoesUrlExist(response.getheader('location'))
230    return response.status == 200
231
232
233class _GTestTextTestResult(unittest._TextTestResult):
234    """A test result class that can print formatted text results to a stream.
235
236  Results printed in conformance with gtest output format, like:
237  [ RUN        ] autofill.AutofillTest.testAutofillInvalid: "test desc."
238  [         OK ] autofill.AutofillTest.testAutofillInvalid
239  [ RUN        ] autofill.AutofillTest.testFillProfile: "test desc."
240  [         OK ] autofill.AutofillTest.testFillProfile
241  [ RUN        ] autofill.AutofillTest.testFillProfileComplexCharacters: "Test."
242  [         OK ] autofill.AutofillTest.testFillProfileComplexCharacters
243  """
244
245    def __init__(self, stream, descriptions, verbosity):
246        unittest._TextTestResult.__init__(self, stream, descriptions,
247                                          verbosity)
248
249    def _GetTestURI(self, test):
250        return '%s.%s' % (unittest._strclass(
251                test.__class__), test._testMethodName)
252
253    def getDescription(self, test):
254        return '%s: "%s"' % (self._GetTestURI(test), test.shortDescription())
255
256    def startTest(self, test):
257        unittest.TestResult.startTest(self, test)
258        self.stream.writeln('[ RUN        ] %s' % self.getDescription(test))
259
260    def addSuccess(self, test):
261        unittest.TestResult.addSuccess(self, test)
262        self.stream.writeln('[         OK ] %s' % self._GetTestURI(test))
263
264    def addError(self, test, err):
265        unittest.TestResult.addError(self, test, err)
266        self.stream.writeln('[      ERROR ] %s' % self._GetTestURI(test))
267
268    def addFailure(self, test, err):
269        unittest.TestResult.addFailure(self, test, err)
270        self.stream.writeln('[     FAILED ] %s' % self._GetTestURI(test))
271
272
273class GTestTextTestRunner(unittest.TextTestRunner):
274    """Test Runner for displaying test results in textual format.
275
276  Results are displayed in conformance with gtest output.
277  """
278
279    def __init__(self, verbosity=1):
280        unittest.TextTestRunner.__init__(self,
281                                         stream=sys.stderr,
282                                         verbosity=verbosity)
283
284    def _makeResult(self):
285        return _GTestTextTestResult(self.stream, self.descriptions,
286                                    self.verbosity)
287