同一时刻,kafka当中数据只能被一个消费者组下面的一个消费者所消费。
kafka消费者在消费数据的时候,都是分组别的。不同组的消费不受影响,相同组内的消费,需要注意,如果partition有3个,消费者有3个,那么便是每一个消费者消费其中一个partition对应的数据;如果有2个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。如果有超过3个的消费者,同一时间只能最多有3个消费者能消费得到数据,
如果将数据保存到多个分区当中,只能保证分区之内有序,全局无序;
要想全局有序,将所有数据发送到一个分区当中。
指定topic+value:数据采用轮询模式保存
指定topic+key+value:如果key是固定死的,利用key的hash将数据发送某一个分区当中;如果key是动态的,也是利用key的hash,将数据发送到指定分区当中。
指定topic+partition+key+value:如果指定了分区数,那么就会将所有数据发送到指定分区当中。
Produce端向broker端发送数据并保存,为了防止发送的数据丢失,有ack机制,ack机制一共分为3种:
0:producer端发送数据,不管leader是否保存成功,follower是否同步成功,继续发送下一批数据;
1:producer端发送数据,保证leader保存成功,不管follower是否同步成功,继续发送下一批数据;
-1:producer端发送数据,既要保证leader保存成功,也要保证follower同步成功,再发送下一批数据。
## kafka的服务器 bootstrap.servers=bd-offcn-01:9092,bd-offcn-02:9092,bd-offcn-03:9092 ##Key的序列化器 key.serializer=org.apache.kafka.common.serialization.IntegerSerializer ##value的序列化器 value.serializer=org.apache.kafka.common.serialization.StringSerializer acks=[0|-1|1|all] ##消息确认机制 0: 不做确认,直管发送消息即可 -1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认 1:只需要leader进行消息确认即可,后期follower可以从leader进行同步 batch.size=1024 #每个分区内的用户缓存未发送record记录的空间大小 ## 如果缓存区中的数据,没有占满,也就是任然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0, linger.ms=10 ## 不管缓冲区是否被占满,延迟10ms发送request buffer.memory=10240 #控制的是一个producer中的所有的缓存空间 retries=0 #发送消息失败之后的重试次数
kafka消费的并行度就是kaka topic分区的个数,或者分区的个数决定了同一时间同一消费者组内最多可以有多少个消费者消费数据
offset:是kafka的topic中的partition中的每一条消息的标识,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量。offset的数据类型是Long,8个字节长度。offset在分区内是有序的,分区间是不一定有序。
副本分配算法:
将所有N Broker和待分配的i个Partition排序。
将第i个Partition分配到第(i mod n)个Broker上。
将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上。
同时也要兼顾复杂均衡,尽量在所有的节点里面都保存相同多的分区及其副本。
在kafka中,一个主题可以有多个分区;一个分区可以有多个segment文件段,一个segment文件段有两个文件:.log、.index文件。
.log文件保存数据,.index文件保存数据当中的索引,是稀疏索引。
一个segment文件段默认保存1g数据,当中segment文件段达到1g的数据量,就要开始分裂出第二个segment文件段,以此类推。
第一个segment文件段:
-rw-r--r-- 1 root root 10485760 Oct 13 17:14 00000000000000000000.index
-rw-r--r-- 1 root root 654696 Oct 13 17:14 00000000000000000000.log
第二个segment文件段:
-rw-r--r-- 1 root root 10485760 Oct 13 17:14 00000000000000004356.index
-rw-r--r-- 1 root root 654696 Oct 13 17:14 00000000000000004356.log
第三个segment文件段:
-rw-r--r-- 1 root root 10485760 Oct 13 17:14 00000000000000752386.index
-rw-r--r-- 1 root root 654696 Oct 13 17:14 00000000000000752386.log
以当前segment文件段当中.log文件中第一条数据的偏移量命名。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
依次在 hadoop102、hadoop103、hadoop104 节点上启动 kafka [atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties & [atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties & [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop [atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \ --create --replication-factor 3 --partitions 1 --topic first
选项说明: --topic 定义 topic 名 --replication-factor 定义副本数 --partitions 定义分区数
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \ --delete --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \ --broker-list hadoop102:9092 --topic first >hello world >atguigu atguigu
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \ --zookeeper hadoop102:2181 --from-beginning --topic first
--from-beginning:会把 first 主题中以往所有的数据都读取出来。根据业务场景选择是 否增加该配置。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \ --describe --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \ --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties [atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
package com.atguigu.kafka; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class NewProducer { public static void main(String[] args) { Properties props = new Properties(); // Kafka 服务端的主机名和端口号 props.put("bootstrap.servers", "hadoop103:9092"); // 等待所有副本节点的应答 props.put("acks", "all"); // 消息发送最大尝试次数 props.put("retries", 0); // 一批消息处理大小 props.put("batch.size", 16384); // 请求延时 props.put("linger.ms", 1); // 发送缓存区内存大小 props.put("buffer.memory", 33554432); // key 序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value 序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i)); } producer.close(); } }
定义一个类实现 Partitioner 接口,重写里面的方法(过时 API)
package com.atguigu.kafka; import java.util.Map; import kafka.producer.Partitioner; public class CustomPartitioner implements Partitioner { public CustomPartitioner() { super(); } @Override public int partition(Object key, int numPartitions) { // 控制分区 return 0; } }
#1、地址 bootstrap.servers=node01:9092 #2、序列化 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer #3、主题(topic) 需要制定具体的某个topic(order)即可。 #4、消费者组 group.id=test public class OrderConsumer { public static void main(String[] args) { // 1\连接集群 Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("group.id", "test"); //以下两行代码 ---消费者自动提交offset值 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props); kafkaConsumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { String value = consumerRecord.value(); int partition = consumerRecord.partition(); long offset = consumerRecord.offset(); String key = consumerRecord.key(); System.out.println("key:" + key + "value:" + value + "partition:" + partition + "offset:" + offset); } } } }
public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); TopicPartition topicPartition = new TopicPartition("test", 0); TopicPartition topicPartition1 = new TopicPartition("test", 1); kafkaConsumer.assign(Arrays.asList(topicPartition, topicPartition1)); while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { String value = consumerRecord.value(); int partition = consumerRecord.partition(); long offset = consumerRecord.offset(); String key = consumerRecord.key(); System.out.println("key:" + key + "value:" + value + "partition:" + partition + "offset:" + offset); } kafkaConsumer.commitSync(); } } }
flume主要是做日志数据(离线或实时)地采集。
配置flume.conf文件
#为我们的source channel sink起名 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #指定我们的source收集到的数据发送到哪个管道 a1.sources.r1.channels = c1 #指定我们的source数据收集策略 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /export/servers/flumedata a1.sources.r1.deletePolicy = never a1.sources.r1.fileSuffix = .COMPLETED a1.sources.r1.ignorePattern = ^(.)*\\.tmp$ a1.sources.r1.inputCharset = UTF-8 #指定我们的channel为memory,即表示所有的数据都装进memory当中 a1.channels.c1.type = memory #指定我们的sink为kafka sink,并指定我们的sink从哪个channel当中读取数据 a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = test a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1
测试
[offcn@bd-offcn-02 kafka]$ bin/kafka-console-consumer.sh \ --topic test \ --bootstrap-server node01:9092,node02:9092,node03:9092 \ --from-beginning [root@node01 flume]$ bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
Spark官网组件说明
Spark通用运行简易流程
Spark 的驱动器是执行开发程序中的 main 方法的进程。它负责开发人员编写的用来创 建 SparkContext、创建 RDD,以及进行 RDD 的转化操作和行动操作代码的执行。
Spark Executor 是一个工作进程,负责在 Spark 作业中运行任务,任务间相互独立。 Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周 期而存在。
通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式 存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加 速运算。
RDD:叫做弹性分布式数据集
特点:不可变,可分区,里面的元素可以并行计算的集合。
不能携带数据,类似于java当中的接口,携带的是元数据。
窄依赖:父RDD的一个分区只能被子RDD的一个分区所依赖=》独生子女
宽依赖:父RDD的一个分区会被子RDD的多个分区所依赖=》超生子女
RDD算子一共分为两类,一类是transformation(转换算子),一类是action(执行算子)。
Transformation:转换算子,惰性计算,只做连接不运算,只有遇到action才会带动转换算子进行运算。
map(一对一)、flatMap(一对多)、filter(一对N(0、1))、join、leftouterJoin、rightouterJoin、fullouterJoin、sortBy、sortByKey、gorupBy、groupByKey、reduceBy、reduceByKey、sample、union、mappatition、mappatitionwithindex、zip、zipWithIndex。
mappatition:
//创建RDD并指定分区数 val rdd: RDD[Int] = sc.parallelize(Array(1,2,3,4),2) //通过-将分区之间的数据连接 val result: RDD[String] = rdd.mapPartitions(x=>Iterator(x.mkString("-"))) //打印输出 println(result.collect().toBuffer)
mapPartitionsWithIndex:
val rdd: RDD[Int] = sc.parallelize(1 to 16,4) //查看每个分区当中都保存了哪些数据 val result: RDD[String] = rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))) //打印输出 result.foreach(println)
sample、union、join算子:
sample算子: a.说明 sample(withReplacement, fraction, seed):随机抽样算子,sample主要工作就是为了来研究数据本身,去代替全量研究会出现类似数据倾斜(dataSkew)等问题,无法进行全量研究,只能用样本去评估整体。 withReplacement:Boolean :有放回的抽样和无放回的抽样 fraction:Double:样本空间占整体数据量的比例,大小在[0, 1],比如0.2, 0.65 seed:Long:是一个随机数的种子,有默认值,通常不需要传参 def sampleOps(sc: SparkContext): Unit = { val list = sc.parallelize(1 to 100000) val sampled1 = list.sample(true, 0.01) println("sampled1 count: " + sampled1.count()) val sampled2 = list.sample(false, 0.01) println("sampled2 count: " + sampled2.count()) } union算子: a.说明 rdd1.union(rdd2) 相当于sql中的union all,进行两个rdd数据间的联合,需要说明一点是,该union是一个窄依赖操作,rdd1如果有N个分区,rdd2有M个分区,那么union之后的分区个数就为N+M。 join算子: rdd1.join(rdd2) 相当于sql中的join连接操作 A(id) a, B(aid) b select * from A a join B b on a.id = b.aid 交叉连接: across join select * from A a across join B ====>这回产生笛卡尔积 内连接: inner join,提取左右两张表中的交集 select * from A a inner join B on a.id = b.aid 或者 select * from A a, B b where a.id = b.aid 外连接:outer join 左外连接 left outer join 返回左表所有,右表匹配返回,匹配不上返回null select * from A a left outer join B on a.id = b.aid //leftOutJoin操作 val result1: RDD[(Int, (String, Option[Int]))] = rdd1.leftOuterJoin(rdd2) 右外连接 right outer join 刚好是左外连接的相反 select * from A a left outer join B on a.id = b.aid //rightOuterJoin val result2: RDD[(Int, (Option[String], Int))] = rdd1.rightOuterJoin(rdd2) 全连接 full join 全外连接 full outer join = left outer join + right outer join //fullOuterJoin val result3: RDD[(Int, (Option[String], Option[Int]))] = rdd1.fullOuterJoin(rdd2) 前提:要先进行join,rdd的类型必须是K-V
coalesce算子、reparation(numPartitions):
coalesce(numPartition, shuffle=false): 分区合并的意思
numPartition:分区后的分区个数
shuffle:此次重分区是否开启shuffle,决定当前的操作是宽(true)依赖还是窄(false)依赖
原先有100个分区,合并成10分区,或者原先有2个分区,重分区之后变成了4个。
coalesce默认是一个窄依赖算子,如果压缩到1个分区的时候,就要开启shuffle=true,此时coalesce是一个宽依赖算子
如果增大分区,shuffle=false,不会改变分区的个数,可以通过将shuffle=true来进行增大分区
可以用repartition(numPartition)来进行代替= coalesce(numPartitions, shuffle = true)
Action:执行算子,带动转换算子运算并将结果进行输出。
count、collect(将task的计算结果拉回到Driver端)、foreach(不会回收所有task计算结果,原理:将用户传入的参数推送到各个节点上去执行,只能去计算节点找结果)、saveAsTextFile(path)、reduce、foreachPatition、take、first、takeordered(n)。
mapPartitions效率高。
例子:将数据保存到数据库,如果是map算子,每保存一个元素,就要连接数据库,保存完,要断开数据库,如果数据量过大,反复的连接断开数据库会给数据库造成很大的压力;反之,使用mapParititons,一次操作的是一个分区当中的数据,没保存一个分区当中的数据,只需要连接断开一次数据库,给数据库造成的压力比较小。
reduceByKey效率高,因为它前期进行了预聚合,减少了网络传输。
reduceByKey是转换算子
Reduce是执行算子。
持久化方式一共分为两种,一种是cache,一种是persist
Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久化RDD时,每个节点将其计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重用这些分区。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。
如何进行持久化
可以使用persist()或cache()方法将RDD标记为持久化。第一次在动作中计算时,它将保存在节点的内存中。Spark的缓存是容错的——如果RDD的任何分区丢失,它将使用最初创建它的转换自动重新计算。
持久化的方法就是rdd.persist()或者rdd.cache()
Cache底层调用的是persist无参构造,而persist无参构造默认是将数据缓存到内存当中。
Persist可以选择缓存机制
跨任务支持通用的读写共享变量将是低效的(低效). 然而,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
就是说,为了能够更加高效的在driver和算子之间共享数据,spark提供了两种有限的共享变量,一者广播变量,一者累加器
定义广播变量注意点
变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改
accumulator累加器
accumulator累加器的概念和mr中出现的counter计数器的概念有异曲同工之妙,对某些具备某些特征的数据进行累加。累加器的一个好处是,不需要修改程序的业务逻辑来完成数据累加,同时也不需要额外的触发一个action job来完成累加,反之必须要添加新的业务逻辑,必须要触发一个新的action job来完成,显然这个accumulator的操作性能更佳!
Join既有可能是窄依赖,也有可能是宽依赖。
Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。
分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区操作。
HashPartitioner
def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") //加载数据 val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8) //通过Hash分区 val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) //获取分区方式 println(result.partitioner) //获取分区数 println(result.getNumPartitions) }
一个action算子就形成了一个job
描述的是RDD的执行流程。
遇到action算子形成DAG有向无环图。
血统Lineage
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
启动%SPARK_HOME%\bin\spark-shell.cmd脚本
Spark分布式环境
sbin/start-all.sh sbin/stop-all.sh
提交任务&执行程序
[root@node01 spark]# bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 2 \ --queue default \ ./examples/jars/spark-examples_2.11-2.4.7.jar \ 100
Spark分布式HA环境安装
配置基于Zookeeper的ha,需要在spark-env.sh中添加一句话:
注释掉如下内容: #SPARK_MASTER_HOST=node01 export SPARK_MASTER_PORT=7077 添加上如下内容:配置的时候保证下面语句在一行,否则配置不成功,每个-D参数使用空格分开 export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node01:2181,node02:2181,node03:2181 -Dspark.deploy.zookeeper.dir=/spark"
因为ha不确定master在node01上面启动,所以将
export SPARK_MASTER_HOST=node01注释掉,同步spark-env.sh到其它机器,重启spark集群,node1、node02启动master。
提交任务&执行程序:
[root@node01 spark]# bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://node01:7077,node02:7077 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 2 \ --queue default \ ./examples/jars/spark-examples_2.11-2.4.7.jar \ 100
动态上下线slave
spark]# sbin/start-slave.sh node01:7077 -c 4 -m 1024M spark]# sbin/stop-slave.sh node01:7077 -c 4 -m 1024M
Spark分布式Yarn环境
修改hadoop配置文件yarn-site.xml
[root@node01 hadoop]$ vi yarn-site.xml <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property>
修改spark-env.sh
[root@node01 conf]# vi spark-env.sh YARN_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop HADOOP_CONF_DIR=/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
client模式
[root@node01 spark]# bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.11-2.4.7.jar \ 100
cluster模式
[root@node01 spark]# bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.11-2.4.7.jar \ 100
Sparkcore编写 WordCount 程序
package com.atguigu import org.apache.spark.{SparkConf, SparkContext} object WordCount{ def main(args: Array[String]): Unit = { //1.创建 SparkConf 并设置 App 名称 val conf = new SparkConf().setAppName("WC") //2.创建 SparkContext,该对象是提交 Spark App 的入口 val sc = new SparkContext(conf) //3.使用 sc 创建 RDD 并执行相应的 transformation 和 action sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).sortBy(_._2, false).saveAsTextFile(args(1)) //4.关闭连接 sc.stop() } }
打包到集群测试
bin/spark-submit \ --class WordCount \ --master spark://hadoop102:7077 \ WordCount.jar \ /word.txt \ /out
高效写入数据库
def saveInfoMySQLByForeachPartition(rdd: RDD[(String, Int)]): Unit = { rdd.foreachPartition(partition => { //这是在partition内部,属于该partition的本地 Class.forName("com.mysql.jdbc.Driver") val url = "jdbc:mysql://localhost:3306/test" val connection = DriverManager.getConnection(url, "mark", "sorry") val sql = """ |insert into wordcounts(word, `count`) Values(?, ?) |""".stripMargin val ps = connection.prepareStatement(sql) partition.foreach{case (word, count) => { ps.setString(1, word) ps.setInt(2, count) ps.execute() }} ps.close() connection.close() }) }
它提供了 2 个编程抽象:DataFrame 和 DataSet,并且作为分布式 SQL 查询引擎的作用。
DataFrame比RDD多了一个表头信息(Schema:约束信息)
Dataset
相对于RDD,Dataset提供了强类型支持(泛型),也是在RDD的每行数据加了类型约束,下图1-7是官网对于dataset的表述。
一共分为两种:一种是DSL风格,一种是SQL风格。
DSL:利用算子进行数据分析,对编程能力有一定的要求。
SQL:利用sql语句进行数据的分析。
指的是结构化信息。
SparkCore:底层抽象:RDD 程序入口:SparkContext
SparkSql:底层抽象:DataFrame和DataSet 程序入口:SparkSession
DataFrame=RDD-泛型+schema+sql+优化
DataSet=RDD+schma+sql+优化
SparkSession的构建
val spark = SparkSession.builder() .appName("SparkSQLOps") .master("local[*]") //.enableHiveSupport()//支持hive的相关操作 .getOrCreate()
DataFrame的构建方式
package chapter1 import org.apache.spark.SparkContext import org.apache.spark.sql.{DataFrame, SparkSession} object Create_DataFrame { def main(args: Array[String]): Unit = { //创建程序入口 val spark: SparkSession = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate() //调用sparkContext val sc: SparkContext = spark.sparkContext //设置控制台日志输出级别 sc.setLogLevel("WARN") //从数据源创建DataFrame val personDF: DataFrame = spark.read.json("examples/src/main/resources/people.json") //展示数据 personDF.show() } }
从RDD进行转换:
val personDF: DataFrame = personRDD.toDF("id","name","age")
通过反射创建DataFrame:
val personDF: DataFrame = personRDD.toDF()
动态编程
val df = spark.createDataFrame(row, schema) val list = List( new Student(1, "王盛芃", 1, 19), new Student(2, "李金宝", 1, 49), new Student(3, "张海波", 1, 39), new Student(4, "张文悦", 0, 29) ) import spark.implicits._ val ds = spark.createDataset[Student](list)
Row:代表的是二维表中的一行记录,或者就是一个Java对象
spark.read.format(数据文件格式).load(path)
//导包 import spark.implicits._ //第一种方式 //加载json文件 val personDF: DataFrame = spark.read.format("json").load("E:\\data\\people.json") //加载parquet文件 val personDF1: DataFrame = spark.read.format("parquet").load("E:\\data\\people.parquet") //加载csv文件,csv文件有些特殊,如果想要带上表头,必须调用option方法 val person2: DataFrame = spark.read.format("csv").option("header","true").load("E:\\data\\people.csv") //加载数据库当中的表 val personDF3: DataFrame = spark.read .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/bigdata") .option("user", "root") .option("password", "root") .option("dbtable", "person") .load()
spark.read.json(path)
//第二种方式 //加载json文件 val personDF4: DataFrame = spark.read.json("E:\\data\\people.json") //加载parquet文件 val personDF5: DataFrame = spark.read.parquet("E:\\data\\people.parquet") //加载csv文件,csv文件有些特殊,如果想要带上表头,必须调用option方法 val person6: DataFrame = spark.read.option("header","true").csv("E:\\data\\people.csv") //加载数据库当中的表 val properties = new Properties() properties.put("user", "root") properties.put("password", "root") val personDF7: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata", "person", properties)
//第一种方式 //保存为json文件 personDF.write.format("json").save("E:\\data\\json") //保存为parquet文件 personDF.write.format("parquet").save("E:\\data\\parquet") //保存为csv文件,想要带上表头,调用option方法 personDF.write.format("csv").option("header","true").save("E:\\data\\csv") //保存为数据库当中的表 personDF.write .format("jdbc") .option("url", "jdbc:mysql://localhost:3306/bigdata") .option("user", "root") .option("password", "root") .option("dbtable", "person").save()
//第二种方式 //保存为parque文件 personDF.write.parquet("E:\\data\\parquet") //保存为csv文件 personDF.write.option("header", "true").csv("E:\\data\\csv") //保存为json文件 personDF.write.format("json").save("E:\\data\\json") //保存为数据库的表 val props = new Properties() props.put("user","root") props.put("password","root") personDF.write.jdbc("jdbc:mysql://localhost:3306/bigdata","person",props)
1、需要引入hive的hive-site.xml,添加classpath目录下面即可,或者放到$SPARK_HOME/conf
2、为了能够正常解析hive-site.xml中hdfs路径,需要将hdfs-site.xml和core-site.xml到classpath下面
package chapter5 import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession object Hive_Support { def main(args: Array[String]): Unit = { //创建sparkSql程序入口 val spark: SparkSession = SparkSession.builder() .appName("demo") .master("local[*]") .enableHiveSupport() .getOrCreate() //调用sparkContext val sc: SparkContext = spark.sparkContext //设置日志级别 sc.setLogLevel("WARN") //导包 import spark.implicits._ //查询hive当中的表 spark.sql("show tables").show() //创建表 spark.sql("CREATE TABLE person (id int, name string, age int) row format delimited fields terminated by ' '") //导入数据 spark.sql("load data local inpath'./person.txt' into table person") //查询表当中数据 spark.sql("select * from person").show() } }
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{DataFrame, SparkSession} object UDF_Demo { def main(args: Array[String]): Unit = { //创建sparkSql程序入口 val spark: SparkSession = SparkSession.builder().appName("demo").master("local[*]").getOrCreate() //调用sparkContext val sc: SparkContext = spark.sparkContext //设置日志级别 sc.setLogLevel("WARN") //导包 import spark.implicits._ //加载文件 val personDF: DataFrame = spark.read.json("E:\\data\\people.json") //展示数据 //personDF.show() //注册成为一张表 personDF.createOrReplaceTempView("t_person") //赋予什么功能 val fun = (x:String)=>{ "Name:"+x } //没有addName这个函数,就注册它 spark.udf.register("addName",fun) //查询 spark.sql("select name,addName(name) from t_person").show() //释放资源 spark.stop() }}
开窗函数
row_number() over (partitin by XXX order by XXX)
rank() 跳跃排序,有两个第二名是,后边跟着的是第四名
dense_rank() 连续排序,有两个第二名是,后边跟着的是第三名
row_number() 连续排序,两个值相同排序也是不同
package com.zg.d03 import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} case class StudentScore(name:String,clazz:Int,score:Int) object SparkSqlOverDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("sparksqlover") val sc = new SparkContext(conf) val spark = SparkSession.builder().config(conf).getOrCreate() val arr01 = Array(("a",1,88), ("b",1,78), ("c",1,95), ("d",2,74), ("e",2,92), ("f",3,99), ("g",3,99), ("h",3,45), ("i",3,53), ("j",3,78)) import spark.implicits._ val scoreRDD = sc.makeRDD(arr01).map(x=>StudentScore(x._1,x._2,x._3)).toDS scoreRDD.createOrReplaceTempView("t_score") //查询t_score表数据 spark.sql("select * from t_score").show() //使用开窗函数查找topN,rank() 跳跃排序,有两个第二名是,后边跟着的是第四名 spark.sql("select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score ").show() //讲使用开窗函数后的查询结果作为一张临时表,这个临时表有每个班的成绩排名,再取前三名 spark.sql("select * from (select name,clazz,score, rank() over( partition by clazz order by score desc ) rownum from t_score) t1 where rownum <=3 ").show() } }
//SQL风格操作 /*positionDF.createOrReplaceTempView("t_position") val sql = """ |select position.workName as workNames,count(*) as counts |from( |select explode(data.list) as position |from t_position) |group by workNames |order by counts desc """.stripMargin spark.sql(sql).show()*/
第二代的流式处理框架,短时间内生成mirco-batch,提交一次作业。准实时,延迟略高,秒级或者亚秒级延迟。
SparkStreaming是SparkCore的api的一种扩展,使用DStream(discretized stream or DStream)作为数据模型,基于内存处理连续的数据流,本质上还是RDD的基于内存的计算。
DStream,本质上是RDD的序列。
实时计算:以事件为驱动。来一条数据就驱动着立刻处理一条数据。
准实时计算:以时间为驱动。不管是否接收到数据,达到时间节点再处理。
主要学习transform,updateByKey,window函数。
object SparkStreamingWordCountOps { def main(args: Array[String]): Unit = { /* StreamingContext的初始化,需要至少两个参数,SparkConf和BatchDuration SparkConf不用多说 batchDuration:提交两次作业之间的时间间隔,每次会提交一个DStream,将数据转化batch--->RDD 所以说:sparkStreaming的计算,就是每隔多长时间计算一次数据 */ val conf = new SparkConf() .setAppName("SparkStreamingWordCount") .setMaster("local[*]") val duration = Seconds(2) val ssc = new StreamingContext(conf, duration) //业务 //为了执行的流式计算,必须要调用start来启动 ssc.start() //为了不至于start启动程序结束,必须要调用awaitTermination方法等待程序业务完成之后调用stop方法结束程序,或者异常 ssc.awaitTermination() } }
awaitTermination
要想持续不断的执行streaming计算,就必须要调用awaitTermination方法,以便driver能够在后台常驻
无法读取手动拷贝,或者剪切到指定目录下的文件,只能读取通过流写入的文件。
正常情况下,我们可以读取到通过put上传的文件,还可以读取通过cp拷贝的文件,但是读取不了mv移动的文件。
读取文件的这种方式,没有额外的Receiver消耗线程资源,所以可以指定master为local
object SparkStreamingHDFS { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN) Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.spark_project").setLevel(Level.WARN) val conf = new SparkConf() .setAppName("SparkStreamingHDFS") .setMaster("local") val duration = Seconds(2) val ssc = new StreamingContext(conf, duration) //读取local中数据 --->需要通过流的方式写入 // val lines = ssc.textFileStream("file:///E:/data/monitored") //hdfs val lines = ssc.textFileStream("hdfs://node01:9000/data/spark") lines.print() ssc.start() ssc.awaitTermination() } }
Receiver方式:
Spark去kafka当中抽取数据,并将数据保存在executor的内存当中,但是往往会因为底层的计算失败而造成数据的丢失,有解决办法,开启WAL预写日志,将kafka当中的数据不仅保存在executor的内存中,还要保存在WAL预写日志当中一份,这样,加入内存中的数据丢失,还能从WAL当中进行恢复,但是这样造成了数据的冗余。
Direct模式:
Spark每隔批次间隔就去kafka当中,读取每个topic下面每个partition当中最新的偏移量范围,数据依然保存在kafka当中,如果计算失败,只要kafka保存数据时间足够长,就可以进行无限次的恢复,不会造成数据的冗余。
直连模式特点:batch time 每隔一段时间,去kafka读取一批数据,然后消费
简化并行度,rdd的分区数量=topic的分区数量
数据存储于kafka中,没有数据冗余
不存在单点问题
效率高
可以实现仅消费一次的语义 exactly-once语义
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
(1)从kafka读取数据
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.serializer.KryoSerializer import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} /* SparkStremaing从kafka中读取数据 PerPartitionConfig spark.streaming.kafka.maxRatePerPartition spark.streaming.kafka.minRatePerPartition 都是代表了streaming程序消费kafka速率, max: 每秒钟从每个分区读取的最大的纪录条数 max=10,分区个数为3,间隔时间为2s 所以这一个批次能够读到的最大的纪录条数就是:10*3*2=60 如果配置的为0,或者不设置,起速率没有上限 min: 每秒钟从每个分区读取的最小的纪录条数 那么也就意味着,streaming从kafka中读取数据的速率就在[min, max]之间 执行过程中出现序列化问题: serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord spark中有两种序列化的方式 默认的就是java序列化的方式,也就是写一个类 implement Serializable接口,这种方式的有点事非常稳定,但是一个非常的确定是性能不佳 还有一种高性能的序列化方式——kryo的序列化,性能非常高,官方给出的数据是超过java的序列化性能10倍,同时在使用的时候只需要做一个声明式的注册即可 sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)//指定序列化的方式 .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]]))//注册要序列化的类 */ object StreamingFromKafkaOps { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName("StreamingFromKafkaOps") // .set("spark.serializer", classOf[KryoSerializer].getName) // .registerKryoClasses(Array(classOf[ConsumerRecord[String, String]])) //两次流式计算之间的时间间隔,batchInterval val batchDuration = Seconds(2) // 每隔2s提交一次sparkstreaming的作业 val ssc = new StreamingContext(conf, batchDuration) val topics = Set("hadoop") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark-kafka-grou-0817", "auto.offset.reset" -> "earliest", "enable.auto.commit" -> "false" ) /* 从kafka中读取数据 locationStrategy:位置策略 制定如何去给特定的topic和partition来分配消费者进行调度,可以通过LocationStrategies来得到实例。 在kafka0.10之后消费者先拉取数据,所以在适当的executor来缓存executor操作对于提高性能是非常重要的。 PreferBrokers: 如果你的executor在kafka的broker实例在相同的节点之上可以使用这种方式。 PreferConsistent: 大多数情况下使用这个策略,会把partition分散给所有的executor PreferFixed: 当网络或者设备性能等个方便不均衡的时候,可以蚕蛹这种方式给特定executor来配置特定partition。 不在这个map映射中的partition使用PreferConsistent策略 consumerStrategy:消费策略 配置在driver或者在executor上面创建的kafka的消费者。该接口封装了消费者进程信息和相关的checkpoint数据 消费者订阅的时候的策略: Subscribe : 订阅多个topic进行消费,这多个topic用集合封装 SubscribePattern : 可以通过正则匹配的方式,来订阅多个消费者,比如订阅的topic有 aaa,aab,aac,adc,可以通过a[abc](2)来表示 Assign : 指定消费特定topic的partition来进行消费,是更加细粒度的策略 */ val message:InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe(topics, kafkaParams)) // message.print()//直接打印有序列化问题 message.foreachRDD((rdd, bTime) => { if(!rdd.isEmpty()) { println("-------------------------------------------") println(s"Time: $bTime") println("-------------------------------------------") rdd.foreach(record => { println(record) }) } }) ssc.start() ssc.awaitTermination() } }
transform是一个transformation算子,转换算子。
/* transform,是一个transformation操作 transform(p:(RDD[A]) => RDD[B]):DStream[B] 类似的操作 foreachRDD(p: (RDD[A]) => Unit) transform的一个非常重要的一个操作,就是来构建DStream中没有的操作,DStream的大多数操作都可以用transform来模拟 比如map(p: (A) => B) ---> transform(rdd => rdd.map(p: (A) => B)) */
import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{ DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object Advertising_ranking { def main(args: Array[String]): Unit = { //创建程序入口 val conf: SparkConf = new SparkConf().setAppName("advertising").setMaster("local[*]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(5)) //设置日志级别 sc.setLogLevel("WARN") //接收数据 val data: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999) //切分数据 val spliData: DStream[String] = data.flatMap(_.split(" ")) //每条点击流日志记为1次 val pageAndOne: DStream[(String, Int)] = spliData.map((_,1)) //聚合相同的点击流 val pageAndCount: DStream[(String, Int)] = pageAndOne.reduceByKey(_+_) //遍历DStream中封装的RDD进行操作 val resultSorted: DStream[(String, Int)] = pageAndCount.transform(rdd => { //对RDD当中的数据进行倒叙排名 val sorted: RDD[(String, Int)] = rdd.sortBy(_._2, false) //从排名数据中取top3 val topThree: Array[(String, Int)] = sorted.take(3) //打印输出 topThree.foreach(println) //因为transform需要返回值 sorted }) //打印整体排名情况 resultSorted.print() //启动sparkStreaming ssc.start() //让其一直启动,等待程序关闭 ssc.awaitTermination() } }
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object UpdateStateByKey_Demo { def updateFunc(currentValue:Seq[Int],historyValue:Option[Int]): Option[Int] = { val result: Int = currentValue.sum+historyValue.getOrElse(0) Some(result) } def main(args: Array[String]): Unit = { //创建sparkStreaming程序入口 val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(5)) //设置日志级别 sc.setLogLevel("WARN") //设置检查点,用来保存历史状态 ssc.checkpoint("./999") //接收数据 val file: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999) //切分 val spliFile: DStream[String] = file.flatMap(_.split(" ")) //每个单词记为1次 val wordAndOne: DStream[(String, Int)] = spliFile.map((_,1)) //进行有状态转化操作 val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc) //打印输出 wordAndCount.print() //开启sparkStreaming ssc.start() //让其一直开启,等待关闭 ssc.awaitTermination() } }
/** * window窗口操作 * 流式无界,所以我们要向进行全局的统计肯定是行不通的,那么我们可以对这个无界的数据集进行切分, * 被切分的这每一个小的区间,我们可以理解为window, * 而sparkstreaming是准实时流计算,微小的批次操作,可以理解为是一个特殊的window窗口操作。 * * 理论上,对这个窗口window的划分,有两种情况,一种就是按照数据的条数,另外一种就是按照时间。 * 但是在sparkstreaming中目前仅支持后者,也就是说仅支持基于时间的窗口,需要提供两个参数 * 一个参数是窗口的长度:window_length * 另外一个参数是窗口的计算频率:sliding_interval ,每隔多长时间计算一次window操作 * * streaming程序中还有一个interval是batchInterval,那这两个interval有什么关系? * batchInterval,每隔多长时间启动一个spark作业,而是每隔多长时间为程序提交一批数据 * * 特别需要注意的是: * window_length和sliding_interval都必须是batchInterval的整数倍。 * * 总结: * window操作 * 每隔M长的时间,去统计N长时间内产生的数据 * M被称之sliding_interval,窗口的滑动频率 * N被称之window_length,窗口的长度 * 该window窗口是一个滑动的窗口。 * * 当sliding_interval > window_length的时候,会出现窗口的空隙 * 当sliding_interval < window_length的时候,会出现窗口的重合 * 当sliding_interval = window_length的时候,两个窗口是严丝合缝的 * * batchInterval=2s * sliding_interval=4s * window_length=6s */ object WindowOps { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("WindowOps") //两次流式计算之间的时间间隔,batchInterval val batchDuration = Seconds(2) // 每隔2s提交一次sparkstreaming的作业 val ssc = new StreamingContext(conf, batchDuration) val lines = ssc.socketTextStream("node01", 9999) val words = lines.flatMap(line => line.split("\\s+")) val pairs = words.map(word => (word, 1)) val ret = pairs.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4)) ret.print ssc.start() ssc.awaitTermination() } }