# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the gRPC Python Beta API."""

import threading
import time

# implementations is referenced from specification in this module.
from grpc.beta import implementations  # pylint: disable=unused-import
from grpc.beta import interfaces
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import future

_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
    'Exception calling connectivity future "done" callback!'
)


class _ChannelReadyFuture(future.Future):
    """A future.Future that matures once a channel reports READY connectivity.

    The future carries no value: result(), exception() and traceback() all
    return None once it has matured. All state is guarded by a single
    condition variable, and done callbacks are only ever invoked after that
    condition has been released.
    """

    def __init__(self, channel):
        """Initializes the future without subscribing to the channel.

        Args:
          channel: The channel whose connectivity is tracked. This class calls
            its subscribe(callback, try_to_connect=True) and
            unsubscribe(callback) methods.
        """
        self._condition = threading.Condition()
        self._channel = channel

        self._matured = False
        self._cancelled = False
        # Becomes None once the future is done (matured or cancelled).
        self._done_callbacks = []

    def _block(self, timeout):
        """Waits until the future matures.

        Args:
          timeout: A duration in seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future has been cancelled.
          future.TimeoutError: If the timeout elapses before the future
            matures.
        """
        deadline = None if timeout is None else time.time() + timeout
        with self._condition:
            while True:
                # State is re-examined after every wake-up, so a wake-up that
                # was not caused by a state change simply loops again.
                if self._cancelled:
                    raise future.CancelledError()
                if self._matured:
                    return
                if deadline is None:
                    self._condition.wait()
                    continue
                remaining = deadline - time.time()
                if remaining < 0:
                    raise future.TimeoutError()
                self._condition.wait(timeout=remaining)

    def _settle_locked(self):
        """Performs the bookkeeping shared by maturing and cancelling.

        Must be called with self._condition held, after the caller has set
        either self._matured or self._cancelled.

        Returns:
          A tuple of the done callbacks registered so far, for the caller to
          invoke once it has released the condition.
        """
        self._channel.unsubscribe(self._update)
        self._condition.notify_all()
        pending = tuple(self._done_callbacks)
        # Nothing may be registered after completion; add_done_callback checks
        # the state flags before touching this attribute.
        self._done_callbacks = None
        return pending

    def _invoke_done_callbacks(self, done_callbacks):
        """Calls each done callback with this future as its only argument.

        Every call goes through callable_util.call_logging_exceptions with
        _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE. Must be called without
        self._condition held so that callbacks are free to query the future.

        Args:
          done_callbacks: An iterable of callables accepting this future.
        """
        for done_callback in done_callbacks:
            callable_util.call_logging_exceptions(
                done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self
            )

    def _update(self, connectivity):
        """Connectivity callback registered with the channel by start().

        Matures the future the first time READY is reported. Notifications
        that arrive after the future is already done are ignored; without that
        guard a late or repeated READY would try to drain the callback list a
        second time, after it has been reset to None, and raise TypeError.

        Args:
          connectivity: The interfaces.ChannelConnectivity value reported by
            the channel.
        """
        with self._condition:
            if self._cancelled or self._matured:
                return
            if connectivity is not interfaces.ChannelConnectivity.READY:
                return
            self._matured = True
            done_callbacks = self._settle_locked()

        self._invoke_done_callbacks(done_callbacks)

    def cancel(self):
        """Cancels the future and drops its channel subscription.

        Returns:
          False if the future has already matured and so cannot be cancelled;
          True if the future is cancelled, whether by this call or by an
          earlier one.
        """
        with self._condition:
            if self._matured:
                return False
            if self._cancelled:
                # Already cancelled: the subscription is gone and the callback
                # list has been drained, so there is nothing left to do.
                # Repeating the work would call tuple() on None.
                return True
            self._cancelled = True
            done_callbacks = self._settle_locked()

        self._invoke_done_callbacks(done_callbacks)
        return True

    def cancelled(self):
        """Returns True if the future has been cancelled."""
        with self._condition:
            return self._cancelled

    def running(self):
        """Returns True while the future is neither cancelled nor matured."""
        with self._condition:
            return not self._cancelled and not self._matured

    def done(self):
        """Returns True once the future is cancelled or matured."""
        with self._condition:
            return self._cancelled or self._matured

    def result(self, timeout=None):
        """Blocks until the future matures, then returns None.

        Args:
          timeout: A duration in seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future has been cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def exception(self, timeout=None):
        """Blocks until the future matures, then returns None.

        Args:
          timeout: A duration in seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future has been cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def traceback(self, timeout=None):
        """Blocks until the future matures, then returns None.

        Args:
          timeout: A duration in seconds to wait, or None to wait indefinitely.

        Raises:
          future.CancelledError: If the future has been cancelled.
          future.TimeoutError: If the timeout elapses first.
        """
        self._block(timeout)
        return None

    def add_done_callback(self, fn):
        """Registers a callable to be called with this future when it is done.

        If the future is already done, fn is called immediately on the calling
        thread. In both cases fn is invoked through
        callable_util.call_logging_exceptions, so a callback is handled the
        same way whether it was registered before or after completion.

        Args:
          fn: A callable accepting this future as its only argument.
        """
        with self._condition:
            if not self._cancelled and not self._matured:
                self._done_callbacks.append(fn)
                return

        self._invoke_done_callbacks((fn,))

    def start(self):
        """Subscribes to the channel's connectivity, asking it to connect."""
        with self._condition:
            self._channel.subscribe(self._update, try_to_connect=True)

    def __del__(self):
        """Drops the channel subscription if the future never completed."""
        with self._condition:
            if not self._cancelled and not self._matured:
                self._channel.unsubscribe(self._update)


def channel_ready_future(channel):
    """Creates a future.Future tracking when an implementations.Channel is ready.

    Cancelling the returned future.Future does not tell the given
    implementations.Channel to abandon attempts it may have been making to
    connect; cancelling merely deactivates the returned future.Future's
    subscription to the given implementations.Channel's connectivity.

    Args:
      channel: An implementations.Channel.

    Returns:
      A future.Future that matures when the given Channel has connectivity
        interfaces.ChannelConnectivity.READY.
    """
    ready_future = _ChannelReadyFuture(channel)
    ready_future.start()
    return ready_future