xref: /aosp_15_r20/external/grpc-grpc/src/ruby/spec/generic/rpc_server_spec.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14require 'spec_helper'
15
# Reads the TLS fixture files used by the specs.
#
# The files are looked up in the +testdata+ directory located one level above
# the directory that contains this file.
#
# @return [Array<String>] the contents of ca.pem, server1.key and
#   server1.pem, in that order
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read closes the file as soon as its contents are loaded, whereas
  # File.open(...).read leaves the handle open until it is garbage collected.
  files.map { |f| File.read(File.join(test_root, f)) }
end
21
# Asserts that received metadata contains everything that was wanted.
#
# The two arrays are paired up positionally; for each pair, every key/value
# of the wanted hash must be present in the received hash. Extra keys in a
# received hash are not checked.
#
# @param wanted_md [Array<Hash>] expected metadata, one hash per call
# @param received_md [Array<Hash>] metadata actually seen, one hash per call
def check_md(wanted_md, received_md)
  wanted_md.zip(received_md).each do |(wanted, received)|
    wanted.each_pair { |name, expected| expect(received[name]).to eq(expected) }
  end
end
29
# A test service with no methods.
#
# It mixes in GRPC::GenericService but declares no rpc; the '#handle' specs
# pass it to the server to check that such a service is rejected.
class EmptyService
  include GRPC::GenericService
end
34
# A test service without an implementation.
#
# It declares the rpc :an_rpc but defines no instance method for it; the
# '#run' specs use it to check for the UNIMPLEMENTED status.
class NoRpcImplementation
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
end
40
# A test service whose only rpc always fails with a GRPC::BadStatus.
#
# The status code, details and metadata used for the failure are exposed
# through readers so that specs can compare them with what the client saw.
class FailingService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  attr_reader :details, :code, :md

  # The single optional argument is accepted and ignored.
  def initialize(_default_var = 'ignored')
    @code = 101
    @details = 'app error'
    @md = { 'failed_method' => 'an_rpc' }
  end

  # Always raises; neither the request nor the call is inspected.
  def an_rpc(_req, _call)
    raise GRPC::BadStatus.new(code, details, md)
  end
end

# Client stub class generated from FailingService.
FailingStub = FailingService.rpc_stub_class
59
# A slow test service.
#
# Both rpcs sleep for +delay+ seconds and then append the call's metadata
# (when it is not nil) to +received_md+ before answering.
class SlowService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
  attr_reader :received_md, :delay

  # The single optional argument is accepted and ignored.
  def initialize(_default_var = 'ignored')
    @received_md = []
    @delay = 0.25
  end

  # Unary rpc: echoes the request back after the delay.
  def an_rpc(req, call)
    GRPC.logger.info("starting a slow #{@delay} rpc")
    stall_then_record(call)
    req
  end

  # Server-streaming rpc: answers with two empty messages after the delay.
  def a_server_streaming_rpc(_, call)
    GRPC.logger.info("starting a slow #{@delay} server streaming rpc")
    stall_then_record(call)
    [EchoMsg.new, EchoMsg.new]
  end

  private

  # Sleeps for the configured delay, then stores the call metadata if any.
  def stall_then_record(call)
    sleep @delay
    @received_md << call.metadata unless call.metadata.nil?
  end
end

# Client stub class generated from SlowService.
SlowStub = SlowService.rpc_stub_class
88
# A test service that allows a synchronized RPC cancellation.
#
# The handler hands control back to the spec through two callables, so the
# spec decides when the handler may return.
class SynchronizedCancellationService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  # NOTE(review): nothing in this class assigns @received_md or @delay, so
  # these readers return nil - confirm whether they are still needed.
  attr_reader :received_md, :delay

  # notify_request_received and wait_until_rpc_cancelled are
  # callbacks to synchronously allow the client to proceed with
  # cancellation (after the unary request has been received),
  # and to synchronously wait until the client has cancelled the
  # current RPC.
  #
  # notify_request_received is invoked with the request;
  # wait_until_rpc_cancelled is invoked without arguments.
  def initialize(notify_request_received, wait_until_rpc_cancelled)
    @notify_request_received = notify_request_received
    @wait_until_rpc_cancelled = wait_until_rpc_cancelled
  end

  # Signals that the request arrived, then waits on the second callback
  # before echoing the request.
  def an_rpc(req, _call)
    GRPC.logger.info('starting a synchronusly cancelled rpc')
    @notify_request_received.call(req)
    @wait_until_rpc_cancelled.call
    req  # send back the req as the response
  end
end

# Client stub class generated from SynchronizedCancellationService.
SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class
114
# A test service that keeps a reference to the call object it was given, so
# that specs can use that object after the server-side call has finished.
#
# Each instance may serve a single rpc only; a second rpc on the same
# instance fails.
class CheckCallAfterFinishedService
  include GRPC::GenericService
  rpc :an_rpc, EchoMsg, EchoMsg
  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
  attr_reader :server_side_call

  # Unary: echoes the request.
  def an_rpc(req, call)
    remember_call(call)
    req
  end

  # Client streaming: answers with an empty message.
  def a_client_streaming_rpc(call)
    remember_call(call)
    # drain the incoming requests so that the call can complete
    call.each_remote_read.each { |msg| GRPC.logger.info(msg) }
    EchoMsg.new
  end

  # Server streaming: answers with two empty messages.
  def a_server_streaming_rpc(_, call)
    remember_call(call)
    [EchoMsg.new, EchoMsg.new]
  end

  # Bidi streaming: logs every request, then answers with two empty messages.
  def a_bidi_rpc(requests, call)
    remember_call(call)
    requests.each { |msg| GRPC.logger.info(msg) }
    [EchoMsg.new, EchoMsg.new]
  end

  private

  # Stores the call object, refusing to overwrite one stored earlier.
  def remember_call(call)
    raise 'shouldnt reuse service' unless @server_side_call.nil?
    @server_side_call = call
  end
end

# Client stub class generated from CheckCallAfterFinishedService.
CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
155
# A service with a bidi streaming method.
#
# The handler deliberately returns a String where the rpc is declared to
# produce a stream, so specs can observe how that surfaces on the client.
class BidiService
  include GRPC::GenericService
  rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)

  # Ignores both arguments and returns a plain String.
  def server_sends_bad_input(_, _)
    'bad response. (not an enumerable, client sees an error)'
  end
end

# Client stub class generated from BidiService.
BidiStub = BidiService.rpc_stub_class
167
168describe GRPC::RpcServer do
  # Short aliases for GRPC::RpcServer and GRPC::Core::StatusCodes. Because
  # they are assigned inside a block, Ruby defines them at the top level
  # rather than scoping them to this describe.
  RpcServer = GRPC::RpcServer
  StatusCodes = GRPC::Core::StatusCodes

  # Seeds a few instance variables before every example.
  # NOTE(review): @method, @pass, @fail and @noop are not read anywhere in
  # the visible part of this file - confirm whether they are still needed.
  before(:each) do
    @method = 'an_rpc_method'
    @pass = 0
    @fail = 1
    @noop = proc { |x| x }
  end
