已复制
全屏展示
复制代码

Spark架构与核心概念及快速上手


· 8 min read

一. 基本架构

首先Spark是一种计算引擎,它基于MapReduce计算模型扩展,复杂的计算任务主要在内存中进行,其速度远快于MapReduce,我们通常所说的Spark是运行 Yarn 资源管理器上的即 Spark on Yarn,从上层应用来看,每个Spark应用都有一个驱动器程序(driver)和一个执行器程序(executor)组成。

驱动器程序(driver)

当我们启动 spark-submit 后,driver 用于发起计算任务,首先就会执行 driver 程序,在 driver 上定义了程序需要用到的数据集,以及计算方法。在 spark-shell 中,spark-shell 本身就是就是 driver 程序,启动 spark-shell 后,我们会得到一个变量 SparkContext 也就是 sc,总的来说,driver就是一个程序入口,它同时也会收集 executor 计算进度等的数据上报。

$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://datanode04:4049
Spark context available as 'sc' (master = yarn, app id = application_1682483202546_206006).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-cdh6.2.0
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@e50e308

scala> sc.master
res1: String = yarn

scala> sc.appName
res2: String = Spark shell

执行器程序(executor)

executor 是实际的任务计算程序,driver 会向 yarn 申请计算资源,同时和 executor 交互,executor 会反向注册到 driver 上。我们在启动程序时会指定 executor 的计算资源(executor数量、CPU核数、内存大小)。

在 spark on yarn 模型中,ApplicationMaster 表示 driver,YarnChild 表示了 executor。

二. 基本概念

在 driver 提交了任务以后,通过 SparkContext 获取到需要计算到的数据(rdd数据集),以及计算的方法(算子),通过 Transform 、Action 算子在集群中进行复杂的计算,其中会涉及到很多的基本概念。

Application

  • 使用submit提交的应用
  • 一个app可以出发多次action,触发一次action形成一个job
  • 一个app中可以有一到多个job
  • standalone模式下,一个Application在一个节点上只能创建一个executor(CoarseGrainedExecutorBackend),一个节点可以创建多个executor,但是每个executor只能来自于一个Application
  • yarn模式下,一个Application可以在一个节点启动多个yarn child,即多个yarn child可以来自同一个计算任务。

Job

  • Driver向Executor提交的作业
  • 触发一次Action形成一个完整的DAG
  • 一个Dag对应一个job
  • 一个job中有一个到多个stage,一个stage中有一个到多个task

Dag

  • 有向无环图,是对多个RDD转换过程和依赖关系的描述,触发Action就会形成一个完整的DAG,一个DAG就是一个JOB
  • dag是由 DAGScheduler 类切分Stage、TaskSet,然后给TaskScheduler

Stage

  • 概念:任务执行阶段,有两种类型的stage
  • 类型1:ShuffleMapStage 对应的 task 为ShuffleMapTask,读取数据源或者生成中间结果
  • 类型2:ResultStage 对应的是 ResultTask ,用于计算最终结果
  • stage执行是有先后顺序的,先执行前面的,后执行后面的
  • 一个stage对应一个taskset
  • 一个taskset中的task数量取决于stage中最后一个rdd分区的数量
  • stage划分依据:是否有shuffle

Taskset

  • 保存同一种计算逻辑的多个task集合
  • 一个stage对应一个taskset,一个taskset中的task计算逻辑都一样(不会有shuffle,比如都是map),计算的数据不一样

Task

  • spark中任务最小的执行单元
  • task是类的实例,有属性(从哪里读数据),有方法(如何计算,例如map传入的函数)。
  • task的数量决定了并行度,同时也要考虑可用的cores
  • 一个分区对应了一个task,task的数量取决于stage中最后一个分区的数量
  • 没有shuffle产生的多个算子会被合并到一个task中
  • task分为两种类型 ShuffleMapTask、ResultTask
ShuffleMapTask
  • 可以读取各种数据源
  • 可以读取shuffle write后的数据
  • 专门为shuffle做准备,会应用分区器,将数据溢写到磁盘(不是hdfs)
  • 这种task干的事叫做ShuffleWrite,下游的task会读取该数据,叫做shuffle read
ResultTask
  • 可以读取各种数据源
  • 可以读取shuffle write后的中间结果
  • 专门为了产生计算结果,最终写入hdfs
  • 它没有shuffle

Partitioner

  • 分区器是一个类,在driver端被实例化,其中包含了一个方法 getPartiton,用于判断一条数据实际的归属分区
  • 分区器有一个方法numPartitions返回分区数量
  • getPartiton 在executor端在ShuffleWrite之前被调用

dependency

  • 概念:依赖关系,指定的是父RDD和子RDD之间的依赖关系
  • 窄依赖:没有shuffle产生,多个算子会被合并到一个task中,即在一个pipeline中
  • 宽依赖:有shuffle产生,是划分stage的依据

shuffle

  • 概念:需要通过网络将数据传输到多台机器,或者本机的不同分区
  • 上游的RDD的一个分区将数据给了下游的rdd的多个分区,就是shuffle,需要注意的是,shuffle过程是下游的task到上游拉取数据,不是上游task发送给下游的task的
  • shuffle的功能将类似的数据按照指定的分区器规则,通过网络,传输到指定的机器的指定分区中。

三. 快速上手

快速WordCount程序
  • 首先使用 idea 创建maven程序,能运行一个scala项目,然后才添加 spark 依赖,编写spark项目。
  • pom.xml 示例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sparkhello</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-jdk14</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-nop</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                                <include>**/*.java</include>
                            </includes>
                        </configuration>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
  • Spark主程序
package com.yuchaoshui.note

import org.apache.spark.{SparkConf, SparkContext}

object SparkMain {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("wordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile("/etc/profile")                  // SparkContext读入纯文本。
      .flatMap(line => line.split(" "))          // 按照空格拆分成词。
      .map(word => (word, 1))                    // 将每个词映射成(word,1),这样就形成了key vale键值对。
      .reduceByKey((pre, after) => pre + after)  // reduceByKey的作用域是key-value类型的键值对,只对每个key的value进行处理。
      .sortBy(_._2, ascending=false)             // 升序排序 _._表示2元组tuple对象,后面的数字2表示取第几个数作为排序依据。
      .foreach(println)                          // 打印结果
    println("done!")
  }
}

四. 连接Spark方式

写spark程序时,有两种方式可以连接到spark集群:SparkContext、SparkSession

// 方法1:使用 SparkContext
val conf = new SparkConf().setAppName("MyApp").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.textFile("file:///etc/profile")
input.foreach(println)


// 方法2:使用 SparkSession
val spark = SparkSession.builder().appName("MyApp").getOrCreate()
val profile = spark.read.format("text").load("/user/work/yzy/debug/xxx.log")
profile.show()

文章推荐