1# Copyright 2020 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import os
16
17import mock
18import pytest
19
20from google.auth import environment_vars
21from google.auth import exceptions
22import google.auth.compute_engine._metadata
23from google.oauth2 import _id_token_async as id_token
24from google.oauth2 import _service_account_async
25from google.oauth2 import id_token as sync_id_token
26from tests.oauth2 import test_id_token
27
28
def make_request(status, data=None):
    """Build an awaitable mock request that resolves to a canned response.

    Args:
        status: Value stored on the mocked response's ``status`` attribute.
        data: Optional payload. When given, the response gets a ``data``
            attribute whose awaitable ``read()`` returns this payload. When
            ``None``, no ``data`` attribute is set on the response.

    Returns:
        mock.AsyncMock: A mock whose awaited call yields the mocked response.
    """
    fake_response = mock.AsyncMock(spec=["transport.Response"])
    fake_response.status = status

    if data is not None:
        body = mock.AsyncMock(spec=["__call__", "read"])
        body.read = mock.AsyncMock(spec=["__call__"], return_value=data)
        fake_response.data = body

    fake_request = mock.AsyncMock(spec=["transport.Request"])
    fake_request.return_value = fake_response
    return fake_request
40
41
@pytest.mark.asyncio
async def test__fetch_certs_success():
    """A 200 response makes ``_fetch_certs`` return the response payload.

    Also checks the request was issued exactly once as a GET on the URL.
    """
    expected = {"1": "cert"}
    url = mock.sentinel.cert_url
    fake_request = make_request(200, expected)

    fetched = await id_token._fetch_certs(fake_request, url)

    assert fetched == expected
    fake_request.assert_called_once_with(url, method="GET")
51
52
@pytest.mark.asyncio
async def test__fetch_certs_failure():
    """A 404 response makes ``_fetch_certs`` raise ``TransportError``.

    The request must still have been issued exactly once as a GET.
    """
    url = mock.sentinel.cert_url
    failing_request = make_request(404)

    with pytest.raises(exceptions.TransportError):
        await id_token._fetch_certs(failing_request, url)

    failing_request.assert_called_once_with(url, method="GET")
61
62
@mock.patch("google.auth.jwt.decode", autospec=True)
@mock.patch("google.oauth2._id_token_async._fetch_certs", autospec=True)
@pytest.mark.asyncio
async def test_verify_token(_fetch_certs, decode):
    """``verify_token`` with only a token and request uses the defaults.

    Certs are fetched from the Google OAuth2 certs URL and handed to the
    patched ``jwt.decode`` with ``audience=None`` and zero clock skew; the
    decode result is returned unchanged.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request

    payload = await id_token.verify_token(token, request)

    _fetch_certs.assert_called_once_with(
        request, sync_id_token._GOOGLE_OAUTH2_CERTS_URL
    )
    decode.assert_called_once_with(
        token, certs=_fetch_certs.return_value, audience=None, clock_skew_in_seconds=0
    )
    assert payload == decode.return_value
79
80
@mock.patch("google.auth.jwt.decode", autospec=True)
@mock.patch("google.oauth2._id_token_async._fetch_certs", autospec=True)
@pytest.mark.asyncio
async def test_verify_token_clock_skew(_fetch_certs, decode):
    """``verify_token`` forwards ``clock_skew_in_seconds`` to ``jwt.decode``.

    The certs are still fetched from the default Google OAuth2 certs URL.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request
    skew = 10

    payload = await id_token.verify_token(token, request, clock_skew_in_seconds=skew)

    _fetch_certs.assert_called_once_with(
        request, sync_id_token._GOOGLE_OAUTH2_CERTS_URL
    )
    decode.assert_called_once_with(
        token,
        certs=_fetch_certs.return_value,
        audience=None,
        clock_skew_in_seconds=skew,
    )
    assert payload == decode.return_value
