ZeebeClient calls the Gateway over gRPC, the Gateway dispatches the request to a Broker through netty, and the Broker runs the processing logic.
Client code talks to Zeebe through ZeebeClient. The most common use is registering a Worker to complete jobs:

    zeebeClient.newWorker()
        .jobType(ZeebeConfigProperties.CONFIG_UPDATE_PIPELINE_EXECUATION_ERROR)
        .handler((jobClient, job) -> configChangeWorkers.errorHandle(jobClient, job))
        .fetchVariables(Workers.CONTEXT_ID, Workers.CHANGE_REQUEST, Workers.DEVICE_INFO, Workers.MANAGED_OBEJCT)
        .open()

The open method instantiates JobWorkerImpl. The JobWorkerImpl constructor schedules a recurring task, tryActivateJobs, that keeps pulling jobs:

    public JobWorkerImpl(
        final int maxJobsActive,
        final ScheduledExecutorService executor,
        final Duration pollInterval,
        final JobRunnableFactory jobRunnableFactory,
        final JobPoller jobPoller) {
      this.maxJobsActive = maxJobsActive;
      this.activationThreshold = Math.round((float)maxJobsActive * 0.3F);
      this.remainingJobs = new AtomicInteger(0);
      this.executor = executor;
      this.jobRunnableFactory = jobRunnableFactory;
      this.jobPoller = new AtomicReference(jobPoller);
      // Run once immediately, then again pollInterval after each run finishes.
      executor.scheduleWithFixedDelay(this::tryActivateJobs, 0L, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

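To make the numbers in the constructor concrete, here is a minimal sketch of the threshold arithmetic. The activationThreshold formula and the maxJobsActive - remainingJobs request size are taken from the code in this post. The shouldActivate check is an assumption, because the body of shouldActivateJobs is not shown here, and the class and method names are hypothetical.

```java
class ActivationThresholdSketch {
    // Same arithmetic as the constructor: 30% of maxJobsActive, rounded to the nearest int.
    public static int activationThreshold(int maxJobsActive) {
        return Math.round((float) maxJobsActive * 0.3F);
    }

    // Assumption: poll again once the unfinished job count has dropped
    // to the threshold or below. The real shouldActivateJobs may differ.
    public static boolean shouldActivate(int remainingJobs, int threshold) {
        return remainingJobs <= threshold;
    }

    // Request only as many jobs as fit under the maxJobsActive cap.
    public static int jobsToRequest(int maxJobsActive, int remainingJobs) {
        return maxJobsActive - remainingJobs;
    }

    public static void main(String[] args) {
        int max = 32;
        int threshold = activationThreshold(max);
        System.out.println("threshold = " + threshold);
        System.out.println("poll with 11 remaining? " + shouldActivate(11, threshold));
        System.out.println("poll with 10 remaining? " + shouldActivate(10, threshold));
        System.out.println("jobs to request = " + jobsToRequest(max, 10));
    }
}
```

Under that assumption, a worker with maxJobsActive = 32 stops asking for work while it still holds more than 10 jobs, and tops itself back up to 32 once it falls to 10.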
In activateJobs, jobPoller.poll pulls the jobs, and this::submitJob handles each job once it has been pulled:

    private void activateJobs() {
      // Take the poller out of the reference; null means a poll is already in flight.
      JobPoller jobPoller = (JobPoller)this.jobPoller.getAndSet((Object)null);
      if (jobPoller != null) {
        int currentRemainingJobs = this.remainingJobs.get();
        if (this.shouldActivateJobs(currentRemainingJobs)) {
          int maxActivatedJobs = this.maxJobsActive - currentRemainingJobs;
          try {
            jobPoller.poll(maxActivatedJobs, this::submitJob, (activatedJobs) -> {
              this.remainingJobs.addAndGet(activatedJobs);
              this.jobPoller.set(jobPoller);
            }, this::isOpen);
          } catch (Exception var5) {
            LOG.warn("Failed to activate jobs", var5);
            this.jobPoller.set(jobPoller);
          }
        } else {
          this.jobPoller.set(jobPoller);
        }
      }
    }

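The getAndSet(null) call in activateJobs is worth a closer look: the AtomicReference holds the poller only while no poll is running, so the poller object doubles as a "permission to poll" token. The sketch below isolates that pattern. Poller here is a hypothetical stand-in for JobPoller, and tryAcquire and release are names invented for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class PollerTokenSketch {
    // Hypothetical stand-in for JobPoller; it carries no behavior in this sketch.
    public static final class Poller {}

    private final AtomicReference<Poller> poller = new AtomicReference<>(new Poller());

    // Atomically take the poller. Returns null if another caller already holds it,
    // which corresponds to "a previous poll has not finished yet".
    public Poller tryAcquire() {
        return poller.getAndSet(null);
    }

    // Put the poller back so the next scheduled run can poll again.
    public void release(Poller p) {
        poller.set(p);
    }

    public static void main(String[] args) {
        PollerTokenSketch sketch = new PollerTokenSketch();
        Poller first = sketch.tryAcquire();
        System.out.println("first acquire succeeded: " + (first != null));
        System.out.println("second acquire succeeded: " + (sketch.tryAcquire() != null));
        sketch.release(first);
        System.out.println("acquire after release succeeded: " + (sketch.tryAcquire() != null));
    }
}
```

This is why every branch of activateJobs ends by calling this.jobPoller.set(jobPoller): on completion, on exception, and when no activation is needed. If any branch skipped it, the worker would never poll again.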
The poll method calls Zeebe over gRPC:

    private void poll() {
      LOG.trace("Polling at max {} jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getMaxJobsToActivate(), this.requestBuilder.getWorker(), this.requestBuilder.getType()});
      ((GatewayStub)this.gatewayStub.withDeadlineAfter(this.requestTimeout, TimeUnit.MILLISECONDS))
          .activateJobs(this.requestBuilder.build(), this);
    }

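onNext handles the returned result: it adds the number of returned jobs to activatedJobs, wraps each job in an ActivatedJobImpl, and passes it to jobConsumer, which is the this::submitJob handed to poll by the worker:

    public void onNext(final ActivateJobsResponse activateJobsResponse) {
      this.activatedJobs += activateJobsResponse.getJobsCount();
      activateJobsResponse.getJobsList().stream().map((job) -> {
        return new ActivatedJobImpl(this.objectMapper, job);
      }).forEach(this.jobConsumer);
    }
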
Zeebe's proto files are defined in the gateway-protocol project.

GatewayGrpcService is the entry point for Zeebe's API:

    @Override
    public void activateJobs(
        final ActivateJobsRequest request,
        final StreamObserver<ActivateJobsResponse> responseObserver) {
      endpointManager.activateJobs(
          request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
    }

    @Override
    public void cancelProcessInstance(
        final CancelProcessInstanceRequest request,
        final StreamObserver<CancelProcessInstanceResponse> responseObserver) {
      endpointManager.cancelProcessInstance(
          request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
    }

    @Override
    public void completeJob(
        final CompleteJobRequest request,
        final StreamObserver<CompleteJobResponse> responseObserver) {
      endpointManager.completeJob(
          request, ErrorMappingStreamObserver.ofStreamObserver(responseObserver));
    }

The completeJob method of endpointManager calls sendRequest. RequestMapper::toCompleteJobRequest is a method reference that converts the request parameters, and ResponseMapper::toCompleteJobResponse is a method reference that converts the returned result:

    public void completeJob(
        final CompleteJobRequest request,
        final ServerStreamObserver<CompleteJobResponse> responseObserver) {
      sendRequest(
          request,
          RequestMapper::toCompleteJobRequest,
          ResponseMapper::toCompleteJobResponse,
          responseObserver);
    }

sendRequest first converts the gRPC request into a brokerRequest, then forwards it to the broker through brokerClient:

    private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(
        final GrpcRequestT grpcRequest,
        final Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper,
        final BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
        final ServerStreamObserver<GrpcResponseT> streamObserver) {
      final BrokerRequest<BrokerResponseT> brokerRequest;
      try {
        brokerRequest = requestMapper.apply(grpcRequest);
      } catch (final Exception e) {
        // A mapping failure is reported to the caller; nothing is sent to the broker.
        streamObserver.onError(e);
        return;
      }
      brokerClient.sendRequestWithRetry(
          brokerRequest,
          (key, response) -> consumeResponse(responseMapper, streamObserver, key, response),
          streamObserver::onError);
    }

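The shape of sendRequest (map the request, send it, map the response, route errors to the observer) can be tried out in isolation. The following is a simplified sketch under stated assumptions: Broker is a hypothetical interface standing in for brokerClient, plain Consumer callbacks stand in for ServerStreamObserver, and there is no retry logic.

```java
import java.util.function.Consumer;
import java.util.function.Function;

class SendRequestSketch {
    // Hypothetical stand-in for brokerClient: delivers either a response or an error.
    public interface Broker<BReq, BRes> {
        void send(BReq request, Consumer<BRes> onResponse, Consumer<Throwable> onError);
    }

    public static <GReq, BReq, BRes, GRes> void sendRequest(
            GReq grpcRequest,
            Function<GReq, BReq> requestMapper,
            Function<BRes, GRes> responseMapper,
            Broker<BReq, BRes> broker,
            Consumer<GRes> onNext,
            Consumer<Throwable> onError) {
        final BReq brokerRequest;
        try {
            brokerRequest = requestMapper.apply(grpcRequest);
        } catch (Exception e) {
            // Mapping failed: report to the caller and never contact the broker.
            onError.accept(e);
            return;
        }
        // Map the broker response back to the caller-facing type before delivering it.
        broker.send(brokerRequest, res -> onNext.accept(responseMapper.apply(res)), onError);
    }

    // Runs one request through a toy broker that doubles the number it receives.
    public static String demo(String input) {
        StringBuilder out = new StringBuilder();
        Broker<Integer, Integer> doublingBroker = (req, ok, err) -> ok.accept(req * 2);
        Function<String, Integer> toBrokerRequest = Integer::parseInt;
        Function<Integer, String> toGrpcResponse = n -> "result=" + n;
        Consumer<String> onNext = s -> out.append(s);
        Consumer<Throwable> onError = e -> out.append("error: ").append(e.getClass().getSimpleName());
        sendRequest(input, toBrokerRequest, toGrpcResponse, doublingBroker, onNext, onError);
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo("21")); // prints "result=42"
        System.out.println(demo("x"));  // prints "error: NumberFormatException"
    }
}
```

Keeping the two mappers as parameters is what lets each gateway endpoint shrink to a single sendRequest call with its own pair of method references.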
sendRequestInternal first determines which broker should handle the request, sender.send issues the call, and actor.runOnCompletion handles the returned result:

    private <T> void sendRequestInternal(
        final BrokerRequest<T> request,
        final CompletableFuture<BrokerResponse<T>> returnFuture,
        final TransportRequestSender sender,
        final Duration requestTimeout) {
      final BrokerAddressProvider nodeIdProvider;
      try {
        nodeIdProvider = determineBrokerNodeIdProvider(request);
      } catch (final PartitionNotFoundException e) {
        returnFuture.completeExceptionally(e);
        GatewayMetrics.registerFailedRequest(
            request.getPartitionId(), request.getType(), "PARTITION_NOT_FOUND");
        return;
      } catch (final NoTopologyAvailableException e) {
        returnFuture.completeExceptionally(e);
        GatewayMetrics.registerFailedRequest(
            request.getPartitionId(), request.getType(), "NO_TOPOLOGY");
        return;
      }

      final ActorFuture<DirectBuffer> responseFuture =
          sender.send(clientTransport, nodeIdProvider, request, requestTimeout);
      final long startTime = System.currentTimeMillis();

      actor.runOnCompletion(
          responseFuture,
          (clientResponse, error) -> {
            RequestResult result = null;
            try {
              if (error == null) {
                final BrokerResponse<T> response = request.getResponse(clientResponse);
                result = handleResponse(response, returnFuture);
                if (result.wasProcessed()) {
                  final long elapsedTime = System.currentTimeMillis() - startTime;
                  GatewayMetrics.registerSuccessfulRequest(
                      request.getPartitionId(), request.getType(), elapsedTime);
                  return;
                }
              } else {
                returnFuture.completeExceptionally(error);
              }
            } catch (final RuntimeException e) {
              returnFuture.completeExceptionally(new ClientResponseException(e));
            }
            registerFailure(request, result, error);
          });
    }

This is what sender.send actually invokes:

    private static final TransportRequestSender SENDER_WITH_RETRY =
        (c, s, r, t) -> c.sendRequestWithRetry(s, BrokerRequestManager::responseValidation, r, t);

**The final call is messagingService.sendAndReceive(nodeAddress, requestContext.getTopicName(), requestBytes, calculateTimeout), which reaches the broker through netty.**
On the receiving side, dispatch looks up the handler registered for the message subject, and handler.accept(message, this) processes the request:

    @Override
    public void dispatch(final ProtocolRequest message) {
      final String subject = message.subject();
      final BiConsumer<ProtocolRequest, ServerConnection> handler = handlers.get(subject);
      if (handler != null) {
        log.trace("Received message type {} from {}", subject, message.sender());
        handler.accept(message, this);
      } else {
        log.debug("No handler for message type {} from {}", subject, message.sender());
        byte[] subjectBytes = null;
        if (subject != null) {
          subjectBytes = StringUtil.getBytes(subject);
        }
        // No handler registered: reply with an error status that carries the subject.
        reply(message, ProtocolReply.Status.ERROR_NO_HANDLER, Optional.ofNullable(subjectBytes));
      }
    }

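The dispatch method above is a subject-keyed handler lookup with an error reply as the fallback. A stripped-down sketch of that idea follows. It is not the real messaging code: the handler type is simplified to a String-to-String function, the subject name "command-api" is made up for the example, and the NO_HANDLER constant only mimics ProtocolReply.Status.ERROR_NO_HANDLER.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SubjectDispatchSketch {
    // Mimics the ERROR_NO_HANDLER reply status; a plain string in this sketch.
    public static final String NO_HANDLER = "ERROR_NO_HANDLER";

    // subject -> handler, filled in when a service subscribes to a subject.
    private final Map<String, Function<String, String>> handlers = new ConcurrentHashMap<>();

    public void register(String subject, Function<String, String> handler) {
        handlers.put(subject, handler);
    }

    // Route the payload to the handler registered for the subject,
    // or answer with NO_HANDLER when nothing is registered.
    public String dispatch(String subject, String payload) {
        // ConcurrentHashMap rejects null keys, so guard the lookup.
        Function<String, String> handler = subject == null ? null : handlers.get(subject);
        if (handler != null) {
            return handler.apply(payload);
        }
        return NO_HANDLER;
    }

    public static void main(String[] args) {
        SubjectDispatchSketch connection = new SubjectDispatchSketch();
        connection.register("command-api", payload -> "handled:" + payload);
        System.out.println(connection.dispatch("command-api", "completeJob"));
        System.out.println(connection.dispatch("unknown-subject", "completeJob"));
    }
}
```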