1 # Copyright 2020 Google LLC 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 15 import datetime 16 17 import pytest 18 19 from google.auth import _credentials_async as credentials 20 from google.auth import _helpers 21 22 23 class CredentialsImpl(credentials.Credentials): 24 def refresh(self, request): 25 self.token = request 26 27 def with_quota_project(self, quota_project_id): 28 raise NotImplementedError() 29 30 31 def test_credentials_constructor(): 32 credentials = CredentialsImpl() 33 assert not credentials.token 34 assert not credentials.expiry 35 assert not credentials.expired 36 assert not credentials.valid 37 38 39 def test_expired_and_valid(): 40 credentials = CredentialsImpl() 41 credentials.token = "token" 42 43 assert credentials.valid 44 assert not credentials.expired 45 46 # Set the expiration to one second more than now plus the clock skew 47 # accomodation. These credentials should be valid. 48 credentials.expiry = ( 49 datetime.datetime.utcnow() 50 + _helpers.REFRESH_THRESHOLD 51 + datetime.timedelta(seconds=1) 52 ) 53 54 assert credentials.valid 55 assert not credentials.expired 56 57 # Set the credentials expiration to now. Because of the clock skew 58 # accomodation, these credentials should report as expired. 59 credentials.expiry = datetime.datetime.utcnow() 60 61 assert not credentials.valid 62 assert credentials.expired 63 64 65 @pytest.mark.asyncio 66 async def test_before_request(): 67 credentials = CredentialsImpl() 68 request = "token" 69 headers = {} 70 71 # First call should call refresh, setting the token. 72 await credentials.before_request(request, "http://example.com", "GET", headers) 73 assert credentials.valid 74 assert credentials.token == "token" 75 assert headers["authorization"] == "Bearer token" 76 77 request = "token2" 78 headers = {} 79 80 # Second call shouldn't call refresh. 81 credentials.before_request(request, "http://example.com", "GET", headers) 82 83 assert credentials.valid 84 assert credentials.token == "token" 85 86 87 def test_anonymous_credentials_ctor(): 88 anon = credentials.AnonymousCredentials() 89 90 assert anon.token is None 91 assert anon.expiry is None 92 assert not anon.expired 93 assert anon.valid 94 95 96 def test_anonymous_credentials_refresh(): 97 anon = credentials.AnonymousCredentials() 98 99 request = object() 100 with pytest.raises(ValueError): 101 anon.refresh(request) 102 103 104 def test_anonymous_credentials_apply_default(): 105 anon = credentials.AnonymousCredentials() 106 headers = {} 107 anon.apply(headers) 108 assert headers == {} 109 with pytest.raises(ValueError): 110 anon.apply(headers, token="TOKEN") 111 112 113 def test_anonymous_credentials_before_request(): 114 anon = credentials.AnonymousCredentials() 115 request = object() 116 method = "GET" 117 url = "https://example.com/api/endpoint" 118 headers = {} 119 anon.before_request(request, method, url, headers) 120 assert headers == {} 121 122 123 class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl): 124 @property 125 def requires_scopes(self): 126 return super(ReadOnlyScopedCredentialsImpl, self).requires_scopes 127 128 129 def test_readonly_scoped_credentials_constructor(): 130 credentials = ReadOnlyScopedCredentialsImpl() 131 assert credentials._scopes is None 132 133 134 def test_readonly_scoped_credentials_scopes(): 135 credentials = ReadOnlyScopedCredentialsImpl() 136 credentials._scopes = ["one", "two"] 137 assert credentials.scopes == ["one", "two"] 138 assert credentials.has_scopes(["one"]) 139 assert credentials.has_scopes(["two"]) 140 assert credentials.has_scopes(["one", "two"]) 141 assert not credentials.has_scopes(["three"]) 142 143 144 def test_readonly_scoped_credentials_requires_scopes(): 145 credentials = ReadOnlyScopedCredentialsImpl() 146 assert not credentials.requires_scopes 147 148 149 class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl): 150 def __init__(self, scopes=None): 151 super(RequiresScopedCredentialsImpl, self).__init__() 152 self._scopes = scopes 153 154 @property 155 def requires_scopes(self): 156 return not self.scopes 157 158 def with_scopes(self, scopes): 159 return RequiresScopedCredentialsImpl(scopes=scopes) 160 161 162 def test_create_scoped_if_required_scoped(): 163 unscoped_credentials = RequiresScopedCredentialsImpl() 164 scoped_credentials = credentials.with_scopes_if_required( 165 unscoped_credentials, ["one", "two"] 166 ) 167 168 assert scoped_credentials is not unscoped_credentials 169 assert not scoped_credentials.requires_scopes 170 assert scoped_credentials.has_scopes(["one", "two"]) 171 172 173 def test_create_scoped_if_required_not_scopes(): 174 unscoped_credentials = CredentialsImpl() 175 scoped_credentials = credentials.with_scopes_if_required( 176 unscoped_credentials, ["one", "two"] 177 ) 178 179 assert scoped_credentials is unscoped_credentials 180