1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from datetime import datetime
16
17import pytest
18
19import google.auth
20from google.auth import compute_engine
21from google.auth import _helpers
22from google.auth import exceptions
23from google.auth import jwt
24from google.auth.compute_engine import _metadata
25import google.oauth2.id_token
26
# Audience requested for the ID tokens minted in the tests below; each test
# then expects to find this exact value in the token's "aud" claim.
AUDIENCE = "https://pubsub.googleapis.com"
28
29
@pytest.fixture(autouse=True)
def check_gce_environment(http_request):
    """Skip the test unless the Compute Engine metadata service answers.

    Declared ``autouse`` so it is applied without being requested by name.
    The probe is a service-account lookup through ``_metadata``. Only
    ``exceptions.TransportError`` is treated as "not on GCE"; any other
    failure propagates and is reported as an error rather than hidden
    behind a skip.
    """
    try:
        _metadata.get_service_account_info(http_request)
    except exceptions.TransportError:
        metadata_reachable = False
    else:
        metadata_reachable = True

    if not metadata_reachable:
        pytest.skip("Compute Engine metadata service is not available.")
36
37
def test_refresh(http_request, token_info):
    """Refresh Compute Engine credentials and cross-check their scopes.

    After the refresh the credentials must carry an access token and a
    service account email, and the scopes reported for that token must
    match the scopes recorded on the credentials object.
    """
    creds = compute_engine.Credentials()
    creds.refresh(http_request)

    assert creds.token is not None
    assert creds.service_account_email is not None

    # NOTE(review): token_info is a fixture defined outside this file; it
    # presumably looks the access token up remotely -- confirm where the
    # token is sent before reusing this pattern elsewhere.
    reported = token_info(creds.token)
    granted_scopes = _helpers.string_to_scopes(reported["scope"])

    # Compared as sets: only membership matters, not order or duplicates.
    assert set(granted_scopes) == set(creds.scopes)
49
50
def test_default(verify_refresh):
    """Application Default Credentials must resolve to Compute Engine ones.

    ``google.auth.default()`` has to return a project ID together with a
    ``compute_engine.Credentials`` instance, which is then handed to the
    ``verify_refresh`` fixture (defined outside this file).
    """
    adc_credentials, adc_project = google.auth.default()

    assert adc_project is not None
    assert isinstance(adc_credentials, compute_engine.Credentials)
    verify_refresh(adc_credentials)
57
58
def test_id_token_from_metadata(http_request):
    """Fetch an ID token via the metadata identity endpoint and check claims.

    The credentials are built with ``use_metadata_identity_endpoint=True``
    and refreshed; the resulting token must be valid, carry AUDIENCE as
    ``aud`` and expire when the credentials object says it does.
    """
    id_credentials = compute_engine.IDTokenCredentials(
        http_request, AUDIENCE, use_metadata_identity_endpoint=True
    )
    id_credentials.refresh(http_request)

    # _unverified_decode parses the JWT without checking its signature, so
    # the assertions below cover claim content only and say nothing about
    # authenticity. Code that authorizes on an ID token should verify it
    # (e.g. google.oauth2.id_token.verify_token) rather than decode it.
    _header, claims, _signed_section, _signature = jwt._unverified_decode(
        id_credentials.token
    )

    assert id_credentials.valid
    assert payload_audience(claims) == AUDIENCE if False else claims["aud"] == AUDIENCE

    # NOTE(review): datetime.fromtimestamp() yields a naive local-time
    # value, so this equality holds only if credentials.expiry is derived
    # the same way (or the host clock is set to UTC) -- confirm.
    expiry_from_claim = datetime.fromtimestamp(claims["exp"])
    assert expiry_from_claim == id_credentials.expiry
69
70
def test_fetch_id_token(http_request):
    """``fetch_id_token`` must return a token issued for AUDIENCE."""
    fetched_token = google.oauth2.id_token.fetch_id_token(http_request, AUDIENCE)

    # The claims are read without signature verification; this confirms
    # the audience the token was minted for, not that the token is
    # authentic. Verification is a separate step (verify_token in the same
    # module) that this test does not exercise.
    _header, claims, _signed_section, _signature = jwt._unverified_decode(
        fetched_token
    )
    assert claims["aud"] == AUDIENCE
76