1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 // close_fd_test tests the behavior of grpc core when the transport gets
18 // disconnected.
19 // The test creates an http2 transport over a socket pair and closes the
20 // client or server file descriptor to simulate connection breakage while
21 // an RPC call is in progress.
22 //
23 //
24 #include <stdint.h>
25
26 #include "absl/status/statusor.h"
27 #include "absl/strings/str_format.h"
28
29 #include <grpc/impl/channel_arg_names.h>
30 #include <grpc/impl/propagation_bits.h>
31 #include <grpc/slice.h>
32 #include <grpc/status.h>
33 #include <grpc/support/time.h>
34
35 #include "src/core/lib/channel/channel_args.h"
36 #include "src/core/lib/channel/channelz.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/endpoint.h"
39 #include "src/core/lib/iomgr/error.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/iomgr/port.h"
42 #include "src/core/lib/surface/channel_create.h"
43 #include "src/core/lib/surface/channel_stack_type.h"
44 #include "src/core/lib/transport/transport.h"
45
46 // This test won't work except with posix sockets enabled
47 #ifdef GRPC_POSIX_SOCKET_TCP
48
49 #include <string.h>
50 #include <unistd.h>
51
52 #include <grpc/byte_buffer.h>
53 #include <grpc/grpc.h>
54 #include <grpc/support/alloc.h>
55 #include <grpc/support/log.h>
56
57 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
58 #include "src/core/lib/gprpp/crash.h"
59 #include "src/core/lib/iomgr/endpoint_pair.h"
60 #include "src/core/lib/surface/channel.h"
61 #include "src/core/lib/surface/completion_queue.h"
62 #include "src/core/lib/surface/server.h"
63 #include "test/core/util/test_config.h"
64
// Turns an integer id into the opaque pointer form used for completion-queue
// tags.
static void* tag(intptr_t t) {
  void* const as_pointer = reinterpret_cast<void*>(t);
  return as_pointer;
}
66
67 typedef struct test_ctx test_ctx;
68
69 struct test_ctx {
70 // completion queue for call notifications on the server
71 grpc_completion_queue* cq;
72 // completion queue registered to server for shutdown events
73 grpc_completion_queue* shutdown_cq;
74 // client's completion queue
75 grpc_completion_queue* client_cq;
76 // completion queue bound to call on the server
77 grpc_completion_queue* bound_cq;
78 // Server responds to client calls
79 grpc_server* server;
80 // Client calls are sent over the channel
81 grpc_channel* client;
82 // encapsulates client, server endpoints
83 grpc_endpoint_pair* ep;
84 };
85
86 static test_ctx g_ctx;
87
// chttp2 transport that is immediately available (used for testing
// connected_channel without a client_channel).
90
server_setup_transport(grpc_core::Transport * transport)91 static void server_setup_transport(grpc_core::Transport* transport) {
92 grpc_core::ExecCtx exec_ctx;
93 grpc_endpoint_add_to_pollset(g_ctx.ep->server, grpc_cq_pollset(g_ctx.cq));
94 grpc_core::Server* core_server = grpc_core::Server::FromC(g_ctx.server);
95 GPR_ASSERT(GRPC_LOG_IF_ERROR(
96 "SetupTransport",
97 core_server->SetupTransport(transport, nullptr,
98 core_server->channel_args(), nullptr)));
99 }
100
client_setup_transport(grpc_core::Transport * transport)101 static void client_setup_transport(grpc_core::Transport* transport) {
102 grpc_core::ExecCtx exec_ctx;
103 grpc_endpoint_add_to_pollset(g_ctx.ep->client,
104 grpc_cq_pollset(g_ctx.client_cq));
105 grpc_arg authority_arg = grpc_channel_arg_string_create(
106 const_cast<char*>(GRPC_ARG_DEFAULT_AUTHORITY),
107 const_cast<char*>("test-authority"));
108 grpc_channel_args* args =
109 grpc_channel_args_copy_and_add(nullptr, &authority_arg, 1);
110 // TODO (pjaikumar): use GRPC_CLIENT_CHANNEL instead of
111 // GRPC_CLIENT_DIRECT_CHANNEL
112 g_ctx.client = (*grpc_core::ChannelCreate(
113 "socketpair-target", grpc_core::ChannelArgs::FromC(args),
114 GRPC_CLIENT_DIRECT_CHANNEL, transport))
115 .release()
116 ->c_ptr();
117 grpc_channel_args_destroy(args);
118 }
119
init_client()120 static void init_client() {
121 grpc_core::ExecCtx exec_ctx;
122 grpc_core::Transport* transport;
123 transport = grpc_create_chttp2_transport(grpc_core::ChannelArgs(),
124 g_ctx.ep->client, true);
125 client_setup_transport(transport);
126 GPR_ASSERT(g_ctx.client);
127 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
128 }
129
init_server()130 static void init_server() {
131 grpc_core::ExecCtx exec_ctx;
132 grpc_core::Transport* transport;
133 GPR_ASSERT(!g_ctx.server);
134 g_ctx.server = grpc_server_create(nullptr, nullptr);
135 grpc_server_register_completion_queue(g_ctx.server, g_ctx.cq, nullptr);
136 grpc_server_start(g_ctx.server);
137 transport = grpc_create_chttp2_transport(grpc_core::ChannelArgs(),
138 g_ctx.ep->server, false);
139 server_setup_transport(transport);
140 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
141 }
142
test_init()143 static void test_init() {
144 grpc_endpoint_pair* sfd =
145 static_cast<grpc_endpoint_pair*>(gpr_malloc(sizeof(grpc_endpoint_pair)));
146 memset(&g_ctx, 0, sizeof(g_ctx));
147 g_ctx.ep = sfd;
148 g_ctx.cq = grpc_completion_queue_create_for_next(nullptr);
149 g_ctx.shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
150 g_ctx.bound_cq = grpc_completion_queue_create_for_next(nullptr);
151 g_ctx.client_cq = grpc_completion_queue_create_for_next(nullptr);
152
153 // Create endpoints
154 *sfd = grpc_iomgr_create_endpoint_pair("fixture", nullptr);
155 // Create client, server and setup transport over endpoint pair
156 init_server();
157 init_client();
158 }
159
drain_cq(grpc_completion_queue * cq)160 static void drain_cq(grpc_completion_queue* cq) {
161 grpc_event event;
162 do {
163 event = grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(1),
164 nullptr);
165 } while (event.type != GRPC_QUEUE_SHUTDOWN);
166 }
167
drain_and_destroy_cq(grpc_completion_queue * cq)168 static void drain_and_destroy_cq(grpc_completion_queue* cq) {
169 grpc_completion_queue_shutdown(cq);
170 drain_cq(cq);
171 grpc_completion_queue_destroy(cq);
172 }
173
shutdown_server()174 static void shutdown_server() {
175 if (!g_ctx.server) return;
176 grpc_server_shutdown_and_notify(g_ctx.server, g_ctx.shutdown_cq, tag(1000));
177 GPR_ASSERT(grpc_completion_queue_pluck(g_ctx.shutdown_cq, tag(1000),
178 grpc_timeout_seconds_to_deadline(1),
179 nullptr)
180 .type == GRPC_OP_COMPLETE);
181 grpc_server_destroy(g_ctx.server);
182 g_ctx.server = nullptr;
183 }
184
shutdown_client()185 static void shutdown_client() {
186 if (!g_ctx.client) return;
187 grpc_channel_destroy(g_ctx.client);
188 g_ctx.client = nullptr;
189 }
190
end_test()191 static void end_test() {
192 shutdown_server();
193 shutdown_client();
194
195 drain_and_destroy_cq(g_ctx.cq);
196 drain_and_destroy_cq(g_ctx.client_cq);
197 drain_and_destroy_cq(g_ctx.bound_cq);
198 grpc_completion_queue_destroy(g_ctx.shutdown_cq);
199 gpr_free(g_ctx.ep);
200 }
201
202 typedef enum fd_type { CLIENT_FD, SERVER_FD } fd_type;
203
fd_type_str(fd_type fdtype)204 static const char* fd_type_str(fd_type fdtype) {
205 if (fdtype == CLIENT_FD) {
206 return "client";
207 } else if (fdtype == SERVER_FD) {
208 return "server";
209 } else {
210 grpc_core::Crash(absl::StrFormat("Unexpected fd_type %d", fdtype));
211 }
212 }
213
_test_close_before_server_recv(fd_type fdtype)214 static void _test_close_before_server_recv(fd_type fdtype) {
215 grpc_core::ExecCtx exec_ctx;
216 grpc_call* call;
217 grpc_call* server_call;
218 grpc_event event;
219 grpc_slice request_payload_slice =
220 grpc_slice_from_copied_string("hello world");
221 grpc_slice response_payload_slice =
222 grpc_slice_from_copied_string("hello you");
223 grpc_byte_buffer* request_payload =
224 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
225 grpc_byte_buffer* response_payload =
226 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
227 gpr_log(GPR_INFO, "Running test: test_close_%s_before_server_recv",
228 fd_type_str(fdtype));
229 test_init();
230
231 grpc_op ops[6];
232 grpc_op* op;
233 grpc_metadata_array initial_metadata_recv;
234 grpc_metadata_array trailing_metadata_recv;
235 grpc_metadata_array request_metadata_recv;
236 grpc_byte_buffer* request_payload_recv = nullptr;
237 grpc_byte_buffer* response_payload_recv = nullptr;
238 grpc_call_details call_details;
239 grpc_status_code status = GRPC_STATUS__DO_NOT_USE;
240 grpc_call_error error;
241 grpc_slice details;
242
243 gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
244 call = grpc_channel_create_call(
245 g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
246 grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
247 GPR_ASSERT(call);
248
249 grpc_metadata_array_init(&initial_metadata_recv);
250 grpc_metadata_array_init(&trailing_metadata_recv);
251 grpc_metadata_array_init(&request_metadata_recv);
252 grpc_call_details_init(&call_details);
253
254 memset(ops, 0, sizeof(ops));
255 op = ops;
256 op->op = GRPC_OP_SEND_INITIAL_METADATA;
257 op->data.send_initial_metadata.count = 0;
258 op->flags = 0;
259 op->reserved = nullptr;
260 op++;
261 op->op = GRPC_OP_SEND_MESSAGE;
262 op->data.send_message.send_message = request_payload;
263 op->flags = 0;
264 op->reserved = nullptr;
265 op++;
266 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
267 op->flags = 0;
268 op->reserved = nullptr;
269 op++;
270 op->op = GRPC_OP_RECV_INITIAL_METADATA;
271 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
272 op->flags = 0;
273 op->reserved = nullptr;
274 op++;
275 op->op = GRPC_OP_RECV_MESSAGE;
276 op->data.recv_message.recv_message = &response_payload_recv;
277 op->flags = 0;
278 op->reserved = nullptr;
279 op++;
280 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
281 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
282 op->data.recv_status_on_client.status = &status;
283 op->data.recv_status_on_client.status_details = &details;
284 op->flags = 0;
285 op->reserved = nullptr;
286 op++;
287 error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
288 tag(1), nullptr);
289 GPR_ASSERT(GRPC_CALL_OK == error);
290
291 error = grpc_server_request_call(g_ctx.server, &server_call, &call_details,
292 &request_metadata_recv, g_ctx.bound_cq,
293 g_ctx.cq, tag(101));
294 GPR_ASSERT(GRPC_CALL_OK == error);
295 event = grpc_completion_queue_next(
296 g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
297 GPR_ASSERT(event.success == 1);
298 GPR_ASSERT(event.tag == tag(101));
299 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
300
301 memset(ops, 0, sizeof(ops));
302 op = ops;
303 op->op = GRPC_OP_SEND_INITIAL_METADATA;
304 op->data.send_initial_metadata.count = 0;
305 op->flags = 0;
306 op->reserved = nullptr;
307 op++;
308 op->op = GRPC_OP_RECV_MESSAGE;
309 op->data.recv_message.recv_message = &request_payload_recv;
310 op->flags = 0;
311 op->reserved = nullptr;
312 op++;
313
314 grpc_endpoint_pair* sfd = g_ctx.ep;
315 int fd;
316 if (fdtype == SERVER_FD) {
317 fd = sfd->server->vtable->get_fd(sfd->server);
318 } else {
319 GPR_ASSERT(fdtype == CLIENT_FD);
320 fd = sfd->client->vtable->get_fd(sfd->client);
321 }
322 // Connection is closed before the server receives the client's message.
323 close(fd);
324
325 error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
326 tag(102), nullptr);
327 GPR_ASSERT(GRPC_CALL_OK == error);
328
329 event = grpc_completion_queue_next(
330 g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
331
332 // Batch operation completes on the server side.
333 // event.success will be true if the op completes successfully.
334 // event.success will be false if the op completes with an error. This can
335 // happen due to a race with closing the fd resulting in pending writes
336 // failing due to stream closure.
337 //
338 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
339 GPR_ASSERT(event.tag == tag(102));
340
341 event = grpc_completion_queue_next(
342 g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
343 // When the client fd is closed, the server gets EPIPE.
344 // When server fd is closed, server gets EBADF.
345 // In both cases server sends GRPC_STATUS_UNAVALABLE to the client. However,
346 // the client may not receive this grpc_status as it's socket is being closed.
347 // If the client didn't get grpc_status from the server it will time out
348 // waiting on the completion queue. So there 2 2 possibilities:
349 // 1. client times out waiting for server's response
350 // 2. client receives GRPC_STATUS_UNAVAILABLE from server
351 //
352 if (event.type == GRPC_QUEUE_TIMEOUT) {
353 GPR_ASSERT(event.success == 0);
354 // status is not initialized
355 GPR_ASSERT(status == GRPC_STATUS__DO_NOT_USE);
356 } else {
357 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
358 GPR_ASSERT(event.success == 1);
359 GPR_ASSERT(event.tag == tag(1));
360 GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
361 }
362
363 grpc_metadata_array_destroy(&initial_metadata_recv);
364 grpc_metadata_array_destroy(&trailing_metadata_recv);
365 grpc_metadata_array_destroy(&request_metadata_recv);
366 grpc_call_details_destroy(&call_details);
367
368 grpc_call_unref(call);
369 grpc_call_unref(server_call);
370
371 grpc_byte_buffer_destroy(request_payload);
372 grpc_byte_buffer_destroy(response_payload);
373 grpc_byte_buffer_destroy(request_payload_recv);
374 grpc_byte_buffer_destroy(response_payload_recv);
375
376 end_test();
377 }
378
test_close_before_server_recv()379 static void test_close_before_server_recv() {
380 // Close client side of the connection before server receives message from
381 // client
382 _test_close_before_server_recv(CLIENT_FD);
383 // Close server side of the connection before server receives message from
384 // client
385 _test_close_before_server_recv(SERVER_FD);
386 }
387
_test_close_before_server_send(fd_type fdtype)388 static void _test_close_before_server_send(fd_type fdtype) {
389 grpc_core::ExecCtx exec_ctx;
390 grpc_call* call;
391 grpc_call* server_call;
392 grpc_event event;
393 grpc_slice request_payload_slice =
394 grpc_slice_from_copied_string("hello world");
395 grpc_slice response_payload_slice =
396 grpc_slice_from_copied_string("hello you");
397 grpc_byte_buffer* request_payload =
398 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
399 grpc_byte_buffer* response_payload =
400 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
401 gpr_log(GPR_INFO, "Running test: test_close_%s_before_server_send",
402 fd_type_str(fdtype));
403 test_init();
404
405 grpc_op ops[6];
406 grpc_op* op;
407 grpc_metadata_array initial_metadata_recv;
408 grpc_metadata_array trailing_metadata_recv;
409 grpc_metadata_array request_metadata_recv;
410 grpc_byte_buffer* request_payload_recv = nullptr;
411 grpc_byte_buffer* response_payload_recv = nullptr;
412 grpc_call_details call_details;
413 grpc_status_code status = GRPC_STATUS__DO_NOT_USE;
414 grpc_call_error error;
415 grpc_slice details;
416 int was_cancelled = 2;
417
418 gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
419 call = grpc_channel_create_call(
420 g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
421 grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
422 GPR_ASSERT(call);
423
424 grpc_metadata_array_init(&initial_metadata_recv);
425 grpc_metadata_array_init(&trailing_metadata_recv);
426 grpc_metadata_array_init(&request_metadata_recv);
427 grpc_call_details_init(&call_details);
428
429 memset(ops, 0, sizeof(ops));
430 op = ops;
431 op->op = GRPC_OP_SEND_INITIAL_METADATA;
432 op->data.send_initial_metadata.count = 0;
433 op->flags = 0;
434 op->reserved = nullptr;
435 op++;
436 op->op = GRPC_OP_SEND_MESSAGE;
437 op->data.send_message.send_message = request_payload;
438 op->flags = 0;
439 op->reserved = nullptr;
440 op++;
441 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
442 op->flags = 0;
443 op->reserved = nullptr;
444 op++;
445 op->op = GRPC_OP_RECV_INITIAL_METADATA;
446 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
447 op->flags = 0;
448 op->reserved = nullptr;
449 op++;
450 op->op = GRPC_OP_RECV_MESSAGE;
451 op->data.recv_message.recv_message = &response_payload_recv;
452 op->flags = 0;
453 op->reserved = nullptr;
454 op++;
455 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
456 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
457 op->data.recv_status_on_client.status = &status;
458 op->data.recv_status_on_client.status_details = &details;
459 op->flags = 0;
460 op->reserved = nullptr;
461 op++;
462 error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
463 tag(1), nullptr);
464 GPR_ASSERT(GRPC_CALL_OK == error);
465
466 error = grpc_server_request_call(g_ctx.server, &server_call, &call_details,
467 &request_metadata_recv, g_ctx.bound_cq,
468 g_ctx.cq, tag(101));
469 GPR_ASSERT(GRPC_CALL_OK == error);
470 event = grpc_completion_queue_next(
471 g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
472 GPR_ASSERT(event.success == 1);
473 GPR_ASSERT(event.tag == tag(101));
474 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
475
476 memset(ops, 0, sizeof(ops));
477 op = ops;
478 op->op = GRPC_OP_SEND_INITIAL_METADATA;
479 op->data.send_initial_metadata.count = 0;
480 op->flags = 0;
481 op->reserved = nullptr;
482 op++;
483 op->op = GRPC_OP_RECV_MESSAGE;
484 op->data.recv_message.recv_message = &request_payload_recv;
485 op->flags = 0;
486 op->reserved = nullptr;
487 op++;
488 error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
489 tag(102), nullptr);
490 GPR_ASSERT(GRPC_CALL_OK == error);
491
492 event = grpc_completion_queue_next(
493 g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
494 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
495 GPR_ASSERT(event.success == 1);
496 GPR_ASSERT(event.tag == tag(102));
497
498 memset(ops, 0, sizeof(ops));
499 op = ops;
500 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
501 op->data.recv_close_on_server.cancelled = &was_cancelled;
502 op->flags = 0;
503 op->reserved = nullptr;
504 op++;
505 op->op = GRPC_OP_SEND_MESSAGE;
506 op->data.send_message.send_message = response_payload;
507 op->flags = 0;
508 op->reserved = nullptr;
509 op++;
510 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
511 op->data.send_status_from_server.trailing_metadata_count = 0;
512 op->data.send_status_from_server.status = GRPC_STATUS_OK;
513 grpc_slice status_details = grpc_slice_from_static_string("xyz");
514 op->data.send_status_from_server.status_details = &status_details;
515 op->flags = 0;
516 op->reserved = nullptr;
517 op++;
518
519 grpc_endpoint_pair* sfd = g_ctx.ep;
520 int fd;
521 if (fdtype == SERVER_FD) {
522 fd = sfd->server->vtable->get_fd(sfd->server);
523 } else {
524 GPR_ASSERT(fdtype == CLIENT_FD);
525 fd = sfd->client->vtable->get_fd(sfd->client);
526 }
527
528 // Connection is closed before the server sends message and status to the
529 // client.
530 close(fd);
531 error = grpc_call_start_batch(server_call, ops, static_cast<size_t>(op - ops),
532 tag(103), nullptr);
533 GPR_ASSERT(GRPC_CALL_OK == error);
534
535 // Batch operation succeeds on the server side
536 event = grpc_completion_queue_next(
537 g_ctx.bound_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
538 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
539 GPR_ASSERT(event.success == 1);
540 GPR_ASSERT(event.tag == tag(103));
541
542 event = grpc_completion_queue_next(
543 g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
544 // In both cases server sends GRPC_STATUS_UNAVALABLE to the client. However,
545 // the client may not receive this grpc_status as it's socket is being closed.
546 // If the client didn't get grpc_status from the server it will time out
547 // waiting on the completion queue
548 //
549 if (event.type == GRPC_OP_COMPLETE) {
550 GPR_ASSERT(event.success == 1);
551 GPR_ASSERT(event.tag == tag(1));
552 GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
553 } else {
554 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
555 GPR_ASSERT(event.success == 0);
556 // status is not initialized
557 GPR_ASSERT(status == GRPC_STATUS__DO_NOT_USE);
558 }
559 GPR_ASSERT(was_cancelled == 0);
560
561 grpc_metadata_array_destroy(&initial_metadata_recv);
562 grpc_metadata_array_destroy(&trailing_metadata_recv);
563 grpc_metadata_array_destroy(&request_metadata_recv);
564 grpc_call_details_destroy(&call_details);
565
566 grpc_call_unref(call);
567 grpc_call_unref(server_call);
568
569 grpc_byte_buffer_destroy(request_payload);
570 grpc_byte_buffer_destroy(response_payload);
571 grpc_byte_buffer_destroy(request_payload_recv);
572 grpc_byte_buffer_destroy(response_payload_recv);
573
574 end_test();
575 }
576
test_close_before_server_send()577 static void test_close_before_server_send() {
578 // Close client side of the connection before server sends message to client
579 //
580 _test_close_before_server_send(CLIENT_FD);
581 // Close server side of the connection before server sends message to client
582 //
583 _test_close_before_server_send(SERVER_FD);
584 }
585
_test_close_before_client_send(fd_type fdtype)586 static void _test_close_before_client_send(fd_type fdtype) {
587 grpc_core::ExecCtx exec_ctx;
588 grpc_call* call;
589 grpc_event event;
590 grpc_slice request_payload_slice =
591 grpc_slice_from_copied_string("hello world");
592 grpc_slice response_payload_slice =
593 grpc_slice_from_copied_string("hello you");
594 grpc_byte_buffer* request_payload =
595 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
596 grpc_byte_buffer* response_payload =
597 grpc_raw_byte_buffer_create(&response_payload_slice, 1);
598 gpr_log(GPR_INFO, "Running test: test_close_%s_before_client_send",
599 fd_type_str(fdtype));
600 test_init();
601
602 grpc_op ops[6];
603 grpc_op* op;
604 grpc_metadata_array initial_metadata_recv;
605 grpc_metadata_array trailing_metadata_recv;
606 grpc_metadata_array request_metadata_recv;
607 grpc_byte_buffer* request_payload_recv = nullptr;
608 grpc_byte_buffer* response_payload_recv = nullptr;
609 grpc_call_details call_details;
610 grpc_status_code status;
611 grpc_call_error error;
612 grpc_slice details;
613
614 gpr_timespec deadline = grpc_timeout_seconds_to_deadline(1);
615 call = grpc_channel_create_call(
616 g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
617 grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
618 GPR_ASSERT(call);
619
620 grpc_metadata_array_init(&initial_metadata_recv);
621 grpc_metadata_array_init(&trailing_metadata_recv);
622 grpc_metadata_array_init(&request_metadata_recv);
623 grpc_call_details_init(&call_details);
624
625 memset(ops, 0, sizeof(ops));
626 op = ops;
627 op->op = GRPC_OP_SEND_INITIAL_METADATA;
628 op->data.send_initial_metadata.count = 0;
629 op->flags = 0;
630 op->reserved = nullptr;
631 op++;
632 op->op = GRPC_OP_SEND_MESSAGE;
633 op->data.send_message.send_message = request_payload;
634 op->flags = 0;
635 op->reserved = nullptr;
636 op++;
637 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
638 op->flags = 0;
639 op->reserved = nullptr;
640 op++;
641 op->op = GRPC_OP_RECV_INITIAL_METADATA;
642 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
643 op->flags = 0;
644 op->reserved = nullptr;
645 op++;
646 op->op = GRPC_OP_RECV_MESSAGE;
647 op->data.recv_message.recv_message = &response_payload_recv;
648 op->flags = 0;
649 op->reserved = nullptr;
650 op++;
651 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
652 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
653 op->data.recv_status_on_client.status = &status;
654 op->data.recv_status_on_client.status_details = &details;
655 op->flags = 0;
656 op->reserved = nullptr;
657 op++;
658
659 grpc_endpoint_pair* sfd = g_ctx.ep;
660 int fd;
661 if (fdtype == SERVER_FD) {
662 fd = sfd->server->vtable->get_fd(sfd->server);
663 } else {
664 GPR_ASSERT(fdtype == CLIENT_FD);
665 fd = sfd->client->vtable->get_fd(sfd->client);
666 }
667 // Connection is closed before the client sends a batch to the server
668 close(fd);
669
670 error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops),
671 tag(1), nullptr);
672 GPR_ASSERT(GRPC_CALL_OK == error);
673
674 // Status unavailable is returned to the client when client or server fd is
675 // closed
676 event = grpc_completion_queue_next(
677 g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
678 GPR_ASSERT(event.success == 1);
679 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
680 GPR_ASSERT(event.tag == tag(1));
681 GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
682
683 // No event is received on the server
684 event = grpc_completion_queue_next(
685 g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
686 GPR_ASSERT(event.success == 0);
687 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
688
689 grpc_slice_unref(details);
690 grpc_metadata_array_destroy(&initial_metadata_recv);
691 grpc_metadata_array_destroy(&trailing_metadata_recv);
692 grpc_metadata_array_destroy(&request_metadata_recv);
693 grpc_call_details_destroy(&call_details);
694
695 grpc_call_unref(call);
696
697 grpc_byte_buffer_destroy(request_payload);
698 grpc_byte_buffer_destroy(response_payload);
699 grpc_byte_buffer_destroy(request_payload_recv);
700 grpc_byte_buffer_destroy(response_payload_recv);
701
702 end_test();
703 }
test_close_before_client_send()704 static void test_close_before_client_send() {
705 // Close client side of the connection before client sends message to server
706 //
707 _test_close_before_client_send(CLIENT_FD);
708 // Close server side of the connection before client sends message to server
709 //
710 _test_close_before_client_send(SERVER_FD);
711 }
712
_test_close_before_call_create(fd_type fdtype)713 static void _test_close_before_call_create(fd_type fdtype) {
714 grpc_core::ExecCtx exec_ctx;
715 grpc_call* call;
716 grpc_event event;
717 test_init();
718
719 gpr_timespec deadline = grpc_timeout_milliseconds_to_deadline(100);
720
721 grpc_endpoint_pair* sfd = g_ctx.ep;
722 int fd;
723 if (fdtype == SERVER_FD) {
724 fd = sfd->server->vtable->get_fd(sfd->server);
725 } else {
726 GPR_ASSERT(fdtype == CLIENT_FD);
727 fd = sfd->client->vtable->get_fd(sfd->client);
728 }
729 // Connection is closed before the client creates a call
730 close(fd);
731
732 call = grpc_channel_create_call(
733 g_ctx.client, nullptr, GRPC_PROPAGATE_DEFAULTS, g_ctx.client_cq,
734 grpc_slice_from_static_string("/foo"), nullptr, deadline, nullptr);
735 GPR_ASSERT(call);
736
737 // Client and server time out waiting on their completion queues and nothing
738 // is sent or received
739 event = grpc_completion_queue_next(
740 g_ctx.client_cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
741 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
742 GPR_ASSERT(event.success == 0);
743
744 event = grpc_completion_queue_next(
745 g_ctx.cq, grpc_timeout_milliseconds_to_deadline(100), nullptr);
746 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
747 GPR_ASSERT(event.success == 0);
748
749 grpc_call_unref(call);
750 end_test();
751 }
752
test_close_before_call_create()753 static void test_close_before_call_create() {
754 // Close client side of the connection before client creates a call
755 _test_close_before_call_create(CLIENT_FD);
756 // Close server side of the connection before client creates a call
757 _test_close_before_call_create(SERVER_FD);
758 }
759
main(int argc,char ** argv)760 int main(int argc, char** argv) {
761 grpc::testing::TestEnvironment env(&argc, argv);
762 // Init grpc
763 grpc_init();
764 int iterations = 10;
765
766 for (int i = 0; i < iterations; ++i) {
767 test_close_before_call_create();
768 test_close_before_client_send();
769 test_close_before_server_recv();
770 test_close_before_server_send();
771 }
772
773 grpc_shutdown();
774
775 return 0;
776 }
777
778 #else // GRPC_POSIX_SOCKET_TCP
779
// Without posix sockets the test cannot run; exit with a non-zero status.
int main(int /*argc*/, char** /*argv*/) {
  return 1;
}
781
782 #endif // GRPC_POSIX_SOCKET_TCP
783