• zmq封装


    ZmqBindlib

    zmq常用封装

    使用方法

    基本使用

    1.简单请求回复

     ZmqRequest request = new ZmqRequest();
                request.RemoteAddress = localaddes;
                request.PubClient = "A";
                int num = 0;
                while (true)
                {
                    //   Thread.Sleep(1000);
                    //string msg = request.Request("hi");
                    Person p=  request.Request(new Person { Name = "jin", Description = "请求", Id = num++, Title = "rr" });
                    Console.WriteLine(p.Description+p.Name);
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
        ZmqResponse rep = new ZmqResponse();
                rep.LocalAddress = localaddes;
                rep.Start();
                int num = 0;
                //rep.ByteReceived += (sender, e) =>
                //{
                //    Console.WriteLine(System.Text.Encoding.Default.GetString(e));
                //    rep.Response("word"+num++);
                //};
                rep.StringReceived += (sender, e) =>
                {
                    Console.WriteLine(e);
                    if (e == "hi")
                    {
                        Thread.Sleep(1000);
                    }
                    rep.Response("word" + num++);
                };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2.异步下的请求回复,类似TCP,支持多请求

      server =new EhoServer();
                server.RouterAddress = "tcp://127.0.0.1:66666";//服务地址,请求的远端地址
                //  server.ByteReceived += Server_ByteReceived;
              // server.StringReceived += Server_StringReceived1; 
                server.Start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
     private static void Server_StringReceived1(object? sender, RspSocket e)
            {
                Console.WriteLine(e.Message);
                if (e.Message == "hi")
                {
                  //  Thread.Sleep(4000);
                    e.Response("jinyu");
                    return;
                }
                e.Response("word");
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    		 private static  void recvice()
            {
                while (true)
                {
                    var ss = server.GetMsg();
                    ss.Message.Description = "回复"+ss.Message.Id;
                    ss.Response(ss.Message);
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3.订阅发布

    	````
    	  ZmqSubscriber sub = new ZmqSubscriber();
            sub.Address = new string[] { localaddes };
            sub.Subscribe("A");
           // sub.ByteReceived += Sub_ByteReceived;
            sub.StringReceived += Sub_StringReceived;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
      		  ZmqPublisher pub = new ZmqPublisher();
              pub.LocalAddress =localaddes;
             // pub.IsProxy = true; 是否使用中间代理
              int num = 0;
              while (true)
              {
                 // Thread.Sleep(1000);
                  pub.Publish("A", "ssss"+num++);
              }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
        		static void Proxy()
            {
        	//中间代理
                ZmqDDSProxy.PubAddress = "tcp://127.0.0.1:7771";//注意,客户端订阅此地址
                ZmqDDSProxy.SubAddress = "tcp://127.0.0.1:7772";//客户端发布此地址
                ZmqDDSProxy.Start();
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        	
        ## 中心高可用部署
        1.推荐方式
          使用IP漂移:
          1. windows  
             使用DNS+VLS;Panguha软件
          2.Linux
             使用keppalive
        
        2.使用封装
          该功能前提是可以使用广播,可以允许少量数据丢失;
          (1)请求返回模式
              中心:
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
              EhoServer eho = new EhoServer();
                eho.IsCluster = true;
                eho.DealerAddress = "inproc://server";
                eho.RouterAddress = "tcp://127.0.0.1:5550";
        
                eho.StringReceived += EhoServer_StringReceived;
                eho.Start();
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
            客户端:与单个一致
        
            (2)订阅发布
        
            中心:
        
        • 1
        • 2
        • 3
        • 4
        • 5
          ZmqDDSProxy.PubAddress = "tcp://127.0.0.1:2222";
                ZmqDDSProxy.SubAddress = "tcp://127.0.0.1:4444";
                ZmqDDSProxy.IsCluster=true;
             ZmqDDSProxy.Start();
        
        • 1
        • 2
        • 3
        • 4
        
            发布端:
        
        
        • 1
        • 2
        • 3
                ZmqPublisher pub = new ZmqPublisher();
                pub.Address = "tcp://127.0.0.1:5678";
                pub.IsProxy = true; //是否使用中间代理
                pub.IsDDS = true;//高可用启动
                int num = 0;
                while (true)
                {
                    Thread.Sleep(1000);
                    try
                    {
                        pub.Publish("A", "ssss" + num++);
                    }catch(Exception ex)
                    {
                        Console.WriteLine(ex.ToString());
                    }
        
                }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        
                    订阅端:
        
        
        • 1
        • 2
        • 3
                  ZmqSubscriber sub = new ZmqSubscriber();
                sub.Address = new string[] { "tcp://127.0.0.1:1234" };
                sub.IsDDS = true;//高可用启动
                sub.Subscribe("");
               // sub.ByteReceived += Sub_ByteReceived;
                sub.StringReceived += Sub_StringReceived;
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
                    对于发布订阅,中心何发布订阅端都需要启动高可用,会刷新地址
        
                    (3)负载均衡式订阅发布
                     该模式是仿照kafka功能的;
                     中心:
        
        • 1
        • 2
        • 3
        • 4
        • 5
                   ZmqDDSProxy.PubAddress = "tcp://127.0.0.1:2222";
                ZmqDDSProxy.SubAddress = "tcp://127.0.0.1:4444";
                ZmqDDSProxy.IsCluster = true;//高可用
                ZmqDDSProxy.StartProxy(); //注意方法,启动和另外发布订阅方法不同
        
        • 1
        • 2
        • 3
        • 4
                    发布端:和前面一样
        
                    订阅端:
        
        • 1
        • 2
        • 3
               ZmqSubscriberGroup zmqSubscriber=new ZmqSubscriberGroup();
                zmqSubscriber.Address = "tcp://127.0.0.1:1234";
                zmqSubscriber.IsDDS= true;//高可用
               // zmqSubscriber.Indenty = "test";//订阅在不同分组
                zmqSubscriber.Subscribe("A");
                zmqSubscriber.StringReceived += ZmqSubscriber_StringReceived;
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
                    (4)kafka封装
        
        • 1
                 KafkaPublisher kafkaPublisher = new KafkaPublisher();
                    int num = 0;
                    while (true)
                    {
                        Thread.Sleep(1000);
                        kafkaPublisher.Push("A", "SSSSS"+num++);
                    }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
                      KafkaSubscriber  kafkaSubscriber = new KafkaSubscriber();
                      kafkaSubscriber.Subscriber("A");
                      kafkaSubscriber.Consume(p =>
                      {
                          if(p==null)
                          {
                              return;
                          }
                          Console.WriteLine(string.Format("Received message at {0}:{1}", p.Topic, p.Value));
                      
                      });
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          
          		说明
          		1.接收数据一端,定义了2个事件一个方法,顺序是ByteReceived、StringReceived、GetMsg()方法。一旦前一个实现,后面就无效。
          
          
          项目地址:https://github.com/jinyuttt/ZmqBindlib.git
          
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
        • 相关阅读:
          无线振弦采集仪在岩土工程中如何远程监测和远程维护
          FPGA学习笔记(七)verilog的深入学习之任务与函数(语法篇3)
          机器学习笔记 - 常用概率分布
          【探索AI】二十四 深度学习之第7周:深度学习在实际应用中的案例
          源码编译CEF(branch=6045)+mp4+mp3笔记
          机器学习笔记之最优化理论与方法(八)无约束优化问题——常用求解方法(中)
          Tomcat
          java计算机毕业设计物业综合信息管理系统源码+数据库+系统+lw文档+mybatis+运行部署
          云栖大会,未来万物皆是计算机?
          【前端精进之路】JS篇:第15期 JavaScript模块化规范
        • 原文地址:https://blog.csdn.net/jinyuttt/article/details/133961726