1# Lint as: python2, python3
2# Copyright 2019 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Test to validate basic, critical servo controls for the lab work."""
7
8import logging
9import os
10import time
11
12from autotest_lib.client.common_lib import error
13from autotest_lib.server import test
14from autotest_lib.server.hosts import servo_host
15from autotest_lib.server.hosts import servo_constants
16
17class servo_LabControlVerification(test.test):
18    """Ensure control list works, ensure all consoles are talkable."""
19    version = 1
20
21    CTRLS_FILE = 'control_sequence.txt'
22
23    def load_ctrl_sequence(self):
24        """Look for |CTRLS_FILE| file in same directory to load sequence."""
25        ctrls = []
26        current_dir = os.path.dirname(os.path.realpath(__file__))
27        ctrl_path = os.path.join(current_dir, self.CTRLS_FILE)
28        if not os.path.exists(ctrl_path):
29            raise error.TestError('File %s needs to exist.' % self.CTRLS_FILE)
30        with open(ctrl_path, 'r') as f:
31            raw_ctrl_sequence = f.read().strip().split('\n')
32        # In addition to the common file, there might be controls that can
33        # only be tested for specific versions of servo. Load the additional
34        # controls for that servo.
35        self.servo_version = self.servo_proxy.get_servo_version()
36        for servo_type in self.servo_version.split('_with_'):
37            # The split with 'with' is the current servo convention for multi
38            # systems such as servo_v4_with_servo_micro.
39            servo_type_sequence = '%s_%s' % (servo_type, self.CTRLS_FILE)
40            servo_type_sequence_path = os.path.join(current_dir,
41                                                    servo_type_sequence)
42            if os.path.exists(servo_type_sequence_path):
43                with open(servo_type_sequence_path, 'r') as f:
44                    raw_ctrl_sequence.extend(f.read().strip().split('\n'))
45            else:
46                logging.debug('No control sequence file found under %s',
47                              servo_type_sequence)
48        control_sequence = [raw_ctrl.split(':') for raw_ctrl in
49                            raw_ctrl_sequence]
50        for ctrl_elems in control_sequence:
51            # Note: the attributes are named after the arguments expected in
52            # servo.py to be able to use the dictionary as a kwargs. Be mindful
53            # of changing them &| keep them in sync.
54            ctrl = {'ctrl_name': ctrl_elems[0]}
55            if len(ctrl_elems) == 2:
56                # This a set servod control.
57                ctrl['ctrl_value'] = ctrl_elems[1]
58            elif len(ctrl_elems) > 2:
59                logging.warning('The line containing %r in the control sequence '
60                             'file has an unkown format. Ignoring for now.',
61                             ctrl)
62            ctrls.append(ctrl)
63        return ctrls
64
65    def assert_servod_running(self, host):
66        """Assert that servod is running on the host.
67
68        Args:
69          host: host running servod
70
71        Raises:
72          AutoservRunError: if servod is not running on the host.
73        """
74        host.run_grep('servodutil show -p %d' % self.servo_port,
75                      stdout_err_regexp='No servod scratch entry found')
76
77    def initialize(self, host, port=9999, board='nami'):
78        """Check whether servo is already running on |port| and start if not.
79
80        Args:
81          port: port on which servod should run
82          board: board to start servod with, if servod not already running
83
84        raises:
85          error.AutoservRunError if trying to start servod and it fails
86        """
87        # TODO(coconutruben): board is set to nami for now as that will allow
88        # servod to come up and the nami overlay does not have any complex changes
89        # from normal boards. When the new servod is rolled out and it can infer
90        # board names itself, remove the board attribute here.
91        self.servo_port = port
92        self.test_managed_servod = False
93        try:
94            self.assert_servod_running(host=host)
95            logging.debug('Servod already running.')
96        except error.AutoservRunError:
97            self.test_managed_servod = True
98            logging.debug('Servod not running. Test will manage it.')
99        if self.test_managed_servod:
100            # This indicates the test should start (and manage) servod itself.
101            host.run_background('sudo servod -p %d -b %s' % (self.servo_port,
102                                                             board))
103            # Give servod plenty of time to come up.
104            time.sleep(20)
105            self.assert_servod_running(host=host)
106        # Servod came up successfully - build a servo host and use it to verify
107        # basic functionality.
108        servo_args = {servo_constants.SERVO_HOST_ATTR: host.hostname,
109                      servo_constants.SERVO_PORT_ATTR: 9999}
110        self.servo_host_proxy = servo_host.ServoHost(is_in_lab=False,
111                                                     **servo_args)
112        self.servo_host_proxy.connect_servo()
113        self.servo_proxy = self.servo_host_proxy.get_servo()
114        self.ctrls = self.load_ctrl_sequence()
115
116    def run_once(self):
117        """Go through control sequence, and verify each of Cr50, EC, AP UART."""
118        failed = False
119        for ctrl in self.ctrls:
120            # Depending on the number of elements in the control determine
121            # whether it's 'get' or 'set'
122            if len(ctrl) == 1:
123                ctrl_type = 'get'
124                ctrl_func = self.servo_proxy.get
125            if len(ctrl) == 2:
126                ctrl_type = 'set'
127                ctrl_func = self.servo_proxy.set_nocheck
128            logstr = 'About to %s control %r' % (ctrl_type, ctrl['ctrl_name'])
129            if ctrl_type == 'set':
130                logstr = '%s to %s' % (logstr, ctrl['ctrl_value'])
131            logging.info(logstr)
132            try:
133                ctrl_func(**ctrl)
134                logging.info('Success running %r', ctrl['ctrl_name'])
135            except error.TestFail as e:
136                failed = True
137                logging.error('Error running %r. %s', ctrl['ctrl_name'], str(e))
138        if self.servo_version != 'servo_v3':
139            # Servo V3 does not support Cr50 console. Skip this verification.
140            try:
141                self.servo_proxy.get('cr50_version')
142                logging.info('Success talking on the Cr50 UART.')
143            except error.TestFail as e:
144                failed = True
145                logging.error('Failed to get output on the Cr50 UART.')
146        try:
147            self.servo_proxy.get('ec_board')
148            logging.info('Success talking on the EC UART.')
149        except error.TestFail as e:
150            failed = True
151            logging.error('Failed to get output on the EC UART.')
152        try:
153            # Note: Since the AP UART has a login challange, the way we verify
154            # whether there is output is by rebooting the device, and catching
155            # any coreboot or kernel output.
156            self.servo_proxy.set_nocheck('cpu_uart_capture', 'on')
157            self.servo_proxy.set_nocheck('cold_reset', 'on')
158            self.servo_proxy.set_nocheck('cold_reset', 'off')
159            # Give the system 7s to reboot and fill some data in the AP UART
160            # buffer.
161            time.sleep(7)
162            content = self.servo_proxy.get('cpu_uart_stream')
163            if content:
164                logging.debug('Content from the AP UART: %s', str(content))
165                logging.info('Success talking on the AP UART.')
166            else:
167                raise error.TestFail('No output found on AP UART after reboot.')
168        except error.TestFail as e:
169            failed = True
170            logging.error('Failed to get output on the AP UART.')
171        if failed:
172            raise error.TestFail('At least one control failed. Check logs.')
173
174    def cleanup(self, host):
175        """Stop servod if the test started it."""
176        self.servo_host_proxy.close()
177        if self.test_managed_servod:
178            # If test started servod, then turn it off.
179            # Note: this is still valid even if there was a failure on init
180            # to start servod as it's a noop if no servod is on that port.
181            host.run_background('sudo servodutil stop -p %d' % self.servo_port)
182