Storm作为分布式实时计算框架,已广泛使用多年,形成成熟的大数据分析和实时计算平台体系。本文简要介绍Storm的架构和一些概念如Topology、Spout和Bolt,以作了解。
Storm是开源的分布式实时计算系统,在实时分析、在线机器学习、连续计算、分布式RPC、ETL等场景中广泛使用。Storm集成了多种消息队列技术和数据库技术,其中的Topology消耗数据流,以任意复杂的方式处理这些流。Storm具有以下特性:
Storm集群中存在两种类型的节点:运行Nimbus服务的主节点和运行Supervisor服务的工作节点。Storm集群由一个主节点和多个工作节点组成,主节点上运行一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测;每个工作节点上则运行一个名为“Supervisor”的守护进程,用于监听工作、开始并终止工作进程。Nimbus和Supervisor之间的协调工作是通过zookeeper来完成的。Nimbus和Supervisor是无状态的,其元数据存储在Zookeeper中,使得系统具有很高的容错性。

Worker由Supervisor负责启动,一个worker可以有多个Executor执行线程,每个Executor又可以包含一个或多个Task,其中Task为Storm中最小处理单元。每个Executor都会启动一个消息循环线程,用于接收、处理和发送消息,当Executor收到属于其下某一Task的消息后,就会调用该Task对应的处理逻辑对消息进行处理。
Nimbus进程运行在Master Node,是Storm集群工作的全局指挥官,负责在集群中分发代码、对节点分配任务并监控异常。主要功能如下:
Supervisor进程运行在Worker Node , 是Storm集群的资源管理者,负责监听其主机上已经分配的主机的作业,启停已经分配的Worker进程。主要功能如下:
Nimbus和Supervisor进程都被设计为快速失败(遇到任何意外情况时进程自毁)和无状态(所有状态保存在Zookeeper或磁盘上)。这样设计的好处就是如果它们的进程被意外销毁,那么在重新启动后,就只需要从Zookeeper上获取之前的状态数据即可,并不会造成任何数据丢失。
Worker进程是Storm集群的任务构造者,构造Spoult或Bolt的Task实例运行其中的处理逻辑,启动Executor线程。主要功能如下:
Executor线程是Storm集群的任务执行者,循环执行Task代码。主要功能如下:
Storm使用Zookeeper来存储Nimbus、Supervisor、Worker以及Executor之间的共享的元数据,这些模块在重启之后,可以通过对应的元数据进行恢复,因此Storm的模块是无状态的。Storm在Zookeeper中存储数据的目录结构如下所示,这是一个根目录为/storm的树形结构,树中的每一个节点代表Zookeeper中的一个节点,每个叶子节点是Storm真正存储数据的地方。

Storm中涉及的概念包括Tuple、Stream、Stream Grouping 、Spout、Bolt和Topology。
1)元组Tuple
元组是消息传递的基本单元,是一个命名的值列表,元组中的字段可以是任何类型的对象。Storm使用元组作为其数据模型,而元组支持所有的基本类型、字符串和字符数组作为字段值,元组其实是一个value list。
2)流Stream
流是Storm的核心抽象,是一个无界的元组序列。源源不断传递的元组就组成了流,在分布式环境中进行创建和处理。
3)Stream Grouping
Stream Grouping定义了消息分发策略,定义了Bolt节点以何种方式接收数据。消息分发的策略包括:随机分配、根据字段值分配、广播全部分组、发给同一个Task以及无分组等。
4)Spout
Spout是拓扑中stream的来源,是拓扑中产生源数据流的组件。通常Spout会从外部数据源中读取数据,然后转化为拓扑内部的源数据。
5)Bolt
在拓扑中,所有的处理都是在Bolt中完成的,Bolt是stream的处理节点,可以完成过滤、业务处理、连接运算、连接与访问数据库等任何操作。
6)拓扑Topology
拓扑是Storm中运行的一个实时的应用程序,因为各个组件中间的消息流动而形成逻辑上的拓扑结构。Storm的拓扑类似于MapReduce的作业,主要区别是MapReduce作业最终会完成,而拓扑永远在运行直到被杀死。一个拓扑就是Spout和Bolt的连接流分组。
7)Storm和Hadoop中基本概念对比
| 类型 | Hadoop | Storm |
|---|---|---|
| 系统角色 | JobTracker | Nimbus |
| 系统角色 | TaskTracker | Supervisor |
| 系统角色 | Child | Worker |
| 应用名称 | Job | Topology |
| 组件接口 | Map/Reduce | Spout/Bolt |
使用Storm做实时计算,首先需要创建拓扑,一个拓扑是一个有向图的计算。在一个拓扑中每个节点包含处理逻辑,节点之间的连接显示数据应该如何在节点之间传递。如下图所示,其中包含Spout和Bolt:

