• 一次业务代码的流式重构


    封面图

    业务场景

    目标 ~> 调度 ~> MQ ~> 引擎,就是生产者消费者模型,非常简单。

    为了提高性能,调度需要将一个大目标拆分为多个子任务,启动多个引擎并发地去执行。

    举个例子,用户输入一个 A 段目标 1.0.0.0/8(2^24=16,777,216),设置了全端口(1-65535)三种协议(ICMP UDP TCP)扫描,假定引擎每次处理 10W 目标,200 个端口时效率最佳。

    老代码

    SpiltTargets ~> SpiltPorts ~> SpiltProtocol ~> MQ,代码抽象为三个函数,顺序执行,每个阶段执行完才能进入下个阶段,中间产生的所有数据都保存在内存中,然后全部推送到 MQ。

    SpiltTargets 后,子任务数量变为 16,777,216 / 100,000 = 168

    接着 SpiltPorts 后,65535 / 200 = 328,此时子任务数量变为 168 * 328 = 55104

    最终 SpiltProtocol,子任务数量 55104 * 2 + 168(ICMP 协议无端口)= 110376,高达 11W 之多

    优点

    • 代码实现简单
    • 纯 CPU 运算,整个拆分过程快,由 MQ 持久化消息,不担心重启丢数据(不过不能在拆分的时候重启)

    缺点

    • 调度内存占用高(一行字符串最终变为 11W 行字符串)
    • MQ 消息数量太多,内存占用大的同时,还可能丢消息

    后续

    其实按照 10W 目标,200 个端口拆分,整个系统还算撑得住,直到后来我们的系统把客户的路由器给打挂了(看来有时候不能一味的追求快)。

    为了扫描变慢点,拆分粒度改为了 256 个目标,50 个端口,最终产生消息数 65535 * 1311 * 2 + 65535 = 171,898,305,都上亿了,调度和 MQ 都顶不住了!

    当时的修改是引入二级队列,一级还是按照 10W 拆分,后台协程定时从一级获取消息按照 256 拆分为二级,引擎从二级队列获取子任务。

    流式重构

    虽然上面的二级队列解决了问题,但是我感觉并不是很完美,为什么要等到所有的流程都走完才推消息呢?为什么要先推消息,然后拉回来,再推出去呢?

    受到 go-zero/stream 启发,我决定将其流式化重构,去除业务代码,核心的骨架如下。

    type Stream struct {
    	source <-chan []string // 一批目标
    	done   chan struct{}   // 退出信号
    }
    
    func NewStream(targets []string) Stream {
        // 此处使用无缓冲的 channel 演示,具体可以根据上下游的处理能力设置 buffer
    	source := make(chan []string) 
    	done := make(chan struct{})
    
    	go func() {
    		defer close(source)
    
    		for _, v := range targets {
    			select {
    			case <-done: // 监听退出信号
    				return
    			default:
    			}
    			source <- []string{v} // 传递给下一阶段
    		}
    	}()
    
    	return Stream{
    		source: source,
    		done:   done,
    	}
    }
    
    func (s Stream) SpiltTargets(chunk int) Stream {
    	source := make(chan []string)
    
    	var buf []string
    
    	go func() {
    		defer close(source)
    
    		for msg := range s.source {
    			select {
    			case <-s.done:
    				return
    			default:
    			}
    
    			// 缓存 chunk 数量的目标后,传递给下一阶段,算法很简单,此处忽略
    			for _, v := range msg {
    				buf = append(buf, v)
    			}
    			source <- buf
    		}
    	}()
    
    	return Stream{
    		source: source,
    		done:   s.done,
    	}
    }
    
    func (s Stream) SpiltPorts(chunk int) Stream {
    	// 逻辑和 SpiltTargets 一致,只不过对端口做处理
    }
    
    func (s Stream) PushMQ(protocol []string) Stream {
    	// 逻辑基本和上面一致
    
    	// 有个策略,只有在当前队列消息数少于 500 时,才推送
    	// 不能一股脑全推送,否则就和老代码效果一样了(拆分速度远远快于消费速度)
    }
    
    func (s Stream) Wait() {
    	// 等待所有的子任务都拆分完成
    	for range s.source {
    	}
    
    	// 关闭 MQ 连接
    }
    
    func (s Stream) Tidy() {
    	// 通知所有阶段都退出
    	close(s.done)
    
    	// 删除队列
    
    	// 关闭 MQ 连接
    }
    

    使用效果如下:

    func main() {
    	s := NewStream([]string{"1.0.0.0/8"})
    	s.SpiltTargets(10000).PushMQ("icmp").SpiltPorts(200).PushMQ("udp", "tcp").Wait()
    }
    

    代码效果看起来还不错,就像水一样徐徐流过,而不像之前水库泄洪似的。

    优点

    • 不用担心拆分粒度,省内存,MQ 消息数可控
    • 方便拓展,根据业务需求可以加入更多的处理阶段

    缺点

    • 整个拆分过程伴随着任务运行一直存在,不能利用 MQ 持久化
    • 只能处理局部数据,不能处理全量数据

    后续

    由于持久化方案太复杂,目前暂时没做,不过问题不大,重启这种非正常情况毕竟机率非常小

    总结

    Go 的 channel 非常适合做流式处理。

    在设计时不仅仅要完成功能,还要适当考虑性能,虽然这样花费的时间可能稍微多点。

    参考

    https://github.com/kevwan/stream

  • 相关阅读:
    2022年Java秋招面试必看的 | 微服务面试题
    SpringBoot+Vue实现Excel文档导入和导出
    docker笔记
    Linux——环境变量与地址空间
    ​​​​​​青少年软件编程(C语言)等级考试试卷目录一览
    【链接装载与库】 Linux共享库的组织
    “errcode“:40164,“errmsg“:“invalid ip ...微信公众号开发调用失败的解决办法
    关于如何解决visualc++6.0打开文件闪退的一种方式(附带解决输入法无法显示)
    SQL Server入门-SSMS简单使用(2008R2版)-2
    Dubbo3应用开发——架构的演变过程
  • 原文地址:https://www.cnblogs.com/yahuian/p/stream-refactor.html