2023年11月29日发(作者:)

记⼀次线程池任务执⾏异常

记⼀次线程池任务执⾏异常

⼀个名为 fetch- 线程池负责从Redis中读取⽂本数据,将读取到的⽂本数据提交给另⼀个线程池 tw-,将 tw- 线程池将任务通过HTTP请求的形式上报给过滤服务。如下图所

⽰:

⼀开始采⽤默认线程池配置⽅式:

private final BlockingQueue taskQueue = new LinkedBlockingQueue<>(1000 * 20);

private final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("fetch-%d").build();

private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, nThreads, 1, ,

taskQueue, threadFactory, new RunsPolicy());

然后只提交三个任务startService() 是个 pipeline 形式不停地从redis上读取⽂本数据。程序运⾏⼀段时间之后,就卡死了。

startService()while(true)

//nThreads 3

for(int i = 0; i < nThreads; i++) {

e(()->{

startService();

});

}

}

CPU、内存以及程序的GC⽇志,都是正常的。发现:

sudo jstack -l pid

"fetch-26" #109 prio=5 os_prio=0 tid=0x00007fbfe00db000 nid=0xea76 waiting on condition [0x00007fc127bfc000]

"fetch-25" #108 prio=5 os_prio=0 tid=0x00007fbfec03c000 nid=0xea75 waiting on condition [0x00007fc1257dc000]

"fetch-24" #107 prio=5 os_prio=0 tid=0x00007fbf6c001000 nid=0xea74 waiting on condition [0x00007fc127cfd000]

执⾏从redis中读取⽂本任务的fetch- 线程池中的所有线程都阻塞了。由于提交的是Runnable任务,引⽤《Java并发编程实战》第七章中⼀段话:

导致线程提前死亡的最主要的原因是RuntimeException。由于这些异常表⽰出现了某种错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调

⽤栈中逐层传递,⽽是默认地在控制台中输出栈追踪信息,并终⽌线程

When a thread exits due to an uncaught exception, the JVM reports this event to our UncaughtExceptionHandler, otherwise the default handler just prints the

stack trace to standard error.

因此,我就⾃定义⼀个看看到底出现了什么错误:

UncaughtExceptionHandler

public class FetchTextExceptionHandler implements htExceptionHandler {

private static final Logger logger = ger();

@Override

public void uncaughtException(Thread t, Throwable e) {

("fetch redis text exception,thread name:{},msg:{}", e(), sage());

}

}

UncaughtExceptionHandler

只是简单地记录⽇志,先找到出错原因再说。重新发版,上线⼀段时间后发现出现程序卡死了,这次有了异常⽇志:

2018-11-23 23:10:25.681 ERROR 29818 --- [fetch-0] extExceptionHandler : fetch redis text exception,thread name:fetch-0,msg:I/O error on POST

request for "xxx": Read timed out; nested exception is TimeoutException: Read timed out

2018-11-23 23:10:25.686 ERROR 29818 --- [fetch-2] extExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:I/O error on POST

request for "xxx": Read timed out; nested exception is TimeoutException: Read timed out

2018-11-23 23:10:27.429 ERROR 29818 --- [fetch-1] extExceptionHandler : fetch redis text exception,thread name:fetch-1,msg:I/O error on POST

request for "xxx": Read timed out; nested exception is TimeoutException: Read timed out

⼀看这个⽇志有点奇怪,fetch线程只是读取redis上的⽂本数据,并将⽂本数据封装到⼀个Runnable任务⾥⾯提交给 Tw- 线程池,Tw-线程 才是发送HTTP POST 请求将数

据提交给过滤服务。

于是去检查创建Tw-线程池创建代码:发现了Tw-线程池采⽤的是延迟策略。

CallerRunsPolicy

private final ThreadFactory threadFactory = new ThreadFactoryBuilder()

.setUncaughtExceptionHandler(exceptionHandler).setNameFormat("tw-%d").build();

private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(20, maximumPoolSize, 1, ,

taskQueue, threadFactory, new RunsPolicy());

也就是说:当Fecth-线程提交任务过快时,Tw-线程池的满了,让任务回退到调⽤者,任务由fetch-线程来执⾏了。因此,上⾯的⽇志打印出来的是线

taskQueueCallerRunsPolicy

程名字是fetch

thread name:

再引⽤⼀段话:

调⽤者运⾏策略(Caller-Runs)实现了⼀种调节机制,该策略不会抛弃任务,也不会抛出异常,⽽是将某些任务回退到调⽤者,从⽽降低新任务的流量。它不会

在线程池(这⾥的线程池是 tw-线程池)的某个线程中执⾏新提交的任务,⽽是在⼀个调⽤了execute的线程(fetch 线程)中执⾏该任务【fetch 线程执⾏execute tw-

线程池提交任务】

知道了异常出现的原因,于是我就把 tw-线程的 饱和策略从原来的修改成,再重新运⾏程序,⼀段时间后,发现 tw-线程池中的30个线程全部阻

CallerRunsPolicyAbortPolicy

塞,fetch-线程池中的三个线程也全部阻塞。如下图:

在程序中每个tw-线程隔20ms发送⼀次HTTP POST请求,将⽂本上报给过滤服务,30tw-线程,并发量⼤约是1500次每秒,每次提交的数据不超过30KB吧。

看程序输出的log⽇志:30tw- 线程 都是⼀样的异常

SocketTimeoutException,Read timed out

2018-11-24 09:16:47.885 ERROR 9765 --- [tw-310] TwExceptionHandler : http request report tw exception,thread name:tw-

310,cause:TimeoutException: Read timed out,msg:I/O error on POST request for "xxx": Read timed out; nested exception is

TimeoutException: Read timed out

3 fetch-线程的异常⽇志是:

rejected from PoolExecutor

2018-11-24 09:04:36.758 ERROR 9765 --- [fetch-2] extExceptionHandler : fetch redis text exception,thread name:fetch-2,msg:Task

ReportTwAuditService$$Lambda$75/259476123@7376559c rejected from PoolExecutor@75fa1939[Running, pool size = 30, active

threads = 30, queu

ed tasks = 50000, completed tasks = 20170]

这是因为 fetch-线程向 tw-线程池提交任务,⽽tw-线程池上⾯的饱和策略已经改成了,当tw-线程池任务队列满了时,tw-线程就把 fetch-线程 提交过来的任务给拒绝

AbortPolicy

了,并向fetch-线程抛出RejectedExecutionException 异常。

总结⼀下就是:30tw-线程因为发送HTTP POST请求给过滤服务出现 全部阻塞,⽽tw-线程池的饱和策略是即:丢弃任务并

SocketTimeoutException,Read timed outAbortPolicy

抛出RejectedExecutionException 异常,导致 fetch 线程阻塞,且提交给tw-线程池的任务被 abort,这就是上⾯那张图中所有线程都全部阻塞的原因。

再引⽤⼀段话:

⼯作线程在执⾏⼀个任务时被阻塞,如果等待⽤户的输⼊数据,但是⽤户⼀直不输⼊数据,导致这个线程⼀直被阻塞。这样的⼯作线程名存实亡,它实际上不执

⾏任何任务了。如果线程池中的所有线程都处于这样的状态,那么线程池就⽆法加⼊新的任务了。各种类型的线程池中⼀个严重的风险是线程泄漏,当从线程池

中除去⼀个线程以执⾏⼀项任务,⽽在任务完成后该线程却没有返回池时,会发⽣这种情况。发⽣线程泄漏的⼀种情形出现在任务抛出⼀个 RuntimeException

或⼀个 Error 时。如果池类没有捕捉到它们,那么线程只会退出⽽线程池的⼤⼩将会永久减少⼀个。当这种情况发⽣的次数⾜够多时,线程池最终就为空,⽽且

系统将停⽌,因为没有可⽤的线程来处理任务。

既然tw-线程发送HTTP请求出现了 SocketTImeoutException,那么来看看HTTP连接池的配置:

import ient;

import tConfig;

import tConnectionKeepAliveStrategy;

import ientBuilder;

import gHttpClientConnectionManager;

import ;

import uration;

import HttpRequestFactory;

import mponentsClientHttpRequestFactory;

import ssageConverter;

import HttpMessageConverter;

import mplate;

import it;

/**

* Created by Administrator on 2018/7/4.

* 配置 RestTemplate 连接池

*/

