xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/beta/utilities.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utilities for the gRPC Python Beta API."""
15
16import threading
17import time
18
19# implementations is referenced from specification in this module.
20from grpc.beta import implementations  # pylint: disable=unused-import
21from grpc.beta import interfaces
22from grpc.framework.foundation import callable_util
23from grpc.framework.foundation import future
24
25_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
26    'Exception calling connectivity future "done" callback!'
27)
28
29
class _ChannelReadyFuture(future.Future):
    """A future.Future that matures once a channel reports READY connectivity.

    The future carries no value: result(), exception() and traceback() all
    return None after it has matured. It subscribes to the channel's
    connectivity in start() and unsubscribes when it matures or is cancelled.
    """

    def __init__(self, channel):
        """Creates the future without subscribing to the channel; see start().

        Args:
          channel: The channel to watch. This class calls
            channel.subscribe(callback, try_to_connect=True) and
            channel.unsubscribe(callback) on it.
        """
        # Guards every attribute below and wakes threads waiting in _block().
        self._condition = threading.Condition()
        self._channel = channel

        self._matured = False
        self._cancelled = False
        # Replaced by None once the future has matured or been cancelled.
        self._done_callbacks = []

    def _block(self, timeout):
        """Waits until the future has matured.

        Args:
          timeout: Maximum number of seconds to wait, or None to wait without
            a limit.

        Raises:
          future.CancelledError: If the future is, or becomes, cancelled.
          future.TimeoutError: If the timeout elapses before the future matures.
        """
        # Measure the deadline on the monotonic clock so that adjustments to
        # the system wall clock can neither cut the wait short nor extend it.
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._condition:
            while True:
                if self._cancelled:
                    raise future.CancelledError()
                if self._matured:
                    return
                if deadline is None:
                    self._condition.wait()
                    continue
                remaining = deadline - time.monotonic()
                if remaining < 0:
                    raise future.TimeoutError()
                self._condition.wait(timeout=remaining)

    def _call_done_callbacks(self, done_callbacks):
        """Invokes each done callback with this future.

        Must be called without holding self._condition. Each callback goes
        through callable_util.call_logging_exceptions.
        """
        for done_callback in done_callbacks:
            callable_util.call_logging_exceptions(
                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self
            )

    def _update(self, connectivity):
        """Connectivity callback registered with the channel by start().

        Matures the future the first time READY is reported while the future
        is still pending; every other notification is ignored.

        Args:
          connectivity: The connectivity value reported by the channel.
        """
        with self._condition:
            # Ignoring notifications once matured also protects against a
            # repeated READY delivery, which would otherwise try to iterate
            # the already-cleared (None) callback list.
            if (
                self._cancelled
                or self._matured
                or connectivity is not interfaces.ChannelConnectivity.READY
            ):
                return
            self._matured = True
            self._channel.unsubscribe(self._update)
            self._condition.notify_all()
            done_callbacks = tuple(self._done_callbacks)
            self._done_callbacks = None

        # Run callbacks outside the lock.
        self._call_done_callbacks(done_callbacks)

    def cancel(self):
        """Cancels the future unless it has already matured.

        Cancelling unsubscribes from the channel, wakes any waiting threads and
        runs the registered done callbacks.

        Returns:
          False if the future had already matured; True if it is now (or
          already was) cancelled.
        """
        with self._condition:
            if self._matured:
                return False
            if self._cancelled:
                # A previous cancel() already unsubscribed, ran the callbacks
                # and cleared the callback list; repeating that work would
                # fail on the cleared (None) list.
                return True
            self._cancelled = True
            self._channel.unsubscribe(self._update)
            self._condition.notify_all()
            done_callbacks = tuple(self._done_callbacks)
            self._done_callbacks = None

        # Run callbacks outside the lock.
        self._call_done_callbacks(done_callbacks)

        return True

    def cancelled(self):
        """Returns True if the future has been cancelled."""
        with self._condition:
            return self._cancelled

    def running(self):
        """Returns True while the future is neither cancelled nor matured."""
        with self._condition:
            return not self._cancelled and not self._matured

    def done(self):
        """Returns True once the future is cancelled or matured."""
        with self._condition:
            return self._cancelled or self._matured

    def result(self, timeout=None):
        """Waits for the future to mature and returns None.

        Args:
          timeout: Maximum number of seconds to wait, or None for no limit.

        Raises:
          future.CancelledError: If the future is cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def exception(self, timeout=None):
        """Waits for the future to mature and returns None.

        Args and Raises are the same as for result().
        """
        self._block(timeout)
        return None

    def traceback(self, timeout=None):
        """Waits for the future to mature and returns None.

        Args and Raises are the same as for result().
        """
        self._block(timeout)
        return None

    def add_done_callback(self, fn):
        """Registers fn to be called with this future when it is done.

        If the future is already cancelled or matured, fn is called
        immediately and directly by this method, so an exception raised by fn
        propagates to the caller.

        Args:
          fn: A callable accepting this future as its only argument.
        """
        with self._condition:
            if not self._cancelled and not self._matured:
                self._done_callbacks.append(fn)
                return

        fn(self)

    def start(self):
        """Subscribes to the channel's connectivity, asking it to connect."""
        with self._condition:
            self._channel.subscribe(self._update, try_to_connect=True)

    def __del__(self):
        """Drops the channel subscription if the future is still pending."""
        with self._condition:
            if not self._cancelled and not self._matured:
                self._channel.unsubscribe(self._update)
134
135
def channel_ready_future(channel):
    """Builds a future.Future that completes when the given channel is ready.

    The returned future is already subscribed to the channel's connectivity
    when this function returns. Cancelling it only removes that subscription;
    it does not ask the implementations.Channel to stop any connection
    attempts it may be making.

    Args:
      channel: An implementations.Channel.

    Returns:
      A future.Future that matures when the given Channel reaches
        connectivity interfaces.ChannelConnectivity.READY.
    """
    connectivity_future = _ChannelReadyFuture(channel)
    connectivity_future.start()
    return connectivity_future
154