• 服务端ZMQ(二)——管道通信方式


    服务端ZMQ(二)——管道通信方式

    1、管道模式

    • 管道模式也称为“流水线”模式
    • 管道模式用于将数据分发到布置在流水线中的节点。数据始终沿流水线向下流动,流水线的每一级都连接到至少一个节点。当流水线级连接到多个节点时,数据在所有连接的节点之间进行轮询

    ZMQ管道模式套接字

    • ZMQ_PUSH
    • ZMQ_PULL

    具体模型如下:

    在这里插入图片描述

    Ventilator:任务发布器会生成大量可以并行运算的任务。

    Worker:有一组worker会处理这些任务

    Sink:结果接收器会在末端接收所有的Worker的处理结果,进行汇总

    Worker上游和"任务发布器"相连,下游和"结果接收器"相连

    “任务发布器” 和 "结果接收器"是这个网络结构中比较稳定的部分,由它们绑定至端点

    Worker只是连接两个端点

    需要等Worker全部启动后,再进行任务分发。Socket的连接会消耗一定时间(慢连接), 如果不进行同步的话,第一个Worker启动

    会一下子接收很多任务。

    “任务发布器” 会向Worker均匀地分发任务(负载均衡机制)

    “结果接收器” 会均匀地从Worker处收集消息(公平队列机制)

    2、测试用例

    //taskwork
    #include "../zhelpers.hpp"  //具体位置看文件放哪里  最后面会贴出来
    #include 
    
    //  Task worker.
    //  Connects a PULL socket to tcp://localhost:5557 and a PUSH socket to
    //  tcp://localhost:5558. For every message received it parses the payload
    //  as a workload in milliseconds, sleeps that long, then pushes an empty
    //  message downstream and prints a progress dot.
    //  The loop never terminates on its own, so the trailing return is not
    //  reached in normal operation.
    int main(int argc, char *argv[])
    {
        //  One context with a single I/O thread
        zmq::context_t context(1);

        //  Upstream: tasks are pulled from here
        zmq::socket_t pull_t(context, ZMQ_PULL);
        pull_t.connect("tcp://localhost:5557");

        //  Downstream: completion notices are pushed here
        zmq::socket_t push_t(context, ZMQ_PUSH);
        push_t.connect("tcp://localhost:5558");

        while (1)
        {
            zmq::message_t msg;
            //  Workload in milliseconds; starts at 0 so an unparsable payload
            //  never leaves an indeterminate value for s_sleep
            int workload = 0;

            pull_t.recv(&msg);
            //  The payload is raw bytes; the cast target must be spelled out
            std::string smessage(static_cast<char *>(msg.data()), msg.size());

            std::istringstream iss(smessage);
            iss >> workload;

            //  Simulate doing the work
            s_sleep(workload);

            //  Reset to an empty message and push it downstream as the result
            msg.rebuild();
            push_t.send(msg);

            std::cout << "." << std::flush;
        }

        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
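    /*
        taskpush (task ventilator)

        Binds a PUSH socket to tcp://*:5557 and pushes tasks to the
        taskwork workers that connect to it. It also connects to
        tcp://localhost:5558 to send a start-of-batch message.
    */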
    #include 
    #include 
    #include 
    #include 
    #include 
    
    //  Pseudo-random integer in the range 0..(num-1), derived from random().
    //  The argument is parenthesized so expressions such as within(a + b)
    //  expand correctly, and the scaling is done in double: with float the
    //  product can round up far enough for the result to equal num.
    #define within(num) ((int) ((double) (num) * random () / (RAND_MAX + 1.0)))
    
    //  Task ventilator.
    //  Binds a PUSH socket to tcp://*:5557, waits for the operator to press
    //  Enter, sends one start-of-batch message to tcp://localhost:5558, then
    //  pushes 100 tasks. Each task is a 10-byte, NUL-padded decimal string
    //  holding a workload in milliseconds.
    int main (int argc, char *argv[])
    {
        zmq::context_t context (1);

        //  Socket the tasks are pushed on
        zmq::socket_t  sender(context, ZMQ_PUSH);
        sender.bind("tcp://*:5557");

        std::cout << "工作进程准备完毕 回车 " << std::endl;
        getchar ();
        std::cout << "发送任务给workers...\n" << std::endl;

        //  The first message signals the start of the batch. The message is
        //  sized to the text actually copied, so no uninitialized byte is
        //  transmitted.
        zmq::socket_t sink(context, ZMQ_PUSH);
        sink.connect("tcp://localhost:5558");
        static const char start_signal[] = "work ...";
        const size_t start_len = sizeof (start_signal) - 1;
        zmq::message_t message(start_len);
        memcpy(message.data(), start_signal, start_len);
        sink.send(message);

        //  Seed the random number generator
        srandom ((unsigned) time (NULL));

        //  Send 100 tasks
        int task_nbr;
        int total_msec = 0;     //  Sum of all generated workloads, in msec
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            //  Random workload: within(100) is offset by one millisecond
            const int workload = within (100) + 1;
            total_msec += workload;

            //  Fixed 10-byte payload, zero-filled, holding the decimal text;
            //  snprintf keeps the write inside the payload
            message.rebuild(10);
            memset(message.data(), '\0', 10);
            snprintf ((char *) message.data(), 10, "%d", workload);
            sender.send(message);
        }
        std::cout << "总耗时: " << total_msec << " 毫秒" << std::endl;
        sleep (1);              //  Pause one second before the sockets are torn down

        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    /*
        taskpull (task sink)

        Binds a PULL socket to tcp://*:5558 and collects the messages
        pushed by the taskwork workers.
    */
    
    #include 
    #include 
    #include 
    #include 
    
    //  Task sink.
    //  Binds a PULL socket to tcp://*:5558, waits for the start-of-batch
    //  message, then receives 100 further messages while printing progress
    //  marks, and finally reports the elapsed wall-clock time in milliseconds.
    int main (int argc, char *argv[])
    {
        zmq::context_t context(1);
        zmq::socket_t receiver(context, ZMQ_PULL);
        receiver.bind("tcp://*:5558");

        //  Block until the start-of-batch message arrives
        zmq::message_t message;
        receiver.recv(&message);

        //  Start the clock
        struct timeval started;
        gettimeofday (&started, NULL);

        //  Collect 100 confirmations; every tenth one is marked with ':'
        for (int received = 0; received < 100; ++received) {
            receiver.recv(&message);
            std::cout << (received % 10 == 0 ? ":" : ".") << std::flush;
        }

        //  Stop the clock
        struct timeval finished;
        gettimeofday (&finished, NULL);

        //  Subtract field by field, borrowing one second when the
        //  microsecond difference goes negative
        struct timeval elapsed;
        elapsed.tv_sec = finished.tv_sec - started.tv_sec;
        elapsed.tv_usec = finished.tv_usec - started.tv_usec;
        if (elapsed.tv_usec < 0) {
            elapsed.tv_sec -= 1;
            elapsed.tv_usec += 1000000;
        }

        const int total_msec = elapsed.tv_sec * 1000 + elapsed.tv_usec / 1000;
        std::cout << "\n总耗时: " << total_msec << " 毫秒重登\n" << std::endl;
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    //zhelpers.hpp
    #ifndef __ZHELPERS_HPP_INCLUDED__
    #define __ZHELPERS_HPP_INCLUDED__
    
    //  Include a bunch of headers that we will need in the examples
    
    #include  // https://github.com/zeromq/cppzmq
    
    #include 
    #include 
    #include 
    #include 
    
    #include 
    #include 
    #include         // random()  RAND_MAX
    #include 
    #include 
    #include 
    #if (!defined(WIN32))
    #   include 
    #   include 
    #endif
    
    //  When WIN32 is defined, declare the short integer aliases ulong and
    //  uint, and define int64_t in terms of the compiler's __int64 type.
    //  NOTE(review): presumably a toolchain that already declares int64_t
    //  would conflict with this typedef -- confirm for the compilers in use.
    #if (defined (WIN32))
        typedef unsigned long ulong;
        typedef unsigned int  uint;
        typedef __int64 int64_t;
    #endif
    
    //  When WIN32 is defined, map srandom and random onto srand and rand.
    //  The original note explains why: on some versions of Windows the POSIX
    //  subsystem is not installed by default.
    //  Every later use of random() in this header (for example in the within
    //  macro) therefore expands to rand() on such builds.
    #if (defined (WIN32))
    #   define srandom srand
    #   define random rand
    #endif
    
    // Visual Studio versions below 2015 do not support snprintf properly. This is a workaround.
    // Taken from http://stackoverflow.com/questions/2915672/snprintf-and-visual-studio-2010
    #if defined(_MSC_VER) && _MSC_VER < 1900
    
    #define snprintf c99_snprintf
    #define vsnprintf c99_vsnprintf
    
    	//  vsnprintf replacement for the MSVC versions selected by the
    	//  enclosing #if. Formats into outBuf through _vsnprintf_s with
    	//  _TRUNCATE when size is non-zero. If that step is skipped or reports
    	//  -1, the value of _vscprintf(format, ap) is returned instead.
    	inline int c99_vsnprintf(char *outBuf, size_t size, const char *format, va_list ap)
    	{
    		const int written = (size != 0)
    			? _vsnprintf_s(outBuf, size, _TRUNCATE, format, ap)
    			: -1;

    		return (written == -1) ? _vscprintf(format, ap) : written;
    	}
    
    	//  snprintf replacement for the MSVC versions selected by the
    	//  enclosing #if: collects the variadic arguments and forwards them
    	//  to c99_vsnprintf, returning its result unchanged.
    	inline int c99_snprintf(char *outBuf, size_t size, const char *format, ...)
    	{
    		va_list args;
    		va_start(args, format);
    		const int result = c99_vsnprintf(outBuf, size, format, args);
    		va_end(args);
    		return result;
    	}
    
    #endif
    
    //  Provide a pseudo-random integer in the range 0..(num-1), derived from
    //  random() (which is mapped to rand() when WIN32 is defined).
    //  The scaling is done in double: converting to double before multiplying
    //  avoids integer overflow of num * random(), and avoids the float
    //  rounding that could make the result equal num.
    #define within(num) ((int) ((double) (num) * random () / (RAND_MAX + 1.0)))
    
    //  Receive one 0MQ message from a raw socket handle and return its
    //  payload as a newly allocated, NUL-terminated C string.
    //  The caller must free() the returned string.
    //  Returns nullptr when zmq_msg_recv reports an error or when the buffer
    //  cannot be allocated; the message is closed on every path.
    inline static char *
    s_recv(void *socket, int flags = 0) {
        zmq_msg_t message;
        zmq_msg_init(&message);

        if (zmq_msg_recv(&message, socket, flags) < 0) {
            //  Release the initialised message before bailing out
            zmq_msg_close(&message);
            return nullptr;
        }

        const size_t size = zmq_msg_size(&message);
        //  One extra byte for the terminator
        char *string = static_cast<char *>(malloc(size + 1));
        if (string != nullptr) {
            memcpy(string, zmq_msg_data(&message), size);
            string[size] = 0;
        }
        zmq_msg_close(&message);
        return string;
    }
    
    //  Receive one 0MQ message from a cppzmq socket and return its payload
    //  as a std::string of exactly message.size() bytes.
    //  The boolean result of socket.recv is not inspected here.
    inline static std::string
    s_recv (zmq::socket_t & socket, int flags = 0) {

        zmq::message_t message;
        socket.recv(&message, flags);

        //  The payload is raw bytes; the cast target must be spelled out
        return std::string(static_cast<char *>(message.data()), message.size());
    }
    
    //  Receive one 0MQ message from a cppzmq socket into ostring.
    //  Returns the result of socket.recv; ostring is overwritten only when
    //  that result is true and is left untouched otherwise.
    inline static bool s_recv(zmq::socket_t & socket, std::string & ostring, int flags = 0)
    {
        zmq::message_t message;
        const bool rc = socket.recv(&message, flags);

        if (rc) {
            //  The payload is raw bytes; the cast target must be spelled out
            ostring = std::string(static_cast<char *>(message.data()), message.size());
        }

        return rc;
    }
    
    //  Convert C string to 0MQ string and send to socket
    inline static int
    s_send(void *socket, const char *string, int flags = 0) {
    	int rc;
    	zmq_msg_t message;
    	zmq_msg_init_size(&message, strlen(string));
    	memcpy(zmq_msg_data(&message), string, strlen(string));
    	rc = zmq_msg_send(&message, socket, flags);
    	assert(-1 != rc);
    	zmq_msg_close(&message);
    	return (rc);
    }
    
    //  Send the bytes of a std::string over a cppzmq socket as a single
    //  message and return whatever socket.send reports.
    inline static bool
    s_send (zmq::socket_t & socket, const std::string & string, int flags = 0) {
        const size_t length = string.size();

        zmq::message_t frame(length);
        memcpy (frame.data(), string.data(), length);

        return socket.send (frame, flags);
    }
    
    //  Send a NUL-terminated C string over a raw socket handle as a
    //  non-final part of a multipart message (ZMQ_SNDMORE is set).
    //  The string is only read, so it is taken as const char * -- matching
    //  s_send and allowing string literals to be passed directly.
    //  Returns the value of zmq_msg_send, which is asserted not to be -1.
    inline static int
    s_sendmore(void *socket, const char *string) {
        const size_t length = strlen(string);

        zmq_msg_t message;
        zmq_msg_init_size(&message, length);
        memcpy(zmq_msg_data(&message), string, length);

        const int rc = zmq_msg_send(&message, socket, ZMQ_SNDMORE);
        assert(-1 != rc);
        zmq_msg_close(&message);
        return rc;
    }
    
    //  Send the bytes of a std::string over a cppzmq socket as a non-final
    //  part of a multipart message (ZMQ_SNDMORE is set) and return whatever
    //  socket.send reports.
    inline static bool
    s_sendmore (zmq::socket_t & socket, const std::string & string) {
        const size_t length = string.size();

        zmq::message_t frame(length);
        memcpy (frame.data(), string.data(), length);

        return socket.send (frame, ZMQ_SNDMORE);
    }
    
    //  Receive every part of one (possibly multipart) message from the socket
    //  and print each part on its own line as "[size]payload".
    //  A part whose bytes are all printable ASCII (32..126) is printed as
    //  text; any other part is printed as two hex digits per byte.
    //  The formatting flags and fill character of std::cout are restored
    //  before returning, so later output is not left in hex.
    inline static void
    s_dump (zmq::socket_t & socket)
    {
        std::cout << "----------------------------------------" << std::endl;

        //  std::hex and std::setfill are sticky; remember the caller's state
        const std::ios_base::fmtflags saved_flags = std::cout.flags();
        const char saved_fill = std::cout.fill();

        while (1) {
            //  Process all parts of the message
            zmq::message_t message;
            socket.recv(&message);

            //  Dump the message as text or binary
            const size_t size = message.size();
            const std::string data(static_cast<char *>(message.data()), size);

            bool is_text = true;
            for (size_t char_nbr = 0; char_nbr < size; char_nbr++) {
                const unsigned char byte = static_cast<unsigned char>(data [char_nbr]);
                //  127 (DEL) is a control character, so it counts as binary
                if (byte < 32 || byte > 126) {
                    is_text = false;
                    break;
                }
            }

            //  The size is always decimal, even after a previous hex part
            std::cout << "[" << std::dec << std::setfill('0') << std::setw(3) << size << "]";
            for (size_t char_nbr = 0; char_nbr < size; char_nbr++) {
                if (is_text)
                    std::cout << data [char_nbr];
                else
                    //  Go through unsigned char first so bytes >= 0x80 are
                    //  not sign-extended into eight hex digits
                    std::cout << std::setfill('0') << std::setw(2) << std::hex
                       << static_cast<unsigned int>(
                              static_cast<unsigned char>(data [char_nbr]));
            }
            std::cout << std::endl;

            int more = 0;           //  Multipart detection
            size_t more_size = sizeof (more);
            socket.getsockopt (ZMQ_RCVMORE, &more, &more_size);
            if (!more)
                break;              //  Last message part
        }

        std::cout.flags(saved_flags);
        std::cout.fill(saved_fill);
    }
    
    #if (!defined (WIN32))
    //  Set a simple random printable identity on the socket and return it.
    //  The identity is two values from within(0x10000), each written as
    //  zero-padded upper-case hex of width 4 and joined by '-'.
    //  NOTE(review): the original caution here warns against calling this
    //  from multiple threads on MS Windows because rand() is not reentrant
    //  (issue #521). This variant is compiled only when WIN32 is not defined,
    //  so confirm whether the warning still applies.
    inline std::string
    s_set_id (zmq::socket_t & socket)
    {
        const int first_half = within (0x10000);
        const int second_half = within (0x10000);

        std::stringstream ss;
        ss << std::hex << std::uppercase;
        ss << std::setw(4) << std::setfill('0') << first_half;
        ss << "-";
        ss << std::setw(4) << std::setfill('0') << second_half;

        std::string identity = ss.str();
        socket.setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.length());
        return identity;
    }
    #else
    //  WIN32 variant (fix for issue #521): the identity is built from the
    //  caller-supplied id rather than from a random number. The id is written
    //  as zero-padded upper-case hex of minimum width 4, set as the socket's
    //  ZMQ_IDENTITY, and returned.
    inline std::string
    s_set_id(zmq::socket_t & socket, intptr_t id)
    {
        std::stringstream ss;
        ss << std::hex << std::uppercase;
        ss << std::setw(4) << std::setfill('0') << id;

        std::string identity = ss.str();
        socket.setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.length());
        return identity;
    }
    #endif
    
    //  Print the 0MQ library version reported by zmq_version to std::cout
    //  in the form "major.minor.patch".
    inline static void
    s_version (void)
    {
        int major;
        int minor;
        int patch;
        zmq_version (&major, &minor, &patch);

        std::cout << "Current 0MQ version is "
                  << major << "." << minor << "." << patch
                  << std::endl;
    }
    
    //  Require at least 0MQ version want_major.want_minor.
    //  Returns silently when the library reported by zmq_version is new
    //  enough; otherwise prints both versions to std::cout and terminates
    //  the process with EXIT_FAILURE.
    inline static void
    s_version_assert (int want_major, int want_minor)
    {
        int major, minor, patch;
        zmq_version (&major, &minor, &patch);

        const bool new_enough = major > want_major
            || (major == want_major && minor >= want_minor);
        if (new_enough)
            return;

        std::cout << "Current 0MQ version is " << major << "." << minor << std::endl;
        std::cout << "Application needs at least " << want_major << "." << want_minor
              << " - cannot continue" << std::endl;
        exit (EXIT_FAILURE);
    }
    
    //  Return the current system clock in milliseconds.
    //  With WIN32 defined the value is derived from GetSystemTimeAsFileTime;
    //  otherwise from gettimeofday.
    //  NOTE(review): the two sources presumably count from different epochs,
    //  so values should only be compared within one platform -- confirm.
    inline static int64_t
    s_clock (void)
    {
    #if (defined (WIN32))
        FILETIME fileTime;
        GetSystemTimeAsFileTime(&fileTime);
        unsigned __int64 largeInt = fileTime.dwHighDateTime;
        largeInt <<= 32;
        largeInt |= fileTime.dwLowDateTime;
        largeInt /= 10000; // FILETIME is in units of 100 nanoseconds
        return (int64_t)largeInt;
    #else
        struct timeval tv;
        gettimeofday (&tv, NULL);
        //  Widen before multiplying so the product is computed in 64 bits
        //  even where time_t is only 32 bits wide
        return static_cast<int64_t>(tv.tv_sec) * 1000 + tv.tv_usec / 1000;
    #endif
    }
    
    //  Suspend the calling thread for msecs milliseconds.
    //  Uses Sleep when WIN32 is defined and a single nanosleep call
    //  otherwise; the result of nanosleep is not inspected.
    inline static void
    s_sleep (int msecs)
    {
    #if (defined (WIN32))
        Sleep (msecs);
    #else
        //  Split into whole seconds and the nanosecond remainder
        const int whole_seconds = msecs / 1000;
        const int leftover_msecs = msecs % 1000;

        struct timespec delay;
        delay.tv_sec = whole_seconds;
        delay.tv_nsec = leftover_msecs * 1000000;
        nanosleep (&delay, NULL);
    #endif
    }
    
    //  Print a timestamped line to stdout: the local time formatted as
    //  "%y-%m-%d %H:%M:%S ", then the printf-style formatted arguments,
    //  then a newline.
    //  The timestamp lives in a fixed stack buffer (no heap allocation); if
    //  localtime yields no result the timestamp is simply left empty.
    inline static void
    s_console (const char *format, ...)
    {
        const time_t curtime = time (NULL);
        const struct tm *loctime = localtime (&curtime);

        //  18 characters of timestamp plus the terminator fit in 20 bytes
        char formatted[20] = "";
        if (loctime != NULL)
            strftime (formatted, sizeof (formatted), "%y-%m-%d %H:%M:%S ", loctime);
        printf ("%s", formatted);

        va_list argptr;
        va_start (argptr, format);
        vprintf (format, argptr);
        va_end (argptr);
        printf ("\n");
    }
    
    //  ---------------------------------------------------------------------
    //  Signal handling
    //
    //  Call s_catch_signals() in your application at startup, and then exit
    //  your main loop if s_interrupted is ever 1. Works especially well with
    //  zmq_poll.
    
    //  Flag set to 1 by s_signal_handler. It is declared volatile
    //  sig_atomic_t because it is written from a signal handler and read
    //  from ordinary code; a plain int gives no such guarantee and may be
    //  cached by the optimizer in a polling loop.
    static volatile sig_atomic_t s_interrupted = 0;

    //  Signal handler: records that a signal arrived. The signal number is
    //  not used.
    inline static void s_signal_handler (int signal_value)
    {
        (void) signal_value;
        s_interrupted = 1;
    }
    
    //  Install s_signal_handler for SIGINT and SIGTERM with an empty signal
    //  mask and no flags. Does nothing when WIN32 is defined.
    inline static void s_catch_signals ()
    {
    #if (!defined(WIN32))
        struct sigaction handler_spec;
        sigemptyset (&handler_spec.sa_mask);
        handler_spec.sa_flags = 0;
        handler_spec.sa_handler = s_signal_handler;

        //  Same handler for both signals, SIGINT first
        const int handled_signals[] = { SIGINT, SIGTERM };
        for (size_t i = 0; i < sizeof (handled_signals) / sizeof (handled_signals[0]); ++i)
            sigaction (handled_signals[i], &handler_spec, NULL);
    #endif
    }
    
    
    
    #endif
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341

    示例结果: (可以建多个worker 多个pull 多个push 看一下 这里就不贴了)

    在这里插入图片描述

    参考案例:https://github.com/booksbyus/zguide/tree/master/examples

  • 相关阅读:
    selenium常见异常以及处理方法
    vscode终端显示有错误
    揭秘 JDQ 限流架构:实时数据链路的多维动态带宽管控|京东零售技术实践
    三维重建一种实现算法
    【算法系列 | 8】深入解析查找算法之—二分查找
    QT:使用行编辑器、滑动条、滚动条、进度条、定时器
    C++笔记之popen()和std_system()和std_async()执行系统命令比较
    使用 Aeraki Mesh 实现零代码侵入的 Dubbo 服务调用跟踪
    【追光者】大学即(已)将(经)毕业,四年,我的所感所想(部分)。
    翻译: GitHub Copilot开启AI自动生成代码的时代
  • 原文地址:https://blog.csdn.net/weixin_43730892/article/details/127693656