共计 5686 个字符,预计需要花费 15 分钟才能阅读完成。
Storm 是一个开源的大数据处理系统。与其他系统不同，它专为分布式实时处理而设计，并且与编程语言无关。更多介绍及安装步骤请自行搜索（例如 Google）。
下面是一个简单的单词计数（word count）示例，由 StormMain、WordCounter、WordNormalizer 和 WordReader 四个类组成：
package mapstorm;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class StormMain {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“word-reader”, new WordReader());
builder.setBolt(“word-normalizer”, new WordNormalizer()).shuffleGrouping(“word-reader”);
builder.setBolt(“word-counter”, new WordCounter(), 1).fieldsGrouping(“word-normalizer”, new Fields(“word”));
//Configuration
Config conf = new Config();
conf.put(“wordsFile”, args[0]);
conf.setDebug(true);
//conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
StormSubmitter.submitTopology(“wordCounterTopology”, conf, builder.createTopology());
// Thread.sleep(1000);
//StormSubmitter.(“wordCounterTopology”);
// StormSubmitter.shutdown();
//Topology run
//conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//LocalCluster cluster = new LocalCluster();
//cluster.submitTopology(“Getting-Started-Toplogie”, conf, builder.createTopology());
//Thread.sleep(2000);
//cluster.shutdown();
//
}
}
—————————-
package mapstorm;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
private static final long serialVersionUID = 5678586644899822142L;
Integer id;
String name;
Map<String, Integer> counters;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
System.out.println(“WordCounter word “+ str);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public void cleanup() {
System.out.println(“– Word Counter [“+name+”-“+id+”] –“);
for(Map.Entry<String, Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+”: “+entry.getValue());
}
System.out.println(“finish———–“);
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap<String, Integer>();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
}
———————————-
package mapstorm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
public void cleanup() {
System.out.println(“finish”);
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = input.getString(0);
String[] words = sentence.split(” “);
System.out.println(“WordNormalizer recevie “+ sentence);
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
System.out.println(“WordNormalizer recevie “+ sentence+”words “+ word);
collector.emit(new Values(word));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“word”));
}
}
————————————-
package mapstorm;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private String filePath;
private boolean completed = false;
public void ack(Object msgId) {
System.out.println(“OK:”+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println(“FAIL:”+msgId);
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.fileReader = new FileReader(conf.get(“wordsFile”).toString());
} catch (FileNotFoundException e) {
throw new RuntimeException(“Error reading file [“+conf.get(“wordFile”)+”]”);
}
this.filePath = conf.get(“wordsFile”).toString();
this.collector = collector;
}
@Override
public void nextTuple() {
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return;
}
String str;
BufferedReader reader =new BufferedReader(fileReader);
try{
while((str = reader.readLine()) != null){
System.out.println(“WordReader read”+ str);
this.collector.emit(new Values(str),str);
System.out.println(“WordReader out”+ str);
}
}catch(Exception e){
throw new RuntimeException(“Error reading tuple”,e);
}finally{
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“line”));
}
}
完成后将以上代码打包成 storm.jar。
执行 storm jar storm.jar mapstorm.StormMain /data/words.txt 即可提交并运行该拓扑。注意：WordReader 在工作节点上按本地路径读取 words.txt，因此需要把该文件分发到每个 Supervisor 节点的相同路径下。
在 Storm UI 页面上可以看到 Topology 列表中新增了一个任务。
如需终止任务，执行 storm kill <拓扑名称> 即可，本例中为 storm kill wordCounterTopology。
推荐阅读:
Twitter Storm 安装配置(集群)笔记 http://www.linuxidc.com/Linux/2013-05/84307.htm
安装 Twitter Storm 集群 http://www.linuxidc.com/Linux/2012-07/66336.htm
Twitter Storm 安装配置(单机版)笔记 http://www.linuxidc.com/Linux/2013-05/84306.htm
Storm 实战及实例讲解一 http://www.linuxidc.com/Linux/2012-08/69146.htm