# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""System test for downscoped credentials against Cloud Storage.

A temporary bucket with two objects is created. A simulated token broker
issues a downscoped token restricted to read-only access on objects whose
name starts with a given prefix, and the test checks what a consumer of that
token can and cannot do.
"""

import re
import uuid

import google.auth

from google.auth import downscoped
from google.auth.transport import requests
from google.cloud import exceptions
from google.cloud import storage
from google.oauth2 import credentials

import pytest

# The object prefix used to test access to files beginning with this prefix.
_OBJECT_PREFIX = "customer-a"
# The object name of the object accessible by the downscoped token.
_ACCESSIBLE_OBJECT_NAME = "{0}-data.txt".format(_OBJECT_PREFIX)
# The content of the object accessible by the downscoped token.
_ACCESSIBLE_CONTENT = "hello world"
# The content of the object inaccessible by the downscoped token.
_INACCESSIBLE_CONTENT = "secret content"
# The object name of the object inaccessible by the downscoped token.
_INACCESSIBLE_OBJECT_NAME = "other-customer-data.txt"


@pytest.fixture(scope="module")
def temp_bucket():
    """Yields a bucket that is deleted after the module's tests complete."""
    # Reuse a single client for the existence checks and the creation call.
    client = storage.Client()
    # Keep generating random names until one is found that is not in use.
    while True:
        bucket_name = "auth-python-downscope-test-{}".format(uuid.uuid4())
        bucket = client.bucket(bucket_name)
        if not bucket.exists():
            break
    bucket = client.create_bucket(bucket.name)
    yield bucket
    bucket.delete(force=True)


@pytest.fixture(scope="module")
def temp_blobs(temp_bucket):
    """Yields two blobs that are deleted after the module's tests complete.

    Args:
        temp_bucket (google.cloud.storage.bucket.Bucket): The temporary bucket
            the blobs are uploaded to.

    Yields:
        Tuple[google.cloud.storage.blob.Blob, ...]: The accessible blob
            followed by the inaccessible blob.
    """
    bucket = temp_bucket
    # Downscoped tokens will have readonly access to this blob.
    accessible_blob = bucket.blob(_ACCESSIBLE_OBJECT_NAME)
    accessible_blob.upload_from_string(_ACCESSIBLE_CONTENT)
    # Downscoped tokens will have no access to this blob.
    inaccessible_blob = bucket.blob(_INACCESSIBLE_OBJECT_NAME)
    inaccessible_blob.upload_from_string(_INACCESSIBLE_CONTENT)
    yield (accessible_blob, inaccessible_blob)
    bucket.delete_blobs([accessible_blob, inaccessible_blob])


def get_token_from_broker(bucket_name, object_prefix):
    """Simulates token broker generating downscoped tokens for specified bucket.

    Args:
        bucket_name (str): The name of the Cloud Storage bucket.
        object_prefix (str): The prefix string of the object name. This is used
            to ensure access is restricted to only objects starting with this
            prefix string.

    Returns:
        Tuple[str, datetime.datetime]: The downscoped access token and its expiry date.
    """
    # Initialize the Credential Access Boundary rules.
    available_resource = "//storage.googleapis.com/projects/_/buckets/{0}".format(bucket_name)
    # Downscoped credentials will have readonly access to the resource.
    available_permissions = ["inRole:roles/storage.objectViewer"]
    # Only objects starting with the specified prefix string in the object name
    # will be allowed read access.
    availability_expression = (
        "resource.name.startsWith('projects/_/buckets/{0}/objects/{1}')".format(bucket_name, object_prefix)
    )
    availability_condition = downscoped.AvailabilityCondition(availability_expression)
    # Define the single access boundary rule using the above properties.
    rule = downscoped.AccessBoundaryRule(
        available_resource=available_resource,
        available_permissions=available_permissions,
        availability_condition=availability_condition,
    )
    # Define the Credential Access Boundary with all the relevant rules.
    credential_access_boundary = downscoped.CredentialAccessBoundary(rules=[rule])

    # Retrieve the source credentials via ADC.
    source_credentials, _ = google.auth.default()
    if source_credentials.requires_scopes:
        source_credentials = source_credentials.with_scopes(
            ["https://www.googleapis.com/auth/cloud-platform"]
        )

    # Create the downscoped credentials.
    downscoped_credentials = downscoped.Credentials(
        source_credentials=source_credentials,
        credential_access_boundary=credential_access_boundary,
    )

    # Refresh the tokens.
    downscoped_credentials.refresh(requests.Request())

    # These values will need to be passed to the token consumer.
    access_token = downscoped_credentials.token
    expiry = downscoped_credentials.expiry
    return (access_token, expiry)


def test_downscoping(temp_blobs):
    """Tests token consumer access to cloud storage using downscoped tokens.

    Args:
        temp_blobs (Tuple[google.cloud.storage.blob.Blob, ...]): The temporarily
            created test cloud storage blobs (one readonly accessible, the other
            not).
    """
    accessible_blob, inaccessible_blob = temp_blobs
    bucket_name = accessible_blob.bucket.name

    # Create the OAuth credentials from the downscoped token and pass a
    # refresh handler to handle token expiration. We are passing a
    # refresh_handler instead of a one-time access token/expiry pair.
    # This will allow testing this on-demand method for getting access tokens.
    def refresh_handler(request, scopes=None):
        # Get readonly access tokens to objects with accessible prefix in
        # the temporarily created bucket.
        return get_token_from_broker(bucket_name, _OBJECT_PREFIX)

    creds = credentials.Credentials(
        None,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
        refresh_handler=refresh_handler,
    )

    # Initialize a Cloud Storage client with the oauth2 credentials.
    storage_client = storage.Client(credentials=creds)

    # Test read access succeeds to accessible blob.
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(accessible_blob.name)
    assert blob.download_as_bytes().decode("utf-8") == _ACCESSIBLE_CONTENT

    # Test write access fails.
    with pytest.raises(exceptions.Forbidden) as excinfo:
        blob.upload_from_string("Write operations are not allowed")

    # The expected text contains literal dots, so it is escaped before being
    # used as a regular expression.
    assert excinfo.match(re.escape("does not have storage.objects.create access"))

    # Test read access fails to inaccessible blob.
    with pytest.raises(exceptions.Forbidden) as excinfo:
        bucket.blob(inaccessible_blob.name).download_as_bytes()

    assert excinfo.match(re.escape("does not have storage.objects.get access"))