共计 6320 个字符,预计需要花费 16 分钟才能阅读完成。
导读 | Alibaba JStorm 是一个强大的企业级流式计算引擎,是 Apache Storm 的 4 倍性能,可以自由切换行模式或 mini-batch 模式,JStorm 不仅提供一个流式计算引擎,还提供实时计算的完整解决方案,涉及到更多的组件,如 jstorm-on-yarn, jstorm-on-docker, SQL Engine, Exactly-Once Framework 等等。 |
JStorm 是一个类似 Hadoop MapReduce 的系统,用户按照指定的接口实现一个任务,然后将这个任务递交给 JStorm 系统,JStorm 将这个任务跑起来,并且按 7 * 24 小时运行起来,一旦中间一个 Worker 发生意外故障,调度器立即分配一个新的 Worker 替换这个失效的 Worker。
因此,从应用的角度,JStorm 应用是一种遵守某种编程规范的分布式应用。从系统角度,JStorm 是一套类似 MapReduce 的调度系统。从数据的角度,JStorm 是一套基于流水线的消息处理机制。
实时计算现在是大数据领域中最火爆的一个方向,因为人们对数据的要求越来越高,实时性要求也越来越快,传统的 Hadoop MapReduce,逐渐满足不了需求,因此在这个领域需求不断。
JStorm | Hadoop | |
---|---|---|
角色 | Nimbus | JobTracker |
Supervisor | TaskTracker | |
Worker | Child | |
应用名称 | Topology | Job |
编程接口 | Spout/Bolt | Mapper/Reducer |
在 Storm 和 JStorm 出现以前,市面上出现很多实时计算引擎,但自 Storm 和 JStorm 出现后,基本上可以说一统江湖:究其优点:
- 开发非常迅速:接口简单,容易上手,只要遵守 Topology、Spout 和 Bolt 的编程规范即可开发出一个扩展性极好的应用,底层 RPC、Worker 之间冗余,数据分流之类的动作完全不用考虑
- 扩展性极好:当一级处理单元速度,直接配置一下并发数,即可线性扩展性能
- 健壮强:当 Worker 失效或机器出现故障时,自动分配新的 Worker 替换失效 Worker
- 数据准确性:可以采用 Ack 机制,保证数据不丢失。如果对精度有更多一步要求,采用事务机制,保证数据准确。
- 实时性高:JStorm 的设计偏向单行记录,因此,在时延较同类产品更低
JStorm 处理数据的方式是基于消息的流水线处理,因此特别 适合无状态计算,也就是计算单元的依赖的数据全部在接受的消息中可以找到,并且最好一个数据流不依赖另外一个数据流。
因此,常常用于:
- 日志分析,从日志中分析出特定的数据,并将分析的结果存入外部存储器如数据库。目前,主流日志分析技术就使用 JStorm 或 Storm
- 管道系统,将一个数据从一个系统传输到另外一个系统,比如将数据库同步到 Hadoop
- 消息转化器,将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件
- 统计分析器,从日志或消息中,提炼出某个字段,然后做 count 或 sum 计算,最后将统计值存入外部存储器。中间处理过程可能更复杂。
- 实时推荐系统,将推荐算法运行在 jstorm 中,达到秒级的推荐效果
首先,JStorm 有点类似于 Hadoop 的 MR(Map-Reduce),但是区别在于,hadoop 的 MR,提交到 hadoop 的 MR job,执行完就结束了,进程就退出了,而一个 JStorm 任务(JStorm 中称为 topology),是 7 *24 小时永远在运行的,除非用户主动 kill。
接下来是一张比较经典的 Storm 的大致的结构图(跟 JStorm 一样):
图中的水龙头(好吧,有点俗)就被称作 spout,闪电被称作 bolt。
在 JStorm 的 topology 中,有两种组件:spout 和 bolt。
# spout
spout 代表输入的数据源,这个数据源可以是任意的,比如说 kafaka,DB,HBase,甚至是 HDFS 等,JStorm 从这个数据源中不断地读取数据,然后发送到下游的 bolt 中进行处理。
# bolt
bolt 代表处理逻辑,bolt 收到消息之后,对消息做处理(即执行用户的业务逻辑),处理完以后,既可以将处理后的消息继续发送到下游的 bolt,这样会形成一个处理流水线(pipeline,不过更精确的应该是个有向图);也可以直接结束。
通常一个流水线的最后一个 bolt,会做一些数据的存储工作,比如将实时计算出来的数据写入 DB、HBase 等,以供前台业务进行查询和展现。
JStorm 框架对 spout 组件定义了一个接口:nextTuple,顾名思义,就是获取下一条消息。执行时,可以理解成 JStorm 框架会不停地调这个接口,以从数据源拉取数据并往 bolt 发送数据。
同时,bolt 组件定义了一个接口:execute,这个接口就是用户用来处理业务逻辑的地方。
每一个 topology,既可以有多个 spout,代表同时从多个数据源接收消息,也可以多个 bolt,来执行不同的业务逻辑。
接下来就是 topology 的调度和执行原理,对一个 topology,JStorm 最终会调度成一个或多个 worker,每个 worker 即为一个真正的操作系统执行进程,分布到一个集群的一台或者多台机器上并行执行。
而每个 worker 中,又可以有多个 task,分别代表一个执行线程。每个 task 就是上面提到的组件 (component) 的实现,要么是 spout 要么是 bolt。
用户在提交一个 topology 的时候,会指定以下的一些执行参数:
# 总 worker 数
即总的进程数。举例来说,我提交一个 topology,指定 worker 数为 3,那么最后可能会有 3 个进程在执行。之所以是可能,是因为根据配置,JStorm 有可能会添加内部的组件,如_acker 或者__topology_master(这两个组件都是特殊的 bolt),这样会导致最终执行的进程数大于用户指定的进程数。我们默认是如果用户设置的 worker 数小于 10 个,那么__topology_master 只是作为一个 task 存在,不独占 worker;如果用户设置的 worker 数量大于等于 10 个,那么__topology_master 作为一个 task 将独占一个 worker
# 每个 component 的并行度
上面提到每个 topology 都可以包含多个 spout 和 bolt,而每个 spout 和 bolt 都可以单独指定一个并行度 (parallelism),代表同时有多少个线程(task) 来执行这个 spout 或 bolt。
JStorm 中,每一个执行线程都有一个 task id,它从 1 开始递增,每一个 component 中的 task id 是连续的。
还是上面这个 topology,它包含一个 spout 和一个 bolt,spout 的并行度为 5,bolt 并行度为 10。那么我们最终会有 15 个线程来执行:5 个 spout 执行线程,10 个 bolt 执行线程。
这时 spout 的 task id 可能是 1~5,bolt 的 task id 可能是 6~15,之所以是可能,是因为 JStorm 在调度的时候,并不保证 task id 一定是从 spout 开始,然后到 bolt 的。但是同一个 component 中的 task id 一定是连续的。
# 每个 component 之间的关系
即用户需要去指定一个特定的 spout 发出的数据应该由哪些 bolt 来处理,或者说一个中间的 bolt,它发出的数据应该被下游哪些 bolt 处理。
还是以上面的 topology 为例,它们会分布在 3 个进程中。JStorm 使用了一种均匀的调度算法,因此在执行的时候,你会看到,每个进程分别都各有 5 个线程在执行。当然,由于 spout 是 5 个线程,不能均匀地分配到 3 个进程中,会出现一个进程只有 1 个 spout 线程的情况;同样地,也会出现一个进程中有 4 个 bolt 线程的情况。
在一个 topology 的运行过程中,如果一个进程(worker)挂掉了,JStorm 检测到之后,会不断尝试重启这个进程,这就是 7 *24 小时不间断执行的概念。
上面提到,spout 的消息会发送给特定的 bolt,bolt 也可以发送给其他的 bolt,那这之间是如何通信的呢?
首先,从 spout 发送消息的时候,JStorm 会计算出消息要发送的目标 task id 列表,然后看目标 task id 是在本进程中,还是其他进程中,如果是本进程中,那么就可以直接走进程内部通信(如直接将这个消息放入本进程中目标 task 的执行队列中);如果是跨进程,那么 JStorm 会使用 netty 来将消息发送到目标 task 中。
JStorm 是 7 *24 小时运行的,外部系统如果需要查询某个特定时间点的处理结果,并不会直接请求 JStorm(当然,DRPC 可以支持这种需求,但是性能并不是太好)。一般来说,在 JStorm 的 spout 或 bolt 中,都会有一个定时往外部存储写计算结果的逻辑,这样数据可以按照业务需求被实时或者近实时地存储起来,然后直接查询外部存储中的计算结果即可。
以上内容直接粘贴 JStorm 官网,切勿吐槽
# OS: CentOS 6.8 mininal
# host.ip: 10.1.1.78 aniutv-1
# host.ip: 10.1.1.80 aniutv-2
# host.ip: 10.1.1.97 aniutv-5
jstorm : /opt/jstorm(源码安装);
zookeeper : /opt/zookeeper(源码安装);
java : /usr/java/jdk1.7.0_79 (rpm 包安装)
zookeeper 集群参考(http://blog.csdn.net/wh211212/article/details/56014983)
zeromq 下载地址:http://zeromq.org/area:download/
下载 zeromq-4.2.1.tar.gz 到 /usr/local/src
cd /usr/local/src && tar -zxf zeromq-4.2.1.tar.gz -C /opt
cd /opt/zeromq-4.2.1 && ./configure && make && sudo make install && sudo ldconfig
cd /opt && git clone https://github.com/nathanmarz/jzmq.git
./autogen.sh && ./configure && make && make install
wget https://github.com/alibaba/jstorm/releases/download/2.1.1/jstorm-2.1.1.zip -P /usr/local/src
cd /usr/local/src && unzip jstorm-2.1.1.zip -d /opt
cd /opt && mv jstorm-2.1.1 jstorm
# mkdir /opt/jstorm/jstorm_data
echo '# jstorm env' >> ~/.bashrc
echo 'export JSTORM_HOME=/opt/jstorm' >> ~/.bashrc
echo 'export PATH=$PATH:$JSTORM_HOME/bin' >> ~/.bashrc
source ~/.bashrc
# JStorm 配置
sed -i /'storm.zookeeper.servers:/a\ -"10.1.1.78"' /opt/jstorm/conf/storm.yaml
sed -i /'storm.zookeeper.servers:/a\ -"10.1.1.80"' /opt/jstorm/conf/storm.yaml
sed -i /'storm.zookeeper.servers:/a\ -"10.1.1.97"' /opt/jstorm/conf/storm.yaml
sed -i /'storm.zookeeper.root/a\ nimbus.host:"10.1.1.78"' /opt/jstorm/conf/storm.yaml<>
配置项:
storm.zookeeper.servers: 表示 zookeeper 的地址;
nimbus.host: 表示 nimbus 的地址;
storm.zookeeper.root: 表示 JStorm 在 zookeeper 中的根目录,当多个 JStorm 共享一个 zookeeper 时,需要设置该选项,默认即为“/jstorm”;
storm.local.dir: 表示 JStorm 临时数据存放目录,需要保证 JStorm 程序对该目录有写权限;
java.library.path: Zeromq 和 java zeromq library 的安装目录,默认 ”/usr/local/lib:/opt/local/lib:/usr/lib”;
supervisor.slots.ports: 表示 Supervisor 提供的端口 Slot 列表,注意不要和其他端口发生冲突,默认是 68xx,而 Storm 的是 67xx;
topology.enable.classloader: false, 默认关闭 classloader,如果应用的 jar 与 JStorm 的依赖的 jar 发生冲突,比如应用使用 thrift9,但 jstorm 使用 thrift7 时,就需要打开 classloader。建议在集群级别上默认关闭,在具体需要隔离的 topology 上打开这个选项。
# 下面命令只需要在安装 jstorm_ui 和提交 jar 节点的机器上面执行即可
mkdir ~/.jstorm
cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
强制使用 tomcat7.0 或以上版本,切记拷贝~/.jstorm/storm.yaml, Web UI 可以和 Nimbus 在同一个节点上
mkdir ~/.jstorm
cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm
下载 tomcat 7.x(以 apache-tomcat-7.0.37 为例)tar -xzf apache-tomcat-7.0.75.tar.gz
cd apache-tomcat-7.0.75
cd webapps
cp $JSTORM_HOME/jstorm-ui-2.1.1.war ./
mv ROOT ROOT.old
ln -s jstorm-ui-2.1.1 ROOT
# 另外不是 ln -s jstorm-ui-2.1.1.war ROOT 这个要小心
cd ../bin
./startup.sh
1. 在 nimbus 节点(10.1.1.78)上执行“nohup jstorm nimbus &”, 查看 $JSTORM_HOME/logs/nimbus.log 检查有无错误
2. 在 supervisor 节点 (10.1.1.78,10.1.1.80,10.1.1.97) 上执行“nohup jstorm supervisor &”, 查看 $JSTORM_HOME/logs/supervisor.log 检查有无错误
JStorm 集群启动成功截图如下:
# JStorm 集群安装问题总结
1、注意 /etc/hosts 设置,添加相对应的 ip hostname
2、设置 ssh 免密操作(此步骤在 zookeeper 集群完成)
3、注意各服务的环境变量设置