
Import all the jar files under the lib directory of the Storm distribution.
```java
package com.bjsxt.storm.wc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopology {

    public static void main(String[] args) {
        // The topology encapsulates the computation logic
        TopologyBuilder builder = new TopologyBuilder();
        // Set up lineSpout: this spout feeds sentences into the topology
        builder.setSpout("lineSpout", new LineSpout());
        // Set up the split bolt, which processes tuples shuffle-grouped
        // to it from the lineSpout
        builder.setBolt("splitBolt", new SplitBolt())
               .shuffleGrouping("lineSpout");
        // Define a count bolt that receives tuples from splitBolt via fields
        // grouping, grouped by the value of the "word" field. This guarantees
        // that the same word is always sent to the same bolt task.
        builder.setBolt("countBolt", new CountBolt())
               .fieldsGrouping("splitBolt", new Fields("word"));

        // Build a topology instance from the builder
        StormTopology wordCountTopology = builder.createTopology();

        // Local simulated cluster
        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        // Submit the topology to the local simulated cluster
        cluster.submitTopology("wordCountTopology", config, wordCountTopology);

        // Sleep for 10 s, i.e. let the local simulated cluster run for 10 seconds
        Utils.sleep(10000);

        // Shut the local simulated cluster down
        cluster.shutdown();
    }
}
```
```java
package com.bjsxt.storm.wc;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

public class LineSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    private String[] lines = {
        "The logic for a realtime application is packaged into a Storm topology",
        "A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion",
        "A spout is a source of streams in a topology",
        "Bolts can do anything from filtering, functions, aggregations, joins, talking to databases, and more.",
        "A stream grouping defines how that stream should be partitioned among the bolt's tasks.",
        "Storm guarantees that every spout tuple will be fully processed by the topology",
        "Each spout or bolt executes as many tasks across the cluster",
        "Each worker process is a physical JVM and executes a subset of all the tasks for the topology"
    };

    private int index = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Called once when this component is initialized on the cluster
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Called repeatedly by a Storm thread to pull tuples from the data source.
        // No loop is needed here, and the method must not block.
        // Its job is to fetch a tuple from the source and emit it into the DAG.
        // Take the sentences in round-robin order
        String lineStr = lines[index % lines.length];
        // Wrap the sentence in a tuple and emit it
        collector.emit(new Values(lineStr));
        index++;

        Utils.sleep(10);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declares the structure of the tuples and the streams, e.g.:
        // declarer.declareStream("s1", new Fields("key1", "key2", "key3"));
        // declarer.declareStream("s2", new Fields("key21", "key22"));
        // Each emitted tuple has a single field, "line", whose value is the sentence.
        // A tuple can be pictured as a map whose keys are a fixed set of field names.
        declarer.declare(new Fields("line"));
    }
}
```
```java
package com.bjsxt.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String lineStr = input.getStringByField("line");
        String[] wordStrs = lineStr.split(" ");

        for (String wordStr : wordStrs) {
            // Emit one (word, 1) tuple per word in the sentence
            this.collector.emit(new Values(wordStr, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
```java
package com.bjsxt.storm.wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        // Incoming tuples have the fields ("word", "count")
        String wordStr = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");

        Integer sum = counts.get(wordStr);

        if (sum == null) {
            counts.put(wordStr, count);
        } else {
            counts.put(wordStr, sum + count);
        }

        counts.forEach((k, v) -> {
            System.out.println(k + "_________" + v);
        });
        System.out.println("========================================");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: it emits nothing, so there are no fields to declare
    }
}
```
Right-click WordCountTopology and run it.
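
The LocalCluster above is only a simulation. For reference, submitting the same topology to a real cluster changes only the submission call; a minimal sketch (the class name WordCountClusterTopology and the jar name are illustrative, not from the original):

```java
package com.bjsxt.storm.wc;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

// Launch with the storm client, e.g.:
//   storm jar wordcount.jar com.bjsxt.storm.wc.WordCountClusterTopology
// (jar name is a hypothetical example)
public class WordCountClusterTopology {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Same wiring as the local version
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("lineSpout", new LineSpout());
        builder.setBolt("splitBolt", new SplitBolt()).shuffleGrouping("lineSpout");
        builder.setBolt("countBolt", new CountBolt()).fieldsGrouping("splitBolt", new Fields("word"));

        Config config = new Config();
        config.setNumWorkers(2); // request two worker processes (an arbitrary example value)

        // Submit to whatever cluster the storm client is configured against
        StormSubmitter.submitTopology("wordCountTopology", config, builder.createTopology());
    }
}
```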

| Storm configuration item | Description |
| --- | --- |
| java.library.path | Paths to the libraries Storm itself depends on; separate multiple paths with colons. |
| storm.local.dir | Local-filesystem directory used by Storm (it must exist and be readable and writable by the Storm processes). Defaults to storm-local under the Storm root directory. |
| storm.zookeeper.servers | Host list of the ZooKeeper cluster backing the Storm cluster. |
| storm.zookeeper.port | Service port of that ZooKeeper cluster; ZooKeeper's default port is 2181. |
| storm.zookeeper.root | Root path under which Storm stores its metadata in ZooKeeper; defaults to /storm. |
| storm.cluster.mode | Storm's run mode, local or distributed; a cluster deployment must be set to distributed. |
| storm.messaging.transport | Storm's message-transport mechanism; set to backtype.storm.messaging.netty.Context to transport messages over Netty. |
| nimbus.host | The nimbus node of the whole Storm cluster. |
| nimbus.supervisor.timeout.secs | Timeout for processing each emitted message; it affects message processing and is also the default delay when killing a topology from the Storm UI (how long after the kill action the topology is actually killed). Defaults to 60. |
| ui.port | Storm ships with a UI exposed as an HTTP service; this sets the port of that service (a non-root user must use a port above 1024). |
| ui.childopts | Java options for the Storm UI process (any constraint on the Java process, such as memory, can be set here). |
| logviewer.port | Port of the Log Viewer process (also an HTTP service; it must run on every Storm node). Defaults to 8000. |
| logviewer.childopts | Options for the Log Viewer process. |
| logviewer.appender.name | Storm's log4j appender; the name set here corresponds to an appender defined in storm/log4j2/cluster.xml, which controls the levels of Storm's loggers. |
| supervisor.slots.ports | Storm's slots; best set to an integer multiple of the number of OS cores. Since Storm does memory-based real-time computation, the slot count should not exceed the number of slots each machine can actually run: (physical memory − virtual memory) / maximum memory a single Java process may occupy. |
| worker.childopts | JVM options for Storm worker processes; setting this well helps analyze the cause when a topology misbehaves: -Xms1024m -Xmx1024m -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError. Here Xms is the minimum and Xmx the maximum memory of a single Java process; the benefit of HeapDumpOnOutOfMemoryError is that when memory use exceeds Xmx and the JVM kills the Java process, a `java_pid<pid>.hprof` heap dump is written for analysis. |
| storm.messaging.netty.buffer_size | Buffer size for Netty transport, 5 MB by default; increase it accordingly when the spout emits large messages. |
| storm.messaging.netty.max_retries | These parameters concern Netty as the underlying message transport and deserve attention; bad values can trigger a known bug: java.lang.IllegalArgumentException: timeout value is negative. |
| storm.messaging.netty.max_wait_ms | |
| storm.messaging.netty.min_wait_ms | |
| topology.debug | May be overridden per topology; whether the topology runs in debug mode. In debug mode Storm logs detailed information about messages sent and received; not recommended in production. |
| topology.acker.executors | Storm guarantees messages are not lost through its acker mechanism; this sets the number of ackers per topology. Since ackers consume few resources, keeping this setting low is strongly recommended. May be overridden per topology. |
| topology.max.spout.pending | Maximum number of tuples that may be pending on a single spout task. This applies per task, not to the whole spout or topology. May be overridden per topology (see the sketch below this table). |
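
The last three settings can be overridden per topology in code, through the Config object, instead of storm.yaml. A minimal sketch with illustrative values only (the class name TopologyConfigExample is made up for this example):

```java
import backtype.storm.Config;

public class TopologyConfigExample {

    // Build a per-topology Config; pass the result to submitTopology(...)
    public static Config buildConfig() {
        Config config = new Config();
        config.setDebug(false);           // topology.debug: leave off in production
        config.setNumAckers(1);           // topology.acker.executors: ackers are cheap, a low count usually suffices
        config.setMaxSpoutPending(1000);  // topology.max.spout.pending: cap on un-acked tuples per spout task
        return config;
    }
}
```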
In addition, the file storm/log4j2/cluster.xml can configure Storm's log levels and related logging settings.
The operating system also needs configuration; two limits matter (check the current values with `ulimit -a`):
1. open files: the number of file descriptors the current user may open.
2. max user processes: the number of processes the current user may run; if this limit is too small it triggers a Storm error:
java.lang.OutOfMemoryError: unable to create new native thread
A worker carries out the business logic defined in the topology; it is the process that actually executes the topology.
How a worker executes, in outline (see the sketch after this list):
- Each task runs on a thread, or several tasks share a single thread.
- Such a thread is called an executor.
- Executors run inside a worker.
- A worker is a JVM process.
- Workers run under a supervisor.

Within a worker, threads communicate through the Disruptor; communication between worker processes uses either Netty or ZMQ, with Netty as the default.
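
These levels map directly onto the topology API: the parallelism hint sets the number of executors (threads), setNumTasks the number of tasks, and Config.setNumWorkers the number of worker JVMs. A sketch against the word-count topology above; the specific numbers are arbitrary examples, not recommendations:

```java
import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class ParallelismExample {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("lineSpout", new LineSpout(), 1);   // 1 executor (thread) for the spout
        builder.setBolt("splitBolt", new SplitBolt(), 4)     // 4 executors...
               .setNumTasks(8)                               // ...running 8 tasks, i.e. 2 tasks per thread
               .shuffleGrouping("lineSpout");
        builder.setBolt("countBolt", new CountBolt(), 2)     // 2 executors, 1 task each by default
               .fieldsGrouping("splitBolt", new Fields("word"));

        Config config = new Config();
        config.setNumWorkers(2); // spread the executors across 2 worker JVMs

        // Submission (LocalCluster or StormSubmitter) is the same as before and omitted here.
    }
}
```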
Data flow:

There are three files in this directory: