Java教程

flume入门

本文主要是介绍flume入门,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

flume入门

  • 1.安装与配置
  • 2.基础架构
  • 3.事务
  • 4.agent的内部原理
  • 5.基础案例
    • 1.实时监控单个文件,并上传到HDFS中 hdfs
    • 2.使用Flume监听整个目录的文件,并上传至HDFS
    • 3.使用Flume监听整个目录的实时追加文件,并上传至HDFS
    • 4.复制 replicating
    • 5.负载均衡 load_balance
    • 7.故障转移 failover
    • 8.聚合
    • 8.多路 multiplexing
    • 9.监听端口 netcat

1.安装与配置

下载地址:http://archive.apache.org/dist/flume/
安装:
(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下

tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

(3)修改apache-flume-1.9.0-bin的名称为flume

mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

(4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

rm /opt/module/flume/lib/guava-11.0.2.jar

配置环境变量:

vim /etc/profile.d/my_env.sh

将flume的环境变量导进去

export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin

测试:
输入flume-ng 若出现语法提示则成功
在这里插入图片描述

2.基础架构

3.事务

在这里插入图片描述

4.agent的内部原理

在这里插入图片描述

5.基础案例

1.实时监控单个文件,并上传到HDFS中 hdfs

A.案例需求:实时监控单个文件,并上传到HDFS中 hdfs sink 为常用sink

  1. 确定 sources sink channel 这三个组件要使用什么类型
    exec hdfs memory
  2. 到官方文档看需要填什么参数
#name
  a1.sources = s1
  a1.sinks = k1
  a1.channels = c1

  #sources 定义
  a1.sources.s1.type = exec
  a1.sources.s1.command = tail -f /opt/module/flume-1.9.0/jobs/tail.txt

  #channel 定义
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100


  #sinks 定义
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  #上传文件的前缀
  a1.sinks.k1.hdfs.filePrefix = logs-
  #是否按照时间滚动文件夹
  a1.sinks.k1.hdfs.round = true
  #多少时间单位创建一个新的文件夹
  a1.sinks.k1.hdfs.roundValue = 1
  #重新定义时间单位
  a1.sinks.k1.hdfs.roundUnit = hour
  #是否使用本地时间戳
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  #积攒多少个Event才flush到HDFS一次
  a1.sinks.k1.hdfs.batchSize = 100
  #设置文件类型,可支持压缩
  a1.sinks.k1.hdfs.fileType = DataStream
  #多久生成一个新的文件
  a1.sinks.k1.hdfs.rollInterval = 60
  #设置每个文件的滚动大小
  a1.sinks.k1.hdfs.rollSize = 134217700
  #文件的滚动与Event数量无关
  a1.sinks.k1.hdfs.rollCount = 0

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

2.使用Flume监听整个目录的文件,并上传至HDFS

B.案例需求:使用Flume监听整个目录的文件,并上传至HDFS (自有当有新文件时才会上传)

#name
  a1.sources = r1
  a1.channels = c1
  a1.sink1 = k1
  
  #source
  a1.sources.r1.type = spooldir
  a1.sources.r1.spoolDir = /opt/module/flume-1.9.0/jobs/spooldir
  a1.sources.r1.fileSuffix = .COMPLETED
  a1.sources.r1.fileHeader = true

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100

  #sinks
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  #上传文件的前缀
  a1.sinks.k1.hdfs.filePrefix = logs-
  #是否按照时间滚动文件夹
  a1.sinks.k1.hdfs.round = true
  #多少时间单位创建一个新的文件夹
  a1.sinks.k1.hdfs.roundValue = 1
  #重新定义时间单位
  a1.sinks.k1.hdfs.roundUnit = hour
  #是否使用本地时间戳
  a1.sinks.k1.hdfs.useLocalTimeStamp = true
  #积攒多少个Event才flush到HDFS一次
  a1.sinks.k1.hdfs.batchSize = 100
  #设置文件类型,可支持压缩
  a1.sinks.k1.hdfs.fileType = DataStream
  #多久生成一个新的文件
  a1.sinks.k1.hdfs.rollInterval = 60
  #设置每个文件的滚动大小
  a1.sinks.k1.hdfs.rollSize = 134217700
  #文件的滚动与Event数量无关
  a1.sinks.k1.hdfs.rollCount = 0

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1

  启动: flume-ng agent -c  $FLUME_HOME/conf  -f  $FLUME_HOME/jobs/spooling-flume-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

3.使用Flume监听整个目录的实时追加文件,并上传至HDFS

C.使用Flume监听整个目录的实时追加文件,并上传至HDFS (目录需要分组,只有是组内的文件追加内容时才会上传)(分组一般用正则表达式识别)

#name
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    #source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1 f2
    a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
    a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
    a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

    #channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 100

    #sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
    #上传文件的前缀
    a1.sinks.k1.hdfs.filePrefix = logs-
    #是否按照时间滚动文件夹
    a1.sinks.k1.hdfs.round = true
    #多少时间单位创建一个新的文件夹
    a1.sinks.k1.hdfs.roundValue = 1
    #重新定义时间单位
    a1.sinks.k1.hdfs.roundUnit = hour
    #是否使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #积攒多少个Event才flush到HDFS一次
    a1.sinks.k1.hdfs.batchSize = 100
    #设置文件类型,可支持压缩
    a1.sinks.k1.hdfs.fileType = DataStream
    #多久生成一个新的文件
    a1.sinks.k1.hdfs.rollInterval = 60
    #设置每个文件的滚动大小
    a1.sinks.k1.hdfs.rollSize = 134217700
    #文件的滚动与Event数量无关
    a1.sinks.k1.hdfs.rollCount = 0

    #bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4.复制 replicating

D.案例需求: 复制
使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem,flume2传递到hdfs。
sources channels sinks
分析: flume1 taildir memory avro
flume2 avro memory hdfs
flume3 avro memory file_roll

flume3.conf
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = localhost
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = file_roll
  a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  a2.sources.r1.type = avro
  a2.sources.r1.bind = localhost
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = hdfs
  a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  a2.sinks.k1.hdfs.filePrefix = logs-
  a2.sinks.k1.hdfs.round = true
  a2.sinks.k1.hdfs.roundValue = 1
  a2.sinks.k1.hdfs.roundUnit = hour
  a2.sinks.k1.hdfs.useLocalTimeStamp = true
  a2.sinks.k1.hdfs.batchSize = 100
  a2.sinks.k1.hdfs.fileType = DataStream
  a2.sinks.k1.hdfs.rollInterval = 60
  a2.sinks.k1.hdfs.rollSize = 134217700
  a2.sinks.k1.hdfs.rollCount = 0

  #bind
  a2.sources.r1.channels = c1 
  a2.sinks.k1.channel = c1 

  flume1
  #name
  a1.sources = r1
  a1.channels = c1 c2
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
  a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
  a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

  #ChannelSelector
  a1.sources.r1.selector.type = replicating

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100

  a1.channels.c2.type = memory
  a1.channels.c2.capacity = 10000
  a1.channels.c2.transactionCapacity = 100

  #sinks
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = localhost
  a1.sinks.k1.port = 7777

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = localhost
  a1.sinks.k2.port = 8888

  #bind
  a1.sources.r1.channels = c1 c2
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c2

  启动3:flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume3.conf -n a3 -Dflume.root.logger=INFO,console 

  启动2:flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume2.conf -n a2 -Dflume.root.logger=INFO,console

  启动1:flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replicating/flume1.conf -n a1 -Dflume.root.logger=INFO,console 
 

5.负载均衡 load_balance

案例需求:负载均衡案例
使用Flume-1监控文件变动,Flume-1将变动内容(轮询或者随机传递给)传递给Flume-2,flume-3 Flume-3负责输出到Local FileSystem,flume2传递到hdfs。
sources channels sinks
分析: flume1 taildir memroy avro
flume2 avro memroy hdfs
flume3 avro memroy file_roll

flume3.conf
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = localhost
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = file_roll
  a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/jobs/fileroll

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  a2.sources.r1.type = avro
  a2.sources.r1.bind = localhost
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = hdfs
  a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
  a2.sinks.k1.hdfs.filePrefix = logs-
  a2.sinks.k1.hdfs.round = true
  a2.sinks.k1.hdfs.roundValue = 1
  a2.sinks.k1.hdfs.roundUnit = hour
  a2.sinks.k1.hdfs.useLocalTimeStamp = true
  a2.sinks.k1.hdfs.batchSize = 100
  a2.sinks.k1.hdfs.fileType = DataStream
  a2.sinks.k1.hdfs.rollInterval = 60
  a2.sinks.k1.hdfs.rollSize = 134217700
  a2.sinks.k1.hdfs.rollCount = 0

  #bind
  a2.sources.r1.channels = c1 
  a2.sinks.k1.channel = c1 

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
  a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
  a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = localhost
  a1.sinks.k1.port = 7777

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = localhost
  a1.sinks.k2.port = 8888

  #Sink Processor
  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = load_balance
  a1.sinkgroups.g1.processor.selector = random

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c1

  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalancing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

7.故障转移 failover

案例需求:故障转移
使用Flume-1监控端口,Flume-1将端口数据传递给Flume-2(为active状态),当flume2出现故障时,将数据传递给flume3
flume sources channels sinks
分析: flume1 netcat memory avro
flume2 avro memory logger
flume3 avro memory logger
flume3.conf

#name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = localhost
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = logger
  

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1
  
  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  a2.sources.r1.type = avro
  a2.sources.r1.bind = localhost
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = logger
  

  #bind
  a2.sources.r1.channels = c1 
  a2.sinks.k1.channel = c1 

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 5555

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = localhost
  a1.sinks.k1.port = 7777

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = localhost
  a1.sinks.k2.port = 8888

  #Sink Processor
  a1.sinkgroups = g1
  a1.sinkgroups.g1.sinks = k1 k2
  a1.sinkgroups.g1.processor.type = failover
  a1.sinkgroups.g1.processor.priority.k1 = 5
  a1.sinkgroups.g1.processor.priority.k2 = 10

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c1

  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.logger=INFO,console

8.聚合

案例需求:聚合
hadoop102上的Flume-1监控文件/opt/module/flume-1.9.0/jobs/taildir,
hadoop103上的Flume-2监控某一个端口的数据流,
Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。
flume sources channels sinks
分析: flume1 taildir memory avro
flume2 netcat memory avro
flume3 avro memory logger

flume3.conf        
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = hadoop104
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = logger

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  #sources
  a2.sources.r1.type = netcat
  a2.sources.r1.bind = hadoop103
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = avro
  a2.sinks.k1.hostname = hadoop104
  a2.sinks.k1.port = 8888

  #bind
  a2.sources.r1.channels = c1
  a2.sinks.k1.channel = c1

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 

  #sources
  a1.sources.r1.type = TAILDIR
  a1.sources.r1.filegroups = f1 f2
  a1.sources.r1.filegroups.f1 = /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
  a1.sources.r1.filegroups.f2 = /opt/module/flume-1.9.0/jobs/taildir/.*\.log  
  a1.sources.r1.positionFile = /opt/module/flume-1.9.0/jobs/position/position.json

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = hadoop104
  a1.sinks.k1.port = 8888

  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  
  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggregated/flume1.conf -n a1 -Dflume.root.logger=INFO,console

8.多路 multiplexing

案例需求:多路
hadoop101 的flume1 监控 8888端口的数据 并将已字母开头的发给Hadoop104,数字开头的发送给Hadoop103
flume2,flume3打印到控制台
flume sources channels sinks
分析: flume1 netcat memory avro
flume2 avro memory logger
flume3 avro memory logger

flume3.conf        
  #name
  a3.sources = r1
  a3.channels = c1
  a3.sinks = k1

  #sources
  a3.sources.r1.type = avro
  a3.sources.r1.bind = hadoop104
  a3.sources.r1.port = 8888

  #channels
  a3.channels.c1.type =memory
  a3.channels.c1.capacity = 10000
  a3.channels.c1.transactionCapacity = 100

  #sinks
  a3.sinks.k1.type = logger

  #bind
  a3.sources.r1.channels = c1
  a3.sinks.k1.channel = c1

  flume2.conf
  #name
  a2.sources = r1
  a2.channels = c1
  a2.sinks = k1

  #sources
  a2.sources.r1.type = avro
  a2.sources.r1.bind = hadoop103
  a2.sources.r1.port = 7777

  #channels
  a2.channels.c1.type =memory
  a2.channels.c1.capacity = 10000
  a2.channels.c1.transactionCapacity = 100

  #sinks
  a2.sinks.k1.type = logger

  #bind
  a2.sources.r1.channels = c1
  a2.sinks.k1.channel = c1

  flume1.conf
  #name
  a1.sources = r1
  a1.channels = c1 c2
  a1.sinks = k1 k2

  #sources
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 6666

  # Interceptor
  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = myInterceptor.CustomInterceptor$Builder

  #channel selector
  a1.sources.r1.selector.type = multiplexing
  a1.sources.r1.selector.header = type
  a1.sources.r1.selector.mapping.letter = c1
  a1.sources.r1.selector.mapping.number = c2

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  a1.channels.c2.type = memory
  a1.channels.c2.capacity = 10000
  a1.channels.c2.transactionCapacity = 100

  #sink
  a1.sinks.k1.type = avro
  a1.sinks.k1.hostname = hadoop104
  a1.sinks.k1.port = 8888

  a1.sinks.k2.type = avro
  a1.sinks.k2.hostname = hadoop103
  a1.sinks.k2.port = 7777

  #bind
  a1.sources.r1.channels = c1 c2
  a1.sinks.k1.channel = c1
  a1.sinks.k2.channel = c2      

  启动3:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume3.conf -n a3 -Dflume.root.logger=INFO,console
  启动2:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume2.conf -n a2 -Dflume.root.logger=INFO,console
  启动1:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multiplexing/flume1.conf -n a1 -Dflume.root.logger=INFO,console

9.监听端口 netcat

#监听端口
要用到netcat

#name
  a1.sources = r1
  a1.channels = c1
  a1.sinks = k1 

  #sources
  a1.sources.r1.type = netcat
  a1.sources.r1.bind = localhost
  a1.sources.r1.port = 6666

  #channels
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 10000
  a1.channels.c1.transactionCapacity = 100 

  #sink
  a1.sinks.k1.type = logger
  
  #bind
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  
  启动:
  flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/netcat-flume-logger.conf -n a1 -Dflume.root.logger=INFO,console
这篇关于flume入门的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!