阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Twitter Storm: Transactional Topolgoy简介

248次阅读
没有评论

共计 15414 个字符,预计需要花费 39 分钟才能阅读完成。

概述

Storm 通过保证每个 tuple 至少被处理一次来提供可靠的数据处理。关于这一点最常被问到的问题就是“既然 tuple 可能会被重写发射(replay), 那么我们怎么在 storm 上面做统计个数之类的事情呢?storm 有可能会重复计数吧?”

Storm 0.7.0 引入了 Transactional Topology, 它可以保证每个 tuple”被且仅被处理一次”, 这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。

跟 Distributed RPC 类似,transactional topology 其实不能算是 storm 的一个特性,它其实是用 storm 的底层原语 spout, bolt, topology, stream 等等抽象出来的一个特性。

这篇文章解释了事务性 topology 是怎样的一种抽象,怎样使用它的 api,同时也讨论了有关它实现的一些细节。

概念

让我们一步步地建立 transactional topology 的抽象。我们先提出一种最简单的抽象方式,然后一步步的完善改进,最后介绍 storm 代码里面所使用的抽象方式。

第一个设计:最简单的抽象方法

事务性 topology 背后的核心概念是要在处理数据的提供一个强顺序性。这种强顺序性最简单的表现、同时也是我们第一个设计就是:我们每次只处理一个 tuple,除非这个 tuple 处理成功,否则我们不去处理下一个 tuple。

每一个 tuple 都跟一个 transaction id 相关联。如果这个 tuple 处理失败了,然后需要重写发射,那么它会被重新发射 — 并且附着同样的 transaction id。这里说的 trasaction id 其实就是一个数字,来一个 tuple,它就递增一个。所以第一个 tuple 的 transaction id 是 1,第二个 tuple 的 transaction id 是 2,等等等等。

tuple 的强顺序性使得我们即使在 tuple 重发的时候也能够实现“一次而且只有一次”的语义。让我们看个例子:

比如你想统一个 stream 里面 tuple 的总数。那么为了保证统计数字的准确性,你在数据库里面不但要保存 tuple 的个数,还要保存这个数字所对应的最新的 transaction id。当你的代码要到数据库里面去更新这个数字的时候,你要判断只有当新的 transaction id 跟数据库里面保存的 transaction id 不一样的时候才去更新。考虑两种情况:

  • 数据库里面的 transaction id 跟当前的 transaction id 不一样:由于我们 transaction 的强顺序性,我们知道当前的 tuple 肯定没有统计在数据库里面。所以我们可以安全地递增这个数字,并且更新这个 transaction id.
  • 数据库里面的 transaction id 一样:那么我们知道当前 tuple 已经统计在数据库里面了,那么可以忽略这个更新。这个 tuple 肯定之前在更新了数据库之后,反馈给 storm 的时候失败了(ack 超时之类的)。

这个逻辑以及事务的强顺序性保证数据库里面的个数 (count) 即使在 tuple 被重发的时候也是准确的。这个主意(保存 count + transaction-id)是 Kafka 的开发者在这个设计文档里面提出来的。

更进一步来说,这个 topology 可以在一个事务里面更新很多不同的状态,并且可以到达”一次而且只有一次的逻辑”。如果有任何失败,那么已经成功的更新你再去更新它会忽略,失败的更新你去再次更新它则会接受。比如,如果你在处理一个 url 流,你可以更新每个 url 的转发次数,同时更新每个 domain 下 url 的转发次数。

这个简单设计有一个很大的问题,那就是你需要等待一个 tuple 完全处理成功之后才能去处理下一个 tuple。这个性能是非常差的。这个需要大量的数据库调用(只要每个 tuple 一个数据库调用), 而且这个设计也没有利用到 storm 的并行计算能力,所以它的可扩展能力是非常差的。

第二个设计

