7 Ways to Create a DataFrame in Spark
Prepare the raw data:
cat /tmp/data3.txt
zhang,34
wang,23
li,45
Creating a DataFrame with toDF and column names
The column names are passed in the same order as the tuple fields:
import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    import spark.implicits._
    // Split each line into a (name, age) tuple, then name the columns in order
    val df = lines.map(line => {
      val fields = line.split(",")
      (fields(0), fields(1).toInt)
    }).toDF("name", "age")
    df.printSchema()
    df.show()
  }
}
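If the rows are already in memory rather than in a file, the same toDF call also works on a local Seq of tuples. A minimal sketch (the object name and the in-memory rows are made up for illustration):

import org.apache.spark.sql.SparkSession

object ToDFFromSeq {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    import spark.implicits._
    // Same rows as /tmp/data3.txt, but held in a local collection
    val df = Seq(("zhang", 34), ("wang", 23), ("li", 45)).toDF("name", "age")
    df.printSchema()
    df.show()
    spark.stop()
  }
}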
Creating a DataFrame from a case class
import org.apache.spark.sql.SparkSession

case class Boy(name: String, age: Int)

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    import spark.implicits._
    // Map each line to a Boy; column names and types come from the case class fields
    val df = lines.map(line => {
      val fields = line.split(",")
      Boy(fields(0), fields(1).toInt)
    }).toDF()
    df.printSchema()
    df.show()
  }
}
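The same case class also works without an RDD: spark.createDataFrame accepts a local Seq of case class instances directly. A small sketch, with the rows hard-coded for illustration:

import org.apache.spark.sql.SparkSession

case class Boy(name: String, age: Int)

object CaseClassSeq {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    // Column names and types are derived from the case class fields
    val df = spark.createDataFrame(Seq(Boy("zhang", 34), Boy("wang", 23), Boy("li", 45)))
    df.printSchema()
    df.show()
    spark.stop()
  }
}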
Creating a DataFrame from a regular Scala class
import scala.beans.BeanProperty
import org.apache.spark.sql.SparkSession

class Boy(@BeanProperty val name: String, @BeanProperty val age: Int)

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    val boys = lines.map(line => {
      val fields = line.split(",")
      new Boy(fields(0), fields(1).toInt)
    })
    // The schema is inferred from the getters generated by @BeanProperty
    val df = spark.createDataFrame(boys, classOf[Boy])
    df.printSchema()
    df.show()
  }
}
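To check which columns the bean reflection actually picked up, the standard DataFrame methods columns and dtypes can be printed. A tiny sketch, assuming the df from the example above:

// Column names derived from the @BeanProperty getters
df.columns.foreach(println)
// (name, type) pairs as Spark SQL type strings
df.dtypes.foreach { case (name, tpe) => println(s"$name -> $tpe") }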
Creating a DataFrame from a Java class
The Java class must provide getter methods.
// Java class
public class Boy {
    private String name;
    private Integer age;

    public Boy() {
    }

    public Boy(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public Integer getAge() {
        return age;
    }
}
Use the Java class from Scala:
import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    val boys = lines.map(line => {
      val fields = line.split(",")
      new Boy(fields(0), fields(1).toInt)
    })
    // The schema is inferred from the Java bean's getters
    val df = spark.createDataFrame(boys, classOf[Boy])
    df.printSchema()
    df.show()
  }
}
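createDataFrame also has an overload that takes a java.util.List of beans, which avoids the RDD when the data is small and already in memory. A sketch that reuses the Boy bean above, with hard-coded sample rows:

import java.util.Arrays
import org.apache.spark.sql.SparkSession

object BeanListExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    // A local list of Java beans instead of an RDD
    val boys = Arrays.asList(new Boy("zhang", 34), new Boy("wang", 23))
    val df = spark.createDataFrame(boys, classOf[Boy])
    df.printSchema()
    df.show()
    spark.stop()
  }
}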
Creating a DataFrame with StructType
Build it from an RDD[Row] and a StructType:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    val rowBoys: RDD[Row] = lines.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).toInt)
    })
    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType)
      )
    )
    val df: DataFrame = spark.createDataFrame(rowBoys, schema)
    df.printSchema()
    df.show()
  }
}
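StructField also takes a nullable flag, and in Spark 2.3+ the same schema can be built from a DDL string with StructType.fromDDL. A small sketch of both variants:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaVariants {
  def main(args: Array[String]): Unit = {
    // Same schema as above, with the nullability flags written out
    val schema1 = StructType(List(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))
    // Spark 2.3+: build the schema from a DDL string
    val schema2 = StructType.fromDDL("name STRING, age INT")
    println(schema1.treeString)
    println(schema2.treeString)
  }
}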
Creating a DataFrame by reading JSON
Raw data:
cat /tmp/data4.json
{"name":"zhang", "age":34}
{"name":"wang", "age":23}
{"name":"li"}
- spark.read.json reads the data as soon as it is called, because Spark has to scan it to infer the schema. Other data formats can be read into a DataFrame in the same way; fields that are missing from a record are filled with null.
import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    import spark.implicits._
    // Keep only rows that parsed correctly; note that Spark adds the
    // _corrupt_record column to the inferred schema only when some lines fail to parse
    val df = spark.read.json("/tmp/data4.json").where($"_corrupt_record".isNull)
    df.printSchema()
    df.show()
  }
}
- Malformed JSON records are kept in the _corrupt_record column; the column appears in the inferred schema only when the input actually contains malformed lines.
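Instead of filtering on _corrupt_record, the reader's mode option can handle malformed lines directly: DROPMALFORMED drops them and FAILFAST throws on the first bad record. A sketch assuming the same /tmp/data4.json:

import org.apache.spark.sql.SparkSession

object JsonModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    // DROPMALFORMED: silently skip lines that fail to parse
    val dfDrop = spark.read.option("mode", "DROPMALFORMED").json("/tmp/data4.json")
    dfDrop.show()
    // FAILFAST: fail as soon as a malformed line is found
    val dfStrict = spark.read.option("mode", "FAILFAST").json("/tmp/data4.json")
    dfStrict.show()
    spark.stop()
  }
}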
Creating a DataFrame by reading CSV
Determining the CSV schema does not require scanning the whole file: the first line is enough to find the number of columns (every column defaults to string); enabling inferSchema adds an extra pass over the data to infer the column types.
import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val df = spark.read.option("inferSchema", "true").csv("/tmp/data5.csv")
    df.printSchema()
    df.show()
  }
}
- option("inferSchema", "true") enables automatic type inference for the columns.
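For files that carry a header line or use a different delimiter, the standard header and sep options can be combined with inferSchema. A sketch that assumes a hypothetical /tmp/data5.csv with a header row:

import org.apache.spark.sql.SparkSession

object CsvOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val df = spark.read
      .option("header", "true")      // take column names from the first line
      .option("sep", ",")            // field delimiter (comma is the default)
      .option("inferSchema", "true") // extra pass to infer column types
      .csv("/tmp/data5.csv")
    df.printSchema()
    df.show()
    spark.stop()
  }
}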
Specify the schema explicitly:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType)
      )
    )
    val df = spark.read.schema(schema).csv("/tmp/data5.csv")
    df.printSchema()
    df.show()
  }
}
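In Spark 2.3+ the reader also accepts the schema as a DDL string, which is equivalent to the StructType above. A minimal sketch:

import org.apache.spark.sql.SparkSession

object CsvDdlSchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    // Same schema as the StructType version, written as a DDL string
    val df = spark.read.schema("name STRING, age INT").csv("/tmp/data5.csv")
    df.printSchema()
    df.show()
    spark.stop()
  }
}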