178
179  describe '#new' do
180    it 'can be created with just some args' do
181      opts = { server_args: { a_channel_arg: 'an_arg' } }
182      blk = proc do
183        new_rpc_server_for_testing(**opts)
184      end
185      expect(&blk).not_to raise_error
186    end
187
188    it 'cannot be created with invalid ServerCredentials' do
189      blk = proc do
190        opts = {
191          server_args: { a_channel_arg: 'an_arg' },
192          creds: Object.new
193        }
194        new_rpc_server_for_testing(**opts)
195      end
196      expect(&blk).to raise_error
197    end
198  end
199
200  describe '#stopped?' do
201    before(:each) do
202      opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1.5 }
203      @srv = new_rpc_server_for_testing(**opts)
204      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
205    end
206
207    it 'starts out false' do
208      expect(@srv.stopped?).to be(false)
209    end
210
211    it 'stays false after the server starts running', server: true do
212      @srv.handle(EchoService)
213      t = Thread.new { @srv.run }
214      @srv.wait_till_running
215      expect(@srv.stopped?).to be(false)
216      @srv.stop
217      t.join
218    end
219
220    it 'is true after a running server is stopped', server: true do
221      @srv.handle(EchoService)
222      t = Thread.new { @srv.run }
223      @srv.wait_till_running
224      @srv.stop
225      t.join
226      expect(@srv.stopped?).to be(true)
227    end
228  end
229
230  describe '#running?' do
231    it 'starts out false' do
232      opts = {
233        server_args: { a_channel_arg: 'an_arg' }
234      }
235      r = new_rpc_server_for_testing(**opts)
236      expect(r.running?).to be(false)
237    end
238
239    it 'is false if run is called with no services registered', server: true do
240      opts = {
241        server_args: { a_channel_arg: 'an_arg' },
242        poll_period: 2
243      }
244      r = new_rpc_server_for_testing(**opts)
245      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
246      expect { r.run }.to raise_error(RuntimeError)
247    end
248
249    it 'is true after run is called with a registered service' do
250      opts = {
251        server_args: { a_channel_arg: 'an_arg' },
252        poll_period: 2.5
253      }
254      r = new_rpc_server_for_testing(**opts)
255      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
256      r.handle(EchoService)
257      t = Thread.new { r.run }
258      r.wait_till_running
259      expect(r.running?).to be(true)
260      r.stop
261      t.join
262    end
263  end
264
265  describe '#handle' do
266    before(:each) do
267      @opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 }
268      @srv = new_rpc_server_for_testing(**@opts)
269      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
270    end
271
272    it 'raises if #run has already been called' do
273      @srv.handle(EchoService)
274      t = Thread.new { @srv.run }
275      @srv.wait_till_running
276      expect { @srv.handle(EchoService) }.to raise_error
277      @srv.stop
278      t.join
279    end
280
281    it 'raises if the server has been run and stopped' do
282      @srv.handle(EchoService)
283      t = Thread.new { @srv.run }
284      @srv.wait_till_running
285      @srv.stop
286      t.join
287      expect { @srv.handle(EchoService) }.to raise_error
288    end
289
290    it 'raises if the service does not include GenericService ' do
291      expect { @srv.handle(Object) }.to raise_error
292    end
293
294    it 'raises if the service does not declare any rpc methods' do
295      expect { @srv.handle(EmptyService) }.to raise_error
296    end
297
298    it 'raises if a handler method is already registered' do
299      @srv.handle(EchoService)
300      expect { r.handle(EchoService) }.to raise_error
301    end
302  end
303
304  describe '#run' do
    # Stub options that pass @ch as channel_override. @ch is assigned by the
    # before hook of the 'with no connect_metadata' context.
    let(:client_opts) { { channel_override: @ch } }
    # Marshal proc taken from EchoService's :an_rpc descriptor.
    let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
    # Unmarshal proc of the same descriptor, requested for :output.
    let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
308
309    context 'with no connect_metadata' do
310      before(:each) do
311        server_opts = {
312          poll_period: 1
313        }
314        @srv = new_rpc_server_for_testing(**server_opts)
315        server_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
316        @host = "localhost:#{server_port}"
317        @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
318      end
319
320      it 'should return NOT_FOUND status on unknown methods', server: true do
321        @srv.handle(EchoService)
322        t = Thread.new { @srv.run }
323        @srv.wait_till_running
324        req = EchoMsg.new
325        blk = proc do
326          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
327                                      **client_opts)
328          stub.request_response('/unknown', req, marshal, unmarshal)
329        end
330        expect(&blk).to raise_error GRPC::BadStatus
331        @srv.stop
332        t.join
333      end
334
335      it 'should return UNIMPLEMENTED on unimplemented methods', server: true do
336        @srv.handle(NoRpcImplementation)
337        t = Thread.new { @srv.run }
338        @srv.wait_till_running
339        req = EchoMsg.new
340        blk = proc do
341          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
342                                      **client_opts)
343          stub.request_response('/an_rpc', req, marshal, unmarshal)
344        end
345        expect(&blk).to raise_error do |error|
346          expect(error).to be_a(GRPC::BadStatus)
347          expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)
348        end
349        @srv.stop
350        t.join
351      end
352
353      it 'should return UNIMPLEMENTED on unimplemented ' \
354         'methods for client_streamer', server: true do
355        @srv.handle(EchoService)
356        t = Thread.new { @srv.run }
357        @srv.wait_till_running
358        blk = proc do
359          stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
360          requests = [EchoMsg.new, EchoMsg.new]
361          stub.a_client_streaming_rpc_unimplemented(requests)
362        end
363
364        begin
365          expect(&blk).to raise_error do |error|
366            expect(error).to be_a(GRPC::BadStatus)
367            expect(error.code).to eq(GRPC::Core::StatusCodes::UNIMPLEMENTED)
368          end
369        ensure
370          @srv.stop # should be call not to crash
371          t.join
372        end
373      end
374
375      it 'should handle multiple sequential requests', server: true do
376        @srv.handle(EchoService)
377        t = Thread.new { @srv.run }
378        @srv.wait_till_running
379        req = EchoMsg.new
380        n = 5  # arbitrary
381        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
382        n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
383        @srv.stop
384        t.join
385      end
386
387      it 'should receive metadata sent as rpc keyword args', server: true do
388        service = EchoService.new
389        @srv.handle(service)
390        t = Thread.new { @srv.run }
391        @srv.wait_till_running
392        req = EchoMsg.new
393        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
394        expect(stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }))
395          .to be_a(EchoMsg)
396        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
397        check_md(wanted_md, service.received_md)
398        @srv.stop
399        t.join
400      end
401
402      it 'should receive metadata if a deadline is specified', server: true do
403        service = SlowService.new
404        @srv.handle(service)
405        t = Thread.new { @srv.run }
406        @srv.wait_till_running
407        req = EchoMsg.new
408        stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
409        timeout = service.delay + 1.0
410        deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)
411        resp = stub.an_rpc(req,
412                           deadline: deadline,
413                           metadata: { k1: 'v1', k2: 'v2' })
414        expect(resp).to be_a(EchoMsg)
415        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
416        check_md(wanted_md, service.received_md)
417        @srv.stop
418        t.join
419      end
420
421      it 'should raise DeadlineExceeded', server: true do
422        service = SlowService.new
423        @srv.handle(service)
424        t = Thread.new { @srv.run }
425        @srv.wait_till_running
426        req = EchoMsg.new
427        stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
428        timeout = service.delay - 0.1
429        deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)
430        responses = stub.a_server_streaming_rpc(req,
431                                                deadline: deadline,
432                                                metadata: { k1: 'v1', k2: 'v2' })
433        expect { responses.to_a }.to raise_error(GRPC::DeadlineExceeded)
434        @srv.stop
435        t.join
436      end
437
      # Drives a cancellation in lock step with the server handler:
      #   1. the handler signals that it received the request,
      #   2. the spec cancels the operation,
      #   3. the spec releases the handler, which was blocked until then.
      it 'should handle cancellation correctly', server: true do
        # Flag, mutex and condition variable for step 1 (handler -> spec).
        request_received = false
        request_received_mu = Mutex.new
        request_received_cv = ConditionVariable.new
        notify_request_received = proc do |req|
          request_received_mu.synchronize do
            fail 'req is nil' if req.nil?
            expect(req.is_a?(EchoMsg)).to be true
            fail 'test bug - already set' if request_received
            request_received = true
            request_received_cv.signal
          end
        end

        # Flag, mutex and condition variable for step 3 (spec -> handler).
        rpc_cancelled = false
        rpc_cancelled_mu = Mutex.new
        rpc_cancelled_cv = ConditionVariable.new
        wait_until_rpc_cancelled = proc do
          rpc_cancelled_mu.synchronize do
            # Re-check the flag after every wake-up.
            loop do
              break if rpc_cancelled
              rpc_cancelled_cv.wait(rpc_cancelled_mu)
            end
          end
        end

        service = SynchronizedCancellationService.new(notify_request_received,
                                                      wait_until_rpc_cancelled)
        @srv.handle(service)
        srv_thd = Thread.new { @srv.run }
        @srv.wait_till_running
        req = EchoMsg.new
        stub = SynchronizedCancellationStub.new(@host,
                                                :this_channel_is_insecure,
                                                **client_opts)
        # return_op: true yields an operation object; the rpc is started by
        # op.execute below.
        op = stub.an_rpc(req, return_op: true)

        # Run the rpc on its own thread so this thread is free to cancel it.
        client_thd = Thread.new do
          expect { op.execute }.to raise_error GRPC::Cancelled
        end

        # Step 1: block until the handler reports that it has the request.
        request_received_mu.synchronize do
          loop do
            break if request_received
            request_received_cv.wait(request_received_mu)
          end
        end

        # Step 2: cancel from the client side.
        op.cancel

        # Step 3: let the handler return.
        rpc_cancelled_mu.synchronize do
          fail 'test bug - already set' if rpc_cancelled
          rpc_cancelled = true
          rpc_cancelled_cv.signal
        end

        client_thd.join
        @srv.stop
        srv_thd.join
      end