与每次只处理一个 tuple 的简单方案相比,一个更好的方案是每个 transaction 里面处理一批 tuple。所以如果你在做一个计数应用,那么你每次更新到总数里面的是这一整个 batch 的 tuple 数量。如果这个 batch 失败了,那么你重新 replay 这整个 batch。相应地,我们不是给每个 tuple 一个 transaction id 而是给整个 batch 一个 transaction id,batch 与 batch 之间的处理是强顺序性的,而 batch 内部是可以并行的。下面这个是设计图:

Twitter Storm: Transactional Topolgoy 简介

所以如果你每个 batch 处理 1000 个 tuple 的话,那么你的应用将会少 1000 倍的数据库调用。同时它利用了 storm 的并行计算能力(每个 batch 内部可以并行)

虽然这个设计比第一个设计好多了,它仍然不是一个完美的方案。topology 里面的 worker 会花费大量的时间等待计算的其它部分完成。比如看下面的这个计算。

Twitter Storm: Transactional Topolgoy 简介

在 bolt 1 完成它的处理之后,它需要等待剩下的 bolt 去处理当前 batch,直到发射下一个 batch。

第三个设计(storm 采用的设计)

一个我们需要意识到的比较重要的问题是,为了实现 transactional 的特性,在处理一批 tuples 的时候,不是所有的工作都需要强顺序性的。比如,当做一个全局计数应用的时候,整个计算可以分为两个部分。

  • 计算这个 batch 的局部数量。
  • 把这个 batch 的局部数量更新到数据库里面去。

其中第二步在多个 batch 之前需要保证强的顺序性,但是第一步并不许要,所以我们可以把第一步并行化。所以当第一个 batch 在更新它的个数进入数据库的时候,第 2 到 10 个 batch 可以开始计算它们的局部数量了。

Storm 通过把一个 batch 的计算分成两个阶段来实现上面所说的原理:

  • processing 阶段:这个阶段很多 batch 可以并行计算。
  • commit 阶段:这个阶段各个 batch 之间需要有强顺序性的保证。所以第二个 batch 必须要在第一个 batch 成功提交之后才能提交。

这两个阶段合起来称为一个 transaction。许多 batch 可以在 processing 阶段的任何时刻并行计算,但是只有一个 batch 可以处在 commit 阶段。如果一个 batch 在 processing 或者 commit 阶段有任何错误,那么整个 transaction 需要被 replay。

设计细节

当使用 Transactional Topologies 的时候,storm 为你做下面这些事情:

1) 管理状态: Storm 把所有实现 Transactional Topologies 所必须的状态保存在 zookeeper 里面。这包括当前 transaction id 以及定义每个 batch 的一些元数据。

2) 协调事务: Storm 帮你管理所有事情,以帮你决定在任何一个时间点是该 proccessing 还是该 committing。

3) 错误检测: Storm 利用 acking 框架来高效地检测什么时候一个 batch 被成功处理了,被成功提交了,或者失败了。Storm 然后会相应地 replay 对应的 batch。你不需要自己手动做任何 acking 或者 anchoring — storm 帮你搞定所有事情。

4) 内置的批处理 API: Storm 在普通 bolt 之上包装了一层 API 来提供对 tuple 的批处理支持。Storm 管理所有的协调工作,包括决定什么时候一个 bolt 接收到一个特定 transaction 的所有 tuple。Storm 同时也会自动清理每个 transaction 所产生的中间数据。

5) 最后,需要注意的一点是 Transactional Topologies 需要一个可以完全重发 (replay) 一个特定 batch 的消息的队列系统(Message Queue)。Kestrel 之类的技术做不到这一点。而 Apache 的 Kafka 对于这个需求来说是正合适的。storm-contrib 里面的 storm-kafka 实现了这个。

一个基本的例子

你可以通过使用 TransactionalTopologyBuilder 来创建 transactional topology. 下面就是一个 transactional topology 的定义,它的作用是计算输入流里面的 tuple 的个数。这段代码来自 storm-starter 里面的 TransactionalGlobalCount。

1
2
3
4
5
6
7
8
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(
DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
"global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
.shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
.globalGrouping("partial-count");

TransactionalTopologyBuilder接受如下的参数

  • 这个 transaction topology 的 id
  • spout 在整个 topology 里面的 id。
  • 一个 transactional spout。
  • 一个可选的这个 transactional spout 的并行度。

topology 的 id 是用来在 zookeeper 里面保存这个 topology 的当前进度的,所以如果你重启这个 topology,它可以接着前面的进度继续执行。

一个 transaction topology 里面有一个唯一的 TransactionalSpout, 这个 spout 是通过TransactionalTopologyBuilder 的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据 (DATA)。第二个参数制定数据的 fields,第三个参数指定每个 batch 的最大 tuple 数量。关于如何自定义TransactionalSpout 我们会在后面介绍。

推荐阅读:

Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm

安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm

Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm

Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm

现在说说 bolts。这个 topology 并行地计算 tuple 的总数量。第一个bolt:BatchBolt,随机地把输入 tuple 分给各个 task,然后各个 task 各自统计局部数量。第二个bolt:UpdateBlobalCount, 用全局 grouping 来从汇总这个 batch 的总的数量。然后再把总的数量更新到数据库里面去。

下面是 BatchCount 的定义:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
 
int _count = 0;
 
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
 
@Override
public void execute(Tuple tuple) {
_count++;
}
 
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "count"));
}
}

storm 会为每个 batch 创建这个一个 BatchCount 对象。而这些 BatchCount 是运行在 BatchBoltExecutor 里面的。而 BatchBoltExecutor 负责创建以及清理这个对象的实例。

这个对象的 prepare 方法接收如下参数:

  • 包含 storm config 信息的 map。
  • TopologyContext
  • OutputCollector
  • 这个 batch 的 id。而在 Transactional Topologies 里面,这个 id 则是一个 TransactionAttempt 对象。

这个 batch bolt 的抽象在 DRPC 里面也可以用,只是 id 的类型不一样而已。BatchBolt 其实真的接收一个 id 类型的参数 — 它是一个 java 模板类,所以如果你只是想在 transactioinal topology 里面使用这个 BatchBolt,你可以这样定义:

1
2
3
public abstract class BaseTransactionalBolt
extends BaseBatchBolt<TransactionAttempt> {
}

在 transaction topology 里面发射的所有的 tuple 都必须以 TransactionAttempt 作为第一个 field,然后 storm 可以根据这个 field 来判断哪些 tuple 属于一个 batch。所以你在发射 tuple 的时候需要满足这个条件。

TransactionAttempt包含两个值:一个 transaction id,一个 attempt id。transaction id 的作用就是我们上面介绍的对于每个 batch 是唯一的,而且不管这个 batchreplay 多少次都是一样的。attempt id 是对于每个 batch 唯一的一个 id,但是对于统一个 batch,它 replay 之后的 attempt id 跟 replay 之前就不一样了,我们可以把 attempt id 理解成 replay-times, storm 利用这个 id 来区别一个 batch 发射的 tuple 的不同版本。

transaction id 对于每个 batch 加一,所以第一个 batch 的 transaction id 是”1″, 第二个 batch 是”2″, 以此类推。

execute 方法会为 batch 里面的每个 tuple 执行一次,你应该把这个 batch 里面的状态保持在一个本地变量里面。对于这个例子来说,它在 execute 方法里面递增 tuple 的个数。

最后,当这个 bolt 接收到某个 batch 的所有的 tuple 之后,finishBatch 方法会被调用。这个例子里面的 BatchCount 类会在这个时候发射它的局部数量到它的输出流里面去。

下面是 UpdateGlobalCount 类的定义。

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static class UpdateGlobalCount
extends BaseTransactionalBolt
implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
 
int _sum = 0;
 
@Override
public void prepare(Map conf,
TopologyContext context,
BatchOutputCollector collector,
TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
 
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
 
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null ||
!val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
 
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "sum"));
}
}

UpdateGlobalCount是 Transactional Topologies 相关的类,所以它继承自 BaseTransactionalBolt。在 execute 方法里面,UpdateGlobalCount 累积这个 batch 的计数,比较有趣的是 finishBatch 方法。

首先,注意这个 bolt 实现了 ICommitter 接口。这告诉 storm 要在这个事务的 commit 阶段调用 finishBatch 方法。所以对于 finishBatch 的调用会保证强顺序性(顺序就是 transaction id 的升序), 而相对来说 execute 方法在任何时候都可以执行,processing 或者 commit 阶段都可以。另外一种把 bolt 标识为 commiter 的方法是调用 TransactionalTopologyBuildersetCommiterBolt来添加 Bolt(而不是 setBolt)。

UpdateGlobalCount里面 finishBatch 方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的 transaction id 与当前这个 batch 的 transaction id 进行比较。如果他们一样,那么忽略这个 batch。否则把这个 batch 的结果加到总结果里面去,并且更新数据库。

关于 transactional topology 的更深入的例子可以卡看 storm-starter 里面的 TransactionalWords 类,这个类里面会在一个事务里面更新多个数据库。

Transactional Topology API

这一节介绍 Transaction topology API

Bolts

在一个 transactional topology 里面最多有三种类型的 bolt:

    • BasicBolt: 这个 bolt 不跟 batch 的 tuple 打交道,它只基于单个 tuple 的输入来发射新的 tuple。
    • BatchBolt: 这个 bolt 处理 batch 在一起的 tuples。对于每一个 tuple 调用 execute 方法。而在整个 batch 处理完成的时候调用 finishBatch 方法
    • 被标记成 Committer 的 BatchBolt: 和普通的 BatchBolt 的唯一的区别是 finishBatch 这个方法被调用的时机。作为 committer 的 BatchBolt 的 finishBatch 方法在 commit 阶段调用。一个 batch 的 commit 阶段由 storm 保证只在前一个 batch 成功提交之后才会执行。并且它会重试直到 topology 里面的所有 bolt 在 commit 完成提交。有两个方法可以让一个普通 BatchBolt 变成 committer: 1) 实现 ICommitter 接口 2) 通过 TransactionalTopologyBuilder 的 setCommitterBolt 方法把 BatchBolt 添加到 topology 里面去。

Processing phase vs. commit phase in bolts

为了搞清除 processing 阶段与 commit 阶段的区别,让我们看个例子:

Twitter Storm: Transactional Topolgoy 简介

在这个 topology 里面只有用红线标出来的是 committers。

在 processing 阶段,bolt A 会处理从 spout 发射出来的整个 batch。并且发射 tuple 给 bolt B 和 bolt C。Bolt B 是一个 committer,所以它会处理所有的 tuple,但是不会调用 finishBatch 方法。Bolt C 同样也不会调用 finishBatch 方法,它的原因是:它不知道它有没有从 Bolt B 接收到所有的 tuple。(因为 Bolt B 还在等着事务提交)最后 Bolt D 会接收到 Bolt C 在调用 execute 方法的时候发射的所有的 tuple。

当 batch 提交的时候,Bolt B 上的 finishBatch 被调用。Bolt C 现在可以判断它接收到了所有的 tuple,所以可以调用 finishBatch 了。最后 Bolt D 接收到了它的所有的 tuple 所以就调用 finishBatch 了。

要注意的是,虽然 Bolt D 是一个 committer, 它在接收到整个 batch 的 tuple 之后不需要等待第二个 commit 信号。因为它是在 commit 阶段接收到的整个 batch,它会调用 finishBatch 来完成整个事务。

Acking

注意,你不需要显式地去做任何的 acking 或者 anchoring。storm 在背后都做掉了。(storm 对 transactional topolgies 里面的 acking 机制进行了高度的优化)

Failing a transaction

在使用普通 bolt 的时候,你可以通过调用 OutputCollector 的 fail 方法来 fail 这个 tuple 所在的 tuple 树。由于 Transactional Topologies 把 acking 框架从用户的视野里面隐藏掉了,它提供一个不同的机制来 fail 一个 batch(从而使得这个 batch 被 replay)。只要抛出一个 FailedException 就可以了。跟普通的异常不一样,这个异常只会导致当前的 batch 被 replay, 而不会使整个进程 crash 掉。

Transactional spout

TransactionalSpout 接口跟普通的 Spout 接口完全不一样。一个 TransactionalSpout 的实现一个 batch 一个 batch 的 tuple,而且必须保证同一个 batch 的 transaction id 始终一样。

在 transactional topology 中运行的时候,transactional spout 看起来是这样的一个结构:

Twitter Storm: Transactional Topolgoy 简介

在图的左边的 coordinator 是一个普通的 storm 的 spout — 它一直为事务的 batch 发射 tuple。Emitter 则像一个普通的 storm bolt,它负责为每个 batch 实际发射 tuple。emitter 以 all grouping 的方式订阅 coordinator 的”batch emit”流。

由于 TransactionalSpout 发射的 tuple 可能需要会被 replay,因此需要具有幂等性(否则多次 replay 同一个 tuple 会使得最后的结果不对),为了实现幂等性,需要保存 Transactional Spout 的少量的状态,这个状态是保存在 ZooKeeper 里面的。

关于如何实现一个 TransactionalSpout 的细节可以参见 Javadoc。

Partitioned Transactional Spout

一种常见的 TransactionalSpout 是那种从多个 queue broker 夺取数据然后再发射的 tuple。比如 TransactionalKafkaSpout 是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以 replay 的幂等性的工作都自动化掉了。更多可以参考 Javadoc

配置

Transactional Topologies 有两个重要的配置:

      • Zookeeper: 默认情况下,transactional topology 会把状态信息保存在主 zookeeper 里面(协调集群的那个)。你可以通过这两个配置来指定其它的 zookeeper:”transactional.zookeeper.servers”和“transactional.zookeeper.port“。
      • 同时活跃的 batch 数量:你必须设置同时处理的 batch 数量。你可以通过”topology.max.spout.pending”来指定,如果你不指定,默认是 1。

实现

Transactional Topologies 的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用 storm 的原语来进行抽象是非常简单的。

    • transactional topology 里面的 spout 是一个子 topology, 它由一个 spout 和一个 bolt 组成。
      • spout 是协调者,它只包含一个 task。
      • bolt 是发射者
      • bolt 以 all grouping 的方式订阅协调者的输出。
      • 元数据的序列化用的是 kryo。
    • 协调者使用 acking 框架来决定什么时候一个 batch 被成功执行完成,然后去决定一个 batch 什么时候被成功提交。
    • 状态信息被以 RotatingTransactionalState 的形式保存在 zookeeper 里面了。
    • commiting bolts 以 all grouping 的方式订阅协调者的 commit 流。
    • CoordinatedBolt 被用来检测一个 bolt 是否收到了一个特定 batch 的所有 tuple。
      • 这一点上面跟 DRPC 里面是一样的。
      • 对于 commiting bolt 来说,他会一直等待,知道从 coordinator 的 commit 流里面接收到一个 tuple 之后,它才会调用 finishBatch 方法。
      • 所以在没有从 coordinator 的 commit 流接收到一个 tuple 之前,committing bolt 不可能调用 finishBolt 方法。

概述

Storm 通过保证每个 tuple 至少被处理一次来提供可靠的数据处理。关于这一点最常被问到的问题就是“既然 tuple 可能会被重写发射(replay), 那么我们怎么在 storm 上面做统计个数之类的事情呢?storm 有可能会重复计数吧?”

Storm 0.7.0 引入了 Transactional Topology, 它可以保证每个 tuple”被且仅被处理一次”, 这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。

跟 Distributed RPC 类似,transactional topology 其实不能算是 storm 的一个特性,它其实是用 storm 的底层原语 spout, bolt, topology, stream 等等抽象出来的一个特性。

这篇文章解释了事务性 topology 是怎样的一种抽象,怎样使用它的 api,同时也讨论了有关它实现的一些细节。