99
100
@mock.patch("google.auth.jwt.decode", autospec=True)
@mock.patch("google.oauth2._id_token_async._fetch_certs", autospec=True)
@pytest.mark.asyncio
async def test_verify_token_args(_fetch_certs, decode):
    """Explicit ``audience`` and ``certs_url`` are honoured by ``verify_token``.

    The custom certs URL goes to ``_fetch_certs`` and the audience goes to
    ``jwt.decode``; clock skew stays at its default of zero.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request
    audience = mock.sentinel.audience
    certs_url = mock.sentinel.certs_url

    payload = await id_token.verify_token(
        token, request, audience=audience, certs_url=certs_url
    )

    _fetch_certs.assert_called_once_with(request, certs_url)
    decode.assert_called_once_with(
        token,
        certs=_fetch_certs.return_value,
        audience=audience,
        clock_skew_in_seconds=0,
    )
    assert payload == decode.return_value
120
121
@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_oauth2_token(verify_token):
    """``verify_oauth2_token`` delegates to ``verify_token`` with OAuth2 certs.

    With an ``accounts.google.com`` issuer in the verified claims, the claims
    are returned as-is.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request
    audience = mock.sentinel.audience
    verify_token.return_value = {"iss": "accounts.google.com"}

    claims = await id_token.verify_oauth2_token(token, request, audience=audience)

    expected_kwargs = {
        "audience": audience,
        "certs_url": sync_id_token._GOOGLE_OAUTH2_CERTS_URL,
        "clock_skew_in_seconds": 0,
    }
    verify_token.assert_called_once_with(token, request, **expected_kwargs)
    assert claims == verify_token.return_value
138
139
@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_oauth2_token_clock_skew(verify_token):
    """``verify_oauth2_token`` passes ``clock_skew_in_seconds`` through.

    The patched ``verify_token`` must receive the same skew value alongside
    the Google OAuth2 certs URL.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request
    audience = mock.sentinel.audience
    skew = 10
    verify_token.return_value = {"iss": "accounts.google.com"}

    claims = await id_token.verify_oauth2_token(
        token, request, audience=audience, clock_skew_in_seconds=skew
    )

    expected_kwargs = {
        "audience": audience,
        "certs_url": sync_id_token._GOOGLE_OAUTH2_CERTS_URL,
        "clock_skew_in_seconds": skew,
    }
    verify_token.assert_called_once_with(token, request, **expected_kwargs)
    assert claims == verify_token.return_value
159
160
@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_oauth2_token_invalid_iss(verify_token):
    """Claims with an unexpected ``iss`` make ``verify_oauth2_token`` raise.

    The expected exception type is ``GoogleAuthError``.
    """
    verify_token.return_value = {"iss": "invalid_issuer"}
    token = mock.sentinel.token
    request = mock.sentinel.request
    audience = mock.sentinel.audience

    with pytest.raises(exceptions.GoogleAuthError):
        await id_token.verify_oauth2_token(token, request, audience=audience)
170
171
@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_firebase_token(verify_token):
    """``verify_firebase_token`` delegates to ``verify_token``.

    It must use the Google APIs certs URL and a default clock skew of zero,
    and return whatever ``verify_token`` returns.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request
    audience = mock.sentinel.audience

    claims = await id_token.verify_firebase_token(token, request, audience=audience)

    expected_kwargs = {
        "audience": audience,
        "certs_url": sync_id_token._GOOGLE_APIS_CERTS_URL,
        "clock_skew_in_seconds": 0,
    }
    verify_token.assert_called_once_with(token, request, **expected_kwargs)
    assert claims == verify_token.return_value
187
188
@mock.patch("google.oauth2._id_token_async.verify_token", autospec=True)
@pytest.mark.asyncio
async def test_verify_firebase_token_clock_skew(verify_token):
    """``verify_firebase_token`` passes ``clock_skew_in_seconds`` through.

    The patched ``verify_token`` must receive the same skew value alongside
    the Google APIs certs URL.
    """
    token = mock.sentinel.token
    request = mock.sentinel.request
    audience = mock.sentinel.audience
    skew = 10

    claims = await id_token.verify_firebase_token(
        token, request, audience=audience, clock_skew_in_seconds=skew
    )

    expected_kwargs = {
        "audience": audience,
        "certs_url": sync_id_token._GOOGLE_APIS_CERTS_URL,
        "clock_skew_in_seconds": skew,
    }
    verify_token.assert_called_once_with(token, request, **expected_kwargs)
    assert claims == verify_token.return_value
207
208
@pytest.mark.asyncio
async def test_fetch_id_token_from_metadata_server(monkeypatch):
    """``fetch_id_token`` returns the compute-engine credentials' token.

    The credentials environment variable is removed and the metadata ping is
    patched to succeed. The stubbed ``IDTokenCredentials.__init__`` asserts
    that ``use_metadata_identity_endpoint`` is truthy and sets the token.
    """
    monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

    def fake_init(self, request, audience, use_metadata_identity_endpoint):
        assert use_metadata_identity_endpoint
        self.token = "id_token"

    ping_patch = mock.patch(
        "google.auth.compute_engine._metadata.ping", return_value=True
    )
    credentials_patch = mock.patch.multiple(
        google.auth.compute_engine.IDTokenCredentials,
        __init__=fake_init,
        refresh=mock.Mock(),
    )

    with ping_patch, credentials_patch:
        fake_request = mock.AsyncMock()
        fetched = await id_token.fetch_id_token(
            fake_request, "https://pubsub.googleapis.com"
        )

    assert fetched == "id_token"
