1.如何在eclipse调试storm程序
如何在eclipse调试storm程序
一、介绍
storm提供了两种运行模式：本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。
Storm has two modes of operation: local mode and distributed mode.
In local mode, Storm executes completely in process by simulating
worker nodes with threads. Local mode is useful for testing and
development of topologies
因为多数程序开发者都是使用windows系统进行程序开发，如果在本机不安装storm环境的情况下，开发、调试storm程序。如果你正在为此问题而烦恼，请使用本文提供的方法。
二、实施步骤
如何基于eclipse+maven调试storm程序，步骤如下：
1.搭建好开发环境（eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1）
2.创建maven项目，并修改pom.xml，内容如pom.xml（机器联网，下载所需的依赖jar）
Github上的pom.xml,引入的依赖太多，有些不需要。
3. 编写storm程序，指定为本地模式运行。本文提供的程序是wordcount
重要的是LocalCluster cluster = new LocalCluster();这一句
// Run the topology inside an in-process LocalCluster (local mode).
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
// Let the topology run for 10 seconds before tearing it down.
// Utils.sleep requires a duration in milliseconds.
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
pom.xml文件
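<project xmlns="http://maven.apache.org/POM/4.0.0">
<!-- The project coordinates and the Storm dependency declared here were lost when this page was extracted. -->
<dependencies>
<dependency>
<groupId>commons-collections</groupId>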
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
</project>
storm程序
package storm.starter;
import java.util.HashMap;
import java.util.Map;
import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/
*** This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class WordCountTopology {
public static class SplitSentence extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String msg = input.getString(0);
System.out.println(msg + "-------------------");
if (msg != null) {
String[] s = msg.split(" ");
for (String string : s) {
collector.emit(new Values(string));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(
"spout");
builder.setBolt("count", new WordCount(), ).fieldsGrouping("split",
new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep();
cluster.shutdown();
}
}
}
package storm.starter.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
/**
 * Spout that emits one randomly chosen sentence from a fixed list each time
 * {@link #nextTuple()} is called.
 */
public class RandomSentenceSpout extends BaseRichSpout {

    /** Pause before each emitted sentence, in milliseconds. */
    private static final long EMIT_INTERVAL_MILLIS = 100L;

    /** Fixed pool of sentences; built once instead of on every nextTuple() call. */
    private static final String[] SENTENCES = new String[] {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    SpoutOutputCollector _collector;
    Random _rand;

    /**
     * Stores the collector and creates the random generator.
     *
     * @param conf      configuration map passed in by the caller (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used by {@link #nextTuple()} to emit sentences
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /** Sleeps briefly, then emits one randomly selected sentence as a one-field tuple. */
    @Override
    public void nextTuple() {
        Utils.sleep(EMIT_INTERVAL_MILLIS);
        String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
        _collector.emit(new Values(sentence));
    }

    /** No-op: nothing is done when a tuple is acknowledged. */
    @Override
    public void ack(Object id) {
    }

    /** No-op: nothing is done when a tuple fails. */
    @Override
    public void fail(Object id) {
    }

    /** Declares the single output field; it is named "word" although it carries a whole sentence. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}