Spark架构与核心概念及快速上手
一. 基本架构
首先Spark是一种计算引擎,它基于MapReduce计算模型扩展,复杂的计算任务主要在内存中进行,其速度远快于MapReduce,我们通常所说的Spark是运行 Yarn 资源管理器上的即 Spark on Yarn,从上层应用来看,每个Spark应用都有一个驱动器程序(driver)和一个执行器程序(executor)组成。
驱动器程序(driver)
当我们启动 spark-submit 后,driver 用于发起计算任务,首先就会执行 driver 程序,在 driver 上定义了程序需要用到的数据集,以及计算方法。在 spark-shell 中,spark-shell 本身就是就是 driver 程序,启动 spark-shell 后,我们会得到一个变量 SparkContext 也就是 sc,总的来说,driver就是一个程序入口,它同时也会收集 executor 计算进度等的数据上报。
$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://datanode04:4049
Spark context available as 'sc' (master = yarn, app id = application_1682483202546_206006).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0-cdh6.2.0
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@e50e308
scala> sc.master
res1: String = yarn
scala> sc.appName
res2: String = Spark shell
执行器程序(executor)
executor 是实际的任务计算程序,driver 会向 yarn 申请计算资源,同时和 executor 交互,executor 会反向注册到 driver 上。我们在启动程序时会指定 executor 的计算资源(executor数量、CPU核数、内存大小)。
在 spark on yarn 模型中,ApplicationMaster 表示 driver,YarnChild 表示了 executor。
二. 基本概念
在 driver 提交了任务以后,通过 SparkContext 获取到需要计算到的数据(rdd数据集),以及计算的方法(算子),通过 Transform 、Action 算子在集群中进行复杂的计算,其中会涉及到很多的基本概念。
Application
- 使用submit提交的应用
- 一个app可以出发多次action,触发一次action形成一个job
- 一个app中可以有一到多个job
- standalone模式下,一个Application在一个节点上只能创建一个executor(CoarseGrainedExecutorBackend),一个节点可以创建多个executor,但是每个executor只能来自于一个Application
- yarn模式下,一个Application可以在一个节点启动多个yarn child,即多个yarn child可以来自同一个计算任务。
Job
- Driver向Executor提交的作业
- 触发一次Action形成一个完整的DAG
- 一个Dag对应一个job
- 一个job中有一个到多个stage,一个stage中有一个到多个task
Dag
- 有向无环图,是对多个RDD转换过程和依赖关系的描述,触发Action就会形成一个完整的DAG,一个DAG就是一个JOB
- dag是由 DAGScheduler 类切分Stage、TaskSet,然后给TaskScheduler
Stage
- 概念:任务执行阶段,有两种类型的stage
- 类型1:ShuffleMapStage 对应的 task 为ShuffleMapTask,读取数据源或者生成中间结果
- 类型2:ResultStage 对应的是 ResultTask ,用于计算最终结果
- stage执行是有先后顺序的,先执行前面的,后执行后面的
- 一个stage对应一个taskset
- 一个taskset中的task数量取决于stage中最后一个rdd分区的数量
- stage划分依据:是否有shuffle
Taskset
- 保存同一种计算逻辑的多个task集合
- 一个stage对应一个taskset,一个taskset中的task计算逻辑都一样(不会有shuffle,比如都是map),计算的数据不一样
Task
- spark中任务最小的执行单元
- task是类的实例,有属性(从哪里读数据),有方法(如何计算,例如map传入的函数)。
- task的数量决定了并行度,同时也要考虑可用的cores
- 一个分区对应了一个task,task的数量取决于stage中最后一个分区的数量
- 没有shuffle产生的多个算子会被合并到一个task中
- task分为两种类型 ShuffleMapTask、ResultTask
ShuffleMapTask
- 可以读取各种数据源
- 可以读取shuffle write后的数据
- 专门为shuffle做准备,会应用分区器,将数据溢写到磁盘(不是hdfs)
- 这种task干的事叫做ShuffleWrite,下游的task会读取该数据,叫做shuffle read
ResultTask
- 可以读取各种数据源
- 可以读取shuffle write后的中间结果
- 专门为了产生计算结果,最终写入hdfs
- 它没有shuffle
Partitioner
- 分区器是一个类,在driver端被实例化,其中包含了一个方法 getPartiton,用于判断一条数据实际的归属分区
- 分区器有一个方法numPartitions返回分区数量
- getPartiton 在executor端在ShuffleWrite之前被调用
dependency
- 概念:依赖关系,指定的是父RDD和子RDD之间的依赖关系
- 窄依赖:没有shuffle产生,多个算子会被合并到一个task中,即在一个pipeline中
- 宽依赖:有shuffle产生,是划分stage的依据
shuffle
- 概念:需要通过网络将数据传输到多台机器,或者本机的不同分区
- 上游的RDD的一个分区将数据给了下游的rdd的多个分区,就是shuffle,需要注意的是,shuffle过程是下游的task到上游拉取数据,不是上游task发送给下游的task的
- shuffle的功能将类似的数据按照指定的分区器规则,通过网络,传输到指定的机器的指定分区中。
三. 快速上手
快速WordCount程序
- 首先使用 idea 创建maven程序,能运行一个scala项目,然后才添加 spark 依赖,编写spark项目。
- pom.xml 示例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>sparkhello</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<includes>
<include>**/*.scala</include>
<include>**/*.java</include>
</includes>
</configuration>
</execution>
<execution>
<id>scala-test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
- Spark主程序
package com.yuchaoshui.note
import org.apache.spark.{SparkConf, SparkContext}
object SparkMain {
def main(args: Array[String]) = {
val conf = new SparkConf().setAppName("wordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.textFile("/etc/profile") // SparkContext读入纯文本。
.flatMap(line => line.split(" ")) // 按照空格拆分成词。
.map(word => (word, 1)) // 将每个词映射成(word,1),这样就形成了key vale键值对。
.reduceByKey((pre, after) => pre + after) // reduceByKey的作用域是key-value类型的键值对,只对每个key的value进行处理。
.sortBy(_._2, ascending=false) // 升序排序 _._表示2元组tuple对象,后面的数字2表示取第几个数作为排序依据。
.foreach(println) // 打印结果
println("done!")
}
}
四. 连接Spark方式
写spark程序时,有两种方式可以连接到spark集群:SparkContext、SparkSession
// 方法1:使用 SparkContext
val conf = new SparkConf().setAppName("MyApp").setMaster("local")
val sc = new SparkContext(conf)
val input = sc.textFile("file:///etc/profile")
input.foreach(println)
// 方法2:使用 SparkSession
val spark = SparkSession.builder().appName("MyApp").getOrCreate()
val profile = spark.read.format("text").load("/user/work/yzy/debug/xxx.log")
profile.show()