ThreadPool.java
4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package com.skua.tool.pool;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.skua.tool.callback.IThreadPoolCallback;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.*;
/**
 * Fluent configuration holder that builds a {@link ThreadPoolExecutor} backed by a bounded
 * {@link LinkedBlockingQueue}, plus a convenience {@link #execute(int, IThreadPoolCallback)}
 * that fans an indexed callback out over a throw-away pool and waits for completion.
 *
 * <p>Configuration fields are plain mutable state with no synchronization; configure an
 * instance from a single thread before calling {@link #build()} or
 * {@link #execute(int, IThreadPoolCallback)}.
 *
 * @author sonin
 * @date 2021/12/6 8:28
 */
public class ThreadPool {

    private static final Log log = LogFactory.getLog(ThreadPool.class);

    /** Inserted into the worker thread name pattern {@code pool-<name>-%d}. */
    private String threadPoolName;
    /** Whether worker threads are created as daemon threads. */
    private Boolean daemon;
    /** Priority assigned to every worker thread. */
    private Integer priority;
    /** Handler installed on every worker thread for exceptions that escape a task. */
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    private Integer corePoolSize;
    private Integer maximumPoolSize;
    /** Idle keep-alive for threads above the core size, expressed in {@link #timeUnit}. */
    private Long keepAliveTime;
    private TimeUnit timeUnit;
    /** Capacity of the bounded work queue handed to the executor. */
    private Integer queueCapacity;
    private RejectedExecutionHandler rejectedExecutionHandler;

    /**
     * Initializes every setting with a default: a timestamp-based pool name, non-daemon threads
     * of priority 5, core size equal to the number of available processors, maximum size twice
     * that, a queue capacity of ten times that, zero keep-alive and
     * {@link ThreadPoolExecutor.AbortPolicy} for rejected tasks.
     *
     * <p>The constructor is package-private; instances are presumably handed out by a factory
     * elsewhere in this package.
     */
    ThreadPool() {
        int processors = Runtime.getRuntime().availableProcessors();
        threadPoolName = "default-" + System.currentTimeMillis();
        daemon = false;
        priority = 5;
        // Pass the throwable itself so the logger records the stack trace, not just the message.
        uncaughtExceptionHandler = (thread, exception) ->
                log.error(thread.getName() + " => " + exception.getMessage(), exception);
        corePoolSize = processors;
        maximumPoolSize = 2 * processors;
        keepAliveTime = 0L;
        // NOTE(review): MICROSECONDS may have been intended as MILLISECONDS; it makes no
        // difference while keepAliveTime is 0, but callers that set a keep-alive should also
        // set the unit explicitly.
        timeUnit = TimeUnit.MICROSECONDS;
        queueCapacity = 10 * processors;
        rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
    }

    /**
     * Sets the name inserted into worker thread names ({@code pool-<name>-%d}).
     *
     * @param threadPoolName pool name
     * @return this instance, for chaining
     */
    public ThreadPool threadPoolName(String threadPoolName) {
        this.threadPoolName = threadPoolName;
        return this;
    }

    /**
     * Sets whether worker threads are daemon threads.
     *
     * @param daemon {@code true} for daemon threads
     * @return this instance, for chaining
     */
    public ThreadPool daemon(Boolean daemon) {
        this.daemon = daemon;
        return this;
    }

    /**
     * Sets the priority given to worker threads.
     *
     * @param priority thread priority
     * @return this instance, for chaining
     */
    public ThreadPool priority(Integer priority) {
        this.priority = priority;
        return this;
    }

