目录
线程间事件机制本质上就是多个线程操作共享资源。最简单的实现,例如利用一个全局变量加一个全局互斥锁,各个线程就可以实现相互之间的消息传递。
在实际应用中,引入事件对象,本质上就是将这些共享资源包装成一个事件对象。在调用过程中,所有线程都可以在等待函数中指定事件对象句柄;当指定的事件对象被置为有信号状态时,等待函数返回,等待中的线程获得通知而被唤醒。
在win下,Windows API函数CreateEvent、WaitForSingleObject、SetEvent、ResetEvent、CloseHandle等实现事件对象的创建、设置、取值、重设、删除等操作,其内部使用了线程锁进行对象管理,实现跨线程事务相互调用。
在 linux下,没有提供现成的API函数调用,但基于类似的实现原理,采用线程互斥锁对象(pthread_mutex_t)和条件变量对象(pthread_cond_t),通过其相关的pthread_mutex_init、pthread_mutex_lock、pthread_mutex_unlock、pthread_cond_wait、pthread_mutex_destroy等配套函数,实现跨线程间的事件相互调用。
工程项目如下,测试层逻辑简单设计,创建一个event_handle,将其传递给线程对象,并进行事件等待命令,在主线程进行命令输入来触发事件信号,线程对象获得事件返回来执行后续操作。
- event_test
- bin
- build_win
- build_linux
- src
- event.h
- event.cpp
- myThread.h
- myThread.cpp
- win32Thread.h
- win32Thread.cpp
- testThread.h
- testThread.cpp
- main.cpp
- CMakeLists.txt
event.h:
- #ifndef _HIK_EVENT_H_
- #define _HIK_EVENT_H_
-
- // Cross-platform event object: a thin wrapper over the Win32 event API on
- // Windows and over a pthread mutex + condition variable elsewhere.
- // NOTE(review): the header names of the two #include lines below were missing
- // in the published listing; they are restored from the types used here
- // (HANDLE, pthread_mutex_t, pthread_cond_t) - confirm against the real file.
- #ifdef WIN32
- #include <windows.h>
- #define event_handle HANDLE
- #else
- #include <pthread.h>
-
- typedef struct
- {
-     bool state;             // true while the event is signaled
-     bool manual_reset;      // false: a successful wait clears `state` again
-     pthread_mutex_t mutex;  // protects `state`
-     pthread_cond_t cond;    // waiters block on this until `state` becomes true
- }event_t;
- #define event_handle event_t*
- #endif
-
- // Creates an event. Returns NULL on failure.
- event_handle event_create(bool manual_reset, bool init_state);
-
- // Blocks until the event is signaled.
- // Returns 0 when the event was received, -1 on error.
- int event_wait(event_handle hevent);
-
- // Waits for the event for at most `milliseconds`.
- // Returns 0 when the event was received, 1 on timeout, -1 on error.
- // Recommended in preference to event_wait.
- int event_timedwait(event_handle hevent, long milliseconds);
-
- // Puts the event into the signaled state. Returns 0 on success, -1 on error.
- int event_set(event_handle hevent);
-
- // Puts the event into the non-signaled state. Returns 0 on success, -1 on error.
- int event_reset(event_handle hevent);
-
- // Releases the event. No return value.
- void event_destroy(event_handle hevent);
-
- #endif
event.cpp
- #include "event.h"
- #ifdef __linux
- #include <sys/time.h>
- #include <errno.h>
- #endif
- #include <new>
- // Creates an event object.
- //   manual_reset: when false, a successful wait puts the event back into the
- //                 non-signaled state.
- //   init_state:   initial signaled state.
- // Returns the new handle, or NULL when allocation or pthread initialisation fails.
- event_handle event_create(bool manual_reset, bool init_state)
- {
- #ifdef WIN32
-     // The value returned by CreateEvent is handed back unchanged.
-     return CreateEvent(NULL, manual_reset, init_state, NULL);
- #else
-     event_t* ev = new(std::nothrow) event_t;
-     if (!ev)
-     {
-         return NULL;
-     }
-     ev->manual_reset = manual_reset;
-     ev->state = init_state;
-     if (pthread_mutex_init(&ev->mutex, NULL) != 0)
-     {
-         delete ev;
-         return NULL;
-     }
-     if (pthread_cond_init(&ev->cond, NULL) != 0)
-     {
-         // Undo the mutex initialisation before giving up.
-         pthread_mutex_destroy(&ev->mutex);
-         delete ev;
-         return NULL;
-     }
-     return ev;
- #endif
- }
- // Blocks until the event is signaled.
- // Returns 0 when the event was received, -1 on any failure.
- int event_wait(event_handle hevent)
- {
- #ifdef WIN32
-     return (WaitForSingleObject(hevent, INFINITE) == WAIT_OBJECT_0) ? 0 : -1;
- #else
-     if (pthread_mutex_lock(&hevent->mutex) != 0)
-     {
-         return -1;
-     }
-     // The state is re-checked after every wakeup of the condition variable.
-     while (hevent->state == false)
-     {
-         if (pthread_cond_wait(&hevent->cond, &hevent->mutex) != 0)
-         {
-             pthread_mutex_unlock(&hevent->mutex);
-             return -1;
-         }
-     }
-     // Auto-reset event: consume the signal.
-     if (hevent->manual_reset == false)
-     {
-         hevent->state = false;
-     }
-     return (pthread_mutex_unlock(&hevent->mutex) != 0) ? -1 : 0;
- #endif
- }
- // Waits for the event for at most `milliseconds`.
- // Returns 0 when the event was received, 1 on timeout, -1 on error.
- int event_timedwait(event_handle hevent, long milliseconds)
- {
- #ifdef WIN32
-     switch (WaitForSingleObject(hevent, milliseconds))
-     {
-     case WAIT_OBJECT_0:
-         return 0;
-     case WAIT_TIMEOUT:
-         return 1;
-     default:
-         return -1;
-     }
- #else
-     // Turn the relative timeout into the absolute deadline that
-     // pthread_cond_timedwait expects.
-     struct timeval now;
-     gettimeofday(&now, NULL);
-     struct timespec deadline;
-     deadline.tv_sec = now.tv_sec + milliseconds / 1000;
-     deadline.tv_nsec = now.tv_usec*1000 + (milliseconds % 1000)*1000000;
-     if (deadline.tv_nsec >= 1000000000)
-     {
-         // Carry the nanosecond overflow into the seconds field.
-         deadline.tv_sec++;
-         deadline.tv_nsec -= 1000000000;
-     }
-
-     if (pthread_mutex_lock(&hevent->mutex) != 0)
-     {
-         return -1;
-     }
-     int wait_rc = 0;
-     while (!hevent->state)
-     {
-         wait_rc = pthread_cond_timedwait(&hevent->cond, &hevent->mutex, &deadline);
-         if (wait_rc == 0)
-         {
-             continue;
-         }
-         if (wait_rc == ETIMEDOUT)
-         {
-             break;
-         }
-         pthread_mutex_unlock(&hevent->mutex);
-         return -1;
-     }
-     const bool timed_out = (wait_rc == ETIMEDOUT);
-     // Auto-reset event: consume the signal, but only if one was received.
-     if (!timed_out && !hevent->manual_reset)
-     {
-         hevent->state = false;
-     }
-     if (pthread_mutex_unlock(&hevent->mutex) != 0)
-     {
-         return -1;
-     }
-     return timed_out ? 1 : 0;
- #endif
- }
- // Puts the event into the signaled state and wakes waiters.
- // A manual-reset event wakes every waiter (broadcast); an auto-reset event
- // wakes one (signal).
- // Returns 0 on success, -1 on error.
- int event_set(event_handle hevent)
- {
- #ifdef WIN32
-     // SetEvent returns non-zero on success; map failure to the documented -1.
-     return SetEvent(hevent) ? 0 : -1;
- #else
-     if (pthread_mutex_lock(&hevent->mutex) != 0)
-     {
-         return -1;
-     }
-
-     hevent->state = true;
-
-     const int wake_rc = hevent->manual_reset
-         ? pthread_cond_broadcast(&hevent->cond)
-         : pthread_cond_signal(&hevent->cond);
-
-     // Always release the mutex, even when waking the waiters failed;
-     // returning early here would leave the event locked forever.
-     const int unlock_rc = pthread_mutex_unlock(&hevent->mutex);
-
-     return (wake_rc != 0 || unlock_rc != 0) ? -1 : 0;
- #endif
- }
- // Puts the event into the non-signaled state.
- // Returns 0 on success, -1 on error.
- int event_reset(event_handle hevent)
- {
- #ifdef WIN32
-     // ResetEvent returns non-zero on success.
-     return ResetEvent(hevent) ? 0 : -1;
- #else
-     if (pthread_mutex_lock(&hevent->mutex) != 0)
-     {
-         return -1;
-     }
-
-     hevent->state = false;
-
-     return (pthread_mutex_unlock(&hevent->mutex) != 0) ? -1 : 0;
- #endif
- }
- // Releases an event created by event_create. A null handle is ignored.
- void event_destroy(event_handle hevent)
- {
-     if (!hevent)
-     {
-         return;
-     }
- #ifdef WIN32
-     CloseHandle(hevent);
- #else
-     pthread_cond_destroy(&hevent->cond);
-     pthread_mutex_destroy(&hevent->mutex);
-     delete hevent;
- #endif
- }
win32Thread.h,win下父线程类实现
- #if _MSC_VER > 1000
- #pragma once
- #endif // _MSC_VER > 1000
- #ifndef WIN32THREAD_H
- #define WIN32THREAD_H
-
- // NOTE(review): the header names of these two #include lines were missing in
- // the published listing; they are restored from the symbols the thread class
- // uses (printf, _beginthread) - confirm against the real file.
- #include <stdio.h>
- #include <process.h>
-
- typedef void *HANDLE;
- // Base thread class for Windows. Derive from it and override Run().
- class MyThread
- {
- public:
-     MyThread();
-     // Virtual because the class is meant to be derived from.
-     virtual ~MyThread();
-     // Starts the thread; the new thread calls Run().
-     void start();
-     // Thread body; the base version only prints a message.
-     virtual int Run();
-     // Returns the stored thread handle.
-     HANDLE getThread();
- private:
-     HANDLE hThread;
-     // Thread entry point passed to _beginthread; forwards to Run().
-     static void agent(void *p);
- };
- #endif
win32Thread.cpp
- #include "win32Thread.h"
-
- #include <windows.h>
-
- // Starts with no thread attached, so the destructor can tell whether
- // start() ever produced a handle.
- MyThread::MyThread()
-     : hThread(NULL)
- {
- }
-
- // Waits for the thread to finish, if one was started.
- MyThread::~MyThread()
- {
-     // Skip the wait when start() was never called (NULL) or when
-     // _beginthread reported failure (-1): waiting on either value would not
-     // wait on a thread of ours.
-     if (hThread != NULL && hThread != (HANDLE)(-1L))
-     {
-         WaitForSingleObject(hThread, INFINITE);
-     }
- }
-
- // Launches the thread via _beginthread with agent() as entry point and this
- // object as its argument; the returned value is stored as the handle.
- void MyThread::start()
- {
-     void* self = this;
-     hThread = reinterpret_cast<HANDLE>(_beginthread(agent, 0, self));
- }
- // Default thread body: prints a marker line and returns 0.
- int MyThread::Run()
- {
-     const int result = 0;
-     printf("Base Thread\n");
-     return result;
- }
- // Thread entry point: `p` is the MyThread object passed by start().
- void MyThread::agent(void *p)
- {
-     static_cast<MyThread *>(p)->Run();
- }
- // Returns the handle stored by start().
- HANDLE MyThread::getThread()
- {
-     return this->hThread;
- }
myThread.h,Linux下父线程类实现
- /*
- * add arg in linux system and compile: -lpthread
- */
-
- #ifndef _MYTHREAD_H
- #define _MYTHREAD_H
- // NOTE(review): the header names of these two #include lines were missing in
- // the published listing; they are restored from the symbols the thread class
- // uses (pthread_t, usleep/sleep) - confirm against the real file.
- #include <pthread.h>
- #include <unistd.h>
-
- // Base thread class for Linux. Derive from it and override Run().
- class MyThread
- {
- private:
-     // current thread ID (0 while no thread is running)
-     pthread_t tid;
-     // thread status, one of the THREAD_STATUS_* values
-     int threadStatus;
-     // static entry point handed to pthread_create
-     static void* run0(void* pVoid);
-     // per-object thread body: updates the status and calls Run()
-     void* run1();
- public:
-     // threadStatus - newly created
-     static const int THREAD_STATUS_NEW = 0;
-     // threadStatus - running
-     static const int THREAD_STATUS_RUNNING = 1;
-     // threadStatus - finished
-     static const int THREAD_STATUS_EXIT = -1;
-     // constructor
-     MyThread();
-     // virtual because the class is meant to be derived from
-     virtual ~MyThread();
-     // the work executed by the thread
-     virtual int Run()=0;
-     // start the thread; returns true when it was created
-     bool start();
-     // get the thread ID
-     pthread_t getThreadID();
-     // get the thread status
-     int getState();
-     // wait for the thread to end
-     void join();
-     // poll for the thread to end for a limited time, then join
-     void join(unsigned long millisTime);
- };
-
- #endif /* _MYTHREAD_H */
myThread.cpp
- #include "myThread.h"
-
- #include <stdio.h>
-
- // Entry point given to pthread_create: `pVoid` is the MyThread object.
- // Forwards to run1() and returns the object pointer.
- void* MyThread::run0(void* pVoid)
- {
-     MyThread* self = static_cast<MyThread*>(pVoid);
-     self->run1();
-     return self;
- }
-
- // Thread body called from run0: marks the thread as running, records its
- // own ID, calls Run(), then marks the thread as exited.
- // NOTE(review): threadStatus and tid are written here with no lock while
- // getState()/getThreadID()/join() read them - confirm this is acceptable.
- void* MyThread::run1()
- {
- threadStatus = THREAD_STATUS_RUNNING;
- tid = pthread_self();
- Run();
- threadStatus = THREAD_STATUS_EXIT;
- // tid is cleared on exit; join() only calls pthread_join while tid > 0.
- tid = 0;
- // Ends the thread here, so this function has no return statement.
- pthread_exit(NULL);
- }
-
- // Creates a thread object in the NEW state with no thread attached (tid 0).
- MyThread::MyThread()
-     : tid(0)
-     , threadStatus(THREAD_STATUS_NEW)
- {
- }
-
- // Waits for the thread through the timed join overload before the object
- // goes away.
- MyThread::~MyThread()
- {
-     const unsigned long pollTicks = 10;
-     join(pollTicks);
- }
-
- // Default body for the pure virtual Run(): prints a line, sleeps, repeats
- // forever.
- int MyThread::Run()
- {
-     for (;;)
-     {
-         printf("thread is running!\n");
-         sleep(100);
-     }
-     return 0;
- }
-
- // Creates the thread with run0 as entry point and this object as argument.
- // Returns true when pthread_create succeeded.
- bool MyThread::start()
- {
-     const int rc = pthread_create(&tid, NULL, run0, this);
-     return rc == 0;
- }
-
- // Returns the stored thread ID.
- pthread_t MyThread::getThreadID()
- {
-     return this->tid;
- }
-
- // Returns the current THREAD_STATUS_* value.
- int MyThread::getState()
- {
-     return this->threadStatus;
- }
-
- // Blocks until the thread ends. Does nothing when no thread ID is stored.
- void MyThread::join()
- {
-     if (!(tid > 0))
-     {
-         return;
-     }
-     pthread_join(tid, NULL);
- }
-
- // Polls for up to `millisTime` milliseconds for the thread to reach the EXIT
- // state, then falls through to the blocking join(). Does nothing when no
- // thread ID is stored.
- void MyThread::join(unsigned long millisTime)
- {
-     if (tid == 0)
-     {
-         return;
-     }
-     // One tick per millisecond: usleep takes microseconds, so sleep 1000 us
-     // per tick to make the limit match the parameter's unit.
-     for (unsigned long waited = 0;
-          waited < millisTime && threadStatus != THREAD_STATUS_EXIT;
-          ++waited)
-     {
-         usleep(1000);
-     }
-     join();
- }
testThread.h,基于自定义win父线程类或linux父线程类为基类,实现等待事件响应输出
- #if _MSC_VER > 1000
- #pragma once
- #endif // _MSC_VER > 1000
- #ifndef TIME_UP_INFO_H
- #define TIME_UP_INFO_H
- /*
- Test thread: exercises the event object as a cross-thread notification
- mechanism.
- */
- #ifdef WIN32
- #include "win32Thread.h"
- #endif
-
- // `linux` is only predefined in GNU dialect modes; `__linux__` also covers
- // strict ISO modes, so accept either spelling.
- #if defined(linux) || defined(__linux__)
- #include "myThread.h"
- #endif
- #include "event.h"
-
- // Thread that repeatedly waits on an event handle and prints the outcome.
- class TestThread : public MyThread
- {
- public:
-     // handle_: event to wait on; it is stored, not owned.
-     TestThread(event_handle handle_);
-     virtual ~TestThread();
-     // Thread body: loops while `running` is true.
-     virtual int Run();
- private:
-     // Loop flag; cleared by the destructor.
-     // NOTE(review): read by the worker thread and written by the destroying
-     // thread with no synchronisation visible here - confirm.
-     bool running;
-     event_handle handle_test;
- };
- #endif
testThread.cpp
- #include "testThread.h"
-
- #include <stdio.h>
- #ifdef WIN32
- #include <windows.h>
- #endif // WIN32
- #ifdef linux
- #include <unistd.h>
- #endif
-
- // Stores the event handle to wait on and arms the run loop.
- TestThread::TestThread(event_handle handle_)
-     : running(true), handle_test(handle_)
- {
- }
-
- // Clears the loop flag so that Run() stops at its next check.
- TestThread::~TestThread()
- {
-     this->running = false;
- }
-
- // Thread body: waits on the event with a 10000 ms timeout, prints a running
- // count of successful and unsuccessful waits, pauses 10 ms, and repeats
- // while `running` is true. Returns 0.
- int TestThread::Run()
- {
-     int ok_count = 0;
-     int fail_count = 0;
-     while (running)
-     {
-         // Timed wait under test; swap in event_wait(handle_test) to test the
-         // untimed wait instead.
-         const int rc = event_timedwait(handle_test, 10000);
-         if (rc != 0)
-         {
-             printf("event_wait failed %d \n", ++fail_count);
-         }
-         else
-         {
-             printf("event_wait success %d \n", ++ok_count);
-         }
- #ifdef WIN32
-         Sleep(10);
- #else
-         usleep(10000);
- #endif
-     }
-     return 0;
- }
main.cpp,创建一个事件对象,并传给测试线程类创建一个线程对象,在主线程捕获用户输入来触发事件设置,测试线程对象是否顺利获得事件通知,实现线程间消息传递。
- #ifdef WIN32
- #include <stdio.h>
- #endif
- #ifdef linux
- #include <stdio.h>
- #endif
-
- #include "event.h"
- #include "testThread.h"
-
- // Test driver: creates an auto-reset event that starts signaled, hands it to
- // a TestThread, then reads commands from stdin:
- //   'w' sets the event, 's' resets it, 'q' (or end of input) quits.
- // Returns 0 on normal exit, 1 when the event cannot be created.
- int main(int argc, char* argv[])
- {
-     event_handle handle_ = event_create(false,true);
-     //event_handle handle_ = event_create(true,true);
-     if(NULL==handle_)
-     {
-         printf("event_create failed!\n");
-         return 1;
-     }
-     {
-         // The thread lives in its own scope so that its destructor, which
-         // waits for the worker to finish, runs BEFORE the event is destroyed.
-         // Destroying the event first would leave the worker waiting on freed
-         // memory. The worker may take up to its wait timeout to notice the
-         // shutdown.
-         TestThread th(handle_);
-         th.start();
-         bool bExit = false;
-         int i=1, j=0;
-         while(!bExit)
-         {
-             // getchar returns int; keeping it as int lets EOF be detected
-             // instead of looping forever once stdin is closed.
-             const int ch = getchar();
-             switch(ch)
-             {
-             case EOF:
-             case 'q':
-                 bExit = true;
-                 break;
-             case 'w':
-                 if(0==event_set(handle_))
-                     printf("event_set success %d \n",++i);
-                 break;
-             case 's':
-                 if(0==event_reset(handle_))
-                     printf("event_reset success %d \n",++j);
-                 break;
-             default:
-                 break;
-             }
-         }
-     }
-     event_destroy(handle_);
-     printf("test exit!");
-     return 0;
- }
win下采用cmake+vs2010,linux下采用cmake+gcc编译,CMakeLists.txt
- # Minimum CMake version required
- cmake_minimum_required (VERSION 2.8)
- # Project information
- project (event_test)
- # Platform-specific definitions; WIN_OS / UNIX_OS select the branch used below
- if(WIN32)
-     message(STATUS "windows compiling...")
-     add_definitions(-D_PLATFORM_IS_WINDOWS_)
-     set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /MT")
-     set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} /MTd")
-     set(WIN_OS true)
- else()
-     message(STATUS "linux compiling...")
-     add_definitions( -D_PLATFORM_IS_LINUX_)
-     add_definitions("-Wno-invalid-source-encoding")
-     # add_definitions("-O2")
-     set(UNIX_OS true)
-     set(_DEBUG true)
-
- endif()
-
- # Executables are written to <project>/bin
- set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
-
- # Sources shared by both platforms, kept in variables
- set(source_h
-     #
-     ${PROJECT_SOURCE_DIR}/src/event.h
-     ${PROJECT_SOURCE_DIR}/src/testThread.h
-     )
-
- set(source_cpp
-     #
-     ${PROJECT_SOURCE_DIR}/src/event.cpp
-     ${PROJECT_SOURCE_DIR}/src/testThread.cpp
-     ${PROJECT_SOURCE_DIR}/src/main.cpp
-     )
-
- # Header search directories
- #include_directories(${PROJECT_SOURCE_DIR}/include)
-
- # Linux build. Testing the variable by name (not ${UNIX_OS}) keeps the
- # condition well-formed when UNIX_OS was never set.
- if (UNIX_OS)
-
-     set(source_h_linux
-         ${PROJECT_SOURCE_DIR}/src/myThread.h
-         )
-
-     set(source_cpp_linux
-         ${PROJECT_SOURCE_DIR}/src/myThread.cpp
-         )
-
-     add_definitions(
-         "-W"
-         "-fPIC"
-         "-Wall"
-         # "-Wall -g"
-         "-Werror"
-         "-Wshadow"
-         "-Wformat"
-         "-Wpointer-arith"
-         "-D_REENTRANT"
-         "-D_USE_FAST_MACRO"
-         "-Wno-long-long"
-         "-Wuninitialized"
-         "-D_POSIX_PTHREAD_SEMANTICS"
-         "-DACL_PREPARE_COMPILE"
-         "-Wno-unused-parameter"
-         "-fexceptions"
-         )
-     set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
-     # All sources are .cpp, so the debug optimisation level must also be set
-     # on the C++ flags to have any effect.
-     set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0")
-
-     link_directories()
-     # Build target
-     add_executable(event_test ${source_h} ${source_cpp} ${source_h_linux} ${source_cpp_linux})
-     # Link libraries
-     target_link_libraries(event_test
-         -lpthread -pthread -lz -lrt -ldl
-         )
-
- endif()
-
- # Windows build. Testing the variable by name (not ${WIN_OS}) keeps the
- # condition well-formed when WIN_OS was never set.
- if (WIN_OS)
-
-     set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4819")
-
-     set(source_h_win
-         ${PROJECT_SOURCE_DIR}/src/win32Thread.h
-         )
-
-     set(source_cpp_win
-         ${PROJECT_SOURCE_DIR}/src/win32Thread.cpp
-         )
-
-     add_definitions(
-         "-D_CRT_SECURE_NO_WARNINGS"
-         "-D_WINSOCK_DEPRECATED_NO_WARNINGS"
-         "-DNO_WARN_MBCS_MFC_DEPRECATION"
-         "-DWIN32_LEAN_AND_MEAN"
-         )
-
-     link_directories()
-
-     # else()/endif() are left without arguments: the original arguments did
-     # not match the if() condition.
-     if (CMAKE_BUILD_TYPE STREQUAL "Debug")
-
-         set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_DEBUG ${PROJECT_SOURCE_DIR}/bin)
-         # Build target (debug name)
-         add_executable(event_testd ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
-
-     else()
-
-         set(CMAKE_RUNTIME_OUTPUT_DIRECTORY_RELEASE ${PROJECT_SOURCE_DIR}/bin)
-         # Build target; lists the Windows header too, like the debug target
-         add_executable(event_test ${source_h} ${source_cpp} ${source_h_win} ${source_cpp_win})
-
-     endif()
-
- endif()
编译命令如下
- 进入event_test目录
- win:
- mkdir build_win
- cd build_win
- cmake -G "Visual Studio 10 2010 Win64" -DCMAKE_BUILD_TYPE=Release ..
- msbuild event_test.sln /p:Configuration="Release" /p:Platform="x64"
- Linux:
- mkdir build_linux
- cd build_linux
- cmake ..
- make
省略编译过程,以下是输出程序运行效果,左边是linux程序:

完整的示例代码已经上传CSDN: