After moving to multi-threaded handling of user requests, the first problem to solve is thread safety of the shared data. The standard answer is locking, but since all the data lives in memory, most requests are processed very quickly, and the cross-thread context-switch cost of Linux's native locks is too high. KeyDB therefore designed its own fastlock.
The idea behind fastlock is to try to acquire the lock in user space first, and only when that fails to fall back to a system call that blocks on a futex in the kernel, avoiding thread context switches as much as possible.
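Before reading KeyDB's code, a minimal self-contained sketch of this "bounded user-space spin, then block on a futex" idea may help. It is Linux-only and deliberately simplified (the real fastlock below adds re-entrancy, a wake bitmask and a double check against lost wakeups); mini_ticket_lock and futex_call are made-up names:

#include <atomic>
#include <climits>
#include <cstdint>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

struct mini_ticket_lock {
    std::atomic<uint32_t> avail{0};    // next ticket to hand out
    std::atomic<uint32_t> active{0};   // ticket currently allowed to enter

    static long futex_call(std::atomic<uint32_t> *addr, int op, uint32_t val) {
        // std::atomic<uint32_t> is lock-free and layout-compatible with uint32_t here
        return syscall(SYS_futex, reinterpret_cast<uint32_t *>(addr), op, val, nullptr, nullptr, 0);
    }

    void lock() {
        uint32_t my = avail.fetch_add(1, std::memory_order_relaxed);
        unsigned spins = 0;
        for (;;) {
            uint32_t cur = active.load(std::memory_order_acquire);
            if (cur == my)
                return;                            // our turn: acquired purely in user space
#if defined(__i386__) || defined(__amd64__)
            __asm__ __volatile__ ("pause");
#elif defined(__aarch64__)
            __asm__ __volatile__ ("yield");
#endif
            if ((++spins % 0x10000) == 0) {
                // Spun too long: sleep in the kernel until 'active' moves past 'cur'.
                // If it already changed, FUTEX_WAIT returns immediately and we re-check.
                futex_call(&active, FUTEX_WAIT_PRIVATE, cur);
            }
        }
    }

    void unlock() {
        active.fetch_add(1, std::memory_order_release);
        // Wake every sleeper; the one whose ticket now matches proceeds, the rest go back to sleep.
        futex_call(&active, FUTEX_WAKE_PRIVATE, INT_MAX);
    }
};

KeyDB's actual fastlock builds the same idea on a ticket lock, with a 16-bit (m_active, m_avail) pair packed into a single 32-bit word: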
struct ticket
{
    union
    {
        struct
        {
            uint16_t m_active;
            uint16_t m_avail;
        };
        unsigned u;
    };
};

struct fastlock
{
    volatile int m_pidOwner;
    volatile int m_depth;
    char szName[56];
    /* Volatile data on separate cache line */
    volatile struct ticket m_ticket;
    unsigned futex;
    // prevent false sharing
    char padding[56];
};
fastlock uses two small tricks here: 1. char padding[56] deliberately pads 56 bytes so that m_ticket, futex and the padding add up to 64 bytes, exactly one cache line; the first three fields (m_pidOwner, m_depth and szName) likewise fill the first 64 bytes, so the frequently written ticket state lives on a cache line of its own and false sharing between threads is avoided. 2. volatile stops the compiler from caching these fields in registers, forcing the CPU to re-read them from memory on every access, which is what the spinning threads rely on to observe updates from other threads.
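A quick way to sanity-check the layout claim is a couple of static_asserts (a sketch, assuming 64-byte cache lines and the field sizes above; not part of KeyDB):

#include <cstddef>   // offsetof

// The hot fields (m_ticket, futex, padding) should begin exactly one cache line into the
// struct, and the whole struct should span two cache lines.
static_assert(offsetof(struct fastlock, m_ticket) == 64, "hot fields start on the 2nd cache line");
static_assert(sizeof(struct fastlock) == 128, "fastlock spans exactly two cache lines");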
Now let's look at the lock and unlock paths:
extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
{
int pidOwner;
__atomic_load(&lock->m_pidOwner, &pidOwner, __ATOMIC_ACQUIRE);
if (pidOwner == gettid())
{
++lock->m_depth;
return;
}
int tid = gettid();
unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
unsigned cloops = 0;
ticket ticketT;
int fHighPressure;
__atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;
if (worker != nullptr) {
for (;;) {
__atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
if ((ticketT.u & 0xffff) == myticket)
break;
if (!worker())
goto LNormalLoop;
}
} else {
LNormalLoop:
for (;;)
{
__atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
// 如果相等,说明此时没有其他线程竞争,可以直接获取锁
if ((ticketT.u & 0xffff) == myticket)
break;
#if defined(__i386__) || defined(__amd64__)
__asm__ __volatile__ ("pause");
#elif defined(__aarch64__)
__asm__ __volatile__ ("yield");
#endif
if ((++cloops % loopLimit) == 0)
{
//
fastlock_sleep(lock, tid, ticketT.u, myticket);
}
}
}
lock->m_depth = 1;
__atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE);
ANNOTATE_RWLOCK_ACQUIRED(lock, true);
std::atomic_thread_fence(std::memory_order_acquire);
}
extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned myticket)
{
UNUSED(lock);
UNUSED(pid);
UNUSED(wake);
UNUSED(myticket);
#ifdef __linux__
g_dlock.registerwait(lock, pid);
unsigned mask = (1U << (myticket % 32));
__atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
// double check the lock wasn't release between the last check and us setting the futex mask
uint32_t u;
__atomic_load(&lock->m_ticket.u, &u, __ATOMIC_ACQUIRE);
if ((u & 0xffff) != myticket)
{
futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask);
}
__atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
g_dlock.clearwait(lock, pid);
#endif
__atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED);
}
When locking, the thread first spins in user space, comparing the atomic ticket values to try to take the lock. If it still hasn't succeeded after a certain number of attempts, it calls fastlock_sleep and blocks on a futex waiting on m_ticket. For an in-memory application like Redis, most operations complete quickly, so the lock is very likely to be acquired while still in user space.
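Note also that the lock is re-entrant for its owner: if the owning thread calls fastlock_lock again, only m_depth is incremented. A hypothetical usage sketch (assuming the fastlock_init(lock, name) initializer that KeyDB declares alongside these functions in fastlock.h; nullptr means no spin_worker):

struct fastlock lck;
fastlock_init(&lck, "example");   // assumed initializer from fastlock.h

fastlock_lock(&lck, nullptr);     // 1st acquisition: spin, futex-sleep if contended
fastlock_lock(&lck, nullptr);     // same thread already owns it: just ++m_depth
fastlock_unlock(&lck);            // m_depth 2 -> 1, lock still held
fastlock_unlock(&lck);            // m_depth 1 -> 0, m_active advances, next waiter can run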
Unlock:
extern "C" void fastlock_unlock(struct fastlock *lock)
{
--lock->m_depth;
if (lock->m_depth == 0)
{
int pidT;
__atomic_load(&lock->m_pidOwner, &pidT, __ATOMIC_RELAXED);
serverAssert(pidT >= 0); // unlock after free
int t = -1;
__atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE);
std::atomic_thread_fence(std::memory_order_acq_rel);
ANNOTATE_RWLOCK_RELEASED(lock, true);
uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case
#ifdef __linux__
unlock_futex(lock, activeNew);
#else
UNUSED(activeNew);
#endif
}
}
Unlocking atomically increments m_active by 1, which makes the user-space acquire condition hold for the next ticket holder; on Linux, unlock_futex then wakes any waiter that went to sleep on that ticket.
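To make the ticket hand-off concrete, here is a hypothetical trace of three threads A, B and C contending for one fastlock:

A: fetch_add(m_avail) returns ticket 0; m_active == 0 matches, A acquires immediately
B: fetch_add(m_avail) returns ticket 1; m_active == 0, B spins (or futex-sleeps)
C: fetch_add(m_avail) returns ticket 2; m_active == 0, C spins (or futex-sleeps)
A unlocks: m_active becomes 1, B's ticket matches, B acquires
B unlocks: m_active becomes 2, C acquires

The trace also shows that fastlock is FIFO-fair: waiters get the lock in the order in which they took their tickets.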
To guarantee thread safety there are two locks to take: one is each client's own lock, because some commands touch clients that are not bound to the current worker thread; the other is the global lock that protects the shared data.
Start from reading the client's data:
void readQueryFromClient(connection *conn) {
    std::unique_lock<decltype(c->lock)> lock(c->lock, std::defer_lock);
    // First try to take the client's own lock
    if (!lock.try_lock()) // Couldn't get it: the client is busy elsewhere, try again later
        return; // Process something else while we wait
    // Next, read and parse the data
    // Finally, process the request
    if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) {
        parseClientCommandBuffer(c);
        if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) {
            // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
            // so we exclude them unless the snapshot we need already exists.
            // Note: In test mode we want to create snapshots as often as possible to exercise them - we don't care about perf
            bool fSnapshotExists = c->db->mvccLastSnapshot >= c->mvccCheckpoint;
            bool fWriteTooRecent = !g_fTestMode && (((getMvccTstamp() - c->mvccCheckpoint) >> MVCC_MS_SHIFT) < static_cast<uint64_t>(g_pserver->snapshot_slip)/2);
            // The check below avoids running async commands if this is a frequent writer unless a snapshot is already there to service it
            // If a snapshot exists and writes aren't frequent, serve the command via MVCC; note that no global lock is needed here
            if (!fWriteTooRecent || fSnapshotExists) {
                processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
            }
        }
        // Otherwise queue the command and handle it asynchronously in beforeSleep
        if (!c->vecqueuedcmd.empty())
            serverTL->vecclientsProcess.push_back(c);
    } else {
        // If we're single threaded its actually better to just process the command here while the query is hot in the cache
        // multithreaded lock contention dominates and batching is better
        // Take the global lock before processing the command
        AeLocker locker;
        locker.arm(c);
        runAndPropogateToReplicas(processInputBuffer, c, true /*fParse*/, CMD_CALL_FULL);
    }
}
void beforeSleep(struct aeEventLoop *eventLoop) {
    AeLocker locker;
    int iel = ielFromEventLoop(eventLoop);
    // Take the global lock
    locker.arm();
    ...
    // Process the queued client commands
    runAndPropogateToReplicas(processClients);
}
Because many of the operations in beforeSleep need the global lock anyway, the parsed client commands are not executed immediately; they are queued and handled asynchronously there instead, which saves one extra acquisition of the global lock.
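This is the usual "queue without the big lock, drain the whole batch under one acquisition" pattern. A minimal sketch of the idea (not KeyDB code; g_globalLock, t_pending and processRequest are hypothetical stand-ins for the global lock, vecclientsProcess and command execution):

#include <mutex>
#include <vector>

std::mutex g_globalLock;                           // stands in for KeyDB's global lock
thread_local std::vector<int> t_pending;           // stands in for vecclientsProcess

static void processRequest(int /*req*/) { /* execute one queued command */ }

void queueRequest(int req) {
    t_pending.push_back(req);                      // no global lock needed to enqueue
}

void drainBatch() {
    std::lock_guard<std::mutex> lk(g_globalLock);  // one acquisition covers the whole batch
    for (int req : t_pending)
        processRequest(req);
    t_pending.clear();
}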
Then processClients takes the commands out of the queue and executes them one by one:
void processClients()
{
    serverAssert(GlobalLocksAcquired());
    // Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer
    while (!serverTL->vecclientsProcess.empty()) {
        client *c = serverTL->vecclientsProcess.front();
        serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin());
        /* There is more data in the client input buffer, continue parsing it
         * in case to check if there is a full command to execute. */
        std::unique_lock<fastlock> ul(c->lock);
        processInputBuffer(c, false /*fParse*/, CMD_CALL_FULL);
    }
    if (listLength(serverTL->clients_pending_asyncwrite))
    {
        ProcessPendingAsyncWrites();
    }
}