docker-compose.yml
```yaml
version: '2'
services:
  spark:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
  spark-worker-1:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
  spark-worker-2:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
```
In a cmd window, cd to the directory containing the yml file and run
docker-compose up
Wait for the images to be pulled and the containers to start; the running cluster can then be inspected in Docker Desktop. Enter localhost:8080 in a browser to open the master's web UI.
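Optionally, a short Python check can confirm the master UI is actually serving before any jobs are submitted. This is a minimal sketch; it only assumes the 8080:8080 port mapping from the compose file above.

```python
import time
import urllib.request

# Poll the Spark master web UI (published on localhost:8080 by the
# compose file above) until it responds or we give up.
def wait_for_master(url="http://localhost:8080", attempts=30, delay=2):
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                if resp.status == 200:
                    print("master web UI is up")
                    return True
        except OSError:
            pass  # not ready yet, retry after a short pause
        time.sleep(delay)
    return False

if __name__ == "__main__":
    wait_for_master()
```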
Write a Python script that generates 1KB, 1MB, 10MB, and 100MB text files
```python
def txtwriter(count, file_name):
    for i in range(len(count)):
        for j in range(count[i]):
            with open(file_name[i], mode='a', encoding='utf-8') as file_obj:
                file_obj.write('apple peach pear\n')
            print(str(i) + " " + str(j) + file_name[i])

if __name__ == "__main__":
    count = [64, 64*1024, 64*1024*10, 64*1024*100]  # 1KB 1MB 10MB 100MB
    file_name = ["1KB", "1Mb", "10MB", "100Mb"]
    txtwriter(count, file_name)
```
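Each written line 'apple peach pear\n' is 16 bytes, so 64 lines give exactly 1 KB, 64*1024 lines give 1 MB, and so on. A quick sketch to confirm the generated sizes (run in the same directory as the files):

```python
import os

# Report the size of each generated file; 'apple peach pear\n' is 16 bytes,
# so 64 lines = 1 KB, 64*1024 lines = 1 MB, etc.
for name in ["1KB", "1Mb", "10MB", "100Mb"]:
    size = os.path.getsize(name)
    print(f"{name}: {size} bytes ({size / 1024:.0f} KB)")
```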
Write the wordcount and timing script
```python
from pyspark import SparkConf, SparkContext
import sys
import time
import os

def wordcount(file_path):
    counts = sc.textFile(file_path)\
        .flatMap(lambda line: line.split(' '))\
        .map(lambda x: (x, 1))\
        .reduceByKey(lambda a, b: a + b)
    output = counts.collect()
    for (word, count) in output:
        print('%s : %i' % (word, count))

# def txtwriter(count, file_name):
#     for i in range(len(count)):
#         for j in range(count[i]):
#             with open(file_name[i], mode='a', encoding='utf-8') as file_obj:
#                 file_obj.write('apple peach pear\n')

if __name__ == "__main__":
    count = [64, 64*1024, 64*1024*10, 64*1024*100]  # 1KB 1MB 10MB 100MB
    file_name = ["1KB", "1Mb", "10MB", "100Mb"]
    # txtwriter(count, file_name)
    for i in range(len(file_name)):
        starttime = time.time()
        conf = SparkConf()
        sc = SparkContext(conf=conf)
        wordcount(file_path=file_name[i])
        endtime = time.time()
        print('time:', endtime - starttime)
        with open("time.txt", mode='a', encoding='utf-8') as file_obj:
            file_obj.write(str(endtime - starttime) + '\n')
        sc.stop()
    # for i in range(len(file_name)):
    #     os.remove(file_name[i])
```
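Since every line of the input is identical, the Spark output is easy to sanity-check locally; a minimal sketch with collections.Counter (no Spark involved) should report the same counts:

```python
from collections import Counter

# Count words in a generated file without Spark, for comparison
# with the reduceByKey output above.
def local_wordcount(file_path):
    counts = Counter()
    with open(file_path, encoding='utf-8') as f:
        for line in f:
            counts.update(line.split())
    return counts

if __name__ == "__main__":
    # The 1KB file has 64 lines, so each word should appear 64 times.
    print(local_wordcount("1KB"))
```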
Upload the scripts and data to the cluster
docker cp cluster_test.py 8c089a440dd5:/tmp/test
docker cp txtw.py 8c089a440dd5:/tmp/test
......
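Copying the files one by one gets repetitive; here is a small sketch that drives docker cp from Python. The container ID 8c089a440dd5 is specific to this run (take it from `docker ps`), and the file list is only illustrative.

```python
import subprocess

CONTAINER = "8c089a440dd5"  # master container ID from `docker ps`; differs per run
FILES = ["cluster_test.py", "txtw.py", "1KB", "1Mb", "10MB", "100Mb"]

# Invoke `docker cp` for each local file, placing it under /tmp/test in the container.
for f in FILES:
    subprocess.run(["docker", "cp", f, f"{CONTAINER}:/tmp/test"], check=True)
```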
wordcount (single thread)
Run the following on the master host:
spark-submit --master local[1] cluster_test.py
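local[1] runs the job in a single local thread. The same choice can also be made in code rather than on the command line; a sketch (the script above deliberately leaves the master unset so that spark-submit decides):

```python
from pyspark import SparkConf, SparkContext

# Equivalent to passing --master local[1] to spark-submit;
# local[2] or spark://<master-host>:7077 selects the other setups below.
conf = SparkConf().setMaster("local[1]").setAppName("wordcount-timing")
sc = SparkContext(conf=conf)
```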
Results:
| Data size | 1KB | 1MB | 10MB | 100MB |
|---|---|---|---|---|
| Execution time (s) | 6.970337629318237 | 2.368252992630005 | 11.44127345085144 | 102.59012055397034 |
wordcount (two threads)
Run the following on the master host:
spark-submit --master local[2] cluster_test.py
Results:
| Data size | 1KB | 1MB | 10MB | 100MB |
|---|---|---|---|---|
| Execution time (s) | 7.166856050491333 | 1.9559352397918701 | 6.257161378860474 | 61.2608277797699 |
wordcount (distributed)
Run the following on the master host:
spark-submit --master spark://8c089a440dd5:7077 cluster_test.py
Results:
| Data size | 1KB | 1MB | 10MB | 100MB |
|---|---|---|---|---|
| Execution time (s) | 11.847958087921143 | 9.145256996154785 | 13.520023584365845 | 68.8401427268982 |
wordcount (summary)
Summary of results (execution time in seconds):

| Setup | 1KB | 1MB | 10MB | 100MB |
|---|---|---|---|---|
| Single thread (local[1]) | 6.970337629318237 | 2.368252992630005 | 11.44127345085144 | 102.59012055397034 |
| Two threads (local[2]) | 7.166856050491333 | 1.9559352397918701 | 6.257161378860474 | 61.2608277797699 |
| Distributed (two workers) | 11.847958087921143 | 9.145256996154785 | 13.520023584365845 | 68.8401427268982 |
As the table shows, the distributed run takes the longest on small inputs, presumably because scheduling and coordination dominate the total time; on large inputs, however, its processing time drops significantly. In local mode, small inputs finish faster than in distributed mode, and two threads clearly outperform one. Since a single machine's capacity is ultimately limited, it is reasonable to expect that, as the number of machines grows, the distributed setup will outperform local execution on large data sets.
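Computing speedups relative to the single-threaded run makes this trend explicit; a small sketch using (rounded) times from the summary table:

```python
# Measured execution times in seconds, rounded from the summary table above.
times = {
    "local[1]":    {"1KB": 6.97,  "1MB": 2.37, "10MB": 11.44, "100MB": 102.59},
    "local[2]":    {"1KB": 7.17,  "1MB": 1.96, "10MB": 6.26,  "100MB": 61.26},
    "distributed": {"1KB": 11.85, "1MB": 9.15, "10MB": 13.52, "100MB": 68.84},
}

# Speedup of each setup relative to the single-threaded run;
# values below 1.0x mean the setup was slower than local[1].
for setup in ("local[2]", "distributed"):
    for size, t in times[setup].items():
        print(f"{setup:>11} {size:>5}: {times['local[1]'][size] / t:.2f}x")
```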