step1: 桶排序本质是一种分治算法
step2:每个桶都代表了一个元素的范围
step3:每个桶中的元素都排好序后,取出来,这样子就有序了
1.actor思想(单线程处理)
2.xdb加锁(类似的还有mysql的锁机制)
1.首先是GatewayHandler收到玩家请求
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- try {
- if(msg == null) {
- return;
- }
-
- if(GameServer.isServerClosed()) {
- return;
- }
-
- ProtobufMessage message = (ProtobufMessage) msg;
- session.parseAndDispatchMessage(message);
- }
2.ClientSession是绑定了根据userId,玩家登录后就也确定了roleId。 所以将上一步的请求和这个玩家信息封装为上下文如:OnlineMsgParam,然后嵌套Instance,扔到xdb线程池中执行
- // 这个是使用xdb.Executor.getInstance()是单例类,是支持了Timeout
- xdb.Executor.getInstance().execute(() ->
- client2LogicMsgInstance.handle(message, "userId", userId));
其中这个xdb.Executor.getInstance()是一个支持超时的线程池TimeoutExecutor
execute方法:
- public void execute(Runnable command) {
- super.execute(xdb.Angel.decorateRunnable(command, defaultTimeout));
- }
可见这是一个被包装为支持超时的任务了。
3.这个任务中很可能是一个修改操作,比如:领奖,那么这个业务Task被执行时,里面肯定是触发了一个submit的提交,咱们看下实现:
- @MsgReceiver(MsgArtifact.CSAddPositionExp.class)
- public static void onCSAddPositionExp(OnlineMsgParam param) {
-
- // 这个业务体肯定是在xdb线程中,同时也接收了超时检测,但是我想这个超时检测仅仅是:
- // 这个方法体的,毕竟submit的执行还是异步的。所以一般不会超时,除非带有select这种超时了
-
- new PAddArtifactBaseExp(param.getHumanId(), equips).submit();
- }
submit的实现如下:
- public final Future<Procedure> submit() {
- // 首先验证下是不是已经在事务内了,在事务内了是不能submit了
- verify();
- // 看着是new了一个ProcedureFuture对象,实际还是任务的提交
- return new ProcedureFuture
(this); - }
看下构造方法:
- public ProcedureFuture(P p) {
- // 保存下事务
- this.p = p;
-
- // 扔到带有超时检测的线程池中
- // 实现的是Future接口
- future = Xdb.executor().getProcedureTimeoutExecutor()
- .submit(this, p, p.getConf().getMaxExecutionTime());
-
- // 默认情况下,啥都不干
- this.done = null;
- }
可见是把当前事务又包装了下,这个submit是自己写的,注意:主角Angle登场了。
其实还是调用的java线程池的submit,只不过是多了一层Angle的包装:
- public
Future submit(Runnable task, T result, long timeout) { - xdb.Worker.debugHunger(this);
- return super.submit(xdb.Angel.decorate(task, result, timeout));
- }
这个Angle包装下是为了啥呢,接下来看:
- public static <V> Callable<V> decorate(Runnable task, V result, long timeout) {
- // 将Runnable包装为Callable
- final Callable<V> callable = Executors.callable(task, result);
-
- // 默认肯定是有超时检测的,因此这里就是:TimeoutCallable
- return timeout > 0 ? new TimeoutCallable<>(callable, timeout) : callable;
- }
可见是为了这个TimeoutCallable又进行了包装(又使用TimeoutManager带上了超时检测)
- @Override
- public V call() throws Exception {
- if (timeout > 0) {
- runner = Thread.currentThread();
-
- // 将此任务开始执行前扔到TimeoutManager中,如果到时候没移除,则说明此任务执行超时
- final TimeoutManager tm = TimeoutManager.getInstance();
- tm.schedule(this, timeout);
-
- try {
- // 任务真正开始执行
- return inner.call();
- } finally {
- // 执行完毕后,移除
- tm.remove(this);
- runner = null;
- }
- } else {
- return inner.call();
- }
- }
至此,是不是有点迷糊了,再次后头看一下,其实这个inner.call()执行的实际是啥呢?其实就是:
ProcedureFuture的方法体,也就是最初:new ProcedureFuture的地方,我们看下这个执行体(只不过这个执行体又被包装了支持了超时检测)
- @Override
- public void run() {
- // 存储过程开始执行,执行次数+1
- ++ranTimes;
-
- try {
- // 创建事务和执行存储过程。
- try {
- // 核心方法:所以这个里面调用的还是call方法
- Transaction.create().perform(p);
- } finally {
- // safe if create fail
- Transaction.destroy();
- }
-
- // 正常存储过程执行结束
- // 但是目前看done变量为空,等于这句啥也没做
- done();
- } catch (XLockDead e) { // 这个异常何时被抛出来呢?其实就是:Lockkey中的无参lock方法使用的是lockInterruptibly,在被超时打断的时候会跑出来
- /** @see Lockey#lock() */
-
- // 重试次数过多
- if (ranTimes >= p.getConf().getRetryTimes()) {
- done();
-
- // 达到最大重复次数.报告最终错误.
- throw new XAngelError();
- }
-
- // 下面是发生死锁了,随机一个时间,进行重试
- int delay = Xdb.random().nextInt(p.getConf().getRetryDelay());
-
- // 再次提交任务到线程池
- future = Xdb.executor().getScheduledTimeoutExecutor().schedule(
- Executors.callable(this, p),
- delay,
- TimeUnit.MILLISECONDS,
- p.getConf().getMaxExecutionTime());
-
- // 报告死锁错误,future打断当前的监视对象,重新监视。
- throw e;
- } catch (Error error) {
- done();
- throw error;
- } catch (Throwable e) {
- done();
- // 有其他方法,不需要包装一下,直接扔出去吗?
- throw new XError(e);
- }
- }
我们看下Transaction的perform方法干了啥?其实最重要的就是调用了call方法,从而创建事务,
并且执行我们的process方法
- public void perform(Procedure p) throws Throwable {
- try {
- // 总数 = .True(未统计此项) + .False + .Exception
- //counter.increment(p.getClass().getName());
- totalCount.incrementAndGet();
-
- // flush lock . MEMORY类型的表本来不需要这个锁,为了不复杂化流程,不做特殊处理。
- Lock flushLock = Xdb.getInstance().getTables().flushReadLock();
- flushLock.lockInterruptibly();
- try {
- // 重点方法call!!!
- if (p.call()) {
- if (_real_commit_() > 0) {
- logNotify(p);
- // else : 没有修改,不需要logNotify。至此过程处理已经完成了。
- }
- } else {
- // 执行逻辑返回false统计
- //counter.increment(p.getClass().getName() + ".False");
- totalFalse.incrementAndGet();
- _last_rollback_(); // 应用返回 false,回滚
- }
- } catch (Throwable e) {
- // 未处理的异常,回滚
- _last_rollback_();
- throw e;
- } finally {
- // 有多把锁
- if (deadLockDetection && lockList.size() > 1) {
- //死锁风险检测
- deadlockDetection(p.getClass().getName());
- }
-
- this.doneRunAllTask();
- this.finish();
- flushLock.unlock();
- }
-
- } catch (Throwable e) {
- p.setException(e);
- p.setSuccess(false);
- // 执行异常统计
- //counter.increment(p.getClass().getName() + ".Exception");
- totalException.incrementAndGet();
- // 所有的异常错误都应该处理,尽量不抛到这里。这里仅记录日志。
- Trace.error("Transaction Perform Exception " + p.getClass().getName(), e);
- throw e;
- }
- }
然后进入到高潮部分,也就call方法,也就是调用我们业务层的process,从而真正业务执行部分(同时在业务异常或者返回false时将本地缓存回滚,也就是log删除掉)
- public boolean call() {
- // 当前如果不在事务内
- if (Transaction.current() == null) {
- try {
- // perform 将回调本函数,然后执行事务已经存在的分支。
- // 何时被加上事务的呢?其实就是这个Transaction.create()中执行的,当前ThreadLocal没存,则设置下
- Transaction.create().perform(this);
- } catch (Throwable e) {
- // this.setException(e); 在 Transaction.perform 里面会保存异常。这里没什么事可做了。
- } finally {
- Transaction.destroy();
- this.fetchTasks();
- }
- return this.isSuccess();
- }
-
- // 执行到这里必然是处于事务中了,则记录下保存点
- int savepoint = beginAndSavepoint();
-
- // 捕捉所有异常,在发生异常和process返回false时,回滚到过程开始的保存点。
- // 不捕捉错误,所有的错误抛到外层。
- try {
- if (process()) {
- commit();
- this.setSuccess(true);
- return true;
- }
- } catch (Exception ex) {
- this.setException(ex);
- logErrorFunc.accept(ex);
- }
-
- // 进行业务的回滚
- rollback(savepoint);
-
- return false;
- }
经过上面的分析,我们看出来,其实就是业务的执行时,仅仅是使用ThreadLocal保存了当前new出来的事务对象,然后接着执行我们的process方法了,里面就是我们游戏层的业务实现了,我们分析下process方法如何拿锁的,我们直接看秘宝的process方法:
1.映入眼帘的肯定是这一句
ArtifactBean artifactBean = Artifact.get(humanId);
2.看一下get方法
- public static xbean.ArtifactBean get(Long key) throws Exception {
- return _Tables_.getInstance().artifact.get(key);
- }
3.这个方法是重载的
- public final V get(K key) throws Exception {
- return get(key, true);
- }
4.接下来看实现
- public final V get(K key, boolean holdNull) throws Exception {
- if (null == key) {
- throw new NullPointerException("key is null");
- }
-
- countGet.incrementAndGet();
-
- // 从事务本身里先进行查询,也就是在事务内拿过一次锁之后,以后不会再重复拿了
- final Transaction currentT = Transaction.current();
-
- // 先从本地事务哪个普通的Map对象中拿缓存.这段代码算是优化了
- TRecord<K, V> rCached = currentT.getCachedTRecord(this, key);
- if (rCached != null) {
- return rCached.getValue();
- }
-
- // 事务缓存中没拿到,就生成一把锁
- Lockey lockey = Lockeys.get(this, key);
-
- // 这里调用的是:lockInterruptibly方法
- lockey.lock();
- try {
- // 这个是实现LRU算法的,从本地缓存取数据,对应的类是:TTableCacheLRU,取缓存会有一次synchronized的调用,所以上面还有一份事务内的缓存加快访问
- // LRU底层是:包装的LinkedHashMap实现,删除策略是自己实现的
- TRecord<K, V> r = cache.get(key);
-
- // 缓存中不存在
- if (null == r) {
- // 记录下缓存miss了
- countGetMiss.incrementAndGet();
-
- // 缓存中也没有,那就查询sql了
- V value = _find(key);
-
- // sql也没查询到
- if (null == value) {
- countGetStorageMiss.incrementAndGet();
- if (holdNull) {
- currentT.add(lockey);
- }
- return null;
- }
-
- // sql查询到了
- r = new TRecord<K, V>(this, value, lockey, TRecord.State.INDB_GET);
-
- // 先记录数据到LRU缓存中
- cache.addNoLog(key, r);
- }
-
- // 下面其实是记录一次缓存到本地事务中,这样子不是每次都从LRU cache中取,可以减少一次锁的访问
- // 重点:记录当前事务拿到的锁! 死锁检测就从这里入手了
- // 注意: 里面还是有一次lock。 所以下面finally会先unlock一次
- currentT.add(lockey);
-
- // 记录下本次事务用过的缓存
- currentT.addCachedTRecord(this, r);
-
- // 返回取到的值
- return r.getValue();
- } finally {
- lockey.unlock();
- }
- }
分析:
可以看出来,当前缓存的话,是根据Transaction中一个普通的Map中拿到的,首先根据当前的表名字和key取缓存,毕竟会涉及到多张表,所以是要传入表对象的。
这样子其实有2份缓存:1份是事务内的缓存,一份是:根据表+synchronized。
这样子拿过缓存后,就不会再加锁了直接取缓存。否则会查询缓存。
缓存查询不到,查sql,查到了,则记录到本地事务中
前面分析了,何时释放所有的锁呢?
Transaction.java
- /**
- * 结束事务,释放所有锁并且清除,清除wrapper。
- */
- private void finish() {
- wrappers.clear();
-
- // 没有按照lock的顺序unlock。
- for (Lockey lockey : locks.values()) {
- // Trace.debug("unlock " + lockey);
- try {
- lockey.unlock();
- } catch (Throwable e) {
- Trace.fatal("unlock " + lockey, e);
- }
- }
- locks.clear();
- cachedTRecord.clear();
- }
如何进行超时检测呢?
在Exector初始化的时候,就负责开启了一个定时器进行超时检测的处理
- this.scheduled.scheduleWithFixedDelay(
- // 执行超时检测
- xdb.util.TimeoutManager.getInstance(),
-
- timeoutPeriod,
- timeoutPeriod,
- TimeUnit.MILLISECONDS);
检测到超时后,进行打断,咱们的Xdb线程池中的线程都是Worker线程:
- @Override
- public void onTimeout() {
- final Thread r = runner;
-
- // 如果这个r为null,则说明已经执行完了
- if (r != null) {
- if (r instanceof Worker) {
- ((Worker) r).angelInterrupt();
- } else {
- r.interrupt();
- }
- }
- }
打断方法实现如下:
- /**
- * 这个是Worker被打断时,多执行一个标记
- */
- public void angelInterrupt() {
- angel.set(true);
- super.interrupt();
- }
Lockey中使用的是可被打断的lockInterruptibly:
- /**
- * 这个非常的重要,
- */
- public final void lock() {
- try {
- lock.lockInterruptibly();
- } catch (InterruptedException ex) {
- // 这里我认为其实未必就是真的死锁,有可能业务执行繁忙超时后,也会被打断?
- if (Worker.angelInterrupted()) {
- throw new XLockDead();
- }
-
- throw new XLockInterrupted(this.toString());
- }
- }
3.这样子在ArtifactMsgHandler的一个方法处理器中,就会到一个xdb线程池中执行。
1.
1.
1.