概念

让我们一步步地建立 transactional topology 的抽象。我们先提出一种最简单的抽象方式,然后一步步的完善改进,最后介绍 storm 代码里面所使用的抽象方式。

第一个设计:最简单的抽象方法

事务性 topology 背后的核心概念是要在处理数据的提供一个强顺序性。这种强顺序性最简单的表现、同时也是我们第一个设计就是:我们每次只处理一个 tuple,除非这个 tuple 处理成功,否则我们不去处理下一个 tuple。

每一个 tuple 都跟一个 transaction id 相关联。如果这个 tuple 处理失败了,然后需要重写发射,那么它会被重新发射 — 并且附着同样的 transaction id。这里说的 trasaction id 其实就是一个数字,来一个 tuple,它就递增一个。所以第一个 tuple 的 transaction id 是 1,第二个 tuple 的 transaction id 是 2,等等等等。

tuple 的强顺序性使得我们即使在 tuple 重发的时候也能够实现“一次而且只有一次”的语义。让我们看个例子:

比如你想统一个 stream 里面 tuple 的总数。那么为了保证统计数字的准确性,你在数据库里面不但要保存 tuple 的个数,还要保存这个数字所对应的最新的 transaction id。当你的代码要到数据库里面去更新这个数字的时候,你要判断只有当新的 transaction id 跟数据库里面保存的 transaction id 不一样的时候才去更新。考虑两种情况:

  • 数据库里面的 transaction id 跟当前的 transaction id 不一样:由于我们 transaction 的强顺序性,我们知道当前的 tuple 肯定没有统计在数据库里面。所以我们可以安全地递增这个数字,并且更新这个 transaction id.
  • 数据库里面的 transaction id 一样:那么我们知道当前 tuple 已经统计在数据库里面了,那么可以忽略这个更新。这个 tuple 肯定之前在更新了数据库之后,反馈给 storm 的时候失败了(ack 超时之类的)。

这个逻辑以及事务的强顺序性保证数据库里面的个数 (count) 即使在 tuple 被重发的时候也是准确的。这个主意(保存 count + transaction-id)是 Kafka 的开发者在这个设计文档里面提出来的。

更进一步来说,这个 topology 可以在一个事务里面更新很多不同的状态,并且可以到达”一次而且只有一次的逻辑”。如果有任何失败,那么已经成功的更新你再去更新它会忽略,失败的更新你去再次更新它则会接受。比如,如果你在处理一个 url 流,你可以更新每个 url 的转发次数,同时更新每个 domain 下 url 的转发次数。

这个简单设计有一个很大的问题,那就是你需要等待一个 tuple 完全处理成功之后才能去处理下一个 tuple。这个性能是非常差的。这个需要大量的数据库调用(只要每个 tuple 一个数据库调用), 而且这个设计也没有利用到 storm 的并行计算能力,所以它的可扩展能力是非常差的。

第二个设计

与每次只处理一个 tuple 的简单方案相比,一个更好的方案是每个 transaction 里面处理一批 tuple。所以如果你在做一个计数应用,那么你每次更新到总数里面的是这一整个 batch 的 tuple 数量。如果这个 batch 失败了,那么你重新 replay 这整个 batch。相应地,我们不是给每个 tuple 一个 transaction id 而是给整个 batch 一个 transaction id,batch 与 batch 之间的处理是强顺序性的,而 batch 内部是可以并行的。下面这个是设计图:

Twitter Storm: Transactional Topolgoy 简介

所以如果你每个 batch 处理 1000 个 tuple 的话,那么你的应用将会少 1000 倍的数据库调用。同时它利用了 storm 的并行计算能力(每个 batch 内部可以并行)

虽然这个设计比第一个设计好多了,它仍然不是一个完美的方案。topology 里面的 worker 会花费大量的时间等待计算的其它部分完成。比如看下面的这个计算。

Twitter Storm: Transactional Topolgoy 简介