228
229
@pytest.mark.asyncio
async def test_fetch_id_token_from_explicit_cred_json_file(monkeypatch):
    """``fetch_id_token`` uses the file named by the credentials env var.

    The async ``IDTokenCredentials.refresh`` is replaced with a stub that sets
    the token, and that token must be what ``fetch_id_token`` returns.
    """
    monkeypatch.setenv(environment_vars.CREDENTIALS, test_id_token.SERVICE_ACCOUNT_FILE)

    async def fake_refresh(self, request):
        self.token = "id_token"

    refresh_patch = mock.patch.object(
        _service_account_async.IDTokenCredentials, "refresh", fake_refresh
    )

    with refresh_patch:
        fake_request = mock.AsyncMock()
        fetched = await id_token.fetch_id_token(
            fake_request, "https://pubsub.googleapis.com"
        )

    assert fetched == "id_token"
243
244
@pytest.mark.asyncio
async def test_fetch_id_token_no_cred_exists(monkeypatch):
    """``fetch_id_token`` raises when no credential source is available.

    With the credentials environment variable removed, two metadata-ping
    outcomes are exercised in turn: the ping raising ``TransportError`` and
    the ping returning ``False``. Both must end in
    ``DefaultCredentialsError`` with the same message.
    """
    monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)

    expected_message = (
        r"Neither metadata server or valid service account credentials are found."
    )
    # Same order as the scenarios are described above: raising ping first.
    ping_behaviours = (
        {"side_effect": exceptions.TransportError()},
        {"return_value": False},
    )

    for ping_kwargs in ping_behaviours:
        with mock.patch("google.auth.compute_engine._metadata.ping", **ping_kwargs):
            fake_request = mock.AsyncMock()
            with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
                await id_token.fetch_id_token(
                    fake_request, "https://pubsub.googleapis.com"
                )
            assert excinfo.match(expected_message)
267
268
@pytest.mark.asyncio
async def test_fetch_id_token_invalid_cred_file(monkeypatch):
    """A credentials env var naming a PEM file yields a clear error.

    ``fetch_id_token`` must raise ``DefaultCredentialsError`` saying the
    value is not valid service account credentials.
    """
    here = os.path.dirname(__file__)
    pem_path = os.path.join(here, "../../tests/data/public_cert.pem")
    monkeypatch.setenv(environment_vars.CREDENTIALS, pem_path)

    fake_request = mock.AsyncMock()
    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        await id_token.fetch_id_token(fake_request, "https://pubsub.googleapis.com")

    expected_message = (
        r"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials."
    )
    assert excinfo.match(expected_message)
282
283
@pytest.mark.asyncio
async def test_fetch_id_token_invalid_cred_type(monkeypatch):
    """A credentials file of the wrong type plus a failed ping raises.

    The env var points at ``authorized_user.json`` (presumably user rather
    than service account credentials) and the metadata ping is patched to
    return ``False``; ``DefaultCredentialsError`` is expected.
    """
    here = os.path.dirname(__file__)
    user_creds_path = os.path.join(here, "../../tests/data/authorized_user.json")
    monkeypatch.setenv(environment_vars.CREDENTIALS, user_creds_path)

    expected_message = (
        r"Neither metadata server or valid service account credentials are found."
    )
    ping_patch = mock.patch(
        "google.auth.compute_engine._metadata.ping", return_value=False
    )

    with ping_patch:
        fake_request = mock.AsyncMock()
        with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
            await id_token.fetch_id_token(
                fake_request, "https://pubsub.googleapis.com"
            )
        assert excinfo.match(expected_message)
298
299
@pytest.mark.asyncio
async def test_fetch_id_token_invalid_cred_path(monkeypatch):
    """A credentials env var naming ``not_exists.json`` yields a path error.

    ``fetch_id_token`` must raise ``DefaultCredentialsError`` saying the path
    is either not found or invalid.
    """
    here = os.path.dirname(__file__)
    missing_path = os.path.join(here, "../../tests/data/not_exists.json")
    monkeypatch.setenv(environment_vars.CREDENTIALS, missing_path)

    fake_request = mock.AsyncMock()
    with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
        await id_token.fetch_id_token(fake_request, "https://pubsub.googleapis.com")

    expected_message = (
        r"GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
    )
    assert excinfo.match(expected_message)
313