498
499      it 'should handle multiple parallel requests', server: true do
500        @srv.handle(EchoService)
501        t = Thread.new { @srv.run }
502        @srv.wait_till_running
503        req, q = EchoMsg.new, Queue.new
504        n = 5  # arbitrary
505        threads = [t]
506        n.times do
507          threads << Thread.new do
508            stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
509            q << stub.an_rpc(req)
510          end
511        end
512        n.times { expect(q.pop).to be_a(EchoMsg) }
513        @srv.stop
514        threads.each(&:join)
515      end
516
517      it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
518        opts = {
519          server_args: { a_channel_arg: 'an_arg' },
520          pool_size: 2,
521          poll_period: 1,
522          max_waiting_requests: 1
523        }
524        alt_srv = new_rpc_server_for_testing(**opts)
525        alt_srv.handle(SlowService)
526        alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
527        alt_host = "0.0.0.0:#{alt_port}"
528        t = Thread.new { alt_srv.run }
529        alt_srv.wait_till_running
530        req = EchoMsg.new
531        n = 20 # arbitrary, use as many to ensure the server pool is exceeded
532        threads = []
533        one_failed_as_unavailable = false
534        n.times do
535          threads << Thread.new do
536            stub = SlowStub.new(alt_host, :this_channel_is_insecure)
537            begin
538              stub.an_rpc(req)
539            rescue GRPC::ResourceExhausted
540              one_failed_as_unavailable = true
541            end
542          end
543        end
544        threads.each(&:join)
545        alt_srv.stop
546        t.join
547        expect(one_failed_as_unavailable).to be(true)
548      end
549
550      it 'should send a status UNKNOWN with a relevant message when the' \
551        'servers response stream is not an enumerable' do
552        @srv.handle(BidiService)
553        t = Thread.new { @srv.run }
554        @srv.wait_till_running
555        stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)
556        responses = stub.server_sends_bad_input([])
557        exception = nil
558        begin
559          responses.each { |r| r }
560        rescue GRPC::Unknown => e
561          exception = e
562        end
563        # Erroneous responses sent from the server handler should cause an
564        # exception on the client with relevant info.
565        expected_details = 'NoMethodError: undefined method `each\' for '\
566          '"bad response. (not an enumerable, client sees an error)"'
567
568        expect(exception.inspect.include?(expected_details)).to be true
569        @srv.stop
570        t.join
571      end
572    end
573
574    context 'with connect metadata' do
575      let(:test_md_proc) do
576        proc do |mth, md|
577          res = md.clone
578          res['method'] = mth
579          res['connect_k1'] = 'connect_v1'
580          res
581        end
582      end
583      before(:each) do
584        server_opts = {
585          poll_period: 1,
586          connect_md_proc: test_md_proc
587        }
588        @srv = new_rpc_server_for_testing(**server_opts)
589        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
590        @alt_host = "0.0.0.0:#{alt_port}"
591      end
592
593      it 'should send connect metadata to the client', server: true do
594        service = EchoService.new
595        @srv.handle(service)
596        t = Thread.new { @srv.run }
597        @srv.wait_till_running
598        req = EchoMsg.new
599        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
600        op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)
601        expect(op.metadata).to be nil
602        expect(op.execute).to be_a(EchoMsg)
603        wanted_md = {
604          'k1' => 'v1',
605          'k2' => 'v2',
606          'method' => '/EchoService/an_rpc',
607          'connect_k1' => 'connect_v1'
608        }
609        wanted_md.each do |key, value|
610          GRPC.logger.info("key: #{key}")
611          expect(op.metadata[key]).to eq(value)
612        end
613        @srv.stop
614        t.join
615      end
616    end
617
618    context 'with trailing metadata' do
619      before(:each) do
620        server_opts = {
621          poll_period: 1
622        }
623        @srv = new_rpc_server_for_testing(**server_opts)
624        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
625        @alt_host = "0.0.0.0:#{alt_port}"
626      end
627
628      it 'should be added to BadStatus when requests fail', server: true do
629        service = FailingService.new
630        @srv.handle(service)
631        t = Thread.new { @srv.run }
632        @srv.wait_till_running
633        req = EchoMsg.new
634        stub = FailingStub.new(@alt_host, :this_channel_is_insecure)
635        blk = proc { stub.an_rpc(req) }
636
637        # confirm it raise the expected error
638        expect(&blk).to raise_error GRPC::BadStatus
639
640        # call again and confirm exception contained the trailing metadata.
641        begin
642          blk.call
643        rescue GRPC::BadStatus => e
644          expect(e.code).to eq(service.code)
645          expect(e.details).to eq(service.details)
646          expect(e.metadata).to eq(service.md)
647        end
648        @srv.stop
649        t.join
650      end
651
652      it 'should be received by the client', server: true do
653        wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
654        service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
655        @srv.handle(service)
656        t = Thread.new { @srv.run }
657        @srv.wait_till_running
658        req = EchoMsg.new
659        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
660        op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' })
661        expect(op.metadata).to be nil
662        expect(op.execute).to be_a(EchoMsg)
663        expect(op.trailing_metadata).to eq(wanted_trailers)
664        @srv.stop
665        t.join
666      end
667    end
668
669    context 'when call objects are used after calls have completed' do
670      before(:each) do
671        server_opts = {
672          poll_period: 1
673        }
674        @srv = new_rpc_server_for_testing(**server_opts)
675        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
676        @alt_host = "0.0.0.0:#{alt_port}"
677
678        @service = CheckCallAfterFinishedService.new
679        @srv.handle(@service)
680        @srv_thd  = Thread.new { @srv.run }
681        @srv.wait_till_running
682      end
683
      # check that the server-side call is still in a usable state even
      # after it has finished
      #
      # Runs the common checks and then verifies that the call still reports
      # a String peer and a nil peer certificate. Used by the unary and
      # server-streaming examples.
      def check_single_req_view_of_finished_call(call)
        common_check_of_finished_server_call(call)

        expect(call.peer).to be_a(String)
        expect(call.peer_cert).to be(nil)
      end
