# Copyright 2020 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import mock
import pytest

from google.auth import environment_vars
from google.auth import exceptions
import google.auth.compute_engine._metadata
from google.oauth2 import _id_token_async as id_token
from google.oauth2 import _service_account_async
from google.oauth2 import id_token as sync_id_token
from tests.oauth2 import test_id_token


def make_request(status, data=None):
    """Build an awaitable request mock that resolves to a canned response.

    Args:
        status: Value stored on the response mock's ``status`` attribute.
        data: Optional payload. When it is not ``None`` the response gets a
            ``data`` mock whose awaitable ``read()`` returns this payload.

    Returns:
        An ``AsyncMock`` whose awaited call returns the response mock.
    """
    fake_response = mock.AsyncMock(spec=["transport.Response"])
    fake_response.status = status

    if data is not None:
        body = mock.AsyncMock(spec=["__call__", "read"])
        body.read = mock.AsyncMock(spec=["__call__"], return_value=data)
        fake_response.data = body

    return mock.AsyncMock(spec=["transport.Request"], return_value=fake_response)


@pytest.mark.asyncio
async def test__fetch_certs_success():
    """A 200 response hands back the payload read from the response body."""
    expected_certs = {"1": "cert"}
    request = make_request(200, expected_certs)

    fetched = await id_token._fetch_certs(request, mock.sentinel.cert_url)

    request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
    assert fetched == expected_certs


@pytest.mark.asyncio
async def test__fetch_certs_failure():
    """A 404 response makes ``_fetch_certs`` raise ``TransportError``."""
    request = make_request(404)

    with pytest.raises(exceptions.TransportError):
        await id_token._fetch_certs(request, mock.sentinel.cert_url)

    # The request must still have been issued exactly once before failing.
    request.assert_called_once_with(mock.sentinel.cert_url, method="GET")


@mock.patch("google.auth.jwt.decode", autospec=True)
@mock.patch("google.oauth2._id_token_async._fetch_certs", autospec=True)
@pytest.mark.asyncio
async def test_verify_token(_fetch_certs, decode):
    """Defaults: Google OAuth2 certs URL, no audience, zero clock skew."""
    claims = await id_token.verify_token(mock.sentinel.token, mock.sentinel.request)

    assert claims == decode.return_value
    _fetch_certs.assert_called_once_with(
        mock.sentinel.request, sync_id_token._GOOGLE_OAUTH2_CERTS_URL
    )
    decode.assert_called_once_with(
        mock.sentinel.token,
        certs=_fetch_certs.return_value,
        audience=None,
        clock_skew_in_seconds=0,
    )


@mock.patch("google.auth.jwt.decode", autospec=True)
@mock.patch("google.oauth2._id_token_async._fetch_certs", autospec=True)
@pytest.mark.asyncio
async def test_verify_token_clock_skew(_fetch_certs, decode):
    """An explicit ``clock_skew_in_seconds`` is forwarded to ``jwt.decode``."""
    claims = await id_token.verify_token(
        mock.sentinel.token, mock.sentinel.request, clock_skew_in_seconds=10
    )

    assert claims == decode.return_value
    _fetch_certs.assert_called_once_with(
        mock.sentinel.request, sync_id_token._GOOGLE_OAUTH2_CERTS_URL
    )
    decode.assert_called_once_with(
        mock.sentinel.token,
        certs=_fetch_certs.return_value,
        audience=None,
        clock_skew_in_seconds=10,
    )


@mock.patch("google.auth.jwt.decode", autospec=True)
@mock.patch("google.oauth2._id_token_async._fetch_certs", autospec=True)
@pytest.mark.asyncio
async def test_verify_token_args(_fetch_certs, decode):
    """Caller-supplied ``audience`` and ``certs_url`` override the defaults."""
    claims = await id_token.verify_token(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        certs_url=mock.sentinel.certs_url,
    )

    assert claims == decode.return_value
    _fetch_certs.assert_called_once_with(mock.sentinel.request, mock.sentinel.certs_url)
    decode.assert_called_once_with(
        mock.sentinel.token,
        certs=_fetch_certs.return_value,
        audience=mock.sentinel.audience,
        clock_skew_in_seconds=0,
    )


@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_oauth2_token(verify_token):
    """An ``accounts.google.com`` issuer is accepted and the claims returned."""
    verify_token.return_value = {"iss": "accounts.google.com"}

    claims = await id_token.verify_oauth2_token(
        mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
    )

    assert claims == verify_token.return_value
    verify_token.assert_called_once_with(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        certs_url=sync_id_token._GOOGLE_OAUTH2_CERTS_URL,
        clock_skew_in_seconds=0,
    )


@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_oauth2_token_clock_skew(verify_token):
    """``clock_skew_in_seconds`` is passed through to ``verify_token``."""
    verify_token.return_value = {"iss": "accounts.google.com"}

    claims = await id_token.verify_oauth2_token(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        clock_skew_in_seconds=10,
    )

    assert claims == verify_token.return_value
    verify_token.assert_called_once_with(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        certs_url=sync_id_token._GOOGLE_OAUTH2_CERTS_URL,
        clock_skew_in_seconds=10,
    )


