1# Copyright 2016 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16import json 17import os 18 19import mock 20import pytest 21from six.moves import http_client 22from six.moves import reload_module 23 24from google.auth import _helpers 25from google.auth import environment_vars 26from google.auth import exceptions 27from google.auth import transport 28from google.auth.compute_engine import _metadata 29 30PATH = "instance/service-accounts/default" 31 32 33def make_request(data, status=http_client.OK, headers=None, retry=False): 34 response = mock.create_autospec(transport.Response, instance=True) 35 response.status = status 36 response.data = _helpers.to_bytes(data) 37 response.headers = headers or {} 38 39 request = mock.create_autospec(transport.Request) 40 if retry: 41 request.side_effect = [exceptions.TransportError(), response] 42 else: 43 request.return_value = response 44 45 return request 46 47 48def test_ping_success(): 49 request = make_request("", headers=_metadata._METADATA_HEADERS) 50 51 assert _metadata.ping(request) 52 53 request.assert_called_once_with( 54 method="GET", 55 url=_metadata._METADATA_IP_ROOT, 56 headers=_metadata._METADATA_HEADERS, 57 timeout=_metadata._METADATA_DEFAULT_TIMEOUT, 58 ) 59 60 61def test_ping_success_retry(): 62 request = make_request("", headers=_metadata._METADATA_HEADERS, retry=True) 63 64 assert _metadata.ping(request) 65 66 request.assert_called_with( 67 method="GET", 68 url=_metadata._METADATA_IP_ROOT, 69 headers=_metadata._METADATA_HEADERS, 70 timeout=_metadata._METADATA_DEFAULT_TIMEOUT, 71 ) 72 assert request.call_count == 2 73 74 75def test_ping_failure_bad_flavor(): 76 request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"}) 77 78 assert not _metadata.ping(request) 79 80 81def test_ping_failure_connection_failed(): 82 request = make_request("") 83 request.side_effect = exceptions.TransportError() 84 85 assert not _metadata.ping(request) 86 87 88def test_ping_success_custom_root(): 89 request = make_request("", headers=_metadata._METADATA_HEADERS) 90 91 fake_ip = "1.2.3.4" 92 os.environ[environment_vars.GCE_METADATA_IP] = fake_ip 93 reload_module(_metadata) 94 95 try: 96 assert _metadata.ping(request) 97 finally: 98 del os.environ[environment_vars.GCE_METADATA_IP] 99 reload_module(_metadata) 100 101 request.assert_called_once_with( 102 method="GET", 103 url="http://" + fake_ip, 104 headers=_metadata._METADATA_HEADERS, 105 timeout=_metadata._METADATA_DEFAULT_TIMEOUT, 106 ) 107 108 109def test_get_success_json(): 110 key, value = "foo", "bar" 111 112 data = json.dumps({key: value}) 113 request = make_request(data, headers={"content-type": "application/json"}) 114 115 result = _metadata.get(request, PATH) 116 117 request.assert_called_once_with( 118 method="GET", 119 url=_metadata._METADATA_ROOT + PATH, 120 headers=_metadata._METADATA_HEADERS, 121 ) 122 assert result[key] == value 123 124 125def test_get_success_retry(): 126 key, value = "foo", "bar" 127 128 data = json.dumps({key: value}) 129 request = make_request( 130 data, headers={"content-type": "application/json"}, retry=True 131 ) 132 133 result = _metadata.get(request, PATH) 134 135 request.assert_called_with( 136 method="GET", 137 url=_metadata._METADATA_ROOT + PATH, 138 headers=_metadata._METADATA_HEADERS, 139 ) 140 assert request.call_count == 2 141 assert result[key] == value 142 143 144def test_get_success_text(): 145 data = "foobar" 146 request = make_request(data, headers={"content-type": "text/plain"}) 147 148 result = _metadata.get(request, PATH) 149 150 request.assert_called_once_with( 151 method="GET", 152 url=_metadata._METADATA_ROOT + PATH, 153 headers=_metadata._METADATA_HEADERS, 154 ) 155 assert result == data 156 157 158def test_get_success_params(): 159 data = "foobar" 160 request = make_request(data, headers={"content-type": "text/plain"}) 161 params = {"recursive": "true"} 162 163 result = _metadata.get(request, PATH, params=params) 164 165 request.assert_called_once_with( 166 method="GET", 167 url=_metadata._METADATA_ROOT + PATH + "?recursive=true", 168 headers=_metadata._METADATA_HEADERS, 169 ) 170 assert result == data 171 172 173def test_get_success_recursive_and_params(): 174 data = "foobar" 175 request = make_request(data, headers={"content-type": "text/plain"}) 176 params = {"recursive": "false"} 177 result = _metadata.get(request, PATH, recursive=True, params=params) 178 179 request.assert_called_once_with( 180 method="GET", 181 url=_metadata._METADATA_ROOT + PATH + "?recursive=true", 182 headers=_metadata._METADATA_HEADERS, 183 ) 184 assert result == data 185 186 187def test_get_success_recursive(): 188 data = "foobar" 189 request = make_request(data, headers={"content-type": "text/plain"}) 190 191 result = _metadata.get(request, PATH, recursive=True) 192 193 request.assert_called_once_with( 194 method="GET", 195 url=_metadata._METADATA_ROOT + PATH + "?recursive=true", 196 headers=_metadata._METADATA_HEADERS, 197 ) 198 assert result == data 199 200 201def test_get_success_custom_root_new_variable(): 202 request = make_request("{}", headers={"content-type": "application/json"}) 203 204 fake_root = "another.metadata.service" 205 os.environ[environment_vars.GCE_METADATA_HOST] = fake_root 206 reload_module(_metadata) 207 208 try: 209 _metadata.get(request, PATH) 210 finally: 211 del os.environ[environment_vars.GCE_METADATA_HOST] 212 reload_module(_metadata) 213 214 request.assert_called_once_with( 215 method="GET", 216 url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH), 217 headers=_metadata._METADATA_HEADERS, 218 ) 219 220 221def test_get_success_custom_root_old_variable(): 222 request = make_request("{}", headers={"content-type": "application/json"}) 223 224 fake_root = "another.metadata.service" 225 os.environ[environment_vars.GCE_METADATA_ROOT] = fake_root 226 reload_module(_metadata) 227 228 try: 229 _metadata.get(request, PATH) 230 finally: 231 del os.environ[environment_vars.GCE_METADATA_ROOT] 232 reload_module(_metadata) 233 234 request.assert_called_once_with( 235 method="GET", 236 url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH), 237 headers=_metadata._METADATA_HEADERS, 238 ) 239 240 241def test_get_failure(): 242 request = make_request("Metadata error", status=http_client.NOT_FOUND) 243 244 with pytest.raises(exceptions.TransportError) as excinfo: 245 _metadata.get(request, PATH) 246 247 assert excinfo.match(r"Metadata error") 248 249 request.assert_called_once_with( 250 method="GET", 251 url=_metadata._METADATA_ROOT + PATH, 252 headers=_metadata._METADATA_HEADERS, 253 ) 254 255 256def test_get_failure_connection_failed(): 257 request = make_request("") 258 request.side_effect = exceptions.TransportError() 259 260 with pytest.raises(exceptions.TransportError) as excinfo: 261 _metadata.get(request, PATH) 262 263 assert excinfo.match(r"Compute Engine Metadata server unavailable") 264 265 request.assert_called_with( 266 method="GET", 267 url=_metadata._METADATA_ROOT + PATH, 268 headers=_metadata._METADATA_HEADERS, 269 ) 270 assert request.call_count == 5 271 272 273def test_get_failure_bad_json(): 274 request = make_request("{", headers={"content-type": "application/json"}) 275 276 with pytest.raises(exceptions.TransportError) as excinfo: 277 _metadata.get(request, PATH) 278 279 assert excinfo.match(r"invalid JSON") 280 281 request.assert_called_once_with( 282 method="GET", 283 url=_metadata._METADATA_ROOT + PATH, 284 headers=_metadata._METADATA_HEADERS, 285 ) 286 287 288def test_get_project_id(): 289 project = "example-project" 290 request = make_request(project, headers={"content-type": "text/plain"}) 291 292 project_id = _metadata.get_project_id(request) 293 294 request.assert_called_once_with( 295 method="GET", 296 url=_metadata._METADATA_ROOT + "project/project-id", 297 headers=_metadata._METADATA_HEADERS, 298 ) 299 assert project_id == project 300 301 302@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 303def test_get_service_account_token(utcnow): 304 ttl = 500 305 request = make_request( 306 json.dumps({"access_token": "token", "expires_in": ttl}), 307 headers={"content-type": "application/json"}, 308 ) 309 310 token, expiry = _metadata.get_service_account_token(request) 311 312 request.assert_called_once_with( 313 method="GET", 314 url=_metadata._METADATA_ROOT + PATH + "/token", 315 headers=_metadata._METADATA_HEADERS, 316 ) 317 assert token == "token" 318 assert expiry == utcnow() + datetime.timedelta(seconds=ttl) 319 320 321@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 322def test_get_service_account_token_with_scopes_list(utcnow): 323 ttl = 500 324 request = make_request( 325 json.dumps({"access_token": "token", "expires_in": ttl}), 326 headers={"content-type": "application/json"}, 327 ) 328 329 token, expiry = _metadata.get_service_account_token(request, scopes=["foo", "bar"]) 330 331 request.assert_called_once_with( 332 method="GET", 333 url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar", 334 headers=_metadata._METADATA_HEADERS, 335 ) 336 assert token == "token" 337 assert expiry == utcnow() + datetime.timedelta(seconds=ttl) 338 339 340@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 341def test_get_service_account_token_with_scopes_string(utcnow): 342 ttl = 500 343 request = make_request( 344 json.dumps({"access_token": "token", "expires_in": ttl}), 345 headers={"content-type": "application/json"}, 346 ) 347 348 token, expiry = _metadata.get_service_account_token(request, scopes="foo,bar") 349 350 request.assert_called_once_with( 351 method="GET", 352 url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar", 353 headers=_metadata._METADATA_HEADERS, 354 ) 355 assert token == "token" 356 assert expiry == utcnow() + datetime.timedelta(seconds=ttl) 357 358 359def test_get_service_account_info(): 360 key, value = "foo", "bar" 361 request = make_request( 362 json.dumps({key: value}), headers={"content-type": "application/json"} 363 ) 364 365 info = _metadata.get_service_account_info(request) 366 367 request.assert_called_once_with( 368 method="GET", 369 url=_metadata._METADATA_ROOT + PATH + "/?recursive=true", 370 headers=_metadata._METADATA_HEADERS, 371 ) 372 373 assert info[key] == value 374