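In Apache IoTDB, the last step of a query joins the result columns on the timestamp column, and this step is implemented with a priority queue that deduplicates automatically. Earlier versions used Java's built-in `TreeSet`, but after running into the problems described below we implemented our own efficient deduplicating priority queue.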
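The Java standard library ships `PriorityQueue` as its default priority queue, and implements it with generics so the code can be reused. Unlike in some other languages, however, Java generics work only with reference types, not with primitive types. For `long`, a `PriorityQueue<Long>` has to box each value into a `Long` object before `add`, and `poll` needs the matching unboxing. Boxing and unboxing waste both CPU time and memory: a `long` takes 8 bytes, whereas a `Long` object also carries an object header, which brings it to 16 bytes on a typical 64-bit HotSpot JVM, on top of the reference the queue keeps to it.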
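To make the boxing cost concrete, here is a minimal sketch that sums the same values once through a `PriorityQueue<Long>` and once over a plain `long[]`. The class and method names (`BoxingDemo`, `sumViaBoxedQueue`, `sumPrimitive`) are made up for this illustration and are not part of IoTDB.

```java
import java.util.PriorityQueue;

// Hypothetical demo class, not part of IoTDB.
class BoxingDemo {

  // Every add() boxes the long into a Long (the compiler inserts Long.valueOf),
  // and every poll() unboxes it again (the compiler inserts longValue()).
  static long sumViaBoxedQueue(long[] values) {
    PriorityQueue<Long> queue = new PriorityQueue<>();
    for (long v : values) {
      queue.add(v); // autoboxing happens here
    }
    long sum = 0;
    while (!queue.isEmpty()) {
      sum += queue.poll(); // auto-unboxing happens here
    }
    return sum;
  }

  // The primitive version touches no wrapper objects at all.
  static long sumPrimitive(long[] values) {
    long sum = 0;
    for (long v : values) {
      sum += v;
    }
    return sum;
  }

  public static void main(String[] args) {
    long[] values = {3L, 1L, 2L};
    System.out.println(sumViaBoxedQueue(values));
    System.out.println(sumPrimitive(values));
  }
}
```

Both methods return the same sum; the difference is only in how many short-lived `Long` objects are created along the way.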
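When we also need deduplication on top of the priority queue, that is, when the heap must not contain duplicate elements, the only option in the Java standard library is `TreeSet`. `TreeSet`, however, is backed by a red-black tree, and the complex rotations it performs to keep the tree balanced and the data totally ordered are more than we actually need. Like `PriorityQueue`, `TreeSet` is generic, so it carries the same boxing and unboxing overhead.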
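The difference between the two standard collections can be seen in a short sketch: `PriorityQueue` keeps duplicates, while `TreeSet` silently drops them. The class name `DedupDemo` and its helper methods are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

// Hypothetical demo class, not part of IoTDB.
class DedupDemo {

  // Drains a PriorityQueue<Long> in ascending order; duplicates survive.
  static List<Long> viaPriorityQueue(long[] values) {
    PriorityQueue<Long> queue = new PriorityQueue<>();
    for (long v : values) {
      queue.add(v);
    }
    List<Long> out = new ArrayList<>();
    while (!queue.isEmpty()) {
      out.add(queue.poll());
    }
    return out;
  }

  // Drains a TreeSet<Long> in ascending order; add() ignores values already present.
  static List<Long> viaTreeSet(long[] values) {
    TreeSet<Long> set = new TreeSet<>();
    for (long v : values) {
      set.add(v);
    }
    List<Long> out = new ArrayList<>();
    while (!set.isEmpty()) {
      out.add(set.pollFirst());
    }
    return out;
  }

  public static void main(String[] args) {
    long[] values = {5L, 3L, 5L, 1L};
    System.out.println(viaPriorityQueue(values)); // [1, 3, 5, 5]
    System.out.println(viaTreeSet(values)); // [1, 3, 5]
  }
}
```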
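Apache IoTDB implements a utility class, `TimeSelector`, in the package under `server/src/main/java/org/apache/iotdb/db/utils/datastructure`. It is simply a binary heap stored in a `long[]`, because timestamps are always of type `long`. Depending on the use case, the `long[]` can be replaced with an array of another primitive type.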
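A deduplication check is added to the heap operations. In the listing, `percolateUp` compares the element being inserted with each parent on its way up; if the two are equal, it returns immediately and the insertion is abandoned. Duplicates that end up on different branches of the heap are not caught by this check, so `pollFirst` and `isEmpty` additionally skip any root value equal to the last one returned (`lastTime`). Together these give the deduplicating behavior.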
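The idea can be sketched in a stripped-down form. The class below is a hypothetical, ascending-only simplification written for this explanation, not the IoTDB class: it checks for an equal ancestor in a separate pass before inserting (rather than swapping recursively and undoing), and it uses a `hasLast` flag, which is an assumption of this sketch, to remember whether anything has been polled yet.

```java
import java.util.Arrays;

// Hypothetical simplification for illustration; not the IoTDB TimeSelector.
class LongDedupHeap {
  private long[] heap = new long[8];
  private int size;
  private long last; // last value returned by poll()
  private boolean hasLast; // false until the first poll()

  // Returns false, without modifying the heap, if an equal ancestor is found.
  boolean add(long x) {
    // Pass 1: walk towards the root looking for an equal ancestor.
    for (int i = size; i > 0; ) {
      int p = (i - 1) >>> 1;
      if (heap[p] == x) {
        return false;
      }
      if (heap[p] < x) {
        break; // every ancestor above p is smaller still
      }
      i = p;
    }
    if (size == heap.length) {
      heap = Arrays.copyOf(heap, size << 1);
    }
    // Pass 2: ordinary sift-up on the primitive array.
    int i = size++;
    while (i > 0) {
      int p = (i - 1) >>> 1;
      if (heap[p] <= x) {
        break;
      }
      heap[i] = heap[p];
      i = p;
    }
    heap[i] = x;
    return true;
  }

  // Discards roots equal to the last polled value, then reports emptiness.
  boolean isEmpty() {
    while (size > 0 && hasLast && heap[0] == last) {
      removeRoot();
    }
    return size == 0;
  }

  // Call only after isEmpty() has returned false.
  long poll() {
    long v = heap[0];
    removeRoot();
    last = v;
    hasLast = true;
    return v;
  }

  // Moves the last element to the root and sifts it down.
  private void removeRoot() {
    long x = heap[--size];
    int i = 0;
    while (true) {
      int c = 2 * i + 1;
      if (c >= size) {
        break;
      }
      if (c + 1 < size && heap[c + 1] < heap[c]) {
        c++;
      }
      if (heap[c] >= x) {
        break;
      }
      heap[i] = heap[c];
      i = c;
    }
    heap[i] = x;
  }
}
```

Adding 5, 3, 5, 3, 8, 1, 1 and then draining with `isEmpty()`/`poll()` yields 1, 3, 5, 8: the second 3 and the second 1 are rejected on insertion, while the second 5 sits on a different branch and is only skipped at poll time.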
Talk is cheap, show me the code:

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    package org.apache.iotdb.db.utils.datastructure;

    import java.util.Arrays;

    public class TimeSelector {

      private static final int MIN_DEFAULT_CAPACITY = 8;

      private final boolean ascending;

      private long[] timeHeap;
      private int heapSize;
      private long lastTime;

      public TimeSelector(int defaultCapacity, boolean isAscending) {
        this.ascending = isAscending;
        timeHeap = new long[Math.max(defaultCapacity, MIN_DEFAULT_CAPACITY)];
        heapSize = 0;
        lastTime = Long.MIN_VALUE;
      }

      // Drops root values equal to the last polled time, then reports whether anything is left.
      public boolean isEmpty() {
        while (heapSize != 0 && timeHeap[0] == lastTime) {
          timeHeap[0] = timeHeap[heapSize - 1];
          percolateDown(0, timeHeap[0]);
          --heapSize;
        }
        return heapSize == 0;
      }

      // Inserts the time unless an equal value is met on the way up to the root.
      public void add(long time) {
        if (heapSize == 0) {
          timeHeap[0] = time;
        }
        if (percolateUp(heapSize, time)) {
          ++heapSize;
          checkExpansion();
        }
      }

      // Pops the root until it differs from the last returned time, then returns it.
      public long pollFirst() {
        long minTime = lastTime;

        while (minTime == lastTime) {
          minTime = timeHeap[0];

          timeHeap[0] = timeHeap[heapSize - 1];
          percolateDown(0, timeHeap[0]);
          --heapSize;
        }

        lastTime = minTime;
        return minTime;
      }

      // Doubles the backing array once it is full.
      private void checkExpansion() {
        if (heapSize == timeHeap.length) {
          timeHeap = Arrays.copyOf(timeHeap, timeHeap.length << 1);
        }
      }

      // Returns false if an equal element is found on the path to the root;
      // any swaps made on the way up are undone in that case.
      private boolean percolateUp(int index, long element) {
        if (index == 0) {
          return true;
        }

        int parentIndex = (index - 1) >>> 1;
        long parent = timeHeap[parentIndex];

        if (parent == element) {
          return false;
        } else if (ascending ? element < parent : parent < element) {
          timeHeap[index] = parent;
          timeHeap[parentIndex] = element;
          boolean isSuccessful = percolateUp(parentIndex, element);
          if (!isSuccessful) {
            timeHeap[index] = element;
            timeHeap[parentIndex] = parent;
          }
          return isSuccessful;
        } else { // ascending ? parent < element : element < parent
          timeHeap[index] = element;
          return true;
        }
      }

      private void percolateDown(int index, long element) {
        if (index == heapSize - 1) {
          return;
        }

        int childIndex = getSmallerChildIndex(index);

        if (childIndex != -1) {
          long child = timeHeap[childIndex];
          if (ascending ? child < element : element < child) {
            timeHeap[childIndex] = element;
            timeHeap[index] = child;
            percolateDown(childIndex, element);
          }
        }
      }

      /**
       * Calculates the children indexes for a given index and checks to see which one is smaller and
       * returns the index.
       *
       * @param index the given index
       * @return index of a smaller child or -1 if no children
       */
      private int getSmallerChildIndex(int index) {
        final int leftChildIndex = (index << 1) + 1;
        final int rightChildIndex = (index << 1) + 2;

        int smallerChildIndex;
        if (heapSize <= leftChildIndex) {
          smallerChildIndex = -1;
        } else if (heapSize <= rightChildIndex) {
          smallerChildIndex = leftChildIndex;
        } else {
          if (ascending) {
            smallerChildIndex =
                timeHeap[leftChildIndex] < timeHeap[rightChildIndex]
                    ? leftChildIndex
                    : rightChildIndex;
          } else {
            smallerChildIndex =
                timeHeap[leftChildIndex] < timeHeap[rightChildIndex]
                    ? rightChildIndex
                    : leftChildIndex;
          }
        }
        return smallerChildIndex;
      }

      @Override
      public String toString() {
        return Arrays.toString(this.timeHeap);
      }
    }

We compare the performance of `PriorityQueue<Long>`, `TreeSet<Long>`, and `TimeSelector` by inserting 10,000,000 random `long` values and then polling them all out and summing them (see https://github.com/JackieTien97/TimeSelector):

    package org.apache.iotdb.experiment;

    import org.apache.iotdb.db.utils.datastructure.TimeSelector;

    import java.util.PriorityQueue;
    import java.util.Random;
    import java.util.TreeSet;

    public class Main {

      private static final int TOTAL_COUNT = 10_000_000;
      private static final long SEED = 123456;

      public static void main(String[] args) {
        // PriorityQueue version
        long startTime = System.nanoTime();
        Random random = new Random(SEED);
        PriorityQueue<Long> priorityQueue = new PriorityQueue<>(TOTAL_COUNT);
        for (int i = 0; i < TOTAL_COUNT; i++) {
          priorityQueue.add(random.nextLong());
        }
        long sum = 0;
        while (!priorityQueue.isEmpty()) {
          sum += priorityQueue.poll();
        }
        System.out.println("Sum is: " + sum);
        System.out.println("PriorityQueue cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");

        // TreeSet version
        startTime = System.nanoTime();
        random = new Random(SEED);
        TreeSet<Long> treeSet = new TreeSet<>();
        for (int i = 0; i < TOTAL_COUNT; i++) {
          treeSet.add(random.nextLong());
        }
        sum = 0;
        while (!treeSet.isEmpty()) {
          sum += treeSet.pollFirst();
        }
        System.out.println("Sum is: " + sum);
        System.out.println("TreeSet cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");

        // IoTDB TimeSelector version
        startTime = System.nanoTime();
        random = new Random(SEED);
        TimeSelector selector = new TimeSelector(TOTAL_COUNT, true);
        for (int i = 0; i < TOTAL_COUNT; i++) {
          selector.add(random.nextLong());
        }
        sum = 0;
        while (!selector.isEmpty()) {
          sum += selector.pollFirst();
        }
        System.out.println("Sum is: " + sum);
        System.out.println("TimeSelector cost " + (System.nanoTime() - startTime) / 1_000_000 + "ms.");
      }
    }

| | PriorityQueue | TreeSet | TimeSelector |
|---|---|---|---|
| Time (ms) | 9,727 | 11,105 | 7,350 |
As the table shows, `PriorityQueue<Long>` is 32% slower than `TimeSelector`, and `TreeSet<Long>` is 51% slower than `TimeSelector`.
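As a quick check, these percentages can be recomputed from the timings in the table by dividing each time by the `TimeSelector` time:

$$
\frac{9727}{7350} \approx 1.323, \qquad \frac{11105}{7350} \approx 1.511
$$

That is roughly 32% and 51% more time than `TimeSelector`, respectively.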