消息队列MQ

kafka 实战 (ELK+kafka)

本文主要是介绍kafka 实战 (ELK+kafka),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

准备

系统IP应用myiddrocker.id
CentOS 7 192.168.100.20 Elasticserach+kibana+kafka+zookeeper 1 1
CentOS 7 192.168.100.30 Elasticserach+kafka+zookeeper+filebeat+logstash 2 2
CentOS 7 192.168.100.40 Elasticserach+kafka+zookeeper+filebeat 3 3

先配置好 Elasticserach 和 Kibana 教程:ELK 日志分析系统 - 花花de代码生活 - 博客园 (cnblogs.com) 做完 kibana就可以了。

一、zookeeper

1、下载 

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz --no-check-certificate

2、安装及配置 zookeeper

[root@elk-1 ~]# tar -zxf apache-zookeeper-3.6.1-bin.tar.gz -C /usr/local/
[root@elk-1 ~]# mv /usr/local/apache-zookeeper-3.6.1  /usr/local/zookeeper
[root@elk-1 ~]# cd /usr/local/zookeeper
[root@elk-1 zookeeper]# mkdir  {data,logs}
[root@elk-1 zookeeper]# echo 1 > data/myid
[root@elk-1 zookeeper]# cp conf/zoo_sample.cfg conf/zoo.cfg

# 配置文件
[root@elk-1 zookeeper]# vim conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000  #zk之间⼼跳间隔2秒
# The number of ticks that the initial
# synchronization phase can take
initLimit=10   #LF初始通信时限
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5   #LF同步通信时限
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper  #Zookeeper保存数据的⽬录
dataLogDir=/usr/local/zookeeper/logs   #Zookeeper保存⽇志⽂件的⽬录
# the port at which the clients will connect
clientPort=2181  #客户端连接 Zookeeper 服务器的端⼝
admin.serverPort=8888   #默认占⽤8080端⼝
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
##http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
server.1=elk-1:2888:3888
server.2=elk-2:2888:3888
server.3=elk-3:2888:3888

# 修改配置文件
echo export ZOOKEEPER_HOME=/usr/local/zookeeper >> /etc/profile
echo export PATH=$ZOOKEEPER_HOME/bin:$PATH  >> /etc/profile
# 刷新变量
source /etc/profile
# 测试变量是否成功
echo $ZOOKEEPER_HOME

# 启动 zk
[root@elk-1 zookeeper]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED    //启动成功

[root@elk-1 ~]# scp /usr/local/zookeeper/conf/zoo.cfg elk-2:/usr/local/zookeeper/conf/zoo.cfg

如果开启不了,查看配置文件没有问题你就查看 data/myid 是否有问题。elk-2 与 elk-3 的这个配置文件不需要更改。

[root@elk-1 ~]# scp /usr/local/zookeeper/conf/zoo.cfg elk-2:/usr/local/zookeeper/conf/zoo.cfg

elk-2:
[root@elk-2 zookeeper]# mkdir {data,logs}
[root@elk-2 zookeeper]# echo 2 > data/myid

elk-3:
[root@elk-3 zookeeper]# mkdir {data,logs}
[root@elk-3 zookeeper]# echo 3 > data /myid

3、测试

 1 # 查看节点状态
 2 [root@elk-1 zookeeper]# /usr/local/zookeeper/bin/zkServer.sh status
 3 /usr/bin/java
 4 ZooKeeper JMX enabled by default
 5 Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
 6 Client port found: 2181. Client address: localhost.
 7 Mode: follower
 8 
 9 [root@elk-1 zookeeper]# bin/zkCli.sh -server elk-2:2181
10 [zk: elk-1:2181(CONNECTED) 0]

二、kafka

1、下载 kafka

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz --no-check-certificate

2、安装及配置

[root@elk-1 ~]# tar -zxf kafka_2.12-2.8.1.tgz -C /usr/local/
[root@elk-1 ~]# cp /usr/local/kafka_2.12-2.8.1 /usr/local/kafka
[root@elk-1 ~]# cd /usr/local/kafka/
[root@elk-1 kafka]# mkdir logs
[root@elk-1 kafka]# cp config/server.properties{,.bak}

