
7 Ways to Create a DataFrame in Spark



Prepare the raw data

cat /tmp/data3.txt
zhang,34
wang,23
li,45

Create a DataFrame with toDF and column names

The column names are passed positionally, matching the tuple fields in order.

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    
    import spark.implicits._
    val df = lines.map(line => {
      val fields = line.split(",")
      (fields(0), fields(1).toInt)
    }).toDF("name", "age")

    df.printSchema()
    df.show()
  }
}
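
As a quick aside, toDF also works directly on a local Seq of tuples, so for small tests you can skip the text file and the RDD entirely. A minimal sketch, assuming the same spark session and spark.implicits._ import as above:

// Hard-codes the same three rows instead of reading /tmp/data3.txt
val localDF = Seq(("zhang", 34), ("wang", 23), ("li", 45)).toDF("name", "age")
localDF.show()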

Create a DataFrame from a case class

import org.apache.spark.sql.SparkSession

case class Boy(name: String, age: Int)

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    
    import spark.implicits._
    val df = lines.map(line => {
      val fields = line.split(",")
      Boy(fields(0), fields(1).toInt)
    }).toDF()
    
    df.printSchema()
    df.show()
  }
}
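
Because Boy is a case class, spark.implicits._ also provides an Encoder for it, so the same mapping can produce a typed Dataset[Boy] via toDS() instead of an untyped DataFrame. A minimal sketch, reusing spark and lines from the block above:

// toDS() keeps the element type; toDF() would erase it to untyped Row objects
val ds = lines.map(line => {
  val fields = line.split(",")
  Boy(fields(0), fields(1).toInt)
}).toDS()
ds.printSchema()   // same columns: name (string), age (int)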

Create a DataFrame from a plain Scala class

import org.apache.spark.sql.SparkSession

import scala.beans.BeanProperty

class Boy(@BeanProperty val name: String, @BeanProperty val age: Int)

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    val boys = lines.map(line => {
      val fields = line.split(",")
      new Boy(fields(0), fields(1).toInt)
    })
    val df = spark.createDataFrame(boys, classOf[Boy])
    
    df.printSchema()
    df.show()
  }
}
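
createDataFrame(boys, classOf[Boy]) discovers the columns through JavaBean reflection, i.e. through getName/getAge getters; @BeanProperty simply generates those getters for the constructor parameters. A minimal hand-written equivalent (the name Boy2 is hypothetical, chosen only to avoid clashing with the class above):

// Same effect as @BeanProperty: expose each field through an explicit getter
class Boy2(val name: String, val age: Int) {
  def getName: String = name
  def getAge: Int = age
}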

Create a DataFrame from a Java class

The Java class must provide getter methods, because Spark reads the fields through JavaBean reflection.

// Java class
public class Boy {
    private String name;
    private Integer age;

    public Boy() {
    }

    public Boy(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public Integer getAge() {
        return age;
    }
}

Usage from the Scala driver

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")
    val boys = lines.map(line => {
      val fields = line.split(",")
      new Boy(fields(0), fields(1).toInt)
    })
    val df = spark.createDataFrame(boys, classOf[Boy])

    df.printSchema()
    df.show()
  }
}

Create a DataFrame with StructType

Build it from an RDD[Row] plus a StructType schema.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val lines = spark.sparkContext.textFile("/tmp/data3.txt")

    val rowBoys: RDD[Row] = lines.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).toInt)
    })

    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType)
      )
    )
    val df: DataFrame = spark.createDataFrame(rowBoys, schema)

    df.printSchema()
    df.show()
  }
}
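
StructField also takes a nullable flag (it defaults to true), so the schema can state which columns are allowed to hold null. A minimal sketch of a stricter variant of the schema above:

// The third argument controls nullability; here "name" is declared non-nullable
val strictSchema = StructType(
  List(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = true)
  )
)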

Create a DataFrame by reading JSON

Raw data

cat /tmp/data4.json
{"name":"zhang", "age":34}
{"name":"wang", "age":23}
{"name":"li"}
  • spark.read.json reads the data eagerly, because it has to scan it to infer the schema. Spark can read other formats straight into a DataFrame in the same way; fields missing from a record are filled with null.

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()

    import spark.implicits._
    val df = spark.read.json("/tmp/data4.json").where($"_corrupt_record".isNull)
    df.printSchema()
    df.show()
  }
}
  • When a JSON record is malformed, its raw text is kept in the _corrupt_record column. Note that with schema inference this column only exists if the input actually contains at least one malformed record; with the fully valid sample data above, the where($"_corrupt_record".isNull) filter cannot resolve the column, so either drop the filter or declare the column in an explicit schema, as in the sketch below.
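
A minimal sketch of that explicit-schema variant, reusing the spark session and implicits import from the block above and assuming the default corrupt-record column name (spark.sql.columnNameOfCorruptRecord, i.e. "_corrupt_record"):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Declaring _corrupt_record in the schema makes the column exist even when every record is valid
val jsonSchema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("_corrupt_record", StringType)
))
val cleaned = spark.read.schema(jsonSchema).json("/tmp/data4.json")
  .where($"_corrupt_record".isNull)
  .drop("_corrupt_record")
cleaned.show()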

Create a DataFrame by reading CSV

Without an explicit schema, Spark only needs the first rows of a CSV file to work out the number of columns (every column defaults to string); inferring the actual column types with inferSchema costs an extra pass over the data.

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()

    val df = spark.read.option("inferSchema", "true").csv("/tmp/data5.csv")
    df.printSchema()
    df.show()
  }
}
  • option("inferSchema", "true") makes Spark infer each column's type automatically, at the cost of one extra pass over the data.
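
If the CSV file starts with a header line (an assumption; the contents of /tmp/data5.csv are not shown here), the header option takes the column names from it instead of the defaults _c0, _c1, .... A minimal sketch:

val dfWithHeader = spark.read
  .option("header", "true")       // first line holds the column names
  .option("inferSchema", "true")  // extra pass over the data to infer types
  .csv("/tmp/data5.csv")
dfWithHeader.printSchema()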

Specify the schema explicitly

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()

    val schema = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType)
      )
    )

    val df = spark.read.schema(schema).csv("/tmp/data5.csv")
    df.printSchema()
    df.show()
  }
}
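
Since Spark 2.3 the schema can also be passed as a DDL-formatted string, which is usually shorter than building a StructType by hand. A minimal sketch, equivalent to the block above:

// DDL string form of the same schema
val df = spark.read.schema("name STRING, age INT").csv("/tmp/data5.csv")
df.printSchema()
df.show()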