Download: http://archive.apache.org/dist/flume/
Installation:
(1) Upload apache-flume-1.9.0-bin.tar.gz to the /opt/software directory on the Linux host
(2) Extract apache-flume-1.9.0-bin.tar.gz into /opt/module/
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3) Rename apache-flume-1.9.0-bin to flume
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
(4) Delete guava-11.0.2.jar from the lib folder for compatibility with Hadoop 3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar
Configure environment variables:
vim /etc/profile.d/my_env.sh
Add the Flume environment variables (the directory was renamed to flume in step 3):
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin
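To make the new variables take effect in the current shell, reload the profile script (or log in again):
source /etc/profile.d/my_env.sh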
Test:
Run flume-ng; if the usage/help text appears, the installation succeeded.
A. Requirement: monitor a single file in real time and upload it to HDFS (the hdfs sink is a commonly used sink)
- Decide which type to use for each of the three components: source, sink, and channel
  source = exec, sink = hdfs, channel = memory; check the official documentation for the parameters each type requires
#name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/module/flume/jobs/tail.txt

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# whether to roll directories by time
a1.sinks.k1.hdfs.round = true
# how many time units before a new directory is created
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# how many Events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# file type; compression is supported
a1.sinks.k1.hdfs.fileType = DataStream
# how long (seconds) before rolling a new file
a1.sinks.k1.hdfs.rollInterval = 60
# roll size (bytes) for each file
a1.sinks.k1.hdfs.rollSize = 134217700
# file rolling is independent of the Event count
a1.sinks.k1.hdfs.rollCount = 0

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
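Start (exec-flume-hdfs.conf is an assumed file name; use whatever name the configuration above is saved under in $FLUME_HOME/jobs):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/exec-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console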
B. Requirement: use Flume to monitor an entire directory and upload its files to HDFS (only newly added files are uploaded)
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume/jobs/spooldir
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.fileHeader = true

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sinks
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# whether to roll directories by time
a1.sinks.k1.hdfs.round = true
# how many time units before a new directory is created
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# how many Events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# file type; compression is supported
a1.sinks.k1.hdfs.fileType = DataStream
# how long (seconds) before rolling a new file
a1.sinks.k1.hdfs.rollInterval = 60
# roll size (bytes) for each file
a1.sinks.k1.hdfs.rollSize = 134217700
# file rolling is independent of the Event count
a1.sinks.k1.hdfs.rollCount = 0

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
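To test, create a file elsewhere and move it into the monitored directory (the spooldir source expects files not to be modified after they appear); once consumed, the file is renamed with the .COMPLETED suffix:
echo hello > /tmp/spool-test.txt
mv /tmp/spool-test.txt /opt/module/flume/jobs/spooldir/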
C. Requirement: use Flume to monitor real-time appends to files in an entire directory and upload them to HDFS (the directory is split into file groups; content is uploaded only when a file belonging to a group is appended to; groups are usually matched with regular expressions)
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix for uploaded files
a1.sinks.k1.hdfs.filePrefix = logs-
# whether to roll directories by time
a1.sinks.k1.hdfs.round = true
# how many time units before a new directory is created
a1.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a1.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# how many Events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# file type; compression is supported
a1.sinks.k1.hdfs.fileType = DataStream
# how long (seconds) before rolling a new file
a1.sinks.k1.hdfs.rollInterval = 60
# roll size (bytes) for each file
a1.sinks.k1.hdfs.rollSize = 134217700
# file rolling is independent of the Event count
a1.sinks.k1.hdfs.rollCount = 0

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
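Start (taildir-flume-hdfs.conf is an assumed file name; use whatever name the configuration above is saved under):
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/taildir-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console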
D. Requirement: replicating
Use Flume-1 to monitor file changes; Flume-1 passes the changed content to both Flume-2 and Flume-3. Flume-3 writes to the local file system, and Flume-2 writes to HDFS.

            flume     sources    channels    sinks
Analysis:   flume1    taildir    memory      avro
            flume2    avro       memory      hdfs
            flume3    avro       memory      file_roll
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#sinks
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/jobs/fileroll

#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#sources
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#channels
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#sinks
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume1.conf
#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json

#ChannelSelector
a1.sources.r1.selector.type = replicating

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

#sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Start flume3: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start flume2: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start flume1: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console
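To test (file1.txt under the taildir directory is an assumed file name), append a line to a monitored file; the same content should reach both the HDFS path and the file_roll output directory:
echo hello >> /opt/module/flume/jobs/taildir/file1.txt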
E. Requirement: load balancing
Use Flume-1 to monitor file changes; Flume-1 passes the changed content (round-robin or randomly) to Flume-2 and Flume-3. Flume-3 writes to the local file system, and Flume-2 writes to HDFS.

            flume     sources    channels    sinks
Analysis:   flume1    taildir    memory      avro
            flume2    avro       memory      hdfs
            flume3    avro       memory      file_roll
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#sinks
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/jobs/fileroll

#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#sources
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#channels
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#sinks
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix = logs-
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume1.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

Start flume3: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start flume2: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start flume1: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
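The load_balance sink processor above uses the random selector; round_robin is the other built-in option. To test (file1.txt is an assumed file name), append lines to a monitored file and watch which downstream agent receives them:
echo hello >> /opt/module/flume/jobs/taildir/file1.txt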
F. Requirement: failover
Use Flume-1 to monitor a port; Flume-1 passes the port data to Flume-2 (the active sink). When Flume-2 fails, the data is passed to Flume-3 instead.

            flume     sources    channels    sinks
Analysis:   flume1    netcat     memory      avro
            flume2    avro       memory      logger
            flume3    avro       memory      logger
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = localhost
a3.sources.r1.port = 8888

#channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#sinks
a3.sinks.k1.type = logger

#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#sources
a2.sources.r1.type = avro
a2.sources.r1.bind = localhost
a2.sources.r1.port = 7777

#channels
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#sinks
a2.sinks.k1.type = logger

#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume1.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

#sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5555

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 7777
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = localhost
a1.sinks.k2.port = 8888

#Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
# higher number = higher priority; k1 (to Flume-2 on port 7777) is the active sink
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

Start flume3: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start flume2: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start flume1: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console
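To test, send a few lines to the netcat source, then stop Flume-2 (Ctrl+C) and keep sending; the events should start appearing on Flume-3's console instead (nc is the standard netcat client):
nc localhost 5555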
G. Requirement: aggregation
Flume-1 on hadoop102 monitors the files under /opt/module/flume/jobs/taildir,
Flume-2 on hadoop103 monitors the data stream on a port,
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.

            flume     sources    channels    sinks
Analysis:   flume1    taildir    memory      avro
            flume2    netcat     memory      avro
            flume3    avro       memory      logger
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888

#channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#sinks
a3.sinks.k1.type = logger

#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#sources
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 7777

#channels
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#sinks
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 8888

#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume1.conf
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/jobs/taildir/.*\.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume/jobs/taildir/.*\.log
a1.sources.r1.positionFile = /opt/module/flume/jobs/position/position.json

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start flume3: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start flume2: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start flume1: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume1.conf -n a1 -Dflume.root.logger=INFO,console
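To test (file1.txt is an assumed file name), append to a monitored file on hadoop102 and send netcat data to hadoop103; both should show up in the Flume-3 console on hadoop104:
echo hello >> /opt/module/flume/jobs/taildir/file1.txt    # run on hadoop102
nc hadoop103 7777                                         # connect to the netcat source on hadoop103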
H. Requirement: multiplexing
Flume-1 on hadoop101 monitors netcat data on a local port (6666 in the configuration below); events whose body starts with a letter are sent to hadoop104, and events starting with a digit are sent to hadoop103.
Flume-2 and Flume-3 print to the console.

            flume     sources    channels    sinks
Analysis:   flume1    netcat     memory      avro
            flume2    avro       memory      logger
            flume3    avro       memory      logger
flume3.conf
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 8888

#channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

#sinks
a3.sinks.k1.type = logger

#bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume2.conf
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#sources
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 7777

#channels
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

#sinks
a2.sinks.k1.type = logger

#bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume1.conf
#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = myInterceptor.CustomInterceptor$Builder

#channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

#sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 8888
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 7777

#bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Start flume3: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
Start flume2: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
Start flume1: flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume1.conf -n a1 -Dflume.root.logger=INFO,console
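The configuration above references a custom interceptor class, myInterceptor.CustomInterceptor$Builder, which must set the "type" header that the multiplexing channel selector reads. A minimal sketch of such an interceptor is shown below; only the class name and the header values (type = letter / number) come from the configuration, the method bodies are an assumption. Package the class into a jar and place the jar in $FLUME_HOME/lib before starting a1.

package myInterceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // nothing to set up
    }

    // Tag each event so the multiplexing selector can route it:
    //   body starting with a letter -> header type=letter -> channel c1 -> hadoop104
    //   anything else               -> header type=number -> channel c2 -> hadoop103
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        if (body.length > 0 && Character.isLetter((char) body[0])) {
            headers.put("type", "letter");
        } else {
            headers.put("type", "number");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
            // no configuration needed for this sketch
        }
    }
}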
Listen on a port:
This uses the netcat source.
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#sources
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666

#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

#sink
a1.sinks.k1.type = logger

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start:
flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/netcat-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
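To test, connect to the listening port with netcat and type a few lines; each line should be printed by the logger sink on the agent's console:
nc localhost 6666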