1 # Copyright 2020 Google LLC
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 
15 import datetime
16 
17 import pytest
18 
19 from google.auth import _credentials_async as credentials
20 from google.auth import _helpers
21 
22 
23 class CredentialsImpl(credentials.Credentials):
24     def refresh(self, request):
25         self.token = request
26 
27     def with_quota_project(self, quota_project_id):
28         raise NotImplementedError()
29 
30 
31 def test_credentials_constructor():
32     credentials = CredentialsImpl()
33     assert not credentials.token
34     assert not credentials.expiry
35     assert not credentials.expired
36     assert not credentials.valid
37 
38 
39 def test_expired_and_valid():
40     credentials = CredentialsImpl()
41     credentials.token = "token"
42 
43     assert credentials.valid
44     assert not credentials.expired
45 
46     # Set the expiration to one second more than now plus the clock skew
47     # accomodation. These credentials should be valid.
48     credentials.expiry = (
49         datetime.datetime.utcnow()
50         + _helpers.REFRESH_THRESHOLD
51         + datetime.timedelta(seconds=1)
52     )
53 
54     assert credentials.valid
55     assert not credentials.expired
56 
57     # Set the credentials expiration to now. Because of the clock skew
58     # accomodation, these credentials should report as expired.
59     credentials.expiry = datetime.datetime.utcnow()
60 
61     assert not credentials.valid
62     assert credentials.expired
63 
64 
65 @pytest.mark.asyncio
66 async def test_before_request():
67     credentials = CredentialsImpl()
68     request = "token"
69     headers = {}
70 
71     # First call should call refresh, setting the token.
72     await credentials.before_request(request, "http://example.com", "GET", headers)
73     assert credentials.valid
74     assert credentials.token == "token"
75     assert headers["authorization"] == "Bearer token"
76 
77     request = "token2"
78     headers = {}
79 
80     # Second call shouldn't call refresh.
81     credentials.before_request(request, "http://example.com", "GET", headers)
82 
83     assert credentials.valid
84     assert credentials.token == "token"
85 
86 
87 def test_anonymous_credentials_ctor():
88     anon = credentials.AnonymousCredentials()
89 
90     assert anon.token is None
91     assert anon.expiry is None
92     assert not anon.expired
93     assert anon.valid
94 
95 
96 def test_anonymous_credentials_refresh():
97     anon = credentials.AnonymousCredentials()
98 
99     request = object()
100     with pytest.raises(ValueError):
101         anon.refresh(request)
102 
103 
104 def test_anonymous_credentials_apply_default():
105     anon = credentials.AnonymousCredentials()
106     headers = {}
107     anon.apply(headers)
108     assert headers == {}
109     with pytest.raises(ValueError):
110         anon.apply(headers, token="TOKEN")
111 
112 
113 def test_anonymous_credentials_before_request():
114     anon = credentials.AnonymousCredentials()
115     request = object()
116     method = "GET"
117     url = "https://example.com/api/endpoint"
118     headers = {}
119     anon.before_request(request, method, url, headers)
120     assert headers == {}
121 
122 
123 class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl):
124     @property
125     def requires_scopes(self):
126         return super(ReadOnlyScopedCredentialsImpl, self).requires_scopes
127 
128 
129 def test_readonly_scoped_credentials_constructor():
130     credentials = ReadOnlyScopedCredentialsImpl()
131     assert credentials._scopes is None
132 
133 
134 def test_readonly_scoped_credentials_scopes():
135     credentials = ReadOnlyScopedCredentialsImpl()
136     credentials._scopes = ["one", "two"]
137     assert credentials.scopes == ["one", "two"]
138     assert credentials.has_scopes(["one"])
139     assert credentials.has_scopes(["two"])
140     assert credentials.has_scopes(["one", "two"])
141     assert not credentials.has_scopes(["three"])
142 
143 
144 def test_readonly_scoped_credentials_requires_scopes():
145     credentials = ReadOnlyScopedCredentialsImpl()
146     assert not credentials.requires_scopes
147 
148 
149 class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
150     def __init__(self, scopes=None):
151         super(RequiresScopedCredentialsImpl, self).__init__()
152         self._scopes = scopes
153 
154     @property
155     def requires_scopes(self):
156         return not self.scopes
157 
158     def with_scopes(self, scopes):
159         return RequiresScopedCredentialsImpl(scopes=scopes)
160 
161 
162 def test_create_scoped_if_required_scoped():
163     unscoped_credentials = RequiresScopedCredentialsImpl()
164     scoped_credentials = credentials.with_scopes_if_required(
165         unscoped_credentials, ["one", "two"]
166     )
167 
168     assert scoped_credentials is not unscoped_credentials
169     assert not scoped_credentials.requires_scopes
170     assert scoped_credentials.has_scopes(["one", "two"])
171 
172 
173 def test_create_scoped_if_required_not_scopes():
174     unscoped_credentials = CredentialsImpl()
175     scoped_credentials = credentials.with_scopes_if_required(
176         unscoped_credentials, ["one", "two"]
177     )
178 
179     assert scoped_credentials is unscoped_credentials
180