阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Kafka-Storm 集成部署

213次阅读
没有评论

共计 4112 个字符,预计需要花费 11 分钟才能阅读完成。

前言

分布式实时计算的主要组件采用基于流式计算的 Apache Storm,而实时计算的数据源来自基础数据输入组件中的 Kafka,如何将 Kafka 的消息数据传入 Storm 就是本文讨论的内容。

0. 材料准备

  • 正常稳定运行的 Kafka 集群(版本:Kafka 0.8.2)
  • 正常稳定运行的 Storm 集群(版本:Storm 0.9.8)
  • Maven 3.x

1. Storm Topology 工程

Storm 的任务(Job)称为 Topology,为了处理实时计算任务,需要新建一个 Storm Topology 工程。由于 Kafka 的消息传输模式,所谓的 Kafka-Storm 集成部署实际上就是需要实现一个接收 Kafka 消息的 Spout 接口。幸运的是,最新的 Storm 官方版本中已经内置了可靠的 KafkaSpout,不需要再去手工编写,只需要将 KafkaSpout 配置为 Topology 的输入数据源即可。

2. Maven 配置

本项目工程基于 Maven 构建。

  • 需要配置的主要依赖
<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <scope>provided</scope>
        </dependency>

注意:这里的依赖的 scope 均为“provided”

  • Maven 编译配置
<build>
        <finalName>storm-kafka-topology</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <finalName>${project.artifactId}-${project.version}-shade</finalName>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <artifactSet>
                        <excludes>
                            <exclude>log4j:log4j:jar:</exclude>
                        </excludes>
                    </artifactSet>
                    <transformers>
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>storm.kafka.example.StormTopology</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </plugin>
        </plugins>
    </build>

3. 实现 Topology

以下是 Topology 的一个简单示例(Java 版)。

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class StormTopology {
    // Topology 关闭命令(通过外部传入消息控制)
    public static boolean shutdown = false;

    public static void main(String[] args) {
        // 注册 ZooKeeper 主机 
        BrokerHosts brokerHosts = new ZkHosts(
                "hd182:2181,hd185:2181,hd128:2181");
        // 所接收 Kafka 的 topic 名称 
        String topic = "flumeTopic";
        // ZooKeeper 的注册 node 名称(注意:需要加“/”,否则 ZooKeeper 会无法识别)
        String zkRoot = "/kafkastorm";

        // 配置 Spout
        String spoutId = "myKafka";
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot,
                spoutId);
        // 配置 Scheme(可选)
        spoutConfig.scheme = new SchemeAsMultiScheme(new SimpleMessageScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout);
        builder.setBolt("operator", new OperatorBolt())
                .shuffleGrouping("kafka-spout");

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(3);

        // 测试环境采用 local mode 模式 
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        while (!shutdown) {
            Utils.sleep(100);
        }
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

由于一个 KafkaSpout 只能接收一个指定 topic 的消息数据,因此,在实际生产环境 Topology 的实现中需要根据业务需求配置 Spout 的个数。

4. 必要的依赖包

由于 Topology 工程的依赖均为“provided”的 scope,需要将涉及到的依赖 jar 包拷贝到 Storm 安装目录的 lib 文件夹下,包括:

  • kafka_2.10-0.8.2.1.jar
  • storm-kafka-0.9.3.jar
  • scala-library-2.10.4.jar
  • zookeeper-3.4.6.jar
  • curator-client-2.6.0.jar
  • curator-framework-2.6.0.jar
  • curator-recipes-2.6.0.jar
  • guava-16.0.1.jar
  • metrics-core-2.2.0.jar

5. 上线运行

向 Storm 集群提交任务,观察数据输出结果。另外,还可以在 Storm 的 UI 界面查看 Topology 内部组件运行状态(需要使用 Cluster 模式)。

相关阅读

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Apache kafka 原理与特性 (0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

Kafka 部署与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 介绍和集群环境搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

Kafka 的详细介绍 :请点这里
Kafka 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2016-03/129063.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计4112字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中