目录
1.将生成的jar包拷贝到CentOS0 /opt/module 分发jar包
2.在CentOS0上执行jar程序
配置登录远程服务器立即source一下环境变量
4./bin目录下创建脚本lg.sh
5./bin目录下创建集群时间同步修改脚本dt.sh
6.集群所有进程查看脚本
7.在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
8.拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面(YSJ-1.0-SNAPSHOT.jar)
9.分发Flume到其他主机
10.日志采集Flume启动停止脚本
11.Kafka集群启动停止脚本
12.查看所有Kafka Topic
13.创建 Kafka Topic
14.生产消息
15.消费消息
16.Kafka Manager安装
17.进入到/opt/module/kafka-manager-1.3.3.22/conf
18.Kafka Manager启动停止脚本
19.CentOS0:/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
20.CentOS0服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置
[root@CentOS0 module]$ java -classpath 包名 com.Charlie.Guo.AppMain > /opt/module/test.log
[root@CentOS0/1/2 ~]$ echo source /etc/profile >> ~/.bashrc
[root@CentOS0 bin]$ vim lg.sh
#! /bin/bash for i in CentOS0 CentOS1 do ssh $i "java -classpath /opt/module/YSJ-1.0-SNAPSHOT-jar-with-dependencies.jar com.Charlie.Guo.AppMain >/opt/module/test.log" done
[root@CentOS0 bin]$ chmod 777 lg.sh [root@CentOS0 module]$ lg.sh [root@CentOS0 module]# cd data 然后ls
[root@CentOS0 bin]$ vim dt.sh
#!/bin/bash log_date=$1" $2" echo $log_date for i in centos0 centos1 centos2 do ssh -t $i "sudo date -s '$log_date'" done
[root@CentOS0 bin]$ chmod 777 dt.sh [root@CentOS0 bin]$ dt.sh 2021-06-02
[root@CentOS0 bin]$ vim xcall.sh
#! /bin/bash for i in centos0 centos1 centos2 do echo --------- $i ---------- ssh $i "$*" done
赋权,执行
[root@CentOS0 conf]$ vim file-flume-kafka.conf
a1.sources=r1 a1.channels=c1 c2 # configure source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/data/app.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.Charlie.Guo.ETLInterceptor$Builder a1.sources.r1.interceptors.i2.type = com.Charlie.Guo.TypeInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = topic a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = centos0:9092,centos1:9092,centos2:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer
[root@CentOS0 module]$ xsync flume/ [root@CentOS0 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[root@CentOS0 bin]$ vim f1.sh
#! /bin/bash case $1 in "start"){ for i in centos0 centos1 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &" done };; "stop"){ for i in centos1 centos2 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac
[root@CentOS0 bin]$ vim f2.sh
#! /bin/bash case $1 in "start"){ for i in CentOS2 do echo " --------启动 $i 消费flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &" done };; "stop"){ for i in CentOS2 do echo " --------停止 $i 消费flume-------" ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac
[root@hadoop102 bin]$ vim kf.sh
#! /bin/bash case $1 in "start"){ echo " -------- 启动 KafkaManager -------" nohup /opt/module/kafka-manager-1.3.3.15/bin/kafka-manager -Dhttp.port=7456 >start.log 2>&1 & };; "stop"){ echo " -------- 停止 KafkaManager -------" ps -ef | grep ProdServerStart | grep -v grep |awk '{print $2}' | xargs kill };; esac
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 --list
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181, CentOS1:2181, CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181, CentOS1:2181, CentOS2:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
注意:这里的两个Topic需要提前在HDFS中创建路径
hadoop fs -mkdir -p + 路径
[root@CentOS0 kafka]$ bin/kafka-console-producer.sh \ --broker-list CentOS0:9092 --topic topic_start >hello world >root root
[root@CentOS0 kafka]$ bin/kafka-topics.sh --zookeeper CentOS0:2181 \ --describe --topic topic_start
[root@CentOS0 module]$ unzip kafka-manager-1.3.3.22.zip
[root@CentOS0 conf]$ vim application.conf
修改为:
kafka-manager.zkhosts="CentOS0:2181,CentOS1:2181,CentOS2:2181"
[root@CentOS0 bin]$ vim km.sh
#! /bin/bash case $1 in "start"){ echo " -------- 启动 KafkaManager -------" nohup /opt/module/kafka-manager-1.3.3.15/bin/kafka-manager -Dhttp.port=7456 >start.log 2>&1 & };; "stop"){ echo " -------- 停止 KafkaManager -------" ps -ef | grep ProdServerStart | grep -v grep |awk '{print $2}' | xargs kill };; esac
vim kafka-flume-hdfs.conf
## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092 a1.sources.r1.kafka.topics=topic_start ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 5000 a1.sources.r2.batchDurationMillis = 2000 a1.sources.r2.kafka.bootstrap.servers = CentOS0:9092,CentOS1:9092,CentOS2:9092 a1.sources.r2.kafka.topics=topic_event ## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/ a1.channels.c1.maxFileSize = 2146435071 a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 6 ## channel2 a1.channels.c2.type = file a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2 a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/ a1.channels.c2.maxFileSize = 2146435071 a1.channels.c2.capacity = 1000000 a1.channels.c2.keep-alive = 6 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = logstart- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = second ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = logevent- a1.sinks.k2.hdfs.round = true a1.sinks.k2.hdfs.roundValue = 10 a1.sinks.k2.hdfs.roundUnit = second ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k2.hdfs.rollInterval = 10 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop a1.sinks.k2.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
同步配置到其他机器
conf]$ xsync flume-env.sh