Spark Streaming 与 Structured Streaming:流处理引擎深度对比

对比Spark Streaming和Structured Streaming,从流模型、API、延迟和Kafka集成等方面分析,Structured Streaming在实时性和易用性方面更具优势。

原文标题:大数据系列之Spark Streaming和Structured Streaming对比

原文作者:牧羊人的方向

冷月清谈:

本文详细比较了 Spark Streaming 和 Structured Streaming 这两种 Spark 流处理引擎。Spark Streaming 采用微批次架构,将流式计算视为一系列小规模批处理,而 Structured Streaming 则将数据流视为连续追加的表。

API 使用方面,Spark Streaming 提供了基本的和高级的输入源,输出则依赖 foreachRDD 模式。其支持 Transform、Window 和 Join 等 RDD 算子操作。Structured Streaming 则复用了 Spark SQL 的批处理 API,输入源包括文件、Kafka、Socket 等,输出通过 DataStreamWriter 指定。它同样支持聚合、Window 和 Join 操作,且代码更为简洁。

时延方面,Structured Streaming 的 continuous 模式能够实时处理数据,延迟在毫秒级别,优于 Spark Streaming 的微批次模式。

两种引擎都支持与 Kafka 集成。Spark Streaming 使用 KafkaUtils 读取 Kafka 数据,并通过 KafkaProducer 发送数据。Structured Streaming 则直接通过 format("kafka") 读取和写入 Kafka 数据,更为便捷。

总体而言,Structured Streaming 在 API 简洁性、时延性能和聚合计算方面都优于 Spark Streaming,更适合对实时性要求较高的场景。

怜星夜思:

1、Structured Streaming 的 continuous 模式真的能做到毫秒级延迟吗?在实际应用中,哪些因素会影响其延迟?
2、在选择 Spark Streaming 或 Structured Streaming 时,应该考虑哪些因素?实际应用场景中,它们分别更适合哪些类型的任务?
3、除了 Spark Streaming 和 Structured Streaming,还有哪些常用的流处理引擎?它们之间有什么优缺点?

原文内容

本文对Spark Streaming和Structured Streaming在流模型、API使用、时延性能以及和Kafka对接等方面进行了对比,如下表所示:

1、流模型
  • Spark Streaming

Spark Streaming使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对待。Spark Streaming从各种输入源中读取数据,并把数据分组为小的批次,新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批次间隔这个参数决定的,批次间隔一般设在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。

  • Structured Streaming

Structured Streaming中的关键思想是将实时数据流当作可以连续追加的表,这样可以将流计算以静态表的方式进行处理。

在Structured Streaming中将input data stream作为“input table”,对input table查询会生成一个“result table”。在每次查询时候,新的记录会追加到input table中,同时也会更新到Result Table中,当result table更新的时候,这些更新的数据需要写到外部存储中。

Output有不同的模式:

  1. Complete mode:整个更新的result table写到外部存储

  2. Append mode:上一次写入以来新的追加数据写到外部存储

  3. Update mode:上一次写入以来新的更新数据写到外部存储

2、API使用
2.1 Spark Streaming API使用

1)Input Streaming

Spark Streaming有两种内置的Streaming源:

  • Basic source:StreamingContext API可用的源,比如文件系统、socket连接

  • Advanced source:比如kafka、flume等

2)Output输出

使用foreachRDD设计模式,通过维护一个静态的对象连接池,在多个RDDs/batches之间重用连接,降低消耗:

def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

注:连接池中的连接按需创建,并且一段时间不使用时会超时,实现了将数据发送到外部系统的最有效方法。

3)RDD算子操作

  • Transform操作

允许任意的RDD-RDD函数应用于DStream,例如,可以通过将输入数据流与预先计算的垃圾信息(也可以使用Spark生成)进行实时数据清理,然后基于此进行过滤。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
  • Window操作

Spark Streaming支持窗口计算,这样允许在滑动创建进行转换,如下图所示:

当窗口滑过源DStream,源RDDs被组合产生窗口DStream的RDDs。Windows需要2个参数:窗口长度和滑动间隔。

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
  • Join操作

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
2.2 Structured Streaming API使用

Structured Streaming代码编写完全复用Spark SQL的 batch API,也就是对一个或者多个 stream或者table进行query。Query的结果是result table,可以以多种不同的模式(append, update, complete)输出到外部存储中。

1)Input输入

Structured Streaming输入源有以下:

  1. File source:以文件作为输入流数据,支持txt、CSV、JSON等格式

  2. Kafka Source:从Kafka中读取数据

  3. Socket source:从socket连接中读取UTF8 text数据

  4. Rate Source

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")

2)Output输出

