1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17import os
18
19import mock
20import pytest
21from six.moves import http_client
22from six.moves import reload_module
23
24from google.auth import _helpers
25from google.auth import environment_vars
26from google.auth import exceptions
27from google.auth import transport
28from google.auth.compute_engine import _metadata
29
# Metadata path used by the tests below; it is passed to ``_metadata.get`` and
# appended to ``_metadata._METADATA_ROOT`` when building the expected URLs.
PATH = "instance/service-accounts/default"
31
32
def make_request(data, status=http_client.OK, headers=None, retry=False):
    """Build an autospec'd ``transport.Request`` mock yielding a canned response.

    Args:
        data: Response body; converted with ``_helpers.to_bytes``.
        status: Value assigned to the response's ``status`` attribute.
        headers: Response headers; an empty dict is used when falsy.
        retry: When true, the first call raises ``exceptions.TransportError``
            and the second call returns the response.

    Returns:
        The mocked request callable.
    """
    fake_response = mock.create_autospec(transport.Response, instance=True)
    fake_response.status = status
    fake_response.data = _helpers.to_bytes(data)
    fake_response.headers = headers if headers else {}

    fake_request = mock.create_autospec(transport.Request)
    if not retry:
        fake_request.return_value = fake_response
        return fake_request

    # One failure followed by one success, consumed in order by the mock.
    fake_request.side_effect = [exceptions.TransportError(), fake_response]
    return fake_request
46
47
def test_ping_success():
    """ping() is truthy when the response carries the metadata headers."""
    request = make_request("", headers=_metadata._METADATA_HEADERS)

    reachable = _metadata.ping(request)

    assert reachable
    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_IP_ROOT,
        headers=_metadata._METADATA_HEADERS,
        timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
    )
    request.assert_called_once_with(**expected_call)
59
60
def test_ping_success_retry():
    """ping() succeeds on the second attempt after a TransportError."""
    request = make_request("", headers=_metadata._METADATA_HEADERS, retry=True)

    reachable = _metadata.ping(request)

    assert reachable
    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_IP_ROOT,
        headers=_metadata._METADATA_HEADERS,
        timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
    )
    request.assert_called_with(**expected_call)
    # One failed attempt plus the successful one.
    assert request.call_count == 2
73
74
def test_ping_failure_bad_flavor():
    """ping() is falsy when the flavor header holds an unexpected value."""
    bad_headers = {_metadata._METADATA_FLAVOR_HEADER: "meep"}
    request = make_request("", headers=bad_headers)

    reachable = _metadata.ping(request)

    assert not reachable
79
80
def test_ping_failure_connection_failed():
    """ping() is falsy when every request raises TransportError."""
    request = make_request("")
    # Override the canned response so each call fails.
    request.side_effect = exceptions.TransportError()

    reachable = _metadata.ping(request)

    assert not reachable
86
87
def test_ping_success_custom_root():
    """ping() targets the IP given by the GCE_METADATA_IP environment variable.

    The module is reloaded while the variable is set, then reloaded again
    after the environment has been restored.
    """
    request = make_request("", headers=_metadata._METADATA_HEADERS)

    fake_ip = "1.2.3.4"
    env_override = {environment_vars.GCE_METADATA_IP: fake_ip}

    try:
        # patch.dict puts back any pre-existing value on exit rather than
        # unconditionally deleting the variable.
        with mock.patch.dict(os.environ, env_override):
            reload_module(_metadata)
            assert _metadata.ping(request)
    finally:
        # Runs once the environment is restored, including when the first
        # reload or the ping assertion fails.
        reload_module(_metadata)

    request.assert_called_once_with(
        method="GET",
        url="http://" + fake_ip,
        headers=_metadata._METADATA_HEADERS,
        timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
    )
107
108
def test_get_success_json():
    """get() decodes an application/json body into a key-indexable result."""
    key, value = "foo", "bar"
    payload = json.dumps({key: value})
    request = make_request(payload, headers={"content-type": "application/json"})

    result = _metadata.get(request, PATH)

    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_ROOT + PATH,
        headers=_metadata._METADATA_HEADERS,
    )
    request.assert_called_once_with(**expected_call)
    assert result[key] == value
123
124
def test_get_success_retry():
    """get() returns the decoded body after one failed attempt."""
    key, value = "foo", "bar"
    payload = json.dumps({key: value})
    json_headers = {"content-type": "application/json"}
    request = make_request(payload, headers=json_headers, retry=True)

    result = _metadata.get(request, PATH)

    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_ROOT + PATH,
        headers=_metadata._METADATA_HEADERS,
    )
    request.assert_called_with(**expected_call)
    # One failed attempt plus the successful one.
    assert request.call_count == 2
    assert result[key] == value
142
143
def test_get_success_text():
    """get() returns a text/plain body as the original string."""
    body = "foobar"
    request = make_request(body, headers={"content-type": "text/plain"})

    result = _metadata.get(request, PATH)

    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_ROOT + PATH,
        headers=_metadata._METADATA_HEADERS,
    )
    request.assert_called_once_with(**expected_call)
    assert result == body
156
157
def test_get_success_params():
    """get() appends the given params to the URL as a query string."""
    body = "foobar"
    request = make_request(body, headers={"content-type": "text/plain"})
    query = {"recursive": "true"}

    result = _metadata.get(request, PATH, params=query)

    expected_url = _metadata._METADATA_ROOT + PATH + "?recursive=true"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert result == body
171
172
def test_get_success_recursive_and_params():
    """recursive=True wins over a conflicting "recursive" entry in params."""
    body = "foobar"
    request = make_request(body, headers={"content-type": "text/plain"})
    conflicting = {"recursive": "false"}

    result = _metadata.get(request, PATH, recursive=True, params=conflicting)

    expected_url = _metadata._METADATA_ROOT + PATH + "?recursive=true"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert result == body
185
186
def test_get_success_recursive():
    """get(recursive=True) adds "?recursive=true" to the requested URL."""
    body = "foobar"
    request = make_request(body, headers={"content-type": "text/plain"})

    result = _metadata.get(request, PATH, recursive=True)

    expected_url = _metadata._METADATA_ROOT + PATH + "?recursive=true"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert result == body
199
200
def test_get_success_custom_root_new_variable():
    """get() uses the host given by the GCE_METADATA_HOST environment variable.

    The module is reloaded while the variable is set, then reloaded again
    after the environment has been restored.
    """
    request = make_request("{}", headers={"content-type": "application/json"})

    fake_root = "another.metadata.service"
    env_override = {environment_vars.GCE_METADATA_HOST: fake_root}

    try:
        # patch.dict puts back any pre-existing value on exit rather than
        # unconditionally deleting the variable.
        with mock.patch.dict(os.environ, env_override):
            reload_module(_metadata)
            _metadata.get(request, PATH)
    finally:
        # Runs once the environment is restored, including when the first
        # reload or the get() call fails.
        reload_module(_metadata)

    request.assert_called_once_with(
        method="GET",
        url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH),
        headers=_metadata._METADATA_HEADERS,
    )
219
220
def test_get_success_custom_root_old_variable():
    """get() uses the host given by the GCE_METADATA_ROOT environment variable.

    The module is reloaded while the variable is set, then reloaded again
    after the environment has been restored.
    """
    request = make_request("{}", headers={"content-type": "application/json"})

    fake_root = "another.metadata.service"
    env_override = {environment_vars.GCE_METADATA_ROOT: fake_root}

    try:
        # patch.dict puts back any pre-existing value on exit rather than
        # unconditionally deleting the variable.
        with mock.patch.dict(os.environ, env_override):
            reload_module(_metadata)
            _metadata.get(request, PATH)
    finally:
        # Runs once the environment is restored, including when the first
        # reload or the get() call fails.
        reload_module(_metadata)

    request.assert_called_once_with(
        method="GET",
        url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH),
        headers=_metadata._METADATA_HEADERS,
    )
239
240
def test_get_failure():
    """get() raises TransportError carrying the body of a NOT_FOUND response."""
    request = make_request("Metadata error", status=http_client.NOT_FOUND)

    with pytest.raises(exceptions.TransportError) as excinfo:
        _metadata.get(request, PATH)

    assert excinfo.match(r"Metadata error")

    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_ROOT + PATH,
        headers=_metadata._METADATA_HEADERS,
    )
    request.assert_called_once_with(**expected_call)
