xref: /aosp_15_r20/external/grpc-grpc/test/core/bad_connection/close_fd_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 // close_fd_test tests the behavior of grpc core when the transport gets
18 // disconnected.
19 // The test creates an http2 transport over a socket pair and closes the
20 // client or server file descriptor to simulate connection breakage while
21 // an RPC call is in progress.
22 //
23 //
24 #include <stdint.h>
25 
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_format.h"
28 
29 #include <grpc/impl/channel_arg_names.h>
30 #include <grpc/impl/propagation_bits.h>
31 #include <grpc/slice.h>
32 #include <grpc/status.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/channelz.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/endpoint.h"
39 #include "src/core/lib/iomgr/error.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/iomgr/port.h"
42 #include "src/core/lib/surface/channel_create.h"
43 #include "src/core/lib/surface/channel_stack_type.h"
44 #include "src/core/lib/transport/transport.h"
45 
46 // This test won't work except with posix sockets enabled
47 #ifdef GRPC_POSIX_SOCKET_TCP
48 
49 #include <string.h>
50 #include <unistd.h>
51 
52 #include <grpc/byte_buffer.h>
53 #include <grpc/grpc.h>
54 #include <grpc/support/alloc.h>
55 #include <grpc/support/log.h>
56 
57 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
58 #include "src/core/lib/gprpp/crash.h"
59 #include "src/core/lib/iomgr/endpoint_pair.h"
60 #include "src/core/lib/surface/channel.h"
61 #include "src/core/lib/surface/completion_queue.h"
62 #include "src/core/lib/surface/server.h"
63 #include "test/core/util/test_config.h"
64 
tag(intptr_t t)65 static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
66 
67 typedef struct test_ctx test_ctx;
68 
69 struct test_ctx {
70   // completion queue for call notifications on the server
71   grpc_completion_queue* cq;
72   // completion queue registered to server for shutdown events
73   grpc_completion_queue* shutdown_cq;
74   // client's completion queue
75   grpc_completion_queue* client_cq;
76   // completion queue bound to call on the server
77   grpc_completion_queue* bound_cq;
78   // Server responds to client calls
79   grpc_server* server;
80   // Client calls are sent over the channel
81   grpc_channel* client;
82   // encapsulates client, server endpoints
83   grpc_endpoint_pair* ep;
84 };
85 
86 static test_ctx g_ctx;
87 
88 // chttp2 transport that is immediately available (used for testing
89 // connected_channel without a client_channel
90 
server_setup_transport(grpc_core::Transport * transport)91 static void server_setup_transport(grpc_core::Transport* transport) {
92   grpc_core::ExecCtx exec_ctx;
93   grpc_endpoint_add_to_pollset(g_ctx.ep->server, grpc_cq_pollset(g_ctx.cq));
94   grpc_core::Server* core_server = grpc_core::Server::FromC(g_ctx.server);
95   GPR_ASSERT(GRPC_LOG_IF_ERROR(
96       "SetupTransport",
97       core_server->SetupTransport(transport, nullptr,
98                                   core_server->channel_args(), nullptr)));
99 }
100 
client_setup_transport(grpc_core::Transport * transport)101 static void client_setup_transport(grpc_core::Transport* transport) {
102   grpc_core::ExecCtx exec_ctx;
103   grpc_endpoint_add_to_pollset(g_ctx.ep->client,
104                                grpc_cq_pollset(g_ctx.client_cq));
105   grpc_arg authority_arg = grpc_channel_arg_string_create(
106       const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
107       const_cast<char*>("test-authority"));
108   grpc_channel_args* args =
109       grpc_channel_args_copy_and_add(nullptr, &authority_arg, 1);
110   // TODO (pjaikumar): use GRPC_CLIENT_CHANNEL instead of
111   // GRPC_CLIENT_DIRECT_CHANNEL
112   g_ctx.client = (*grpc_core::ChannelCreate(
113                       "socketpair-target", grpc_core::ChannelArgs::FromC(args),
114                       GRPC_CLIENT_DIRECT_CHANNEL, transport))
115                      .release()
116                      ->c_ptr();
117   grpc_channel_args_destroy(args);
118 }
119 
init_client()120 static void init_client() {
121   grpc_core::ExecCtx exec_ctx;
122   grpc_core::Transport* transport;
123   transport = grpc_create_chttp2_transport(grpc_core::ChannelArgs(),
124                                            g_ctx.ep->client, true);
125   client_setup_transport(transport);
126   GPR_ASSERT(g_ctx.client);
127   grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
128 }
129 
init_server()130 static void init_server() {
131   grpc_core::ExecCtx exec_ctx;
132   grpc_core::Transport* transport;
133   GPR_ASSERT(!g_ctx.server);
134   g_ctx.server = grpc_server_create(nullptr, nullptr);
135   grpc_server_register_completion_queue(g_ctx.server, g_ctx.cq, nullptr);
136   grpc_server_start(g_ctx.server);
137   transport = grpc_create_chttp2_transport(grpc_core::ChannelArgs(),
138                                            g_ctx.ep->server, false);
139   server_setup_transport(transport);
140   grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
141 }
142 
test_init()143 static void test_init() {
144   grpc_endpoint_pair* sfd =
145       static_cast<grpc_endpoint_pair*>(gpr_malloc(sizeof(grpc_endpoint_pair)));
146   memset(&g_ctx, 0, sizeof(g_ctx));
147   g_ctx.ep = sfd;
148   g_ctx.cq = grpc_completion_queue_create_for_next(nullptr);
149   g_ctx.shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
150   g_ctx.bound_cq = grpc_completion_queue_create_for_next(nullptr);
151   g_ctx.client_cq = grpc_completion_queue_create_for_next(nullptr);
152 
153   // Create endpoints
154   *sfd = grpc_iomgr_create_endpoint_pair("fixture", nullptr);
155   // Create client, server and setup transport over endpoint pair
156   init_server();
157   init_client();
158 }
159 
drain_cq(grpc_completion_queue * cq)160 static void drain_cq(grpc_completion_queue* cq) {
161   grpc_event event;
162   do {
163     event = grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(1),
164                                        nullptr);
165   } while (event.type != GRPC_QUEUE_SHUTDOWN);
166 }
167 
drain_and_destroy_cq(grpc_completion_queue * cq)168 static void drain_and_destroy_cq(grpc_completion_queue* cq) {
169   grpc_completion_queue_shutdown(cq);
170   drain_cq(cq);
171   grpc_completion_queue_destroy(cq);
172 }
173 
shutdown_server()174 static void shutdown_server() {
175   if (!g_ctx.server) return;
176   grpc_server_shutdown_and_notify(g_ctx.server, g_ctx.shutdown_cq, tag(1000));
177   GPR_ASSERT(grpc_completion_queue_pluck(g_ctx.shutdown_cq, tag(1000),
178                                          grpc_timeout_seconds_to_deadline(1),
179                                          nullptr)
180                  .type == GRPC_OP_COMPLETE);
181   grpc_server_destroy(g_ctx.server);
182   g_ctx.server = nullptr;
183 }
184 
shutdown_client()185 static void shutdown_client() {
186   if (!g_ctx.client) return;
187   grpc_channel_destroy(g_ctx.client);
188   g_ctx.client = nullptr;
189 }
190 
end_test()191 static void end_test() {
192   shutdown_server();
193   shutdown_client();
194 
195   drain_and_destroy_cq(g_ctx.cq);
196   drain_and_destroy_cq(g_ctx.client_cq);
197   drain_and_destroy_cq(g_ctx.bound_cq);
198   grpc_completion_queue_destroy(g_ctx.shutdown_cq);
199   gpr_free(g_ctx.ep);
200 }
201 
202 typedef enum fd_type { CLIENT_FD, SERVER_FD } fd_type;
203 
fd_type_str(fd_type fdtype)204 static const char* fd_type_str(fd_type fdtype) {
205   if (fdtype == CLIENT_FD) {
206     return "client";
207   } else if (fdtype == SERVER_FD) {
208     return "server";
209   } else {
210     grpc_core::Crash(absl::StrFormat("Unexpected fd_type %d", fdtype));
211   }
212 }
213 
_test_close_before_server_recv(fd_type fdtype)214 static void _test_close_before_server_recv(fd_type fdtype) {
215   grpc_core::ExecCtx exec_ctx;
216   grpc_call* call;
217   grpc_call* server_call;
218   grpc_event event;
219   grpc_slice request_payload_slice =
220       grpc_slice_from_copied_string("hello world");
221   grpc_slice response_payload_slice =
222       grpc_slice_from_copied_string("hello you");
223   grpc_byte_buffer* request_payload =
224       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
225   grpc_byte_buffer* response_payload =
226       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
227   gpr_log(GPR_INFO, "Running test: test_close_%s_before_server_recv",
228           fd_type_str(fdtype));
229   test_init();
230 
231   grpc_op ops[6];
232   grpc_op* op;
233   grpc_metadata_array initial_metadata_recv;
234   grpc_metadata_array trailing_metadata_recv;
235   grpc_metadata_array request_metadata_recv;
236   grpc_byte_buffer* request_payload_recv = nullptr;
237   grpc_byte_buffer* response_payload_recv = nullptr;
238   grpc_call_details call_details;
239   grpc_status_code status = GRPC_STATUS__DO_NOT_USE;
240   grpc_call_error error;
241   grpc_slice details;
242 
243   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
244   call = grpc_channel_create_call(
245       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
246       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
247   GPR_ASSERT(call);
248 
249   grpc_metadata_array_init(&initial_metadata_recv);
250   grpc_metadata_array_init(&trailing_metadata_recv);
251   grpc_metadata_array_init(&request_metadata_recv);
252   grpc_call_details_init(&call_details);
253 
254   memset(ops, 0, sizeof(ops));
255   op = ops;
256   op->op = GRPC_OP_SEND_INITIAL_METADATA;
257   op->data.send_initial_metadata.count = 0;
258   op->flags = 0;
259   op->reserved = nullptr;
260   op++;
261   op->op = GRPC_OP_SEND_MESSAGE;
262   op->data.send_message.send_message = request_payload;
263   op->flags = 0;
264   op->reserved = nullptr;
265   op++;
266   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
267   op->flags = 0;
268   op->reserved = nullptr;
269   op++;
270   op->op = GRPC_OP_RECV_INITIAL_METADATA;
271   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
272   op->flags = 0;
273   op->reserved = nullptr;
274   op++;
275   op->op = GRPC_OP_RECV_MESSAGE;
276   op->data.recv_message.recv_message = &response_payload_recv;
277   op->flags = 0;
278   op->reserved = nullptr;
279   op++;
280   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
281   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
282   op->data.recv_status_on_client.status = &status;
283   op->data.recv_status_on_client.status_details = &details;
284   op->flags = 0;
285   op->reserved = nullptr;
286   op++;
287   error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
288                                 tag(1), nullptr);
289   GPR_ASSERT(GRPC_CALL_OK == error);
290 
291   error = grpc_server_request_call(g_ctx.server, &server_call, &call_details,
292                                    &request_metadata_recv, g_ctx.bound_cq,
293                                    g_ctx.cq, tag(101));
294   GPR_ASSERT(GRPC_CALL_OK == error);
295   event = grpc_completion_queue_next(
296       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
297   GPR_ASSERT(event.success == 1);
298   GPR_ASSERT(event.tag == tag(101));
299   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
300 
301   memset(ops, 0, sizeof(ops));
302   op = ops;
303   op->op = GRPC_OP_SEND_INITIAL_METADATA;
304   op->data.send_initial_metadata.count = 0;
305   op->flags = 0;
306   op->reserved = nullptr;
307   op++;
308   op->op = GRPC_OP_RECV_MESSAGE;
309   op->data.recv_message.recv_message = &request_payload_recv;
310   op->flags = 0;
311   op->reserved = nullptr;
312   op++;
313 
314   grpc_endpoint_pair* sfd = g_ctx.ep;
315   int fd;
316   if (fdtype == SERVER_FD) {
317     fd = sfd->server->vtable->get_fd(sfd->server);
318   } else {
319     GPR_ASSERT(fdtype == CLIENT_FD);
320     fd = sfd->client->vtable->get_fd(sfd->client);
321   }
322   // Connection is closed before the server receives the client's message.
323   close(fd);
324 
325   error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
326                                 tag(102), nullptr);
327   GPR_ASSERT(GRPC_CALL_OK == error);
328 
329   event = grpc_completion_queue_next(
330       g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
331 
332   // Batch operation completes on the server side.
333   // event.success will be true if the op completes successfully.
334   // event.success will be false if the op completes with an error. This can
335   // happen due to a race with closing the fd resulting in pending writes
336   // failing due to stream closure.
337   //
338   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
339   GPR_ASSERT(event.tag == tag(102));
340 
341   event = grpc_completion_queue_next(
342       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
343   // When the client fd is closed, the server gets EPIPE.
344   // When server fd is closed, server gets EBADF.
345   // In both cases server sends GRPC_STATUS_UNAVALABLE to the client. However,
346   // the client may not receive this grpc_status as it's socket is being closed.
347   // If the client didn't get grpc_status from the server it will time out
348   // waiting on the completion queue. So there 2 2 possibilities:
349   // 1. client times out waiting for server's response
350   // 2. client receives GRPC_STATUS_UNAVAILABLE from server
351   //
352   if (event.type == GRPC_QUEUE_TIMEOUT) {
353     GPR_ASSERT(event.success == 0);
354     // status is not initialized
355     GPR_ASSERT(status == GRPC_STATUS__DO_NOT_USE);
356   } else {
357     GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
358     GPR_ASSERT(event.success == 1);
359     GPR_ASSERT(event.tag == tag(1));
360     GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
361   }
362 
363   grpc_metadata_array_destroy(&initial_metadata_recv);
364   grpc_metadata_array_destroy(&trailing_metadata_recv);
365   grpc_metadata_array_destroy(&request_metadata_recv);
366   grpc_call_details_destroy(&call_details);
367 
368   grpc_call_unref(call);
369   grpc_call_unref(server_call);
370 
371   grpc_byte_buffer_destroy(request_payload);
372   grpc_byte_buffer_destroy(response_payload);
373   grpc_byte_buffer_destroy(request_payload_recv);
374   grpc_byte_buffer_destroy(response_payload_recv);
375 
376   end_test();
377 }
378 
test_close_before_server_recv()379 static void test_close_before_server_recv() {
380   // Close client side of the connection before server receives message from
381   // client
382   _test_close_before_server_recv(CLIENT_FD);
383   // Close server side of the connection before server receives message from
384   // client
385   _test_close_before_server_recv(SERVER_FD);
386 }
387 
_test_close_before_server_send(fd_type fdtype)388 static void _test_close_before_server_send(fd_type fdtype) {
389   grpc_core::ExecCtx exec_ctx;
390   grpc_call* call;
391   grpc_call* server_call;
392   grpc_event event;
393   grpc_slice request_payload_slice =
394       grpc_slice_from_copied_string("hello world");
395   grpc_slice response_payload_slice =
396       grpc_slice_from_copied_string("hello you");
397   grpc_byte_buffer* request_payload =
398       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
399   grpc_byte_buffer* response_payload =
400       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
401   gpr_log(GPR_INFO, "Running test: test_close_%s_before_server_send",
402           fd_type_str(fdtype));
403   test_init();
404 
405   grpc_op ops[6];
406   grpc_op* op;
407   grpc_metadata_array initial_metadata_recv;
408   grpc_metadata_array trailing_metadata_recv;
409   grpc_metadata_array request_metadata_recv;
410   grpc_byte_buffer* request_payload_recv = nullptr;
411   grpc_byte_buffer* response_payload_recv = nullptr;
412   grpc_call_details call_details;
413   grpc_status_code status = GRPC_STATUS__DO_NOT_USE;
414   grpc_call_error error;
415   grpc_slice details;
416   int was_cancelled = 2;
417 
418   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
419   call = grpc_channel_create_call(
420       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
421       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
422   GPR_ASSERT(call);
423 
424   grpc_metadata_array_init(&initial_metadata_recv);
425   grpc_metadata_array_init(&trailing_metadata_recv);
426   grpc_metadata_array_init(&request_metadata_recv);
427   grpc_call_details_init(&call_details);
428 
429   memset(ops, 0, sizeof(ops));
430   op = ops;
431   op->op = GRPC_OP_SEND_INITIAL_METADATA;
432   op->data.send_initial_metadata.count = 0;
433   op->flags = 0;
434   op->reserved = nullptr;
435   op++;
436   op->op = GRPC_OP_SEND_MESSAGE;
437   op->data.send_message.send_message = request_payload;
438   op->flags = 0;
439   op->reserved = nullptr;
440   op++;
441   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
442   op->flags = 0;
443   op->reserved = nullptr;
444   op++;
445   op->op = GRPC_OP_RECV_INITIAL_METADATA;
446   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
447   op->flags = 0;
448   op->reserved = nullptr;
449   op++;
450   op->op = GRPC_OP_RECV_MESSAGE;
451   op->data.recv_message.recv_message = &response_payload_recv;
452   op->flags = 0;
453   op->reserved = nullptr;
454   op++;
455   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
456   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
457   op->data.recv_status_on_client.status = &status;
458   op->data.recv_status_on_client.status_details = &details;
459   op->flags = 0;
460   op->reserved = nullptr;
461   op++;
462   error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
463                                 tag(1), nullptr);
464   GPR_ASSERT(GRPC_CALL_OK == error);
465 
466   error = grpc_server_request_call(g_ctx.server, &server_call, &call_details,
467                                    &request_metadata_recv, g_ctx.bound_cq,
468                                    g_ctx.cq, tag(101));
469   GPR_ASSERT(GRPC_CALL_OK == error);
470   event = grpc_completion_queue_next(
471       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
472   GPR_ASSERT(event.success == 1);
473   GPR_ASSERT(event.tag == tag(101));
474   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
475 
476   memset(ops, 0, sizeof(ops));
477   op = ops;
478   op->op = GRPC_OP_SEND_INITIAL_METADATA;
479   op->data.send_initial_metadata.count = 0;
480   op->flags = 0;
481   op->reserved = nullptr;
482   op++;
483   op->op = GRPC_OP_RECV_MESSAGE;
484   op->data.recv_message.recv_message = &request_payload_recv;
485   op->flags = 0;
486   op->reserved = nullptr;
487   op++;
488   error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
489                                 tag(102), nullptr);
490   GPR_ASSERT(GRPC_CALL_OK == error);
491 
492   event = grpc_completion_queue_next(
493       g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
494   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
495   GPR_ASSERT(event.success == 1);
496   GPR_ASSERT(event.tag == tag(102));
497 
498   memset(ops, 0, sizeof(ops));
499   op = ops;
500   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
501   op->data.recv_close_on_server.cancelled = &was_cancelled;
502   op->flags = 0;
503   op->reserved = nullptr;
504   op++;
505   op->op = GRPC_OP_SEND_MESSAGE;
506   op->data.send_message.send_message = response_payload;
507   op->flags = 0;
508   op->reserved = nullptr;
509   op++;
510   op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
511   op->data.send_status_from_server.trailing_metadata_count = 0;
512   op->data.send_status_from_server.status = GRPC_STATUS_OK;
513   grpc_slice status_details = grpc_slice_from_static_string("xyz");
514   op->data.send_status_from_server.status_details = &status_details;
515   op->flags = 0;
516   op->reserved = nullptr;
517   op++;
518 
519   grpc_endpoint_pair* sfd = g_ctx.ep;
520   int fd;
521   if (fdtype == SERVER_FD) {
522     fd = sfd->server->vtable->get_fd(sfd->server);
523   } else {
524     GPR_ASSERT(fdtype == CLIENT_FD);
525     fd = sfd->client->vtable->get_fd(sfd->client);
526   }
527 
528   // Connection is closed before the server sends message and status to the
529   // client.
530   close(fd);
531   error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
532                                 tag(103), nullptr);
533   GPR_ASSERT(GRPC_CALL_OK == error);
534 
535   // Batch operation succeeds on the server side
536   event = grpc_completion_queue_next(
537       g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
538   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
539   GPR_ASSERT(event.success == 1);
540   GPR_ASSERT(event.tag == tag(103));
541 
542   event = grpc_completion_queue_next(
543       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
544   // In both cases server sends GRPC_STATUS_UNAVALABLE to the client. However,
545   // the client may not receive this grpc_status as it's socket is being closed.
546   // If the client didn't get grpc_status from the server it will time out
547   // waiting on the completion queue
548   //
549   if (event.type == GRPC_OP_COMPLETE) {
550     GPR_ASSERT(event.success == 1);
551     GPR_ASSERT(event.tag == tag(1));
552     GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
553   } else {
554     GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
555     GPR_ASSERT(event.success == 0);
556     // status is not initialized
557     GPR_ASSERT(status == GRPC_STATUS__DO_NOT_USE);
558   }
559   GPR_ASSERT(was_cancelled == 0);
560 
561   grpc_metadata_array_destroy(&initial_metadata_recv);
562   grpc_metadata_array_destroy(&trailing_metadata_recv);
563   grpc_metadata_array_destroy(&request_metadata_recv);
564   grpc_call_details_destroy(&call_details);
565 
566   grpc_call_unref(call);
567   grpc_call_unref(server_call);
568 
569   grpc_byte_buffer_destroy(request_payload);
570   grpc_byte_buffer_destroy(response_payload);
571   grpc_byte_buffer_destroy(request_payload_recv);
572   grpc_byte_buffer_destroy(response_payload_recv);
573 
574   end_test();
575 }
576 
test_close_before_server_send()577 static void test_close_before_server_send() {
578   // Close client side of the connection before server sends message to client
579   //
580   _test_close_before_server_send(CLIENT_FD);
581   // Close server side of the connection before server sends message to client
582   //
583   _test_close_before_server_send(SERVER_FD);
584 }
585 
_test_close_before_client_send(fd_type fdtype)586 static void _test_close_before_client_send(fd_type fdtype) {
587   grpc_core::ExecCtx exec_ctx;
588   grpc_call* call;
589   grpc_event event;
590   grpc_slice request_payload_slice =
591       grpc_slice_from_copied_string("hello world");
592   grpc_slice response_payload_slice =
593       grpc_slice_from_copied_string("hello you");
594   grpc_byte_buffer* request_payload =
595       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
596   grpc_byte_buffer* response_payload =
597       grpc_raw_byte_buffer_create(&response_payload_slice, 1);
598   gpr_log(GPR_INFO, "Running test: test_close_%s_before_client_send",
599           fd_type_str(fdtype));
600   test_init();
601 
602   grpc_op ops[6];
603   grpc_op* op;
604   grpc_metadata_array initial_metadata_recv;
605   grpc_metadata_array trailing_metadata_recv;
606   grpc_metadata_array request_metadata_recv;
607   grpc_byte_buffer* request_payload_recv = nullptr;
608   grpc_byte_buffer* response_payload_recv = nullptr;
609   grpc_call_details call_details;
610   grpc_status_code status;
611   grpc_call_error error;
612   grpc_slice details;
613 
614   gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
615   call = grpc_channel_create_call(
616       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
617       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
618   GPR_ASSERT(call);
619 
620   grpc_metadata_array_init(&initial_metadata_recv);
621   grpc_metadata_array_init(&trailing_metadata_recv);
622   grpc_metadata_array_init(&request_metadata_recv);
623   grpc_call_details_init(&call_details);
624 
625   memset(ops, 0, sizeof(ops));
626   op = ops;
627   op->op = GRPC_OP_SEND_INITIAL_METADATA;
628   op->data.send_initial_metadata.count = 0;
629   op->flags = 0;
630   op->reserved = nullptr;
631   op++;
632   op->op = GRPC_OP_SEND_MESSAGE;
633   op->data.send_message.send_message = request_payload;
634   op->flags = 0;
635   op->reserved = nullptr;
636   op++;
637   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
638   op->flags = 0;
639   op->reserved = nullptr;
640   op++;
641   op->op = GRPC_OP_RECV_INITIAL_METADATA;
642   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
643   op->flags = 0;
644   op->reserved = nullptr;
645   op++;
646   op->op = GRPC_OP_RECV_MESSAGE;
647   op->data.recv_message.recv_message = &response_payload_recv;
648   op->flags = 0;
649   op->reserved = nullptr;
650   op++;
651   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
652   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
653   op->data.recv_status_on_client.status = &status;
654   op->data.recv_status_on_client.status_details = &details;
655   op->flags = 0;
656   op->reserved = nullptr;
657   op++;
658 
659   grpc_endpoint_pair* sfd = g_ctx.ep;
660   int fd;
661   if (fdtype == SERVER_FD) {
662     fd = sfd->server->vtable->get_fd(sfd->server);
663   } else {
664     GPR_ASSERT(fdtype == CLIENT_FD);
665     fd = sfd->client->vtable->get_fd(sfd->client);
666   }
667   // Connection is closed before the client sends a batch to the server
668   close(fd);
669 
670   error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
671                                 tag(1), nullptr);
672   GPR_ASSERT(GRPC_CALL_OK == error);
673 
674   // Status unavailable is returned to the client when client or server fd is
675   // closed
676   event = grpc_completion_queue_next(
677       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
678   GPR_ASSERT(event.success == 1);
679   GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
680   GPR_ASSERT(event.tag == tag(1));
681   GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
682 
683   // No event is received on the server
684   event = grpc_completion_queue_next(
685       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
686   GPR_ASSERT(event.success == 0);
687   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
688 
689   grpc_slice_unref(details);
690   grpc_metadata_array_destroy(&initial_metadata_recv);
691   grpc_metadata_array_destroy(&trailing_metadata_recv);
692   grpc_metadata_array_destroy(&request_metadata_recv);
693   grpc_call_details_destroy(&call_details);
694 
695   grpc_call_unref(call);
696 
697   grpc_byte_buffer_destroy(request_payload);
698   grpc_byte_buffer_destroy(response_payload);
699   grpc_byte_buffer_destroy(request_payload_recv);
700   grpc_byte_buffer_destroy(response_payload_recv);
701 
702   end_test();
703 }
test_close_before_client_send()704 static void test_close_before_client_send() {
705   // Close client side of the connection before client sends message to server
706   //
707   _test_close_before_client_send(CLIENT_FD);
708   // Close server side of the connection before client sends message to server
709   //
710   _test_close_before_client_send(SERVER_FD);
711 }
712 
_test_close_before_call_create(fd_type fdtype)713 static void _test_close_before_call_create(fd_type fdtype) {
714   grpc_core::ExecCtx exec_ctx;
715   grpc_call* call;
716   grpc_event event;
717   test_init();
718 
719   gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(100);
720 
721   grpc_endpoint_pair* sfd = g_ctx.ep;
722   int fd;
723   if (fdtype == SERVER_FD) {
724     fd = sfd->server->vtable->get_fd(sfd->server);
725   } else {
726     GPR_ASSERT(fdtype == CLIENT_FD);
727     fd = sfd->client->vtable->get_fd(sfd->client);
728   }
729   // Connection is closed before the client creates a call
730   close(fd);
731 
732   call = grpc_channel_create_call(
733       g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
734       grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
735   GPR_ASSERT(call);
736 
737   // Client and server time out waiting on their completion queues and nothing
738   // is sent or received
739   event = grpc_completion_queue_next(
740       g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
741   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
742   GPR_ASSERT(event.success == 0);
743 
744   event = grpc_completion_queue_next(
745       g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
746   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
747   GPR_ASSERT(event.success == 0);
748 
749   grpc_call_unref(call);
750   end_test();
751 }
752 
test_close_before_call_create()753 static void test_close_before_call_create() {
754   // Close client side of the connection before client creates a call
755   _test_close_before_call_create(CLIENT_FD);
756   // Close server side of the connection before client creates a call
757   _test_close_before_call_create(SERVER_FD);
758 }
759 
main(int argc,char ** argv)760 int main(int argc, char** argv) {
761   grpc::testing::TestEnvironment env(&argc, argv);
762   // Init grpc
763   grpc_init();
764   int iterations = 10;
765 
766   for (int i = 0; i < iterations; ++i) {
767     test_close_before_call_create();
768     test_close_before_client_send();
769     test_close_before_server_recv();
770     test_close_before_server_send();
771   }
772 
773   grpc_shutdown();
774 
775   return 0;
776 }
777 
778 #else  // GRPC_POSIX_SOCKET_TCP
779 
main(int argc,char ** argv)780 int main(int argc, char** argv) { return 1; }
781 
782 #endif  // GRPC_POSIX_SOCKET_TCP
783