• mqant启动流程


    测试代码

    1. package main
    2. import (
    3. "github.com/liangdas/mqant"
    4. log "github.com/liangdas/mqant/log/beego"
    5. "github.com/liangdas/mqant/module"
    6. "github.com/liangdas/mqant/registry"
    7. "github.com/liangdas/mqant/registry/consul"
    8. "github.com/mqant_test/helloworld"
    9. "github.com/mqant_test/web"
    10. "github.com/nats-io/nats.go"
    11. "time"
    12. )
    13. func main() {
    14. rs := consul.NewRegistry(func(options *registry.Options) {
    15. options.Addrs = []string{"127.0.0.1:8500"}
    16. })
    17. nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(10000))
    18. if err != nil {
    19. log.Error("nats error %v", err)
    20. return
    21. }
    22. app := mqant.CreateApp(
    23. module.Debug(true),//只有是在调试模式下才会在控制台打印日志, 非调试模式下只在日志文件中输出日志
    24. module.Registry(rs),
    25. module.Nats(nc),
    26. module.KillWaitTTL(5*time.Second),
    27. )
    28. app.OnConfigurationLoaded(func(app module.App) {
    29. log.Info("App OnConfigurationLoaded")
    30. })
    31. app.OnModuleInited(func(app module.App, module module.Module) {
    32. log.Info("App Module[%s] OnModuleInited",module.GetType())
    33. })
    34. app.OnStartup(func(app module.App) {
    35. log.Info("App OnStartup")
    36. })
    37. err = app.Run(
    38. helloworld.Module(),
    39. web.Module(),
    40. )
    41. if err != nil{
    42. log.Error(err.Error())
    43. }
    44. }

    运行日志:

     一:先来看看app的创建:

     

    1. app := mqant.CreateApp(
    2. module.Debug(true),
    3. module.Registry(rs),
    4. module.Nats(nc),
    5. module.KillWaitTTL(5*time.Second),
    6. )
    7. func CreateApp(opts ...module.Option) module.App {
    8. opts = append(opts, module.Version(version))
    9. return app.NewApp(opts...)
    10. }
    11. // NewApp 创建app
    12. func NewApp(opts ...module.Option) module.App {
    13. options := newOptions(opts...)
    14. app := new(DefaultApp)
    15. app.opts = options
    16. options.Selector.Init(selector.SetWatcher(app.Watcher))
    17. app.rpcserializes = map[string]module.RPCSerialize{}
    18. return app
    19. }

    newOptions()代码量比较多,主要干的就是创建module.Options类型的对象opt,默认了一些值,然后遍历传给函数的opts,就是module.Debug(true)这些东西,在这些传递进来的闭包中设置opt的一些变量,达到让开发者自己控制的目的。默认是Debug和解析命令行参数的

    如果外部传递的opts没有设置Nats的相关信息,就用默认的nats服务器地址创建一个连接

    然后就是日志路径文件夹相关的初始化,此时整个app启动需要的配置(Registry、Selector、Log等等)都已经设置好了,这些设置同样交给app保存(app.opts = options)

    二:再来看看app的运行:

            app.Run(helloworld.module(),web.module()) 调用的是app/app.go里边的Run(),把要初始化的所有的Module对象都传递进去,至于传递进去的对象是不是要启动运行的那是后话

    1. // Run 运行应用
    2. func (app *DefaultApp) Run(mods ...module.Module) error {
    3. f, err := os.Open(app.opts.ConfPath)
    4. if err != nil {
    5. //文件不存在
    6. panic(fmt.Sprintf("config path error %v", err))
    7. }
    8. var cof conf.Config
    9. fmt.Println("Server configuration path :", app.opts.ConfPath)
    10. conf.LoadConfig(f.Name()) //加载配置文件
    11. cof = conf.Conf
    12. app.Configure(cof) //解析配置信息
    13. if app.configurationLoaded != nil {
    14. app.configurationLoaded(app)
    15. }
    16. // log.InitLog(app.opts.Debug, app.opts.ProcessID, app.opts.LogDir, cof.Log)
    17. // log.InitBI(app.opts.Debug, app.opts.ProcessID, app.opts.BIDir, cof.BI)
    18. log.Init(log.WithDebug(app.opts.Debug),
    19. log.WithProcessID(app.opts.ProcessID),
    20. log.WithBiDir(app.opts.BIDir),
    21. log.WithLogDir(app.opts.LogDir),
    22. log.WithLogFileName(app.opts.LogFileName),
    23. log.WithBiSetting(cof.BI),
    24. log.WithBIFileName(app.opts.BIFileName),
    25. log.WithLogSetting(cof.Log))
    26. log.Info("mqant %v starting up", app.opts.Version)
    27. manager := basemodule.NewModuleManager()
    28. manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
    29. // module
    30. for i := 0; i < len(mods); i++ {
    31. mods[i].OnAppConfigurationLoaded(app)
    32. manager.Register(mods[i])
    33. }
    34. app.OnInit(app.settings)
    35. manager.Init(app, app.opts.ProcessID)
    36. if app.startup != nil {
    37. app.startup(app)
    38. }
    39. log.Info("mqant %v started", app.opts.Version)
    40. // close
    41. c := make(chan os.Signal, 1)
    42. signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGTERM)
    43. sig := <-c
    44. log.BiBeego().Flush()
    45. log.LogBeego().Flush()
    46. //如果一分钟都关不了则强制关闭
    47. timeout := time.NewTimer(app.opts.KillWaitTTL)
    48. wait := make(chan struct{})
    49. go func() {
    50. manager.Destroy()
    51. app.OnDestroy()
    52. wait <- struct{}{}
    53. }()
    54. select {
    55. case <-timeout.C:
    56. panic(fmt.Sprintf("mqant close timeout (signal: %v)", sig))
    57. case <-wait:
    58. log.Info("mqant closing down (signal: %v)", sig)
    59. }
    60. log.BiBeego().Close()
    61. log.LogBeego().Close()
    62. return nil
    63. }

    具体流程如下:

    1:日志0打印的出处上边很明显

    2:conf.LoadConfig(f.Name())  调用conf\config.go 中 LoadConfig(Path string)

    1. func LoadConfig(Path string) {
    2. // Read config.
    3. if err := readFileInto(Path); err != nil {
    4. panic(err)
    5. }
    6. if Conf.RPC.RPCExpired == 0 {
    7. Conf.RPC.RPCExpired = 3
    8. }
    9. if Conf.RPC.MaxCoroutine == 0 {
    10. Conf.RPC.MaxCoroutine = 100
    11. }
    12. }
    13. func readFileInto(path string) error {
    14. var data []byte
    15. buf := new(bytes.Buffer)
    16. f, err := os.Open(path)
    17. if err != nil {
    18. return err
    19. }
    20. defer f.Close()
    21. r := bufio.NewReader(f)
    22. for {
    23. line, err := r.ReadSlice('\n')
    24. if err != nil {
    25. if len(line) > 0 {
    26. buf.Write(line)
    27. }
    28. break
    29. }
    30. if !strings.HasPrefix(strings.TrimLeft(string(line), "\t "), "//") {
    31. buf.Write(line)
    32. }
    33. }
    34. data = buf.Bytes()
    35. //fmt.Print(string(data))
    36. return json.Unmarshal(data, &Conf)
    37. }

            conf包中的Conf就被赋值了,Conf是Config类型的,结构如下,配置server.json的时候就可以按照这个struct里边的内容来配置

    1. type Config struct {
    2. Log map[string]interface{}
    3. BI map[string]interface{}
    4. OP map[string]interface{}
    5. RPC RPC `json:"rpc"`
    6. Module map[string][]*ModuleSettings
    7. Mqtt Mqtt
    8. Settings map[string]interface{}
    9. }

    3:app.Configure(cof)

    1. // Configure 重设应用配置
    2. func (app *DefaultApp) Configure(settings conf.Config) error {
    3. app.settings = settings
    4. return nil
    5. }

    这一步就是把上边格式化好的配置文件绑定到app上

    4:配置文件已经初始化好了 回调给app

    1. if app.configurationLoaded != nil {
    2. app.configurationLoaded(app)
    3. }
    4. // OnConfigurationLoaded 设置配置初始化完成后回调
    5. func (app *DefaultApp) OnConfigurationLoaded(_func func(app module.App)) error {
    6. app.configurationLoaded = _func
    7. return nil
    8. }

    这个时候会回调main.go 里边设置的回调,也就是日志1的打印

    1. app.OnConfigurationLoaded(func(app module.App) {
    2. log.Info("App OnConfigurationLoaded")
    3. })

    5:日志的初始化

    1. log.Init(log.WithDebug(app.opts.Debug),
    2. log.WithProcessID(app.opts.ProcessID),
    3. log.WithBiDir(app.opts.BIDir),
    4. log.WithLogDir(app.opts.LogDir),
    5. log.WithLogFileName(app.opts.LogFileName),
    6. log.WithBiSetting(cof.BI),
    7. log.WithBIFileName(app.opts.BIFileName),
    8. log.WithLogSetting(cof.Log))
    9. log.Info("mqant %v starting up", app.opts.Version)

    app.opts 是第一步 app初始化的时候 赋值的,这一步没啥说的,按照要求来就行

    日志初始化完成后 日志2 也就打印出来了

    6:创建module manager管理用户自定义的一些Module

    1. manager := basemodule.NewModuleManager()
    2. manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
    3. // module
    4. for i := 0; i < len(mods); i++ {
    5. mods[i].OnAppConfigurationLoaded(app)
    6. manager.Register(mods[i])
    7. }
    8. app.OnInit(app.settings)
    9. manager.Init(app, app.opts.ProcessID)

    默认插入一个TimerModule,因为是系统级别的Module,所以调用的是RegisterRunMod(),然后遍历mods,分别调用自定义Module的OnAppConfigurationLoaded方法,日志3 日志4就是这么来的

    manager.Register 就是把模块注册进去,这里注册的并不一定是要运行的,是否运行还有依据配置文件。

    模块注册完成了之后,app.OnInit()啥子都没有干,连个日志没有,哈哈

    接下来就是manager的Init还是蛮重要的

    1. // Init 初始化
    2. func (mer *ModuleManager) Init(app module.App, ProcessID string) {
    3. log.Info("This service ModuleGroup(ProcessID) is [%s]", ProcessID)
    4. mer.app = app
    5. mer.CheckModuleSettings() //配置文件规则检查
    6. for i := 0; i < len(mer.mods); i++ {
    7. for Type, modSettings := range app.GetSettings().Module {
    8. if mer.mods[i].mi.GetType() == Type {
    9. //匹配
    10. for _, setting := range modSettings {
    11. //这里可能有BUG 公网IP和局域网IP处理方式可能不一样,先不管
    12. if ProcessID == setting.ProcessID {
    13. mer.runMods = append(mer.runMods, mer.mods[i]) //这里加入能够运行的组件
    14. mer.mods[i].settings = setting
    15. }
    16. }
    17. break //跳出内部循环
    18. }
    19. }
    20. }
    21. for i := 0; i < len(mer.runMods); i++ {
    22. m := mer.runMods[i]
    23. m.mi.OnInit(app, m.settings)
    24. if app.GetModuleInited() != nil {
    25. app.GetModuleInited()(app, m.mi)
    26. }
    27. m.wg.Add(1)
    28. go run(m)
    29. }
    30. //timer.SetTimer(3, mer.ReportStatistics, nil) //统计汇报定时任务
    31. }

    日志5的出处 不用说了吧,CheckModuleSettings主要是检查Config中的Module 是不是满足要求,要求如下:ID全局必须唯一、每一个类型的Module列表中ProcessID不能重复

     Module配置文件检查完毕之后,就要遍历传递进来的模块,配置文件是否配置有该模块的配置选项  配有的话  还得看看ProcessID。把满足要求的模块塞进runMods中,然后遍历runMods执行模块的OnInit,日志8和日志13 就是这么来的,Timer的Oninit 没有打印日志,不然也会有显示

    执行模块的OnInit同时,会触发app的OnModuleInited 回调

    1. app.OnModuleInited(func(app module.App, module module.Module) {
    2. log.Info("App Module[%s] OnModuleInited",module.GetType())
    3. })

    所以日志6、日志9、日志14就是这么来的

    最后一步就是run了,run会去调用模块的Run()

    1. func run(m *DefaultModule) {
    2. defer func() {
    3. if r := recover(); r != nil {
    4. if conf.LenStackBuf > 0 {
    5. buf := make([]byte, conf.LenStackBuf)
    6. l := runtime.Stack(buf, false)
    7. log.Error("%v: %s", r, buf[:l])
    8. } else {
    9. log.Error("%v", r)
    10. }
    11. }
    12. }()
    13. m.mi.Run(m.closeSig)
    14. m.wg.Done()
    15. }

    日志11、日志12、日志17就是Run()里边打印的 

    7:所有的流程都走完了,最后就回调app的OnStartup,打印日志

    1. if app.startup != nil {
    2. app.startup(app)
    3. }
    4. log.Info("mqant %v started", app.opts.Version)
    5. //main.go中
    6. app.OnStartup(func(app module.App) {
    7. log.Info("App OnStartup")
    8. })

    日志15、日志16 就是这么来的

    8:从上到下捋一遍,发现日志7、日志10没有找到来源

    Registering node: HelloWorld@a80901baa89b565b

    Registering node: Web@cc93a77ac80a144c

    一开始真的没有找到入口,到底从哪调用的。。。。通过利用其他手段 获得了调用堆栈

     原来是在ModuleManager的OnInit()中 m.mi.OnInit(app, m.settings) 调用模块自己的OnInit()时

     子模块的OnInit调用了基类的Oninit

    1. // OnInit 当模块初始化时调用
    2. func (m *BaseModule) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings, opt ...server.Option) {
    3. //初始化模块
    4. m.App = app
    5. m.subclass = subclass
    6. m.settings = settings
    7. //创建一个远程调用的RPC
    8. opts := server.Options{
    9. Metadata: map[string]string{},
    10. }
    11. for _, o := range opt {
    12. o(&opts)
    13. }
    14. if opts.Registry == nil {
    15. opt = append(opt, server.Registry(app.Registry()))
    16. }
    17. if opts.RegisterInterval == 0 {
    18. opt = append(opt, server.RegisterInterval(app.Options().RegisterInterval))
    19. }
    20. if opts.RegisterTTL == 0 {
    21. opt = append(opt, server.RegisterTTL(app.Options().RegisterTTL))
    22. }
    23. if len(opts.Name) == 0 {
    24. opt = append(opt, server.Name(subclass.GetType()))
    25. }
    26. if len(opts.ID) == 0 {
    27. opt = append(opt, server.ID(mqanttools.GenerateID().String()))
    28. }
    29. if len(opts.Version) == 0 {
    30. opt = append(opt, server.Version(subclass.Version()))
    31. }
    32. server := server.NewServer(opt...)
    33. err := server.OnInit(subclass, app, settings)
    34. if err != nil {
    35. log.Warning("server OnInit fail id(%s) error(%s)", m.GetServerID(), err)
    36. }
    37. hostname, _ := os.Hostname()
    38. server.Options().Metadata["hostname"] = hostname
    39. server.Options().Metadata["pid"] = fmt.Sprintf("%v", os.Getpid())
    40. ctx, cancel := context.WithCancel(context.Background())
    41. m.exit = cancel
    42. m.serviceStopeds = make(chan bool)
    43. m.service = service.NewService(
    44. service.Server(server),
    45. service.RegisterTTL(app.Options().RegisterTTL),
    46. service.RegisterInterval(app.Options().RegisterInterval),
    47. service.Context(ctx),
    48. )
    49. go func() {
    50. err := m.service.Run()
    51. if err != nil {
    52. log.Warning("service run fail id(%s) error(%s)", m.GetServerID(), err)
    53. }
    54. close(m.serviceStopeds)
    55. }()
    56. m.GetServer().SetListener(m)
    57. }

    也就是说每个模块启动都会创建一个RPC 服务器。。。

  • 相关阅读:
    【线性代数】P3 行列式按行展开&异乘变零定理
    SpringCloud注册中心Eureka
    springCloud本地镜像打包配置
    node12-node的 get请求
    大模型-基于大模型的数据标注
    点击百度网盘安装包无反应
    Nodejs初识
    工资支付系统可行性研究报告
    Spring Boot 统一功能处理
    [附源码]SSM计算机毕业设计智能超市导购系统JAVA
  • 原文地址:https://blog.csdn.net/zhangdell/article/details/125600283