消息队列MQ

搭建kafka

本文主要是介绍搭建kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Kafka 被称为下一代分布式消息系统,由 scala 和 Java 编写,是非营利性组织
ASF(Apache Software Foundation,简称为 ASF)基金会中的一个开源项目,比如
HTTP Server、Hadoop、ActiveMQ、Tomcat 等开源软件都属于 Apache 基金会的开
源软件,类似的消息系统还有 RbbitMQ、ActiveMQ、ZeroMQ。

Kafka®用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,
快速快速性,可在数千家公司中投入生产。

常用消息队列对比:

kafka 最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩
容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。

kafka 优势:

kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 的
消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件 Kafka 也可以支持每秒数百万的消息。
支持通过 Kafka 服务器分区消息。
支持 Hadoop 并行数据加载。

O(1)就是最低的时空复杂度了,也就是耗时/耗空间与输入数据大小无关,无论
输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的 O(1)时间复杂
度,无论数据规模多大,都可以在一次计算后找到目标

kafka 角色:

Broker:

Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。

Topic :

每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 topic,(物
理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic 的消息虽然保
存于一个或多个 broker 上但用户只需指定消息的 topic 即可生产或消费数据而不
必关心数据存于何处),topic 在逻辑上对 record(记录、日志)进行分组保存,消
费者需要订阅相应的 topic 才能消费 topic 中的消息。

Partition :

是物理上的概念,每个 topic 包含一个或多个 partition,创建 topic 时
可指定 parition 数量,每个 partition 对应于一个文件夹,该文件夹下存储该
partition 的数据和索引文件,为了实现实现数据的高可用,比如将分区 0 的数据
分散到不同的 kafka 节点,每一个分区都有一个 broker 作为 leader 和一个 broker
作为 Follower。

分区的优势(分区因子为 3):

一:实现存储空间的横向扩容,即将多个 kafka 服务器的空间结合利用
二:提升性能,多服务器读写
三:实现高可用,分区 leader 分布在不同的 kafka 服务器,比如分区 0 的 leader
为服务器 A,则服务器 B 和服务器 C 为 A 的 follower,而分区 1 的 leader 为服务
器 B,则服务器 A 和 C 为服务器 B 的 follower,而分区 2 的 leader 为 C,则服务
器 A 和 B 为 C 的 follower。

Producer:负责发布消息到 Kafka broker。

Consumer:消费消息,每个 consumer 属于一个特定的 consuer group(可为每个
consumer 指定 group name,若不指定 group name 则属于默认的 group),使用
consumer high level API 时,同一 topic 的一条消息只能被同一个 consumer group
内的一个 consumer 消费,但多个 consumer group 可同时消费这一消息。

kafka 部署:(基于zookeeper)

## 官方教程
https://kafka.apache.org/quickstart

# 下载
https://kafka.apache.org/downloads

部署三台服务器的高可用 kafka 环境。
部署环境:

Server1:172.31.2.41
Server2:172.31.2.42
Server3:172.31.2.43

三台都是一样的操作

下载

[root@mq1 src]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

创建目录

[root@mq1 src]# mkdir -p /apps
[root@mq1 src]# mkdir -p /data/kafka
[root@mq2 src]# mkdir -p /apps
[root@mq2 src]# mkdir -p /data/kafka
[root@mq3 src]# mkdir -p /apps
[root@mq3 src]# mkdir -p /data/kafka

解压

[root@mq1 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
[root@mq2 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/
[root@mq3 src]# tar xf kafka_2.12-2.8.0.tgz -C /apps/

做成软链接

[root@mq1 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
[root@mq2 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka
[root@mq3 src]# ln -s /apps/kafka_2.12-2.8.0 /apps/kafka

修改配置

[root@mq1 ~]# vim /apps/kafka/config/server.properties
broker.id=41  # 唯一的
listeners=PLAINTEXT://172.31.2.41:9092  # 本机的
# 调优
num.network.threads=8
num.io.threads=12
# 数据目录
log.dirs=/data/kafka
# zookeeper 集群
zookeeper.connect=172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181

以守护进程启动

[root@mq1 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

[root@mq2 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

[root@mq3 ~]# /apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

查看端口9092

[root@mq1 ~]# ss -tanl
LISTEN              0                    50                                  [::ffff:172.31.2.41]:9092                                         *:*

验证 zookeeper 中 kafka 元数据:

1、Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信
息都会写入 Zookeeper 的 ZNode 节点中;
2、Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,
会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费;
ps:kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0,9 以后 offset
存储在本地。
3、Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个 Leader,
这个是依托于 Zookeeper 的选举机制实现的;

测试 kafka 读写数据:

创建 topic:
创建名为 logstashtest,partitions(分区)为 3,replication(每个分区的副本数/每个
分区的分区因子)为 3 的 topic(主题):
在任意 kafaka 服务器操作:

[root@mq1 kafka]# bin/kafka-topics.sh --create --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --partitions 3  --replication-factor 3 --topic m66

Created topic m66.

验证 topic:
状态说明:logstashtest 有三个分区分别为 0、1、2,分区 0 的 leader 是 3(broker.id),
分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。

[root@mq1 kafka]# bin/kafka-topics.sh --describe --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m66

Topic: m66      TopicId: NIgq-gm3SFaxsTpvwCoshA PartitionCount: 3       ReplicationFactor: 3    Configs:
        Topic: m66      Partition: 0    Leader: 43      Replicas: 43,41,42      Isr: 43,41,42
        Topic: m66      Partition: 1    Leader: 41      Replicas: 41,42,43      Isr: 41,42,43
        Topic: m66      Partition: 2    Leader: 42      Replicas: 42,43,41      Isr: 42,43,41

获取所有 topic:

[root@mq1 kafka]# bin/kafka-topics.sh --list --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 m66

m66

测试发送消息:

[root@mq1 kafka]# bin/kafka-console-producer.sh --broker-list 172.31.2.41:9092,172.31.2.42:9092,172.31.2.43:9092 --topic m66

>msg1
>msg2
>mgs3

测试获取消息:

可以到任意一台 kafka 服务器测试消息获取,只要有相应的消息获取客户端即可。

[root@mq2 ~]# /apps/kafka/bin/kafka-console-consumer.sh --topic m66 --bootstrap-server 172.31.2.42:9092 --from-beginning msg1

msg1
msg2
mgs3

删除 topic:(必须有的才能删除,没有就会报错)

[root@mq3 src]# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m44

Topic m44 is marked for deletion.

范例:

[root@mq3 src]# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 172.31.2.41:2181,172.31.2.42:2181,172.31.2.43:2181 --topic m10

Error while executing topic command : Topic 'm10' does not exist as expected
[2021-08-17 06:30:06,677] ERROR java.lang.IllegalArgumentException: Topic 'm10' does not exist as expected
        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:542)
        at kafka.admin.TopicCommand$ZookeeperTopicService.deleteTopic(TopicCommand.scala:500)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:71)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
这篇关于搭建kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!