Java线程池

Java线程池

说说进程和线程

进程的引入

多道程序设计允许多个程序进入内存运行,提高CPU利用率和效率,为了区分进入内存的不同程序,引入了“进程”概念。

进程是具有独立功能的程序关于某个数据集合上的一次运行活动。是系统进行资源分配和调度(不支持线程机制时)的独立单位,每个进程分配了独立的地址空间。

线程的引入

进程切换开销大,且进程内部需要并行实现不同功能,如浏览器进程需要在播放视频时下载文件,由于线程间切换通信更快,无需经过内核(因为进程内共享内存和文件),因此引入线程,作为调度的基本单位。

在windows中,线程作为分配CPU的基本单位,如果线程数不大于CPU核数,任务并行运行,大于CPU核数时,任务并发运行。

为什么要使用多线程

提高CPU和IO设备的利用率,在单核上,当一个线程因为IO等原因被阻塞时,可以切换到另一个线程执行任务,不会因为阻塞而浪费时间,在多核上,一个核可以执行一个线程,因此只有多线程才能发挥多核的优势。

进程的生命周期和状态

就绪、运行、阻塞。其中只有就绪和运行可以双向转换。

线程的生命周期和状态

线程池又是什么

如果有一个待执行的任务,就为其新建一个线程去执行它。这种策略叫做“每任务每线程”,这种策略具有很大的缺陷:

  • 线程的生命周期需要开销
  • 线程大于CPU核数时,多余的空闲线程会占用内存
  • 受物理因素限制,无限制创建线程会引起Out Of Memory错误

在面临高负载情况时,以上缺陷都会暴露出来,因此将线程“池化”,作为一种可以调节控制的资源进行使用,能够让使用多线程的程序性能平缓的劣化,而不是程序快速崩溃导致不可用,维护了高可用性。

如何使用线程池

执行策略

  • 任务在什么线程中执行
  • 什么执行顺序
  • 最多多少个任务并发
  • 最多多少个任务等待
  • 系统过载应该放弃哪个任务、如何通知
  • 任务执行前和结束后做哪些处理

工作流程

创建线程池的静态工厂方法

  • newFixedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    定长线程池,到达最大长度不再变化,等待队列无限长

  • newCachedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    创建可缓存无限制数量的线程池,如果线程中没有空闲线程池的话此时再来任务会新建线程,如果超过60秒此线程无用,那么就会将此线程销毁。简单来说就是忙不来的时候无限制创建临时线程,闲下来再回收

  • newSingleThreadExecutor

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    线程数量为1,按规定的顺序(LIFO,FIFO,优先级)执行任务

  • ScheduledThreadPoolExecutor

    1
    2
    3
    4
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    new DelayedWorkQueue());
    }

    创建固定大小线程池,支持定时及周期性执行任务。

使用ThreadPoolExecutor

《阿里巴巴Java开发手册》中提到,线程池不允许使用Executors创建,而是通过ThreadPoolExecutor方式,这个方法是Executor框架的底层实现,可以更加明确线程池的运行规则,避免资源耗尽的风险。

  1. FixedThreadPool和SingleThreadPool的队列长度为Integer.MAX_VALUE,可能会导致大量请求堆积导致OOM
  2. CachedThreadPool和ScheduledThreadPool会创建大量线程,导致OOM

ThreadPoolExecutor构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • corePoolSize:核心线程数量
  • maximumPoolSize:线程池中运行存在的最大线程数
  • keepAliveTime:存在线程数量大于核心线程数时,超过的此时间的空闲线程会被销毁
  • unit:时间单位
  • workQueue:当前线程池没有线程为任务提供运行服务时,任务的等待队列
  • threadFactory:自定义的线程,实现了threadFactory接口
  • handler:等待队列已经满后,任务的拒绝策略

使用示例

一个使用线程池对Web服务器发起10000次http请求的程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import httpMethod.HttpRequest;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by Yang Xing Luo on 2019/12/20.
*/
public class Test {
public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = 5;
int maximumPoolSize = 10;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(3);
ThreadFactory threadFactory = new NameTreadFactory();
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
executor.prestartAllCoreThreads(); // 预启动所有核心线程

for (int i = 1; i <= 10000; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}

System.in.read(); //阻塞主线程
}
public static class MyIgnorePolicy implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}

private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}

static class NameTreadFactory implements ThreadFactory {

private final AtomicInteger mThreadNum = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}

static class MyTask implements Runnable {
private String name;

public MyTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
System.out.println("name:::"+ name + "\n" +HttpRequest.sendGet("http://localhost:8080/user/test?test="+ URLEncoder.encode("中文", "utf-8"),null));
Thread.sleep(3000); //让任务执行慢点
} catch (InterruptedException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}

public String getName() {
return name;
}

@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}

更多细节

参数设置、线程销毁、拒绝策略、线程工厂

参考