Spark分布式分析计算引擎解析

Spark分布式分析引擎解析:架构、核心概念RDD、运行模式及YARN执行流程介绍。

原文标题:分布式分析计算引擎Spark解析

原文作者:牧羊人的方向

冷月清谈:

Spark是一个功能强大的大数据处理引擎,能够处理批处理、流式查询和交互式查询等多种计算任务。Spark基于内存计算,比传统的MapReduce或Hive效率更高。

Spark的核心组件包括:
1. Spark Core:提供底层服务,定义了弹性分布式数据集(RDDs)以及操作RDDs的API。
2. Spark SQL:支持使用SQL处理结构化数据,提供了DataFrame和DataSet两种数据抽象。
3. Spark Streaming:实现实时流处理,将流式计算分解成一系列短小的批处理作业。
4. MLib:提供常用的机器学习算法库,支持流水线学习模式。
5. GraphX:提供图和图并行计算的API。

Spark的架构包括Cluster Manager、Worker、Driver和Executor等组件。Cluster Manager负责资源管理和调度;Worker执行任务提交;Executor是真正的计算单元;Driver创建SparkContext,负责执行应用程序;SparkContext是Spark功能的主要入口点。

RDD是Spark中数据处理的最基本抽象,是一个只读的分区记录集合。RDD支持Transformation(转换)和Action(执行)两种操作。

Spark的运行模式包括local、standalone、yarn、mesos和k8s等。在YARN上,Spark有yarn-client和yarn-cluster两种模式。

怜星夜思:

1、Spark为什么比MapReduce和Hive的效率高?除了内存计算还有什么其他原因吗?
2、RDD的惰性计算机制有什么优缺点?在实际应用中应该如何权衡?
3、Spark on YARN的两种模式,yarn-client和yarn-cluster,分别适用于哪些场景?

原文内容

Spark作为一种通用的大数据分析引擎,集成了批处理、流式查询以及交互式查询于一体,其技术体系相当复杂,本文简要介绍了Spark中的基本架构和基本概念RDD和执行流程,以及Spark on YARN两种模式。

1、Spark基本介绍

Apache Spark是一种通用可扩展的大数据分析引擎,集批处理、实时流处理、交互式查询与流计算于一体,避免了多种运算场景下需要部署不同集群带来的资源浪费。另外,Spark是基于内存的计算,相较于MapReduce或者Hive,处理效率上要提升数倍。

1.1 Spark核心组件
Spark是一种通用的大数据分析引擎,包含了大数据领域常见的各种计算框架:使用Spark Core用于离线计算;Spark SQL用于交互式查询;基于Spark Streaming实现实时流式计算;Spark MILlib用于机器学习;Spark GraphX用于图计算。

1)Spark Core

包含Spark的基本功能,包含任务调度,内存管理,容错机制等,为其他组件提供底层的服务。在内部定义了RDDs(弹性分布式数据集),提供了很多APIs来创建和操作这些RDDs。

2)Spark SQL
支持像其它数据库一样,使用SQL的方式处理结构化的数据源,包括Hive、HBase、MySQL、JSON等。Spark SQL提供了两种抽象的数据集合:DataFrame和Datasets
  • DataFrame是spark Sql对结构化数据的抽象,可以简单的理解为spark中的表

  • DataSet是数据的分布式集合

3)Spark Streaming
基于Spark Core实现了可扩展、高吞吐和容错的实时流处理,支持的数据源有Kafka、Flume、HDFS和TCP socket等,处理后的结果存储到HDFS或数据库中。Spark Streaming是将流式计算分解成一系列短小的批处理作业,每个批处理作业处理一段数据,每一段数据转换成Spark中的RDD进行转换操作。

4)MLib

MLIB是Spark对常用的机器学习算法的实现库,包括分类、回归、聚类、协同过滤、降维等算法,同时支持流水线的学习模式,即多个算法使用不同的参数以流水线的形式编排运行,得到算法的结果。

5)Graphx

Spark提供的关于图和图并行计算的API,集ETL、试探性分析和迭代式图计算于一体。

1.2 Spark基本架构
Spark整体架构如图所示,包括ClusterManager、Worker、Driver和executor等。

1)Cluster Manager

集群管理器,它存在于Master进程中,主要用来对应用程序申请的资源进行管理和调度,根据其部署模式的不同,可以分为local,standalone,yarn,mesos等模式。

2)worker
Spark的工作节点,用于执行任务的提交,主要工作职责有以下:
  • worker节点通过注册机向cluster manager汇报自身的cpu,内存等信息。

  • worker节点在spark master作用下创建并启用executor,executor是真正的计算单元。

  • spark master将任务Task分配给worker节点上的executor并执行运用。

  • worker节点同步资源信息和executor状态信息给cluster manager。
在yarn 模式下运行worker节点一般指的是NodeManager节点,standalone模式下运行一般指的是slave节点。
3)executor

