本文主要是介绍【并发编程】支持按优先级排序的无界阻塞队列PriorityBlockingQueue,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
什么是PriorityBlockingQueue
- PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列。
- 数组的默认长度是11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止。
- 每次出队都返回优先级别最高的或者最低的元素。
- 默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。
- PriorityBlockingQueue不能保证同优先级元素的顺序。
PriorityBlockingQueue的使用场景
- 抢购活动,会员级别高的用户优先抢购到商品
- 银行办理业务,vip客户插队
PriorityBlockingQueue的特点
- 优先级高的先出队,优先级低的后出队。
- 使用的数据结构是数组+二叉堆:默认容量11,可指定初始容量,会自动扩容,最大容量是(Integer.MAX_VALUE - 8)
- 使用的锁是是ReentrantLock,存取是同一把锁
PriorityBlockingQueue的入队出队逻辑
- 入队:无界队列不阻塞。根据比较器进行堆化(排序)自下而上。传入比较器对象就按照比较器的顺序排序,如果比较器为 null,则按照自然顺序排序。
- 出队:阻塞对象NotEmpty,队列为空时阻塞。优先级最高的元素在堆顶(弹出堆顶元素)。弹出前比较两个子节点再进行堆化(自上而下)。
二叉堆与数组
- 完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
- 二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。
- 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值。
- 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
- 二叉堆与数组的转换规则
// 当前节点为i
int i;
// 当前节点的父节点计算
int parent = (i -1 )/ 2 ;
// 左面的子节点计算
int leftChild = 2 * i + 1;
// 右面的子节点计算
int rightChild = 2 * i + 2;
PriorityBlockingQueue的使用方式
// 创建优先级阻塞队列 Comparator为null,自然排序
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<Integer>(5);
// 自定义Comparator
PriorityBlockingQueue<Integer> queue1 = new PriorityBlockingQueue<Integer>(5, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});
PriorityBlockingQueue的构造方法源码分析
/**
* 无参构造,直接调用俩个参数的构造
*/
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 一个参数的构造,直接调用俩个参数的构造
* initialCapacity:数组的大小
*/
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
/**
* 俩个参数的构造
* initialCapacity:数组的大小
* comparator:排序规则
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
// 数组长度小于0,抛异常
if (initialCapacity < 1)
throw new IllegalArgumentException();
// 初始化锁
this.lock = new ReentrantLock();
// 初始化无数据等待的条件队列
this.notEmpty = lock.newCondition();
// 设置排序规则
this.comparator = comparator;
// 初始化数组
this.queue = new Object[initialCapacity];
}
/**
* 需要初始化数组的构造方法
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
// 初始化锁
this.lock = new ReentrantLock();
// 初始化无数据等待的条件队列
this.notEmpty = lock.newCondition();
// 一个标记:如果不知道堆的顺序,则为true
boolean heapify = true; // true if not known to be in heap order
// 一个标记:如果必须筛选空值,则为true
boolean screen = true; // true if must screen for nulls
// 判断集合是否实现了SortedSet(一个可自动为元素排序的集合)
if (c instanceof SortedSet<?>) {
// 数据强转
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
// 设置排序规则
this.comparator = (Comparator<? super E>) ss.comparator();
// 标记已经知道如何排序了
heapify = false;
}
// 传入的集合是当前这种类型
else if (c instanceof PriorityBlockingQueue<?>) {
// 强转!
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
// 设置排序规则
this.comparator = (Comparator<? super E>) pq.comparator();
// 不筛选空的值:由于PriorityBlockingQueue本身不容许空的值,所以传入的集合中肯定没有空值
screen = false;
// 如果类型相同:说明元素已经是有序的
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
// 元素转数组
Object[] a = c.toArray();
// 得到数组的长度
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
// 如果集合c转化为数组不是Object数组,就将其转化为Object数组
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
// 数组中可能有空值,进行筛选。
// 数组只有一个元素的时候(不存在顺序),或者排序器存在的时候才会进行筛选
if (screen && (n == 1 || this.comparator != null)) {
// 原理数组中的每个元素
for (int i = 0; i < n; ++i)
// 元素为null,抛异常!
if (a[i] == null)
throw new NullPointerException();
}
// 将数组赋值到PriorityBlockingQueue的实际数组中
this.queue = a;
// 赋值数组长度
this.size = n;
// 堆的顺序可能不对的时候,进行数组建堆
if (heapify)
// 数组建堆:基于堆的排序算法
heapify();
}
PriorityBlockingQueue的入队方法源码分析
/**
* PriorityBlockingQueue的入队方法
*/
public void put(E e) {
// 不需要阻塞,直接调用offer方法
offer(e); // never need to block
}
/**
* 有返回值的入队方法
*/
public boolean offer(E e) {
// 入队元素为null,抛异常
if (e == null)
throw new NullPointerException();
// 得到当前队列的锁
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
// 定义俩个变量:n是size的长度 cap是数组的长度
int n, cap;
// 定义临时一个数组
Object[] array;
// size的长度大于数组的长度
while ((n = size) >= (cap = (array = queue).length))
// 尝试扩容
tryGrow(array, cap);
try {
// 得到排序器
Comparator<? super E> cmp = comparator;
// 将当前节点放在应该放的二叉堆上的位置。这里进行了排序!
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 长度加一
size = n + 1;
// 消费者线程条件队列转同步队列
notEmpty.signal();
} finally {
// 唤醒真正的消费者线程
lock.unlock();
}
// 返回入队成功
return true;
}
/**
* 扩容方法
*/
private void tryGrow(Object[] array, int oldCap) {
// 释放锁:防止阻塞出队的操作
lock.unlock(); // must release and then re-acquire main lock
// 定义新的数组
Object[] newArray = null;
// 通过CAS的方式去获取锁,执行下面的代码
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 新的长度:长度小于64每次+2,大于64每次变为1.5倍
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 扩容后超过最大容量
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
// 旧的长度加一
int minCap = oldCap + 1;
// 旧的长度加一就超过了最大的长度,抛异常!
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
// 直接设置为最大的长度
newCap = MAX_ARRAY_SIZE;
}
// 新的长度大于旧的长度并且没有其他线程改原有的长度
if (newCap > oldCap && queue == array)
// 新数组赋值
newArray = new Object[newCap];
} finally {
// 扩容完成,交给其他线程处理
allocationSpinLock = 0;
}
}
// 另一个线程正在分配,释放时间片
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 加锁
lock.lock();
// 新数组不为空(获取过扩容机制)并且数组没有改变
if (newArray != null && queue == array) {
// 新数组直接执行
queue = newArray;
// 复制数据
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
PriorityBlockingQueue的出队方法源码分析
/**
* PriorityBlockingQueue的出队方法
*/
public E take() throws InterruptedException {
// 获取当前队列的锁对象
final ReentrantLock lock = this.lock;
// 获取锁操作:优先响应中断
lock.lockInterruptibly();
// 定义结果变量
E result;
try {
// 出队的数据为null,进行等待
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 释放锁,唤醒下一个线程
lock.unlock();
}
// 返回出队的元素
return result;
}
/**
* 出队
*/
private E dequeue() {
// 长度减一
int n = size - 1;
// 长度小于0,直接返回null,说明没有元素了
if (n < 0)
return null;
else {
// 定义新的数组
Object[] array = queue;
// 得到第一个元素(二叉堆的最顶级节点)
E result = (E) array[0];
// 得到最后一个节点
E x = (E) array[n];
// 最后一个节点变为null
array[n] = null;
// 得到排序器
Comparator<? super E> cmp = comparator;
// 将最后一个节点放在应该放的二叉堆上的位置。线放到头结点,然后进行排序
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
// 长度变化
size = n;
// 返回出队的具体元素
return result;
}
}
结束语
- 获取更多本文的前置知识文章,以及新的有价值的文章,让我们一起成为架构师!
- 关注公众号,可以让你对MySQL有非常深入的了解
- 关注公众号,每天持续高效的了解并发编程!
- 关注公众号,后续持续高效的了解spring源码!
- 这个公众号,无广告!!!每日更新!!!
这篇关于【并发编程】支持按优先级排序的无界阻塞队列PriorityBlockingQueue的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!