In the previous article we called the Spark SQL API from Scala. In this article we implement the same functionality in Java, again processing a JSON file and a plain-text (Txt) file.
The JSON and Txt files contain the following:
```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```
```
Michael, 29
Andy, 30
Justin, 19
```
Java code for processing the JSON file:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TestSQL {
    public static void main(String[] args) {
        // Local SparkSession; the config entry is the placeholder used in the Spark docs
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        // By default each line of people.json must be a self-contained JSON object
        Dataset<Row> df = spark.read().json("file:///d:/test_spark/people.json");
        df.show();

        // Register the DataFrame as a temporary view so it can be queried with SQL
        df.createOrReplaceTempView("people");
        Dataset<Row> sqlDF = spark.sql("select * from people where age>20");
        sqlDF.show();

        spark.stop();
    }
}
```
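Michael's record has no `age` field, so Spark infers `age` as a nullable column and his row gets `null`. Under SQL's three-valued logic, `null > 20` is unknown, so `WHERE age>20` drops that row. The plain-Java sketch below (no Spark required) mimics the query on the sample data to make this visible. The `PersonRow` class and `namesOlderThan` method are hypothetical illustrations, not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of "select * from people where age>20" over the sample JSON rows. */
public class JsonFilterSketch {

    /** Hypothetical row type: age is a boxed Long so a missing field can be null. */
    static final class PersonRow {
        final String name;
        final Long age;

        PersonRow(String name, Long age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Keeps rows whose age is known and greater than the threshold; a null age never matches. */
    static List<String> namesOlderThan(List<PersonRow> rows, long threshold) {
        List<String> result = new ArrayList<>();
        for (PersonRow row : rows) {
            // SQL semantics: NULL > 20 is NULL (unknown), and WHERE discards such rows
            if (row.age != null && row.age > threshold) {
                result.add(row.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PersonRow> people = Arrays.asList(
                new PersonRow("Michael", null), // no "age" field in people.json
                new PersonRow("Andy", 30L),
                new PersonRow("Justin", 19L));
        System.out.println(namesOlderThan(people, 20)); // prints [Andy]
    }
}
```

Only Andy passes the filter. This matches what the Spark SQL query is expected to return for this input.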
Processing the Txt file in Java requires a Person entity class:
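```java
package com.alan.entity;

import java.io.Serializable;

// JavaBean from which createDataFrame infers the schema (one column per getter).
// Implementing Serializable follows Spark's own bean example.
public class Person implements Serializable {
    private String name;
    private long age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getAge() {
        return age;
    }

    public void setAge(long age) {
        this.age = age;
    }
}
```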
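`spark.createDataFrame(rdd, Person.class)` builds the schema from the bean's getter/setter pairs. As far as we know, Spark relies on standard JavaBean introspection for this. The sketch below therefore uses `java.beans.Introspector` to list the properties a bean exposes. Treat it as an approximation of what Spark "sees", not Spark's actual inference code. The nested `Person` copy and the `readableProperties` helper are hypothetical.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertiesSketch {

    /** Same shape as the article's Person bean. */
    public static class Person implements Serializable {
        private String name;
        private long age;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public long getAge() { return age; }
        public void setAge(long age) { this.age = age; }
    }

    /** Lists readable bean properties, skipping getClass(), which Introspector reports as "class". */
    static List<String> readableProperties(Class<?> beanClass) throws IntrospectionException {
        BeanInfo info = Introspector.getBeanInfo(beanClass);
        List<String> names = new ArrayList<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                names.add(pd.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) throws IntrospectionException {
        // Expect the two bean properties, "age" and "name"
        System.out.println(readableProperties(Person.class));
    }
}
```

A field without a public getter would not show up here. The same is generally true of the DataFrame columns built from a bean.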
```java
import com.alan.entity.Person;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TestText {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        // Read the file line by line and map each "name, age" line to a Person bean
        JavaRDD<Person> peopleRDD = spark.read()
                .textFile("d:/test_spark/people.txt")
                .javaRDD()
                .map(line -> {
                    String[] parts = line.split(",");
                    Person person = new Person();
                    person.setName(parts[0].trim());
                    person.setAge(Long.parseLong(parts[1].trim()));
                    return person;
                });

        // Apply a schema to an RDD of JavaBeans to get a DataFrame
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);

        // Register the DataFrame as a temporary view
        peopleDF.createOrReplaceTempView("people");

        // SQL statements can be run with the sql method provided by SparkSession
        Dataset<Row> olderThan20DF = spark.sql("select * from people where age>20");
        olderThan20DF.show();

        spark.stop();
    }
}
```
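To check what the Txt pipeline should produce without starting Spark, the sketch below repeats its two steps in plain Java. First, the `map` lambda splits each line on `,` and trims the age. Second, the SQL query keeps rows with `age > 20`. The `TextPipelineSketch` class and its methods are hypothetical illustrations, not the article's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TextPipelineSketch {

    /** Minimal stand-in for the article's Person bean (hypothetical, no Spark needed). */
    static final class Person {
        final String name;
        final long age;

        Person(String name, long age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Mirrors the map() lambda: split on ',' and trim before parsing the age. */
    static Person parseLine(String line) {
        String[] parts = line.split(",");
        return new Person(parts[0].trim(), Long.parseLong(parts[1].trim()));
    }

    /** Mirrors "select * from people where age>20" over the parsed beans. */
    static List<String> namesOlderThan(List<String> lines, long threshold) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            Person p = parseLine(line);
            if (p.age > threshold) {
                result.add(p.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Michael, 29", "Andy, 30", "Justin, 19");
        System.out.println(namesOlderThan(lines, 20)); // prints [Michael, Andy]
    }
}
```

Unlike the JSON file, every line here carries an age, so Michael (29) also passes the filter.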