Executor是真正执行计算任务的组件,它是application运行在worker上的一个进程。这个进程负责Task的运行,它能够将数据保存在内存或磁盘存储中,也能够将结果数据返回给Driver。Executor宿主在worker节点上,每个Worker上存在一个或多个Executor进程,每个executor持有一个线程池,每个线程可以执行一个task。根据Executor上CPU-core的数量,其每个时间可以并行多个跟core一样数量的task,其中task任务即为具体执行的Spark程序的任务。

4)Application

Application是Spark API编程的应用程序,它包括实现Driver功能的代码和在程序中各个executor上要执行的代码,一个application由多个job组成。其中应用程序的入口为用户所定义的main方法。

5)Driver

Driver的功能是创建SparkContext,负责执行用户写的Application的main函数进程,创建SparkContext的目的是为了准备Spark应用程序的运行环境。Application通过Driver和Cluster Manager及executor进行通讯,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。Driver可以运行在application节点上,也可以由application提交给Cluster Manager,再由Cluster Manager安排worker节点运行。Driver节点也负责提交Job,并将Job转化为Task,在各个Executor进程间协调Task的调度。

6)sparkContext 

sparkContext是整个spark应用程序最关键的一个对象,是Spark所有功能的主要入口点。核心作用是初始化spark应用程序所需要的组件,同时还负责向master程序进行注册等。

用户程序从最开始提交到最终的计算执行,需要经历以下几个阶段:

  1. 用户程序创建SparkContext时,新创建的SparkContext实例会连接到ClusterManager。ClusterManager根据用户提交时设置的CPU和内存信息为本次的提交分配计算资源,启动Executor进程

  2. Driver会根据用户程序划分为不同的执行阶段,每个执行阶段由一组完全相同的Task组成,这些Task分别作用于待处理数据的不同分区。SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理。在阶段划分完成和Task创建后,Driver会向Executor发送Task

  3. Executor在接收到Task后,会下载Task运行时候的依赖,在准备好Task的执行环境后,开始执行Task并将Task的运行状态汇报给Driver

  4. Driver会根据收到的Task状态来处理不同的状态更新。Task分为两种:一种是shuffle map task,实现数据的重新洗牌,洗牌的结果保存到Executor所在节点的文件系统中;另外一种是result Task,负责生成结果数据。

  5. Driver会不断的调用Task,重复2~4的过程,将Task发送到Executor执行,在所有的Task都正常执行或超过执行次数的限制仍然没有执行成功时停止。

2、Spark基本概念
2.1 RDD弹性分布式数据集

RDD是弹性分布式数据集,是Spark中数据处理的最基本抽象,可以被并行操作的元素集合。RDD在本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群的不同节点上进行并行计算。

RDD提供了一种高端受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map,join和group by)而创建得到新的RDD。Spark中可以通过一系列的算子对RDD进行操作,主要分为Transformation和Action两种操作。
  • Transformation(转换):是对已有的RDD进行换行生成新的RDD,对于转换过程采用惰性计算机制,不会立即计算出结果。常用的方法有map,filter,flatmap等。

  • Action(执行):对已有的RDD对数据执行计算产生结果,并将结果返回Driver或者写入到外部存储中。常用的方法有reduce,collect,saveAsTextFile等。

RDD具有自动容错、位置感知性调度和可伸缩性的特点,每个RDD主要有以下属性:

  1. 分片Partition:数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定分片的个数,默认为分配的CPU core数目

  2. 计算每个分区的函数:Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数,compute函数会对迭代器进行汇总,不需要保存每次的计算结果

  3. RDD之间的依赖关系:RDD每次转换都会生成新的RDD,所以RDD之间会形成类似流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不需要对RDD的所有分区进行重新计算

  4. RDD的分片函数Partitioner:Spark中实现两种类型的分片函数,基于哈希函数HashPartitioner和基于分区函数RangePartitioner。Partitioner函数决定了RDD本身的分片数量,也决定了RDD Shuffle输出时的分片数量

  5. 存储每个分片的优先位置的列表:对于HDFS文件,这个列表保存了每个partition所在块的位置。Spark在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理的数据块的存储位置。

2.2 DAG有向无环图
Spark会根据用户提交的逻辑中的RDD操作生成RDD之间的依赖关系,同时这个计算链生成了逻辑上的DAG。在Spark中DAG主要分为DAGSchedulerTaskScheduler
  • DAGScheduler是面向stage的高层级的调度器,DAG Scheduler把DAG拆分为多个Task,每组Task都是一个stage,解析时是以shuffle为边界进行反向构建的,每当遇见一个shuffle,spark就会产生一个新的stage,接着以TaskSet的形式提交给底层的调度器(task scheduler),每个stage封装成一个TaskSet。DAG Scheduler需要记录RDD被存入磁盘物化等动作,同时会需要Task寻找最优等调度逻辑,以及监控因shuffle跨节点输出导致的失败。

  • TaskScheduler负责每一个具体任务的执行,包括任务集的调度管理、状态结果跟踪、物理资源调度管理、任务执行和获取结果。TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。

3、Spark运行模式及运行流程
3.1 Spark的运行模式

Spark的运行模式主要有以下几种:

运行模式 运行类型 说明
local 本地模式 常用于本地开发测试,分为local单线程和local-cluster多线程模式
standalone 集群模式 独立模式,在spark自己的资源调度管理框架上运行,该框架采用master/salve结构
yarn 集群模式 在yarn资源管理器框架上运行,由yarn负责资源管理,spark负责任务调度和计算
mesos 集群模式 在mesos资源管理器框架上运行,由mesos负责资源管理,spark负责任务调度和计算
k8s 集群模式 在k8s上运行
3.2 Spark程序在YARN上执行流程

Spark on YARN分为两种模式yarn-client模式和yarn-cluster模式,一般采用的是yarn-cluster模式。yarn-cluster和yarn-client的区别在于yarn appMaster,yarn-cluster中ApplicationMaster不仅负责申请资源,并负责监控Task的运行状况,因此可以关掉client;yarn-client中ApplicationMaster仅负责申请资源,由client中的driver来监控调度Task的运行,因此不能关掉client。

3.2.1 YARN-client模式

  1. ResourceManager接到请求后在集群中选择一个NodeManager分配Container资源为AppMaster作准备
  2. 在Container中启动ApplicationMaster进程;driver进程运行在client中,并初始化sparkContext;

  3. sparkContext初始化完后与ApplicationMaster通讯,通过ApplicationMaster向ResourceManager申请Container,ApplicationMaster通知NodeManager在获得的Container中启动excutor进程;1.sparkContext分配Task给excutor,excutor发送运行状态给driver。

3.2.2 YARN-cluster模式

  1. client 向yarn提交应用程序,包含ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等。
  2. ApplicationMaster程序启动ApplicationMaster的命令、需要在Executor中运行的程序等。

  3. ApplicationMaster向ResourceManager注册申请Container资源,这样用户可以直接通过ResourceManage查看应用程序的运行状态。

  4. ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,启动excutor进程。

  5. Task向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。

  6. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

4、总结

Spark技术体系相当复杂,本文简要介绍了Spark中的基本架构和基本概念RDD和执行流程,以及Spark on YARN两种模式。Spark有关的开发在之前的大数据系列中有所涉及,这里不再赘述。

参考资料:

  1. 《Spark技术内部:深入解析Spark内核架构设计与技术原理》,张安站著

  2. https://spark.apache.org/docs/latest/index.html

  3. https://blog.csdn.net/weixin_45366499/article/details/110010589

  4. https://blog.csdn.net/zxc123e/article/details/79912343

  5. https://blog.csdn.net/crazybean_lwb/article/details/106316513


惰性计算就像一个精明的厨师,他会先把所有菜谱步骤都记下来,然后统筹安排,最后才开始做菜,这样可以节省时间和资源。但是如果哪个步骤错了,可能要等所有菜都做完了才能发现。

yarn-client模式适用于交互式应用和调试,因为Driver运行在客户端,方便查看日志和监控运行状态。yarn-cluster模式适用于生产环境,因为Driver运行在集群中,更稳定可靠,即使客户端断开连接也不会影响应用程序的运行。

选择哪种模式还要考虑资源分配和网络环境。如果客户端机器资源有限,或者网络不稳定,最好选择yarn-cluster模式,避免Driver成为瓶颈。

我觉得除了内存计算,Spark的优化策略也功不可没,比如DAG调度、Stage划分、数据本地化等等,这些都对性能提升有很大帮助。就好比一个赛车,光发动机好不够,还得有优秀的底盘、轮胎和空气动力学设计。

可以这样理解,yarn-client就像你在电脑上玩游戏,可以随时暂停和查看游戏信息;yarn-cluster就像把游戏放到服务器上运行,即使你关了电脑,游戏也会继续运行。

补充一下,Spark提供了toDebugString()方法可以查看RDD的DAG,这对于理解惰性求值的过程很有帮助。另外,也可以使用persist()cache()方法将中间RDD持久化到内存或磁盘,方便调试和复用。

“Spark为什么比MapReduce和Hive效率高”这个问题问得好!内存计算确实是主要原因,它避免了频繁的磁盘I/O操作。另外,Spark的DAG执行引擎也比MapReduce的迭代式计算模型更高效,减少了中间数据的落地。

关于RDD的惰性计算,我觉得优点在于可以优化执行计划,避免不必要的计算,提高效率。缺点就是调试起来可能比较麻烦,不太容易观察中间结果。实际应用中需要根据具体场景选择,如果数据量很大,优化执行计划很重要,那就适合用惰性计算;如果数据量小,调试更重要,那就干脆不用。

从更底层的角度来看,Spark使用了更先进的序列化和反序列化技术,这在数据传输和处理过程中也能节省不少时间。就像快递公司,打包和拆包的效率越高,整体配送速度就越快。