写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
在差不多一年前,菌刚接触Flume那会,写了一篇关于Flume的博客。今天无意间翻到,才发现当时介绍的内容是多么的浅显,于是菌打算再为大家介绍如何在Flume中实现过滤器的操作。
码字不易,先赞后看!
A、B两台日志服务机器实时生产日志主要类型为 access.log
、nginx.log
、web.log
现在要求:
把A、B 机器中的access.log
、nginx.log
、web.log
采集汇总到C机器上然后统一收集到hdfs中。
但是在hdfs中要求的目录为:
/source/logs/access/20180101/**
/source/logs/nginx/20180101/**
/source/logs/web/20180101/**
服务器A对应的IP为 192.168.100.100
服务器B对应的IP为 192.168.100.110
服务器C对应的IP为 192.168.100.120
node01与node02服务器开发flume的配置文件
[root@node01 ~]# cd /export/servers/apache-flume-1.8.0-bin/conf [root@node01 conf]# vim exec_source_avro_sink.conf
a1.sources = r1 r2 r3 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /export/taillogs/access.log a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static ## static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value对 a1.sources.r1.interceptors.i1.key = type a1.sources.r1.interceptors.i1.value = access a1.sources.r2.type = exec a1.sources.r2.command = tail -F /export/taillogs/nginx.log a1.sources.r2.interceptors = i2 a1.sources.r2.interceptors.i2.type = static a1.sources.r2.interceptors.i2.key = type a1.sources.r2.interceptors.i2.value = nginx a1.sources.r3.type = exec a1.sources.r3.command = tail -F /export/taillogs/web.log a1.sources.r3.interceptors = i3 a1.sources.r3.interceptors.i3.type = static a1.sources.r3.interceptors.i3.key = type a1.sources.r3.interceptors.i3.value = web # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity = 10000 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = node03 a1.sinks.k1.port = 41414 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sources.r2.channels = c1 a1.sources.r3.channels = c1 a1.sinks.k1.channel = c1
注:
定义拦截器的类型
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
需要注意的是:static拦截器的功能就是往采集到的数据的header中插入自己定义的key-value键值对
例如:
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
在node03上面开发flume配置文件
[root@node03 ~]# cd /export/servers/apache-flume-1.8.0-bin/conf [root@node03 conf]# vim avro_source_hdfs_sink.conf
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #定义source a1.sources.r1.type = avro a1.sources.r1.bind = 192.168.100.120 a1.sources.r1.port =41414 #添加时间拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder #定义channels a1.channels.c1.type = memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity = 10000 #定义sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path=hdfs://192.168.100.100:8020/source/logs/%{type}/%Y%m%d #组装source、channel、sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
为了方便观察采集的结果,我们分别在node01和node02上开发shell脚本,模拟数据生成。
[root@node01 servers]# cd /export/shells/ [root@node01 shells]# vim server.sh
#!/bin/bash while true do date >> /export/taillogs/access.log; date >> /export/taillogs/web.log; date >> /export/taillogs/nginx.log; sleep 0.5; done
node03启动flume实现数据收集
[root@node03 conf]# cd /export/servers/apache-flume-1.8.0-bin [root@node03 apache-flume-1.8.0-bin]# bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
node01与node02启动flume实现数据监控
[root@node01 shells]# cd /export/servers/apache-flume-1.8.0-bin [root@node01 apache-flume-1.8.0-bin]# bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console
node01 与 node02 启动生成文件脚本
cd /export/shells sh server.sh
查看hdfs指定的输出路径下的目录构成
查看某个目录下的文件构成
可以发现随着我们shell脚本的启动,数据被不断的追加到指定的监控文件中,node01和node02在检测到变化之后,将变化的内容在node03进行汇总,然后node03根据定义的不同生产日志类型,对于进行“过滤”输出到HDFS的不同目录下。
本篇博客作者简单为大家介绍了一种Flume拦截器的使用示例,如果有看不太懂的小伙伴建议与这篇文章一起食用。当然,都看过的小伙伴也许也会感觉还是少了些内容,例如Flume的自定义监控,选择器,Sink组,还有Flume的web端监控——Ganglia等等…大家Duck不必担心,接下来的几篇博客,博主将详细介绍这些关于Flume的“干货”,敬请期待!!!
受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波????
希望我们都能在学习的道路上越走越远????