这是作为sofarpc学习的子篇,主要学习netty在rpc里面的使用。
很重要的三个类NettyByteBuffer(直接跳过没啥好讲),NettyChannel,NettyHelper(也直接跳过,比较简单)
简单介绍:保存服务端boss,work线程池,还有客户端对应io线程池 EventLoopGroup
我们来看下在哪里set进去
com.alipay.sofa.rpc.transport.http.AbstractHttp2ServerTransport#start
很多功能
调用的实现
com.alipay.sofa.rpc.client.AbstractCluster#doSendMsg
同步
Callback调用
request.setSofaResponseCallback(methodResponseCallback)一个重点关注点
Future调用
有很多种协议,主要看下http
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport
基本参数
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport#asyncSend
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport#doInvokeAsync
会分有回调callback还是future调用,最终通过doSend(request, callback, timeoutMillis);去调用
过程是这样,会去生成一个requestId,然后放到map里头,搞个超时机制去remove,如果超时的时候还在里头就会塞入异常信息
requestId是AtomicInteger去递增,channel.write(httpRequest)输出到netty服务端。
responseChannelHandler.put(requestId,channel.write(httpRequest),callback);
把streamId跟future,AbstractHttpClientHandler塞到map里头
这里处理链读到数据之后怎么一个处理过程
com.alipay.sofa.rpc.transport.http.Http2ClientChannelHandler#channelRead0
会根据header streamId去拿到对应的AbstractHttpClientHandler,处理器
callback.receiveHttpResponse(msg);
序列化,然后去setData
在这里实现前面callback或者future等等
com.alipay.sofa.rpc.transport.http.FutureInvokeClientHandler#doOnResponse
com.alipay.sofa.rpc.message.AbstractResponseFuture#setSuccess
往future里头塞result
整个过程流程图
com.alipay.sofa.rpc.transport.http.AbstractHttp2ClientTransport#syncSend
同步调用有个很骚的操作,就是它实际还是一个future去调用,作为一个callback
就是同步帮你get一个结果,异步丢给你个future,你自己去get结果,哈哈哈,你好骚~
最后还是调用doSend方法
同步方法是框架请求完直接帮你调用DefaultFuture的get方法阻塞获取结果,
异步方法是把DefaultFuture返回给用户,让用户决定获取调用get()方法的时机
这里不是线程池优雅关闭,是调用线程,它会一直等到计数器为0,再关闭
com.alipay.sofa.rpc.client.AbstractCluster#select(com.alipay.sofa.rpc.core.request.SofaRequest, java.util.List<com.alipay.sofa.rpc.client.ProviderInfo>)
com.alipay.sofa.rpc.client.AbstractLoadBalancer#select