Skip to content

Latest commit

 

History

History
1076 lines (623 loc) · 113 KB

ch11.md

File metadata and controls

1076 lines (623 loc) · 113 KB

11. 流处理

有效的复杂系统总是从简单的系统演化而来。 反之亦然:从零设计的复杂系统没一个能有效工作的。

——约翰·加尔,Systemantics(1975)


[TOC]

在第10章中,我们讨论了批处理技术,它将一组文件读取为输入并生成一组新的输出文件。输出是派生数据的一种形式;也就是说,如果需要,可以通过再次运行批处理过程来重新创建数据集。我们看到了如何使用这个简单而强大的想法来创建搜索索引,推荐系统,分析等等。

然而,在第10章中仍然有一个大的假设:即输入是有界的,即已知和有限的大小,所以批处理知道它何时完成了它的输入。例如,MapReduce中心的排序操作必须读取其全部输入,然后才能开始生成输出:可能发生最后一个输入记录是具有最低键的输入记录,因此必须是第一个输出记录,所以提前开始输出不是一种选择。

实际上,很多数据是无限的,因为它随着时间的推移逐渐到达:你的用户昨天和今天产生了数据,明天他们将继续产生更多的数据。除非您停业,否则这个过程永远都不会结束,所以数据集从来就不会以任何有意义的方式“完成”[1]。因此,批处理程序必须将数据人为地分成固定时间段的数据块,例如,在每天结束时处理一天的数据,或者在每小时结束时处理一小时的数据。

日常批处理过程中的问题是,输入的更改只会在一天之后的输出中反映出来,这对于许多急躁的用户来说太慢了。为了减少延迟,我们可以更频繁地运行处理 - 比如说,在每秒钟的末尾,甚至连续地处理秒数的数据,完全放弃固定的时间片,并简单地处理每一个事件。这是流处理背后的想法。

一般来说,“流”是指随着时间的推移逐渐可用的数据。这个概念出现在很多地方:Unix的stdin和stdout,编程语言(lazy lists)[2],文件系统API(如Java的FileInputStream),TCP连接,通过互联网传送音频和视频等等。 在本章中,我们将把事件流视为一个数据管理机制:我们在上一章中看到的批量数据的无界的,递增处理的对应物。我们将首先讨论流如何被表示,存储和通过网络传输。在第451页的“数据库和流”中,我们将调查流和数据库之间的关系。最后,在第464页的“Processing Streams”中,我们将探索连续处理这些流的方法和工具,以及它们可以用来构建应用程序的方法。

传递事件流

在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。什么是类似的流媒体?

当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见第288页上的“单调对时钟”)。

例如,发生的事情可能是用户采取的行动,例如查看页面或进行购买。它也可能来源于机器,例如来自温度传感器的周期性测量或者CPU利用率度量。在第391页上的“使用Unix工具进行批处理”的示例中,Web服务器日志的每一行都是一个事件。

事件可能被编码为文本字符串或JSON,或者以某种二进制形式编码,如第4章所述。这种编码允许您存储一个事件,例如将其附加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许您通过网络将事件发送到另一个节点以进行处理。

在批处理领域,作业的输入和输出是文件(也许在分布式文件系统上)。什么是类似的流媒体?

当输入是一个文件(一个字节序列)时,第一个处理步骤通常是将其解析为一系列记录。在流处理的上下文中,记录通常被称为事件,但它本质上是一样的:一个小的,自包含的,不可变的对象,包含某个时间点发生的事情的细节。一个事件通常包含一个时间戳,指示何时根据时钟来发生(参见第288页上的“单调对时钟”)。

例如,发生的事情可能是用户采取的行动,例如查看页面或进行购买。它也可能来源于机器,例如来自温度传感器的周期性测量或者CPU利用率度量。在第391页上的“使用Unix工具进行批处理”的示例中,Web服务器日志的每一行都是一个事件。

事件可能被编码为文本字符串或JSON,或者以某种二进制形式编码,如第4章所述。这种编码允许您存储一个事件,例如将其附加到一个文件,将其插入关系表,或将其写入文档数据库。它还允许您通过网络将事件发送到另一个节点以进行处理。

在批处理中,文件被写入一次,然后可能被多个作业读取。类似地,在流媒体术语中,一个事件由生产者(也称为发布者或发送者)生成一次,然后由多个消费者(订阅者或接收者)进行处理[3]。在文件系统中,文件名标识一组相关记录;在流媒体系统中,相关的事件通常被组合成一个主题或流。

原则上,文件或数据库足以连接生产者和消费者:生产者将其生成的每个事件写入数据存储,并且每个使用者定期轮询数据存储以检查自上次运行以来出现的事件。这实际上是批处理在每天结束时处理一天的数据的过程。

但是,如果数据存储不是为这种用途而设计的,那么在延迟较小的情况下继续进行处理时,轮询将变得非常昂贵。您调查的次数越多,返回新事件的请求百分比越低,因此开销越高。相反,当新事件出现时,最好通知消费者。

数据库传统上不太支持这种通知机制:关系数据库通常具有触发器,它们可以对变化作出反应(例如,将一行插入到表中),但是它们的功能非常有限,有点在数据库设计中是事后的[4,5]。相反,已经开发了专门的工具来提供事件通知。

消息系统

通知消费者有关新事件的常用方法是使用消息传递系统:生产者发送包含事件的消息,然后将消息推送给消费者。我们之前在第136页的“消息传递数据流”中介绍了这些系统,但现在我们将详细介绍这些系统。

像生产者和消费者之间的Unix管道或TCP连接这样的直接通信渠道将是实现消息传递系统的简单方法。但是,大多数消息传递系统都在这个基本模型上扩展特别是,Unix管道和TCP将一个发送者与一个接收者完全连接,而一个消息传递系统允许多个生产者节点将消息发送到相同的主题,并允许多个消费者节点接收主题中的消息。

在这个发布/订阅模式中,不同的系统采取多种方法,对于所有目的都没有一个正确的答案。为了区分这些系统,询问以下两个问题特别有帮助:

  1. 如果生产者发送消息的速度比消费者能够处理的速度快,会发生什么?一般来说,有三种选择:系统可以放置消息,缓冲队列中的消息或应用背压(也称为流量控制;即阻止生产者发送更多的消息)。例如,Unix管道和TCP使用背压:他们有一个小的固定大小的缓冲区,如果填满,发件人被阻塞,直到收件人从缓冲区中取出数据(参见“网络拥塞和排队”(第282页))。

    如果消息被缓存在队列中,那么了解该队列增长会发生什么很重要。如果队列不再适合内存,或者将消息写入磁盘,系统是否会崩溃?如果是这样,磁盘访问如何影响邮件系统的性能[6]?

  2. 如果节点崩溃或暂时脱机,会发生什么情况 - 是否有消息丢失?与数据库一样,持久性可能需要写入磁盘和/或复制的某种组合(请参阅第227页的侧栏“复制和耐久性”),这有成本。如果您有时可能会丢失消息,则可能在同一硬件上获得更高的吞吐量和更低的延迟。

消息丢失是否可以接受取决于应用程序。例如,对于定期传输的传感器读数和指标,偶尔丢失的数据点可能并不重要,因为更新后的值将在短时间后发送。但是,要注意,如果大量的消息被丢弃,那么衡量标准是不正确的[7]。如果您正在计数事件,那么更重要的是它们可靠地传送,因为每个丢失的消息都意味着不正确的计数器。

我们在第10章中探讨的批处理系统的一个很好的特性是它们提供了强大的可靠性保证:失败的任务会自动重试,失败任务的部分输出会自动丢弃。这意味着输出与没有发生故障一样,这有助于简化编程模型。在本章的后面,我们将研究如何在流式上下文中提供类似的保证。

直接从生产者传递给消费者

