• C/C++|基于回调函数实现异步操作


    首先,要搞懂一点,异步操作本质上也是并发,我们想要在线程级别实现异步并发基本就靠三种方式:

    • 多线程并发
    • 回调函数
    • 协程

    今天我们讨论的是回调函数,我们如何通过回调函数来实现异步操作呢?

    1. 非阻塞I/O操作+回调函数实现异步IO
    2. 基于定时器+回调函数实现异步任务调度
    3. 事件队列+回调函数

    我们就分别来实现一下他们吧,代码比较长,请耐心阅读。

    非阻塞I/O+回调

    这种方式允许程序在等待 I/O 操作完成时继续执行其他任务,从而提高并发性和性能。

    我们可以使用标准 C++ 库和 C++11 中的线程库来实现一个基于 epoll 的非阻塞 I/O 和回调函数的示例。epoll 是 Linux 提供的高效 I/O 多路复用机制,适用于处理大量并发连接。我们将使用 epoll 来实现异步 I/O,并使用回调函数处理完成的 I/O 操作。

    实例代码:使用 epoll 实现非阻塞 I/O 和回调函数

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    void set_non_blocking(int fd) {
    	int flags = fcntl(fd, F_GETFL, 0);
    	if (flage == -1) {
    		throw std::runtime_error("fcntl get error");
    	}
    	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    		throw std::runtime_error("fcntl set error");
    	}
    }
    
    //事件处理器类
    class EventLoop {
    public:
    	EventLoop() {
    		epollFd_ = epoll_create1(0);
    		if (epoll_fd == -1) {
    			throw std::runtime_error("epoll_creat1 error");
    		}
    	}
    
    	~EpollLoop() {
    		close(epoll_fd);
    	}
    
    	//添加文件描述符及其对应的回调函数
    	void add_fd(int fd, std::function<void(int)> cb) {
    		set_non_blocking(fd);
    		epoll_event event;
    		event.events = EPOLLIN | EPOLLET;
    		event.data.fd = fd;
    		if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event) == -1) {
    			throw std::runtime_error("epoll_ctl add error");
    		}
    		callbacks_[fd] = cb;
    	}
    
    	//运行时间循环
    	void run() {
    		while (running_) {
    			std::vector<epoll_event> events(10);
    			int numActives = epoll_wait(epollFd_, events.data(), events.size(), -1);
    			if (numActives == -1) {
    				throw std::runtime_error("epoll_wait error");
    			}
    			for (int i = 0; i < n; ++i) {
    				int fd = events[i].data.fd;
    				if (callbacks_.find(fd) != callbacks_.end()) 
    					callbacks[fd](fd);
    			}
    		}
    	}
    
    	//停止事件循环
    	void stop() {
    		return = false;
    	}
    
    private:
    	int epollFd_; //epoll实例
    	std::unordered_map<int, std::function<void(int)>> callbacks_;
    	bool running_ = true;
    };
    
    
    // 示例回调函数,处理标准输入的读取
    void handle_stdin(int fd) {
    	char buffer[128];
    	ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
    	if (n > 0) {
    		buffer[n] = '\0';
    		std::cout << "Read from stdin: " << buffer << std::endl;
    	} else if (n == 0) {
    		std::cout << "EOF from stdin" << std::endl;
    	} else {
    	if (errno != EAGAIN && errno != EWOULDBLOCK) {
    		std::ceer << "Read error: " <<  strerror(errno) << std::endl;
    		}	
    	}
    }
    
    int main () {
    	try {
    		EventLoop loop;
    
    		//注册标准输入的文件描述符和回调函数
    		loop.add_fd(STDIN_FILENO, handle_stdin);
    
    		//运行事件循环
    		std::thread loop_thread([&loop]() {
    			loop.run();
    		});
    
    		//模拟只线程的其他工作
    		std::this_thread::sleep_for(std::chrono::seconds(10));
    
    		//停止事件循环
    		loop.stop();
    		loop_thread.join();
    	} catch (const std::exception &e) {
    		std::cerr << "Exception: " << e.what() << std::endl;
    		return 1
    	}
    
    	return 0;
    }
    

    在该示例中,由于是非阻塞IO,所以如果有两个 fd 对应的事件被激活,那么他们会循环执行各自的回调操作,因为我们的EventLoop里面是又一个 while 循环的,如果这两个 fd 如果都是读一个很大很大的文件,那么他们在while循环中会交替执行各自的回调任务,实现异步操作。

    基于定时器+回调函数实现异步任务调度

    通过定时器和回调函数,可以在单线程环境中实现异步任务调度。在这个示例中,我们实现了一个简单的 Web 服务器定时清理过期会话的功能。类似的技术可以应用于其他需要定时任务调度的场景,如定时备份、日志轮换、定时数据采集等。

    1. SessionManager 类

    这个类负责管理用户会话,提供添加、删除和清理过期会话的方法。

    class SessionManager {
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    class SessionManager {
    public:
        void add_session(const std::string& session_id) {
            sessions[session_id] = std::chrono::steady_clock::now();
        }
    
        void remove_session(const std::string& session_id) {
            sessions.erase(session_id);
        }
    
        void clean_expired_sessions(std::chrono::seconds timeout) {
            auto now = std::chrono::steady_clock::now();
            for (auto it = sessions.begin(); it != sessions.end(); ) {
                if (now - it->second > timeout) {
                    std::cout << "Removing expired session: " << it->first << std::endl;
                    it = sessions.erase(it);
                } else {
                    ++it;
                }
            }
        }
    
    private:
        std::unordered_map<std::string, std::chrono::steady_clock::time_point> sessions;
    };
    
    • add_session:添加一个新会话,记录会话 ID 和当前时间。
    • remove_session:删除指定的会话。
    • clean_expired_sessions:清理过期会话,检查所有会话,如果会话时间超过指定的超时时间,则删除会话。

    2. EventLoop类

    这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

    class EventLoop {
    public:
        void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {
            auto next_run = std::chrono::steady_clock::now() + interval;
            events.emplace_back(next_run, interval, callback);
        }
    
        void run() {
            while (running) {
                auto now = std::chrono::steady_clock::now();
                for (auto& event : events) {
                    if (now >= event.next_run) {
                        event.callback();
                        event.next_run = now + event.interval;
                    }
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用
            }
        }
    
        void stop() {
            running = false;
        }
    
    private:
        struct Event {
            std::chrono::steady_clock::time_point next_run;
            std::chrono::milliseconds interval;
            std::function<void()> callback;
    
            Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb)
                : next_run(nr), interval(i), callback(cb) {}
        };
    
        std::vector<Event> events;
        bool running = true;
    };
    
    • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
    • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
    • stop:停止事件循环。

    3. 回调函数

    定义一个回调函数,用于清理过期会话。
    如果我们定义两个不同任务的回调函数,那么是不是就可以在某段时间实现多个任务了呢?

    void clean_sessions_task(SessionManager& session_manager, std::chrono::seconds timeout) {
        session_manager.clean_expired_sessions(timeout);
    }
    

    这个函数调用 SessionManager 的 clean_expired_sessions 方法来清理过期会话。

    4. main 函数

    int main() {
        SessionManager session_manager;
        EventLoop event_loop;
    
        // 添加一些示例会话
        session_manager.add_session("session1");
        std::this_thread::sleep_for(std::chrono::seconds(1));
        session_manager.add_session("session2");
    
        // 每2秒清理一次过期会话(假设过期时间为3秒)
        event_loop.add_event(std::bind(clean_sessions_task, std::ref(session_manager), std::chrono::seconds(3)), std::chrono::seconds(2));
    
        // 运行事件循环
        std::thread event_loop_thread([&event_loop]() {
            event_loop.run();
        });
    
        // 模拟服务器运行
        std::this_thread::sleep_for(std::chrono::seconds(10));
    
        // 停止事件循环
        event_loop.stop();
        event_loop_thread.join();
    
        return 0;
    }
    
    1. 创建 SessionManager 和 EventLoop 对象。
    2. 添加两个示例会话,分别在 0 秒和 1 秒时添加。
    3. 添加一个定时事件,每 2 秒调用一次 clean_sessions_task 来清理过期会话,过期时间为 3 秒。
    4. 启动一个线程运行事件循环。
    5. 主线程模拟服务器运行 10 秒钟。
    6. 停止事件循环并等待事件循环线程结束。

    如果我们添加两个定时事件,并且打开循环,那么不是就在某段时间实现了异步任务调度呢?

    基于事件队列+回调函数实现异步操作

    这个示例展示了如何使用标准 C++ 库和 C++11 线程库,通过事件队列和回调函数在单线程中实现异步任务调度。具体来说,我们实现了一个简单的系统,可以调度并执行延迟任务。

    1.EventLoop 类

    这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

    class EventLoop {
    public:
        // 添加一个新的定时事件
        void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {
            auto next_run = std::chrono::steady_clock::now() + interval;
            events.emplace_back(next_run, interval, callback);
        }
    
        // 运行事件循环
        void run() {
            while (running) {
                auto now = std::chrono::steady_clock::now();
                for (auto& event : events) {
                    if (now >= event.next_run) {
                        event.callback(); // 执行回调函数
                        event.next_run = now + event.interval; // 更新下一次执行时间
                    }
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用
            }
        }
    
        // 停止事件循环
        void stop() {
            running = false;
        }
    
    private:
        struct Event {
            std::chrono::steady_clock::time_point next_run;
            std::chrono::milliseconds interval;
            std::function<void()> callback;
    
            Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb)
                : next_run(nr), interval(i), callback(cb) {}
        };
    
        std::vector<Event> events;
        bool running = true;
    };
    
    • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
      • 参数 callback 是一个回调函数,当事件触发时调用。
      • 参数 interval 是一个时间间隔,表示事件应该在多长时间后执行。
      • next_run 记录了事件的下一次执行时间。
    • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
      • while (running):事件循环在 running 为 true 时持续运行。
      • event.callback():执行回调函数,表示事件触发。
      • event.next_run = now + event.interval:更新事件的下一次执行时间。
      • std::this_thread::sleep_for(std::chrono::milliseconds(10)):通过短暂休眠来减少 CPU 占用。
    • stop:停止事件循环,将 running 设置为 false。

    2. 回调函数

    定义两个简单的回调函数,用于示例任务。

    void say_hello() {
        std::cout << "Hello, World!" << std::endl;
    }
    
    void say_goodbye() {
        std::cout << "Goodbye, World!" << std::endl;
    }
    
    

    3.main函数

    设置并运行事件循环,添加一些示例任务,并定期执行这些任务。

    int main() {
        EventLoop loop;
    
        // 添加定时事件
        loop.add_event(say_hello, std::chrono::seconds(1)); // 每1秒执行一次
        loop.add_event(say_goodbye, std::chrono::seconds(2)); // 每2秒执行一次
    
        // 运行事件循环
        loop.run();
    
        return 0;
    }
    

    异步任务的实现

    异步任务的实现核心在于 EventLoop 类中的 add_event 和 run 方法:

    • add_event 方法将任务(回调函数)和执行时间(间隔)添加到事件队列中。
    • run 方法启动事件循环,定期检查事件队列,触发到期的任务,并通过回调函数执行这些任务。

    具体来说,异步任务的执行发生在以下代码行:

    if (now >= event.next_run) {
        event.callback(); // 执行回调函数
        event.next_run = now + event.interval; // 更新下一次执行时间
    }
    
    • event.callback():当当前时间 now 大于等于 event.next_run 时,执行回调函数,表示异步任务触发并执行。

    这个机制允许在单线程环境中,通过事件队列和回调函数实现异步任务调度,无需多线程或协程。每个任务在指定的时间间隔后触发,保持事件循环的运行和任务的调度。

    事件队列\定时器 + 回调真的能实现异步任务吗

    当回调任务是一个非常耗时的操作时,如果在单线程中执行,确实会导致事件循环被阻塞,从而影响其他任务的执行。这违背了异步执行的初衷,即不阻塞程序的其他部分。

    为了解决这个问题,主要还是以下方法:

    • 使用线程池
      • 我们可以使用一个线程池来处理耗时的任务,从而避免阻塞事件循环。线程池可以并发执行多个任务,确保事件循环保持响应。
  • 相关阅读:
    LLM-文本分块(langchain)与向量化(阿里云DashVector)存储,嵌入LLM实践
    产品经理懂点技术:几种常用的系统开发方法
    The Mittag-Leffler function
    spring cloud、gradle、父子项目、微服务框架搭建---cloud gateway(十)
    Docker专题(三)之 访问Docker仓库
    从ArcGIS两个DEM数据镶嵌结果错误说起——聊一聊像素类型和像素深度
    Spark集成hudi创建表报错
    有趣的 Kotlin 0x0F:Definitely non-nullable types
    useRef和useState在 react Hooks中用法
    阿里云python-SDK配置
  • 原文地址:https://blog.csdn.net/caiziming_001/article/details/139395582