2024年3月31日发(作者:)
信 息 技 术
DOI:10.16661/.1672-3791.2016.07.015
2016 NO.07
SCIENCE & TECHNOLOGY INFORMATION
科技资讯
采用ScheduledThreadPoolExecutor执行定时重
试任务时内存溢出的分析及解决
①
余志坚
1
姜春志
2
(1.武汉理工大学机电工程学院 湖北武汉 430070;2.东华软件股份公司 北京 100000)
摘 要:开发JavaWeb项目中发现服务之间的调用存在超时情况,由于涉及的处理逻辑全部是异步,引入定时重试的机制,
重试工具选择了JDK自带的ScheduledThreadPoolExecutor。当A服务依赖B服务,B服务由于在业务高峰期处理能力降低,导致大
量A服务过来的请求超时,A加入了超时重试机制,间隔时间根据重试次数的多少来决定,次数越多,两次重试之间间隔的时间
越多,此时的业务高峰也会给A带来大量请求,大量的超时会导致重试队列迅速堆积,直到内存溢出。该文从线程池工作机制、
ScheduledThreadPoolExecutor实例的创建,获取重试任务的过程以及提交任务的过程角度分析,并通过源代码的剖析和测试工
具MyEclipse进行演示测试内存泄露的情况,得出避免内存泄露的解决方案。
关键词:ScheduledThreadPoolExecutor 线程池 内存溢出
中图分类号:TP3文献标识码:A 文章编号:1672-3791(2016)03(a)-0015-03
1 ScheduledThreadPoolExecutor实例的创建过程及线程
池工作机制
1.1 ScheduledThreadPoolExecutor实例的创建过程
重试工具选择了JDK自带的ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor实例的创建过程如下:
ScheduledThreadPoolExecutor实例的创建过程如下:(1)获取当
前机器上处理器的数量;(2)使用Google的ThreadFactoryBuiler
创建指定格式名称的线程,以方便查看问题;(3)有需要被拒绝的
任务时,抛出异常;(4)创建定时任务池;打开MyEclipse工具显示
相对的代码:int corePoolSize=time().
availableProcessors();
ThreadFactory tf=new ThreadFactoryBuilder().
setNameFormat("FailureRetryTask-pool-%d").build();
RejectedExecutionHandler handler=new
olicy();
ScheduledThreadPoolExecutor taskService=new
ScheduletThreadPooExecutor(corePoolSize,tf,handler);
线程池就是多个线程在一个队列中取任务执行,提交的任务
会被放入队列中等待线程执行,故队列要设置一个大小。线程池
同样会根据任务繁忙程度来动态调整连接数,空闲时保持最小连
接数,繁忙时增加连接,但不会超过上限,具有伸缩性,线程的创
建和销毁也需要消耗系统资源,线程的连接重用就可以避免这部
分损失,具有重用性。
1.2 线程池工作机制
线程获取任务的策略就是如果当前线程池运行状态正常,则
阻塞等待任务,否则直接返回或等待有限时间后返回。线程池中
线程的主要任务就是获取任务,然后执行,然后再去获取任务,如
此循环,这就实现了线程池中线程的可重用。
Worker封装了任务,同时创建了新的线程,并被添加到集合
workers中,这个workers其实就是最核心的线程池。通过run方法
实现重用。private final HashSet
HashSet
public void run(){
try{
Runnable task=firstTask;
firstTask=null;
while(task!=null||(task=getTask())!=null){
runTask(task);
task=null;}}
finally{
workerDone(this);
}
}
Runnable getTask(){
for(;;){
try{
表1 实验结果
Class
Task$Sync
ledThreadPoolExecutor$ScheduledFutureTask
ors$RunnableAdapter
①作者简介:余志坚(1989—),男,汉,湖北黄冈人,硕士在读,研究方向:汽车零部件。
Instance Count
276578
276578
276578
Total Size
3956808
8850496
2212624
科技资讯 SCIENCE & TECHNOLOGY INFORMATION
15
科技资讯
2016 NO.07
SCIENCE & TECHNOLOGY INFORMATION
int state=runState;
if(state>SHUTDOWN){return null;}
Runnable r;
if(state==SHUTDOWN){r=();}
else if(poolSize>corePoolSize||allowCoreThreadTimeOut)
{
r=(keepAliveTime,TimeUnit.
NANOSECONDS);
}else{r=();}
if(r!=null){return r;}
if(workerCanExit()){
if(runState>=SHUTDOWN){interruptIdleWorkers
();}
return null;
}
}
catch(InterruptedException ie){}
}
}
private boolean workerCanExit(){
final RenntrantLock mainLock=ck;
();
boolean canExit; try{canExit=runState>
=STOP||y()||
(allowCoreThreadTimeOut&&poolSize>(1,
corePoolSize));
}finally{();}
return canExit;
}
如果此时线程池运行状态是终止(runState >= STOP),或
者队列为空,或者允许核心线程超时并且线程池中线程数量大于
最小线程数量,那么方法将返回true。再回到getTask方法,调用
workerCanExit方法的前提是没有获取到任务,根据上边获取任
务的过程,这几个条件都有可能成立,所以此时getTask方法可以
返回null,上层Worker的run方法从while循环重返回,整个线程
结束,这就实现了线程池的可伸缩。
2 ScheduledThreadPoolExecutor获取任务的过程
在getTask()中,描述了整个获取任务的过程,如果线程池运
行状态已经是SHUTDOWN了,调用非阻塞方法poll,因为如果当
前有任务,那么可以获取到任务并返回,如果没有任务,也没有必
要阻塞在队列上等待任务,因为已经SHUTDOWN,后续不会再
有任务进入。
如果当前线程数大于最小线程数,或者核心线程也可以做超
时处理,意味着如果获取不到任务就可以销毁一部分线程了,所
以poll方法设置了等待时间,超时后立即返回。
另一种情况是线程池还在运行状态,并且当前线程数不大
于最小线程数,同时也不允许最小线程数以内的线程超时,这个
时候线程就要调用阻塞方法take,等待任务进入队列以后才返
回。
16
科技资讯 SCIENCE & TECHNOLOGY INFORMATION
信 息 技 术
3 ScheduledThreadPoolExecutor提交任务的执行过程
ScheduledThreadPoolExecutor提交任务的执行过程,首先提
交任务:taskService .schedule(new Runnable(){public void
run(){}},1,TimeUnit,DAYS);
ScheduledThreadPoolExecutor通过schedule方法提交定时
任务,schedule方法源码如下:
public ScheduledFuture> schedule(Runnable
command,long delay,TimeUnit unit){
if(command==null||unit==null){throw new
NullPointerException();}
if(delay<0){delay=0;}
RunnableScheduledFuture> t=decorateTask(command,
new ScheduledFutureTask
triggerTime));
delayedExecute(t);
return t;
}
提交的任务会被封装成ScheduledFutureTask类型对象。
分析delayedExecute方法:private void delayedExecute
(Runnable command){
if(isShutDown()){reject(command);return;}
if(getPoolSize() ue().add(command); } 如果线程的运行状态不是RUNNNING或者入队列没有成 功,则采用线程池的构造方法中设置的拒绝策略来处理任务。 如果当前线程池中的线程数量poolSize小于线程池核心线程的数 量corePoolSize,执行prestartCoreThread(),该方法会创建一个新 的线程来执行任务,如果prestartCoreThread()创建的新线程执行 任务失败或者当前线程池中的线程数量poolSize大于等于线程池 核心线程数量corePoolSize,当若线程池的运行状态是RUNNING 并且入队成功,由于在多线程环境下,状态随时可能会改变,此时 线程池的运行状态runState不是RUNNING或者线程池中没有可 用的线程(poolSize==0),要确保进入的任务被执行处理,线程池 在初始化完成以后是空的,并没有线程,如果在服务器中使用线 程池,服务重启后有大量请求进入,则要同时创建多个线程,而且 创建过程是加锁同步的,会导致一定的竞争,解决办法就是线程 池初始化时调用prestartAllCoreThreads方法启动核心线程数量 的线程,这样就能在线程池中的线程就绪以后才开始接收请求。 通过getQueue方法获取任务队列,并且调用add方法向队列 中添加任务,dq的定义: private final DelayQueue dq=new DelayQueue public boolean add(Runnable x){return ((RunnableScheduleFuture)x);} 可以看出dq是阻塞队列,线程池中的线程都是在队列中取数 据,ScheduledThreadPoolExecutor中的构造方法里的队列的实 现使用链表结构的阻塞队列,add方法内部调用offer方法,offer源 码如下:public boolean offer(E e){ final ReentrantLock lock=(); 信 息 技 术 (); try{ E first=(); (e); if(first==null||eTo(first)<0){ All(); return true; } }finally{();}} 这方法需要在多线程环境下同步执行,会用到锁Lock。锁实现 的大概原理如下。 Lock实现锁的方式是通过排他性更新共享变量,更新成功的 线程继续执行,没有更新成功的线程将会被阻塞。Lock的共享变 量state在可重入锁中可以用来表示一个线程调用了几次lock方 法,也就是有几次获取锁的行为。Lock的功能实现是通过内部聚 合了抽象队列同步器(AQS),同步器有公平和非公平之分。非公 平同步器对于新来的线程会尝试获取,不成功以后才会进入等待 队列,而公平同步器则会首先判断是否排队。AQS中会保存获取 锁的当先线程的引用。如果一次性尝试获取锁不成功,则线程会 进入队列,循环尝试获取锁。 peek方法会获取队列的第一个元素,只是获取,并没有出队 列。接着调用优先级队列PriorityQueue类型变量q的offer方法将 队列入队,优先级队列会对任务进行排序,距离执行时间越近,位 置越靠前。下边的if判断可以这样理解,first是在当前任务入队之 前获取的,也就是队列中原有的第一个任务,compareTo的这段比 较是说当前任务的执行时间比队列中第一个任务执行时间还要 早,如果first是null,那么当前任务入队后将是第一个元素,如果 当前任务的执行时间比队列中第一个任务的执行时间早,那么当 前入队后也将是第一个元素,只要这两个条件有一个成立了,这 个if的判断条件就为true,就要执行Condition类型的available变 量的signalAll方法,唤醒等待的线程工作。 4 队列的大小判断 队列的大小是决定内存溢出最直观的因素,首先来看看优先 级队列PriorityQueue的offer方法:public boolean offer(E e){ if(e==null){throw new NullPointerException();} modCount++; int i=size; if(i>=){grow(i+1);} size=i+1; if(i==0){queue[0]=e;} else{siftUp(i,e);} return true; 上述代码表示如果队列中元素的个数(size)大于等于队列的 长度,将要通过grow方法扩容,如下:private void grow(int minCapacity){ if(minCapacity<0){throw new OutOfMemoryError();} int oldCapacity=; int newCapacity=((oldCapacity<64)?((oldCapacity+1)*2): ((oldCapacity/2)*3)); if(newCapacity<0){newCapacity=_VALUE;} 2016 NO.07 SCIENCE & TECHNOLOGY INFORMATION 科技资讯 if(newCapacity {newCapacity=minCapacity;} queue=(queue,newCapacity); } 若队列容量小于64,那就在原有基础上加1然后扩大2倍,这种 情况绝对不会造成内存的溢出问题。如果大于等于64呢?直接扩 容一半,然后将值赋给一个int型变量,当某种情况如果超过int类 型的最大值了,JDK的处理是赋值成Integer的MAX_VALUE为 2147483647,也就是最大的队列长度是2G多,如果一个对象的大 小按照50个字节来算,将会占用100G的内存必定溢出。 5 模拟内存溢出代码测试 当业务高峰给服务器带来大量请求,大量的超时会导致重试 队列迅速堆积,直到内存溢出,下面就通过代码来测试一下:模拟 大量的添加任务,并且任务在调度队列中堆积,推迟一天执行。 while(true){ le(new Runnable(){public void run() {}},1,); } 虚拟机启动参数-: -Xms32M -Xmx32M -Xmn10M-XX: +HeapDumpOnOutOfMemoryError -XX: HeapDumpPath=d:/ 运行输出: emoryError:Java heap space Dump- ing head to d:/java_... Heap dump file created [44940425 bytes in 0.618 secs] 内存溢出了,来看看内存快照(见表1)。 6 解决方案及措施 编译好的java程序需要运行在JVM中,而JVM为程序提供并 管理所需要的内存空间,JVM自带一个用于回收没有任何引用指 向的对象的线程机制(垃圾回收器),但针对于 ScheduledThreadPoolExecutor提交的任务会被封装成 ScheduledFutureTask类型对象且每个对象中又有Sync成员变 量。解决的办法可以是手动判断队列的大小,通过taskService. getQueue().size()方法,通过Jmap内存分析工具估算每个对象的 大小,Jmap是一个可以输出所有内存中对象的工具,甚至可以将 JVM 中的heap,以二进制输出成文本。打印出某个Java进程内存 内的所有‘对象’的情况,结合能够为队列分配的内存大小,计算 出队列容纳任务的最大数量,以避免内存溢出。 参考文献 [1]逯昌浩.浅析多核处理器条件下的Java编程[J].中国科技信 息,2009(12):128,130. [2]张复兴,曾新洲.扩展线程池模型及性能分析[J].计算技术与 自动化,2007(4):110-112. [3](美)Bruce 编程思想[M].陈昊鹏,译.北京:机械工 业出版社,2007. 科技资讯 SCIENCE & TECHNOLOGY INFORMATION 17


发布评论