    /**
     * Sets the handler invoked when an exception escapes a task on a worker thread.
     *
     * @param uncaughtExceptionHandler handler to install on worker threads
     * @return this instance, for chaining
     */
    public ThreadPool uncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
        return this;
    }

    /**
     * Sets the core pool size. If the current maximum pool size is smaller, it is raised to the
     * same value so that core never exceeds maximum.
     *
     * @param corePoolSize number of core threads
     * @return this instance, for chaining
     */
    public ThreadPool corePoolSize(Integer corePoolSize) {
        this.corePoolSize = corePoolSize;
        if (this.maximumPoolSize < corePoolSize) {
            this.maximumPoolSize = corePoolSize;
        }
        return this;
    }

    /**
     * Sets the maximum pool size. If the current core pool size is larger, it is lowered to the
     * same value so that core never exceeds maximum.
     *
     * @param maximumPoolSize upper bound on the number of threads
     * @return this instance, for chaining
     */
    public ThreadPool maximumPoolSize(Integer maximumPoolSize) {
        this.maximumPoolSize = maximumPoolSize;
        if (this.corePoolSize > maximumPoolSize) {
            this.corePoolSize = maximumPoolSize;
        }
        return this;
    }

    /**
     * Sets the idle keep-alive time, interpreted in the unit set via {@link #timeUnit(TimeUnit)}.
     *
     * @param keepAliveTime keep-alive duration
     * @return this instance, for chaining
     */
    public ThreadPool keepAliveTime(Long keepAliveTime) {
        this.keepAliveTime = keepAliveTime;
        return this;
    }

    /**
     * Sets the unit in which the keep-alive time is expressed.
     *
     * @param timeUnit unit of the keep-alive time
     * @return this instance, for chaining
     */
    public ThreadPool timeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
        return this;
    }

    /**
     * Sets the capacity of the bounded work queue.
     *
     * @param queueCapacity maximum number of queued tasks
     * @return this instance, for chaining
     */
    public ThreadPool queueCapacity(Integer queueCapacity) {
        this.queueCapacity = queueCapacity;
        return this;
    }

    /**
     * Sets the policy applied when the executor cannot accept a task.
     *
     * @param rejectedExecutionHandler rejection policy
     * @return this instance, for chaining
     */
    public ThreadPool rejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
        this.rejectedExecutionHandler = rejectedExecutionHandler;
        return this;
    }

    /**
     * Creates a new {@link ThreadPoolExecutor} from the current settings. Every call returns an
     * independent executor; the caller is responsible for shutting it down.
     *
     * @return a freshly constructed executor using a bounded {@link LinkedBlockingQueue}
     */
    public ThreadPoolExecutor build() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("pool-" + threadPoolName + "-%d")
                .setDaemon(daemon)
                .setPriority(priority)
                .setUncaughtExceptionHandler(uncaughtExceptionHandler)
                .build();
        return new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                timeUnit,
                new LinkedBlockingQueue<>(queueCapacity),
                threadFactory,
                rejectedExecutionHandler);
    }

    /**
     * Runs {@code iThreadPoolCallback.doThreadPool(i)} for every {@code i} in
     * {@code [0, loopSize)} on a newly built pool and blocks until every submitted task has
     * finished. An exception thrown by an individual callback invocation is logged and does not
     * stop the remaining invocations.
     *
     * <p>The pool is shut down in all cases, including when a submission is rejected or the
     * wait is interrupted, so its worker threads are not leaked.
     *
     * @param loopSize            number of invocations; a negative value makes
     *                            {@link CountDownLatch} throw {@link IllegalArgumentException}
     * @param iThreadPoolCallback callback invoked once per index
     * @throws Exception if a task is rejected by the configured rejection handler, or the
     *                   calling thread is interrupted while waiting
     */
    public void execute(int loopSize, IThreadPoolCallback iThreadPoolCallback) throws Exception {
        ThreadPoolExecutor threadPoolExecutor = this.build();
        try {
            CountDownLatch countDownLatch = new CountDownLatch(loopSize);
            for (int i = 0; i < loopSize; i++) {
                final int index = i;
                threadPoolExecutor.execute(() -> {
                    try {
                        iThreadPoolCallback.doThreadPool(index);
                    } catch (Exception e) {
                        // Hand the exception to the logger so the stack trace is kept.
                        log.error(this.threadPoolName + " error: " + e.getMessage(), e);
                    } finally {
                        // Count down even on failure so the waiting caller is always released.
                        countDownLatch.countDown();
                    }
                });
            }
            countDownLatch.await();
        } finally {
            // Orderly shutdown: tasks already accepted still run, and idle workers terminate.
            threadPoolExecutor.shutdown();
        }
    }
}