692
693      def check_multi_req_view_of_finished_call(call)
694        common_check_of_finished_server_call(call)
695
696        l = []
697        call.each_remote_read.each { |r| l << r }
698        expect(l.size).to eq(0)
699      end
700
701      def common_check_of_finished_server_call(call)
702        expect do
703          call.merge_metadata_to_send({})
704        end.to raise_error(RuntimeError)
705
706        expect do
707          call.send_initial_metadata
708        end.to_not raise_error
709
710        expect(call.cancelled?).to be(false)
711        expect(call.metadata).to be_a(Hash)
712        expect(call.metadata['user-agent']).to be_a(String)
713
714        expect(call.metadata_sent).to be(true)
715        expect(call.output_metadata).to eq({})
716        expect(call.metadata_to_send).to eq({})
717        expect(call.deadline.is_a?(Time)).to be(true)
718      end
719
720      it 'should not crash when call used after an unary call is finished' do
721        req = EchoMsg.new
722        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
723                                                     :this_channel_is_insecure)
724        resp = stub.an_rpc(req)
725        expect(resp).to be_a(EchoMsg)
726        @srv.stop
727        @srv_thd.join
728
729        check_single_req_view_of_finished_call(@service.server_side_call)
730      end
731
732      it 'should not crash when call used after client streaming finished' do
733        requests = [EchoMsg.new, EchoMsg.new]
734        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
735                                                     :this_channel_is_insecure)
736        resp = stub.a_client_streaming_rpc(requests)
737        expect(resp).to be_a(EchoMsg)
738        @srv.stop
739        @srv_thd.join
740
741        check_multi_req_view_of_finished_call(@service.server_side_call)
742      end
743
744      it 'should not crash when call used after server streaming finished' do
745        req = EchoMsg.new
746        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
747                                                     :this_channel_is_insecure)
748        responses = stub.a_server_streaming_rpc(req)
749        responses.each do |r|
750          expect(r).to be_a(EchoMsg)
751        end
752        @srv.stop
753        @srv_thd.join
754
755        check_single_req_view_of_finished_call(@service.server_side_call)
756      end
757
758      it 'should not crash when call used after a bidi call is finished' do
759        requests = [EchoMsg.new, EchoMsg.new]
760        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
761                                                     :this_channel_is_insecure)
762        responses = stub.a_bidi_rpc(requests)
763        responses.each do |r|
764          expect(r).to be_a(EchoMsg)
765        end
766        @srv.stop
767        @srv_thd.join
768
769        check_multi_req_view_of_finished_call(@service.server_side_call)
770      end
771    end
772  end
773end
774