手写简易阻塞队列与线程池
Wallace Xu 2020-09-21 Java
阻塞队列和线程池作为并发编程中两个非常重要的组件,其原理需要我们掌握,二者的核心都离不开锁的使用,本次我们就来写一个简易的阻塞队列和一个线程池。
# 阻塞队列
阻塞队列的核心是元素容器、最大容量和锁,底层容器可以使用固定大小的数组与指针来模拟队列,这里就直接使用JDK提供的LinkedList了。JDK中的ArrayBlockingQueue继承了AbstractQueue,提供了所有的队列的方法例如add()/remove()/offer()/poll()等,但此外还提供了专门的阻塞方法put()/take(),我们就只实现这两个方法。
在每次尝试向队列中插入元素时,先上锁,如果发现队列已满,则唤醒其他消费者线程并调用condition.await()进入等待池。直到此线程被唤醒并重新获得锁后发现队列未满,则向队列中插入元素,最后解锁。
线程调用阻塞队列的take()方法时同理。
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MyBlockingQueue<E> {
private final LinkedList<E> elements;
private final ReentrantLock mainLock;
private final Condition condition;
private final int capacity;
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
elements = new LinkedList<E>();
mainLock = new ReentrantLock();
condition = mainLock.newCondition();
}
public void put(E element) throws InterruptedException {
try {
mainLock.lock();
while (elements.size() == capacity) {
condition.signalAll();
condition.await();
}
elements.offer(element);
System.out.println("插入元素"+element);
}finally {
mainLock.unlock();
}
}
public E take() throws InterruptedException {
try {
mainLock.lock();
while (elements.size() == 0) {
condition.signalAll();
condition.await();
}
return elements.poll();
}finally {
mainLock.unlock();
}
}
}
//测试,队列大小为5,一个生产者向队列中写入20个数据,同时4个消费者线程从队列中取数据
class TestQueue{
private MyBlockingQueue<Integer> bq = null;
private void test() {
bq = new MyBlockingQueue<>(5);
Runnable producer = ()->{
for (int i = 0; i < 20; i++) {
try {
bq.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable consumer = ()->{
while (true) {
try {
System.out.println(Thread.currentThread().getName()+"消费了:"+bq.take());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(producer).start();
for (int i = 0; i < 4; i++) {
new Thread(consumer).start();
}
}
public static void main(String[] args) {
TestQueue testQueue = new TestQueue();
testQueue.test();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# 线程池
ThreadPoolExecutor有几个关键的参数:corePoolSize、blockingQueue、maximumPoolSize、timeOut、rejectPolicy。我们实现的线程池暂不考虑maximumPoolSize。
提交任务时,首先获取锁,如果已经创建的工作线程的数量未达到corePoolSize,那么创建新的线程来运行该任务,否则尝试将任务放入阻塞队列中,如果因为阻塞队列已满而放入失败则拒绝该任务。最后释放锁。
值得注意的是,为了使得已经创建的线程在执行完任务后不销毁而是尝试从任务队列中取任务,我们需要继承Thread类并封装任务,在重写的run()方法中控制线程获取任务的逻辑。
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
public class MyThreadPool {
private final ArrayBlockingQueue<Runnable> taskQueue;
private final ArrayList<MyThread> threads;
private final int corePoolSize;
private int currentWorkingThreads;
private final ReentrantLock mainLock;
private class MyThread extends Thread {
private Runnable task;
public MyThread(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (true) {
if (task != null) {
task.run();
task = null;
} else {
try {
task = taskQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public MyThreadPool(int corePoolSize) {
this.corePoolSize = corePoolSize;
currentWorkingThreads = 0;
threads = new ArrayList<>(corePoolSize);
taskQueue = new ArrayBlockingQueue<>(corePoolSize * 4);
mainLock = new ReentrantLock();
}
public void execute(Runnable task) {
try {
mainLock.lock();
if (currentWorkingThreads < corePoolSize) {
MyThread myThread = new MyThread(task);
myThread.start();
threads.add(myThread);
currentWorkingThreads++;
} else {
if (!taskQueue.offer(task))
rejectTask(task);
}
}finally {
mainLock.unlock();
}
}
private void rejectTask(Runnable task) {
System.out.println("Task Queue is Full, REJECTED!");
}
}
//测试,尝试向最大核心线程数为4的线程池提交30个任务
class TestPool {
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(4);
Runnable taskTemplate = () -> {
System.out.println(Thread.currentThread().getName() + "执行中");
try {
//单个任务执行中睡眠1秒,致使任务队列被填满
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 30; i++) {
myThreadPool.execute(taskTemplate);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86