@Configuration

public class RestTemplateConfig {

/**

* /questions/44762794/java-spring-resttemplate-sets-unwanted-headers

* set http header explicitly: "Accept-Charset": "utf-8"

*/

@Bean

public RestTemplate restTemplate() {

RestTemplate restTemplate = new RestTemplate(httpRequestFactory());

for (HttpMessageConverter converter : sageConverters()) {

if (converter instanceof StringHttpMessageConverter) {

((StringHttpMessageConverter)converter).setWriteAcceptCharset(false);

}

}

return restTemplate;

}

@Bean

public ClientHttpRequestFactory httpRequestFactory() {

return new HttpComponentsClientHttpRequestFactory(httpClient());

}

@Bean

public HttpClient httpClient() {

//配置http长连接

PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(30, S);

Total(1000);

aultMaxPerRoute(20);

RequestConfig requestConfig = ()

//服务器返回数据(response)的时间,超过该时间抛出read timeout

.setSocketTimeout(5000)

//连接上服务器(握⼿成功)的时间,超出该时间抛出connect timeout

.setConnectTimeout(5000)

//从连接池中获取连接的超时时间,超过该时间未拿到可⽤连接抛出异常

.setConnectionRequestTimeout(1000).build();

return ().setDefaultRequestConfig(requestConfig).

setConnectionManager(connectionManager).

setConnectionManagerShared(true)

//keep alive

.setKeepAliveStrategy(CE)

.build();

}

}

到这⾥就⼤概知道解决⽅案了:

让弱鸡的过滤服务⽜B⼀点,这有点不太可能的。你懂的……

控制HTTP 请求速度,并添加线程抛出异常时处理⽅法(⾃定义ThreadPoolExecutor,重写afterExecute⽅法)⽽不仅仅是实现UncaughtExceptionHandler,简单地

打印出异常⽇志。

fetch-线程 阻塞的原因是因为:向tw-线程池提交任务,⽽tw-线程池采⽤的饱和策略是,如果把它改成:直接丢弃任务⽽不抛出异常。这样fetch-线

AbortPolicyDiscardPolicy

程就不会收到RejectedExecutionException 异常⽽阻塞了。当然了,采⽤饱和策略的话,fetch-线程提交任务出现异常就⽆法感知了,这时我们还可以⾃定

DiscardPolicy

义饱和策略。如下:可以简单地打印出⼀个⽇志:

import ;

import Factory;

import edExecutionHandler;

import PoolExecutor;

/**

* @author xxx

* @date 2018/11/24

private static final int nThreads = time().availableProcessors();//24

返回的是逻辑cpu的个数。

~$ cat /proc/cpuinfo| grep "processor"| wc -l

24

在⼀台机器上开多少个线程合适?有个公式$$N_{threads}=N_{cpu}U_{cpu}(1+frac{W}{C})$$

W是等待时间、C是使⽤CPU的计算时间。因此,需要估计任务的类型,是计算密集型,还是IO密集型?另外:⼀台物理机上不仅仅是你写的程序在上⾯跑,还有其他⼈写

的程序也在上⾯跑,因此,在使⽤这个公式计算线程数⽬时也要注意到这⼀点。

如果任务之间是异构的且独⽴的,两种不同类型的任务,那么可以使⽤2个线程池来执⾏这些任务。⽐如⼀个线程池执⾏CPU密集型任务,另⼀个线程池执⾏IO密集型任

务。

为什么要⾃定义线程池?

个⼈认为我们在写代码的时候对要处理的任务是有⼀定的了解的,⽐如并发量多⼤?数据量多⼤?根据这些信息就⼤概能知道任务队列定义多长合适,⽽不是采⽤默认的⽆

界阻塞队列。

同时,对任务的特征也有所了解,⽐如是否要调⽤远程HTTP服务?是否写磁盘有IO阻塞?还是只是转换数据、处理数据,另外所部署的服务器的硬件性能咋样?这些都能

作为定义线程个数的⼀些参考。

最后,采⽤⾃定义线程池,在任务执⾏出错了,可能更灵活地控制处理错误,⽐如记录错误⽇志、执⾏任务前以及执⾏任务后的清理操作……

参考资料

JAVA 并发编程实战》

原⽂: