https://github.com/apache/dubbo/pull/9722/files
使用线程池的同学对于标题中的队列想必都有过使用,但上述队列使用不当时则会造成程序OOM,那怎么来控制呢?
使用ArrayBlockingQueue?如何来评估长度?
是否有一个完美的解决方案呢,MemorySafeLinkedBlockingQueue则通过对内存的限制判断尽面控制队列的容量,完成解决了可能存在的OOM问题。
获取内存大小(注:单位大B;支持准实时更新):
Runtime.getRuntime().freeMemory()//JVM中已经申请到的堆内存中还未使用的大小
Runtime.getRuntime().maxMemory()// JVM可从操作系统申请到的最大内存值 -Xxm
Runtime.getRuntime().totalMemory()// JVM已从操作系统申请到的内存大小 —Xxs可设置该值大小-初始堆的大小
线程池在excute任务时,放队列,放不进去,使用新线程运行任务。这个放不进行,是使用的offer??非阻塞方法吗?
参考:https://blog.csdn.net/weixin_43108539/article/details/125190023
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//拿到32位的int
int c = ctl.get();
//工作线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
//进入if,代表可以创建 核心 线程数
if (addWorker(command, true))
return;
//如果没进入if,代表创建核心线程数失败,重新获取 ctl
c = ctl.get();
}
//判断线程池为Running状态,将任务添加入阻塞队列,使用offer
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判断是否为Running状态,若不是Running状态,remove任务
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池在Running状态,线程池数量为0
else if (workerCountOf(recheck) == 0)
//阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务
addWorker(null, false);
}
//阻塞队列已满,创建非核心线程,拒绝策略-addWorker中有判断核心线程数是否超过最大线程数
else if (!addWorker(command, false))
reject(command);
}
package com.zte.sdn.oscp.queue;
import cn.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class MemoryLimitCalculator {
private static volatile long maxAvailable;
private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);
private static void refresh() {
maxAvailable = Runtime.getRuntime().freeMemory();
}
private static void checkAndScheduleRefresh() {
if (!refreshStarted.get()) {
// immediately refresh when first call to prevent maxAvailable from being 0
// to ensure that being refreshed before refreshStarted being set as true
// notice: refresh may be called for more than once because there is no lock
refresh();
if (refreshStarted.compareAndSet(false, true)) {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator"));
// check every 50 ms to improve performance
scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
refreshStarted.set(false);
scheduledExecutorService.shutdown();
}));
}
}
}
/**
* Get the maximum available memory of the current JVM.
*
* @return maximum available memory
*/
public static long maxAvailable() {
checkAndScheduleRefresh();
return maxAvailable;
}
/**
* Take the current JVM's maximum available memory
* as a percentage of the result as the limit.
*
* @param percentage percentage
* @return available memory
*/
public static long calculate(final float percentage) {
if (percentage <= 0 || percentage > 1) {
throw new IllegalArgumentException();
}
checkAndScheduleRefresh();
return (long) (maxAvailable() * percentage);
}
/**
* By default, it takes 80% of the maximum available memory of the current JVM.
*
* @return available memory
*/
public static long defaultLimit() {
checkAndScheduleRefresh();
return (long) (maxAvailable() * 0.8);
}
}
package com.zte.sdn.oscp.queue;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 8032578371739960142L;
public static int THE_256_MB = 256 * 1024 * 1024;
private int maxFreeMemory;
private Rejector<E> rejector;
public MemorySafeLinkedBlockingQueue() {
this(THE_256_MB);
}
public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
final int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}
/**
* get the max free memory.
*
* @return the max free memory limit
*/
public int getMaxFreeMemory() {
return maxFreeMemory;
}
/**
* set the rejector.
*
* @param rejector the rejector
*/
public void setRejector(final Rejector<E> rejector) {
this.rejector = rejector;
}
/**
* determine if there is any remaining free memory.
*
* @return true if has free memory
*/
public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
}
@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
} else {
rejector.reject(e, this);
}
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e);
}
}
注意其中的rejector是拒绝策略,默认的DiscardPolicy什么也不处理;
而DiscardOldPolicy的处理逻辑很简单
public class DiscardOldestPolicy<E> implements Rejector<E> {
@Override
public void reject(final E e, final Queue<E> queue) {
queue.poll();
queue.offer(e);
}
}
AbortPolicy则直接抛出异常
public class AbortPolicy<E> implements Rejector<E> {
@Override
public void reject(final E e, final Queue<E> queue) {
throw new RejectException("no more memory can be used !");
}
}
个人建议增加日志打印即可。