1# Copyright 2020 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from concurrent.futures import ThreadPoolExecutor 16import logging 17import threading 18import time 19from typing import Iterable 20 21from google.protobuf.json_format import MessageToJson 22import grpc 23 24import phone_pb2 25import phone_pb2_grpc 26 27 28def create_state_response( 29 call_state: phone_pb2.CallState.State, 30) -> phone_pb2.StreamCallResponse: 31 response = phone_pb2.StreamCallResponse() 32 response.call_state.state = call_state 33 return response 34 35 36class Phone(phone_pb2_grpc.PhoneServicer): 37 def __init__(self): 38 self._id_counter = 0 39 self._lock = threading.RLock() 40 41 def _create_call_session(self) -> phone_pb2.CallInfo: 42 call_info = phone_pb2.CallInfo() 43 with self._lock: 44 call_info.session_id = str(self._id_counter) 45 self._id_counter += 1 46 call_info.media = "https://link.to.audio.resources" 47 logging.info("Created a call session [%s]", MessageToJson(call_info)) 48 return call_info 49 50 def _clean_call_session(self, call_info: phone_pb2.CallInfo) -> None: 51 logging.info("Call session cleaned [%s]", MessageToJson(call_info)) 52 53 def StreamCall( 54 self, 55 request_iterator: Iterable[phone_pb2.StreamCallRequest], 56 context: grpc.ServicerContext, 57 ) -> Iterable[phone_pb2.StreamCallResponse]: 58 try: 59 request = next(request_iterator) 60 logging.info( 61 "Received a phone call request for number [%s]", 62 request.phone_number, 63 ) 64 except StopIteration: 65 raise RuntimeError("Failed to receive call request") 66 # Simulate the acceptance of call request 67 time.sleep(1) 68 yield create_state_response(phone_pb2.CallState.NEW) 69 # Simulate the start of the call session 70 time.sleep(1) 71 call_info = self._create_call_session() 72 context.add_callback(lambda: self._clean_call_session(call_info)) 73 response = phone_pb2.StreamCallResponse() 74 response.call_info.session_id = call_info.session_id 75 response.call_info.media = call_info.media 76 yield response 77 yield create_state_response(phone_pb2.CallState.ACTIVE) 78 # Simulate the end of the call 79 time.sleep(2) 80 yield create_state_response(phone_pb2.CallState.ENDED) 81 logging.info("Call finished [%s]", request.phone_number) 82 83 84def serve(address: str) -> None: 85 server = grpc.server(ThreadPoolExecutor()) 86 phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server) 87 server.add_insecure_port(address) 88 server.start() 89 logging.info("Server serving at %s", address) 90 server.wait_for_termination() 91 92 93if __name__ == "__main__": 94 logging.basicConfig(level=logging.INFO) 95 serve("[::]:50051") 96