### Cancellation

In the example, we implement a silly algorithm: we search for bytestrings whose
hashes are similar to a given search string. For example, say we're looking for
the string "doctor". Our algorithm may return `JrqhZVkTDoctYrUlXDbL6pfYQHU=` or
`RC9/7mlM3ldy4TdoctOc6WzYbO4=`. This is a brute-force algorithm, so the server
performing the search must be conscious of the resources it allows to each client,
and each client must be conscientious of the resources it demands of the server.

In particular, we ensure that client processes cancel the stream explicitly
before terminating, and we ensure that server processes cancel RPCs that have
gone on longer than a certain number of iterations.

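The text above does not spell out what makes a hash "similar" to the search string. One plausible reading, sketched here, treats a hash as its base64-encoded SHA-1 digest (the sample outputs above have that shape) and scores it by the smallest case-insensitive Hamming distance between the search string and any equally long window of the encoding. The function names are hypothetical and the metric is an assumption, not necessarily what the example implements.

```python
import base64
import hashlib


def encoded_hash(candidate: bytes) -> str:
    """Return the base64-encoded SHA-1 digest of candidate (assumed hash)."""
    return base64.b64encode(hashlib.sha1(candidate).digest()).decode("ascii")


def hamming_distance(a: str, b: str) -> int:
    """Count the positions at which two equal-length strings differ."""
    if len(a) != len(b):
        raise ValueError("strings must have equal length")
    return sum(x != y for x, y in zip(a, b))


def best_distance(encoded: str, target: str) -> int:
    """Smallest case-insensitive Hamming distance between target and any
    window of encoded that has the same length as target."""
    encoded, target = encoded.lower(), target.lower()
    windows = (
        encoded[i:i + len(target)]
        for i in range(len(encoded) - len(target) + 1)
    )
    return min(hamming_distance(window, target) for window in windows)
```

Under this assumed metric, `best_distance("JrqhZVkTDoctYrUlXDbL6pfYQHU=", "doctor")` returns 1, because the window `DoctYr` differs from "doctor" in a single position.
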
#### Cancellation on the Client Side

A client may cancel an RPC for several reasons. Perhaps the data it requested
has been made irrelevant. Perhaps you, as the client, want to be a good citizen
of the server and are conserving compute resources.

##### Cancelling a Server-Side Unary RPC from the Client

The default RPC methods on a stub simply return the result of an RPC.

```python
>>> stub = hash_name_pb2_grpc.HashFinderStub(channel)
>>> stub.Find(hash_name_pb2.HashNameRequest(desired_name=name))
<hash_name_pb2.HashNameResponse object at 0x7fe2eb8ce2d0>
```

But you may use the `future()` method to receive an instance of `grpc.Future`.
This interface allows you to wait on a response with a timeout, add a callback
to be executed when the RPC completes, or cancel the RPC before it has
completed.

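The three operations just listed can be tried without a running server. The sketch below uses the standard library's `concurrent.futures.Future` purely as a stand-in: it offers `result(timeout=...)`, `add_done_callback()`, and `cancel()` methods with the same names, but it is not `grpc.Future`, and gRPC defines its own exception types, so the `except` clauses would differ in a real client.

```python
from concurrent.futures import CancelledError, Future, TimeoutError

# Stand-in only: a real program would obtain a grpc.Future from
# stub.Find.future(...). A bare stdlib Future never completes on its own,
# which mimics an RPC that is still in flight.
future = Future()
events = []

# 1. Register a callback that runs once the future completes or is cancelled.
future.add_done_callback(
    lambda f: events.append("cancelled" if f.cancelled() else "completed")
)

# 2. Wait for the result, but give up after a short timeout.
try:
    future.result(timeout=0.05)
except TimeoutError:
    events.append("timed out")

# 3. Cancel the pending work; the callback registered above fires.
future.cancel()

# Asking a cancelled future for its result raises instead of returning.
try:
    future.result()
except CancelledError:
    events.append("result unavailable")

print(events)
```
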
In the example, we use the `grpc.Future` interface to cancel our in-progress RPC
when the user interrupts the process with Ctrl-C.

```python
import signal
import sys

stub = hash_name_pb2_grpc.HashFinderStub(channel)
# future() starts the RPC and returns immediately with a grpc.Future.
future = stub.Find.future(hash_name_pb2.HashNameRequest(desired_name=name))

def cancel_request(unused_signum, unused_frame):
    # Cancel the in-progress RPC, then exit cleanly.
    future.cancel()
    sys.exit(0)

signal.signal(signal.SIGINT, cancel_request)  # Invoke on Ctrl-C.

result = future.result()
print(result)
```

We also call `sys.exit(0)` to terminate the process. If we do not do this, then
`future.result()` will raise an `RpcError`. Alternatively, you may catch this
exception.

##### Cancelling a Server-Side Streaming RPC from the Client

Cancelling a server-side streaming RPC is even simpler from the perspective of
the gRPC API. The object returned by the default stub method is already an
instance of `grpc.Future`, so the methods outlined above still apply. It is also
an iterator, so we may iterate over it to receive the results of our RPC.

```python
stub = hash_name_pb2_grpc.HashFinderStub(channel)
# The returned object is both a grpc.Future and an iterator of responses.
result_generator = stub.FindRange(hash_name_pb2.HashNameRequest(desired_name=name))

def cancel_request(unused_signum, unused_frame):
    # Cancel the stream, then exit cleanly.
    result_generator.cancel()
    sys.exit(0)

signal.signal(signal.SIGINT, cancel_request)  # Invoke on Ctrl-C.

# Each iteration blocks until the server sends the next response.
for result in result_generator:
    print(result)
```

We also call `sys.exit(0)` here to terminate the process. Alternatively, you may
catch the `RpcError` raised by the for loop upon cancellation.

#### Cancellation on the Server Side

A server is responsible for cancellation in two ways. First, it must respond in
some way when a client initiates a cancellation; otherwise, long-running
computations could continue indefinitely.

Second, it may decide to cancel the RPC for its own reasons. In our example, the
server can be configured to cancel an RPC after a certain number of hashes have
been computed in order to conserve compute resources.

##### Responding to Cancellations from a Servicer Thread

It's important to remember that a gRPC Python server is backed by a thread pool
with a fixed size. When an RPC is cancelled, the library does *not* terminate
your servicer thread. It is your responsibility as the application author to
ensure that your servicer thread terminates soon after the RPC has been
cancelled.

In this example, we use the `ServicerContext.add_callback` method to set a
`threading.Event` object when the RPC is terminated. We pass this `Event` object
down through our hashing algorithm and check that the RPC is still ongoing
before each iteration.

```python
stop_event = threading.Event()

def on_rpc_done():
    # Regain servicer thread.
    stop_event.set()

# Invoke on_rpc_done when the RPC terminates, including on cancellation.
context.add_callback(on_rpc_done)
secret = _find_secret(stop_event)
```

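The snippet above does not show how the search routine consumes the `Event`. A minimal sketch of that cooperative check follows, using only the standard library. The `find_secret` function, its candidate scheme (the decimal strings 0, 1, 2, ...), and its substring test are hypothetical stand-ins for the example's actual search, chosen so the block runs on its own.

```python
import base64
import hashlib
import itertools
import threading
from typing import Optional


def find_secret(target: str, stop_event: threading.Event) -> Optional[bytes]:
    """Brute-force search that gives up as soon as stop_event is set.

    Hypothetical stand-in for the example's search routine: it tries
    candidates 0, 1, 2, ... and returns the first one whose base64-encoded
    SHA-1 digest contains target (case-insensitive), or None if stopped.
    """
    for counter in itertools.count():
        # Check before every iteration so that a cancelled RPC releases
        # its servicer thread after at most one more hash.
        if stop_event.is_set():
            return None
        candidate = str(counter).encode("ascii")
        digest = hashlib.sha1(candidate).digest()
        encoded = base64.b64encode(digest).decode("ascii")
        if target.lower() in encoded.lower():
            return candidate


# Simulate a cancellation that arrives before the search starts.
stop_event = threading.Event()
stop_event.set()
print(find_secret("doctor", stop_event))  # None
```

Checking the event once per hash keeps the worst-case delay between cancellation and thread release to a single iteration, at the cost of one cheap flag read per candidate.
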
##### Initiating a Cancellation on the Server Side

Initiating a cancellation from the server side is simpler. Just call
`ServicerContext.cancel()`.

In our example, we ensure that no single client is monopolizing the server by
cancelling after a configurable number of hashes have been checked.

```python
try:
    # Stream each candidate to the client as soon as it is found.
    for candidate in secret_generator:
        yield candidate
except ResourceLimitExceededError:
    print("Cancelling RPC due to exhausted resources.")
    context.cancel()
```

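The loop above relies on `secret_generator` raising `ResourceLimitExceededError` once its budget is spent. One way such a generator might be written is sketched here. The `secret_candidates` function, its `maximum_hashes` parameter, and the matching rule are assumptions made for illustration, and the exception class is redefined locally so the block is self-contained.

```python
import base64
import hashlib
import itertools
from typing import Iterator


class ResourceLimitExceededError(Exception):
    """Raised when a search has used up its hash budget."""


def secret_candidates(target: str, maximum_hashes: int) -> Iterator[bytes]:
    """Yield matching candidates until maximum_hashes have been computed.

    Hypothetical sketch: a candidate matches when its base64-encoded SHA-1
    digest contains target (case-insensitive).
    """
    for hashes_computed in itertools.count():
        if hashes_computed >= maximum_hashes:
            raise ResourceLimitExceededError(
                f"gave up after {maximum_hashes} hashes"
            )
        candidate = str(hashes_computed).encode("ascii")
        encoded = base64.b64encode(hashlib.sha1(candidate).digest())
        if target.lower().encode("ascii") in encoded.lower():
            yield candidate


# Drain the generator the way the servicer loop would.
found = []
try:
    for candidate in secret_candidates("a", maximum_hashes=200):
        found.append(candidate)
except ResourceLimitExceededError as error:
    print(f"Stopped with {len(found)} candidates: {error}")
```
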
In this type of situation, you may also consider returning a more specific error
using the [`grpcio-status`](https://pypi.org/project/grpcio-status/) package.