1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Futures for long-running operations returned from Google Cloud APIs.
16
17These futures can be used to synchronously wait for the result of a
18long-running operation using :meth:`Operation.result`:
19
20
21.. code-block:: python
22
23    operation = my_api_client.long_running_method()
24    result = operation.result()
25
26Or asynchronously using callbacks and :meth:`Operation.add_done_callback`:
27
28.. code-block:: python
29
30    operation = my_api_client.long_running_method()
31
32    def my_callback(future):
33        result = future.result()
34
35    operation.add_done_callback(my_callback)
36
37"""
38
39import functools
40import threading
41
42from google.api_core import exceptions
43from google.api_core import protobuf_helpers
44from google.api_core.future import polling
45from google.longrunning import operations_pb2
46from google.protobuf import json_format
47from google.rpc import code_pb2
48
49
50class Operation(polling.PollingFuture):
51    """A Future for interacting with a Google API Long-Running Operation.
52
53    Args:
54        operation (google.longrunning.operations_pb2.Operation): The
55            initial operation.
56        refresh (Callable[[], ~.api_core.operation.Operation]): A callable that
57            returns the latest state of the operation.
58        cancel (Callable[[], None]): A callable that tries to cancel
59            the operation.
60        result_type (func:`type`): The protobuf type for the operation's
61            result.
62        metadata_type (func:`type`): The protobuf type for the operation's
63            metadata.
64        retry (google.api_core.retry.Retry): The retry configuration used
65            when polling. This can be used to control how often :meth:`done`
66            is polled. Regardless of the retry's ``deadline``, it will be
67            overridden by the ``timeout`` argument to :meth:`result`.
68    """
69
70    def __init__(
71        self,
72        operation,
73        refresh,
74        cancel,
75        result_type,
76        metadata_type=None,
77        retry=polling.DEFAULT_RETRY,
78    ):
79        super(Operation, self).__init__(retry=retry)
80        self._operation = operation
81        self._refresh = refresh
82        self._cancel = cancel
83        self._result_type = result_type
84        self._metadata_type = metadata_type
85        self._completion_lock = threading.Lock()
86        # Invoke this in case the operation came back already complete.
87        self._set_result_from_operation()
88
89    @property
90    def operation(self):
91        """google.longrunning.Operation: The current long-running operation."""
92        return self._operation
93
94    @property
95    def metadata(self):
96        """google.protobuf.Message: the current operation metadata."""
97        if not self._operation.HasField("metadata"):
98            return None
99
100        return protobuf_helpers.from_any_pb(
101            self._metadata_type, self._operation.metadata
102        )
103
104    @classmethod
105    def deserialize(self, payload):
106        """Deserialize a ``google.longrunning.Operation`` protocol buffer.
107
108        Args:
109            payload (bytes): A serialized operation protocol buffer.
110
111        Returns:
112            ~.operations_pb2.Operation: An Operation protobuf object.
113        """
114        return operations_pb2.Operation.FromString(payload)
115
116    def _set_result_from_operation(self):
117        """Set the result or exception from the operation if it is complete."""
118        # This must be done in a lock to prevent the polling thread
119        # and main thread from both executing the completion logic
120        # at the same time.
121        with self._completion_lock:
122            # If the operation isn't complete or if the result has already been
123            # set, do not call set_result/set_exception again.
124            # Note: self._result_set is set to True in set_result and
125            # set_exception, in case those methods are invoked directly.
126            if not self._operation.done or self._result_set:
127                return
128
129            if self._operation.HasField("response"):
130                response = protobuf_helpers.from_any_pb(
131                    self._result_type, self._operation.response
132                )
133                self.set_result(response)
134            elif self._operation.HasField("error"):
135                exception = exceptions.from_grpc_status(
136                    status_code=self._operation.error.code,
137                    message=self._operation.error.message,
138                    errors=(self._operation.error,),
139                    response=self._operation,
140                )
141                self.set_exception(exception)
142            else:
143                exception = exceptions.GoogleAPICallError(
144                    "Unexpected state: Long-running operation had neither "
145                    "response nor error set."
146                )
147                self.set_exception(exception)
148
149    def _refresh_and_update(self, retry=polling.DEFAULT_RETRY):
150        """Refresh the operation and update the result if needed.
151
152        Args:
153            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
154        """
155        # If the currently cached operation is done, no need to make another
156        # RPC as it will not change once done.
157        if not self._operation.done:
158            self._operation = self._refresh(retry=retry)
159            self._set_result_from_operation()
160
161    def done(self, retry=polling.DEFAULT_RETRY):
162        """Checks to see if the operation is complete.
163
164        Args:
165            retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
166
167        Returns:
168            bool: True if the operation is complete, False otherwise.
169        """
170        self._refresh_and_update(retry)
171        return self._operation.done
172
173    def cancel(self):
174        """Attempt to cancel the operation.
175
176        Returns:
177            bool: True if the cancel RPC was made, False if the operation is
178                already complete.
179        """
180        if self.done():
181            return False
182
183        self._cancel()
184        return True
185
186    def cancelled(self):
187        """True if the operation was cancelled."""
188        self._refresh_and_update()
189        return (
190            self._operation.HasField("error")
191            and self._operation.error.code == code_pb2.CANCELLED
192        )
193
194
195def _refresh_http(api_request, operation_name, retry=None):
196    """Refresh an operation using a JSON/HTTP client.
197
198    Args:
199        api_request (Callable): A callable used to make an API request. This
200            should generally be
201            :meth:`google.cloud._http.Connection.api_request`.
202        operation_name (str): The name of the operation.
203        retry (google.api_core.retry.Retry): (Optional) retry policy
204
205    Returns:
206        google.longrunning.operations_pb2.Operation: The operation.
207    """
208    path = "operations/{}".format(operation_name)
209
210    if retry is not None:
211        api_request = retry(api_request)
212
213    api_response = api_request(method="GET", path=path)
214    return json_format.ParseDict(api_response, operations_pb2.Operation())
215
216
217def _cancel_http(api_request, operation_name):
218    """Cancel an operation using a JSON/HTTP client.
219
220    Args:
221        api_request (Callable): A callable used to make an API request. This
222            should generally be
223            :meth:`google.cloud._http.Connection.api_request`.
224        operation_name (str): The name of the operation.
225    """
226    path = "operations/{}:cancel".format(operation_name)
227    api_request(method="POST", path=path)
228
229
230def from_http_json(operation, api_request, result_type, **kwargs):
231    """Create an operation future using a HTTP/JSON client.
232
233    This interacts with the long-running operations `service`_ (specific
234    to a given API) via `HTTP/JSON`_.
235
236    .. _HTTP/JSON: https://cloud.google.com/speech/reference/rest/\
237            v1beta1/operations#Operation
238
239    Args:
240        operation (dict): Operation as a dictionary.
241        api_request (Callable): A callable used to make an API request. This
242            should generally be
243            :meth:`google.cloud._http.Connection.api_request`.
244        result_type (:func:`type`): The protobuf result type.
245        kwargs: Keyword args passed into the :class:`Operation` constructor.
246
247    Returns:
248        ~.api_core.operation.Operation: The operation future to track the given
249            operation.
250    """
251    operation_proto = json_format.ParseDict(operation, operations_pb2.Operation())
252    refresh = functools.partial(_refresh_http, api_request, operation_proto.name)
253    cancel = functools.partial(_cancel_http, api_request, operation_proto.name)
254    return Operation(operation_proto, refresh, cancel, result_type, **kwargs)
255
256
257def _refresh_grpc(operations_stub, operation_name, retry=None):
258    """Refresh an operation using a gRPC client.
259
260    Args:
261        operations_stub (google.longrunning.operations_pb2.OperationsStub):
262            The gRPC operations stub.
263        operation_name (str): The name of the operation.
264        retry (google.api_core.retry.Retry): (Optional) retry policy
265
266    Returns:
267        google.longrunning.operations_pb2.Operation: The operation.
268    """
269    request_pb = operations_pb2.GetOperationRequest(name=operation_name)
270
271    rpc = operations_stub.GetOperation
272    if retry is not None:
273        rpc = retry(rpc)
274
275    return rpc(request_pb)
276
277
278def _cancel_grpc(operations_stub, operation_name):
279    """Cancel an operation using a gRPC client.
280
281    Args:
282        operations_stub (google.longrunning.operations_pb2.OperationsStub):
283            The gRPC operations stub.
284        operation_name (str): The name of the operation.
285    """
286    request_pb = operations_pb2.CancelOperationRequest(name=operation_name)
287    operations_stub.CancelOperation(request_pb)
288
289
290def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwargs):
291    """Create an operation future using a gRPC client.
292
293    This interacts with the long-running operations `service`_ (specific
294    to a given API) via gRPC.
295
296    .. _service: https://github.com/googleapis/googleapis/blob/\
297                 050400df0fdb16f63b63e9dee53819044bffc857/\
298                 google/longrunning/operations.proto#L38
299
300    Args:
301        operation (google.longrunning.operations_pb2.Operation): The operation.
302        operations_stub (google.longrunning.operations_pb2.OperationsStub):
303            The operations stub.
304        result_type (:func:`type`): The protobuf result type.
305        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
306            to the rpc.
307        kwargs: Keyword args passed into the :class:`Operation` constructor.
308
309    Returns:
310        ~.api_core.operation.Operation: The operation future to track the given
311            operation.
312    """
313    refresh = functools.partial(
314        _refresh_grpc, operations_stub, operation.name, metadata=grpc_metadata
315    )
316    cancel = functools.partial(
317        _cancel_grpc, operations_stub, operation.name, metadata=grpc_metadata
318    )
319    return Operation(operation, refresh, cancel, result_type, **kwargs)
320
321
322def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs):
323    """Create an operation future from a gapic client.
324
325    This interacts with the long-running operations `service`_ (specific
326    to a given API) via a gapic client.
327
328    .. _service: https://github.com/googleapis/googleapis/blob/\
329                 050400df0fdb16f63b63e9dee53819044bffc857/\
330                 google/longrunning/operations.proto#L38
331
332    Args:
333        operation (google.longrunning.operations_pb2.Operation): The operation.
334        operations_client (google.api_core.operations_v1.OperationsClient):
335            The operations client.
336        result_type (:func:`type`): The protobuf result type.
337        grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass
338            to the rpc.
339        kwargs: Keyword args passed into the :class:`Operation` constructor.
340
341    Returns:
342        ~.api_core.operation.Operation: The operation future to track the given
343            operation.
344    """
345    refresh = functools.partial(
346        operations_client.get_operation, operation.name, metadata=grpc_metadata
347    )
348    cancel = functools.partial(
349        operations_client.cancel_operation, operation.name, metadata=grpc_metadata
350    )
351    return Operation(operation, refresh, cancel, result_type, **kwargs)
352