Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating and moving large volumes of log data. Flume supports pluggable, customizable data senders on the log-producing side for collecting data, and it can apply simple processing to the data and write it to a variety of (customizable) data receivers.
Working model:
Advanced scenario:
The figure shows two agents. Agent foo can deliver the data it reads to different destinations: sink1 writes the data to HDFS, sink2 writes it to a Java message queue service, and sink3 writes it to another agent.
Consolidation (fan-in):
Flume configuration files ( https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#configuration ) are placed in Flume's conf directory and end with .conf or .properties.
# example.conf: a single-node Flume configuration
# a1 is the agent name

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
# --conf: the configuration directory; --conf-file: the file created above;
# --name: the agent name, which must match the agent name used inside the configuration file

# short form
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
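To verify the single-node example, send a line to the netcat source and watch the logger sink print it on the agent's console. A minimal sketch, assuming nc (or telnet, as in the Flume user guide) is installed on the machine:

# send a test line to the netcat source configured above
echo "hello flume" | nc localhost 44444
# the agent console should log something similar to:
# Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65   hello flume }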
# conf/file-to-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log

# channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# the escape sequences in hdfs.path need a timestamp header; the spooldir source does not set one, so use the local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Note: the HDFS sink needs the Hadoop jars; unpack a Hadoop distribution on this machine and set HADOOP_HOME
# roll the file every 30 seconds
a1.sinks.k1.hdfs.rollInterval = 30
# roll once the file reaches 1024 bytes
a1.sinks.k1.hdfs.rollSize = 1024
# roll every 10 events
a1.sinks.k1.hdfs.rollCount = 10

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
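As the note above says, the HDFS sink needs the Hadoop jars on the classpath. A minimal sketch of one way to do this, assuming a Hadoop distribution is already unpacked (the path below is a placeholder): the flume-ng launcher detects HADOOP_HOME and adds the Hadoop classpath for you.

# in conf/flume-env.sh, or in the shell that starts the agent (path is a placeholder)
export HADOOP_HOME=/data/soft/hadoop-3.2.0

# start the agent, then check that files are being written to HDFS
bin/flume-ng agent --conf conf --conf-file conf/file-to-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
hdfs dfs -ls -R /flume/events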
Scenario: the log data produced in real time on machines A and B is consolidated on machine C, and machine C uploads it to a designated directory on HDFS. The HDFS directories are created per day, one directory per day.
bigdata02 and bigdata03 need to collect log files, so an Exec Source (tailing the file) is used; losing some log data is acceptable, so a Memory Channel is used; to speed up transfer, an Avro Sink handles the network hop. Avro is a serialization system, and transferring data with it is more efficient.
Configuration file for bigdata02 and bigdata03:
# configure the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/web.log

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.56.155
a1.sinks.k1.port = 45454

# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bigdata04 configuration:
# configure the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 45454

# configure the channel
a1.channels.c1.type = memory

# configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs:///access/%Y%m%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600

# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
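A sketch of how the three agents might be started and checked. The configuration file names below are placeholders, and my assumption is that the collector on bigdata04 should be listening before the avro sinks on bigdata02/03 try to connect:

# on bigdata04 (the collector) -- start it first so the avro source is listening on 45454
bin/flume-ng agent --conf conf --conf-file conf/avro-to-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

# on bigdata02 and bigdata03
bin/flume-ng agent --conf conf --conf-file conf/exec-to-avro.conf --name a1 -Dflume.root.logger=INFO,console

# generate a test log line on bigdata02/03, then check the per-day directory on HDFS
echo "test line $(date)" >> /var/log/web.log
hdfs dfs -ls /access/$(date +%Y%m%d)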
An Event is the basic unit in which Flume transfers data, and also the basic unit of a transaction; for a text file, one line is normally one Event. An Event consists of a header and a body.
In the Source we can add key-value pairs to the header of each record, and then use those header values in the Channel and the Sink.
A Source can specify one or more interceptors, which process the collected data one after another in the configured order.
Example: decide the target directory name from a value extracted out of the source data.
ExecSource -> Search and Replace Interceptor -> Regex Extractor Interceptor -> File Channel -> HDFS Sink
# the agent name is a1
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/moreType.log

# configure the interceptors (multiple interceptors run in the listed order)
a1.sources.r1.interceptors = i1 i2 i3 i4
a1.sources.r1.interceptors.i1.type = search_replace
# the search pattern may be a regular expression
a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info"
a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo"
a1.sources.r1.interceptors.i2.type = search_replace
a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info"
a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo"
a1.sources.r1.interceptors.i3.type = search_replace
a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record"
a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord"
a1.sources.r1.interceptors.i4.type = regex_extractor
a1.sources.r1.interceptors.i4.regex = "type":"(\\w+)"
a1.sources.r1.interceptors.i4.serializers = s1
# puts logType=<value of the first regex group> into the event header
a1.sources.r1.interceptors.i4.serializers.s1.name = logType

# configure the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/moreTyp
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/moreType/dat

# configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/moreType/%Y%m%d/%{logType}
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# add a file prefix and suffix
a1.sinks.k1.hdfs.filePrefix = data
a1.sinks.k1.hdfs.fileSuffix = .log

# wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
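A quick way to exercise this pipeline is to append a few records of each type to the monitored log file and then look at the per-type directories the HDFS sink creates from the logType header. The sample records below are made-up test data:

# append one record of each type (field values are arbitrary test data)
echo '{"type":"video_info","vid":1001}'  >> /data/log/moreType.log
echo '{"type":"user_info","uid":2002}'   >> /data/log/moreType.log
echo '{"type":"gift_record","gid":3003}' >> /data/log/moreType.log

# each record should land under /moreType/<yyyyMMdd>/videoInfo, userInfo or giftRecord
hdfs dfs -ls -R /moreType/$(date +%Y%m%d)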
Channel Selectors control how a Source distributes events to multiple Channels: when a source is followed by several channels, whether every channel receives every event, or events are routed to different channels by some rule, is decided by the Channel Selector.
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
# c3 is optional, so a failed write to it is ignored;
# a failed write to c1 or c2 fails the transaction and the write is retried
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
# if the state header is CZ, the event goes to c1
# if the state header is US, the event goes to c2 and c3
# any other value goes to c4
Replicating example: send the collected data to both a Logger and HDFS.
# the agent name is a1
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# configure the channel selector (replicating is the default, so this line can be omitted)
a1.sources.r1.selector.type = replicating

# configure the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# configure the sinks
a1.sinks.k1.type = logger
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://192.168.182.100:9000/replicating
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log

# wire the components together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
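With the replicating selector every event should show up in both places: once on the console through the logger sink, and once under /replicating on HDFS (as a .tmp file until it rolls). A small check, assuming nc is available:

# send an event to the netcat source; it is copied to both channels
echo "hello replicating" | nc 127.0.0.1 44444

# the logger sink prints it on the console; the HDFS sink writes it under /replicating
hdfs dfs -ls /replicating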
# the agent name is a1
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# configure a source interceptor that extracts the city value into the header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "city":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.name = city

# configure the channel selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = city
a1.sources.r1.selector.mapping.bj = c1
a1.sources.r1.selector.default = c2

# configure the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# configure the sinks
a1.sinks.k1.type = logger
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://192.168.182.100:9000/multiplexing
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.filePrefix = data
a1.sinks.k2.hdfs.fileSuffix = .log

# wire the components together
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
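To see the multiplexing selector route events, send one record whose city is bj and one with any other city; the first should appear only in the logger output (channel c1), the second only under /multiplexing on HDFS (channel c2). The record contents are made-up test data:

# "city":"bj" -> header city=bj -> c1 -> logger sink
echo '{"city":"bj","name":"a"}' | nc 127.0.0.1 44444
# any other city -> default channel c2 -> HDFS sink
echo '{"city":"sh","name":"b"}' | nc 127.0.0.1 44444

hdfs dfs -ls /multiplexing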
Sink Processors control how data is sent when one channel is followed by several sinks: which sink takes data from the channel is decided by the Sink Processor.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
# the selector can also be round_robin (round-robin polling)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
Load-balancing example, bigdata04 configuration file:
# the agent name is a1
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sinks (batch-size is set to 1 to make the demo easier to observe)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.182.101
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.182.102
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1

# configure the sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
bigdata02 and bigdata03 configuration file:
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileSuffix = .log

# wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
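A quick check of the load-balancing behaviour (a sketch; bigdata04 is the machine running the netcat source): with round_robin and batch-size = 1, consecutive events should alternate between bigdata02 and bigdata03, and both downstream agents write into the same /load_balance directory.

# on bigdata04: send a few events to the netcat source
for i in 1 2 3 4; do echo "event $i" | nc 127.0.0.1 44444; done

# files written by both downstream agents end up here
hdfs dfs -ls /load_balance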
Failover example, bigdata04 configuration file:
# the agent name is a1
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sinks (batch-size is set to 1 to make the demo easier to observe)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.182.101
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.182.102
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1

# configure the sink processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

# wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
bigdata02 and bigdata03 configuration file:
# the agent name is a1
# name the source, channel and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# configure the sink (to tell apart the files produced by the two machines, use a different filePrefix on each)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data101
a1.sinks.k1.hdfs.fileSuffix = .log

# wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
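A sketch of how the failover behaviour can be observed: k2 (bigdata03) has the higher priority (10), so it receives all traffic while it is healthy; stopping the agent on bigdata03 should make subsequent events fail over to bigdata02.

# on bigdata04: send an event; it should be delivered through k2 to bigdata03
echo "before failover" | nc 127.0.0.1 44444

# on bigdata03: stop the Flume agent, then send again from bigdata04
echo "after failover" | nc 127.0.0.1 44444

# both machines write to /failover; the filePrefix shows which agent wrote each file
hdfs dfs -ls /failover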
Monitoring and tuning: check the Flume agent process with jps and watch its heap/GC usage with jstat -gcutil PID 1000. The JVM heap size and JMX support can be set through JAVA_OPTS (for example in conf/flume-env.sh):

export JAVA_OPTS="-Xms1024m -Xmx1024m -Dcom.sun.management.jmxremote"

The log directory and log file name are controlled by flume.log.dir and flume.log.file in log4j.properties.
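Putting the two commands together, a small sketch for watching the agent's heap and GC activity (the grep filter assumes the agent's main class, org.apache.flume.node.Application, shows up as Application in the jps output):

# find the Flume agent's PID and sample GC/heap utilisation once per second
pid=$(jps -m | grep Application | awk '{print $1}')
jstat -gcutil "$pid" 1000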
Flume runs as a single process and is therefore a single point of failure, so a monitoring mechanism is needed to restart the agent when it dies.
Solution: monitor the Flume process with a shell script and restart it when it is gone.
# monlist.conf
# The part before the equals sign is a unique identifier for an agent; it must be unique on each machine and
# is used to find the matching Flume process by checking whether the identifier appears in its start command.
# The part after the equals sign is the script that starts that Flume agent.
example=startExample.sh
The script that starts Flume:
#!/bin/bash
flume_path=/data/soft/apache-flume-1.9.0-bin
# the --conf-file value is a placeholder; point it at this agent's actual configuration file
nohup ${flume_path}/bin/flume-ng agent --name a1 --conf ${flume_path}/conf/ \
  --conf-file ${flume_path}/conf/example.conf > /dev/null 2>&1 &
#!/bin/bash
monlist=`cat monlist.conf`
echo "start check"
for item in ${monlist}
do
  # set the field separator
  OLD_IFS=$IFS
  IFS="="
  # split the line into an array
  arr=($item)
  # the part left of the equals sign (agent identifier)
  name=${arr[0]}
  # the part right of the equals sign (start script)
  script=${arr[1]}
  # restore the original field separator
  IFS=$OLD_IFS
  echo "time is:"`date +"%Y-%m-%d %H:%M:%S"`" check "$name
  if [ `jps -m | grep $name | wc -l` -eq 0 ]
  then
    # send an SMS or e-mail alert here
    echo `date +"%Y-%m-%d %H:%M:%S"` $name "is none"
    sh -x ./${script}
  fi
done
The monitor script can be scheduled with cron to run every minute, e.g. in /etc/crontab:

* * * * * root /bin/bash /data/soft/monlist.sh