本示例内容在使用NIFI构建一个高度可扩展的物联网应用数据管道示例的基础上进行。
前文中的示例,如果放到生产环境中,那么MQTT和MINIFI部署在智能设备中,NIFI部署在数据中心服务器上。
如果需要修改MINIFI中的ETL任务,那么我们需要导出模板,转换成yml文件,放到MINIFI的conf目录下,然后重启MINIFI。技术上可行,但实际是不现实的,因为设备成千上万,部署位置千差万别。有没有更好的方法呢?
这里要解决两个问题:
更新ETL任务时,智能设备中的MINIFI不应该重启。
智能设备中的MINIFI应该能够自动获取更新后的yml文件。
在MINIFI子项目中,已经提供了一个工具:
Apache NiFi MiNiFi Command and Control(简称MINIFI C2)官方说明:
MiNiFi代理允许我们将数据流推送到网络边缘的较小设备上。这在一个较小的包中提供了使用NiFi处理数据的许多细节。在各种设备上运行许多不同的代理时,一个巨大的挑战是协调它们的工作并推出修改后的流。C2服务器是尝试解决此用例的开始。它为现有的PullHttpChangeIngestor功能提供了一个端点,目的是方便将适当的流定义分发到每个代理类。
在假设的用例中,一个或多个MiNiFi代理类定期轮询C2服务器,以获取对其流的更新。当有新版本可用时,C2服务器将把它发送回代理,此时代理将尝试使用新流重新启动自己,如果启动时出现问题,则回滚。C2服务器是可扩展和灵活配置的。ConfigurationProvider接口是主要的扩展点,在这里应该可以使用任意逻辑来获取更新的流。服务器支持双向TLS认证和可配置授权。
从上述说明可以看出,完美解决了前面两个问题,下面我们安装部署MINIFI C2。
- #MINIFI C2官方文档
- https://github.com/apache/nifi/tree/main/minifi/minifi-c2
- https://cwiki.apache.org/confluence/display/MINIFI/C2+Design#C2Design-Stop
1、安装启动 MINIFI C2
2、访问MINIFI C2
http://localhost:10090/c2/config?class=raspi3&version=6
注意地址中class和version的参数值来自哪里:
访问这个地址,输出的是目录下的文件内容。
到这里,MINIFI C2安装好了。那么设备上的MINIFI怎么连接过来呢?
3、配置MINIFI连接MINIFI C2
MiNiFi使用"Change Ingestor(更改接收器)",通过"Change Ingestor"将可能的新配置通知给代理。Change Ingestor是可插入模块,目前支持以下接收器:
RestChangeIngestor
PullHttpChangeIngestor
本示例中,使用PullHttpChangeIngestor,每隔一段时间来查询C2服务器并下载任何可用的新配置。要配置此接收器,需编辑minifi文件./conf/bootstrap.conf,取消注释相应的行,并按如下所示设置属性:
使用此配置,每个MiNiFi代理每隔1分钟会在http://localhost:10090/c2/config查询C2服务器REST API,并要求"raspi3"类的最新配置。
1分钟的频率仅用于演示目的。生产环境不会如此频繁地更新代理。
重启MINIFI,使配置生效。
4、测试联调
为了更好的演示ETL修改的效果,修改ETL,增加一个组件updateAttribute,添加一个属性version,属性值为1,然后创建模板、下载模板,转换为yml格式,部署到MINIFI上。
像使用NIFI构建一个高度可扩展的物联网应用数据管道示例中那样,测试,看下收到数据的version值,应该是1.
在NIFI中,修改这个ETL,将version改为2,然后创建模板、下载模板,转换为yml格式,然后命名为config.text.yml.v2,放到minifi-c2-home\files\raspi3目录下:
等1分钟,再次发送测试数据,看下收到的数据中version值是否变为2。
这里的命名约定非常重要。文件名以v版本号结尾。
我们将更新的yml放到的MINIFI C2上,设备上的MINIFI扫描到了新版本,自动热部署,完成了更新。解决了开头提出的那两个问题。
至此,示例完成。
最后,提供一个来自网络的IOT生产环境架构示意图,供参考: