Driver 相当于客户端(Client),Executor 相当于服务端(Server)。
package com.zxy.Socket

import java.io.OutputStream
import java.net.Socket

/** Minimal socket client: connects to the Executor on localhost:9999 and sends a single byte. */
object Driver {
  def main(args: Array[String]): Unit = {
    // Connect to the server (Executor)
    val client: Socket = new Socket("localhost", 9999)
    try {
      // Send data: OutputStream.write(Int) writes only the low-order 8 bits, i.e. one byte (here 2)
      val out: OutputStream = client.getOutputStream
      out.write(2)
      out.flush()
      // Closing a socket's output stream also closes the socket itself
      out.close()
    } finally {
      // Release the socket even if writing fails; closing an already-closed socket has no effect
      client.close()
    }
  }
}
package com.zxy.Socket

import java.io.InputStream
import java.net.{ServerSocket, Socket}

/** Minimal socket server: listens on port 9999, accepts one client, reads one byte and prints it. */
object Executor {
  def main(args: Array[String]): Unit = {
    // Start the server and wait to receive data
    val server: ServerSocket = new ServerSocket(9999)
    try {
      println("服务器启动,等待数据")
      // Block until a client connects
      val client: Socket = server.accept()
      try {
        val in: InputStream = client.getInputStream
        // read() yields the next byte as 0-255, or -1 if the client closed the connection without sending data
        val i: Int = in.read()
        println(s"接收到客户端数据: ${i}")
      } finally {
        client.close()
      }
    } finally {
      // Release the listening port even if accept/read fails
      server.close()
    }
  }
}
先启动服务端Executor,等待数据
启动客户端Driver,建立连接发送数据
修改以上案例:使用两个服务端 Executor 接收数据,将 Task 中的数据拆分成两部分,分别交给它们计算。
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

/**
 * Executor listening on port 8888: receives one serialized [[SubTask]] from the Driver,
 * runs its computation and prints the result.
 */
object Executor1 {
  def main(args: Array[String]): Unit = {
    // Start the server and wait to receive data
    val server: ServerSocket = new ServerSocket(8888)
    try {
      println("服务器启动,等待数据")
      // Block until the Driver connects
      val client: Socket = server.accept()
      try {
        val in: InputStream = client.getInputStream
        // SECURITY NOTE(review): Java deserialization of bytes read from the network is unsafe
        // when the peer is untrusted; this demo assumes only the trusted Driver connects.
        val taskIn: ObjectInputStream = new ObjectInputStream(in)
        try {
          // Pattern match instead of asInstanceOf, so a null or foreign object fails clearly here
          taskIn.readObject() match {
            case task: SubTask =>
              val ints: List[Int] = task.computer()
              println(s"计算[8888]后的结果是: ${ints}")
            case other =>
              throw new IllegalStateException(s"Expected a SubTask but received: $other")
          }
        } finally {
          taskIn.close()
        }
      } finally {
        client.close()
      }
    } finally {
      // Release the listening port even if anything above fails
      server.close()
    }
  }
}
package com.zxy.Socket

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

/**
 * Executor listening on port 9999: receives one serialized [[SubTask]] from the Driver,
 * runs its computation and prints the result.
 */
object Executor2 {
  def main(args: Array[String]): Unit = {
    // Start the server and wait to receive data
    val server: ServerSocket = new ServerSocket(9999)
    try {
      println("服务器启动,等待数据")
      // Block until the Driver connects
      val client: Socket = server.accept()
      try {
        val in: InputStream = client.getInputStream
        // SECURITY NOTE(review): Java deserialization of bytes read from the network is unsafe
        // when the peer is untrusted; this demo assumes only the trusted Driver connects.
        val taskIn: ObjectInputStream = new ObjectInputStream(in)
        try {
          // Pattern match instead of asInstanceOf, so a null or foreign object fails clearly here
          taskIn.readObject() match {
            case task: SubTask =>
              val ints: List[Int] = task.computer()
              println(s"计算[9999]后的结果是: ${ints}")
            case other =>
              throw new IllegalStateException(s"Expected a SubTask but received: $other")
          }
        } finally {
          taskIn.close()
        }
      } finally {
        client.close()
      }
    } finally {
      // Release the listening port even if anything above fails
      server.close()
    }
  }
}
package com.zxy.Socket

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

/**
 * Driver (client) that splits a [[Task]] into two [[SubTask]]s and sends one to each executor
 * (Executor1 on localhost:8888, Executor2 on localhost:9999) using Java serialization.
 */
object Driver {
  def main(args: Array[String]): Unit = {
    // Connect to both executors before sending anything, so an unreachable executor fails fast
    val client1: Socket = new Socket("localhost", 8888)
    try {
      val client2: Socket = new Socket("localhost", 9999)
      try {
        val task: Task = new Task()
        // Split the data into two halves. splitAt covers every element exactly once for any length,
        // whereas take(2)/takeRight(2) only partitions a list of exactly four elements correctly.
        val (firstHalf, secondHalf) = task.datas.splitAt(task.datas.length / 2)

        // Send the first half to server1 (port 8888)
        sendSubTask(client1, newSubTask(task.logic, firstHalf))
        // Send the second half to server2 (port 9999)
        sendSubTask(client2, newSubTask(task.logic, secondHalf))

        println("数据发送完毕")
      } finally {
        // No-op if sendSubTask already closed it; guarantees release on failure
        client2.close()
      }
    } finally {
      client1.close()
    }
  }

  /** Builds a SubTask carrying the given per-element function and data slice. */
  private def newSubTask(logic: Int => Int, datas: List[Int]): SubTask = {
    val subTask = new SubTask()
    subTask.logic = logic
    subTask.datas = datas
    subTask
  }

  /** Serializes `subTask` onto `client`'s output stream, then closes the stream (which also closes the socket). */
  private def sendSubTask(client: Socket, subTask: SubTask): Unit = {
    val out: OutputStream = client.getOutputStream
    val objOut: ObjectOutputStream = new ObjectOutputStream(out)
    try {
      objOut.writeObject(subTask)
      objOut.flush()
    } finally {
      objOut.close()
    }
  }
}
package com.zxy.Socket

/** The whole job the Driver distributes: the input data plus the function to apply to each element. */
class Task extends Serializable {

  /** Input data to be processed. */
  val datas: List[Int] = List(1, 2, 3, 4)

  /** Per-element transformation: doubles its argument. */
  val logic: Int => Int = (x: Int) => x * 2
}
package com.zxy.Socket

/** A slice of work sent to one executor: a data slice plus the function to apply to each element. */
class SubTask extends Serializable {

  /** Data slice to process; uninitialized (null) until the sender assigns it. */
  var datas: List[Int] = _

  /** Function applied to each element; uninitialized (null) until the sender assigns it. */
  var logic: Int => Int = _

  /** Applies `logic` to every element of `datas`, returning the results in the same order. */
  def computer(): List[Int] =
    for (element <- datas) yield logic(element)
}
先启动 Executor1、Executor2,再启动 Driver。各程序的输出如下:
Executor1: 服务器启动,等待数据
Executor1: 计算[8888]后的结果是: List(2, 4)
Executor2: 服务器启动,等待数据
Executor2: 计算[9999]后的结果是: List(6, 8)
Driver: 数据发送完毕
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

- RDD:弹性分布式数据集
- 累加器:分布式共享只写变量
- 广播变量:分布式共享只读变量