1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""External Account Credentials. 16 17This module provides credentials that exchange workload identity pool external 18credentials for Google access tokens. This facilitates accessing Google Cloud 19Platform resources from on-prem and non-Google Cloud platforms (e.g. AWS, 20Microsoft Azure, OIDC identity providers), using native credentials retrieved 21from the current environment without the need to copy, save and manage 22long-lived service account credentials. 23 24Specifically, this is intended to use access tokens acquired using the GCP STS 25token exchange endpoint following the `OAuth 2.0 Token Exchange`_ spec. 26 27.. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693 28""" 29 30import abc 31import copy 32import datetime 33import json 34import re 35 36import six 37 38from google.auth import _helpers 39from google.auth import credentials 40from google.auth import exceptions 41from google.auth import impersonated_credentials 42from google.oauth2 import sts 43from google.oauth2 import utils 44 45# External account JSON type identifier. 46_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account" 47# The token exchange grant_type used for exchanging credentials. 48_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange" 49# The token exchange requested_token_type. This is always an access_token. 50_STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token" 51# Cloud resource manager URL used to retrieve project information. 52_CLOUD_RESOURCE_MANAGER = "https://cloudresourcemanager.googleapis.com/v1/projects/" 53 54 55@six.add_metaclass(abc.ABCMeta) 56class Credentials(credentials.Scoped, credentials.CredentialsWithQuotaProject): 57 """Base class for all external account credentials. 58 59 This is used to instantiate Credentials for exchanging external account 60 credentials for Google access token and authorizing requests to Google APIs. 61 The base class implements the common logic for exchanging external account 62 credentials for Google access tokens. 63 """ 64 65 def __init__( 66 self, 67 audience, 68 subject_token_type, 69 token_url, 70 credential_source, 71 service_account_impersonation_url=None, 72 client_id=None, 73 client_secret=None, 74 quota_project_id=None, 75 scopes=None, 76 default_scopes=None, 77 workforce_pool_user_project=None, 78 ): 79 """Instantiates an external account credentials object. 80 81 Args: 82 audience (str): The STS audience field. 83 subject_token_type (str): The subject token type. 84 token_url (str): The STS endpoint URL. 85 credential_source (Mapping): The credential source dictionary. 86 service_account_impersonation_url (Optional[str]): The optional service account 87 impersonation generateAccessToken URL. 88 client_id (Optional[str]): The optional client ID. 89 client_secret (Optional[str]): The optional client secret. 90 quota_project_id (Optional[str]): The optional quota project ID. 91 scopes (Optional[Sequence[str]]): Optional scopes to request during the 92 authorization grant. 93 default_scopes (Optional[Sequence[str]]): Default scopes passed by a 94 Google client library. Use 'scopes' for user-defined scopes. 95 workforce_pool_user_project (Optona[str]): The optional workforce pool user 96 project number when the credential corresponds to a workforce pool and not 97 a workload identity pool. The underlying principal must still have 98 serviceusage.services.use IAM permission to use the project for 99 billing/quota. 100 Raises: 101 google.auth.exceptions.RefreshError: If the generateAccessToken 102 endpoint returned an error. 103 """ 104 super(Credentials, self).__init__() 105 self._audience = audience 106 self._subject_token_type = subject_token_type 107 self._token_url = token_url 108 self._credential_source = credential_source 109 self._service_account_impersonation_url = service_account_impersonation_url 110 self._client_id = client_id 111 self._client_secret = client_secret 112 self._quota_project_id = quota_project_id 113 self._scopes = scopes 114 self._default_scopes = default_scopes 115 self._workforce_pool_user_project = workforce_pool_user_project 116 117 if self._client_id: 118 self._client_auth = utils.ClientAuthentication( 119 utils.ClientAuthType.basic, self._client_id, self._client_secret 120 ) 121 else: 122 self._client_auth = None 123 self._sts_client = sts.Client(self._token_url, self._client_auth) 124 125 if self._service_account_impersonation_url: 126 self._impersonated_credentials = self._initialize_impersonated_credentials() 127 else: 128 self._impersonated_credentials = None 129 self._project_id = None 130 131 if not self.is_workforce_pool and self._workforce_pool_user_project: 132 # Workload identity pools do not support workforce pool user projects. 133 raise ValueError( 134 "workforce_pool_user_project should not be set for non-workforce pool " 135 "credentials" 136 ) 137 138 @property 139 def info(self): 140 """Generates the dictionary representation of the current credentials. 141 142 Returns: 143 Mapping: The dictionary representation of the credentials. This is the 144 reverse of "from_info" defined on the subclasses of this class. It is 145 useful for serializing the current credentials so it can deserialized 146 later. 147 """ 148 config_info = { 149 "type": _EXTERNAL_ACCOUNT_JSON_TYPE, 150 "audience": self._audience, 151 "subject_token_type": self._subject_token_type, 152 "token_url": self._token_url, 153 "service_account_impersonation_url": self._service_account_impersonation_url, 154 "credential_source": copy.deepcopy(self._credential_source), 155 "quota_project_id": self._quota_project_id, 156 "client_id": self._client_id, 157 "client_secret": self._client_secret, 158 "workforce_pool_user_project": self._workforce_pool_user_project, 159 } 160 return {key: value for key, value in config_info.items() if value is not None} 161 162 @property 163 def service_account_email(self): 164 """Returns the service account email if service account impersonation is used. 165 166 Returns: 167 Optional[str]: The service account email if impersonation is used. Otherwise 168 None is returned. 169 """ 170 if self._service_account_impersonation_url: 171 # Parse email from URL. The formal looks as follows: 172 # https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken 173 url = self._service_account_impersonation_url 174 start_index = url.rfind("/") 175 end_index = url.find(":generateAccessToken") 176 if start_index != -1 and end_index != -1 and start_index < end_index: 177 start_index = start_index + 1 178 return url[start_index:end_index] 179 return None 180 181 @property 182 def is_user(self): 183 """Returns whether the credentials represent a user (True) or workload (False). 184 Workloads behave similarly to service accounts. Currently workloads will use 185 service account impersonation but will eventually not require impersonation. 186 As a result, this property is more reliable than the service account email 187 property in determining if the credentials represent a user or workload. 188 189 Returns: 190 bool: True if the credentials represent a user. False if they represent a 191 workload. 192 """ 193 # If service account impersonation is used, the credentials will always represent a 194 # service account. 195 if self._service_account_impersonation_url: 196 return False 197 return self.is_workforce_pool 198 199 @property 200 def is_workforce_pool(self): 201 """Returns whether the credentials represent a workforce pool (True) or 202 workload (False) based on the credentials' audience. 203 204 This will also return True for impersonated workforce pool credentials. 205 206 Returns: 207 bool: True if the credentials represent a workforce pool. False if they 208 represent a workload. 209 """ 210 # Workforce pools representing users have the following audience format: 211 # //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId 212 p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/") 213 return p.match(self._audience or "") is not None 214 215 @property 216 def requires_scopes(self): 217 """Checks if the credentials requires scopes. 218 219 Returns: 220 bool: True if there are no scopes set otherwise False. 221 """ 222 return not self._scopes and not self._default_scopes 223 224 @property 225 def project_number(self): 226 """Optional[str]: The project number corresponding to the workload identity pool.""" 227 228 # STS audience pattern: 229 # //iam.googleapis.com/projects/$PROJECT_NUMBER/locations/... 230 components = self._audience.split("/") 231 try: 232 project_index = components.index("projects") 233 if project_index + 1 < len(components): 234 return components[project_index + 1] or None 235 except ValueError: 236 return None 237 238 @_helpers.copy_docstring(credentials.Scoped) 239 def with_scopes(self, scopes, default_scopes=None): 240 d = dict( 241 audience=self._audience, 242 subject_token_type=self._subject_token_type, 243 token_url=self._token_url, 244 credential_source=self._credential_source, 245 service_account_impersonation_url=self._service_account_impersonation_url, 246 client_id=self._client_id, 247 client_secret=self._client_secret, 248 quota_project_id=self._quota_project_id, 249 scopes=scopes, 250 default_scopes=default_scopes, 251 workforce_pool_user_project=self._workforce_pool_user_project, 252 ) 253 if not self.is_workforce_pool: 254 d.pop("workforce_pool_user_project") 255 return self.__class__(**d) 256 257 @abc.abstractmethod 258 def retrieve_subject_token(self, request): 259 """Retrieves the subject token using the credential_source object. 260 261 Args: 262 request (google.auth.transport.Request): A callable used to make 263 HTTP requests. 264 Returns: 265 str: The retrieved subject token. 266 """ 267 # pylint: disable=missing-raises-doc 268 # (pylint doesn't recognize that this is abstract) 269 raise NotImplementedError("retrieve_subject_token must be implemented") 270 271 def get_project_id(self, request): 272 """Retrieves the project ID corresponding to the workload identity or workforce pool. 273 For workforce pool credentials, it returns the project ID corresponding to 274 the workforce_pool_user_project. 275 276 When not determinable, None is returned. 277 278 This is introduced to support the current pattern of using the Auth library: 279 280 credentials, project_id = google.auth.default() 281 282 The resource may not have permission (resourcemanager.projects.get) to 283 call this API or the required scopes may not be selected: 284 https://cloud.google.com/resource-manager/reference/rest/v1/projects/get#authorization-scopes 285 286 Args: 287 request (google.auth.transport.Request): A callable used to make 288 HTTP requests. 289 Returns: 290 Optional[str]: The project ID corresponding to the workload identity pool 291 or workforce pool if determinable. 292 """ 293 if self._project_id: 294 # If already retrieved, return the cached project ID value. 295 return self._project_id 296 scopes = self._scopes if self._scopes is not None else self._default_scopes 297 # Scopes are required in order to retrieve a valid access token. 298 project_number = self.project_number or self._workforce_pool_user_project 299 if project_number and scopes: 300 headers = {} 301 url = _CLOUD_RESOURCE_MANAGER + project_number 302 self.before_request(request, "GET", url, headers) 303 response = request(url=url, method="GET", headers=headers) 304 305 response_body = ( 306 response.data.decode("utf-8") 307 if hasattr(response.data, "decode") 308 else response.data 309 ) 310 response_data = json.loads(response_body) 311 312 if response.status == 200: 313 # Cache result as this field is immutable. 314 self._project_id = response_data.get("projectId") 315 return self._project_id 316 317 return None 318 319 @_helpers.copy_docstring(credentials.Credentials) 320 def refresh(self, request): 321 scopes = self._scopes if self._scopes is not None else self._default_scopes 322 if self._impersonated_credentials: 323 self._impersonated_credentials.refresh(request) 324 self.token = self._impersonated_credentials.token 325 self.expiry = self._impersonated_credentials.expiry 326 else: 327 now = _helpers.utcnow() 328 additional_options = None 329 # Do not pass workforce_pool_user_project when client authentication 330 # is used. The client ID is sufficient for determining the user project. 331 if self._workforce_pool_user_project and not self._client_id: 332 additional_options = {"userProject": self._workforce_pool_user_project} 333 response_data = self._sts_client.exchange_token( 334 request=request, 335 grant_type=_STS_GRANT_TYPE, 336 subject_token=self.retrieve_subject_token(request), 337 subject_token_type=self._subject_token_type, 338 audience=self._audience, 339 scopes=scopes, 340 requested_token_type=_STS_REQUESTED_TOKEN_TYPE, 341 additional_options=additional_options, 342 ) 343 self.token = response_data.get("access_token") 344 lifetime = datetime.timedelta(seconds=response_data.get("expires_in")) 345 self.expiry = now + lifetime 346 347 @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject) 348 def with_quota_project(self, quota_project_id): 349 # Return copy of instance with the provided quota project ID. 350 d = dict( 351 audience=self._audience, 352 subject_token_type=self._subject_token_type, 353 token_url=self._token_url, 354 credential_source=self._credential_source, 355 service_account_impersonation_url=self._service_account_impersonation_url, 356 client_id=self._client_id, 357 client_secret=self._client_secret, 358 quota_project_id=quota_project_id, 359 scopes=self._scopes, 360 default_scopes=self._default_scopes, 361 workforce_pool_user_project=self._workforce_pool_user_project, 362 ) 363 if not self.is_workforce_pool: 364 d.pop("workforce_pool_user_project") 365 return self.__class__(**d) 366 367 def _initialize_impersonated_credentials(self): 368 """Generates an impersonated credentials. 369 370 For more details, see `projects.serviceAccounts.generateAccessToken`_. 371 372 .. _projects.serviceAccounts.generateAccessToken: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateAccessToken 373 374 Returns: 375 impersonated_credentials.Credential: The impersonated credentials 376 object. 377 378 Raises: 379 google.auth.exceptions.RefreshError: If the generateAccessToken 380 endpoint returned an error. 381 """ 382 # Return copy of instance with no service account impersonation. 383 d = dict( 384 audience=self._audience, 385 subject_token_type=self._subject_token_type, 386 token_url=self._token_url, 387 credential_source=self._credential_source, 388 service_account_impersonation_url=None, 389 client_id=self._client_id, 390 client_secret=self._client_secret, 391 quota_project_id=self._quota_project_id, 392 scopes=self._scopes, 393 default_scopes=self._default_scopes, 394 workforce_pool_user_project=self._workforce_pool_user_project, 395 ) 396 if not self.is_workforce_pool: 397 d.pop("workforce_pool_user_project") 398 source_credentials = self.__class__(**d) 399 400 # Determine target_principal. 401 target_principal = self.service_account_email 402 if not target_principal: 403 raise exceptions.RefreshError( 404 "Unable to determine target principal from service account impersonation URL." 405 ) 406 407 scopes = self._scopes if self._scopes is not None else self._default_scopes 408 # Initialize and return impersonated credentials. 409 return impersonated_credentials.Credentials( 410 source_credentials=source_credentials, 411 target_principal=target_principal, 412 target_scopes=scopes, 413 quota_project_id=self._quota_project_id, 414 iam_endpoint_override=self._service_account_impersonation_url, 415 ) 416