关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/
导语:本文是 StreamNative 解决方案工程师魏彬在 TGIP-CN 032 直播活动的文字整理版本。在活动中,带领大家认识了 Pulsar 测试环境搭建、周边工具到组件等方面,帮助大家快速入手 Apache Pulsar。
今天为大家带来 Apache Pulsar 快速上手的内容,本次主要是面对刚刚接触 Pulsar 的同学,介绍如何快速的搭建 Pulsar 测试环境,熟悉 Pulsar 周边工具、相关组件。希望通过这次分享,大家后续能自己按照这次分享的演练,快速地把 Pulsar 相关集群、周边工具跑起来,为下一步进阶做好准备。
本文内容主要分为以下三部分:
简单介绍一下 Apache Pulsar,它是新一代的云原生分布式消息流平台,这里面有几个关键词。云原生的话,相信大家应该都听得非常多了,简单可以理解成是面向 K8S,非常适合在 K8S 这种容器编排的系统里面运作。消息流平台是指 Apache Pulsar 是融合了消息队列以及流处理两种特性的数据平台。
Pulsar 的出现时间是最晚的,大家可以从上图看到,Pulsar 是在 2012 年设计的,诞生原因是在它之前的项目没有满足当时创造者的需求。
主要分为 Streaming(流处理消费模式) 和 Queuing(队列消费模式),具体如上图。
对消息队列来讲,它们的差异是什么?讲到消息队列,拿 RabbitMQ 举例,一条消息进来之后,只会给一个消费者去消费,也就是说它只 deliver 一次,消费完就结束了,不会去存它。
相对应的,如 Kafka 代表的 Streaming 系统,它的消息允许有多个消费者,也就是说消息进来一次,但可以被消费多次。另外它的消息是可以持久化到 Streaming 的平台,也意味着后续是可以做历史性数据的重复消费。
此外,在消息队列场景中,对于消息的消费没有严格的顺序限制,一个消息进来不会有严格的消费顺序,不是必须先消费这个消息。但是在 Streaming 平台,往往我们会遇到严格的顺序,必须按照消息进来的顺序去消费。
消息队列更多被运用到异步解耦的场景。简单举个例子,比如我们做电商平台,用户下了一个单,之后你想给他发邮件或短信提醒一下。一般情况下,这种任务不太可能被放到主处理流程里。因为发邮件、短信相对都是比较慢的处理逻辑,如果把它放到下单流程里,会导致下单服务的存储量很难上去。
这个时候我们往往引入消息队列,让消息队列去把下单的服务和发邮件、发短信的服务解耦掉。但是流处理的场景更多是大数据的实时计算场景。消息队列和流处理应用场景不一样导致催生了不同的产品。
Pulsar 的出现让我们可以在一个产品里面同时把消息队列和流平台统一在一起,用户没有必要再去定位得那么细。在现有技术的场景里面,消息队列和流处理的边界其实没有那么清晰。在用 Kafka 时,很多用户把它当一个消息队列。如果严格按照刚才的定义,很多用户其实是把 Kafka 当成一个异步解耦的平台在用。
消息队列和流处理两者的核心的差异其实就是消费模型,也就是消费一次还是多次消费、是否有顺序。
Pulsar 在消费模型上能够支持队列、流的消费模型,所以它可以统一这两个场景,这也就是前面最早讲的“统一的云原生的消息流平台”的一个解释。
如上图所示,展示了 Pulsar 支持的消费语义。通过前面对于流的定义,Exclusive、Failover 消费语义可以理解为 Streaming 流处理的消费模式,是严格按照消息的顺序消费的。
Shared 和 Key_Shared 模式允许乱序的消费,可以加很多的消费者从同一个 topic 里面读取数据。这个时候它是无序的,类似于我们上面提到的 Queuing 消费队列的消费模式。
当 Pulsar 把这两种场景统一后,实际上在用的时候没有必要特别严格地再去想这到底是一个消息队列的场景还是流场景,可以直接按照消费的需求去应用不同的消费语义。
Pulsar 为什么要统一这两种平台?它可以降低用户使用时候的心智成本,用户不需要再去在那么多的产品项目里面去挑选,在一个产品项目里面都可以解决。
Pulsar 是存储和计算分离的,最上层是 Producer、Consumer 用于生产消息、消费消息。下面一层是 Broker 计算层,再下面这层是 Bookie,可以看到有很多 Segment,这就是存储层,它以 Segment 为颗粒度提供一个存储层的服务。每一层节点都是对等的,相互支持独立扩展,非常便捷和灵活地支持我们做扩容或者容错处理。
上图展示了很多 Pulsar 的特性,其中包括一些企业级的特性,大家感兴趣可以前往官网了解一下,如:
这里不再赘述。
上图展示了 Pulsar 的三个核心组件:
Broker 主要用于 Producer 和 Consumer 交互时的协议解析。Pulsar 有自己的一套交互协议,当 Producer 和 Consumer 与 Pulsar 交互需要基于 Pulsar 的协议去处理。
Kafka 在对外暴露服务的时候,也把自己的一套协议暴露了。当 Kafka 的 Producer 和 Consumer 消费的时候,用户要遵循 Kafka 的生产、消费协议,才能与 Kafka 去做交互。
Pulsar 的计算与存储层分离意味着只要在计算层去兼容 Kafka 协议,就允许 Kafka 的 Producer 和 Consumer 把数据写到 Pulsar,我们在计算层可以做不同的协议兼容。
除了 Kafka,我们也可以做 AMQP、MQTT、RocketMQ 等协议兼容。因为计算层就是做协议的解析处理,本身这一层不会存任何数据,所有的数据在做了解析处理之后会被存到下游的 BookKeeper(存储层)里,当消费者来消费的时候,就从它里面读出来。
接下来我们看 Storage Layer ,大家可以把它理解成是一个面向分片的设计。BookKeeper 提供的是一个面向 Segment 的存储语义,与它相对比的就是 Partition (分区)的语义。分区的语义最常见的是像 Kafka 的这种语义。
Partition 跟 Segment 最大的区别在于 Partition 的量级往往是比较大的。Kafka 的分区一旦与一个存储节点、存储设备、磁盘绑定,该 Partition 就只能在这个磁盘随着数据的写入越来越大,与这个存储磁盘绑定在一起。当我们去做扩容或者迁移的时候,Partition 越大,操作的成本就会越高。
Segment 的好处是什么?我们可以去定义的它的大小上限。BookKeeper 或者在 Pulsar 的存储层其实也有 Partition 的概念,但是 Partition 已经被打散成 Segment,不同的 Segment 又与底层的存储设备绑定。由于 Segment 是有大小的限定,成本就会非常小。这就是存储层以 Segment 语义去提供读写的服务带来的最大好处。
ZooKeeper 的职能:
上图简单描述了一个流向,让大家可以理解组件之间的交互。Broker 与 BookKeeper 都会与 Zookeeper 发生请求交互。Broker 与 BookKeeper 的关系是:Broker 会把 Producer 或 Consumer 发来读写请求写到 BookKeeper 或者从 BookKeeper 读出去,Broker 相当于 BookKeeper 的 Client。
再来看一下 Pulsar 组件对外的端口,启动服务的时候,我们要对它的端口有清晰的了解。在上图中,Broker 暴露了一个 TCP端口:6650,还有 HTTP 的端口:8080。6650 端口主要给 Client 去连接,当 Produce 或者 Consume 时,它都会去连到 6650
的 TCP 端口;8080 端口暴露了一些监控指标,也就是兼容 Prometheus 协议的指标。另外一个就是它也暴露了 admin 的 API,当你要对 Pulsar 做运维的配置管理或者去取指标数据的时候,可以通过这个端口去获取。
BookKeeper 暴露的是 3181 TCP 端口,这个端口主要就是给 Broker 用,它通过连 3181 端口去连接 BookKeeper。它也同时有 8080 HTTP 端口,主要暴露 Prometheus的监控指标,是 Bookie 的一些监控指标。
ZooKeeper 的 2181 是很常见的 ZooKeeper 的 TCP 对外服务端口,这个端口是给 Broker、BookKeeper 去连接的。Pulsar 的 ZooKeeper 里面也做了 8080 的 HTTP 端口,它主要是把 ZooKeeper 的一些指标给暴露出来。这些端口都是可以在配置里面自己去定义的,这里提到的都是默认端口号。
当然 Pulsar 所有的组件都是支持分布式的:在生产上,Broker 一般建议至少两个,这样可以在其中一个挂了情况下,将流量切到另一个;bookie 建议至少三个,在其中一个挂掉的情况下,数据在另外两个节点上还会存有备份;ZooKeeper 一般三个,足够使用。每一个组件都可以分布式,也就意味着每一个组件都可以按照自己的需求扩容/缩容,存储层不够可以加 BookKeeper 的节点,计算层不够可以加 Broker 上 Bookie 的节点,ZooKeeper 压力大可以管理 ZooKeeper。
希望前文的介绍让大家对 Pulsar 有了概念,接下来我们讲上手 Pulsar。Pulsar 有两个下载地址,官方地址与项目镜像地址(国内镜像源)。在官方地址中提供所有和 Pulsar 相关的组件,binary 即主 Pulsar;在上手阶段,大家可以跳过网站上周边生态组件;集成 SDK 可以在 Client 中查询;Pulsar Manager 是我们后续会用到的 UI Dashboard 工具。国内用户从镜像源下载会更快速。
首先我们需要一个 Java 的环境,在本机装在 Oracle JDK/ Open JDK 1.8 版本。Standalone 是 Pulsar 提供的本机开发模式,上手阶段和本地起开发的时候推荐使用这个模式。Standalone 很简单,下载后只需要跑命令。此处列了两个命令,bin/pulsar standalone
在前台运行,bin/pulsar-daemon standalone
在后台运行。详情可以参考Standalone 文档,在视频 28:40 ~ 32:54 可参考运用 Pulsar 2.7.1 Standalone 的 demo。
bin/pulsar-admin clusters list
//展示 Pulsar 集群bin/pulsar-admin broker list test
//查看集群 Brokerbin/pulsar-admin topics list public/default
//查看 topicbin/pulsar-client produce my topic --messages “hello-pulsar”
//生产消息bin/pulsar-client consume my topic -s “first-subscription”
//消费消息进一步了解 Pulsar 提供的命令行的、工具,可以查阅官网 client 文档和 Pulsar admin 文档。
除了 Standalone 模式,很多人希望在本地测试时可以使用和生产环境类似的集群模式,即拥有多个 Broker 和 Bookie。一种可行的方式是在本地复制多个数据目录和配置文件去执行,可以参考 deploy-bare-metal 文档。另一种方式是,如果想在测试环境中如果想用集群模式,推荐使用 Docker 运行,参考 Github 仓库 Docker Compose 文档。
有了可以运行的 Pulsar 后,接下来分享周边运维工具,比如如何去高效地管理集群、运维集群并观测集群;集群的生产消费数;topic 数据量等。
Pulsar Manager 用来管理 Pulsar 集群,其实现架构比较简单。后端用于连接 Pulsar Broker 和 Bookie,本地也有持久化存储。
Pulsar Manager 架构图
建议大家打开两个配置:一个是在 application.properties 配置文件内打开 bookie.enable=true
和 pulsar.peek.massage=true
,另一个是在 bkvm.conf 配置文件内打开 bookie.enable=true
。
Pulsar 的监控工具用的是云原生场景下最常见的 Prometheus 和 Grafana。因为 Pulsar 在设计之初就考虑到了云原生,所以 Broker、Bookie、Zookeeper 都直接暴露了 8080 端口,访问端口就会直接吐出 Prometheus 兼容的 metrics,只需要在 Prometheus 内加上 Broker、Bookie、Zookeeper。
StreamNative 建立了仓库,里面内置大量可用的 Dashboard。
首先需要配置监控工具抓到相关指标。本地环境只需要通过 8080 端口配置,而在生产环境中 broker 等分散在自己的端口内,多个 Broker、Bookie 都要独立配置。
接下来导入指标。在上面提到的 apache-pulsar-grafana-dashboard 仓库中有 Prometheus 文件夹,里面为大家提供了一些模版,大家可以参照模版去配置。
.scripts/general-dashboards.sh <prometheus-url> <clustername>
。最后,也介绍了 Perf 压力测试,这里不再赘述,大家可查看下面回顾视频查看相关演示 Demo。
视频播放地址:https://www.bilibili.com/video/BV1oK4y1A76N?from=search&seid=7311952598788771684
嘉宾介绍
魏彬(@rockybean) Solution Engineer@StreamNative,Elastic Certified Engineer & Analyst,阿里云 MVP。