测试代码
- package main
-
- import (
- "github.com/liangdas/mqant"
- log "github.com/liangdas/mqant/log/beego"
- "github.com/liangdas/mqant/module"
- "github.com/liangdas/mqant/registry"
- "github.com/liangdas/mqant/registry/consul"
- "github.com/mqant_test/helloworld"
- "github.com/mqant_test/web"
- "github.com/nats-io/nats.go"
- "time"
- )
-
- func main() {
- rs := consul.NewRegistry(func(options *registry.Options) {
- options.Addrs = []string{"127.0.0.1:8500"}
- })
- nc, err := nats.Connect("nats://127.0.0.1:4222", nats.MaxReconnects(10000))
- if err != nil {
- log.Error("nats error %v", err)
- return
- }
-
- app := mqant.CreateApp(
- module.Debug(true),//只有是在调试模式下才会在控制台打印日志, 非调试模式下只在日志文件中输出日志
- module.Registry(rs),
- module.Nats(nc),
- module.KillWaitTTL(5*time.Second),
- )
-
- app.OnConfigurationLoaded(func(app module.App) {
- log.Info("App OnConfigurationLoaded")
- })
-
- app.OnModuleInited(func(app module.App, module module.Module) {
- log.Info("App Module[%s] OnModuleInited",module.GetType())
- })
-
- app.OnStartup(func(app module.App) {
- log.Info("App OnStartup")
- })
-
- err = app.Run(
- helloworld.Module(),
- web.Module(),
- )
- if err != nil{
- log.Error(err.Error())
- }
-
- }
运行日志:
一:先来看看app的创建:
- app := mqant.CreateApp(
- module.Debug(true),
- module.Registry(rs),
- module.Nats(nc),
- module.KillWaitTTL(5*time.Second),
- )
- func CreateApp(opts ...module.Option) module.App {
- opts = append(opts, module.Version(version))
- return app.NewApp(opts...)
- }
- // NewApp 创建app
- func NewApp(opts ...module.Option) module.App {
- options := newOptions(opts...)
- app := new(DefaultApp)
- app.opts = options
- options.Selector.Init(selector.SetWatcher(app.Watcher))
- app.rpcserializes = map[string]module.RPCSerialize{}
- return app
- }
newOptions()代码量比较多,主要干的就是创建module.Options类型的对象opt,默认了一些值,然后遍历传给函数的opts,就是module.Debug(true)这些东西,在这些传递进来的闭包中设置opt的一些变量,达到让开发者自己控制的目的。默认是Debug和解析命令行参数的
如果外部传递的opts没有设置Nats的相关信息,就用默认的nats服务器地址创建一个连接
然后就是日志路径文件夹相关的初始化,此时整个app启动需要的配置(Registry、Selector、Log等等)都已经设置好了,这些设置同样交给app保存(app.opts = options)
二:再来看看app的运行:
app.Run(helloworld.module(),web.module()) 调用的是app/app.go里边的Run(),把要初始化的所有的Module对象都传递进去,至于传递进去的对象是不是要启动运行的那是后话
- // Run 运行应用
- func (app *DefaultApp) Run(mods ...module.Module) error {
- f, err := os.Open(app.opts.ConfPath)
- if err != nil {
- //文件不存在
- panic(fmt.Sprintf("config path error %v", err))
- }
- var cof conf.Config
- fmt.Println("Server configuration path :", app.opts.ConfPath)
- conf.LoadConfig(f.Name()) //加载配置文件
- cof = conf.Conf
- app.Configure(cof) //解析配置信息
-
- if app.configurationLoaded != nil {
- app.configurationLoaded(app)
- }
-
- // log.InitLog(app.opts.Debug, app.opts.ProcessID, app.opts.LogDir, cof.Log)
- // log.InitBI(app.opts.Debug, app.opts.ProcessID, app.opts.BIDir, cof.BI)
- log.Init(log.WithDebug(app.opts.Debug),
- log.WithProcessID(app.opts.ProcessID),
- log.WithBiDir(app.opts.BIDir),
- log.WithLogDir(app.opts.LogDir),
- log.WithLogFileName(app.opts.LogFileName),
- log.WithBiSetting(cof.BI),
- log.WithBIFileName(app.opts.BIFileName),
- log.WithLogSetting(cof.Log))
- log.Info("mqant %v starting up", app.opts.Version)
-
- manager := basemodule.NewModuleManager()
- manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
- // module
- for i := 0; i < len(mods); i++ {
- mods[i].OnAppConfigurationLoaded(app)
- manager.Register(mods[i])
- }
- app.OnInit(app.settings)
- manager.Init(app, app.opts.ProcessID)
- if app.startup != nil {
- app.startup(app)
- }
- log.Info("mqant %v started", app.opts.Version)
- // close
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGTERM)
- sig := <-c
- log.BiBeego().Flush()
- log.LogBeego().Flush()
- //如果一分钟都关不了则强制关闭
- timeout := time.NewTimer(app.opts.KillWaitTTL)
- wait := make(chan struct{})
- go func() {
- manager.Destroy()
- app.OnDestroy()
- wait <- struct{}{}
- }()
- select {
- case <-timeout.C:
- panic(fmt.Sprintf("mqant close timeout (signal: %v)", sig))
- case <-wait:
- log.Info("mqant closing down (signal: %v)", sig)
- }
- log.BiBeego().Close()
- log.LogBeego().Close()
- return nil
- }
具体流程如下:
- func LoadConfig(Path string) {
- // Read config.
- if err := readFileInto(Path); err != nil {
- panic(err)
- }
- if Conf.RPC.RPCExpired == 0 {
- Conf.RPC.RPCExpired = 3
- }
- if Conf.RPC.MaxCoroutine == 0 {
- Conf.RPC.MaxCoroutine = 100
- }
- }
-
- func readFileInto(path string) error {
- var data []byte
- buf := new(bytes.Buffer)
- f, err := os.Open(path)
- if err != nil {
- return err
- }
- defer f.Close()
- r := bufio.NewReader(f)
- for {
- line, err := r.ReadSlice('\n')
- if err != nil {
- if len(line) > 0 {
- buf.Write(line)
- }
- break
- }
- if !strings.HasPrefix(strings.TrimLeft(string(line), "\t "), "//") {
- buf.Write(line)
- }
- }
- data = buf.Bytes()
- //fmt.Print(string(data))
- return json.Unmarshal(data, &Conf)
- }
conf包中的Conf就被赋值了,Conf是Config类型的,结构如下,配置server.json的时候就可以按照这个struct里边的内容来配置
- type Config struct {
- Log map[string]interface{}
- BI map[string]interface{}
- OP map[string]interface{}
- RPC RPC `json:"rpc"`
- Module map[string][]*ModuleSettings
- Mqtt Mqtt
- Settings map[string]interface{}
- }
- // Configure 重设应用配置
- func (app *DefaultApp) Configure(settings conf.Config) error {
- app.settings = settings
- return nil
- }
这一步就是把上边格式化好的配置文件绑定到app上
- if app.configurationLoaded != nil {
- app.configurationLoaded(app)
- }
- // OnConfigurationLoaded 设置配置初始化完成后回调
- func (app *DefaultApp) OnConfigurationLoaded(_func func(app module.App)) error {
- app.configurationLoaded = _func
- return nil
- }
这个时候会回调main.go 里边设置的回调,也就是日志1的打印
- app.OnConfigurationLoaded(func(app module.App) {
- log.Info("App OnConfigurationLoaded")
- })
- log.Init(log.WithDebug(app.opts.Debug),
- log.WithProcessID(app.opts.ProcessID),
- log.WithBiDir(app.opts.BIDir),
- log.WithLogDir(app.opts.LogDir),
- log.WithLogFileName(app.opts.LogFileName),
- log.WithBiSetting(cof.BI),
- log.WithBIFileName(app.opts.BIFileName),
- log.WithLogSetting(cof.Log))
- log.Info("mqant %v starting up", app.opts.Version)
app.opts 是第一步 app初始化的时候 赋值的,这一步没啥说的,按照要求来就行
日志初始化完成后 日志2 也就打印出来了
- manager := basemodule.NewModuleManager()
- manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
- // module
- for i := 0; i < len(mods); i++ {
- mods[i].OnAppConfigurationLoaded(app)
- manager.Register(mods[i])
- }
- app.OnInit(app.settings)
- manager.Init(app, app.opts.ProcessID)
默认插入一个TimerModule,因为是系统级别的Module,所以调用的是RegisterRunMod(),然后遍历mods,分别调用自定义Module的OnAppConfigurationLoaded方法,日志3 日志4就是这么来的
manager.Register 就是把模块注册进去,这里注册的并不一定是要运行的,是否运行还有依据配置文件。
模块注册完成了之后,app.OnInit()啥子都没有干,连个日志没有,哈哈
接下来就是manager的Init还是蛮重要的
- // Init 初始化
- func (mer *ModuleManager) Init(app module.App, ProcessID string) {
- log.Info("This service ModuleGroup(ProcessID) is [%s]", ProcessID)
- mer.app = app
- mer.CheckModuleSettings() //配置文件规则检查
- for i := 0; i < len(mer.mods); i++ {
- for Type, modSettings := range app.GetSettings().Module {
- if mer.mods[i].mi.GetType() == Type {
- //匹配
- for _, setting := range modSettings {
- //这里可能有BUG 公网IP和局域网IP处理方式可能不一样,先不管
- if ProcessID == setting.ProcessID {
- mer.runMods = append(mer.runMods, mer.mods[i]) //这里加入能够运行的组件
- mer.mods[i].settings = setting
- }
- }
- break //跳出内部循环
- }
- }
- }
-
- for i := 0; i < len(mer.runMods); i++ {
- m := mer.runMods[i]
- m.mi.OnInit(app, m.settings)
-
- if app.GetModuleInited() != nil {
- app.GetModuleInited()(app, m.mi)
- }
-
- m.wg.Add(1)
- go run(m)
- }
- //timer.SetTimer(3, mer.ReportStatistics, nil) //统计汇报定时任务
- }
日志5的出处 不用说了吧,CheckModuleSettings主要是检查Config中的Module 是不是满足要求,要求如下:ID全局必须唯一、每一个类型的Module列表中ProcessID不能重复
Module配置文件检查完毕之后,就要遍历传递进来的模块,配置文件是否配置有该模块的配置选项 配有的话 还得看看ProcessID。把满足要求的模块塞进runMods中,然后遍历runMods执行模块的OnInit,日志8和日志13 就是这么来的,Timer的Oninit 没有打印日志,不然也会有显示
执行模块的OnInit同时,会触发app的OnModuleInited 回调
- app.OnModuleInited(func(app module.App, module module.Module) {
- log.Info("App Module[%s] OnModuleInited",module.GetType())
- })
所以日志6、日志9、日志14就是这么来的
最后一步就是run了,run会去调用模块的Run()
- func run(m *DefaultModule) {
- defer func() {
- if r := recover(); r != nil {
- if conf.LenStackBuf > 0 {
- buf := make([]byte, conf.LenStackBuf)
- l := runtime.Stack(buf, false)
- log.Error("%v: %s", r, buf[:l])
- } else {
- log.Error("%v", r)
- }
- }
- }()
- m.mi.Run(m.closeSig)
- m.wg.Done()
- }
日志11、日志12、日志17就是Run()里边打印的
- if app.startup != nil {
- app.startup(app)
- }
- log.Info("mqant %v started", app.opts.Version)
-
- //main.go中
- app.OnStartup(func(app module.App) {
- log.Info("App OnStartup")
- })
日志15、日志16 就是这么来的
Registering node: HelloWorld@a80901baa89b565b
Registering node: Web@cc93a77ac80a144c
一开始真的没有找到入口,到底从哪调用的。。。。通过利用其他手段 获得了调用堆栈
原来是在ModuleManager的OnInit()中 m.mi.OnInit(app, m.settings) 调用模块自己的OnInit()时
子模块的OnInit调用了基类的Oninit
- // OnInit 当模块初始化时调用
- func (m *BaseModule) OnInit(subclass module.RPCModule, app module.App, settings *conf.ModuleSettings, opt ...server.Option) {
- //初始化模块
- m.App = app
- m.subclass = subclass
- m.settings = settings
- //创建一个远程调用的RPC
-
- opts := server.Options{
- Metadata: map[string]string{},
- }
- for _, o := range opt {
- o(&opts)
- }
- if opts.Registry == nil {
- opt = append(opt, server.Registry(app.Registry()))
- }
-
- if opts.RegisterInterval == 0 {
- opt = append(opt, server.RegisterInterval(app.Options().RegisterInterval))
- }
-
- if opts.RegisterTTL == 0 {
- opt = append(opt, server.RegisterTTL(app.Options().RegisterTTL))
- }
-
- if len(opts.Name) == 0 {
- opt = append(opt, server.Name(subclass.GetType()))
- }
-
- if len(opts.ID) == 0 {
- opt = append(opt, server.ID(mqanttools.GenerateID().String()))
- }
-
- if len(opts.Version) == 0 {
- opt = append(opt, server.Version(subclass.Version()))
- }
- server := server.NewServer(opt...)
- err := server.OnInit(subclass, app, settings)
- if err != nil {
- log.Warning("server OnInit fail id(%s) error(%s)", m.GetServerID(), err)
- }
- hostname, _ := os.Hostname()
- server.Options().Metadata["hostname"] = hostname
- server.Options().Metadata["pid"] = fmt.Sprintf("%v", os.Getpid())
- ctx, cancel := context.WithCancel(context.Background())
- m.exit = cancel
- m.serviceStopeds = make(chan bool)
- m.service = service.NewService(
- service.Server(server),
- service.RegisterTTL(app.Options().RegisterTTL),
- service.RegisterInterval(app.Options().RegisterInterval),
- service.Context(ctx),
- )
-
- go func() {
- err := m.service.Run()
- if err != nil {
- log.Warning("service run fail id(%s) error(%s)", m.GetServerID(), err)
- }
- close(m.serviceStopeds)
- }()
- m.GetServer().SetListener(m)
- }
也就是说每个模块启动都会创建一个RPC 服务器。。。