本文适用于Java / Scala程序员,他们想要决定将哪个框架用于大型应用程序的流传输部分,或者只是想知道它们之间的根本区别,以防万一。我将编写Scala,但是我要描述的所有框架也都具有Java API。 您还可以在Rock the JVM博客上找到此文章,或者在YouTube上或以下视频中找到视频形式:
我将讨论Akka Streams,Kafka Streams和Spark Streaming的主要优点和缺点 ,并让您了解如何在一个非常简单的字数统计应用程序中使用它们,这是其中之一。当人们学习任何分布式编程工具时,首先要做的基本事情。
Kafka Streams Kafka Streams是用于处理无界数据的 客户端库。那是什么意思? 客户端库表示我们编写的应用程序使用其他基础结构(本例为Kafka集群)提供的服务。因此,我们与集群进行交互以处理潜在的无穷数据流。数据被表示为键值记录,这使它们易于识别,并且将它们组织为主题,这些主题是持久事件日志,本质上是写入磁盘并复制的持久数据队列。在这种架构中,我们有生产者应用程序将记录推送到这些主题中-例如,如果您有一个在线商店,您想跟踪订单发生的所有事情-另一方面,我们有多个消费者应用程序可以读取这些主题中以各种方式和从不同时间点开始的数据。
这种结构化数据的方式允许高度分布式和可扩展的体系结构,这些体系结构也是容错的。Kafka还嵌入了一次精确的消息传递语义,这意味着,如果您将记录发送到Kafka,您将确保它到达集群并且记录一次,且没有重复。这一点特别重要,因为通常在分布式系统中很难获得这种机制。
从组织Kafka的方式来看,API允许Java或Scala应用程序与Kafka群集进行交互,而与可能同时使用它的其他应用程序无关。因此,应用程序访问同一分布式和可伸缩服务的这种独立性自然会激发大型应用程序中独立微服务的使用。
Kafka流看起来像什么
object WordCountApplication extends App { import Serdes._ val props: Properties = { val p = new Properties() p.put(StreamsConfig.APPLICATION_ID_CONFIG, "myFabulousWordCount") p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker-url:9092") p } val builder: StreamsBuilder = new StreamsBuilder val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic") val wordCounts: KTable[String, Long] = textLines .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) .groupBy((_, word) => word) .count()(Materialized.as("word-counts-table")) wordCounts.toStream.to("WordsWithCountsTopic") val streams: KafkaStreams = new KafkaStreams(builder.build(), props) streams.start() sys.ShutdownHookThread { streams.close(10, TimeUnit.SECONDS) } }
这就是Kafka Streams中的字数统计应用程序的外观。现在,这段代码非常繁重,无法一次全部使用,因此我将其分解。
import Serdes._
Kafka以二进制形式存储记录以提高性能,这意味着我们需要序列化和反序列化它们。我们可以通过在Scala中自动导入序列化器和反序列化器(Serdes)来做到这一点。
val props: Properties = { val p = new Properties() p.put(StreamsConfig.APPLICATION_ID_CONFIG, "myFabulousWordCount") p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka-broker-url:9092") p }
应用程序的第一部分总是需要配置要连接的Kafka集群的详细信息。这将使用Java风格的API,我个人主要是Scala程序员讨厌它。Java人士对此可能会更满意。
val builder: StreamsBuilder = new StreamsBuilder val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
接下来,我们使用构建器模式从所需的主题中将记录作为键值对读取。
val wordCounts: KTable[String, Long] = textLines .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) .groupBy((_, word) => word) .count()(Materialized.as("word-counts-table"))
然后,我们将一些函数式编程运算符应用于流,就好像它是一个集合一样,并将该流转换为表。Kafka Streams具有表的概念,该表允许进行数据聚合和处理。Kafka具有流表对偶性,使我们可以在它们之间来回转换。
wordCounts.toStream.to("WordsWithCountsTopic")
说到转换,我们可能希望将此表转换为流并将其提供给其他一些应用程序可能会感兴趣的主题。
val streams: KafkaStreams = new KafkaStreams(builder.build(), props) streams.start() sys.ShutdownHookThread { streams.close(10, TimeUnit.SECONDS) }
最后,我们只需要启动流并设置适当的停止,否则流将是静态的并且不会执行任何操作。 Kafka流传的Strengths和Weaknesses Kafka Streams的主要优点是,Kafka集群将为您提供 高速,容错和高可伸缩性。Kafka还提供了这种 恰好一次的消息发送语义,这在分布式系统中确实很难,因为否则其他许多框架都无法提供这种保证,因此您最终可能会重复或丢失数据。同时,Kafka 鼓励使用同一消息总线进行通信的微服务的使用,因此您有能力和控制权通过Kafka设置自己的微服务间通信协议。
当然,卡夫卡并非没有缺点。作为一个主要的Scala程序员,我讨厌Kafka的 命令式Java风格的API,但是我会为Kafka否则的出色功能而痛苦。另一个缺点是,如果您想在架构中使用Kafka,则需要设置 一个单独的Kafka集群即使您不一定需要分配专用计算机,也需要进行管理。Kafka也是高度可配置的,但是您需要事先了解配置才能使其完全正常运行。最后,Kafka仅支持生产者-消费者类型的体系结构。您还可以模拟其他体系结构,但是从来没有感觉到它像是自然地支持它的,或者就像某些人喜欢说的那样,是“一流”的,因此,即使Kafka在此特定目的上是出色的,但并非如此。与其他框架一样多才多艺。 Akka流 也就是说,让我们进入Akka Streams。Akka Streams是用Scala编写的,为JVM构建的极高性能的库,它是Reactive Streams规范的规范实现。响应式宣言的宗旨是 响应性,弹性,容错性和消息驱动的语义,所有这些都是Akka Streams的核心。在这里,您可以完全控制处理无限量数据中的单个记录,并且可以100%控制任何配置的流拓扑。Akka Streams由非常成功的并发actor模型提供支持,并且流传输组件是基于异步的单个组件构建的,这些组件可以以您想要的任何方式处理数据。
Akka Streams的主要优势再次是 高可伸缩性和容错能力,但是以不同的方式,正如我们将在代码中看到的那样。Akka Streams提供了一种 极其通用且简洁的流API,该API已演变为它自己的基于Scala的DSL,您可以简单地“插入”组件并启动它们。同时,Akka Streams还提供了一个低级的GraphStage API,可让您 对特定组件的各个逻辑进行高度控制。
正如我提到的那样,Akka Streams具有很高的性能和容错能力,但它是为不同的目的而构建的。在Kafka中,您将其用作消息总线,而您的应用程序是Kafka集群的客户端API,在这里,Akka Streams是 应用程序逻辑不可或缺的一部分。您可以想象Akka Streams就像您的应用程序的循环系统一样,而Kafka只是外部组织良好的血液库。
Akka流看起来如何
val source1 = Source(List("Akka", "is", "awesome")) val source2 = Source(List("learning", "Akka", "Streams")) val sink = Sink.foreach[(String, Int)](println) val graph = GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ • val wordCounter = Flow[String] .fold[Map[String, Int]](Map()) { (map, record) => map + (record -> (map.getOrElse(record, 0) + 1)) } .flatMapConcat(m => Source(m.toList)) val merge = builder.add(Merge[String](2)) val counter = builder.add(wordCounter) source1 ~> merge ~> counter ~> sink source2 ~> merge • ClosedShape } RunnableGraph.fromGraph(graph).run()
因此,让我们看一下如何使用Akka Streams构建字数统计应用程序。即使您有使用Scala的经验,此代码可能仍然过于简洁。这是Akka Streams的缺点之一。从我的经验和学生的经验来看,无论您以前的经验如何,对初学者来说通常都很难。但是不要担心,我会分解它。这是代码的主要部分。
val source1 = Source(List("Akka", "is", "awesome")) val source2 = Source(List("learning", "Akka", "Streams")) val sink = Sink.foreach[(String, Int)](println)
前三行构建原始源,这些源将异步发出元素(在我们的示例中为字符串)。
val wordCounter = Flow[String] .fold[Map[String, Int]](Map()) { (map, record) => map + (record -> (map.getOrElse(record, 0) + 1)) } .flatMapConcat(m => Source(m.toList))
有趣的是,实际上是计算字数的部分在这里,就像在简单的字符串列表上一样,我们进行折叠。看起来非常简洁,很难看,而且确实需要一些习惯,但是,如果您已经使用了Scala集合很多,那么看起来就不会 太陌生。我希望。
val merge = builder.add(Merge[String](2)) val counter = builder.add(wordCounter) source1 ~> merge ~> counter ~> sink source2 ~> merge
但是,这不是Akka Streams的有趣之处和强大之处。魔术发生在这里,我们只需将不同的流组件插入自己的逻辑即可。
类似于流的可视图形如下所示。
现在看一下代码。
source1 ~> merge ~> counter ~> sink source2 ~> merge
因此请注意,我们直接在代码中对流拓扑进行了非常相似的表示!你们中的有些人可能会“ meh”,但是很难夸大仅用三行代码(完全异步,高速且容错)来构造任意流式布局是多么容易。 Akka流长处和短处 因此,让我讨论Akka Streams的大起大落。
由于Akka Streams是Reactive Streams的实现,因此自然遵循Reactive Manifesto的所有宗旨,即 弹性,响应能力,容错能力和消息驱动行为。因此,它提供了极高的速度和高可伸缩性。 该API非常强大,并且是我见过的最接近完全可视化表示形式的API。同时,Akka Streams提供了一个低级的GraphStage API,它为您提供世界上用于自定义流逻辑的所有控件 ,例如批处理数据或手动中断或重定向流–确实,可能性无穷无尽。Akka Streams最好的部分之一是 它可以通过 Alpakka Kafka连接器。Akka Streams是作为应用程序的开发库而构建的,因此,您不像使用Kafka那样作为客户端API进行编程,但是您将像其他任何库一样使用它来构建分布式应用程序。
Akka Streams的不利之处在于,Akka Streams 很难学习并且难以为继。我知道有人在开玩笑说Lightbend的人很难做到这一点,以至于只有他们能理解。同时,Akka Streams就像流的C ++一样,即使您拥有世界上的所有力量(也许尤其是因为您拥有世界上的所有力量), 您也可以轻松地用脚射击自己。同时,尽管如果您将整个套件与clustering&co一起使用,则Akka(在所有方面都可以)很好地扩展,但是将Akka Streams集成以实现可伸缩性通常是一个挑战。Akka Streams是 您的应用程序不可或缺的一部分 这既是福也是祸,因为像任何“建筑”图书馆一样,您需要采用某种思维方式。
Spark Streaming 现在让我们继续进行Spark Streaming,它是流行的Spark分布式计算引擎的自然流扩展。Spark流式传输的目的是大规模处理无休止的大数据。您可以在两个API级别之间进行选择 :一个是具有离散化流(DStreams)的低级高控制API,另一个是非常熟悉的DataFrame API,也称为结构化流,它为常规的“静态”大数据提供了相同的API 。Spark从一开始就具有 自然的可伸缩性和容错性,并在两种输出模式下起作用:微批量,在这种模式下,Spark将在每个时间间隔上累加到目前为止收集到的所有数据;而连续模式则提供较低的延迟(目前尚处于实验阶段)。
Spark的最大优势是能够处理 海量数据,熟悉的 DataFrame或SQL API以及丰富的 Spark UI,可让您实时监视和跟踪作业的性能。
值得注意的是,Spark将需要 一个专用的计算集群来运行,这通常在生产中成本很高。同时,Spark极其可 配置,如果您知道如何正确调整Spark,您将获得一些巨大的性能改进。
How Spark Streaming Looks Like
val spark = SparkSession.builder() .appName("Word count") .master("local[*]") .getOrCreate() •val streamingDF = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "your-kafka-broker:9092") .option("subscribe", "myTopic") .load() •val wordCount = streamingDF .selectExpr("cast(value as string) as word") .groupBy("word") .count() •wordCount.writeStream .format("console") .outputMode("append") .start() .awaitTermination()
因此,让我们看一下标准的单词计数应用程序,在这里我们将使用高级的结构化流API。清洁且可分离。让我们分解一下
val spark = SparkSession.builder() .appName("Word count") .master("local[*]") .getOrCreate()
您唯一需要的样板就是启动Spark会话。在那之后...
val streamingDF = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "your-kafka-broker:9092") .option("subscribe", "myTopic") .load()
...您可以通过指定要读取的数据源来读取数据,Spark Streaming自然支持开箱即用。
val wordCount = streamingDF .selectExpr("cast(value as string) as word") .groupBy("word") .count()
实际的逻辑也很简单。在SQL中,这只是一个带有计数的“分组依据”,我们在这里这样做。由于Kafka将数据存储在二进制文件中,因此我们必须在开始时添加强制类型转换。
wordCount.writeStream .format("console") .outputMode("append") .start() .awaitTermination()
最后,您要做的就是将流指向输出接收器(在这里我们也可以使用Kafka),然后开始流查询。
Spark Streaming的优点和缺点 因此,让我们讨论一下Spark Streaming的风风雨雨。
从第一天开始, Spark Streaming就为大数据而构建 。它提供了 作为原始Spark核心的容错功能,以及 两个用于处理数据的API:一个具有DStreams的低层高级控件,以及一个具有形式熟悉的结构的免提功能。 DataFrames和SQL。Spark的一个不错的功能是能够根据事件时间和水印处理后期数据 ,这在现实生活中非常强大。如果使用得当,Spark可以高度配置,并具有巨大的性能优势,并且可以 连接到Kafka通过其内置的连接器作为数据输入或数据输出。尤其重要的是,Spark还可以从庞大的社区中受益,并提供出色的文档和帮助。另外一个好处是,Spark还可以在本地快速启动,以进行较小的数据处理。
与其他框架一样,Spark并不是完美的。DataFrame和SQL API既方便又熟悉,但是作为一个函数式程序员,我有点胃疼,因为 在编译时会丢失某些类型的安全性。当然,您拥有数据集,但是如果您输入lambda,则会失去一些性能。Spark Streaming非常适合大数据和微批处理,但是 除非您使用连续模式,否则这种模式在低延迟竞赛中并不出色,正如我们所说的那样,这种模式是无法保证的。最后,Spark 将需要一个专用的集群来运行,因此,根据您的需求,如果您遵循此路线,可能会被迫在计算上花费更多的钱。
什么时候使用
现在的最后一块:您什么时候应该使用什么?自然地,每个框架都是以一定的意图构建的,我们将它们放在这里。
当您要将Akka Streams烘焙到应用程序中时 ,Akka Streams最适合高性能系统 。它具有非常强大的API,但是除非您知道自己在做什么,否则可能会很容易(不管是否偶然)拍摄自己。
另一方面,Kafka可以最好地用作应用程序的外部高性能消息总线,因此,如果您希望 微服务在公共事件存储中进行读写操作,那么使用Kafka可能是最好的选择。但是,它的Java风格的API很麻烦,但据我了解,这可能与代码的整洁度和趣味性有关。
最后,毫无疑问,Spark Streaming最适合用于 大数据计算。Spark一直擅长于此,在这里我们也删除了数据界限。但是,在进行此记录时,Spark Streaming不利于实际的应用程序逻辑和低延迟,因此您可能希望将其用作 数据聚合器以从数据中收集见解。
作为奖励, 每个人都可以与Kafka一起工作,因此,如果您想将Kafka添加到您的聚会中,则无论选择哪种工具都可以做到这一点。
原文链接:http://codingdict.com