<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
</dependencies>
[root@node1 data]# vi portToSpark.conf

# Declare the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = netcat
a1.sources.s1.bind = node1
a1.sources.s1.port = 44444

# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire the source, channel, and sink together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
package SparkStreaming.flume

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    // Push-based approach: Spark Streaming starts an Avro receiver on node1:8888
    // and Flume's avro sink pushes events to it.
    val ds = FlumeUtils.createStream(ssc, "node1", 8888, StorageLevel.MEMORY_ONLY)
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
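Note that ds.print() only shows SparkFlumeEvent objects; to work with the text that was typed into the netcat source, the event body has to be decoded first. A minimal sketch of what could replace ds.print() before ssc.start() (the variable names here are illustrative, not from the original code):

// Decode each Flume event body into a string, then run a simple word count.
// Optional extension of the example above, not part of the original program.
val lines = ds.map(event => new String(event.event.getBody.array()))
val counts = lines.flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.print()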
1. Start Flume:
flume-ng agent -n a1 -f portToSpark.conf -Dflume.root.logger=INFO

2. Run the main class: package the code into a jar, upload it to node1, and submit it:
spark-submit --class SparkStreaming.flume.ByFlumePush ssc.jar

3. Connect to the monitored port and send data:
[root@node1 ~]# telnet node1 44444
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
</dependencies>
[root@node1 data]# vi portToSpark2.conf

# Declare the agent's sources, sinks, and channels
a1.sources = s1
a1.sinks = k1
a1.channels = c1

# Configure the source
a1.sources.s1.type = netcat
a1.sources.s1.bind = node1
a1.sources.s1.port = 44444

# Configure the sink (custom Spark sink; Spark Streaming pulls data from it)
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888
a1.sinks.k1.batch-size = 1

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire the source, channel, and sink together
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
package SparkStreaming.flume

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ByFlumePoll {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("hdfs")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))
    // Pull-based approach: Flume buffers events in its SparkSink on node1:8888
    // and Spark Streaming polls that sink to fetch them.
    val ds = FlumeUtils.createPollingStream(ssc, "node1", 8888, StorageLevel.MEMORY_ONLY)
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
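FlumeUtils.createPollingStream can also pull from several SparkSinks in a single stream by passing a list of addresses. A hedged sketch, assuming a second agent runs an identical SparkSink on a host called node2 (the host names and port are placeholders, not part of the original setup):

// Sketch only: poll two SparkSinks with one receiver stream.
import java.net.InetSocketAddress

val addresses = Seq(
  new InetSocketAddress("node1", 8888),
  new InetSocketAddress("node2", 8888)
)
val ds = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_ONLY)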
[root@node1 jars]# pwd
/opt/app/spark-2.3.1/jars
[root@node1 jars]# cp spark-streaming_2.11-2.3.1.jar /opt/app/flume-1.8.0/lib/
[root@node1 data]# pwd
/opt/data
[root@node1 data]# cp ssc.jar /opt/app/flume-1.8.0/lib/
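For the pull-based approach, the Spark documentation also lists the custom sink jar (spark-streaming-flume-sink_2.11) together with its scala-library and commons-lang3 dependencies as jars Flume needs on its classpath; if the SparkSink fails to start with a ClassNotFoundException, copying those jars into Flume's lib directory as well is worth checking.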
(ssc.jar is a jar packaged by someone else; it is saved at the path G://shixun//ssc.jar.)
[root@node1 data]# flume-ng agent -n a1 -f portToSpark2.conf -Dflume.root.logger=INFO,console
[root@node1 data]# spark-submit --class SparkStreaming.flume.ByFlumePoll ssc2.jar
[root@node1 data]# telnet node1 44444