新建一个txt文件用来保存黑名单
在系统上找到sc
import findspark findspark.init() import pyspark sc = pyspark.SparkContext(appName="myAppName")
有的电脑直接就能找到,就不需要上面这段代码了
调取Streaming
from pyspark.sql import SparkSession from pyspark.streaming import StreamingContext from pyspark import SparkConf conf = SparkConf().setAppName("miniProject").setMaster("local[*]") sc=pyspark.SparkContext.getOrCreate(conf) ssc = StreamingContext(sc,10) lines = ssc.socketTextStream('127.0.0.1',8888)
看起来这个SparkConf像构建一个工作区,通过监听这个工作区来得到值。然后绑定这个窗口到8888端口。
由于是靠python来写的,与书上不一致,只有猜一下他的作用。
接下来就要对进来的数据进行筛选了。先测试一下(防止重启太麻烦)
blackForm = sc.textFile("./blackForm.txt") # 读取黑名单 blackForm=blackForm.map(lambda x:x.split(" ")) # 空格切分 rdd1 = sc.parallelize([["1111","hadoop"],["2222",'spark'],["3333","hive"]]) # 测试rdd rdd1=rdd1.map(lambda x:[x[1],x[0]]) rdd1.leftOuterJoin(blackForm).collect()
由于要求是用左连接。就改一下输入值的顺序
效果:
按这个图来说。理论上是可行的。
那就直接开始在启动程序里写
blackForm = sc.textFile("./blackForm.txt") blackForm=blackForm.map(lambda x:x.split(" ")) data = lines.map(lambda x:x.split(" ")) # 切分 data = data.map(lambda x:[x[1],x[0]]) # 转格式 data = data.transform(lambda x:x.leftOuterJoin(blackForm)) data = data.filter(lambda x:x[1][1]=='false') # 筛选 data = data.map(lambda x:(x[0],x[1][0])) # 转输出格式 data.pprint() ssc.start()
启动这段代码后在本机的窗口命令行输入nc -l -p 8888
然后输入
在监听行可以看到黑名单中为true的都被删选掉了
这里的转格式反了,但并不影响运行。如果输入nc -l -p … 无法输入。那么就换个奇怪点的端口。保证这个端口没被占用就行了。
读入数据查看格式。
test=sc.textFile("d:/data/web-Google.txt") test.take(10)
暂未可知。这明显构造不出来呀。用GraphFrame。Spark里面的DataFrame无法只创造1列啊。这构造出来一点也好理解啊。书上的GraphX应该才能构造出来。
看了半天,这样应该是可行的
from pyspark.sql import SparkSession from pyspark.sql import SQLContext from graphframes import * from pyspark.sql.types import * # 启动pyspark需要带参数 spark = SparkSession.builder.appName("testGraph") \ .master('local') \ .getOrCreate() sc = spark.sparkContext sqlContext = SQLContext(sc) countId = sc.textFile('D:/data/web-Google.txt').map(lambda x:x.split("\t")).map(lambda x:x[0]).distinct().map(lambda x:(int(x),x)) schema = StructType([ StructField("id", IntegerType(), True), StructField("from", StringType(), True), ]) df_v = sqlContext.createDataFrame(countId,schema)
这里的countId就是找到有多少不重复的ID,根据ID创建一个一模一样的value这样就可以用来创建DataFrame了,到后面再把这列from去掉
构造边
schema = StructType([ # true代表不为空 StructField("src", IntegerType(), True),# StructField("dst", IntegerType(), True),# StructField("relation", StringType(), True), ]) rdd2 = sc.textFile('D:/data/web-Google.txt') \ .map(lambda x:x.split("\t")) \ .map(lambda x:(int(x[0]),int(x[1]),"link")) df_e = sqlContext.createDataFrame(rdd2, schema)
构造图
g = GraphFrame(df_v,df_e)
最后出来的图效果
查询顶点和边的个数
countId即是我们的顶点。做一个计数则是顶点个数为73945,边就是edges的条数为5105039
查看每个网页被链接次数
from graphframes.lib import AggregateMessages as AM from pyspark.sql import functions as F g_new.aggregateMessages(F.count(AM.msg).alias("count"), sendToSrc="1").show()
向顶点发送1,然后count计数
筛选出id>1的组成子图
# 好像就是这样,有可能有问题 overOne = g_new.filterVertices('id > 1')