阻塞队列之LinkedBlockingQueue

概述

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用**可重入锁(ReentrantLock)**来保证在并发情况下的线程安全。

构造器

LinkedBlockingQueue一共有三个构造器,分别是无参构造器、可以指定容量的构造器、可以穿入一个容器的构造器。如果在创建实例的时候调用的是无参构造器,LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,这样做很可能会导致队列还没有满,但是内存却已经满了的情况(内存溢出)。

1
2
3
4
5
public LinkedBlockingQueue();    //设置容量为Integer.MAX

public LinkedBlockingQueue(int capacity); //设置指定容量

public LinkedBlockingQueue(Collection<? extends E> c); //穿入一个容器,如果调用该构造器,容量默认也是Integer.MAX_VALUE

LinkedBlockingQueue常用操作

取数据

  • **take()**:首选。当队列为空时阻塞
  • poll():弹出队顶元素,队列为空时,返回空
  • peek():和poll类似,返回队列队顶元素,但顶元素不弹出。队列为空时返回null
  • remove(Object o):移除某个元素,队列为空时抛出异常。成功移除返回true

添加数据

  • **put()**:首选。队满是阻塞
  • offer():队满时返回false

判断队列是否为空

size()方法会遍历整个队列,时间复杂度为O(n),所以最好选用isEmtpy()

put元素原理

基本过程:

  1. 判断元素是否为null,为null抛出异常
  2. 加锁(可中断锁)
  3. 判断队列长度是否到达容量,如果到达一直等待
  4. 如果没有队满,enqueue()在队尾加入元素
  5. 队列长度加1,此时如果队列还没有满,调用signal唤醒其他阻塞队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void put() throws InterruptedException {
if (e == null) throw new NullPointerException();

int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
}

take元素原理

基本过程:

  1. 加锁(依旧是ReentrantLock),注意这里的锁和写入是不同的两把锁
  2. 判断队列是否为空,如果为空就一直等待
  3. 通过dequeue方法取得数据
  4. 取走元素后队列是否为空,如果不为空唤醒其他等待中的队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

enqueue()dequeue()方法实现都比较简单,无非就是将元素添加到队尾,从队顶取走元素

LinkedBlockingQueue与LinkedBlockingDeque比较

LinkedBlockingDequeLinkedBlockingQueue的相同点在于:

  1. 基于链表
  2. 容量可选,不设置的话,就是Int的最大值

LinkedBlockingQueue的不同点在于:

  1. 双端链表和单链表
  2. 不存在哨兵节点
  3. 一把锁+两个条件

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class ProducerAndConsumer {
public static void main(String[] args) {
try {
BlockingQueue queue = new LinkedBlockingQueue(5);

ExecutorService executor = Executors.newFixedThreadPool(5);
Produer producer = new Produer(queue);
for (int i = 0; i < 3; i++) {
executor.execute(producer);
}
executor.execute(new Consumer(queue));

executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}

class Produer implements Runnable {

private BlockingQueue queue;
private int nums = 20; //循环次数

//标记数据编号
private static volatile AtomicInteger count = new AtomicInteger();
private boolean isRunning = true;
public Produer() {}

public Produer(BlockingQueue queue){
this.queue = queue;
}

public void run() {
String data = null;
try {
System.out.println("开始生产数据");
System.out.println("-----------------------");

while (nums > 0) {
nums--;
count.decrementAndGet();

Thread.sleep(500);
System.out.println(Thread.currentThread().getId() + ": 生产者生产了一个数据");
queue.put(count.getAndIncrement());
}
} catch(Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("生产者线程退出!");
}
}
}

class Consumer implements Runnable {

private BlockingQueue queue;
private int nums = 20;
private boolean isRunning = true;

public Consumer() {}

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
System.out.println("消费者开始消费");
System.out.println("-------------------------");

while (nums > 0) {
nums--;
try {
while (isRunning) {
int data = (Integer) queue.take();
Thread.sleep(500);
System.out.println("消费者消费的数据是" + data);
}

} catch(Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("消费者线程退出!");
}
}
}
}

效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
开始生产数据
-----------------------
开始生产数据
-----------------------
开始生产数据
-----------------------
消费者开始消费
-------------------------
12: 生产者生产了一个数据
11: 生产者生产了一个数据
13: 生产者生产了一个数据
12: 生产者生产了一个数据
消费者消费的数据是: -3
11: 生产者生产了一个数据
13: 生产者生产了一个数据
消费者消费的数据是: -3
12: 生产者生产了一个数据
11: 生产者生产了一个数据
13: 生产者生产了一个数据
消费者消费的数据是: -3
11: 生产者生产了一个数据
12: 生产者生产了一个数据
13: 生产者生产了一个数据
消费者消费的数据是: -3
消费者消费的数据是: -3
11: 生产者生产了一个数据
12: 生产者生产了一个数据
消费者消费的数据是: -3
13: 生产者生产了一个数据
消费者消费的数据是: -3
消费者消费的数据是: -3
11: 生产者生产了一个数据
12: 生产者生产了一个数据
消费者消费的数据是: -3
13: 生产者生产了一个数据
消费者消费的数据是: -2
消费者消费的数据是: -2
11 :生产者生产了一个数据
消费者消费的数据是: -2
12 :生产者生产了一个数据
生产者线程退出!
消费者消费的数据是: -2
生产者线程退出!
消费者消费的数据是: -1
生产者线程退出!
消费者消费的数据是: -1
消费者消费的数据是: -2
消费者消费的数据是: -1
消费者消费的数据是: -1
消费者消费的数据是: -1
消费者消费的数据是: -1

Powered by AppBlog.CN     浙ICP备14037229号

Copyright © 2012 - 2021 APP开发技术博客 All Rights Reserved.

访客数 : | 访问量 :