当想要通知或唤醒某个线程我们完成了某任务时,可以用completion。
当线程A需要等待线程B的某个资源时,线程A进入睡眠进行等待,当线程B完成后需要通知线程A,这时候就可以使用completion去通知,而且没有race conditions。
Linux中使用completion大概分为5步:
completion结构体,include/linux/completion.h
中定义
struct completion {
unsigned int done;
struct swait_queue_head wait;
};
swait_queue_head定义在include/linux/swait.h
中
struct swait_queue_head {
raw_spinlock_t lock;
struct list_head task_list;
};
completion结构体其实就是一个链表头。将要wait等待的任务都挂进该链表中即可。
静态创建completion,使用DECLARE_COMPLETION()
宏,初始done = 0
#define COMPLETION_INITIALIZER(work) \
{ 0, __SWAIT_QUEUE_HEAD_INITIALIZER((work).wait) }
#define DECLARE_COMPLETION(work) \
struct completion work = COMPLETION_INITIALIZER(work)
动态创建completion,使用init_completion(struct completion *)
#define init_completion(x) __init_completion(x)
/**
* init_completion - Initialize a dynamically allocated completion
* @x: pointer to completion structure that is to be initialized
*
* This inline function will initialize a dynamically created completion
* structure.
*/
static inline void __init_completion(struct completion *x)
{
x->done = 0;
init_swait_queue_head(&x->wait);
}
/**
* reinit_completion - reinitialize a completion structure
* @x: pointer to completion structure that is to be reinitialized
*
* This inline function should be used to reinitialize a completion structure so it can
* be reused. This is especially important after complete_all() is used.
*/
static inline void reinit_completion(struct completion *x)
{
x->done = 0;
}
/**
* wait_for_completion: - waits for completion of a task
* @x: holds the state of this particular completion
*
* This waits to be signaled for completion of a specific task. It is NOT
* interruptible and there is no timeout.
*
* See also similar routines (i.e. wait_for_completion_timeout()) with timeout
* and interrupt capability. Also see complete().
*/
void __sched wait_for_completion(struct completion *x)
{
wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}
EXPORT_SYMBOL(wait_for_completion);
该函数not interruptible and no timeout。调用需要注意安全。
wait_for_completion()最终会调用raw_spin_lock_irq()/raw_spin_unlock_irq(),所以只能在开启中断的环境中被调用
static inline long __sched
__wait_for_common(struct completion *x,
long (*action)(long), long timeout, int state)
{
might_sleep();
complete_acquire(x);
raw_spin_lock_irq(&x->wait.lock);
timeout = do_wait_for_common(x, action, timeout, state);
raw_spin_unlock_irq(&x->wait.lock);
complete_release(x);
return timeout;
}
有关spin_lock()/spin_lock_irq()/spin_lock_irqsave()
等函数,参考《linux device driver. chpter 5》。
/**
* wait_for_completion_timeout: - waits for completion of a task (w/timeout)
* @x: holds the state of this particular completion
* @timeout: timeout value in jiffies
*
* This waits for either a completion of a specific task to be signaled or for a
* specified timeout to expire. The timeout is in jiffies. It is not
* interruptible.
*
* Return: 0 if timed out, and positive (at least 1, or number of jiffies left
* till timeout) if completed.
*/
unsigned long __sched
wait_for_completion_timeout(struct completion *x, unsigned long timeout)
{
return wait_for_common(x, timeout, TASK_UNINTERRUPTIBLE);
}
EXPORT_SYMBOL(wait_for_completion_timeout);
该函数同样不能被中断,不过有timeout时间,单位jiffies。
/**
* wait_for_completion_interruptible: - waits for completion of a task (w/intr)
* @x: holds the state of this particular completion
*
* This waits for completion of a specific task to be signaled. It is
* interruptible.
*
* Return: -ERESTARTSYS if interrupted, 0 if completed.
*/
int __sched wait_for_completion_interruptible(struct completion *x)
{
long t = wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_INTERRUPTIBLE);
if (t == -ERESTARTSYS)
return t;
return 0;
}
EXPORT_SYMBOL(wait_for_completion_interruptible);
可以被中断,没有超时。超时需要使用wait_for_completion_interruptible_timeout()
。
可以通过kill发送信号从而被interrupt。
/**
* wait_for_completion_killable: - waits for completion of a task (killable)
* @x: holds the state of this particular completion
*
* This waits to be signaled for completion of a specific task. It can be
* interrupted by a kill signal.
*
* Return: -ERESTARTSYS if interrupted, 0 if completed.
*/
int __sched wait_for_completion_killable(struct completion *x)
{
long t = wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_KILLABLE);
if (t == -ERESTARTSYS)
return t;
return 0;
}
EXPORT_SYMBOL(wait_for_completion_killable);
/**
* try_wait_for_completion - try to decrement a completion without blocking
* @x: completion structure
*
* Return: 0 if a decrement cannot be done without blocking
* 1 if a decrement succeeded.
*
* If a completion is being used as a counting completion,
* attempt to decrement the counter without blocking. This
* enables us to avoid waiting if the resource the completion
* is protecting is not available.
*/
bool try_wait_for_completion(struct completion *x)
{
unsigned long flags;
bool ret = true;
/*
* Since x->done will need to be locked only
* in the non-blocking case, we check x->done
* first without taking the lock so we can
* return early in the blocking case.
*/
if (!READ_ONCE(x->done))
return false;
raw_spin_lock_irqsave(&x->wait.lock, flags);
if (!x->done)
ret = false;
else if (x->done != UINT_MAX)
x->done--;
raw_spin_unlock_irqrestore(&x->wait.lock, flags);
return ret;
}
EXPORT_SYMBOL(try_wait_for_completion);
This function will not put the thread on the wait queue but rather returns false if it would need to en queue (block) the thread, else it consumes one posted completion and returns true.
/**
* complete: - signals a single thread waiting on this completion
* @x: holds the state of this particular completion
*
* This will wake up a single thread waiting on this completion. Threads will be
* awakened in the same order in which they were queued.
*
* See also complete_all(), wait_for_completion() and related routines.
*
* If this function wakes up a task, it executes a full memory barrier before
* accessing the task state.
*/
void complete(struct completion *x)
{
unsigned long flags;
raw_spin_lock_irqsave(&x->wait.lock, flags);
if (x->done != UINT_MAX)
x->done++;
swake_up_locked(&x->wait);
raw_spin_unlock_irqrestore(&x->wait.lock, flags);
}
EXPORT_SYMBOL(complete);
唤醒第一个等待当前completion的task,tasks排序是进入queue的先后顺序排列的。
/**
* complete_all: - signals all threads waiting on this completion
* @x: holds the state of this particular completion
*
* This will wake up all threads waiting on this particular completion event.
*
* If this function wakes up a task, it executes a full memory barrier before
* accessing the task state.
*
* Since complete_all() sets the completion of @x permanently to done
* to allow multiple waiters to finish, a call to reinit_completion()
* must be used on @x if @x is to be used again. The code must make
* sure that all waiters have woken and finished before reinitializing
* @x. Also note that the function completion_done() can not be used
* to know if there are still waiters after complete_all() has been called.
*/
void complete_all(struct completion *x)
{
unsigned long flags;
lockdep_assert_RT_in_threaded_ctx();
raw_spin_lock_irqsave(&x->wait.lock, flags);
x->done = UINT_MAX;
swake_up_all_locked(&x->wait);
raw_spin_unlock_irqrestore(&x->wait.lock, flags);
}
EXPORT_SYMBOL(complete_all);
唤醒所有的等待当前completion的任务task。
/**
* completion_done - Test to see if a completion has any waiters
* @x: completion structure
*
* Return: 0 if there are waiters (wait_for_completion() in progress)
* 1 if there are no waiters.
*
* Note, this will always return true if complete_all() was called on @X.
*/
bool completion_done(struct completion *x)
{
unsigned long flags;
if (!READ_ONCE(x->done))
return false;
/*
* If ->done, we need to wait for complete() to release ->wait.lock
* otherwise we can end up freeing the completion before complete()
* is done referencing it.
*/
raw_spin_lock_irqsave(&x->wait.lock, flags);
raw_spin_unlock_irqrestore(&x->wait.lock, flags);
return true;
}
EXPORT_SYMBOL(completion_done);
检查是否还有task在等待completion。
#include
#include
#include
#include
#include
#include
#include
#include //kmalloc()
#include //copy_to/from_user()
#include
#include // Required for the completion
uint32_t read_count = 0;
static struct task_struct *wait_thread;
DECLARE_COMPLETION(data_read_done);
//struct completion data_read_done; initializing completion dynamicly
dev_t dev = 0;
static struct class *dev_class;
static struct cdev completion_cdev;
int completion_flag = 0;
static int __init completion_driver_init(void);
static void __exit completion_driver_exit(void);
/*************** Driver Functions **********************/
static int completion_open(struct inode *inode, struct file *file);
static int completion_release(struct inode *inode, struct file *file);
static ssize_t completion_read(struct file *filp,
char __user *buf, size_t len,loff_t * off);
static ssize_t completion_write(struct file *filp,
const char *buf, size_t len, loff_t * off);
/******************************************************/
//File operation structure
static struct file_operations fops =
{
.owner = THIS_MODULE,
.read = completion_read,
.write = completion_write,
.open = completion_open,
.release = completion_release,
};
/*
** Waitqueue thread
*/
static int wait_function(void *unused)
{
while(1) {
pr_info("Waiting For Event...\n");
wait_for_completion (&data_read_done);
if(completion_flag == 2) {
pr_info("Event Came From Exit Function\n");
return 0;
}
pr_info("Event Came From Read Function - %d\n", ++read_count);
completion_flag = 0;
}
do_exit(0);
return 0;
}
/*
** This function will be called when we open the Device file
*/
static int completion_open(struct inode *inode, struct file *file)
{
pr_info("Device File Opened...!!!\n");
return 0;
}
/*
** This function will be called when we close the Device file
*/
static int completion_release(struct inode *inode, struct file *file)
{
pr_info("Device File Closed...!!!\n");
return 0;
}
/*
** This function will be called when we read the Device file
*/
static ssize_t completion_read(struct file *filp, char __user *buf, size_t len, loff_t *off)
{
pr_info("Read Function\n");
completion_flag = 1;
if(!completion_done (&data_read_done)) {
complete (&data_read_done);
}
return 0;
}
/*
** This function will be called when we write the Device file
*/
static ssize_t completion_write(struct file *filp, const char __user *buf, size_t len, loff_t *off)
{
pr_info("Write function\n");
return len;
}
/*
** Module Init function
*/
static int __init completion_driver_init(void)
{
/*Allocating Major number*/
if((alloc_chrdev_region(&dev, 0, 1, "completion_Dev")) <0){
pr_err("Cannot allocate major number\n");
return -1;
}
pr_info("Major = %d Minor = %d \n",MAJOR(dev), MINOR(dev));
/*Creating cdev structure*/
cdev_init(&completion_cdev,&fops);
completion_cdev.owner = THIS_MODULE;
completion_cdev.ops = &fops;
/*Adding character device to the system*/
if((cdev_add(&completion_cdev,dev,1)) < 0){
pr_err("Cannot add the device to the system\n");
goto r_class;
}
/*Creating struct class*/
if((dev_class = class_create(THIS_MODULE,"completion_class")) == NULL){
pr_err("Cannot create the struct class\n");
goto r_class;
}
/*Creating device*/
if((device_create(dev_class,NULL,dev,NULL,"completion_device")) == NULL){
pr_err("Cannot create the Device 1\n");
goto r_device;
}
//Create the kernel thread with name 'WaitThread'
wait_thread = kthread_create(wait_function, NULL, "WaitThread");
if (wait_thread) {
pr_info("Thread Created successfully\n");
wake_up_process(wait_thread);
} else
pr_err("Thread creation failed\n");
//Initializing Completion dynamicly
//init_completion(&data_read_done);
pr_info("Device Driver Insert...Done!!!\n");
return 0;
r_device:
class_destroy(dev_class);
r_class:
unregister_chrdev_region(dev,1);
return -1;
}
/*
** Module exit function
*/
static void __exit completion_driver_exit(void)
{
completion_flag = 2;
if(!completion_done (&data_read_done)) {
complete (&data_read_done);
}
device_destroy(dev_class,dev);
class_destroy(dev_class);
cdev_del(&completion_cdev);
unregister_chrdev_region(dev, 1);
pr_info("Device Driver Remove...Done!!!\n");
}
module_init(completion_driver_init);
module_exit(completion_driver_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("A simple device driver - Completion (Static Method)");
MODULE_VERSION("1.23");