Streaming计算完成后,需要使用DataStreamWriter 写入到外部存储中,指定以下参数:

  • Output sink的信息如Data format、location等

  • Output mode,append、complete还是update

  • Query name:query的名称

  • Trigger interval:指定trigger间隔,如果没有指定,会在之前数据处理完成后立即触发

  • Checkpoint location:对于容错要求高的,指定checkpoint写入的位置

writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "updates")
.start()

3)Streaming dataframe操作

  • 聚合操作

df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
  • Window操作

如下图所示,统计10分钟窗口、5分钟更新的流数据

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
  • JOIN操作

Structured Streaming支持Streaming和Data Frame之间的join操作

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
3、时延对比

Spark Streaming是基于micro-batch模式,在处理实时流数据的时候存在process-time和event-time窗口时延。

Structured Streaming的continuous mode是实时处理的,只要一有数据就会进行处理,时延基本在毫秒级别。

4、对接Kafka数据
4.1 Spark Streaming对Kafka支持

参考“” 

1)读取Kafka数据

from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

2)发送数据到Kafka

Spark Streaming写入数据到Kafka是通过调用KafkaProducer模块实现的

from kafka import KafkaProducer

to_kafka = KafkaProducer(bootstrap_servers=broker_list)
to_kafka.send(topic_name,send_msg,encode(utf8))
to_kafka.flush()
4.2 Structured Streaming对Kafka支持

1)从Kafka中读取数据,并将二进制流数据转为字符串

# Construct a streaming DataFrame that reads from topic1
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.112.101:2181 ") \
.option("subscribe", "kafka_topic") \
.option("startingOffsets", "earliest") \
.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

2)使用Spark作为Producer发送Kafka数据

# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers","192.168.112.101:2181 ") \
.option("topic", " kafka_topic ") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.start()
5、与其它流引擎比较

Structured Streaming和其它流计算引擎比较,在时延性能、API简洁、聚类计算等方面都具有一定的优势。

参考资料

  • http://spark.apache.org/docs/latest/streaming-programming-guide.html

  • http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

  • https://www.slidestalk.com/u2909/FromSparkStreamingtoStructuredStreaming58639

  • https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

  • https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html


实际应用中的延迟取决于多个因素,例如数据量、集群资源、sink 的类型等等。如果使用像 Kafka 这样的消息队列作为 sink,延迟通常会比较低,但如果写入数据库,延迟可能会更高一些。优化网络配置和集群资源分配也很重要。

如果你的应用需要处理复杂的事件模式,比如 CEP(复杂事件处理),那么 Flink 是一个不错的选择。如果你的应用需要处理大量的状态数据,Flink 也比 Spark Streaming 更合适。如果你的应用比较简单,只需要做一些简单的 ETL,那么 Kafka Streams 就足够了。

毫秒级延迟指的是处理时间,不包括数据传输、序列化/反序列化这些时间。如果你的数据源本身就慢,或者 sink 写入速度慢,整体延迟还是会比较高。可以看看官方文档,里面有更详细的解释。

理论上 continuous 模式的延迟非常低,但实际应用中,网络抖动、数据倾斜、下游系统处理能力等都会影响最终的延迟。我之前做过一个项目,刚开始延迟确实很低,但后来数据量上来了,下游数据库扛不住,延迟就上去了。

如果你的团队对 SQL 比较熟悉,那么 Structured Streaming 会更容易上手。它使用 SQL-like 的 API,可以更容易地表达复杂的逻辑。另外,Structured Streaming 的容错机制也更完善,可以保证 exactly-once 语义。

Flink 也是一个很火的流处理引擎,它在处理复杂事件、状态管理方面做得比较好。Kafka Streams 也很不错,它跟 Kafka 集成得很好,适合构建轻量级的流处理应用。另外还有 Storm、Samza 等等。选择哪个取决于你的具体需求和技术栈。

Flink 的优势在于低延迟、高吞吐量、exactly-once 语义。Kafka Streams 的优势在于简单易用、与 Kafka 集成紧密。Storm 的优势在于成熟稳定,有很多成功的案例。选择哪个需要根据你的应用场景、团队技能、运维成本等综合考虑。

Structured Streaming 更适合处理结构化数据,比如从数据库或 Kafka 读取数据,然后进行一些聚合、JOIN 操作。Spark Streaming 更适合处理非结构化数据,比如日志分析、点击流分析等。当然,Structured Streaming 也能处理非结构化数据,但需要先转换成结构化数据。

选择哪个主要看你的需求。如果对实时性要求非常高,那就选 Structured Streaming。如果你的应用场景比较复杂,需要用到一些底层的 API,或者已经有 Spark Streaming 的代码,那就继续用 Spark Streaming 也行。比如,我之前做实时广告推荐,就用的 Spark Streaming,因为它可以灵活控制一些底层细节。