• 最佳实践-LinkBlockingQueue改进


    LinkBlockingQueue改进

    问题背景

    https://github.com/apache/dubbo/pull/9722/files
    使用线程池的同学对于标题中的队列想必都有过使用,但上述队列使用不当时则会造成程序OOM,那怎么来控制呢?

    使用ArrayBlockingQueue?如何来评估长度?

    是否有一个完美的解决方案呢,MemorySafeLinkedBlockingQueue则通过对内存的限制判断尽面控制队列的容量,完成解决了可能存在的OOM问题。

    获取内存大小(注:单位大B;支持准实时更新):

    Runtime.getRuntime().freeMemory()//JVM中已经申请到的堆内存中还未使用的大小
    Runtime.getRuntime().maxMemory()// JVM可从操作系统申请到的最大内存值 -Xxm
    Runtime.getRuntime().totalMemory()// JVM已从操作系统申请到的内存大小 —Xxs可设置该值大小-初始堆的大小
    
    • 1
    • 2
    • 3

    线程池在excute任务时,放队列,放不进去,使用新线程运行任务。这个放不进行,是使用的offer??非阻塞方法吗?

    参考:https://blog.csdn.net/weixin_43108539/article/details/125190023

     public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
         	//拿到32位的int
            int c = ctl.get();
         	//工作线程数<核心线程数
            if (workerCountOf(c) < corePoolSize) {
                //进入if,代表可以创建 核心 线程数
                if (addWorker(command, true))
                    return;
                //如果没进入if,代表创建核心线程数失败,重新获取 ctl
                c = ctl.get();
            }
            //判断线程池为Running状态,将任务添加入阻塞队列,使用offer
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //再次判断是否为Running状态,若不是Running状态,remove任务
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //如果线程池在Running状态,线程池数量为0
                else if (workerCountOf(recheck) == 0)
                    //阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务
                    addWorker(null, false);
            }
            //阻塞队列已满,创建非核心线程,拒绝策略-addWorker中有判断核心线程数是否超过最大线程数
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    空闲内存计算

    package com.zte.sdn.oscp.queue;
    
    import cn.hutool.core.thread.NamedThreadFactory;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class MemoryLimitCalculator {
    
        private static volatile long maxAvailable;
    
        private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);
    
        private static void refresh() {
            maxAvailable = Runtime.getRuntime().freeMemory();
        }
    
        private static void checkAndScheduleRefresh() {
            if (!refreshStarted.get()) {
                // immediately refresh when first call to prevent maxAvailable from being 0
                // to ensure that being refreshed before refreshStarted being set as true
                // notice: refresh may be called for more than once because there is no lock
                refresh();
                if (refreshStarted.compareAndSet(false, true)) {
                    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator"));
                    // check every 50 ms to improve performance
                    scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        refreshStarted.set(false);
                        scheduledExecutorService.shutdown();
                    }));
                }
            }
        }
    
        /**
         * Get the maximum available memory of the current JVM.
         *
         * @return maximum available memory
         */
        public static long maxAvailable() {
            checkAndScheduleRefresh();
            return maxAvailable;
        }
    
        /**
         * Take the current JVM's maximum available memory
         * as a percentage of the result as the limit.
         *
         * @param percentage percentage
         * @return available memory
         */
        public static long calculate(final float percentage) {
            if (percentage <= 0 || percentage > 1) {
                throw new IllegalArgumentException();
            }
            checkAndScheduleRefresh();
            return (long) (maxAvailable() * percentage);
        }
    
        /**
         * By default, it takes 80% of the maximum available memory of the current JVM.
         *
         * @return available memory
         */
        public static long defaultLimit() {
            checkAndScheduleRefresh();
            return (long) (maxAvailable() * 0.8);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    内存安全队列

    package com.zte.sdn.oscp.queue;
    
    import java.util.Collection;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
    
        private static final long serialVersionUID = 8032578371739960142L;
    
        public static int THE_256_MB = 256 * 1024 * 1024;
    
        private int maxFreeMemory;
    
        private Rejector<E> rejector;
    
        public MemorySafeLinkedBlockingQueue() {
            this(THE_256_MB);
        }
    
        public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
            super(Integer.MAX_VALUE);
            this.maxFreeMemory = maxFreeMemory;
            //default as DiscardPolicy to ensure compatibility with the old version
            this.rejector = new DiscardPolicy<>();
        }
    
        public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
                                             final int maxFreeMemory) {
            super(c);
            this.maxFreeMemory = maxFreeMemory;
            //default as DiscardPolicy to ensure compatibility with the old version
            this.rejector = new DiscardPolicy<>();
        }
    
        /**
         * set the max free memory.
         *
         * @param maxFreeMemory the max free memory
         */
        public void setMaxFreeMemory(final int maxFreeMemory) {
            this.maxFreeMemory = maxFreeMemory;
        }
    
        /**
         * get the max free memory.
         *
         * @return the max free memory limit
         */
        public int getMaxFreeMemory() {
            return maxFreeMemory;
        }
    
        /**
         * set the rejector.
         *
         * @param rejector the rejector
         */
        public void setRejector(final Rejector<E> rejector) {
            this.rejector = rejector;
        }
    
        /**
         * determine if there is any remaining free memory.
         *
         * @return true if has free memory
         */
        public boolean hasRemainedMemory() {
            return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
        }
    
        @Override
        public void put(final E e) throws InterruptedException {
            if (hasRemainedMemory()) {
                super.put(e);
            } else {
                rejector.reject(e, this);
            }
        }
    
        @Override
        public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
            if (!hasRemainedMemory()) {
                rejector.reject(e, this);
                return false;
            }
            return super.offer(e, timeout, unit);
        }
    
        @Override
        public boolean offer(final E e) {
            if (!hasRemainedMemory()) {
                rejector.reject(e, this);
                return false;
            }
            return super.offer(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98

    拒绝策略

    注意其中的rejector是拒绝策略,默认的DiscardPolicy什么也不处理;

    而DiscardOldPolicy的处理逻辑很简单

    public class DiscardOldestPolicy<E> implements Rejector<E> {
    
        @Override
        public void reject(final E e, final Queue<E> queue) {
            queue.poll();
            queue.offer(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    AbortPolicy则直接抛出异常

    public class AbortPolicy<E> implements Rejector<E> {
    
        @Override
        public void reject(final E e, final Queue<E> queue) {
            throw new RejectException("no more memory can be used !");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    个人建议增加日志打印即可。

  • 相关阅读:
    房屋差价能否作为非违约方的损失
    校招|拿到腾讯、阿里、字节等10家互联网测试开发岗的offer
    Pr 时间重映射卡点
    【天梯赛 - L2习题集】啃题(6 / 44)
    iOS swift5 提示信息显示,提示弹框,第三方框架XHToastSwift
    母婴行业探秘:千万级会员体量下的精准营销
    Brocade FOS下载 博客光交换机固件升级
    类似东郊到家预约家政保洁小程序搭建
    QQ小程序——无法正常创建项目与uniapp联动问题
    【Linux集群教程】13 集群安全防御 - SELinux 功能
  • 原文地址:https://blog.csdn.net/sunquan291/article/details/126763158