1*f585d8a3SJacky Wang /* 2*f585d8a3SJacky Wang * Copyright (C) 2016 The Dagger Authors. 3*f585d8a3SJacky Wang * 4*f585d8a3SJacky Wang * Licensed under the Apache License, Version 2.0 (the "License"); 5*f585d8a3SJacky Wang * you may not use this file except in compliance with the License. 6*f585d8a3SJacky Wang * You may obtain a copy of the License at 7*f585d8a3SJacky Wang * 8*f585d8a3SJacky Wang * http://www.apache.org/licenses/LICENSE-2.0 9*f585d8a3SJacky Wang * 10*f585d8a3SJacky Wang * Unless required by applicable law or agreed to in writing, software 11*f585d8a3SJacky Wang * distributed under the License is distributed on an "AS IS" BASIS, 12*f585d8a3SJacky Wang * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*f585d8a3SJacky Wang * See the License for the specific language governing permissions and 14*f585d8a3SJacky Wang * limitations under the License. 15*f585d8a3SJacky Wang */ 16*f585d8a3SJacky Wang 17*f585d8a3SJacky Wang package dagger.grpc.server; 18*f585d8a3SJacky Wang 19*f585d8a3SJacky Wang import io.grpc.Metadata; 20*f585d8a3SJacky Wang import io.grpc.MethodDescriptor; 21*f585d8a3SJacky Wang import io.grpc.MethodDescriptor.Marshaller; 22*f585d8a3SJacky Wang import io.grpc.ServerCall; 23*f585d8a3SJacky Wang import io.grpc.ServerCall.Listener; 24*f585d8a3SJacky Wang import io.grpc.ServerCallHandler; 25*f585d8a3SJacky Wang import io.grpc.ServerMethodDefinition; 26*f585d8a3SJacky Wang import io.grpc.ServerServiceDefinition; 27*f585d8a3SJacky Wang import io.grpc.Status; 28*f585d8a3SJacky Wang import java.io.InputStream; 29*f585d8a3SJacky Wang 30*f585d8a3SJacky Wang /** 31*f585d8a3SJacky Wang * A {@link ServerCallHandler} that handles calls for a particular method by delegating to a handler 32*f585d8a3SJacky Wang * in a {@link ServerServiceDefinition} returned by a factory. 33*f585d8a3SJacky Wang * 34*f585d8a3SJacky Wang * @param <RequestT> the type of the request payloads 35*f585d8a3SJacky Wang * @param <ResponseT> the type of the response payloads 36*f585d8a3SJacky Wang */ 37*f585d8a3SJacky Wang public final class ProxyServerCallHandler<RequestT, ResponseT> 38*f585d8a3SJacky Wang implements ServerCallHandler<InputStream, InputStream> { 39*f585d8a3SJacky Wang 40*f585d8a3SJacky Wang /** 41*f585d8a3SJacky Wang * A factory for the {@link ServerServiceDefinition} that a {@link ProxyServerCallHandler} 42*f585d8a3SJacky Wang * delegates to. 43*f585d8a3SJacky Wang */ 44*f585d8a3SJacky Wang public interface ServiceDefinitionFactory { 45*f585d8a3SJacky Wang /** 46*f585d8a3SJacky Wang * Returns a service definition that contains a {@link ServerCallHandler} for the 47*f585d8a3SJacky Wang * {@link ProxyServerCallHandler}'s method. 48*f585d8a3SJacky Wang */ getServiceDefinition(Metadata headers)49*f585d8a3SJacky Wang ServerServiceDefinition getServiceDefinition(Metadata headers); 50*f585d8a3SJacky Wang } 51*f585d8a3SJacky Wang 52*f585d8a3SJacky Wang private final MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor; 53*f585d8a3SJacky Wang private final ServiceDefinitionFactory delegateServiceDefinitionFactory; 54*f585d8a3SJacky Wang 55*f585d8a3SJacky Wang /** 56*f585d8a3SJacky Wang * Returns a proxy method definition for {@code methodDescriptor}. 57*f585d8a3SJacky Wang * 58*f585d8a3SJacky Wang * @param delegateServiceDefinitionFactory factory for the delegate service definition 59*f585d8a3SJacky Wang */ proxyMethod( MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, ServiceDefinitionFactory delegateServiceDefinitionFactory)60*f585d8a3SJacky Wang public static <RequestT, ResponseT> ServerMethodDefinition<InputStream, InputStream> proxyMethod( 61*f585d8a3SJacky Wang MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, 62*f585d8a3SJacky Wang ServiceDefinitionFactory delegateServiceDefinitionFactory) { 63*f585d8a3SJacky Wang return ServerMethodDefinition.create( 64*f585d8a3SJacky Wang MethodDescriptor.create( 65*f585d8a3SJacky Wang delegateMethodDescriptor.getType(), 66*f585d8a3SJacky Wang delegateMethodDescriptor.getFullMethodName(), 67*f585d8a3SJacky Wang IDENTITY_MARSHALLER, 68*f585d8a3SJacky Wang IDENTITY_MARSHALLER), 69*f585d8a3SJacky Wang new ProxyServerCallHandler<>(delegateMethodDescriptor, delegateServiceDefinitionFactory)); 70*f585d8a3SJacky Wang } 71*f585d8a3SJacky Wang ProxyServerCallHandler( MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, ServiceDefinitionFactory delegateServiceDefinitionFactory)72*f585d8a3SJacky Wang ProxyServerCallHandler( 73*f585d8a3SJacky Wang MethodDescriptor<RequestT, ResponseT> delegateMethodDescriptor, 74*f585d8a3SJacky Wang ServiceDefinitionFactory delegateServiceDefinitionFactory) { 75*f585d8a3SJacky Wang this.delegateMethodDescriptor = delegateMethodDescriptor; 76*f585d8a3SJacky Wang this.delegateServiceDefinitionFactory = delegateServiceDefinitionFactory; 77*f585d8a3SJacky Wang } 78*f585d8a3SJacky Wang 79*f585d8a3SJacky Wang @Override startCall( ServerCall<InputStream, InputStream> call, Metadata headers)80*f585d8a3SJacky Wang public Listener<InputStream> startCall( 81*f585d8a3SJacky Wang ServerCall<InputStream, InputStream> call, 82*f585d8a3SJacky Wang Metadata headers) { 83*f585d8a3SJacky Wang ServerMethodDefinition<RequestT, ResponseT> delegateMethod = getMethodDefinition(headers); 84*f585d8a3SJacky Wang Listener<RequestT> delegateListener = 85*f585d8a3SJacky Wang delegateMethod 86*f585d8a3SJacky Wang .getServerCallHandler() 87*f585d8a3SJacky Wang .startCall(new ServerCallAdapter(call, delegateMethod.getMethodDescriptor()), headers); 88*f585d8a3SJacky Wang return new ServerCallListenerAdapter(delegateListener); 89*f585d8a3SJacky Wang } 90*f585d8a3SJacky Wang 91*f585d8a3SJacky Wang @SuppressWarnings("unchecked") // Method definition is the correct type. getMethodDefinition(Metadata headers)92*f585d8a3SJacky Wang private ServerMethodDefinition<RequestT, ResponseT> getMethodDefinition(Metadata headers) { 93*f585d8a3SJacky Wang String fullMethodName = delegateMethodDescriptor.getFullMethodName(); 94*f585d8a3SJacky Wang for (ServerMethodDefinition<?, ?> methodDefinition : 95*f585d8a3SJacky Wang delegateServiceDefinitionFactory.getServiceDefinition(headers).getMethods()) { 96*f585d8a3SJacky Wang if (methodDefinition.getMethodDescriptor().getFullMethodName().equals(fullMethodName)) { 97*f585d8a3SJacky Wang return (ServerMethodDefinition<RequestT, ResponseT>) methodDefinition; 98*f585d8a3SJacky Wang } 99*f585d8a3SJacky Wang } 100*f585d8a3SJacky Wang throw new IllegalStateException("Could not find " + fullMethodName); 101*f585d8a3SJacky Wang } 102*f585d8a3SJacky Wang 103*f585d8a3SJacky Wang private static final Marshaller<InputStream> IDENTITY_MARSHALLER = 104*f585d8a3SJacky Wang new Marshaller<InputStream>() { 105*f585d8a3SJacky Wang @Override 106*f585d8a3SJacky Wang public InputStream stream(InputStream value) { 107*f585d8a3SJacky Wang return value; 108*f585d8a3SJacky Wang } 109*f585d8a3SJacky Wang 110*f585d8a3SJacky Wang @Override 111*f585d8a3SJacky Wang public InputStream parse(InputStream stream) { 112*f585d8a3SJacky Wang return stream; 113*f585d8a3SJacky Wang } 114*f585d8a3SJacky Wang }; 115*f585d8a3SJacky Wang 116*f585d8a3SJacky Wang /** A {@link Listener} that adapts {@code Listener<RequestT>} to {@code Listener<InputStream>}. */ 117*f585d8a3SJacky Wang private final class ServerCallListenerAdapter extends Listener<InputStream> { 118*f585d8a3SJacky Wang 119*f585d8a3SJacky Wang private final Listener<RequestT> delegate; 120*f585d8a3SJacky Wang ServerCallListenerAdapter(Listener<RequestT> delegate)121*f585d8a3SJacky Wang public ServerCallListenerAdapter(Listener<RequestT> delegate) { 122*f585d8a3SJacky Wang this.delegate = delegate; 123*f585d8a3SJacky Wang } 124*f585d8a3SJacky Wang 125*f585d8a3SJacky Wang @Override onMessage(InputStream message)126*f585d8a3SJacky Wang public void onMessage(InputStream message) { 127*f585d8a3SJacky Wang delegate.onMessage(delegateMethodDescriptor.parseRequest(message)); 128*f585d8a3SJacky Wang } 129*f585d8a3SJacky Wang 130*f585d8a3SJacky Wang @Override onHalfClose()131*f585d8a3SJacky Wang public void onHalfClose() { 132*f585d8a3SJacky Wang delegate.onHalfClose(); 133*f585d8a3SJacky Wang } 134*f585d8a3SJacky Wang 135*f585d8a3SJacky Wang @Override onCancel()136*f585d8a3SJacky Wang public void onCancel() { 137*f585d8a3SJacky Wang delegate.onCancel(); 138*f585d8a3SJacky Wang } 139*f585d8a3SJacky Wang 140*f585d8a3SJacky Wang @Override onComplete()141*f585d8a3SJacky Wang public void onComplete() { 142*f585d8a3SJacky Wang delegate.onComplete(); 143*f585d8a3SJacky Wang } 144*f585d8a3SJacky Wang } 145*f585d8a3SJacky Wang 146*f585d8a3SJacky Wang /** 147*f585d8a3SJacky Wang * A {@link ServerCall} that adapts {@code ServerCall<InputStream>} to {@code 148*f585d8a3SJacky Wang * ServerCall<ResponseT>}. 149*f585d8a3SJacky Wang */ 150*f585d8a3SJacky Wang final class ServerCallAdapter extends ServerCall<RequestT, ResponseT> { 151*f585d8a3SJacky Wang 152*f585d8a3SJacky Wang private final ServerCall<InputStream, InputStream> delegate; 153*f585d8a3SJacky Wang private final MethodDescriptor<RequestT, ResponseT> method; 154*f585d8a3SJacky Wang ServerCallAdapter(ServerCall<InputStream, InputStream> delegate, MethodDescriptor<RequestT, ResponseT> method)155*f585d8a3SJacky Wang ServerCallAdapter(ServerCall<InputStream, InputStream> delegate, 156*f585d8a3SJacky Wang MethodDescriptor<RequestT, ResponseT> method) { 157*f585d8a3SJacky Wang this.delegate = delegate; 158*f585d8a3SJacky Wang this.method = method; 159*f585d8a3SJacky Wang } 160*f585d8a3SJacky Wang 161*f585d8a3SJacky Wang @Override getMethodDescriptor()162*f585d8a3SJacky Wang public MethodDescriptor<RequestT, ResponseT> getMethodDescriptor() { 163*f585d8a3SJacky Wang return method; 164*f585d8a3SJacky Wang } 165*f585d8a3SJacky Wang 166*f585d8a3SJacky Wang @Override request(int numMessages)167*f585d8a3SJacky Wang public void request(int numMessages) { 168*f585d8a3SJacky Wang delegate.request(numMessages); 169*f585d8a3SJacky Wang } 170*f585d8a3SJacky Wang 171*f585d8a3SJacky Wang @Override sendHeaders(Metadata headers)172*f585d8a3SJacky Wang public void sendHeaders(Metadata headers) { 173*f585d8a3SJacky Wang delegate.sendHeaders(headers); 174*f585d8a3SJacky Wang } 175*f585d8a3SJacky Wang 176*f585d8a3SJacky Wang @Override sendMessage(ResponseT message)177*f585d8a3SJacky Wang public void sendMessage(ResponseT message) { 178*f585d8a3SJacky Wang delegate.sendMessage(delegateMethodDescriptor.streamResponse(message)); 179*f585d8a3SJacky Wang } 180*f585d8a3SJacky Wang 181*f585d8a3SJacky Wang @Override close(Status status, Metadata trailers)182*f585d8a3SJacky Wang public void close(Status status, Metadata trailers) { 183*f585d8a3SJacky Wang delegate.close(status, trailers); 184*f585d8a3SJacky Wang } 185*f585d8a3SJacky Wang 186*f585d8a3SJacky Wang @Override isCancelled()187*f585d8a3SJacky Wang public boolean isCancelled() { 188*f585d8a3SJacky Wang return delegate.isCancelled(); 189*f585d8a3SJacky Wang } 190*f585d8a3SJacky Wang } 191*f585d8a3SJacky Wang } 192