共计 2914 个字符,预计需要花费 8 分钟才能阅读完成。
这篇文章里面我们来看一下 Storm 里面的 tuple 到底是如何从一个 tuple 是怎么从一个 bolt 到另一个 bolt 上去的。
首先 Bolt 在发射一个 tuple 的时候是调用 OutputCollector 的 emit 或者 emitDirect 方法,
而这两个方法最终调用的是 clojure 代码里面的 mk-transfer-fn 方法:
1 2 3 4 5 6 | ;worker.clj ( defn mk-transfer- fn [ transfer-queue ]
( fn [ task ^Tuple tuple ]
(.put ^LinkedBlockingQueue
transfer-queue [ task tuple ] )
)) |
这个方法其实只是往一个 LinkedBlockingQueue 里面放入一条新记录 (task-id, tuple)
然后这个 queue 里面的内容会被下面这段代码处理
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | ; worker.clj ; 这里面的这个 socket 到底是什么东西? (async- loop
( fn [ ^ArrayList drainer
^KryoTupleSerializer serializer ]
; 从 transfer-queue 里面取出一个任务来
; 这个任务其实就是(task, tuple)
( let [ felem (. take transfer-queue) ]
(.add drainer felem)
(.drainTo transfer-queue drainer))
(read-locked endpoint-socket-lock
; 获取从 node+port 到 socket 的映射
( let [ node+port->socket @node+port->socket
; 获取从 task-id 到 node+port 的映射
task->node+port @task->node+port ]
( doseq [ [ task ^Tuple tuple ] drainer ]
; 获取 task 对应的 socket
( let [ socket
(node+port->socket
(task->node+port task))
; 序列化这个 tuple
ser-tuple (.serialize serializer tuple) ]
; 发送这个 tuple
(msg/ send socket task ser-tuple)
))
)) ) |
从上面代码可见,tuple 最终是被序列化之后由 msg/send 方法通过 socket 发送给指定的 task 的。注意上面代码里面的 async-loop
表示会创建一个单独的线程来执行这些代码。可以 storm 会起一个独立线程来专门发送待发送的消息的。
我们来看下这个 socket 到底是个怎么样的东西。这个 socket 是在 worker.clj 里面被初始化的,看下面的代码:
01 02 03 04 05 06 07 08 09 10 11 12 13 | ; socket(worker.clj) (swap! node+port->socket
merge
(into {}
(dofor
[ [ node port :as endpoint ] new -connections ]
[ endpoint
(msg/connect
mq-context
(( :node- >host assignment) node)
port)
]
))) |
从上面代码可以看出 socket 其实是 msg/connect 创建出来的。那 msg/connect 到底在做什么呢?这个方法是定义在 protocol.clj 里面的:
1 2 3 4 5 6 | (defprotocol Context
(bind [ context virtual-port ] )
(connect [ context host port ] )
( send -local-task-empty [ context virtual-port ] )
(term [ context ] )
) |
这里定义的只是一个接口而已,具体的实现是在 zmq.clj 里面。zmq 是 ZeroMQ 的缩写, 可见 storm 的 supervisor 之间就是利用 zeromq 来传递 tuple 的。
zmq.clj 里面的 ZMQCOntext 实现了 Context 接口:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | (deftype ZMQContext [ context linger-ms ipc? ]
; 实现 Context 接口
Context
; 从给定的 virtual-port 拉消息
(bind [ this virtual-port ]
(-> context
(mq/socket mq/pull)
(mqvp/virtual-bind virtual-port)
(ZMQConnection.)
))
; 给给定的 host,port 推送消息(push)
(connect [ this host port ]
( let [ url ( if ipc?
( str "ipc://" port "ipc" )
( str "tcp://" host ":" port)) ]
(-> context
(mq/socket mq/push)
(mq/ set -linger linger-ms)
(mq/connect url)
(ZMQConnection.))))
; 给本地的 virtual-port 发送一条空消息
( send -local-task-empty [ this virtual-port ]
( let [ pusher
(-> context
(mq/socket mq/push)
(mqvp/virtual-connect virtual-port)) ]
(mq/ send pusher (mq/barr))
(.close pusher)))
(term [ this ]
(.term context))
; 实现 ZMQContextQuery 接口
ZMQContextQuery
(zmq-context [ this ]
context)) |
总结一些 Twitter Storm 对于 tuple 的处理 / 创建过程:
- Bolt 创建一个 tuple。
- Worker 把 tuple, 以及这个 tuple 要发送的地址 (task-id) 组成一个对象 (task-id, tuple) 放进待发送队列(LinkedBlockingQueue).
- 一个单独的线程(async-loop 所创建的线程)会取出发送队列里面的每个 tuple 来处理
- Worker 创建从当前 task 到目的 task 的 zeromq 连接。
- 序列化这个 tuple 并且通过这个 zeromq 的连接来发送这个 tuple。
推荐阅读:
Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm
安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm
Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm
Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm