Skip to content

Commit

Permalink
Massage away 2-way PIFOs (#2234)
Browse files Browse the repository at this point in the history
This PR makes progress towards #2226 by updating variable names and
comments to account for the fact that round robin PIFOs can make n-way
trees. Additionally, it implements a PIFO tree and a more complicated
tree (`complicated_tree.py`) using round robin and strict queues. This
accounts for the first three checkboxes of #2226. The docs have been
updated to reflect the new PIFO style in #2223. To run the relevant
tests,
```
runt -i "queues"
```

(For sdn.py, I removed the stats component stuff because I thought it
was mentioned that this wasn't important and so the new style PIFOs are
not equipped to do stats. If it needs to be there, let me know.)

---------

Co-authored-by: Anshuman Mohan <[email protected]>
  • Loading branch information
csziklai and anshumanmohan authored Aug 9, 2024
1 parent e283961 commit d7b609d
Show file tree
Hide file tree
Showing 13 changed files with 240,260 additions and 67 deletions.
33 changes: 33 additions & 0 deletions calyx-py/calyx/complex_tree_oracle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# For usage, see gen_queue_data_expect.sh

import sys
import calyx.queues as queues
from calyx import queue_util


if __name__ == "__main__":
max_cmds, len = int(sys.argv[1]), int(sys.argv[2])
keepgoing = "--keepgoing" in sys.argv
commands, values, _, _ = queue_util.parse_json()

# Our complex PIFO is a tree of queues. It has the shape
# rr(strict(A, B, C), rr(D, E, F), strict(G, H)).

subqueues3 = [queues.Fifo(len) for _ in range(3)]
# a second subqueue copy is required, we cannot pass the same subqueue across more than one function call
subqueues3s = [queues.Fifo(len) for _ in range(3)]
subqueues2 = [queues.Fifo(len) for _ in range(2)]

pifo = queues.RRQueue(
3,
[133, 266, 400],
[
queues.StrictPifo(3, [44, 88, 133], [0, 1, 2], subqueues3s, len),
queues.RRQueue(3, [177, 221, 266], subqueues3, len),
queues.StrictPifo(2, [333, 400], [0, 1], subqueues2, len),
],
len,
)

ans = queues.operate_queue(pifo, max_cmds, commands, values, keepgoing=keepgoing)
queue_util.dump_json(commands, values, ans)
2 changes: 1 addition & 1 deletion calyx-py/calyx/gen_queue_data_expect.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ cat ../test/correctness/queues/sdn.data | python3 pifo_tree_oracle.py $num_cmds
# - pifo_oracle.py
# - pifo_tree_oracle.py

for queue_kind in fifo pifo pifo_tree; do
for queue_kind in fifo pifo pifo_tree complex_tree; do
python3 queue_data_gen.py $num_cmds > ../test/correctness/queues/$queue_kind.data
[[ "$queue_kind" != "pifo_tree" ]] && cp ../test/correctness/queues/$queue_kind.data ../test/correctness/queues/binheap/$queue_kind.data
cat ../test/correctness/queues/$queue_kind.data | python3 ${queue_kind}_oracle.py $num_cmds $queue_size --keepgoing > ../test/correctness/queues/$queue_kind.expect
Expand Down
45 changes: 36 additions & 9 deletions calyx-py/calyx/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,8 @@ class RRQueue:
client can divide the incoming traffic into `n` flows.
For example, if n = 3 and the client passes boundaries [133, 266, 400],
packets will be divided into three flows: [0, 133], [134, 266], [267, 400].
The argument `subqueues` is a list of subqueues, which can be any type of
queue from this module.
- At push, we check the `boundaries` list to determine which flow to push to.
Take the boundaries example given earlier, [133, 266, 400].
Expand All @@ -507,14 +509,18 @@ class RRQueue:
- Pop first tries to pop from `hot`. If this succeeds, great. If it fails,
it increments `hot` and therefore continues to check all other flows
in round robin fashion.
- Peek allows the client to see which element is at the head of the queue
without removing it. Thus, peek works in a similar fashion to `pop`, except
`hot` is restored to its original value at the every end.
Further, nothing is actually dequeued.
"""

def __init__(self, n, boundaries, max_len: int):
def __init__(self, n, boundaries, subqueues, max_len: int):
self.hot = 0
self.n = n
self.pifo_len = 0
self.boundaries = boundaries
self.data = [Fifo(max_len) for _ in range(n)]
self.data = subqueues

self.max_len = max_len
assert (
Expand All @@ -525,9 +531,9 @@ def push(self, val: int, *_):
"""Pushes `val` to the PIFO."""
if self.pifo_len == self.max_len:
raise QueueError("Cannot push to full PIFO.")
for fifo, boundary in zip(self.data, self.boundaries):
for subqueue, boundary in zip(self.data, self.boundaries):
if val <= boundary:
fifo.push(val)
subqueue.push(val)
self.pifo_len += 1
break

Expand All @@ -536,8 +542,8 @@ def increment_hot(self):
self.hot = 0 if self.hot == (self.n - 1) else self.hot + 1

def pop(self, *_) -> Optional[int]:
"""Pops the PIFO by popping some internal FIFO.
Updates `hot` to be one more than the index of the internal FIFO that
"""Pops the PIFO by popping some internal subqueue.
Updates `hot` to be one more than the index of the internal subqueue that
we did pop.
"""
if self.pifo_len == 0:
Expand Down Expand Up @@ -578,23 +584,27 @@ class StrictPifo:
of priority of the flows. For example, if n = 3 and the client passes order
[1, 2, 0], flow 1 (packets in range [134, 266]) is first priority, flow 2
(packets in range [267, 400]) is second priority, and flow 0 (packets in range
[0, 133]) is last priority.
[0, 133]) is last priority. The argument `subqueues` is a list of subqueues,
which can be any type of queue from this module.
- At push, we check the `boundaries` list to determine which flow to push to.
Take the boundaries example given earlier, [133, 266, 400].
If we push the value 89, it will end up in flow 0 becuase 89 <= 133,
and 305 would end up in flow 2 since 266 <= 305 <= 400.
- Pop first tries to pop from `order[0]`. If this succeeds, great. If it fails,
it tries `order[1]`, etc.
- Peek allows the client to see which element is at the head of the queue
without removing it. Thus, peek works in a similar fashion to `pop`. Further,
nothing is actually dequeued.
"""

def __init__(self, n, boundaries, order, max_len: int):
def __init__(self, n, boundaries, order, subqueues, max_len: int):
self.order = order
self.priority = 0
self.n = n
self.pifo_len = 0
self.boundaries = boundaries
self.data = [Fifo(max_len) for _ in range(n)]
self.data = subqueues

self.max_len = max_len

Expand Down Expand Up @@ -632,6 +642,23 @@ def pop(self, *_):
except QueueError:
self.next_priority()

def peek(self, *_) -> Optional[int]:
"""Peeks into the PIFO."""
if self.pifo_len == 0:
raise QueueError("Cannot peek into empty PIFO.")

original_priority = self.priority
while True:
try:
val = self.data[self.priority].peek()
if val is not None:
self.priority = original_priority
return val
else:
self.next_priority()
except QueueError:
self.next_priority()

def __len__(self) -> int:
return self.pifo_len

Expand Down
8 changes: 5 additions & 3 deletions calyx-py/calyx/rr_queue_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
boundaries = [50, 100, 150, 200, 250, 300, 400]
else:
raise ValueError("Unsupported number of flows")

subqueues = [queues.Fifo(len) for _ in range(numflows)]

# Our Round Robin Queue orchestrates n FIFOs, in this case provided as
# a command line argument. It orchestrates the FIFOs in a round-robin fashion.
pifo = queues.RRQueue(numflows, boundaries, len)
# Our Round Robin Queue orchestrates n subqueues, in this case provided as
# a command line argument. It orchestrates the subqueues in a round-robin fashion.
pifo = queues.RRQueue(numflows, boundaries, subqueues, len)

ans = queues.operate_queue(pifo, num_cmds, commands, values, keepgoing=keepgoing)

Expand Down
10 changes: 6 additions & 4 deletions calyx-py/calyx/strict_queue_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
else:
raise ValueError("Unsupported number of flows")

# Our Strict queue orchestrates n FIFOs. It takes in a list of
# boundaries of length n, as well as a list `order` which specifies the ranked
subqueues = [queues.Fifo(len) for _ in range(numflows)]

# Our Strict queue orchestrates n subqueues. It takes in a list of
# boundaries of length n, as well as a list `order` which specifies the ranked
# order of the flows.
pifo = queues.StrictPifo(numflows, boundaries, order, len)
pifo = queues.StrictPifo(numflows, boundaries, order, subqueues, len)

ans = queues.operate_queue(pifo, num_cmds, commands, values, keepgoing=keepgoing)

queue_util.dump_json(commands, values, ans)
queue_util.dump_json(commands, values, ans)
Loading

0 comments on commit d7b609d

Please sign in to comment.