C/C++教程

Spark学习总结-Spark-Core

本文主要是介绍Spark学习总结-Spark-Core,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Spark-Core

一 Driver和Executor通信

Driver相当于Client,Executor相当于Server

  • Driver代码
package com.zxy.Socket

import java.io.OutputStream
import java.net.Socket


object Driver {
    def main(args: Array[String]): Unit = {
        //连接服务器
        val client: Socket = new Socket("localhost",9999)
        
        //发送数据
        val out: OutputStream = client.getOutputStream
        
        out.write(2)
        out.flush()
        out.close()
        client.close()
    }
}
  • Executor代码
package com.zxy.Socket

import java.io.InputStream
import java.net.{ServerSocket, Socket}

object Executor {
    def main(args: Array[String]): Unit = {
        //启动服务器,接受数据
        val server: ServerSocket = new ServerSocket(9999)
        
        println("服务器启动,等待数据")
        //等待客户端连接接收数据
        val client: Socket = server.accept()
        
        val in: InputStream = client.getInputStream
        
        val i: Int = in.read()
        
        println(s"接收到客户端数据 + ${i}")
        client.close()
        server.close()
    }
}

先启动服务端Executor,等待数据

启动客户端Driver,建立连接发送数据

二 案例引入Spark三大数据结构

1 案例

修改以上案例,使用两个服务端Executor接收数据
将Task中的数据分开计算
  • Executor1

package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor1 {
    def main(args: Array[String]): Unit = {
        //启动服务器,接受数据
        val server: ServerSocket = new ServerSocket(8888)
        
        println("服务器启动,等待数据")
        //等待客户端连接接收数据
        val client: Socket = server.accept()
        
        val in: InputStream = client.getInputStream
    
        val TaskOBJ2: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ2.readObject().asInstanceOf[SubTask]
        
        val ints: List[Int] = task.computer()

        println(s"计算[8888]后的结果是: ${ints}")
        TaskOBJ2.close()
        client.close()
        server.close()
        
    }
}

  • Executor2

package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor2 {
    def main(args: Array[String]): Unit = {
        //启动服务器,接受数据
        val server: ServerSocket = new ServerSocket(9999)
        
        println("服务器启动,等待数据")
        //等待客户端连接接收数据
        val client: Socket = server.accept()
        
        val in: InputStream = client.getInputStream
    
        val TaskOBJ1: ObjectInputStream = new ObjectInputStream(in)
        val task: SubTask = TaskOBJ1.readObject().asInstanceOf[SubTask]
        
        val ints: List[Int] = task.computer()

        println(s"计算[9999]后的结果是: ${ints}")
        TaskOBJ1.close()
        client.close()
        server.close()
        
    }
}

  • Driver

package com.zxy.Socket

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket


object Driver {
    def main(args: Array[String]): Unit = {
        //连接服务器
        val client1: Socket = new Socket("localhost",8888)
        val client2: Socket = new Socket("localhost",9999)
    
        val task: Task = new Task()
        
        //server1发送数据
        val out1: OutputStream = client1.getOutputStream
        val TaskOBJ1: ObjectOutputStream = new ObjectOutputStream(out1)
    
        val subTask1 = new SubTask()
        subTask1.logic = task.logic
        subTask1.datas = task.datas.take(2)
        
        TaskOBJ1.writeObject(subTask1)
        TaskOBJ1.flush()
        TaskOBJ1.close()
        client1.close()
    
        //server2发送数据
        val out2: OutputStream = client2.getOutputStream
        val TaskOBJ2: ObjectOutputStream = new ObjectOutputStream(out2)
    
        val subTask2 = new SubTask()
        subTask2.logic = task.logic
        subTask2.datas = task.datas.takeRight(2)
        
        TaskOBJ2.writeObject(subTask2)
        TaskOBJ2.flush()
        TaskOBJ2.close()
        client2.close()
        println("数据发送完毕")
    }
}

  • Task

package com.zxy.Socket

class Task extends Serializable {
    val datas = List(1,2,3,4)
    
    val logic:Int => Int = _ * 2
}

  • SubTask

package com.zxy.Socket

class SubTask extends Serializable {
    //初始值
    var datas:List[Int] = _
    
    var logic:Int => Int = _
    
    //计算
    def computer()={
        datas.map(logic)
    }
}

  • 执行效果

先启动Executor1,Executor2;
再启动Driver
Executor1:
    服务器启动,等待数据
    计算[8888]后的结果是: List(2, 4)

Executor2:
    服务器启动,等待数据
    计算[9999]后的结果是: List(6, 8)
Driver:
	数据发送完毕

2 Spark三大数据结构

	Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,
用于处理不同的应用场景。三大数据结构分别是:
> RDD: 弹性分布式数据集
> 累加器:分布式共享只写变量
> 广播变量:分布式共享只读变量
这篇关于Spark学习总结-Spark-Core的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!