以一个简易版的数据库连接池的实现来说明一下
连接池的connection以队列来管理
getConnection的时候,如果连接池已创建的connection总数(_currentPoolSize)小于50,且暂时无可用的connection(队列为空,或者peek看到的队首元素仍处于不可用状态),就新建一个connection并打开它;这样最多可以一直新建到50个connection,即_currentPoolSize = 50
如果连接池已创建的connection总数(_currentPoolSize)已经大于等于50,且暂时无可用的connection(队列为空,或者peek看到的队首元素仍处于不可用状态),就调用Monitor.Wait(_connectionPoolQueueLock)等待
returnConnection的时候,使用Monitor.Pulse(_connectionPoolQueueLock)唤醒一个正在wait的线程(被唤醒的是等待队列中排在最前面的线程,并不是随机挑选),让它可以继续getConnection
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using Mono.Data.Sqlite;
namespace demo.unity.sqlite
{
/// <summary>
/// A small, thread-safe pool of open <see cref="SqliteConnection"/> objects.
/// </summary>
/// <remarks>
/// Idle connections are kept in a FIFO queue together with the UTC time they
/// became idle. At most <c>maxPoolSize</c> connections exist at once (idle plus
/// checked out); callers that need a connection while the pool is exhausted
/// block on the pool lock until one is returned. A timer periodically closes
/// connections that have been idle for too long.
/// Pooled connections are handed out regardless of which connection string
/// they were opened with, so a single instance should serve a single database.
/// </remarks>
public class SQLiteConnectionManager : IDisposable
{
    // Idle connections paired with the UTC timestamp at which they were queued.
    private readonly Queue<Tuple<SqliteConnection, DateTime>> _connectionPoolQueue;

    // Guards the queue, _currentPoolSize and _disposed; also used for Wait/Pulse.
    private readonly object _connectionPoolQueueLock = new object();

    // Upper bound on the number of live connections (idle + checked out).
    private const int maxPoolSize = 50;

    // Idle connections older than this are closed by the cleanup timer.
    private const double maxIdleMinutes = 15;

    private volatile bool _disposed;

    // Number of live connections created by this pool and not yet discarded.
    private int _currentPoolSize;

    // Fires every 10 minutes to evict connections that have been idle too long.
    private readonly System.Timers.Timer _cleanupTimer = new System.Timers.Timer(10 * 60 * 1000);

    /// <summary>
    /// Creates an empty pool and starts the periodic idle-connection cleanup.
    /// </summary>
    public SQLiteConnectionManager()
    {
        _connectionPoolQueue = new Queue<Tuple<SqliteConnection, DateTime>>(maxPoolSize);
        _cleanupTimer.Elapsed += _cleanupTimerElapsed;
        _cleanupTimer.AutoReset = true;
        _cleanupTimer.Start();
    }

    /// <summary>
    /// Timer callback: closes idle connections, oldest first, whose idle time
    /// exceeds <c>maxIdleMinutes</c>.
    /// </summary>
    private void _cleanupTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
        lock (_connectionPoolQueueLock)
        {
            if (_disposed)
            {
                // dispose() has already emptied the queue; nothing left to clean.
                return;
            }

            // The queue is FIFO, so the head is always the longest-idle entry;
            // stop at the first entry that is still fresh.
            while (_connectionPoolQueue.Count > 0
                   && (DateTime.UtcNow - _connectionPoolQueue.Peek().Item2).TotalMinutes > maxIdleMinutes)
            {
                _discardConnection(_connectionPoolQueue.Dequeue().Item1);
            }
        }
    }

    /// <summary>
    /// Removes one connection from the pool's bookkeeping and disposes it.
    /// Must be called while holding <c>_connectionPoolQueueLock</c>.
    /// </summary>
    private void _discardConnection(SqliteConnection connection)
    {
        // Update the count first so it stays correct even if Dispose throws.
        if (_currentPoolSize > 0)
        {
            _currentPoolSize--;
        }

        connection?.Dispose();
    }

