什么是拦截器?
拦截器是一种横切维度的功能延展。
具象说明一下,高速收费站就是一种拦截器。它可以做什么?收费,查证,交通控制等等,面向所有穿行过往的车辆。
gRPC 拦截器主要分为两种:客户端拦截器(ClientInterceptor),服务端拦截器(ServerInterceptor),顾名思义,分别于请求的两端执行相应的前拦截处理。
请求被分发出去之前。
a)、请求日志记录及监控
b)、添加请求头数据、以便代理转发使用
c)、请求或者结果重写
通常,如果要提供认证信息的话,可以使用 CallCredentials 实现,虽然,拦截器里也可以通过设置 CallOptions 来提供。
@ThreadSafe public interface ClientInterceptor { <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next); }
它只有一个方法:interceptCall,对于注册了相应拦截器的客户端调用,都要经过这个方法,
参数:
1、method:MethodDescriptor 类型,标示请求方法。包括方法全限定名称、请求服务名称、请求、结果、序列化工具、幂等等。
2、callOptions:此次请求的附带信息。
3、next:执行此次 RPC 请求的抽象链接管道(Channel)
返回结果:
ClientCall,包含请求及结果信息,并且不为null。
什么也不做:
public class MyGrpcClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { return next.newCall(method, callOptions); } }
可以看到我们的实现里,没有实现任何逻辑,直接执行了 next.newCall 继续执行客户端的此次调用。
next.newCall 只能在当前上下文中执行,每次调用以及返回都必须是一个完整地回路,逃逸使用会导致不必要的内存泄漏问题。
通过 callOption 设置超时及认证信息:
public class MyGrpcClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { return next.newCall(method, callOptions .withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时 .withCallCredentials(new CallCredentials() { //设置认证信息 @Override public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) { Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king"); applier.apply(metadata); } @Override public void thisUsesUnstableApi() {} })); } }
看着是不是很熟悉,stub 调用时设置,只不过在这里换了一个设置场景。
日志记录:
public class MyGrpcClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { CallOptions myCallOptions = callOptions .withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时 .withCallCredentials(new CallCredentials() { //设置认证信息 @Override public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) { Metadata metadata = new Metadata(); metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king"); applier.apply(metadata); } @Override public void thisUsesUnstableApi() {} }); return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, myCallOptions)) { @Override public void sendMessage(ReqT message) { System.out.println("request method: " + method.getFullMethodName()); System.out.println("request param:" + message.toString()); super.sendMessage(message); } }; } }
ForwardingClientCall:ClientCall 的一个抽象实现类,用以请求的代理转发。为什么我们这里要用这个类呢?
其实我们完全可以直接使用 ClientCall 实现,只不过作为顶级抽闲类,我们必须要实现很多方法。而使用 ForwardingClientCall,则我们只需要去重写我们需要的方法就可以。
如上代码:
sendMessage 发送消息到请求服务器,可能会执行多次。此处我们记录相应的请求参数信息。
请求被具体的Handler相应前。
a)访问认证
b)请求日志记录及监控
c)代理转发
@ThreadSafe public interface ServerInterceptor { <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next); }
参数:
call:ServerCall 对象,包含客户端请求的 MethodDescriptor
headers:请求头信息
next:处理链条上的下一个处理。
public class MyGrpcServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { return next.startCall(call, headers); } }
ServerCallHandler:定义用以实现请求处理的接口类。
public class MyGrpcServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { //提取认证信息 String id = headers.get(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER)); return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) { private long startTime = 0; //处理开始时间 private ReqT request; private boolean valid = false; //认证状态 @Override public void onComplete() { //记录请求参数及耗时 System.out.println("process cost: " + (System.nanoTime() - startTime)); System.out.println("process param: " + request.toString()); super.onComplete(); } @Override public void onMessage(ReqT message) { startTime = System.nanoTime(); request = message; if (StringUtils.equals("king", id)) { super.onMessage(message); } else { valid = false; } } @Override public void onHalfClose() { //验证失败则返回 Status.UNAUTHENTICATED if (!valid) { call.close(Status.UNAUTHENTICATED.withDescription("auth failed"), new Metadata()); } else { super.onHalfClose(); } } }; } }
onMessage:接收到请求时进行相应处理,我们这记录处理开始时间,及请求参数,同时根据提取的认证信息进行访问验证,验证通过则继续后续处理,否则设置认证状态为 false。
onHalfClose:处理认证标示及返回。
onComplete:处理结束记录请求参数及耗时。