qemu中对于消息循环的处理采用了Glib事件循环机制。每个事件称为一个事件源,叫做GSource,对于自建事件源,其第一个元素必须是GSource.每个事件源都会关联一个GMainContext,一个线程只能运行一个GMainContext,但是再其他线程中能够对事件源进行添加和删除。
步骤一(prepare):调用各事件源的prepare回调函数,检查事件源中是否有事件发生。对于无需poll的事件源(例如:idle事件源),其返回TRUE,表示idle事件已经发生了。对于需要poll的事件源(例如:文件事件源),其返回FALSE,因为只有在poll文件之后,才能直到文件事件是否发生。对应g_main_context_prepare()函数。
步骤二 (query):获取实际需要poll的文件fd。对应g_main_context_query()函数。
步骤三(check):调用poll对fd进行监听,调用各事件源的check回调函数,检查事件源是否有事件发生。对应g_main_context_query()函数。
步骤四(dispatch):若某事件源有事件发生(prepare或check返回TRUE),则调用其事件处理函数。对应g_main_context_check()函数。新事件源用于来处理GDK事件。通过从GSource结构体派生来创建新事件源。表示新事件源的结构体的第一个元素为GSource实例,其他元素与新事件源本身相关。通过调用g_source_new创建新事件源的实例,创建时需传递新事件源结构体的大小和一张函数表,而这张函数表决定了新事件源的行为。
几种常见的结构体定义如下:
struct GSourceFuncs {
gboolean (*prepare) (GSource *source,gint *timeout_);
gboolean (*check) (GSource *source);
gboolean (*dispatch) (GSource *source, GSourceFunc callback, gpointer user_data);
void (*finalize) (GSource *source);
};
//和pollFD类似
struct GPollFD
{
gint fd;
gushort events;
gushort revents;
}
struct GSourceList
{
GSource *head,*tail;
gint priority;
}
typedef void* gpointer;
struct GList
{
gpointer data;
GList *next;
GList*prev;
};是一个双向链表
struct GSList{
gpointer data;
GSList *next;
}
struct _GMainContext
{
/* The following lock is used for both the list of sources
and the list of poll records
*/
GMutex mutex;
GCond cond;
GThread *owner;
guint owner_count;
GMainContextFlags flags;
GSList waiters;
gint ref_count; / (atomic) */
GHashTable sources; / guint -> GSource */
GPtrArray pending_dispatches;
gint timeout; / Timeout for current iteration */
guint next_id;
GList *source_lists;
gint in_check_or_prepare;
GPollRec *poll_records;
guint n_poll_records;
GPollFD *cached_poll_array;
guint cached_poll_array_size;
GWakeup wakeup;
GPollFD wake_up_rec;
/ Flag indicating whether the set of fd's changed during a poll */
gboolean poll_changed;
GPollFunc poll_func;
gint64 time;
gboolean time_is_fresh;
};
struct _GMainLoop
{
GMainContext *context;
gboolean is_running; /* (atomic) */
gint ref_count; /* (atomic) */
};
struct _GSource
{
/*< private >*/
gpointer callback_data;
GSourceCallbackFuncs *callback_funcs;
const GSourceFuncs *source_funcs;
guint ref_count;
GMainContext *context;
gint priority;
guint flags;
guint source_id;
GSList *poll_fds;
GSource *prev;
GSource *next;
char *name;
GSourcePrivate *priv;
};
第一步:要使用Glib事件循环机制,首先得创建一个GMainLoop实例,
GMainLoop *loop = g_main_loop_new(NULL, FALSE);
g_main_loop_new代码如下,其第一个参数是GMainContext,如果为空,则使用默认的GMainContext,g_main_context_default会创建一个GMainContext,g_main_context_ref对GMainContext引用计数加一。
GMainLoop *
g_main_loop_new (GMainContext *context,
gboolean is_running)
{
GMainLoop *loop;
if (!context)
context = g_main_context_default();
g_main_context_ref (context);
loop = g_new0 (GMainLoop, 1);
loop->context = context;
loop->is_running = is_running != FALSE;
loop->ref_count = 1;
TRACE (GLIB_MAIN_LOOP_NEW (loop, context));
return loop;
}
第二步:通过调用g_source_new创建新事件源的实例,创建时需传递新事件源结构体的大小和一张函数表,而这张函数表决定了新事件源的行为。
从函数里面可以看出,主要是分配了内存空间和指定了函数表
GSource *
g_source_new (GSourceFuncs *source_funcs,
guint struct_size)
{
GSource *source;
g_return_val_if_fail (source_funcs != NULL, NULL);
g_return_val_if_fail (struct_size >= sizeof (GSource), NULL);
source = (GSource*) g_malloc0 (struct_size);
source->priv = g_slice_new0 (GSourcePrivate);
source->source_funcs = source_funcs;
source->ref_count = 1;
source->priority = G_PRIORITY_DEFAULT;
source->flags = G_HOOK_FLAG_ACTIVE;
source->priv->ready_time = -1;
/* NULL/0 initialization for all other fields */
TRACE (GLIB_SOURCE_NEW (source, source_funcs->prepare, source_funcs->check,
source_funcs->dispatch, source_funcs->finalize,
struct_size));
return source;
}
第三步:为GSource分配回调函数,和回调函数需要的数据。回调函数会在dispatch中被调用。
void
g_source_set_callback (GSource *source,
GSourceFunc func,
gpointer data,
GDestroyNotify notify)
{
GSourceCallback *new_callback;
g_return_if_fail (source != NULL);
g_return_if_fail (g_atomic_int_get (&source->ref_count) > 0);
TRACE (GLIB_SOURCE_SET_CALLBACK (source, func, data, notify));
new_callback = g_new (GSourceCallback, 1);
new_callback->ref_count = 1;
new_callback->func = func;
new_callback->data = data;
new_callback->notify = notify;
g_source_set_callback_indirect (source, new_callback, &g_source_callback_funcs);
}
第四步,将文件描述符表放入GSource中的poll_fds单向链表中(采用头插法)。
void
g_source_add_poll (GSource *source,
GPollFD *fd)
{
GMainContext *context;
g_return_if_fail (source != NULL);
g_return_if_fail (g_atomic_int_get (&source->ref_count) > 0);
g_return_if_fail (fd != NULL);
g_return_if_fail (!SOURCE_DESTROYED (source));
context = source->context;
if (context)
LOCK_CONTEXT (context);
source->poll_fds = g_slist_prepend (source->poll_fds, fd);
if (context)
{
if (!SOURCE_BLOCKED (source))
g_main_context_add_poll_unlocked (context, source->priority, fd);
UNLOCK_CONTEXT (context);
}
}
第五步:将GSource添加到context实例,移除可以调用g_source_destroy()函数。在g_source_attach中又调用了g_source_attach_unlocked函数。
guint
g_source_attach (GSource *source,
GMainContext *context)
{
guint result = 0;
g_return_val_if_fail (source != NULL, 0);
g_return_val_if_fail (g_atomic_int_get (&source->ref_count) > 0, 0);
g_return_val_if_fail (source->context == NULL, 0);
g_return_val_if_fail (!SOURCE_DESTROYED (source), 0);
if (!context)
context = g_main_context_default ();
LOCK_CONTEXT (context);
result = g_source_attach_unlocked (source, context, TRUE);
TRACE (GLIB_MAIN_SOURCE_ATTACH (g_source_get_name (source), source, context,
result));
UNLOCK_CONTEXT (context);
return result;
}
static guint
g_source_attach_unlocked (GSource *source,
GMainContext *context,
gboolean do_wakeup)
{
GSList *tmp_list;
guint id;
/* The counter may have wrapped, so we must ensure that we do not
* reuse the source id of an existing source.
*/
do
id = context->next_id++;
while (id == 0 || g_hash_table_contains (context->sources, GUINT_TO_POINTER (id)));
source->context = context;
source->source_id = id;
g_source_ref (source);
g_hash_table_insert (context->sources, GUINT_TO_POINTER (id), source);
source_add_to_context (source, context);
if (!SOURCE_BLOCKED (source))
{
tmp_list = source->poll_fds;
while (tmp_list)
{
g_main_context_add_poll_unlocked (context, source->priority, tmp_list->data);
tmp_list = tmp_list->next;
}
for (tmp_list = source->priv->fds; tmp_list; tmp_list = tmp_list->next)
g_main_context_add_poll_unlocked (context, source->priority, tmp_list->data);
}
tmp_list = source->priv->child_sources;
while (tmp_list)
{
g_source_attach_unlocked (tmp_list->data, context, FALSE);
tmp_list = tmp_list->next;
}
/* If another thread has acquired the context, wake it up since it
* might be in poll() right now.
*/
if (do_wakeup &&
(context->flags & G_MAIN_CONTEXT_FLAGS_OWNERLESS_POLLING ||
(context->owner && context->owner != G_THREAD_SELF)))
{
g_wakeup_signal (context->wakeup);
}
g_trace_mark (G_TRACE_CURRENT_TIME, 0,
"GLib", "g_source_attach",
"%s to context %p",
(g_source_get_name (source) != NULL) ? g_source_get_name (source) : "(unnamed)",
context);
return source->source_id;
}
GMainContext中的sources属性是一个hash表,source_id为key,source为value,因此在g_source_attach_unlocked中,首先找sources中未使用的id赋予source,然后插入到sources中。context的source_lists是一个双向链表,data为GSourceList,GSoureList指定了优先级相同的GSource链.source_add_to_context中首先在context中找到优先级相同的sources。然后将source插入到sources.如果有父对象则先插入父对象信息源后面。否则就将信息源插入都同优先级信息源链表尾部。然后通过g_main_contest_add_poll_unlocked将fd注册到poll要监听的pollfd链表中。这个链表在context->poll_records中。自此,就把GSource与GMainContext关联起来。
第五步:然后运行g_main_loop_run(loop)函数。直到某事件的回调函数调用g_main_loop_quit退出主循环,g_main_loop_run返回。
void
g_main_loop_run (GMainLoop *loop)
{
GThread *self = G_THREAD_SELF;
g_return_if_fail (loop != NULL);
g_return_if_fail (g_atomic_int_get (&loop->ref_count) > 0);
/* Hold a reference in case the loop is unreffed from a callback function */
g_atomic_int_inc (&loop->ref_count);
if (!g_main_context_acquire (loop->context))
{
gboolean got_ownership = FALSE;
/* Another thread owns this context */
LOCK_CONTEXT (loop->context);
g_atomic_int_set (&loop->is_running, TRUE);
while (g_atomic_int_get (&loop->is_running) && !got_ownership)
got_ownership = g_main_context_wait_internal (loop->context,
&loop->context->cond,
&loop->context->mutex);
if (!g_atomic_int_get (&loop->is_running))
{
UNLOCK_CONTEXT (loop->context);
if (got_ownership)
g_main_context_release (loop->context);
g_main_loop_unref (loop);
return;
}
g_assert (got_ownership);
}
else
LOCK_CONTEXT (loop->context);
if (loop->context->in_check_or_prepare)
{
g_warning ("g_main_loop_run(): called recursively from within a source's "
"check() or prepare() member, iteration not possible.");
g_main_loop_unref (loop);
return;
}
g_atomic_int_set (&loop->is_running, TRUE);
while (g_atomic_int_get (&loop->is_running))
g_main_context_iterate (loop->context, TRUE, TRUE, self);
UNLOCK_CONTEXT (loop->context);
g_main_context_release (loop->context);
g_main_loop_unref (loop);
}
主要关注的是g_main_context_iterate函数
static gboolean
g_main_context_iterate (GMainContext *context,
gboolean block,
gboolean dispatch,
GThread *self)
{
gint max_priority = 0;
gint timeout;
gboolean some_ready;
gint nfds, allocated_nfds;
GPollFD *fds = NULL;
gint64 begin_time_nsec G_GNUC_UNUSED;
UNLOCK_CONTEXT (context);
begin_time_nsec = G_TRACE_CURRENT_TIME;
if (!g_main_context_acquire (context))
{
gboolean got_ownership;
LOCK_CONTEXT (context);
if (!block)
return FALSE;
got_ownership = g_main_context_wait_internal (context,
&context->cond,
&context->mutex);
if (!got_ownership)
return FALSE;
}
else
LOCK_CONTEXT (context);
if (!context->cached_poll_array)
{
context->cached_poll_array_size = context->n_poll_records;
context->cached_poll_array = g_new (GPollFD, context->n_poll_records);
}
allocated_nfds = context->cached_poll_array_size;
fds = context->cached_poll_array;
UNLOCK_CONTEXT (context);
g_main_context_prepare (context, &max_priority);
while ((nfds = g_main_context_query (context, max_priority, &timeout, fds,
allocated_nfds)) > allocated_nfds)
{
LOCK_CONTEXT (context);
g_free (fds);
context->cached_poll_array_size = allocated_nfds = nfds;
context->cached_poll_array = fds = g_new (GPollFD, nfds);
UNLOCK_CONTEXT (context);
}
if (!block)
timeout = 0;
g_main_context_poll (context, timeout, max_priority, fds, nfds);
some_ready = g_main_context_check (context, max_priority, fds, nfds);
if (dispatch)
g_main_context_dispatch (context);
g_main_context_release (context);
g_trace_mark (begin_time_nsec, G_TRACE_CURRENT_TIME - begin_time_nsec,
"GLib", "g_main_context_iterate",
"Context %p, %s ⇒ %s", context, block ? "blocking" : "non-blocking", some_ready ? "dispatched" : "nothing");
LOCK_CONTEXT (context);
return some_ready;
}
首先都要调用g_main_context_acquire(context)获取该线程对context的控制权。然后调用g_main_context_prepare()函数,调用每个信息源的prepare函数,并将time_out与context中设置的time进行对比,选择最小的time作为poll的阻塞时间。
g_main_context-query()会将优先级高于max_priority的fd的监听event进行初始化,并添加到fds数组中,最后返回fds数组的数量n_fds.
g_main_context_poll()主要执行了poll函数。
g_main_context_check()来执行事件源的check函数,如果满足,则check函数返回true,并将事件源添加到context->pending_dispatches中
g_main_context_dispatch()会