254
255
def test_get_failure_connection_failed():
    """get() raises an "unavailable" TransportError after five failed calls."""
    request = make_request("")
    # Override the canned response so each call fails.
    request.side_effect = exceptions.TransportError()

    with pytest.raises(exceptions.TransportError) as excinfo:
        _metadata.get(request, PATH)

    assert excinfo.match(r"Compute Engine Metadata server unavailable")

    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_ROOT + PATH,
        headers=_metadata._METADATA_HEADERS,
    )
    request.assert_called_with(**expected_call)
    assert request.call_count == 5
271
272
def test_get_failure_bad_json():
    """get() raises TransportError when a JSON-typed body cannot be parsed."""
    json_headers = {"content-type": "application/json"}
    request = make_request("{", headers=json_headers)

    with pytest.raises(exceptions.TransportError) as excinfo:
        _metadata.get(request, PATH)

    assert excinfo.match(r"invalid JSON")

    expected_call = dict(
        method="GET",
        url=_metadata._METADATA_ROOT + PATH,
        headers=_metadata._METADATA_HEADERS,
    )
    request.assert_called_once_with(**expected_call)
286
287
def test_get_project_id():
    """get_project_id() returns the text served at "project/project-id"."""
    expected_project = "example-project"
    request = make_request(expected_project, headers={"content-type": "text/plain"})

    project_id = _metadata.get_project_id(request)

    expected_url = _metadata._METADATA_ROOT + "project/project-id"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert project_id == expected_project
300
301
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_get_service_account_token(utcnow):
    """The token and an expiry of now + ``expires_in`` seconds are returned."""
    ttl = 500
    payload = json.dumps({"access_token": "token", "expires_in": ttl})
    request = make_request(payload, headers={"content-type": "application/json"})

    token, expiry = _metadata.get_service_account_token(request)

    expected_url = _metadata._METADATA_ROOT + PATH + "/token"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert token == "token"
    # utcnow is patched to a fixed instant, so the expiry is deterministic.
    expected_expiry = utcnow() + datetime.timedelta(seconds=ttl)
    assert expiry == expected_expiry
319
320
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_get_service_account_token_with_scopes_list(utcnow):
    """A list of scopes is sent as one comma-joined, URL-encoded query value."""
    ttl = 500
    payload = json.dumps({"access_token": "token", "expires_in": ttl})
    request = make_request(payload, headers={"content-type": "application/json"})
    scope_list = ["foo", "bar"]

    token, expiry = _metadata.get_service_account_token(request, scopes=scope_list)

    expected_url = _metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert token == "token"
    # utcnow is patched to a fixed instant, so the expiry is deterministic.
    expected_expiry = utcnow() + datetime.timedelta(seconds=ttl)
    assert expiry == expected_expiry
338
339
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_get_service_account_token_with_scopes_string(utcnow):
    """A comma-separated scopes string is sent URL-encoded in the query."""
    ttl = 500
    payload = json.dumps({"access_token": "token", "expires_in": ttl})
    request = make_request(payload, headers={"content-type": "application/json"})
    scope_string = "foo,bar"

    token, expiry = _metadata.get_service_account_token(request, scopes=scope_string)

    expected_url = _metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )
    assert token == "token"
    # utcnow is patched to a fixed instant, so the expiry is deterministic.
    expected_expiry = utcnow() + datetime.timedelta(seconds=ttl)
    assert expiry == expected_expiry
357
358
def test_get_service_account_info():
    """get_service_account_info() fetches the account path recursively."""
    key, value = "foo", "bar"
    payload = json.dumps({key: value})
    request = make_request(payload, headers={"content-type": "application/json"})

    info = _metadata.get_service_account_info(request)

    expected_url = _metadata._METADATA_ROOT + PATH + "/?recursive=true"
    request.assert_called_once_with(
        method="GET", url=expected_url, headers=_metadata._METADATA_HEADERS
    )

    assert info[key] == value
374