@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_oauth2_token_invalid_iss(verify_token):
    """Claims carrying the issuer ``invalid_issuer`` raise ``GoogleAuthError``."""
    verify_token.return_value = {"iss": "invalid_issuer"}

    with pytest.raises(exceptions.GoogleAuthError):
        await id_token.verify_oauth2_token(
            mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
        )


@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_firebase_token(verify_token):
    """Firebase verification delegates to ``verify_token`` with the APIs certs URL."""
    claims = await id_token.verify_firebase_token(
        mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
    )

    assert claims == verify_token.return_value
    verify_token.assert_called_once_with(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        certs_url=sync_id_token._GOOGLE_APIS_CERTS_URL,
        clock_skew_in_seconds=0,
    )


@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_firebase_token_clock_skew(verify_token):
    """``clock_skew_in_seconds`` is passed through for Firebase tokens too."""
    claims = await id_token.verify_firebase_token(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        clock_skew_in_seconds=10,
    )

    assert claims == verify_token.return_value
    verify_token.assert_called_once_with(
        mock.sentinel.token,
        mock.sentinel.request,
        audience=mock.sentinel.audience,
        certs_url=sync_id_token._GOOGLE_APIS_CERTS_URL,
        clock_skew_in_seconds=10,
    )


@pytest.mark.asyncio
async def test_fetch_id_token_from_metadata_server(monkeypatch):
    """With no credentials env var and a reachable metadata server, the token
    set by the (patched) compute engine ``IDTokenCredentials`` is returned."""
    monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

    def fake_init(self, request, audience, use_metadata_identity_endpoint):
        # The code under test must ask for the metadata identity endpoint.
        assert use_metadata_identity_endpoint
        self.token = "id_token"

    ping_patch = mock.patch(
        "google.auth.compute_engine._metadata.ping", return_value=True
    )
    creds_patch = mock.patch.multiple(
        google.auth.compute_engine.IDTokenCredentials,
        __init__=fake_init,
        refresh=mock.Mock(),
    )

    with ping_patch, creds_patch:
        request = mock.AsyncMock()
        fetched = await id_token.fetch_id_token(
            request, "https://pubsub.googleapis.com"
        )
        assert fetched == "id_token"


@pytest.mark.asyncio
async def test_fetch_id_token_from_explicit_cred_json_file(monkeypatch):
    """A service account file named by the credentials env var is used, and the
    token set by its (patched) ``refresh`` is returned."""
    monkeypatch.setenv(environment_vars.CREDENTIALS, test_id_token.SERVICE_ACCOUNT_FILE)

    async def fake_refresh(self, request):
        self.token = "id_token"

    refresh_patch = mock.patch.object(
        _service_account_async.IDTokenCredentials, "refresh", fake_refresh
    )

    with refresh_patch:
        request = mock.AsyncMock()
        fetched = await id_token.fetch_id_token(
            request, "https://pubsub.googleapis.com"
        )
        assert fetched == "id_token"


@pytest.mark.asyncio
async def test_fetch_id_token_no_cred_exists(monkeypatch):
    """Without a credentials env var, a failing or negative metadata ping both
    raise ``DefaultCredentialsError``."""
    monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

    # First the ping raises a transport error, then it simply reports False.
    ping_behaviours = (
        {"side_effect": exceptions.TransportError()},
        {"return_value": False},
    )

    for ping_kwargs in ping_behaviours:
        with mock.patch("google.auth.compute_engine._metadata.ping", **ping_kwargs):
            with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
                request = mock.AsyncMock()
                await id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
            assert excinfo.match(
                r"Neither metadata server or valid service account credentials are found."
            )


@pytest.mark.asyncio
async def test_fetch_id_token_invalid_cred_file(monkeypatch):
    """Pointing the credentials env var at a PEM file (not JSON) is rejected."""
    data_dir_anchor = os.path.dirname(__file__)
    pem_path = os.path.join(data_dir_anchor, "../../tests/data/public_cert.pem")
    monkeypatch.setenv(environment_vars.CREDENTIALS, pem_path)

    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        request = mock.AsyncMock()
        await id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
    assert excinfo.match(
        r"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials."
    )


@pytest.mark.asyncio
async def test_fetch_id_token_invalid_cred_type(monkeypatch):
    """An authorized-user credentials file plus a negative metadata ping raises
    ``DefaultCredentialsError``."""
    data_dir_anchor = os.path.dirname(__file__)
    user_creds_path = os.path.join(
        data_dir_anchor, "../../tests/data/authorized_user.json"
    )
    monkeypatch.setenv(environment_vars.CREDENTIALS, user_creds_path)

    with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
        with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
            request = mock.AsyncMock()
            await id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
        assert excinfo.match(
            r"Neither metadata server or valid service account credentials are found."
        )


@pytest.mark.asyncio
async def test_fetch_id_token_invalid_cred_path(monkeypatch):
    """Pointing the credentials env var at a nonexistent path is rejected."""
    data_dir_anchor = os.path.dirname(__file__)
    missing_path = os.path.join(data_dir_anchor, "../../tests/data/not_exists.json")
    monkeypatch.setenv(environment_vars.CREDENTIALS, missing_path)

    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        request = mock.AsyncMock()
        await id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
    assert excinfo.match(
        r"GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
    )