    /// <summary>
    /// Creates a connection from <paramref name="builder"/> and opens it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is null.</exception>
    private SqliteConnection _createNewConnection(SqliteConnectionStringBuilder builder)
    {
        if (builder == null)
        {
            throw new ArgumentNullException(nameof(builder));
        }

        var connection = new SqliteConnection(builder.ConnectionString);
        try
        {
            connection.Open();
        }
        catch
        {
            // Do not leak the half-initialised connection when Open fails.
            connection.Dispose();
            throw;
        }

        return connection;
    }

    /// <summary>
    /// Checks out an open connection, blocking while the pool is exhausted.
    /// </summary>
    /// <param name="builder">
    /// Connection settings; only consulted when a new connection has to be created.
    /// </param>
    /// <returns>An open connection that the caller must hand back via <see cref="returnConnection"/>.</returns>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public SqliteConnection getConnection(SqliteConnectionStringBuilder builder)
    {
        lock (_connectionPoolQueueLock)
        {
            while (true)
            {
                // Re-checked on every iteration: dispose() wakes all waiters.
                if (_disposed)
                {
                    throw new ObjectDisposedException(
                        nameof(SQLiteConnectionManager),
                        "The DB connection pool is already disposed");
                }

                if (_connectionPoolQueue.Count > 0)
                {
                    var pooled = _connectionPoolQueue.Dequeue().Item1;
                    if (pooled != null && pooled.State == ConnectionState.Open)
                    {
                        return pooled;
                    }

                    // An unusable idle connection would otherwise sit at the head
                    // of the queue forever and block every caller; drop it and
                    // look at the next one.
                    _discardConnection(pooled);
                    continue;
                }

                if (_currentPoolSize < maxPoolSize)
                {
                    // Room left in the pool: open a fresh connection for this caller.
                    var connection = _createNewConnection(builder);
                    _currentPoolSize++;
                    return connection;
                }

                // Pool exhausted: releases the lock and sleeps until
                // returnConnection or dispose pulses it.
                Monitor.Wait(_connectionPoolQueueLock);
            }
        }
    }

    /// <summary>
    /// Hands a connection back to the pool and wakes one waiting caller.
    /// </summary>
    /// <param name="connection">
    /// The connection to return; null is ignored. A connection that is no longer
    /// open, or that is returned after the pool was disposed, is disposed
    /// instead of being pooled.
    /// </param>
    public void returnConnection(SqliteConnection connection)
    {
        if (connection == null)
        {
            return;
        }

        lock (_connectionPoolQueueLock)
        {
            if (_disposed || connection.State != ConnectionState.Open)
            {
                _discardConnection(connection);
            }
            else
            {
                _connectionPoolQueue.Enqueue(Tuple.Create(connection, DateTime.UtcNow));
            }

            // Either an idle connection or a free slot is available now.
            Monitor.Pulse(_connectionPoolQueueLock);
        }
    }

    /// <summary>
    /// Closes all idle connections, wakes every blocked caller (which then
    /// fails with <see cref="ObjectDisposedException"/>) and stops the cleanup
    /// timer. Calling it more than once has no further effect.
    /// </summary>
    public void dispose()
    {
        lock (_connectionPoolQueueLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            while (_connectionPoolQueue.Count > 0)
            {
                _discardConnection(_connectionPoolQueue.Dequeue().Item1);
            }

            // wake up any waiting threads
            Monitor.PulseAll(_connectionPoolQueueLock);
        }

        _cleanupTimer.Elapsed -= _cleanupTimerElapsed;
        _cleanupTimer.Stop();
        _cleanupTimer.Dispose();
    }

    /// <summary>
    /// Allows the pool to be used in a <c>using</c> statement; forwards to <see cref="dispose"/>.
    /// </summary>
    void IDisposable.Dispose()
    {
        dispose();
    }
}
}