大数据这一术语正是产生在全球数据爆炸增长的背景下,用来形容庞大的数据集合。与传统的数据集合相比,大数据通常包含大量的非结构化数据,且大数据需要更多的实时分析。大数据作为“互联网+”行动计划的主要内容,其重要性得到了广泛重视。
RDD是Spark提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。通俗点来讲,可以将RDD理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个RDD可以分成多个分区,每个分区就是一个数据集片段。一个RDD的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。
1、环境安装,安装Spark和Java。
2、pyspark交互式编程。
3、编写独立应用程序实现数据去重。
4、编写独立应用程序实现求平均值问题。
5、实验结果查看。
Spark是一种与Hadoop相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
(1)选择相应的Spark版本等进行安装。
(1)进入终端使用下面命令登录系统。
sudo tar -zcf ~/下载/spark-1.6.2-bin-without-hadoop.tgz -C /usr/local sudo mv ./spark-1.6.2-bin-without-hadoop/ ./spark sudo chown -R hadoop:Hadoop ./spark
(1)使用以下命令对Spark配置文件进行修改。
cd /usr/local/spark cp ./conf/spark-evn.sh.template ./conf/spark-env.sh
(1)使用以下命令检验Spark是否安装成功。
cd /usr/local/spark Bin/run-example SparkPi
(2)安装成功,会出现以下图片。
(1)输入以下代码运行。
cd /usr/local/spark ./bin/spark-shell –master local[4]
(2)启动spark-shell后,就会进入“scala>”命令提示符状态。
Java是一门面向对象编程语言,不仅吸收了C++语言的各种优点,还摒弃了C++里难以理解的多继承、指针等概念,因此Java语言具有功能强大和简单易用两个特征。Java语言作为静态面向对象编程语言的代表,极好地实现了面向对象理论,允许程序员以优雅的思维方式进行复杂的编程。
(1)输入以下代码运行。
sudo unzip ~/下载/apache-maven-3.3.9-bin.zip -d /usr/local cd /usr/local sudo mv apache-maven-3.3.9/ ./maven sudo chown -R Hadoop ./mave
(2)第一行命令成功输入运行后。
(3)Java应用程序代码。
cd ~ mkdir -p ./sparkapp2/src/main/java
(4)在 ./sparkapp2/src/main/java 下建立一个名为 SimpleApp.java 的文件(vim ./sparkapp2/src/main/java/SimpleApp.java),添加相应代码。
(5)使用maven打包java程序。
(6)通过将生成的jar包通过spark-submit提交到Spark中运行,输入以下代码。
/usr/local/spark/bin/spark-submit –class “SimpleApp” ~/sparkapp2/target/simple-project-1.0.jar
由老师提供相应文档data.txt,该数据集包含了某大学计算机系的成绩。
(1)该系总共有多少学生。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) #获取每行数据的第1列 >>> distinct_res = res.distinct() #去重操作 >>> distinct_res.count() #取元素总个数
(2)该系共开设了多少门课程。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) #获取每行数据的第2列 >>> distinct_res = res.distinct() #去重操作 >>> distinct_res.count() #取元素总个数
(3)Tom同学的总成绩平均分是多少。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom") #筛选Tom同学的成绩信息 >>> res.foreach(print) >>> score = res.map(lambda x:int(x[2])) #提取Tom同学的每门成绩,并转换为int类型 >>> num = res.count() #Tom同学选课门数 >>> sum_score = score.reduce(lambda x,y:x+y) #Tom同学的总成绩 >>> avg = sum_score/num #总成绩/门数=平均分 >>> print(avg)
(4)求每名同学的选修的课程门数。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1)) #学生每门课程都对应(学生姓名,1),学生有n门课程则有n个(学生姓名,1) >>> each_res = res.reduceByKey(lambda x,y: x+y) #按学生姓名获取每个学生的选课总数 >>> each_res.foreach(print)
(5)该系DataBase课程共有多少人选修。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase") >>> res.count()
(6)各门课程的平均分是多少。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1))) #为每门课程的分数后面新增一列1,表示1个学生选择了该课程。格式如('Network', (44, 1)) >>> temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) #按课程名聚合课程总分和选课人数。格式如('Network', (7370, 142)) >>> avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2))) #课程总分/选课人数 = 平均分,并利用round(x,2)保留两位小数 >>> avg.foreach(print)
(7)使用累加器计算共有多少人选了DataBase这门课。
>>> lines = sc.textFile("file:///usr/local/spark/zm/data.txt") >>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase") #筛选出选了DataBase课程的数据 >>> accum = sc.accumulator(0) #定义一个从0开始的累加器accum >>> res.foreach(lambda x:accum.add(1)) #遍历res,每扫描一条数据,累加器加1 >>> accum.value #输出累加器的最终值
由老师提供相应文档A.txt和B.txt,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。
from pyspark import SparkContext #初始化SparkContext sc1 = SparkContext('local','zm') #加载两个文件A和B lines1 = sc.textFile("file:///usr/local/spark/zm/A.txt") lines2 = sc.textFile("file:///usr/local/spark/zm/B.txt") #合并两个文件的内容 lines = lines1.union(lines2) #去重操作 distinct_lines = lines.distinct() #排序操作 res = distinct_lines.sortBy(lambda x:x) #将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到两个文件 res.repartition(1).saveAsTextFile("file:///usr/local/spark/zm/result")
由老师提供相应文档Algorithm.txt、Database.txt、Python.txt,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。
from pyspark import SparkContext #初始化SparkContext sc = SparkContext('local',' zmzm') #加载三个文件Algorithm.txt、Database.txt和Python.txt lines1 = sc.textFile("file:///usr/local/spark/zmzm/Algorithm.txt") lines2 = sc.textFile("file:///usr/local/spark/zmzm/Database.txt") lines3 = sc.textFile("file:///usr/local/spark/zmzm/Python.txt") #合并三个文件的内容 lines = lines1.union(lines2).union(lines3) #为每行数据新增一列1,方便后续统计每个学生选修的课程数目。 data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1))) #根据key也就是学生姓名合计每门课程的成绩,以及选修的课程数目。 res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) #利用总成绩除以选修的课程数来计算每个学生的每门课程的平均分,并利用round(x,2)保留两位小数 result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2))) #将结果写入result文件中,repartition(1)的作用是让结果合并到一个文件中,不加的话会结果写入到三个文件 result.repartition(1).saveAsTextFile("file:///usr/local/spark/zmzm/result")