    // io.grpc.internal.GrpcUtil
    public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
io.grpc.internal.KeepAliveManager manages the KeepAlive state and schedules and runs the ping task.

    NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider))
        .usePlaintext()
        .defaultLoadBalancingPolicy(props.getBalancePolicy())
        .maxInboundMessageSize(props.getMaxInboundMessageSize())
        .keepAliveTime(1, TimeUnit.MINUTES)
        .keepAliveWithoutCalls(true)
        .keepAliveTimeout(10, TimeUnit.SECONDS)
        .intercept(channelManager.getInterceptors());
The three settings of interest here are keepAliveTime, keepAliveTimeout, and keepAliveWithoutCalls. What do these three variables do?

Create & Start
NettyChannelBuilder -----> NettyTransportFactory ---------> NettyClientTransport -------------> KeepAliveManager & NettyClientHandler
Responding to events
When an Active, Idle, DataReceived, Started, or Termination event occurs, the KeepAlive state is updated and the ping task is scheduled.
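To make the three settings and the event handling concrete, here is a minimal sketch of a keepalive state machine. It is a simplified, hypothetical model and not gRPC's actual KeepAliveManager: the class name `KeepAliveSketch`, its reduced set of states, and the `tick` method (which stands in for a scheduler and takes the current time explicitly) are all assumptions made for illustration. It assumes the usual meaning of the settings: keepAliveTime is how long the connection may stay silent before a ping is sent, keepAliveTimeout is how long to wait for a reply before giving up on the connection, and keepAliveWithoutCalls controls whether pings are sent while no RPC is in flight.

```java
import java.util.concurrent.TimeUnit;

/** Simplified, hypothetical model of client keepalive; NOT gRPC's KeepAliveManager. */
final class KeepAliveSketch {
  enum State { IDLE, PING_SCHEDULED, PING_SENT, DISCONNECTED }

  private final long keepAliveTimeNanos;
  private final long keepAliveTimeoutNanos;
  private final boolean keepAliveWithoutCalls;

  private State state = State.IDLE;
  private boolean hasCalls;
  private long deadlineNanos; // next ping (PING_SCHEDULED) or reply deadline (PING_SENT)
  private int pingsSent;

  KeepAliveSketch(long time, long timeout, TimeUnit unit, boolean keepAliveWithoutCalls) {
    this.keepAliveTimeNanos = unit.toNanos(time);
    this.keepAliveTimeoutNanos = unit.toNanos(timeout);
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
  }

  State state() { return state; }
  int pingsSent() { return pingsSent; }

  /** Started: only begin pinging right away if pings are allowed without calls. */
  void onTransportStarted(long now) {
    if (keepAliveWithoutCalls) schedulePing(now);
  }

  /** Active: an RPC is in flight, so keepalive must be running. */
  void onTransportActive(long now) {
    hasCalls = true;
    if (state == State.IDLE) schedulePing(now);
  }

  /** Idle: no RPC in flight; stop pinging unless pings are allowed without calls. */
  void onTransportIdle() {
    hasCalls = false;
    if (!keepAliveWithoutCalls && state == State.PING_SCHEDULED) state = State.IDLE;
  }

  /** DataReceived: inbound data proves the peer is alive, so restart the quiet period. */
  void onDataReceived(long now) {
    if (state == State.PING_SCHEDULED) {
      schedulePing(now);
    } else if (state == State.PING_SENT) {
      if (hasCalls || keepAliveWithoutCalls) schedulePing(now); else state = State.IDLE;
    }
  }

  /** Termination: the transport is gone; nothing more to schedule. */
  void onTransportTermination() { state = State.DISCONNECTED; }

  /** Stand-in for the scheduler: fires the ping or the timeout once its deadline has passed. */
  void tick(long now) {
    if (state == State.PING_SCHEDULED && now - deadlineNanos >= 0) {
      pingsSent++;
      state = State.PING_SENT;
      deadlineNanos = now + keepAliveTimeoutNanos;
    } else if (state == State.PING_SENT && now - deadlineNanos >= 0) {
      state = State.DISCONNECTED; // no reply within keepAliveTimeout
    }
  }

  private void schedulePing(long now) {
    state = State.PING_SCHEDULED;
    deadlineNanos = now + keepAliveTimeNanos;
  }

  private static long sec(long s) { return TimeUnit.SECONDS.toNanos(s); }

  public static void main(String[] args) {
    // Same values as the channel builder: 1 minute, 10 seconds, pings allowed without calls.
    KeepAliveSketch k = new KeepAliveSketch(60, 10, TimeUnit.SECONDS, true);
    k.onTransportStarted(sec(0));
    k.tick(sec(60));
    System.out.println("after 60s of silence: " + k.state());
    k.onDataReceived(sec(65));
    System.out.println("reply received at 65s: " + k.state());
    k.tick(sec(125));
    k.tick(sec(135));
    System.out.println("no reply within 10s: " + k.state());
  }
}
```

Passing the time in explicitly keeps the sketch deterministic; a real implementation would schedule these transitions on an executor instead.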
    // Only the key code is shown; see `NettyServerBuilder` for the full source
    ServerImpl server = new ServerImpl(
        this,
        buildTransportServers(getTracerFactories()),
        Context.ROOT);
    for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
      notifyTarget.notifyOnBuild(server);
    }
    return server;

    // The NettyServer instances are created in the buildTransportServers method
    List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
    for (SocketAddress listenAddress : listenAddresses) {
      NettyServer transportServer = new NettyServer(
          listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
          workerEventLoopGroupPool, negotiator, streamTracerFactories,
          getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
          maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
          maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
          permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
      transportServers.add(transportServer);
    }
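The keepAliveTimeInNanos, keepAliveTimeoutInNanos, permitKeepAliveWithoutCalls, and permitKeepAliveTimeInNanos arguments passed to NettyServer are set through the builder. The following is a configuration sketch only, assuming grpc-netty is on the classpath; the class name, the port 50051, and the durations are illustrative choices, not values from the original article. Since the client shown earlier pings every minute even without active calls, the sketch permits pings at that rate.

```java
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import java.util.concurrent.TimeUnit;

/** Hypothetical example class; port and durations are illustrative assumptions. */
final class KeepAliveServerConfig {
  public static void main(String[] args) {
    Server server = NettyServerBuilder.forPort(50051)
        // Server-initiated keepalive: ping after 2 hours of silence, wait 20 seconds for a reply.
        .keepAliveTime(2, TimeUnit.HOURS)
        .keepAliveTimeout(20, TimeUnit.SECONDS)
        // Limits on client-initiated pings: accept one every 30 seconds or slower,
        // including when no RPC is in flight.
        .permitKeepAliveTime(30, TimeUnit.SECONDS)
        .permitKeepAliveWithoutCalls(true)
        .build();
    System.out.println("server built: " + server);
  }
}
```

The two `permit*` settings describe what the server tolerates from clients, while `keepAliveTime` and `keepAliveTimeout` describe the pings the server itself sends.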
Create & Start
NettyServerBuilder ---> NettyServer ---------> NettyServerTransport -------------> NettyServerHandler -----------------> KeepAliveEnforcer
Connection ready
The handlerAdded method of io.netty.channel.ChannelHandler is called. Its documentation describes the method as follows:

    Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.

NettyServerHandler(handlerAdded) ---> creates the KeepAliveManager object
Responding to events
Same as on the client.
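As the simplified server-side sequence diagram above shows, the server has a class of its own: io.grpc.netty.KeepAliveEnforcer.
This class monitors how often the client sends pings, to ensure the rate stays within a reasonable range.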
    package io.grpc.netty;

    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Preconditions;
    import java.util.concurrent.TimeUnit;
    import javax.annotation.CheckReturnValue;

    /** Monitors the client's PING usage to make sure the rate is permitted. */
    class KeepAliveEnforcer {
      @VisibleForTesting
      static final int MAX_PING_STRIKES = 2;
      @VisibleForTesting
      static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

      private final boolean permitWithoutCalls;
      private final long minTimeNanos;
      private final Ticker ticker;
      private final long epoch;

      private long lastValidPingTime;
      private boolean hasOutstandingCalls;
      private int pingStrikes;

      public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
        this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
      }

      @VisibleForTesting
      KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
        Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");

        this.permitWithoutCalls = permitWithoutCalls;
        this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
        this.ticker = ticker;
        this.epoch = ticker.nanoTime();
        lastValidPingTime = epoch;
      }

      /** Returns {@code false} when client is misbehaving and should be disconnected. */
      @CheckReturnValue
      public boolean pingAcceptable() {
        long now = ticker.nanoTime();
        boolean valid;
        if (!hasOutstandingCalls && !permitWithoutCalls) {
          valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
        } else {
          valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
        }
        if (!valid) {
          pingStrikes++;
          return !(pingStrikes > MAX_PING_STRIKES);
        } else {
          lastValidPingTime = now;
          return true;
        }
      }

      /**
       * Reset any counters because PINGs are allowed in response to something sent. Typically called
       * when sending HEADERS and DATA frames.
       */
      public void resetCounters() {
        lastValidPingTime = epoch;
        pingStrikes = 0;
      }

      /** There are outstanding RPCs on the transport. */
      public void onTransportActive() {
        hasOutstandingCalls = true;
      }

      /** There are no outstanding RPCs on the transport. */
      public void onTransportIdle() {
        hasOutstandingCalls = false;
      }

      /**
       * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
       * to use something like this instead of directly comparing nano times. See {@link
       * System#nanoTime}.
       */
      private static long compareNanos(long time1, long time2) {
        // Possibility of overflow/underflow is on purpose and necessary for correctness
        return time1 - time2;
      }

      @VisibleForTesting
      interface Ticker {
        long nanoTime();
      }

      @VisibleForTesting
      static class SystemTicker implements Ticker {
        public static final SystemTicker INSTANCE = new SystemTicker();

        @Override
        public long nanoTime() {
          return System.nanoTime();
        }
      }
    }
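The strike counting is easier to follow with a concrete timeline. The sketch below is a condensed, stdlib-only restatement of the logic in the listing above, written for illustration: the class name `PingStrikeSketch` is hypothetical, and the current time is passed in as a parameter instead of being read from a Ticker. The scenario (a server that permits one ping every 5 minutes, and a client that pings every minute) is an assumption chosen to trigger the limit.

```java
import java.util.concurrent.TimeUnit;

/** Condensed restatement of the enforcer's strike logic; hypothetical name, explicit clock. */
final class PingStrikeSketch {
  static final int MAX_PING_STRIKES = 2;
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  private final boolean permitWithoutCalls;
  private final long minTimeNanos;
  private final long epoch;
  private long lastValidPingTime;
  private boolean hasOutstandingCalls;
  private int pingStrikes;

  PingStrikeSketch(boolean permitWithoutCalls, long minTime, TimeUnit unit, long epochNanos) {
    this.permitWithoutCalls = permitWithoutCalls;
    // The permitted interval is capped at 2 hours, as in the original constructor.
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    this.epoch = epochNanos;
    this.lastValidPingTime = epochNanos;
  }

  /** Returns false once the client has exceeded MAX_PING_STRIKES early pings. */
  boolean pingAcceptable(long now) {
    long required = (!hasOutstandingCalls && !permitWithoutCalls)
        ? IMPLICIT_PERMIT_TIME_NANOS
        : minTimeNanos;
    // Subtract instead of comparing directly so the check survives nanoTime wrap-around.
    boolean valid = (lastValidPingTime + required) - now <= 0;
    if (valid) {
      lastValidPingTime = now;
      return true;
    }
    pingStrikes++;
    return pingStrikes <= MAX_PING_STRIKES;
  }

  void resetCounters() {
    lastValidPingTime = epoch;
    pingStrikes = 0;
  }

  void onTransportActive() { hasOutstandingCalls = true; }

  void onTransportIdle() { hasOutstandingCalls = false; }

  public static void main(String[] args) {
    long min = TimeUnit.MINUTES.toNanos(1);
    // Server permits one ping per 5 minutes; the connection was created at time 0.
    PingStrikeSketch enforcer = new PingStrikeSketch(true, 5, TimeUnit.MINUTES, 0L);
    System.out.println(enforcer.pingAcceptable(1 * min)); // strike 1, still tolerated: true
    System.out.println(enforcer.pingAcceptable(2 * min)); // strike 2, still tolerated: true
    System.out.println(enforcer.pingAcceptable(3 * min)); // strike 3, disconnect: false
  }
}
```

Two details of the listing show up here. The first ping is measured against the time the enforcer was created, so a ping sent too soon after the connection opens already counts as a strike. And resetCounters sets lastValidPingTime back to that creation time rather than to the current time, so after the server sends HEADERS or DATA, the next ping is accepted as long as the connection is older than the permitted interval.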