在Storm中,Spout有3中获取数据的模式:直接连接、消息队列和DRPC。
1)直接连接
在直接连接的架构中,Spout直接与数据源相连接,这种架构实现起来很容易,特别当消息源是一个已知的设备或设备组。另外,也可以使用多个Spout从多个消息源中获取消息,这样可以均匀地分发收集器访问数据源,比如从Web服务器收集日志文件。

2)消息队列
消息发射器把消息发送到消息队列系统,Spout从消息队列系统中获取消息。使用消息队列的优势是可以作为Spout和数据源之间的中间件,也就是Spout不需要知道关于消息发射器的任何东西,添加和删除发射器的过程比直接连接更容易。当然也会带来问题,比如消息队列系统会成为故障点、在处理流程中增加了一层。

3)分布式RPC
DRPCSpout是一个Spout实现,从DRPC服务器接收数据流并处理。DRPC工作流如下:

Bolt是一个组件,以元组作为输入,生成元组作为输出。在客户端主机中创建Bolt,序列化到拓扑,并提交到集群的主控节点。集群启动Worker,反序列化Bolt,调用并处理元组。

Spouts和Bolts在storm集群上执行任务时,是由多个Tasks并行执行,如上图所示,每一个圆圈代表一个 Task。当一个Tuple需要从Bolt A发送给Bolt B执行的时候,由Stream groupings分组策略来决定应该发送给Bolt B的哪一个Task执行。Storm中一共有如下8个内置的 Stream Grouping。
1)Shuffle grouping随机分组
Tuples随机的分发到每个Bolt的每个Task上,每个Bolt获取到等量的Tuples。
2)Fields grouping按字段分组
Streams通过grouping指定的字段(field)来分组。假设通过user-id字段进行分区,那么具有相同user-id的Tuples就会发送到同一个Task。
3)Partial Key grouping
Streams通过grouping中指定的字段(field)来分组,与Fields Grouping相似。但是对于两个下游的Bolt来说是负载均衡的,可以在输入数据不平均的情况下提供更好的优化。
4)All grouping广播分组
Streams会被所有的Bolt的Tasks进行复制。由于存在数据重复处理,所以需要谨慎使用。
5)Global grouping全局分组
整个Streams会进入Bolt的其中一个Task,通常会进入id最小的Task。
6)None grouping不分组
不分组意思是说stream不关心到底谁会收到它tuple,当前None grouping和Shuffle grouping等价,都是进行随机分发。
7)Direct grouping直接分组
Direct grouping只能被用于direct streams,使用这种方式需要由Tuple的生产者直接指定由哪个Task进行处理。只有被声明为Direct Stream的消息流可以声明这种分组方法,而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)
8)Local or shuffle grouping
如果目标Bolt有Tasks和当前Bolt的Tasks处在同一个Worker进程中,那么则优先将Tuple Shuffled到处于同一个进程的目标Bolt的Tasks上,这样可以最大限度地减少网络传输。否则,就和普通的Shuffle Grouping行为一致。
在Storm中运行Topology任务主要依赖下面三个实体:

以下定义一个Topology,由1个Spout和2个Bolt组成
#配置Topology
conf.setNumWorkers(2); //使用两个Worker进程
topologyBuilder.setSpout(“read-spout”,new ReadSpout(),2); //设置并行度为2
topologyBuilder.setBolt(“norm-bolt”,new NormBolt(),2).setNumTasks(4).shuffleGrouping(“read-spout”); //使用4个Tasks
topologyBuilder.setBolt(“write-bolt”,new YellowBolt(),6).shuffleGrouping(“norm-blot”);
因此该Topology一共有两个Worker进程,2+2+6=10个Executor线程,2+4+6=12个Tasks,每个Worker进程会分配到5个Executor和6个Tasks。

上文简要介绍了Storm的基本架构和一些基本概念,与其它大数据组件比较,Storm有它的优势,技术架构上也更为成熟。
参考资料: