消息队列MQ

Kafka 入门

本文主要是介绍Kafka 入门,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

一、Kafka概述

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 Kafka专为分布式高吞吐量系统而设计。

最新定义:Kafka是一个开源的分布式事件流平台(EventStreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

主要应用场景包括:

  • 缓存/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
  • 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
  • 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

消息队列得两种模式:

  • 点对点模式:消费者主动拉取数据,消息收到后清除队列中得消息;
  • 发布/订阅者模式:可以有多个主题(如:浏览信息、点赞、收藏、评论等);消费者消费数据后不删除数据;每个消费者都是相互独立的,可以的消费到数据。

二、Kafka架构

 

  •  Consumer Group:消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。 
  • Laeder:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader
  • Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。

三、安装部署

下载地址:http://kafka.apache.org/downloads.html 

Kafka集群需要首先配置好Hadoop集群以及Zookeeper集群,一个Hadoop节点以及zookeeper结点对应一个Kafka集群节点。

1、解压安装

[root@node01 software]# tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
#修改名称
[root@node01 module]# mv kafka_2.11-2.4.1/  kafka

2、修改配置文件

$ cd /opt/module/kafka/config
$ vim server.properties 

按以下内容配置:

#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔 
log.dirs=/opt/module/kafka/datas 

#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理) 
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka 

3、使用分发脚本分发安装包

$ xsync kafka/

4、修改另外两台服务器配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2 

5、在/etc/profile.d/my_env.sh文件中增加kafka环境变量配置 (三台服务器都需要配置)

#KAFKA_HOME 
export KAFKA_HOME=/opt/module/kafka 
export PATH=$PATH:$KAFKA_HOME/bin 

#保存退出,刷新环境变量
$ source /etc/profile

6、编写Kafka群起脚本(kfk.sh)

#!/bin/bash
case $1 in
"start"){  
        for i in node01 node02 node03           
        do  
                echo " --------启动 $i Kafka-------"  
                ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"  
                done 
        };; 
"stop"){  
        for i in node01 node02 node03           
        do  
                echo " --------停止 $i Kafka-------"  
                ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "  
        done 
        };; 
esac
#给脚本添加执行权限
$ chmod +x kfk.sh

7、启动kafka集群

注意:先启动Zookeeper集群,然后启动Kafka。停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。 

#启动集群
kfk.sh start

#关闭集群
kfk.sh stop

 

这篇关于Kafka 入门的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!