#配置文件
[root@elk-1 kafka]# vim config/server.properties
# broker的id 需要于本机 zk 的 myid 里的数一样
broker.id=1
# 监听端口
listeners=PLAINTEXT://192.168.100.20:9092
# 处理消息的最大线程数,一般情况下不要去修改
num.network.threads=3
# socket的发送缓冲区
num.io.threads=8
# socket的发送缓冲区
socket.send.buffer.bytes=102400    
# socket接收缓冲区
socket.receive.buffer.bytes=102400
# 请求最大值
socket.request.max.bytes=104857600
# log日志文件
log.dirs=/opt/data/kafka/logs
# 分区
num.partitions=3
# kafka会使用可配置的线程池来处理日志片段
num.recovery.threads.per.data.dir=1    
#断分偏移量
offsets.topic.replication.factor=1
# 传输日志的复制状态
transaction.state.log.replication.factor=1
# 传输日志最小的信息
transaction.state.log.min.isr=1
# 数据存储的最大时间
log.retention.hours=168                
# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.segment.bytes=1073741824
# 检查文件大小检查的周期时间
log.retention.check.interval.ms=300000
# zookeeper连接,维护集群状态
zookeeper.connect=192.168.100.20:2181,192.168.100.30:2181,192.168.100.40:2181
# 连接超时
zookeeper.connection.timeout.ms=18000
# 指定时间内没有消息到达就抛出异常,一般不需要改
group.initial.rebalance.delay.ms=0

# 修改环境变量⽂件
echo export KAFKA_HOME=/usr/local/kafka >> /etc/profile
echo export PATH=$KAFKA_HOME/bin:$PATH >> /etc/profile
source /etc/profile
echo $KAFKA_HOME

# 启动 kafka 两种开启方式
[root@elk-1 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
[root@elk-1 kafka]# nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>1 &
[1] 14653
# 查看kafka是否开启成功
[root@elk-1 kafka]# jps
14481 QuorumPeerMain
13538 Elasticsearch
15061 Jps
13239 -- process information unavailable
14653 Kafka

主要修改 elk-2 与 elk-3 修改 kafka 配置文件

[root@elk-1 ~]# scp /usr/local/kafka/config/server.properties elk-2:/usr/local/kafka/config/server.properties
#第二个节点的
broker.id=2
listeners=PLAINTEXT://192.168.100.30:9092
#第三个节点的
broker.id=3
listeners=PLAINTEXT://192.168.100.40:909

3、测试 kafka

[root@elk-1 zookeeper]# yum install -y lsof
[root@elk-1 kafka]# lsof -i:2181
COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    13527 root   48u  IPv6  50879      0t0  TCP *:eforward (LISTEN)
java    13527 root   64u  IPv6  53903      0t0  TCP elk-1:eforward->elk-1:36448 (ESTABLISHED)
java    13527 root   66u  IPv6  54636      0t0  TCP elk-1:eforward->elk-3:43152 (ESTABLISHED)
java    13601 root  122u  IPv6  54546      0t0  TCP elk-1:36448->elk-1:eforward (ESTABLISHED)

# 创建一个 test-ken 文件
[root@elk-1 kafka]# bin/kafka-topics.sh --create --bootstrap-server elk-1:9092 --replication-factor 3 --partitions 2 --topic test-ken
# 在其他节点查看是否升成
[root@elk-3 kafka]# bin/kafka-topics.sh --list --bootstrap-server 192.168.100.30:9092
test-ken

# 查看Topic详情
[root@elk-1 kafka]# bin/kafka-topics.sh --describe --bootstrap-server elk-1:9092 --topic test-ken
Topic: test-ken TopicId: n_OMjCKNQ7SsabGsfen_cA PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: test-ken Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: test-ken Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

# 从消费者 elk-1 输出消息 elk-2 即可就可以收到消息
[root@elk-1 kafka]# bin/kafka-console-producer.sh --broker-list elk-1:9092 --topic test-ken
>>test    
>ceshicgo
>
# elk-2 这里就可以直接可以收到
[root@elk-2 kafka]# bin/kafka-console-consumer.sh --bootstrap-server elk-2:9092 --topic test-ken
test
ceshicgo

--------  扩列知识 --------
# 删除Topic:
[root@elk-1 kafka]# bin/kafka-topics.sh --delete --bootstrap-server elk-1:9092 --topic test-ken
# 查看删除信息:
[root@elk-1 kafka]# bin/kafka-topics.sh --list --bootstrap-server elk-2:9092

三、filebeat

 1、下载

[root@elk-2 ~]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.0.0-x86_64.rpm

2、安装及配置、

(1.)elk-1

[root@elk1 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

[root@elk-1 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
 enabled: true
 paths:
      - /var/log/es_access.log   //此处可⾃⾏改为想要监听的⽇志⽂件
output.kafka:
   enabled: true
   hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
   topic: es_access           //对应zookeeper⽣成的topic
   keep_alive: 10s
必须注释

 

写到最下即可

 

 

 [root@elk-1 ~]# systemctl start filebeat

(2.)elk-2

跟上一步一样就是修改了一下要看到日志文件

[root@elk-2 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

编辑配置⽂件:
[root@elk2 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
       - /var/log/vmware-network.log //此处可⾃⾏改为想要监听的⽇志⽂件
output.kafka:
    enabled: true
    hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
    topic: vmware-network //对应zookeeper⽣成的topic
    keep_alive: 10s

[root@elk2 ~]# systemctl start filebeat

(3.)elk-3

[root@elk-3 ~]# rpm -ivh filebeat-6.0.0-x86_64.rpm

编辑配置⽂件:
[root@elk-3 ~]# vim /etc/filebeat/filebeat.yml
filebeat.prospectors:
- type: log
   enabled: true
   paths:
        - /var/log/access.log //此处可⾃⾏改为想要监听的⽇志⽂件
output.kafka:
   enabled: true
   hosts: ["elk-1:9092","elk-2:9092","elk-3:9092"]
   topic: access //对应zookeeper⽣成的topic
   keep_alive: 10s

[root@elk-3 ~]# systemctl start filebeat

四、Logstash

1、下载 

[root@elk_2 ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.0.0.rpm

2、安装及部署

[root@elk-2 ~]# rpm -ivh logstash-6.0.0.0.rpm

[root@elk1 ~]# vi /etc/logstash/logstash.yml
http.host: "192.168.100.30"

# 配置logstash收集es_access的⽇志:
[root@elk-2 ~]# cat /etc/logstash/conf.d/es_access.conf 
# Settings file in YAML
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["es_access"]
        type => "messages"
    }
}

output {
   if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "es_access-%{+YYYY-MM-dd}"
        }
   }
}

# 配置收集vmare的日志
[root@elk-2 ~]# cat /etc/logstash/conf.d/vmware.conf 
# Settings file in YAML
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["vmare"]
        type => "messages"
    }
}

