Skip to content

Commit

Permalink
Updated the FlowEventQueue to be easier to understand (#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanteNiewenhuis authored Jan 31, 2025
1 parent 7066dbc commit f471d06
Showing 1 changed file with 139 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

/**
* A specialized priority queue for future event of {@link FlowNode}s sorted on time.
* The queue is based on a min heap binary tree (https://www.digitalocean.com/community/tutorials/min-heap-binary-tree)
* The nodes keep a timerIndex which indicates their placement in the tree.
* The timerIndex is -1 when a node is not in the tree.
*
* <p>
* By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
* being generic.
Expand All @@ -52,20 +56,24 @@ public FlowEventQueue(int initialCapacity) {
}

/**
* Enqueue a timer for the specified context or update the existing timer.
* Enqueue a timer for the specified node or update the existing timer.
*
* When Long.MAX_VALUE is given as a deadline, the node is removed from the queue
* @param node node to queue
*/
public void enqueue(FlowNode node) {
FlowNode[] es = queue;
int k = node.getTimerIndex();
// The timerIndex indicates whether a node is already in the queue
int timerIndex = node.getTimerIndex();

if (node.getDeadline() != Long.MAX_VALUE) {
if (k >= 0) {
update(es, node, k);
if (timerIndex >= 0) {
update(this.queue, node, timerIndex);
} else {
add(es, node);
add(this.queue, node);
}
} else if (k >= 0) {
delete(es, k);
} else if (timerIndex >= 0) {
delete(this.queue, timerIndex);
node.setTimerIndex(-1);
}
}

Expand All @@ -80,22 +88,23 @@ public FlowNode poll(long now) {
return null;
}

final FlowNode[] es = queue;
final FlowNode head = es[0];
final FlowNode head = this.queue[0];

if (now < head.getDeadline()) {
return null;
}

int n = size - 1;
this.size = n;
final FlowNode next = es[n];
es[n] = null; // Clear the last element of the queue
// Move the last element of the queue to the front
this.size--;
final FlowNode next = this.queue[this.size];
this.queue[this.size] = null; // Clear the last element of the queue

if (n > 0) {
siftDown(0, next, es, n);
// Sift down the new head.
if (this.size > 0) {
siftDown(0, next, this.queue, this.size);
}

// Set the index of the head to -1 indicating it is not scheduled anymore
head.setTimerIndex(-1);
return head;
}
Expand All @@ -115,46 +124,58 @@ public long peekDeadline() {
* Add a new entry to the queue.
*/
private void add(FlowNode[] es, FlowNode node) {
if (this.size >= es.length) {
if (this.size >= this.queue.length) {
// Re-fetch the resized array
es = grow();
this.grow();
}

siftUp(this.size, node, es);
siftUp(this.size, node, this.queue);

this.size++;
}

/**
* Update the deadline of an existing entry in the queue.
*/
private void update(FlowNode[] es, FlowNode node, int k) {
if (k > 0) {
int parent = (k - 1) >>> 1;
if (es[parent].getDeadline() > node.getDeadline()) {
siftUp(k, node, es);
private void update(FlowNode[] eventList, FlowNode node, int timerIndex) {
if (timerIndex > 0) {
int parentIndex = (timerIndex - 1) >>> 1;
if (eventList[parentIndex].getDeadline() > node.getDeadline()) {
siftUp(timerIndex, node, eventList);
return;
}
}

siftDown(k, node, es, this.size);
siftDown(timerIndex, node, eventList, this.size);
}

/**
* Deadline an entry from the queue.
* The move a node from the queue
*
* @param eventList all scheduled events
* @param timerIndex the index of the node to remove
*/
private void delete(FlowNode[] es, int k) {
int s = --this.size;
if (s == k) {
es[k] = null; // Element is last in the queue
} else {
FlowNode moved = es[s];
es[s] = null;

siftDown(k, moved, es, s);

if (es[k] == moved) {
siftUp(k, moved, es);
private void delete(FlowNode[] eventList, int timerIndex) {
this.size--;

// If the element is the last element, simply remove it
if (timerIndex == this.size) {
eventList[timerIndex] = null;
}

// Else, swap the node to remove with the last node and sift it up or down to get the moved node in the correct
// position.
else {

// swap the node with the last element
FlowNode moved = eventList[this.size];
eventList[this.size] = null;

siftDown(timerIndex, moved, eventList, this.size);

// SiftUp, if siftDown did not move the node
if (eventList[timerIndex] == moved) {
siftUp(timerIndex, moved, eventList);
}
}
}
Expand All @@ -172,35 +193,92 @@ private FlowNode[] grow() {
return queue;
}

private static void siftUp(int k, FlowNode key, FlowNode[] es) {
while (k > 0) {
int parent = (k - 1) >>> 1;
FlowNode e = es[parent];
if (key.getDeadline() >= e.getDeadline()) break;
es[k] = e;
e.setTimerIndex(k);
k = parent;
/**
* Iteratively swap the node at the given timerIndex with its parents until the node is at the correct
* position in the binary tree (ie, both children of the node are bigger than the node itself).
*
* @param timerIndex the index of the node that needs to be sifted up
* @param node the node that needs to be sifted up
* @param eventList the list of nodes that are scheduled
*/
private static void siftUp(int timerIndex, FlowNode node, FlowNode[] eventList) {
while (timerIndex > 0) {
// Find parent Node
int parentIndex = (timerIndex - 1) >>> 1;
FlowNode parentNode = eventList[parentIndex];

// Break if the deadline of the node is bigger than the parent node
if (node.getDeadline() >= parentNode.getDeadline()) break;

// Otherwise, swap node with the parentNode
eventList[timerIndex] = parentNode;
parentNode.setTimerIndex(timerIndex);
timerIndex = parentIndex;
}
es[k] = key;
key.setTimerIndex(k);

eventList[timerIndex] = node;
node.setTimerIndex(timerIndex);
}

private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) {
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
FlowNode c = es[child];
int right = child + 1;
if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right];
/**
* Iteratively swap the node at the given timerIndex with its smallest child until the node is at the correct
* position in the binary tree (ie, both children of the node are bigger than the node itself).
*
* @param timerIndex the index of the node that needs to be sifted down
* @param node the node that needs to be sifted down
* @param eventList the list of nodes that are scheduled
* @param queueSize the current size of the queue
*/
private static void siftDown(int timerIndex, FlowNode node, FlowNode[] eventList, int queueSize) {
int half = queueSize >>> 1; // loop while a non-leaf
while (timerIndex < half) {

// Get the index of the smallest child
int smallestChildIndex = getSmallestChildIndex(timerIndex, eventList, queueSize);

if (key.getDeadline() <= c.getDeadline()) break;
// Get the smallest child
FlowNode smallestChildNode = eventList[smallestChildIndex];

es[k] = c;
c.setTimerIndex(k);
k = child;
// If the node is smaller than the smallest child, break
if (node.getDeadline() <= smallestChildNode.getDeadline()) break;

// Otherwise, swap the node with its smallest child
eventList[timerIndex] = smallestChildNode;
smallestChildNode.setTimerIndex(timerIndex);
timerIndex = smallestChildIndex;
}

es[k] = key;
key.setTimerIndex(k);
eventList[timerIndex] = node;
node.setTimerIndex(timerIndex);
}

/**
* Return the index of the child with the smallest deadline time of the node at the given timerIndex
*
* @param timerIndex the index of the parent node
* @param eventList the list of all scheduled events
* @param queueSize the current size of the queue
* @return the timerIndex of the smallest child
*/
private static int getSmallestChildIndex(int timerIndex, FlowNode[] eventList, int queueSize) {
// Calculate the index of the left child
int leftChildIndex = (timerIndex << 1) + 1;

// If the left child is at the end of the queue, there is no right child.
// Thus, the left child is always the smallest
if (leftChildIndex + 1 >= queueSize) return leftChildIndex;

FlowNode leftChildNode = eventList[leftChildIndex];

// Get right child
int rightChildIndex = leftChildIndex + 1;
FlowNode rightChildNode = eventList[rightChildIndex];

// If the rightChild is smaller, return its index
// otherwise, return the index of the left child
if (rightChildNode.getDeadline() < leftChildNode.getDeadline()) {
return rightChildIndex;
}
return leftChildIndex;
}
}

0 comments on commit f471d06

Please sign in to comment.