在 bolt 1 完成它的处理之后,它需要等待剩下的 bolt 去处理当前 batch,直到发射下一个 batch。

第三个设计(storm 采用的设计)

一个我们需要意识到的比较重要的问题是,为了实现 transactional 的特性,在处理一批 tuples 的时候,不是所有的工作都需要强顺序性的。比如,当做一个全局计数应用的时候,整个计算可以分为两个部分。

  • 计算这个 batch 的局部数量。
  • 把这个 batch 的局部数量更新到数据库里面去。

其中第二步在多个 batch 之前需要保证强的顺序性,但是第一步并不许要,所以我们可以把第一步并行化。所以当第一个 batch 在更新它的个数进入数据库的时候,第 2 到 10 个 batch 可以开始计算它们的局部数量了。

Storm 通过把一个 batch 的计算分成两个阶段来实现上面所说的原理:

  • processing 阶段:这个阶段很多 batch 可以并行计算。
  • commit 阶段:这个阶段各个 batch 之间需要有强顺序性的保证。所以第二个 batch 必须要在第一个 batch 成功提交之后才能提交。

这两个阶段合起来称为一个 transaction。许多 batch 可以在 processing 阶段的任何时刻并行计算,但是只有一个 batch 可以处在 commit 阶段。如果一个 batch 在 processing 或者 commit 阶段有任何错误,那么整个 transaction 需要被 replay。

设计细节

当使用 Transactional Topologies 的时候,storm 为你做下面这些事情:

1) 管理状态: Storm 把所有实现 Transactional Topologies 所必须的状态保存在 zookeeper 里面。这包括当前 transaction id 以及定义每个 batch 的一些元数据。

2) 协调事务: Storm 帮你管理所有事情,以帮你决定在任何一个时间点是该 proccessing 还是该 committing。

3) 错误检测: Storm 利用 acking 框架来高效地检测什么时候一个 batch 被成功处理了,被成功提交了,或者失败了。Storm 然后会相应地 replay 对应的 batch。你不需要自己手动做任何 acking 或者 anchoring — storm 帮你搞定所有事情。

4) 内置的批处理 API: Storm 在普通 bolt 之上包装了一层 API 来提供对 tuple 的批处理支持。Storm 管理所有的协调工作,包括决定什么时候一个 bolt 接收到一个特定 transaction 的所有 tuple。Storm 同时也会自动清理每个 transaction 所产生的中间数据。

5) 最后,需要注意的一点是 Transactional Topologies 需要一个可以完全重发 (replay) 一个特定 batch 的消息的队列系统(Message Queue)。Kestrel 之类的技术做不到这一点。而 Apache 的 Kafka 对于这个需求来说是正合适的。storm-contrib 里面的 storm-kafka 实现了这个。

一个基本的例子

你可以通过使用 TransactionalTopologyBuilder 来创建 transactional topology. 下面就是一个 transactional topology 的定义,它的作用是计算输入流里面的 tuple 的个数。这段代码来自 storm-starter 里面的 TransactionalGlobalCount。

1
2
3
4
5
6
7
8
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(
DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
"global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
.shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
.globalGrouping("partial-count");

TransactionalTopologyBuilder接受如下的参数

  • 这个 transaction topology 的 id
  • spout 在整个 topology 里面的 id。
  • 一个 transactional spout。
  • 一个可选的这个 transactional spout 的并行度。

topology 的 id 是用来在 zookeeper 里面保存这个 topology 的当前进度的,所以如果你重启这个 topology,它可以接着前面的进度继续执行。

一个 transaction topology 里面有一个唯一的 TransactionalSpout, 这个 spout 是通过TransactionalTopologyBuilder 的构造函数来制定的。在这个例子里面,MemoryTransactionalSpout被用来从一个内存变量里面读取数据 (DATA)。第二个参数制定数据的 fields,第三个参数指定每个 batch 的最大 tuple 数量。关于如何自定义TransactionalSpout 我们会在后面介绍。

推荐阅读:

Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm

安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm

Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm

Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计15414字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中