output {
   if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "vmare-%{+YYYY-MM-dd}"
        }
   }
}

# 配置收集nginx日志
[root@elk-2 ~]# cat /etc/logstash/conf.d/nginx.conf 
# Settings file in YAML
input {
    kafka {
        bootstrap_servers => "192.168.100.20:9092,192.168.100.30:9092,192.168.100.40:9092"
        group_id => "logstash"
        auto_offset_reset => "earliest"
        decorate_events => true
        topics => ["nginx"]
        type => "messages"
    }
}

output {
   if [type] == "messages" {
        elasticsearch {
            hosts => ["192.168.100.20:9200","192.168.100.30:9200","192.168.100.40:9200"]
            index => "nginx-%{+YYYY-MM-dd}"
        }
   }
}

[root@elk_2 ~]# chmod 755 /var/log/messages
[root@elk_2 ~]# chown -R logstash /var/lib/logstash/
[root@elk-2 ~]# ln -s /usr/share/logstash/bin/logstash /usr/bin
# 测试脚本是否可以使用 返回ok就是可以使用
[root@elk-2 ~]# logstash --path.settings /etc/logstash/ -f /etc/logstash/conf.d/es_access.conf --config.test_and_exit

3、测试是否成功

第一步:(因为 nginx 跟 vmware 没有这个日志文件所以没有索引)

# 查看是否升成日志索引
[root@elk-1 ~]# curl '192.168.100.20:9200/_cat/indices?v'
health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   es_access-2022-04-18 PfGB-dZUTWC8goIxhHgUlA   5   1         11            0       36kb           36kb
yellow open   .kibana              lp_TV8zKTWay24UMkEPtqQ   1   1          2            0      6.8kb          6.8kb

第二步:

第三步:

[root@elk-1 ~]# echo 111 > /var/log/es_access.log 
[root@elk-1 ~]# cat /var/log/es_access.log
111

第四步:

这篇关于kafka 实战 (ELK+kafka)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!