This article follows the official documentation to build Flume's HelloWorld example:
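In this example, a source of type Netcat TCP receives data from the network and a sink of type Logger prints it. The two are connected through a Memory Channel, as shown in the figure below.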
The official documentation describes the Netcat TCP Source, Logger Sink and Memory Channel as follows:
NetCat TCP Source
A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.
Logger Sink
Logs event at INFO level. Typically useful for testing/debugging purpose. This sink is the only exception which doesn’t require the extra configuration explained in the Logging raw data section.
Memory Channel
The events are stored in an in-memory queue with configurable max size. It’s ideal for flows that need higher throughput and are prepared to lose the staged data in the event of a agent failures.
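The three descriptions above can be tied together with a toy, single-process Python imitation of the flow. It splits newline-separated input into events, pushes them into a bounded in-memory queue, and then prints each event in a logger-like format.

This is not Flume code. The function names (`netcat_like_split`, `format_event`, `run_pipeline`) are hypothetical. The memory channel is approximated by a `queue.Queue` with a maximum size. The "sink" drains the queue only after the "source" finishes, whereas real Flume runs them concurrently. The output line only roughly mimics the shape of the Logger sink lines.

```python
from __future__ import annotations

import queue


def netcat_like_split(data: bytes) -> list[bytes]:
    """Split newline-separated input into event bodies (one line = one event).

    Only b"\\n" is treated as the separator, so a trailing b"\\r" sent by a
    client stays in the body. In this sketch an unterminated final fragment
    is simply ignored.
    """
    return data.split(b"\n")[:-1]


def format_event(body: bytes) -> str:
    """Render an event roughly like a Logger sink line: hex bytes, then text."""
    hex_part = " ".join(f"{b:02X}" for b in body)
    # Non-printable bytes are shown as "." in the text column.
    text_part = "".join(chr(b) if 32 <= b < 127 else "." for b in body)
    return f"Event: {{ headers:{{}} body: {hex_part} {text_part} }}"


def run_pipeline(data: bytes, capacity: int = 1000) -> list[str]:
    """Toy source -> bounded in-memory channel -> logger-style sink."""
    channel = queue.Queue(maxsize=capacity)
    for body in netcat_like_split(data):
        # put_nowait raises queue.Full once `capacity` events are queued,
        # a crude stand-in for a memory channel that has no room left.
        channel.put_nowait(body)
    output = []
    while not channel.empty():
        output.append(format_event(channel.get_nowait()))
    return output


# Lines ending in "\r\n", as an interactive telnet client would send them.
for line in run_pipeline(b"linchao\r\nbukp.net\r\n"):
    print(line)
```

With this input the first printed line is `Event: { headers:{} body: 6C 69 6E 63 68 61 6F 0D linchao. }`.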
The configuration file is as follows:
# helloworld.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
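This configuration file defines the following components:
- an agent named a1;
- a source r1 of type netcat, which listens on port 44444 of the local machine;
- a channel c1 of type memory, which buffers event data in memory;
- a sink k1 of type logger, which prints event data to the console.

A single configuration file can define multiple agents. When starting the Flume process, you choose which agent to run with a command-line argument.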
Before starting Flume, edit the flume-env.sh configuration file. In the conf directory, rename flume-env.sh.template to flume-env.sh and add the JAVA_HOME setting:
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.

# Enviroment variables can be set here.

# export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export JAVA_HOME=/usr/local/jdk1.8.0_202
From the Flume directory, run the following command to start the agent named a1 defined in the configuration file:
# bin/flume-ng agent --conf conf --conf-file conf/helloworld.conf --name a1 -Dflume.root.logger=INFO,console
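The options are as follows:
--conf=<conf-dir> specifies the directory that contains flume-env.sh and the log4j configuration file.
--conf-file specifies the agent configuration file, here conf/helloworld.conf.
--name specifies the agent to run, here a1.
-Dflume.root.logger=INFO,console sends Flume's log output at INFO level and above to the console, which is where the Logger sink's events appear.

Part of the output is shown below: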
Info: Sourcing environment configuration script /usr/local/flume-1.9.0/conf/flume-env.sh
Info: Including Hadoop libraries found via (/usr/local/hadoop-3.2.2/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
--- (some lines omitted)
2022-01-19 13:48:47,256 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
The output shows that the source, channel and sink have all been created and started. The server socket is listening on port 44444 of the local machine.
Use telnet to send test data to local port 44444:
[root@bukp4 ~]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
linchao
OK
bukp.net
OK
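Telnet is convenient for manual testing. For scripted tests, the same exchange can be done over a plain TCP socket.

The sketch below makes these assumptions:
- The agent above is running on localhost:44444.
- The source answers each event with one reply line, inferred from the OK lines in the telnet session.
- The function name `send_lines` is hypothetical.

Each line is terminated with a bare "\n". Unlike interactive telnet, this sends no carriage return.

```python
import socket


def send_lines(lines, host="localhost", port=44444, timeout=5.0):
    """Send each string in `lines` as one newline-terminated event.

    Returns the reply line read back after each send (e.g. "OK").
    """
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("rb") as reader:
            for line in lines:
                # End with a bare "\n", so no "\r" ends up in the event body.
                sock.sendall(line.encode("utf-8") + b"\n")
                # Block until the per-event acknowledgement line arrives.
                replies.append(reader.readline().decode("utf-8").rstrip("\r\n"))
    return replies


# Usage, with the a1 agent from this article running:
# print(send_lines(["linchao", "bukp.net"]))
```

Reading one reply per line keeps the client in lockstep with the acknowledgements. A client that does not want replies would need the source configured differently, which is outside the scope of this example.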
The data then appears in the console output:
2022-01-19 14:08:51,744 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6C 69 6E 63 68 61 6F 0D linchao. }
2022-01-19 14:09:38,724 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 62 75 6B 70 2E 6E 65 74 0D bukp.net. }
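Each body above ends with the byte 0D, a carriage return, shown as the trailing "." in the text column. Telnet ends each typed line with CR LF, and the netcat source splits events on the newline, so the CR stays in the event body.

The Python lines below decode the two hex dumps copied from the log to confirm this. They only use the standard `bytes.fromhex` and do not talk to Flume.

```python
# Hex dumps copied from the Logger sink output above.
dumps = [
    "6C 69 6E 63 68 61 6F 0D",
    "62 75 6B 70 2E 6E 65 74 0D",
]

for dump in dumps:
    body = bytes.fromhex(dump)  # spaces between byte pairs are allowed
    print(repr(body), "ends with CR:", body.endswith(b"\r"))
    # Removing the CR recovers exactly the text typed into telnet.
    print(body.rstrip(b"\r").decode("ascii"))
```

This prints `b'linchao\r' ends with CR: True`, then `linchao`, and likewise for `bukp.net`. Clients that end lines with a bare LF, such as nc by default, do not add this extra byte.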
This completes the HelloWorld-level Flume example.