许多消息传递系统使用生产者和消费者之间的直接网络通信,而不通过中间节点:

  • UDP组播广泛应用于金融行业,例如股票市场,其中低时延非常重要[8]。虽然UDP本身是不可靠的,但应用程序级别的协议可以恢复丢失的数据包(生产者必须记住它发送的数据包,以便它可以按需重新发送数据包)。

  • 无代理的消息库,如ZeroMQ [9]和nanomsg采取类似的方法,通过TCP或IP多播实现发布/订阅消息传递。

    StatsD [10]和Brubeck [7]使用不可靠的UDP消息传递来收集网络中所有机器的指标并对其进行监控。 (在StatsD协议中,如果接收到所有消息,则计数器度量标准是正确的;使用UDP将使得度量标准最好近似为[11]。另请参阅“TCP与UDP”

  • 如果消费者在网络上公开服务,生产者可以直接发送HTTP或RPC请求(请参阅第131页的“通过服务进行数据流:REST和RPC”)以将消息推送给使用者。这就是webhooks背后的想法[12],一种服务的回调URL被注册到另一个服务中,并且每当事件发生时都会向该URL发出请求。

尽管这些直接消息传递系统在设计它们的情况下运行良好,但是它们通常要求应用程序代码知道消息丢失的可能性。他们可以容忍的错误是相当有限的:即使协议检测并重新传输在网络中丢失的数据包,他们通常假设生产者和消费者不断在线。

如果消费者处于脱机状态,则可能会丢失在无法访问时发送的消息。一些协议允许生产者重试失败的消息传递,但是如果生产者崩溃,这种方法可能会失效,失去了它应该重试的消息的缓冲区。

消息代理

一个广泛使用的替代方法是通过消息代理(也称为消息队列)发送消息,消息代理实质上是一种针对处理消息流而优化的数据库。它作为服务器运行,生产者和消费者作为客户连接到服务器。生产者将消息写入经纪人,消费者通过从经纪人那里读取消息来接收他们。

通过将数据集中在代理中,这些系统可以更容易地容忍来来去去的客户端(连接,断开连接和崩溃),而耐久性问题则转移到代理商。一些消息代理只将消息保存在内存中,而另一些消息代理(取决于配置)将其写入磁盘,以便在代理崩溃的情况下不会丢失。面对缓慢的消费者,他们通常会允许排除异常的排队(而不是丢弃消息或背压),尽管这种选择也可能取决于配置。

排队的结果也是消费者通常是异步的:当生产者发送消息时,通常只等待代理确认已经缓存消息,不等待消息被消费者处理。向消费者的交付将发生在未来某个未定的时间点 - 通常在几分之一秒之内,但有时在队列积压之后显着延迟。

消息代理与数据库进行比较

有些消息代理甚至可以使用XA或JTA参与两阶段提交协议(请参阅第367页的“实践中的分布式事务”)。这个功能与数据库在本质上非常相似,尽管消息代理和数据库之间仍存在重要的实际差异:

  • 数据库通常保留数据,直到被明确删除,而大多数消息代理在消息成功传递给消费者时自动删除消息。这样的消息代理不适合长期的数据存储。
  • 由于他们很快删除了邮件,大多数邮件经纪人都认为他们的工作集合相当小,即队列很短。如果代理需要缓冲很多消息,因为消费者速度较慢(如果消息不再适合内存,则可能会将消息泄漏到磁盘),每个消息需要更长的时间处理,整体吞吐量可能会降低[6]。
  • 数据库通常支持二级索引和各种搜索数据的方式,而消息代理通常支持某种方式订阅匹配某种模式的主题子集。机制是不同的,但是这两种方式都是客户选择想要了解的数据部分的根本途径。
  • 查询数据库时,结果通常基于数据的时间点快照;如果另一个客户端随后向数据库写入更改查询结果的内容,则第一个客户端不会发现其先前的结果现在已过期(除非它重复查询或轮询更改)。相比之下,消息代理不支持任意查询,但是当数据发生变化时(即新消息可用时),它们会通知客户端。

这是消息代理的传统观点,它被封装在JMS [14]和AMQP [15]等标准中,并以RabbitMQ,ActiveMQ,HornetQ,Qpid,TIBCO企业消息服务,IBM MQ,Azure服务总线和Google Cloud Pub/Sub [16]。

多个消费者

当多个消费者阅读同一主题中的消息时,将使用两种主要的消息传递模式,如图11-1所示:

负载均衡

每条消息都被传递给其中一个消费者,所以消费者可以共享处理消息的主题。经纪人可以任意分配消息给消费者。当处理消息的代价很高时,此模式非常有用,因此您希望能够添加消费者来并行处理消息。 (在AMQP中,可以通过让多个客户端从同一个队列中消费来实现负载均衡,而在JMS中则称为共享订阅)。

扇出

每条消息都被传递给所有的消费者。扇出允许几个独立的消费者每个“收听”相同的消息广播,而不会相互影响 - 流式等同于具有读取相同输入文件的多个不同批处理作业。 (此功能由JMS中的主题订阅提供,并在AMQP中交换绑定。)

图11-1。 (a)负载平衡:分担消费者之间消费话题的工作; (b)扇出:将消息传递给多个消费者。 两种模式可以组合:例如,两个独立的消费者组可以每个订阅一个话题,使得每个组共同收到所有消息,但是在每个组内,只有一个节点接收每个消息。

确认和重新交付

消费者可能会随时崩溃,所以可能发生的情况是经纪人向消费者提供消息,但消费者从不处理消费者,或者在消费者崩溃之前只处理消费者。为了确保消息不会丢失,消息代理使用确认:客户端必须明确告诉代理处理消息的时间,以便代理可以将其从队列中移除。

如果与客户端的连接关闭或超时而没有代理收到确认,则认为消息未处理,因此它将消息再次传递给另一个消费者。 (请注意,可能发生这样的消息实际上是完全处理的,但网络中的确认丢失了,处理这种情况需要一个原子提交协议,正如在第360页的“实践中的分布式事务”中所讨论的那样)

当与负载平衡相结合时,这种重新传递行为对消息的排序有一个有趣的影响。在图11-2中,消费者通常按照生产者发送的顺序处理消息。然而,消费者2在处理消息m3时崩溃,与此同时消费者1正在处理消息m4。未确认的消息m3随后被重新发送给消费者1,结果消费者1按照m4,m3,m5的顺序处理消息。因此,m3和m4不是以与生产者1发送的顺序相同的顺序交付的。

图11-2 消费者2在处理m3时崩溃,因此稍后再次向消费者1递送

即使消息代理试图保留消息的顺序(如JMS和AMQP标准所要求的),负载平衡与重新传递的组合也不可避免地导致消息被重新排序。为避免此问题,您可以为每个使用者使用单独的队列(即不使用负载平衡功能)。如果消息是完全独立的,消息重新排序并不是一个问题,但是如果消息之间存在因果依赖关系,这一点很重要,我们将在后面的章节中看到。

分区日志

通过网络发送数据包或向网络服务发送请求通常是暂时的操作,不会留下永久的痕迹。尽管可以永久记录(使用数据包捕获和日志记录),但我们通常不这么想。即使是信息经纪人,他们将信息持久地写入磁盘,在被传递给消费者之后,很快就会将其删除,因为它们是以短暂的消息传递思想为基础构建的。

数据库和文件系统采用相反的方法:写入数据库或文件的所有内容通常都要被永久记录下来,至少在某些人明确选择将其删除之前。

思维方式上的这种差异对如何创建派生数据有很大的影响。批处理过程的一个关键特征是,你可以反复运行它们,试验处理步骤,没有损坏输入的风险(因为输入是只读的)。 AMQP / JMS风格的消息并非如此:如果确认导致从代理中删除消息,则接收消息具有破坏性,因此您不能再次运行同一消费者,并期望得到相同的结果。

如果您将新消费者添加到消息系统,则通常只会开始接收在注册后发送的消息;任何先前的消息已经消失,无法恢复。将它与文件和数据库进行对比,您可以随时添加新客户端,并且可以读取过去任意写入的数据(只要应用程序没有明确覆盖或删除数据)。

为什么我们不能混合使用数据库的持久存储方式和低延迟的消息通知功能呢?这是基于日志消息代理的思想。

使用日志进行消息存储

日志只是磁盘上只能追加记录的序列。我们先前在第3章中的日志结构存储引擎和预写日志的上下文中讨论了日志,在第5章中也讨论了复制的上下文中。

可以使用相同的结构来实现消息代理:生产者通过将消息附加到日志的末尾来发送消息,并且消费者通过依次读取日志来接收消息。如果消费者到达日志的末尾,则等待通知新消息已被追加。 Unix工具tail -f监视数据被附加的文件,基本上是这样工作的。

为了扩展到比单个磁盘提供更高的吞吐量,可以对日志进行分区(在第6章的意义上)。然后可以在不同的机器上托管不同的分区,使每个分区成为一个单独的日志,可以独立于其他分区读取和写入。然后可以将一个话题定义为一组分段,它们都携带相同类型的消息。这种方法如图11-3所示。

在每个分区中,代理为每个消息分配一个单调递增的序列号或偏移量(在图11-3中,框中的数字是消息偏移量)。这样的序列号是有意义的,因为分区是仅附加的,所以分区内的消息是完全有序的。没有跨不同分区的订购保证。

图11-3 生产者通过将消息附加到主题分区文件来发送消息,消费者依次读取这些文件

Apache Kafka [17,18],Amazon Kinesis Streams [19]和Twitter的DistributedLog [20,21]都是基于日志的消息代理。 Google Cloud Pub / Sub在体系结构上相似,但是暴露了JMS风格的API而不是日志抽象[16]。尽管这些消息代理将所有消息写入磁盘,但通过在多台机器上进行分区,每秒能够实现数百万条消息的吞吐量,并通过复制消息来实现容错性[22,23]。

日志与传统消息相比

基于日志的方法平凡支持扇出消息传递,因为多个消费者可以独立读取日志,而不会相互影响 - 读取消息不会将其从日志中删除。为了在一组消费者之间实现负载平衡,代理人可以将整个分区分配给消费者组中的节点,而不是将消息分配给消费者客户端。

每个客户端然后使用分配的分区中的所有消息。通常情况下,当一个用户被分配了一个日志分区时,它会以直接的单线程的方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点:

  • 共享消费主题工作的节点数最多可以是该主题中的日志分区数,因为同一个分区内的消息被传递到同一个节点1
  • 如果单个消息处理缓慢,则会阻止处理该分区中的后续消息(一种行头阻塞形式;请参阅第13页上的“描述性能”)。

因此,在处理消息可能代价高昂,并且希望逐个消息地平行处理以及消息排序并不那么重要的情况下,消息代理的JMS / AMQP风格是可取的。另一方面,在消息吞吐量高的情况下,每条消息的处理速度都很快,消息的排序也是重要的,基于日志的方法运行得很好。

消费者偏移量

消耗一个分区依次可以很容易地判断哪些消息已经被处理:所有消息的偏移量小于消费者的当前偏移量已经被处理,并且具有更大偏移量的所有消息还没有被看到。因此,经纪人不需要跟踪每条消息的确认,只需要定期记录消费者的偏移。在这种方法中减少的簿记开销以及批处理和流水线的机会有助于提高基于日志的系统的吞吐量。

实际上,这种偏移量与单领先数据库复制中常见的日志序列号非常相似,我们在第151页的“设置新的跟踪者”中讨论了这种情况。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导,并在不跳过任何写入的情况下恢复复制。这里使用的原则完全相同:信息经纪人的行为像一个领导者数据库,而消费者就像一个追随者。

如果消费者节点失败,则使用者组中的另一个节点将被分配失败的使用者分区,并开始以最后记录的偏移量使用消息。如果消费者已经处理了后续的消息,但还没有记录他们的偏移量,那么在重新启动后这些消息将被第二次处理。本章后面我们将讨论处理这个问题的方法。

磁盘空间使用情况

如果您只附加到日志,则最终将耗尽磁盘空间。为了回收磁盘空间,日志实际上被分割成段,并且不时地将旧段删除或移动到归档存储器。 (我们将在后面讨论一种更为复杂的释放磁盘空间的方法。)

这就意味着,如果一个消费者的速度慢,消费者的消费速度落后于消费者的偏移量,那么消费者的偏移量就会指向一个已经被删除的消费者。实际上,日志实现了一个有限大小的缓冲区,当旧的消息满时,它也被称为循环缓冲区或环形缓冲区。但是,由于该缓冲区在磁盘上,因此可能相当大。

让我们来做一个后台计算。在撰写本文时,典型的大型硬盘驱动器容量为6TB,顺序写入吞吐量为150MB / s。如果您以最快的速度写邮件,则需要大约11个小时才能填满驱动器。因此,磁盘可以缓存11个小时的消息,之后它将开始覆盖旧的消息。即使您使用多个硬盘驱动器和机器,这个比率也是一样的。在实践中,部署很少使用磁盘的完整写入带宽,所以日志通常可以保存几天甚至几周的缓冲区。

不管你保留多长时间的消息,一个日志的吞吐量或多或少保持不变,因为无论如何每个消息都被写入磁盘[18]。这种行为与将邮件默认保存在内存中的消息传递系统形成鲜明对比,如果队列变得太大,只将其写入磁盘:当这些系统开始写入磁盘时,这些系统速度很快,并且变慢得多,所以吞吐量取决于保留的历史数量。

当消费者跟不上生产者时

在“信息系统”第441页的开头,我们讨论了如果消费者无法跟上生产者发送信息的速度的三种选择:丢弃信息,缓冲或施加背压。在这个分类法中,基于日志的方法是一种缓冲形式,具有较大但固定大小的缓冲区(受可用磁盘空间的限制)。

如果消费者远远落后于它所要求的信息比保留在磁盘上的信息要旧,那么它将不能读取这些信息,所以代理人有效地丢弃了比缓冲区容量更大的旧信息。您可以监控消费者在日志头后面的距离,如果显着落后,则会发出警报。由于缓冲区很大,因此有足够的时间让人类操作员修复缓慢的消费者,并在消息开始丢失之前让其赶上。

即使消费者落后太多,开始丢失信息,也只有消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运营优势:您可以通过实验性的方式使用生产日志进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的就是消费者抵消。

这种行为也与传统的信息经纪人形成了鲜明的对比,在这种情况下,您需要小心删除消费者已经关闭的任何队列,否则他们会继续不必要地积累消息,并从仍然活跃的消费者那里带走内存。

重播旧信息

我们之前提到,使用AMQP和JMS风格的消息代理,处理和确认消息是一个破坏性的操作,因为它会导致消息在代理上被删除。另一方面,在基于日志的消息代理中,使用消息更像是从文件中读取数据:这是只读操作,不会更改日志。

处理的唯一副作用,除了消费者的任何产出之外,消费者补偿正在向前发展。但是偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地被操纵:例如,你可以用昨天的偏移量开始一个消费者的副本,并将输出写到不同的位置,以便重新处理最后一天值得的消息。您可以重复这个任意次数,改变处理代码。

这方面使得基于日志的消息传递更像上一章的批处理过程,其中派生数据通过可重复的转换过程与输入数据明确分离。它允许更多的实验,更容易从错误和错误中恢复,使其成为在组织内集成数据流的好工具[24]。

流与数据库

我们已经在消息代理和数据库之间进行了一些比较。尽管传统上它们被视为单独的工具类别,但是我们看到基于日志的消息中介已经成功地从数据库中获取了想法并将其应用于消息传递。我们也可以反过来:从消息和流中获取想法,并将它们应用于数据库。

我们之前曾经说过,事件是某个时刻发生的事情的记录。发生的事情可能是用户操作(例如键入搜索查询)或传感器读取,但也可能是写入数据库。事情被写入数据库的事实是可以被捕获,存储和处理的事件。这一观察结果表明,数据库和数据流之间的连接不仅仅是磁盘上日志的物理存储 - 这是非常重要的。

事实上,复制日志(请参阅第158页上的“复制日志的实现”)是数据库写入事件的流,由领导者在处理事务时生成。追随者将写入流应用到他们自己的数据库副本,从而最终得到相同数据的精确副本。复制日志中的事件描述发生的数据更改。

我们还在第348页的“全序广播”中遇到了状态机复制原理,其中指出:如果每个事件代表对数据库的写入,并且每个副本按相同的顺序处理相同的事件,则副本将所有这些都以相同的最终状态结束。 (处理一个事件被认为是一个确定性的操作。)这只是事件流的另一种情况! 在本节中,我们将首先看看异构数据系统中出现的一个问题,然后探讨如何通过将事件流的想法带入数据库来解决这个问题。

保持系统同步

正如我们在本书中所看到的,没有一个系统能够满足所有的数据存储,查询和处理需求。在实践中,大多数不重要的应用程序需要结合多种不同的技术来满足他们的需求:例如,使用OLTP数据库来为用户请求提供服务,缓存来加速常见请求,处理全文索引搜索查询和用于分析的数据仓库。每个数据都有其自己的数据副本,存储在自己的表示中,并根据自己的目的进行了优化。

由于相同或相关的数据出现在不同的地方,因此需要保持相互同步:如果某个项目在数据库中进行了更新,则还需要在缓存,搜索索引和数据仓库中进行更新。对于数据仓库,这种同步通常由ETL进程执行(参见第91页的“数据仓库”),通常通过获取数据库的完整副本,转换数据库并将其批量加载到数据仓库中 - 换句话说,一个批处理。同样,我们在第419页的“批量工作流的输出”中看到了如何使用批处理过程创建搜索索引,建议系统和其他派生数据系统。

如果周期性的完整数据库转储过于缓慢,有时使用的替代方法是双重写入,其中应用程序代码在数据更改时明确写入每个系统:例如,首先写入数据库,然后更新搜索索引,然后使缓存条目无效(或者甚至同时执行这些写入)。

但是,双重写入有一些严重的问题,其中一个是图11-4所示的竞争条件。在这个例子中,两个客户端同时想要更新一个项目X:客户端1想要将值设置为A,客户端2想要将其设置为B.两个客户端首先将新值写入数据库,然后将其写入到搜索索引。由于运行时间不正确,这些请求是交错的:数据库首先看到从客户端1的写入将值设置为A,然后从客户端2写入将值设置为B,因此数据库中的最终值为B.搜索索引首先看到来自客户端2,然后是客户端1的写入,所以搜索索引中的最终值是A.这两个系统现在永久地不一致,即使没有发生错误。

图11-4 在数据库中,X首先被设置为A,然后被设置为B,而在搜索索引处,写入以相反的顺序到达

除非有一些额外的并发检测机制,例如我们在第184页上的“检测并发写入”中讨论的版本向量,否则您甚至不会注意到发生了并发写入 - 一个值将简单地以无提示方式覆盖另一个值。

双重写入的另一个问题是其中一个写入可能会失败,而另一个成功。这是一个容错问题,而不是一个并发问题,但也会造成两个系统互相矛盾的结果。确保它们都成功或者两者都失败是原子提交问题的一个例子,这个问题的解决是昂贵的(请参阅第354页上的“原子提交和两阶段提交(2PC)”)。

如果只有一个复制的数据库和一个领导者,那么这个领导者决定了写入顺序,所以状态机复制方法可以在数据库的副本中工作。然而,在图11-4中,没有一个领导者:数据库可能有一个领导者,搜索索引可能有一个领导者,但是既不在另一个领导者之后,也可能发生冲突(参见“多领导者复制“)。

如果实际上只有一个领导者(例如数据库),并且如果我们可以使搜索索引成为数据库的追随者,情况会更好。但这在实践中可能吗?

改变数据捕获

大多数数据库的复制日志的问题在于,它们一直被认为是数据库的内部实现细节,而不是公共API。客户端应该通过其数据模型和查询语言来查询数据库,而不是分析复制日志并尝试从中提取数据。

数十年来,许多数据库根本没有记录方式来获取写入到他们的变更日志。由于这个原因,很难将数据库中所做的所有更改复制到不同的存储技术,如搜索索引,缓存或数据仓库。

最近,人们对变更数据捕获(CDC)越来越感兴趣,它是观察写入数据库的所有数据变化并将其提取出来,并将其复制到其他系统中的过程。 CDC特别感兴趣的是,如果改变可以立即用于流,可以立即写入。

例如,您可以捕获数据库中的更改并不断将相同的更改应用于搜索索引。如果更改的日志以相同的顺序应用,则可以预期搜索索引中的数据与数据库中的数据匹配。搜索索引和任何其他派生的数据系统只是变化流的消费者,如图11-5所示。

图11-5 将数据按顺序写入一个数据库,然后按照相同的顺序将这些更改应用到其他系统 实施更改数据捕获 我们可以调用日志消费者导出的数据系统,正如在第三部分的介绍中所讨论的:存储在搜索索引和数据仓库中的数据只是记录系统中数据的另一个视图。更改数据捕获是一种机制,可确保对记录系统所做的所有更改都反映在派生数据系统中,以便派生系统具有数据的准确副本。

从本质上说,改变数据捕获使得一个数据库成为领导者(从中捕获变化的数据库),并将其他人变成追随者。基于日志的消息代理非常适合从源数据库传输更改事件,因为它保留了消息的排序(避免了图11-2的重新排序问题)。

数据库触发器可用于通过注册触发器来实现更改数据捕获(请参阅第152页的“基于触发器的复制”),这些触发器可观察数据表的所有更改,并将相应的条目添加到更改日志表中。但是,他们往往是脆弱的,并有显着的性能开销。解析复制日志可以是一个更强大的方法,但它也带来了挑战,例如处理模式更改。

LinkedIn的Databus [25],Facebook的Wormhole [26]和Yahoo!的Sherpa [27]大规模地使用这个想法。 Bottled Water使用解码预写日志的API来实现PostgreSQL的CDC [28],Maxwell和Debezium通过解析binlog为MySQL做类似的事情[29,30,31],Mongoriver读取MongoDB oplog [32,33] ,而GoldenGate为Oracle提供类似的功能[34,35]。

像消息代理一样,更改数据捕获通常是异步的:记录数据库系统不会等待更改应用到消费者,然后再进行更改。这种设计具有的操作优势是添加缓慢的使用者不会影响记录系统太多,但是它具有所有复制滞后问题的缺点(请参见第161页中的“复制滞后问题”)。

初始快照

如果具有对数据库所做的所有更改的日志,则可以通过重播日志来重新构建数据库的整个状态。但是,在许多情况下,永远保留所有更改将需要太多的磁盘空间,并且重播将花费太长时间,因此日志需要被截断。

例如,构建新的全文索引需要整个数据库的完整副本 - 仅仅应用最近更改的日志是不够的,因为它将丢失最近未更新的项目。因此,如果您没有完整的日志历史记录,则需要从一致的快照开始,如先前在第155页上的“设置新的追随者”中所述。

数据库的快照必须与更改日志中的已知位置或偏移量相对应,以便您知道在快照处理完成后,在哪一点开始应用更改。一些CDC工具集成了这个快照工具,而其他工具则将其作为手动操作。

日志压缩

如果只能保留有限的日志历史记录,则每次需要添加新的派生数据系统时都需要执行快照过程。但是,日志压缩提供了一个很好的选择。

在日志结构化的存储引擎的情况下,我们先讨论了第72页的“Hash索引”中的日志压缩(参见图3-2的示例)。原理很简单:存储引擎使用相同的密钥定期查找日志记录,丢弃任何重复内容,并且只保留每个密钥的最新更新。这个压缩和合并过程在后台运行。

在日志结构存储引擎中,具有特殊空值(逻辑删除)的更新指示删除了一个密钥,并在日志压缩过程中将其删除。但只要密钥不被覆盖或删除,它就永远留在日志中。这种压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不取决于数据库中发生的写入次数。如果相同的密钥经常被覆盖,则以前的值将最终被垃圾收集,并且只保留最新的值。

在基于日志的消息代理和更改数据捕获方面,相同的想法也适用。如果CDC系统设置为每个更改都有一个主键,并且每个键的更新都替换了该键的以前的值,那么仅保留最近写入的特定键就足够了。

现在,无论何时要重建派生数据系统(如搜索索引),都可以从日志压缩主题的偏移量0开始新的使用者,然后依次扫描日志中的所有消息。日志保证包含数据库中每个键的最新值(也可能是一些较旧的值) - 换句话说,您可以使用它来获取数据库内容的完整副本,而无需获取CDC的另一个快照源数据库。

Apache Kafka支持此日志压缩功能。正如我们将在本章后面看到的,它允许消息代理被用于持久存储,而不仅仅是用于临时消息。

API支持更改流

越来越多的数据库开始支持变更流作为一流的接口,而不是典型的改造和逆向工程CDC的努力。例如,RethinkDB允许查询在查询更改结果[36],Firebase [37]和CouchDB [38]基于同样可用于应用程序的更改提要进行数据同步时订阅通知,而Meteor使用MongoDB oplog订阅数据更改并更新用户界面[39]。

VoltDB允许事务以流的形式连续地从数据库中导出数据[40]。数据库将关系数据模型中的输出流表示为一个表,事务可以在其中插入元组,但不能被查询。然后这个流由提交事务已经写入这个特殊表的元组日志组成,它们按照提交的顺序。外部使用者可以异步使用此日志并使用它来更新派生的数据系统。

Kafka Connect [41]致力于将广泛的数据库系统的变更数据捕获工具与Kafka集成。一旦更改事件发生在Kafka中,它就可以用来更新派生的数据系统,比如搜索索引,也可以用于本章稍后讨论的流处理系统。

事件源

我们在这里讨论的想法和事件采购之间有一些相似之处,这是一个在领域驱动设计(DDD)社区中开发的技术。我们将简要讨论事件源,因为它包含了一些有用的和相关的流式系统的想法。

与更改数据捕获类似,事件采购涉及将所有对应用程序状态的更改存储为更改事件的日志。最大的区别是事件源代码在不同的抽象层次上应用了这个想法:

  • 在更改数据捕获中,应用程序以可变方式使用数据库,随意更新和删除记录。从数据库中提取较低级别的更改日志(例如,通过解析复制日志),从而确保从数据库中提取的写入顺序与实际写入的顺序相匹配,从而避免图中的竞争条件11-4。写入数据库的应用程序不需要知道CDC正在发生。
  • 在事件源中,应用程序逻辑是基于写入事件日志的不可变事件而显式构建的。在这种情况下,事件存储是附加的,更新或删除是不鼓励或禁止的。事件旨在反映应用程序级别发生的事情,而不是低级状态更改。

事件源是一种强大的数据建模技术:从应用程序的角度来看,将用户的行为记录为不可变的事件更有意义,而不是记录这些行为对可变数据库的影响。事件采购使得随着时间的推移而逐渐发展应用程序变得更加容易,通过更容易理解事情发生的原因以及防范应用程序错误(请参阅“不可变事件的优点”),帮助进行调试。

例如,存储“学生取消课程注册”事件清楚地表达了单一行为的中性意图,而副作用“从注册表中删除了一个条目,并且一个取消原因被添加到学生反馈表“嵌入了很多有关方式的假设数据稍后将被使用。如果引入新的应用程序功能,例如“将地点提供给等待列表中的下一个人” - 事件采购方法允许将新的副作用轻松地链接到现有事件上。

事件采购类似于编年史数据模型[45],事件日志和事实表之间也有相似之处,您可以在星型模式中找到它(请参阅第93页上的“星星和雪花:分析模式”) 。

专门的数据库如Event Store [46]已经被开发来支持使用事件采购的应用程序,但总的来说,这个方法是独立于任何特定的工具的。传统的数据库或基于日志的消息代理也可以用来构建这种风格的应用程序。

从事件日志中导出当前状态

事件日志本身并不是很有用,因为用户通常期望看到系统的当前状态,而不是修改的历史。例如,在购物网站上,用户期望能够看到他们购物车的当前内容,而不是他们对购物车所做的所有改变的附加列表。

因此,使用事件源的应用程序需要记录事件的日志(表示写入系统的数据),并将其转换为适合向用户显示的应用程序状态(从系统读取数据的方式[47 ])。这种转换可以使用任意的逻辑,但它应该是确定性的,以便您可以再次运行它并从事件日志中派生相同的应用程序状态。

与更改数据捕获一样,重放事件日志可以让您重新构建系统的当前状态。但是,日志压缩需要以不同的方式处理:

  • 用于记录更新的CDC事件通常包含记录的全部新版本,因此主键的当前值完全由该主键的最近事件确定,并且日志压缩可以丢弃以前的事件同样的钥匙。
  • 另一方面,事件采购是将事件建模为更高的级别:事件通常表示用户操作的意图,而不是由于操作而发生的状态更新的机制。在这种情况下,以后的事件通常不会覆盖以前的事件,所以您需要事件的完整历史来重构最终状态。日志压缩不可能以相同的方式进行。

使用事件源的应用程序通常有一些机制来存储从事件日志中导出的当前状态的快照,因此它们不需要重复处理完整的日志。但是,这只是一个性能优化,可以加快读取和恢复崩溃的速度;目的是系统能够永久存储所有原始事件,并在需要时重新处理完整的事件日志。我们在第463页的“不变性的限制”中讨论这个假设。

命令和事件

事件采购哲学是仔细区分事件和命令[48]。当来自用户的请求首先到达时,它最初是一个命令:在这一点上它可能仍然失败,例如因为违反了一些完整性条件。应用程序必须首先验证它是否可以执行该命令。如果验证成功并且命令被接受,则它变成一个持久且不可变的事件。

例如,如果用户试图注册特定用户名,或在飞机上或剧院中预定座位,则应用程序需要检查用户名或座位是否已被占用。 (我们先前在第364页的“容错概念”中讨论过这个例子。)当检查成功时,应用程序可以生成一个事件来指示特定的用户名是由特定的用户ID注册的,座位已经预留给特定的顾客。

在事件发生的时候,这成为事实。即使客户稍后决定更改或取消预订,事实仍然是事实,他们以前曾为某个特定的座位进行预订,而更改或取消是稍后添加的单独事件。

事件流的消费者不允许拒绝事件:当消费者看到事件时,它已经是日志中不可变的部分,并且可能已经被其他消费者看到。因此,任何命令的验证都需要在事件成为事件之前同步发生,例如,通过使用可自动验证命令并发布事件的可序列化事务。

或者,预订座位的用户请求可以分成两个事件:第一个是暂时预约,第二个是确认预约后的单独确认事件(如第350页上的“使用总订单广播实现线性化存储”中所述) 。这个分割允许验证发生在一个异步的过程中。

状态,流和不变性

我们在第10章中看到批量处理从其输入文件的不变性中受益,因此您可以在现有输入文件上运行实验性处理作业,而不用担心损坏它们。这种不变性原则也是使得事件采购和数据变化如此强大的原因。

我们通常将数据库视为存储应用程序的当前状态 - 这种表示法针对读取进行了优化,而且通常对于服务查询来说是最方便的。状态的本质是它的变化,所以数据库支持更新和删除数据以及插入数据。这是如何符合不变性的?

只要你的状态发生了变化,那么这个状态是随着时间的推移而变化的事件的结果。例如,您当前可用的座位列表是您已经处理的预订的结果,当前帐户余额是帐户中的信用卡和借方的结果,而您的Web服务器的响应时间图是发生的所有Web请求的个别响应时间。

无论国家如何变化,总会有一系列事件导致这些变化。即使事情已经解决,事实仍然是事实发生的事实。关键的想法是可变状态和不可变事件的附加日志不相互矛盾:它们是同一枚硬币的两面。所有变化的日志,变化日志,代表了随着时间的推移状态的演变。

如果您有数学上的倾向,那么您可能会说应用程序状态是随着时间的推移整合了一个事件流而得到的,而且当您按照时间区分状态时会得到一个更改流,如图11-6所示[ 49,50,51]。这个比喻有一定的局限性(例如,国家的二阶导数似乎没有意义),但这是考虑数据的一个有用的起点。

图11-6 当前应用程序状态和事件流之间的关系

如果你持久地存储更新日志,那么这只是使状态重现的效果。如果你认为事件的日志是你的记录系统,并且从它派生出任何可变状态,那么就更容易推断通过系统的数据流。正如帕特·赫兰(Pat Helland)所说的[52]:

事务日志记录对数据库所做的所有更改。高速追加是更改日志的唯一方法。从这个角度来看,数据库的内容会保存日志中最新记录值的缓存。事实是日志。数据库是日志子集的缓存。该缓存子集恰好是来自日志的每个记录和索引值的最新值。

日志压缩(如第456页的“日志压缩”中所述)是一种桥接日志和数据库状态之间区别的方法:它只保留每条记录的最新版本,并丢弃被覆盖的版本。

不可变事件的优点

数据库中的不变性是一个古老的想法。例如,会计师在数个世纪以来一直使用不变性财务簿记。当一笔交易发生时,它被记录在一个仅追加分类帐中,这本质上是描述货币,商品或服务已经转手的事件日志。账目,如损益或资产负债表,是从分类账中的交易中加起来得来的[53]。

如果发生错误,会计师不会删除或更改分类帐中的错误交易 - 而是增加另一笔交易,以补偿错误,例如退还不正确的费用。不正确的交易将永远保留在分类帐中,因为审计原因可能很重要。如果从不正确的分类账导出的错误数字已经公布,那么下一个会计期间的数字就包括一个更正。这个过程在会计中是完全正常的[54]。

尽管这种可审计性在金融系统中尤其重要,但对于不受这种严格管制的许多其他系统也是有益的。如“批处理输出的哲学”(第439页)中所述,如果您意外地部署了将错误数据写入数据库的错误代码,那么如果代码能够破坏性地覆盖数据,恢复将更加困难。通过不可变事件的追加日志,诊断发生的事情和从问题中恢复起来要容易得多。

不可变的事件也捕获比当前状态更多的信息。例如,在购物网站上,顾客可以将物品添加到他们的购物车,然后再将其移除。虽然第二个事件从订单履行角度取消了第一个事件,但为了分析目的,客户正在考虑某个特定项目,但是之后决定采取反对措施。也许他们会选择在未来购买,或者他们找到替代品。这个信息被记录在一个事件日志中,但是当它们从购物车中被删除时,这个信息会丢失在删除项目的数据库中[42]。

从同一事件日志中获取多个视图

而且,通过从不变事件日志中分离可变状态,可以从事件的相同日志中派生出几个不同的面向读取的表示。这就像一个流的多个消费者一样工作(图11-5):例如,分析数据库Druid使用这种方法从Kafka直接获取[55],Pista chio是一个分布式的键值存储,使用Kafka作为提交日志[56],Kafka Connect接收器可以将来自Kafka的数据导出到各种不同的数据库和索引[41]。对于许多其他存储和索引系统(如搜索服务器)来说,类似地从分布式日志中获取输入也是有意义的(请参阅第455页上的“保持系统同步”)。

从事件日志到数据库有一个明确的转换步骤,可以更容易地随时间推移您的应用程序:如果您想要引入一个以新的方式呈现现有数据的新功能,您可以使用事件日志来构建一个单独的新功能的读取优化视图,并与现有的一起运行

系统而不必修改它们。并行运行旧系统和新系统通常比在现有系统中执行复杂的模式迁移更容易。一旦旧的系统不再需要,你可以简单地关闭它并回收它的资源[47,57]。

如果您不必担心如何查询和访问数据,那么存储数据通常是非常简单的。模式设计,索引和存储引擎的许多复杂性都是希望支持某些查询和访问模式的结果(参见第3章)。出于这个原因,通过将数据写入的形式与读取形式分开,并允许几个不同的读取视图,可以获得很大的灵活性。这个想法有时被称为命令查询责任分离(CQRS)[42,58,59]。

数据库和模式设计的传统方法是基于数据必须以与查询相同的形式写入的谬误。有关正常化和非规范化的争论(请参阅第31页上的“多对一和多对多关系”),如果可以将数据从写入优化的事件日志转换为读取优化的应用程序状态,则变得基本无关紧要:在读取优化的视图中对数据进行非规范化是完全合理的,因为翻译过程为您提供了一种机制,使其与事件日志保持一致。

在第11页的“描述负载”中,我们讨论了Twitter的家庭时间表,最近一个特定用户正在关注的人(如邮箱)写的最近发布的推文缓存。这是阅读优化状态的另一个例子:家庭时间表高度变形,因为你的推文在所有跟随你的人的时间线上都是重复的。然而,扇出服务保持这种复制状态与新的推文和新的以下关系保持同步,这保持了复制的可管理性。

并发控制

事件采集和更改数据捕获的最大缺点是事件日志的消费者通常是异步的,所以用户可能会写入日志,然后从日志派生的视图中读取并查找他们的写作还没有反映在读取视图。我们在第162页的“阅读您自己的作品”中讨论了这个问题和潜在的解决方案。

一种解决方案是同步执行读取视图的更新,并将事件附加到日志中。这需要一个事务来将写入操作合并到一个原子单元中,所以要么需要将事件日志和读取视图保存在同一个存储系统中,要么需要跨不同系统的分布式事务。或者,您可以使用第350页上的“使用总订单广播实现线性化存储”中讨论的方法。

另一方面,从事件日志导出当前状态也简化了并发控制的某些方面。对多个对象事务的需求(请参阅第228页上的“单对象和多对象操作”)源于单个用户操作,需要在多个不同的位置更改数据。通过事件采购,您可以设计一个事件,以便对用户操作进行独立的描述。用户操作只需要在一个地方进行一次写操作,即将事件附加到日志中,这很容易使原子化。

如果事件日志和应用程序状态以相同的方式分区(例如,为分区3中的客户处理事件只需要更新应用程序状态的分区3),则直接的单线程日志消费者不需要并发控制(write-by)构造,它一次只处理一个事件(另请参阅第252页的“实际的串行执行”)。该日志通过在分区中定义事件的串行顺序来消除并发性的不确定性[24]。如果一个事件触及多个状态分区,那么需要做更多的工作,我们将在第12章讨论。

不变性的限制

许多不使用事件源模型的系统依赖于不可变性:各种数据库在内部使用不可变的数据结构或多版本数据来支持时间点快照(请参见“索引和快照隔离” )。 Git,Mercurial和Fossil等版本控制系统也依靠不可变的数据来保存文件的版本历史记录。

永远保持所有变化的不变的历史在多大程度上是可行的?答案取决于数据集中的流失量。一些工作负载主要是添加数据,很少更新或删除;他们很容易使不变。其他工作负载在较小的数据集上有较高的更新和删除率;在这些情况下,不可改变的历史可能变得过于庞大,碎片化可能成为一个问题,压缩和垃圾收集的表现对于操作的鲁棒性变得至关重要[60,61]。

除了性能方面的原因外,也可能出于管理方面的原因需要删除数据的情况,尽管这些数据都是不可变的。例如,隐私条例可能要求在关闭帐户后删除用户的个人信息,数据保护立法可能要求删除错误的信息,或者可能需要包含敏感信息的意外泄露。

在这种情况下,仅仅在日志中添加另一个事件来指示先前的数据应该被视为删除是不够的 - 您实际上是想重写历史并假装数据从未写在第一位。例如,Datomic调用这个特性excision [62],而Fossil版本控制系统有一个类似的概念叫做shunning [63]。

真正的删除数据是非常困难的[64],因为拷贝可以存在于很多地方:例如,存储引擎,文件系统和SSD通常写入一个新的位置,而不是覆盖到位[52],而备份通常是故意不可改变的防止意外删除或腐败。删除更多的是“使检索数据更难”,而不是“使检索数据不可能”。无论如何,有时您必须尝试,正如我们在“立法和自律”中所看到的第542页。

流处理

到目前为止,本章中我们已经讨论了流的来源(用户活动事件,传感器和写入数据库),我们讨论了流如何传输(通过直接消息传送,通过消息代理和事件日志)。

剩下的就是讨论一下你可以用流做什么 - 也就是说,你可以处理它。一般来说,有三种选择:

  1. 您可以将事件中的数据写入数据库,缓存,搜索索引或类似的存储系统,然后由其他客户端查询。如图11-5所示,这是保持数据库与系统其他部分发生更改同步的好方法 - 特别是当流消费者是写入数据库的唯一客户端时。写入存储系统的流程相当于我们在“批处理工作流程的输出”页面上讨论的内容。
  2. 您可以以某种方式将事件推送给用户,例如通过发送电子邮件警报或推送通知,或通过将事件流式传输到可实时显示的实时仪表板。在这种情况下,人是流的最终消费者。
  3. 您可以处理一个或多个输入流以产生一个或多个输出流。数据流可能会经过由几个这样的处理阶段组成的流水线,然后才会输出(选项1或2)。

在本章的其余部分中,我们将讨论选项3:处理流以产生其他派生流。处理这样的流的代码片段被称为操作员或作业。它与我们在第10章中讨论过的Unix进程和MapReduce作业密切相关,数据流的模式是相似的:一个流处理器以只读的方式使用输入流,并将其输出写入一个不同的位置时尚。

流处理器中的分区和并行化模式也非常类似于第10章中介绍的MapReduce和数据流引擎,因此我们不在这里重复这些主题。基本的映射操作(如转换和过滤记录)也是一样的。

批量作业的一个关键区别是流不会结束。这种差别有很多含义:正如本章开始部分所讨论的,排序对无界数据集没有意义,因此不能使用排序合并联接(请参阅“减少联接和分组”)。容错机制也必须改变:对于已经运行了几分钟的批处理作业,可以简单地从头开始重新启动失败的任务,但是对于已经运行数年的流作业,在开始后重新开始崩溃可能不是一个可行的选择。

流处理的应用

长期以来,流处理一直用于监控目的,如果某个事情发生,组织就希望得到警报。例如:

  • 欺诈检测系统需要确定信用卡的使用模式是否意外地发生了变化,并且如果信用卡可能已被盗用,则将其封锁。
  • 交易系统需要检查金融市场的价格变化,并根据指定的规则进行交易。
  • 制造系统需要监控工厂中机器的状态,如果出现故障,可以快速识别问题。
  • 军事和情报系统需要跟踪潜在的攻击者的行动,并在发生袭击的迹象时发出警报。

这些类型的应用程序需要非常复杂的模式匹配和相关性。然而,流处理的其他用途也随着时间的推移而出现。在本节中,我们将简要比较一下这些应用程序。

复杂的事件处理

复杂事件处理(CEP)是20世纪90年代为分析事件流而开发的一种方法,尤其适用于需要搜索某些事件模式的应用程序[65,66]。与正则表达式允许您在字符串中搜索特定字符模式的方式类似,CEP允许您指定规则以在流中搜索某些事件模式。

CEP系统通常使用高级声明式查询语言(如SQL或图形用户界面)来描述应该检测到的事件模式。这些查询被提交给一个处理引擎,该引擎使用输入流并在内部维护一个执行所需匹配的状态机。当发现匹配时,引擎发出一个复杂的事件(因此名字)与事件模式的细节[67]。

在这些系统中,查询和数据之间的关系与普通数据库相比是颠倒的。通常情况下,数据库会持久存储数据,并将查询视为暂时的:当查询进入时,数据库搜索与查询匹配的数据,然后在查询完成时忘记查询。 CEP引擎反转了这些角色:查询是长期存储的,来自输入流的事件不断流过它们,以搜索匹配事件模式的查询[68]。

CEP的实现包括Esper [69],IBM InfoSphere Streams [70],Apama,TIBCO StreamBase和SQLstream。像Samza这样的分布式流处理器也获得了对流声明式查询的SQL支持[71]。

流分析

使用流处理的另一个领域是对流进行分析。 CEP和流分析之间的边界是模糊的,但作为一般规则,分析往往不太关心找到特定的事件序列,并且更倾向于聚合和统计度量大量的事件——例如:

  • 测量某种类型事件的速率(每个时间间隔发生的频率)
  • 计算一段时间内某个值的滚动平均值
  • 将当前的统计数据与以前的时间间隔进行比较(例如,检测趋势或提醒与上周同期相比过高或过低的指标)

这些统计信息通常是在固定的时间间隔内进行计算的,例如,您可能想知道在过去5分钟内每秒对服务的平均查询次数,以及在此期间的第99百分位响应时间。在几分钟内平均,从一秒钟到下一秒钟平滑无关的波动,同时还能及时了解交通模式的任何变化。您汇总的时间间隔称为窗口,我们将在第468页的“关于时间的推理”中更详细地讨论窗口。

流分析系统有时使用概率算法,例如Bloom filter(我们在第79页的“性能优化”中遇到过),设置成员资格,HyperLogLog [72]基数估计以及各种百分比估计算法(请参阅“Percentiles in Practice “第16页)。概率算法产生近似的结果,但是具有在流处理器中比精确算法需要少得多的存储器的优点。近似算法的使用有时会使人们相信流处理系统总是有损和不精确的,但这是错误的:流处理没有任何内在的近似,而概率算法只是一个优化[73]。 许多开源分布式流处理框架的设计都是以分析为基础的:例如Apache Storm,Spark Streaming,Flink,Concord,Samza和Kafka Streams [74]。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。

保持物化视图

我们在第451页的“数据库和数据流”中看到,可以使用数据库更改流来保持派生数据系统(如缓存,搜索索引和数据仓库)与源数据库保持最新。我们可以将这些示例视为维护实体化视图的具体情况(请参阅“聚合:数据多维数据集和实例化视图”(第101页)):导出某个数据集的替代视图,以便可以高效地查询它,并在底层数据更改[50]。

同样,在事件采购中,应用程序状态通过应用事件日志来维护;这里的应用状态也是一种物化视图。与流分析场景不同,在某个时间窗口内仅考虑事件通常是不够的:构建物化视图可能需要在任意时间段内的所有事件,除了可能由日志压缩丢弃的任何过时事件(请参阅“日志压缩“)。实际上,您需要一个可以一直延伸到一开始的窗口。 原则上,任何流处理器都可以用于物化视图维护,尽管永久维护事件的需要与一些主要在有限持续时间的窗口上运行的面向分析的框架的假设背道而驰。 Samza和Kafka Streams支持这种用法,建立在Kafka对夯实的支持上[75]。

在流上搜索

除了允许搜索由多个事件组成的模式的CEP外,还有时需要基于复杂的标准(例如全文搜索查询)来搜索单个事件。

例如,媒体监测服务可以订阅新闻文章和媒体广播,并搜索任何关于公司,产品或感兴趣的话题的新闻。这是通过预先制定一个搜索查询来完成的,然后不断地将新闻项目流与这个查询进行匹配。在一些网站上也有类似的功能:例如,房地产网站的用户在市场上出现符合其搜索条件的新房产时,可以要求通知。 Elasticsearch [76]的渗滤器功能是实现这种流式搜索的一种选择。

传统的搜索引擎首先索引文件,然后在索引上运行查询。相比之下,搜索一个数据流将会处理它的头部:查询被存储,文档通过查询运行,就像CEP一样。在最简单的情况下,您可以针对每个查询来测试每个文档,但是如果您有大量查询,这可能会变慢。为了优化过程,可以对查询和文档进行索引,从而缩小可能匹配的查询集合[77]。

消息传递和RPC

在第136页的“消息传递数据流”中,我们讨论了消息传递系统作为RPC的替代方案,即作为通信服务的机制,例如在参与者模型中所使用的。虽然这些系统也是基于消息和事件,但我们通常不会将它们视为流处理器:

Actor框架主要是管理通信模块的并发和分布式执行的机制,而流处理主要是数据管理技术。

  • 参与者之间的交流往往是短暂的,而且是一对一的,而事件日志则是持久的,多用户的。
  • 参与者可以以任意方式进行通信(包括循环请求/响应模式),但流处理器通常设置在非循环流水线中,其中每个流是一个特定作业的输出,并且从一组明确定义的输入流派生。

也就是说,RPC类系统和流处理之间有一些交叉区域。例如,Apache Storm有一个称为分布式RPC的功能,它允许将用户查询分散到一系列也处理事件流的节点上;这些查询然后与来自输入流的事件交织,结果可以被汇总并发回给用户[78]。 (另请参阅“多分区数据处理”(第514页)。)

也可以使用actor框架来处理流。但是,很多这样的框架在崩溃的情况下不能保证消息的传递,所以这个过程不是容错的,除非你实现了额外的重试逻辑。

关于时间的推理

流处理器通常需要处理时间,特别是在用于分析目的的时候,频繁使用时间窗口,例如“过去五分钟的平均时间”。“最后五分钟”的含义似乎应该是未知的,大而清晰,但不幸的是这个概念是令人惊讶的棘手。

在批处理过程中,处理任务通过大量的历史事件迅速收缩。如果需要按时间分类,批处理需要查看每个事件中嵌入的时间戳。查看运行批处理的机器的系统时钟没有意义,因为处理运行的时间与事件实际发生的时间无关。

批处理可以在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间表是历史的一年,而不是几分钟的处理。而且,在事件中使用时间戳允许处理确定性的:在相同的输入上再次运行相同的处理过程会得到相同的结果(请参阅“故障容错”在页面429)。

另一方面,许多流处理框架使用处理机器上的本地系统时钟(处理时间)来确定窗口[79]。这种方法具有简单的优点,事件创建和事件处理之间的延迟可以忽略不计。然而,如果存在任何显着的处理滞后,即处理可能比事件实际发生的时间显着晚,则会中断处理。

事件时间与处理时间

有许多原因可能会延迟处理:排队,网络故障(请参阅第267页的“不可靠的网络”),导致消息代理或处理器中出现争用的性能问题,重新启动流消费者或重新处理过去的事件(请参阅第451页的“重播旧消息”),或者在修复代码中的错误之后进行恢复。

而且,消息延迟还可能导致消息的不可预知的排序。例如,假设用户首先发出一个Web请求(由Web服务器A处理),然后发出第二个请求(由服务器B处理)。 A和B发出描述他们处理的请求的事件,但是B的事件在A的事件发生之前到达消息代理。现在,流处理器将首先看到B事件,然后看到A事件,即使它们实际上是以相反的顺序发生的。

如果有一个类比的话,可以考虑一下“星球大战”的电影:第四集于1977年发行,1980年第五集,1983年第六集,之后分别在1999年,2002年和2005年发行第一,二,三集,以及2015年的第七集【80】2。如果你按照他们出来的顺序观看电影,你处理电影的顺序与他们叙述的顺序是不一致的。 (情节编号就像事件时间戳一样,观看电影的日期就是处理时间。)作为人类,我们能够应对这种不连续性,但是流处理算法需要专门编写以适应这种情况时间安排和订购问题。

令人困惑的事件时间和处理时间导致错误的数据。例如,假设您有一个流处理器来测量请求率(计算每秒请求数)。如果您重新部署流处理器,则可能会关闭一分钟,并在事件恢复时处理积压的事件。如果您根据处理时间来衡量速率,那么看起来好像在处理积压时突然出现异常的请求高峰,而事实上请求的实际速率是稳定的(图11-7)。

图11-7 由于处理时间的窗口化,由于处理速率的变化而引入人为因素

知道什么时候你准备好了

从事件时间的角度来定义窗口时,一个棘手的问题是,当你收到特定窗口的所有事件,或者是否还有事件发生时,你永远无法确定。

例如,假设您将事件分组为一分钟的窗口,以便您可以统计每分钟的请求数。你已经计算了一些事件,这些事件的时间戳是在第37分钟的时间落下的,时间已经推移了。现在大部分的事件都在一小时的第38和第39分钟之内。你什么时候宣布你已经完成了第37分钟的窗口,并输出其计数器值?

在一段时间没有看到任何新的事件之后,您可以超时并宣布一个窗口,但仍然可能发生某些事件被缓存在另一台计算机上,由于网络中断而延迟。您需要能够处理窗口已经声明完成后到达的这样的滞留事件。大体上,你有两个选择[1]:

  1. 忽略这些零散的事件,因为它们在正常情况下可能只是一小部分事件。您可以将丢弃事件的数量作为度量标准进行跟踪,并在您开始丢弃大量数据时发出警报。
  2. 发布一个更正,更新的窗口与包含散兵队员的价值。您可能还需要收回以前的输出。

在某些情况下,可以使用特殊的消息来指示“从现在开始,不会有比t更早的时间戳的消息”,消费者可以使用它来触发窗口[81]。但是,如果不同机器上的多个生产者正在生成事件,每个事件都有自己的最小时间戳阈值,则消费者需要分别跟踪每个生产者。在这种情况下添加和删除生产者是比较棘手的。

你用的是什么时间?

当事件可以在系统中的多个点缓冲时,为事件分配时间戳更加困难。例如,考虑将使用率度量的事件报告给服务器的移动应用程序。该应用程序可能会在设备处于脱机状态时使用,在这种情况下,它将在设备上本地缓冲事件,并在下一次可用的互联网连接(可能是几小时甚至几天)时将它们发送到服务器。对于这个流的任何消费者来说,这些事件将显示为极其滞后的落后者。

在这种情况下,根据移动设备的本地时钟,事件的时间戳实际上应该是发生用户交互的时间。但是,用户控制的设备上的时钟通常是不可信的,因为它可能会被意外或故意设置为错误的时间(请参见“时钟同步和精度”(第269页))。服务器收到事件的时间(根据服务器的时钟)更可能是准确的,因为服务器在您的控制之下,但在描述用户交互方面意义不大。

要调整不正确的设备时钟,一种方法是记录三个时间戳[82]:

  • 事件发生的时间,根据设备时钟
  • 根据设备时钟将事件发送到服务器的时间
  • 服务器根据服务器时钟收到事件的时间

通过从第三个时间戳中减去第二个时间戳,可以估算设备时钟和服务器时钟之间的偏移量(假设网络延迟与所需的时间戳精度相比可忽略不计)。然后,您可以将该偏移量应用于事件时间戳,从而估计事件实际发生的真实时间(假设设备时钟偏移在事件发生的时间与发送到服务器的时间之间没有变化)。

这个问题对于流处理来说并不是唯一的,批处理遇到了与时间推理完全相同的问题。在一个流式环境中,我们更加注意到时间的流逝。

窗口的类型

一旦你知道如何确定一个事件的时间戳,下一步就是决定如何定义一段时间的窗口。窗口然后可以用于聚合,例如计数事件,或计算窗口内的值的平均值。有几种窗口是常用的[79,83]:

Tumbling窗口

一个翻滚的窗口有一个固定的长度,每个事件都属于一个窗口。例如,如果您有1分钟的翻滚窗口,则所有时间戳在10:03:00和10:03:59之间的事件会被分组到一个窗口中,10:04:00和10:04:59之间的事件下一个窗口,等等。您可以通过获取每个事件时间戳并将其四舍五入到最接近的分钟来确定它所属的窗口,从而实现1分钟的翻滚窗口。

Hopping窗

跳频窗口也具有固定的长度,但允许窗口重叠以提供一些平滑。例如,1分钟跳跃大小的5分钟窗口将包含10:03:00至10:07:59之间的事件,则下一个窗口将覆盖10:04:00至10:08之间的事件: 59,等等。您可以通过首先计算1分钟滚动窗口,然后聚合在几个相邻的窗口上来实现此跳频窗口。

滑动窗口

滑动窗口包含在彼此的某个间隔内发生的所有事件。例如,一个5分钟的滑动窗口将覆盖10点03分39秒和10点08分12秒的事件,因为它们相距不到5分钟(注意翻滚和跳跃的5分钟窗口不会把这两个事件在同一个窗口中,因为他们使用固定的边界)。滑动窗口可以通过保持按时间排序的事件缓冲区并在从窗口到期时移除旧事件来实现。

会话窗口

与其他窗口类型不同,会话窗口没有固定的持续时间。相反,它是通过将同一用户的所有事件分组在一起,并在时间上紧密地组合在一起来定义的,并且当用户在一段时间内不活动时(例如,如果30分钟内没有事件),窗口结束。会话化是网站分析的常见要求(请参阅第406页的“GROUP BY”)。

流式连接

在第10章中,我们讨论了批处理作业如何通过关键连接数据集,以及这种连接如何构成数据管道的重要组成部分。由于流处理将数据管道概括为对无界数据集进行增量处理,因此对流进行连接的需求也完全相同。

然而,新事件随时可能出现在一个流中,这使得加入流比批处理作业更具挑战性。为了更好地理解情况,我们来区分三种不同类型的连接:流 - 流连接,流表连接和表连接[84]。在下面的章节中,我们将通过例子来说明。 流 - 流连接(窗口连接)

假设您的网站上有搜索功能,并且想要检测搜索到的网址的近期趋势。每次有人输入搜索查询时,都会记录包含查询和返回结果的事件。每当有人点击其中一个搜索结果时,就会记录另一个记录点击的事件。为了计算搜索结果中每个网址的点击率,您需要将搜索操作和点击操作的事件组合在一起,这些事件通过具有相同的会话ID进行连接。广告系统需要类似的分析[85]。

如果用户放弃他们的搜索,点击可能永远不会到来,即使它到了,搜索和点击之间的时间可能是高度可变的:在很多情况下,它可能是几秒钟,但可能长达几天或几周(如果用户运行搜索,忘记关于该浏览器选项卡,然后返回到选项卡,稍后再单击一个结果)。由于可变的网络延迟,点击事件甚至可能在搜索事件之前到达。您可以选择合适的加入窗口,例如,如果间隔至多一小时发生一次搜索,您可以选择加入搜索。

请注意,在click事件中嵌入搜索的细节并不等同于加入事件:这样做只会告诉您有关用户单击搜索结果的情况,而不是用户未点击任何搜索结果的搜索结果。为了衡量搜索质量,您需要准确的点击率,为此您需要搜索事件和点击事件。

为了实现这种类型的连接,流处理器需要维护状态:例如,在最后一小时发生的所有事件,都由会话标识索引。无论何时发生搜索事件或点击事件,都会将其添加到适当的索引,并且流处理器还检查另一个索引,以查看是否已经到达同一会话ID的另一个事件。如果有匹配的事件,则发出说明哪个搜索结果被点击的事件。如果搜索事件过期而没有看到匹配的点击事件,则会发出说明哪些搜索结果未被点击的事件。

流表连接(stream enrichment)

在第404页的“示例:用户活动事件分析”(图10-2)中,我们看到了加入两个数据集的批量作业示例:一组用户活动事件和一个用户配置文件数据库。将用户活动事件视为流,并在流处理器中连续执行相同的连接是很自然的:输入是

包含用户ID的活动事件流,并且输出是活动事件流,其中用户ID已经用关于用户的简档信息来扩充。这个过程有时被称为使用来自数据库的信息来丰富活动事件。

要执行此联接,流程过程需要一次查看一个活动事件,在数据库中查找事件的用户标识,并将该概要信息添加到活动事件中。数据库查询可以通过查询远程数据库来实现;但是,正如在“示例:分析用户活动事件”一节中讨论的,此类远程查询可能会很慢并且有可能导致数据库过载[75]。 另一种方法是将数据库副本加载到流处理器中,以便在本地进行查询而无需网络往返。这种技术与我们在408页的“Map-Side连接”中讨论的散列连接非常相似:如果数据库的本地副本足够小,则本地副本可能是内存中的散列表,或者是本地磁盘上的索引。 与批处理作业的区别在于批处理作业使用数据库的时间点快照作为输入,而流处理器是长时间运行的,并且数据库的内容可能随时间而改变,所以流处理器数据库的本地副本需要保持最新。这个问题可以通过更改数据捕获来解决:流处理器可以订阅用户配置文件数据库的更新日志以及活动事件流。在创建或修改配置文件时,流处理器会更新其本地副本。因此,我们获得两个流之间的连接:活动事件和配置文件更新。 流表连接实际上非常类似于流 - 流连接;最大的区别在于对于表changelog流,连接使用一个可以回溯到“开始时间”(概念上是无限的窗口)的窗口,新版本的记录会覆盖较早的版本。对于流输入,连接可能根本没有维护窗口。

表格表连接(物化视图维护)

考虑我们在第11页的“描述负载”中讨论的Twitter时间线示例。我们说过,当用户想要查看他们的家庭时间线时,对用户所关注的所有人进行迭代是非常昂贵的,推文,并合并它们。 相反,我们需要一个时间线缓存:一种每个用户的“收件箱”,在发送tweets的时候写入这些信息,以便读取时间线是一次查询。实现和维护此缓存需要以下事件处理:

  • 当用户发送新的推文时,它将被添加到每个跟随你的用户的时间线上。
  • 用户删除推文时,将从所有用户的时间表中删除。
  • 当用户u1开始跟随用户u2时,u2最近的tweets被添加到u1的时间线上。
  • 当用户u1取消关注用户u2时,u1的推文将从u1的时间线中移除。

要在流处理器中实现这种缓存维护,需要用于推文(发送和删除)和跟随关系(跟随和取消跟随)的事件流。流过程需要维护一个包含每个用户关注者集合的数据库,以便知道当一个新的tweet到达时需要更新哪些时间轴[86]。 查看这个流过程的另一种方式是维护一个连接两个表(tweet和follow)的查询的物化视图,如下所示:

SELECT follows.follower_id AS timeline_id, 
    array_agg(tweets.* ORDER BY tweets.timestamp DESC)
FROM tweets
JOIN follows ON follows.followee_id = tweets.sender_id 
GROUP BY follows.follower_id

流的连接直接对应于该查询中的表的连接。时间轴实际上是这个查询结果的缓存,每当基础表发生变化时都会更新3

连接的时间依赖性

这就产生了一个问题:如果不同的事件发生在相似的时间周围,他们按照何种顺序进行处理?在流表连接示例中,如果用户更新其配置文件,哪些活动事件与旧配置文件(在配置文件更新之前处理)结合,哪些与新配置文件结合(在配置文件更新之后处理)?换句话说:如果状态随着时间的推移而改变,并且你加入了某个状态,那么你使用什么时间点来加入[45]?

这种时间依赖性可能发生在许多地方。例如,如果您销售东西,则需要对发票进行适当的税率,这取决于国家或州,产品类型和销售日期(因为税率会随时变化)。将销售额加入税率表时,如果您正在重新处理历史数据,您可能希望加入销售时的税率,这可能与当前的税率不同。

如果跨流的事件排序是未确定的,那么这个连接变得不确定[87],这意味着你不能在相同的输入上重新运行相同的工作,并且必然会得到相同的结果:输入流上的事件可能交织在当你再次运行这个工作时,采用不同的方式

在数据仓库中,这个问题被称为缓慢变化的维度(SCD),通常通过对特定版本的联合记录使用唯一的标识符来解决:例如,每当税率改变时,新的标识符,并且发票包括销售时的税率标识符[88,89]。这种变化使连接成为确定性的,但是由于表中所有记录的版本都需要保留,导致日志压缩是不可能的。

容错

在本章的最后一节中,让我们考虑流处理器如何容忍错误。我们在第10章中看到,批处理框架可以很容易地容忍错误:如果MapReduce作业中的任务失败,可以简单地在另一台机器上重新启动,并且丢弃失败任务的输出。这种透明的重试是可能的,因为输入文件是不可变的,每个任务都将其输出写入到HDFS上的单独文件,并且输出仅在任务成功完成时可见。

特别是,批处理容错方法可确保批处理作业的输出与没有出错的情况相同,即使事实上某些任务失败了。看起来好像每个输入记录都被处理了一次 - 没有记录被跳过,而且没有处理两次。尽管重新启动任务意味着实际上可能会多次处理记录,但输出中的可见效果好像只处理过一次。这个原则被称为一次语义学,虽然有效 - 一次将是一个更具描述性的术语[90]。

在流处理过程中也出现了同样的容错问题,但是处理起来不那么直观:等到某个任务完成之后才使其输出可见,因为流是无限的,因此您永远无法完成处理。

小批量和检查点

一个解决方案是将流分解成小块,并像小型批处理一样处理每个块。这种方法被称为microbatching,它被用于Spark Streaming [91]。批处理大小通常约为1秒,这是性能折中的结果:较小的批次会导致更大的调度和协调开销,而较大的批次意味着流处理器的结果变得可见之前的较长延迟。

微缩也隐含地提供了与批量大小相等的翻滚窗口(通过处理时间而不是事件时间戳)。任何需要更大窗口的作业都需要明确地将状态从一个微阵列转移到下一个微阵列。

Apache Flink中使用的一种变体方法是定期生成状态滚动检查点并将其写入持久存储器[92,93]。如果流操作符崩溃,它可以从最近的检查点重新启动,并放弃在最后一个检查点和崩溃之间生成的任何输出。检查点由消息流中的条形码触发,类似于微型图形之间的边界,但不强制特定的窗口大小。

在流处理框架的范围内,微观网格化和检查点方法提供了与批处理一样的一次语义。但是,只要输出离开流处理器(例如,通过写入数据库,向外部消息代理发送消息或发送电子邮件),框架将不再能够放弃失败批处理的输出。在这种情况下,重新启动失败的任务会导致外部副作用发生两次,单独使用微配量或检查点不足以防止此问题。

原子提交重访

为了在出现故障时给出精确的一次处理,我们需要确保处理事件的所有输出和副作用只有当处理成功时才会生效。这些影响包括发送给下游运营商或外部消息传递系统(包括电子邮件或推送通知)的任何消息,任何数据库写入,对运营商状态的任何更改以及对输入消息的任何确认(包括将消费者偏移量向前移动基于日志的消息代理)。

这些事情要么都是原子地发生,要么都不发生,但是不应该彼此不同步。如果这种方法听起来很熟悉,那是因为我们在分布式事务和两阶段提交的情况下,在第360页的“准确一次的消息处理”中讨论了它。

在第9章中,我们讨论了分布式交易(如XA)的传统实现中的问题。然而,在更受限制的环境中,可以有效地实现这样的原子提交设施。 Google云数据流[81,92]和VoltDB [94]中使用了这种方法,并计划在Apache Kafka [95,96]中添加类似的功能。与XA不同,这些实现不会尝试跨异构技术提供事务,而是通过在流处理框架中管理状态更改和消息传递来保持内部事务。事务协议的开销可以通过在单个事务中处理几个输入消息来分摊。

幂等

我们的目标是放弃任何失败的任务的部分输出,以便他们可以安全地重试,而不会两次生效。分布式事务是实现这一目标的一种方式,但另一种方式是依赖幂等性[97]。

幂等操作是可以多次执行的操作,并且与只执行一次操作具有相同的效果。例如,将键值存储中的某个键设置为某个固定值是幂等的(再次写入该值会覆盖具有相同值的值),而递增计数器不是幂等的(再次执行递增意味着该值递增两次)。

即使一个操作不是天生的幂等,它往往可以与一些额外的元数据幂等。例如,在使用来自卡夫卡的消息时,每条消息都有一个持续的,单调递增的偏移量。将值写入外部数据库时,可以将触发上次写入的消息的偏移量与值包含在一起。因此,您可以判断是否已应用更新,并避免再次执行相同的更新。

风暴三叉戟的状态处理基于类似的想法[78]。依赖幂等性意味着一些假设:重启一个失败的任务必须以相同的顺序重播相同的消息(一个基于日志的消息代理这样做),处理必须是确定性的,其他节点不能同时更新相同的值[ 98,99]。

当从一个处理节点故障转移到另一个处理节点时,可能需要进行防护(请参阅第291页上的“领导和锁定”),以防止被认为是死的节点的干扰

失败后重建状态

任何需要状态的流进程(例如,任何窗口聚合(例如计数器,平均值和直方图)以及用于连接的任何表和索引)都必须确保在失败之后可以恢复此状态。

一种选择是将状态保持在远程数据存储中并复制它,尽管如每个单独消息的远程数据库查询速度可能会很慢,正如在“流表加入(第473页)”中所述。另一种方法是保持流处理器的本地状态,并定期复制。然后,当流处理器从故障中恢复时,新任务可以读取复制状态并恢复处理而不丢失数据。

例如,Flink定期捕获操作员状态的快照,并将它们写入HDFS等持久存储器中[92,93]。 Samza和Kafka Streams通过将状态更改发送到具有日志压缩功能的专用Kafka主题来复制状态更改,这类似于更改数据捕获[84,100]。 VoltDB通过冗余处理多个节点上的每个输入消息来复制状态(请参阅第252页的“实际的串行执行”)。

在某些情况下,甚至可能不需要复制状态,因为它可以从输入流重建。例如,如果状态由一个相当短的窗口中的聚合组成,则它可能足够快,以便重放与该窗口相对应的输入事件。如果状态是通过更改数据捕获维护的数据库的本地副本,那么也可以从日志压缩的更改流重建数据库(请参阅“日志压缩”一节第456页)。

但是,所有这些权衡取决于底层基础架构的性能特征:在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。在所有情况下都没有普遍理想的权衡,随着存储和网络技术的发展,本地和远程状态的优点也可能会发生变化。

本章小结

在本章中,我们讨论了事件流,他们所服务的目的以及如何处理它们。在某些方面,流处理非常类似于我们在第10章讨论的批处理,而是在无限的(永无止境的)流而不是固定大小的输入上持续进行。从这个角度来看,消息代理和事件日志可以作为文件系统的流媒体。

我们花了一些时间比较两种消息代理:

AMQP/JMS风格的消息代理 ​ 经纪人将个人消息分配给消费者,消费者在成功处理个人消息时确认消息。消息被确认后从代理中删除。这种方法适合作为RPC的异步形式(另请参阅第136页的“消息传递数据流”),例如在任务队列中,消息处理的确切顺序并不重要,没有在处理之后,需要重新读取旧消息。

基于日志的消息代理

​ 代理将分区中的所有消息分配给相同的使用者节点,并始终以相同的顺序传递消息。并行性是通过划分来实现的,消费者通过检查他们所处理的最后一个消息的偏移来跟踪他们的进度。代理将消息保留在磁盘上,因此如有必要,可以跳回并重新读取旧消息。

基于日志的方法与数据库中的复制日志(参见第5章)和日志结构存储引擎(请参阅第3章)具有相似之处。我们看到,这种方法特别适用于消耗输入流并生成派生状态或派生输出流的流处理系统。

就流的来源而言,我们讨论了几种可能性:用户活动事件,提供定期读数的传感器和数据馈送(例如金融市场数据)自然地表示为流。我们看到,将数据写入数据流也是有用的:我们可以捕获更改日志 - 即对数据库所做的所有更改的历史记录 - 隐式地通过更改数据捕获或通过事件明确地捕获采购。日志压缩允许流保留数据库内容的完整副本。

将数据库表示为流为系统集成提供了强大的机会。您可以通过使用更改日志并将其应用于派生系统,使派生的数据系统(如搜索索引,缓存和分析系统)保持最新。您甚至可以从头开始,从开始一直到现在消耗更改的日志,从而为现有数据构建新的视图。

将状态保持为流并重放消息的设施也是在各种流处理框架中实现流连接和容错的技术的基础。我们讨论了流处理的几个目的,包括搜索事件模式(复杂事件处理),计算加窗聚合(流分析)以及保持派生数据系统处于最新状态(材料化视图)。

然后我们讨论了在流处理器中推理时间的困难,包括处理时间和事件时间戳之间的区别,以及在你认为窗口完成之后处理到达的离散事件的问题。

我们区分了可能出现在流程中的三种类型的连接:

流式流连接

​ 两个输入流由活动事件组成,并且连接操作符搜索在某个时间窗口内发生的相关事件。例如,它可以匹配相同用户在30分钟内采取的两个动作。如果您想要在一个流中查找相关事件,则两个连接输入实际上可以是相同的流(自连接)。

流表连接

​ 一个输入流由活动事件组成,另一个输入流是数据库更改日志。更新日志保持数据库的本地副本最新。对于每个活动事件,连接运算符将查询数据库并输出一个丰富的活动事件。

表格连接

​ 两个输入流都是数据库更新日志。在这种情况下,一方的每一个变化都与另一方的最新状态相结合。结果是对两个表之间的连接的物化视图进行了一系列更改。

最后,我们讨论了在流处理器中实现容错和一次语义的技术。与批处理一样,我们需要放弃任何失败任务的部分输出。然而,由于流程长时间运行并持续产生输出,所以不能简单地丢弃所有的输出。相反,可以使用更细粒度的恢复机制,基于微博,检查点,事务或幂等写入。

参考文献

  1. Tyler Akidau, Robert Bradshaw, Craig Chambers, et al.: “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing,” Proceedings of the VLDB Endowment, volume 8, number 12, pages 1792–1803, August 2015. doi:10.14778/2824032.2824076

  2. Harold Abelson, Gerald Jay Sussman, and Julie Sussman: Structure and Interpretation of Computer Programs, 2nd edition. MIT Press, 1996. ISBN: 978-0-262-51087-5, available online at mitpress.mit.edu

  3. Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “The Many Faces of Publish/Subscribe,” ACM Computing Surveys, volume 35, number 2, pages 114–131, June 2003. doi:10.1145/857076.857078

  4. Joseph M. Hellerstein and Michael Stonebraker: Readings in Database Systems, 4th edition. MIT Press, 2005. ISBN: 978-0-262-69314-1, available online at redbook.cs.berkeley.edu

  5. Don Carney, Uğur Çetintemel, Mitch Cherniack, et al.: “Monitoring Streams – A New Class of Data Management Applications,” at 28th International Conference on Very Large Data Bases (VLDB), August 2002.

  6. Matthew Sackman: “Pushing Back,” lshift.net, May 5, 2016.

  7. Vicent Martí: “Brubeck, a statsd-Compatible Metrics Aggregator,” githubengineering.com, June 15, 2015.

  8. Seth Lowenberger: “MoldUDP64 Protocol Specification V 1.00,” nasdaqtrader.com, July 2009.

  9. Pieter Hintjens: ZeroMQ – The Guide. O'Reilly Media, 2013. ISBN: 978-1-449-33404-8

  10. Ian Malpass: “Measure Anything, Measure Everything,” codeascraft.com, February 15, 2011.

  11. Dieter Plaetinck: “25 Graphite, Grafana and statsd Gotchas,” blog.raintank.io, March 3, 2016.

  12. Jeff Lindsay: “Web Hooks to Revolutionize the Web,” progrium.com, May 3, 2007.

  13. Jim N. Gray: “Queues Are Databases,” Microsoft Research Technical Report MSR-TR-95-56, December 1995.

  14. Mark Hapner, Rich Burridge, Rahul Sharma, et al.: “JSR-343 Java Message Service (JMS) 2.0 Specification,” jms-spec.java.net, March 2013.

  15. Sanjay Aiyagari, Matthew Arrott, Mark Atwell, et al.: “AMQP: Advanced Message Queuing Protocol Specification,” Version 0-9-1, November 2008.

  16. Google Cloud Pub/Sub: A Google-Scale Messaging Service,” cloud.google.com, 2016.

  17. Apache Kafka 0.9 Documentation,” kafka.apache.org, November 2015.

  18. Jay Kreps, Neha Narkhede, and Jun Rao: “Kafka: A Distributed Messaging System for Log Processing,” at 6th International Workshop on Networking Meets Databases (NetDB), June 2011.

  19. Amazon Kinesis Streams Developer Guide,” docs.aws.amazon.com, April 2016.

  20. Leigh Stewart and Sijie Guo: “Building DistributedLog: Twitter’s High-Performance Replicated Log Service,” blog.twitter.com, September 16, 2015.

  21. DistributedLog Documentation,” Twitter, Inc., distributedlog.io, May 2016.

  22. Jay Kreps: “Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines),” engineering.linkedin.com, April 27, 2014.

  23. Kartik Paramasivam: “How We’re Improving and Advancing Kafka at LinkedIn,” engineering.linkedin.com, September 2, 2015.

  24. Jay Kreps: “The Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction,” engineering.linkedin.com, December 16, 2013.

  25. Shirshanka Das, Chavdar Botev, Kapil Surlaker, et al.: “All Aboard the Databus!,” at 3rd ACM Symposium on Cloud Computing (SoCC), October 2012.

  26. Yogeshwer Sharma, Philippe Ajoux, Petchean Ang, et al.: “Wormhole: Reliable Pub-Sub to Support Geo-Replicated Internet Services,” at 12th USENIX Symposium on Networked Systems Design and Implementation (NSDI), May 2015.

  27. P. P. S. Narayan: “Sherpa Update,” developer.yahoo.com, June 8, .

  28. Martin Kleppmann: “Bottled Water: Real-Time Integration of PostgreSQL and Kafka,” martin.kleppmann.com, April 23, 2015.

  29. Ben Osheroff: “Introducing Maxwell, a mysql-to-kafka Binlog Processor,” developer.zendesk.com, August 20, 2015.

  30. Randall Hauch: “Debezium 0.2.1 Released,” debezium.io, June 10, 2016.

  31. Prem Santosh Udaya Shankar: “Streaming MySQL Tables in Real-Time to Kafka,” engineeringblog.yelp.com, August 1, 2016.

  32. Mongoriver,” Stripe, Inc., github.com, September 2014.

  33. Dan Harvey: “Change Data Capture with Mongo + Kafka,” at Hadoop Users Group UK, August 2015.

  34. Oracle GoldenGate 12c: Real-Time Access to Real-Time Information,” Oracle White Paper, March 2015.

  35. Oracle GoldenGate Fundamentals: How Oracle GoldenGate Works,” Oracle Corporation, youtube.com, November 2012.

  36. Slava Akhmechet: “Advancing the Realtime Web,” rethinkdb.com, January 27, 2015.

  37. Firebase Realtime Database Documentation,” Google, Inc., firebase.google.com, May 2016.

  38. Apache CouchDB 1.6 Documentation,” docs.couchdb.org, 2014.

  39. Matt DeBergalis: “Meteor 0.7.0: Scalable Database Queries Using MongoDB Oplog Instead of Poll-and-Diff,” info.meteor.com, December 17, 2013.

  40. Chapter 15. Importing and Exporting Live Data,” VoltDB 6.4 User Manual, docs.voltdb.com, June 2016.

  41. Neha Narkhede: “Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines,” confluent.io, February 18, 2016.

  42. Greg Young: “CQRS and Event Sourcing,” at Code on the Beach, August 2014.

  43. Martin Fowler: “Event Sourcing,” martinfowler.com, December 12, 2005.

  44. Vaughn Vernon: Implementing Domain-Driven Design. Addison-Wesley Professional, 2013. ISBN: 978-0-321-83457-7

  45. H. V. Jagadish, Inderpal Singh Mumick, and Abraham Silberschatz: “View Maintenance Issues for the Chronicle Data Model,” at 14th ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems (PODS), May 1995. doi:10.1145/212433.220201

  46. Event Store 3.5.0 Documentation,” Event Store LLP, docs.geteventstore.com, February 2016.

  47. Martin Kleppmann: Making Sense of Stream Processing. Report, O'Reilly Media, May 2016.

  48. Sander Mak: “Event-Sourced Architectures with Akka,” at JavaOne, September 2014.

  49. Julian Hyde: personal communication, June 2016.

  50. Ashish Gupta and Inderpal Singh Mumick: Materialized Views: Techniques, Implementations, and Applications. MIT Press, 1999. ISBN: 978-0-262-57122-7

  51. Timothy Griffin and Leonid Libkin: “Incremental Maintenance of Views with Duplicates,” at ACM International Conference on Management of Data (SIGMOD), May 1995. doi:10.1145/223784.223849

  52. Pat Helland: “Immutability Changes Everything,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

  53. Martin Kleppmann: “Accounting for Computer Scientists,” martin.kleppmann.com, March 7, 2011.

  54. Pat Helland: “Accountants Don't Use Erasers,” blogs.msdn.com, June 14, 2007.

  55. Fangjin Yang: “Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets,” metamarkets.com, June 3, 2015.

  56. Gavin Li, Jianqiu Lv, and Hang Qi: “Pistachio: Co-Locate the Data and Compute for Fastest Cloud Compute,” yahoohadoop.tumblr.com, April 13, 2015.

  57. Kartik Paramasivam: “Stream Processing Hard Problems – Part 1: Killing Lambda,” engineering.linkedin.com, June 27, 2016.

  58. Martin Fowler: “CQRS,” martinfowler.com, July 14, 2011.

  59. Greg Young: “CQRS Documents,” cqrs.files.wordpress.com, November 2010.

  60. Baron Schwartz: “Immutability, MVCC, and Garbage Collection,” xaprb.com, December 28, 2013.

  61. Daniel Eloff, Slava Akhmechet, Jay Kreps, et al.: "Re: Turning the Database Inside-out with Apache Samza," Hacker News discussion, news.ycombinator.com, March 4, 2015.

  62. Datomic Development Resources: Excision,” Cognitect, Inc., docs.datomic.com.

  63. Fossil Documentation: Deleting Content from Fossil,” fossil-scm.org, 2016.

  64. Jay Kreps: “The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard,twitter.com, March 30, 2015.

  65. David C. Luckham: “What’s the Difference Between ESP and CEP?,” complexevents.com, August 1, 2006.

  66. Srinath Perera: “How Is Stream Processing and Complex Event Processing (CEP) Different?,” quora.com, December 3, 2015.

  67. Arvind Arasu, Shivnath Babu, and Jennifer Widom: “The CQL Continuous Query Language: Semantic Foundations and Query Execution,” The VLDB Journal, volume 15, number 2, pages 121–142, June 2006. doi:10.1007/s00778-004-0147-z

  68. Julian Hyde: “Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch,” ACM Queue, volume 7, number 11, December 2009. doi:10.1145/1661785.1667562

  69. Esper Reference, Version 5.4.0,” EsperTech, Inc., espertech.com, April 2016.

  70. Zubair Nabi, Eric Bouillet, Andrew Bainbridge, and Chris Thomas: “Of Streams and Storms,” IBM technical report, developer.ibm.com, April 2014.

  71. Milinda Pathirage, Julian Hyde, Yi Pan, and Beth Plale: “SamzaSQL: Scalable Fast Data Management with Streaming SQL,” at IEEE International Workshop on High-Performance Big Data Computing (HPBDC), May 2016. doi:10.1109/IPDPSW.2016.141

  72. Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier: “HyperLo⁠g​Log: The Analysis of a Near-Optimal Cardinality Estimation Algorithm,” at Conference on Analysis of Algorithms (AofA), June 2007.

  73. Jay Kreps: “Questioning the Lambda Architecture,” oreilly.com, July 2, 2014.

  74. Ian Hellström: “An Overview of Apache Streaming Technologies,” databaseline.wordpress.com, March 12, 2016.

  75. Jay Kreps: “Why Local State Is a Fundamental Primitive in Stream Processing,” oreilly.com, July 31, 2014.

  76. Shay Banon: “Percolator,” elastic.co, February 8, 2011.

  77. Alan Woodward and Martin Kleppmann: “Real-Time Full-Text Search with Luwak and Samza,” martin.kleppmann.com, April 13, 2015.

  78. Apache Storm 1.0.1 Documentation,” storm.apache.org, May 2016.

  79. Tyler Akidau: “The World Beyond Batch: Streaming 102,” oreilly.com, January 20, 2016.

  80. Stephan Ewen: “Streaming Analytics with Apache Flink,” at Kafka Summit, April 2016.

  81. Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, et al.: “MillWheel: Fault-Tolerant Stream Processing at Internet Scale,” at 39th International Conference on Very Large Data Bases (VLDB), August 2013.

  82. Alex Dean: “Improving Snowplow's Understanding of Time,” snowplowanalytics.com, September 15, 2015.

  83. Windowing (Azure Stream Analytics),” Microsoft Azure Reference, msdn.microsoft.com, April 2016.

  84. State Management,” Apache Samza 0.10 Documentation, samza.apache.org, December 2015.

  85. Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, et al.: “Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams,” at ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272

  86. Martin Kleppmann: “Samza Newsfeed Demo,” github.com, September 2014.

  87. Ben Kirwin: “Doing the Impossible: Exactly-Once Messaging Patterns in Kafka,” ben.kirw.in, November 28, 2014.

  88. Pat Helland: “Data on the Outside Versus Data on the Inside,” at 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005.

  89. Ralph Kimball and Margy Ross: The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd edition. John Wiley & Sons, 2013. ISBN: 978-1-118-53080-1

  90. Viktor Klang: “I'm coining the phrase 'effectively-once' for message processing with at-least-once + idempotent operations,” twitter.com, October 20, 2016.

  91. Matei Zaharia, Tathagata Das, Haoyuan Li, et al.: “Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters,” at 4th USENIX Conference in Hot Topics in Cloud Computing (HotCloud), June 2012.

  92. Kostas Tzoumas, Stephan Ewen, and Robert Metzger: “High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink,” data-artisans.com, August 5, 2015.

  93. Paris Carbone, Gyula Fóra, Stephan Ewen, et al.: “Lightweight Asynchronous Snapshots for Distributed Dataflows,” arXiv:1506.08603 [cs.DC], June 29, 2015.

  94. Ryan Betts and John Hugg: Fast Data: Smart and at Scale. Report, O'Reilly Media, October 2015.

  95. Flavio Junqueira: “Making Sense of Exactly-Once Semantics,” at Strata+Hadoop World London, June 2016.

  96. Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang: “KIP-98 – Exactly Once Delivery and Transactional Messaging,” cwiki.apache.org, November 2016.

  97. Pat Helland: “Idempotence Is Not a Medical Condition,” Communications of the ACM, volume 55, number 5, page 56, May 2012. doi:10.1145/2160718.2160734

  98. Jay Kreps: “Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind,” email to samza-dev mailing list, September 9, 2014.

  99. E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson: “A Survey of Rollback-Recovery Protocols in Message-Passing Systems,” ACM Computing Surveys, volume 34, number 3, pages 375–408, September 2002. doi:10.1145/568522.568525

  100. Adam Warski: “Kafka Streams – How Does It Fit the Stream Processing Landscape?,” softwaremill.com, June 1, 2016.


上一章 目录 下一章
第十章:批处理 设计数据密集型应用 第十二章:数据系统的未来

Footnotes

  1. 有可能创建一个负载均衡方案,在这个方案中,两个消费者通过读取全部消息来共享处理分区的工作,但是其中一个只考虑具有偶数偏移量的消息,而另一个消费者处理奇数编号的偏移量。或者,您可以将消息处理扩展到线程池,但这种方法会使消费者偏移管理变得复杂。一般来说,分区的单线程处理是可取的,并行分区可以通过使用更多的分区来增加。

  2. 感谢Flink社区的Kostas Kloudas提出这个比喻。

  3. 如果你把一个流视为一个表的衍生物,如图11-6所示,并且把一个连接看作是两个表u·v的乘积,那么会发生一些有趣的事情:物化连接的变化流遵循产品规则 u·v)'= u'v + uv'。 换句话说,任何tweets的变化都与当前的追随者联系在一起,任何追随者的变化都与当前的tweets [49,50]相结合。