Compare commits

...

10 Commits

Author SHA1 Message Date
Sam Fredrickson a20c8908de Unlock() before panic() when already closed. 2023-07-10 10:55:25 -07:00
Sam Fredrickson 9d1228988d Package queue misnamed in docstring. 2023-03-03 22:38:23 -08:00
Sam Fredrickson 0840eb9272 Add test for invalid priority levels to npq. 2023-03-03 22:34:58 -08:00
Sam Fredrickson 9f8d0760f1 Add npq package. 2023-03-03 22:20:11 -08:00
Sam Fredrickson f7474fb673 Rename package circ to queue. 2023-03-03 20:09:51 -08:00
Sam Fredrickson b00fe25128 Various improvements.
* Expand README.md, provide benchmark results.
* Add docs, benchmarks for binheap and circ packages.
* Add methods Len() and Capacity().
* Change *sync.Cond to sync.Cond.
* TryRecv() and TrySend() distinguish empty and closed errors.
* Improve test coverage.
* Add basic Makefile.
* Fix documentation mistakes.
2023-03-03 15:35:49 -08:00
Sam Fredrickson b3b491d9a9 Move priority queue to `pq` package; improve docs. 2023-03-02 01:53:12 -08:00
Sam Fredrickson 5e23a92314 Add high contention benchmark. 2023-03-01 19:31:36 -08:00
Sam Fredrickson ab364c31bb Implement a true priority queue.
* Add a binary max-heap implementation, `binheap`.
* Rename `precise` package to `mq`.
2023-03-01 19:29:15 -08:00
Sam Fredrickson 0759aaa2cd We've got a prioritized message queue, not a priority queue. 2023-03-01 13:39:19 -08:00
19 changed files with 1732 additions and 322 deletions

2
.gitignore vendored Normal file

@ -0,0 +1,2 @@
benchresults.txt
coverage

11
Makefile Normal file

@ -0,0 +1,11 @@
.PHONY: test
test:
go test -count=1 -coverprofile=coverage ./...
.PHONY: bench
bench:
go test -bench=. ./...
.PHONY: longbench
longbench:
go test -count=30 -bench=. ./... | tee benchresults.txt

README.md

@ -1,12 +1,69 @@
# priorityq - generic priority queue in Go
# priorityq - generic prioritized queues in Go
In Go, the builtin buffered channels provide a concurrent FIFO queue for
passing messages between goroutines. Sometimes, however, it's convenient to be
able to assign priority levels to messages, so that they get delivered to
consumers more promptly.
The `mq` package in this module implements a concurrent, dual-priority message
queue that guarantees receipt of high-priority items before low-priority
ones. There is a pattern using two channels and select statements to achieve
similar functionality, but it's not exactly equivalent. (See the
[Background](#background) section for more details.)
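For example, a minimal sketch (mirroring the example in the `mq` package docs):

```go
package main

import (
	"fmt"

	"gogs.humancabbage.net/sam/priorityq/mq"
)

func main() {
	q := mq.Make[string](8)
	q.SendLow("world")
	q.SendHigh("hello")
	word1, _ := q.Recv() // "hello" -- the high-priority item arrives first
	word2, _ := q.Recv() // "world"
	fmt.Println(word1, word2)
	q.Close()
}
```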
Additionally, the `pq` package implements a concurrent, traditional priority
queue, using a binary max-heap. This is more general than `mq`, because it
allows multiple levels of priority. This, of course, also makes operations
slower.
Lastly, the `npq` package implements a concurrent, n-priority message queue.
It's similar to `mq`, except that it supports an arbitrary fixed number of
priority levels. It can outperform `pq` for up to several hundred levels.
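A sketch of the `npq` API, using priority levels 1 (lowest) and 2 (highest):

```go
package main

import (
	"fmt"

	"gogs.humancabbage.net/sam/priorityq/npq"
)

func main() {
	// A queue of strings with priority levels ranging from 1 to 2.
	q := npq.Make[int, string](8, 1, 2)
	q.Send(1, "world")
	q.Send(2, "hello")
	word1, _ := q.Recv() // "hello" -- highest available priority first
	word2, _ := q.Recv() // "world"
	fmt.Println(word1, word2)
	q.Close()
}
```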
## Benchmarks
Here are some benchmark results from running on a Mac Studio/M1 Ultra.
pkg: gogs.humancabbage.net/sam/priorityq/mq
Send-20 13.93n ± 0%
SendChan-20 13.19n ± 0%
Recv-20 13.64n ± 1%
RecvChan-20 13.29n ± 1%
ConcSendRecv-20 97.60n ± 1%
ConcSendRecvChan-20 171.8n ± 5%
HighContention-20 128.2n ± 0%
HighContentionChan-20 47.27n ± 0%
pkg: gogs.humancabbage.net/sam/priorityq/npq
Send-20 13.56n ± 0%
Recv-20 13.51n ± 0%
ConcSendRecv-20 176.3n ± 8%
HighContention-20 121.0n ± 0%
pkg: gogs.humancabbage.net/sam/priorityq/pq
Send-20 18.79n ± 1%
Recv-20 268.1n ± 3%
ConcSendRecv-20 199.2n ± 2%
HighContention-20 440.0n ± 1%
pkg: gogs.humancabbage.net/sam/priorityq/binheap
Insert-20 11.92n ± 0%
Extract-20 261.6n ± 2%
RepeatedInsertExtract-20 25.68n ± 1%
pkg: gogs.humancabbage.net/sam/priorityq/circ
Push-20 2.196n ± 1%
Pop-20 2.187n ± 0%
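The `bench` and `longbench` targets in the Makefile run these benchmarks;
`longbench` repeats each one 30 times and tees the raw output to
`benchresults.txt`.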
## Background
This module was inspired by [a reddit post][reddit] wherein /u/zandery23 asked
how to implement a priority queue in Go. A fantastic solution was [provided by
/u/Ploobers][sol]. That's probably right for 99 out of 100 use cases, but it's
not completely precise.
how to implement a prioritized message queue in Go. A fantastic solution was
[provided by /u/Ploobers][sol]. That's probably right for 99 out of 100 use
cases, but it's not completely precise.
Particularly, the second select block does not guarantee that an item from the
priority queue will be taken if there is also an item in the regular queue.
prioritized queue will be taken if there is also an item in the regular queue.
```go
select {
@ -26,11 +83,6 @@ From the [Go Language Specification][go_select]:
Thus, it is possible for the second case to be chosen even if the first case is
also ready.
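For reference, the pattern in question looks roughly like this (a sketch only;
the channel and function names are illustrative, not the exact code from the
linked thread):

```go
// receiveOnce tries the high-priority channel first without blocking, then
// falls back to a select over both channels. In that second select, both
// cases may be ready at once, and Go chooses between ready cases
// pseudo-randomly, so a low-priority item can be delivered even while a
// high-priority item is waiting.
func receiveOnce(high, low <-chan int) int {
	select {
	case v := <-high:
		return v
	default:
		select {
		case v := <-high:
			return v
		case v := <-low:
			return v
		}
	}
}
```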
The `precise` package in this module implements a concurrent priority queue that
guarantees receipt of a high-priority items before low-priority ones. This is
primarily a fun exercise, I cannot recommend that anyone actually use this in a
real project.
[reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/
[sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/
[go_select]: https://go.dev/ref/spec#Select_statements

115
binheap/lib.go Normal file

@ -0,0 +1,115 @@
// Package binheap implements a non-concurrent binary max-heap.
//
// # Implementation
//
// [H] is parameterized over two types, one for the priority levels, one for
// the elements. Internally, there are two equally-sized buffers for these
// types. Re-heaping operations swap corresponding entries in these buffers
// in lock-step.
package binheap
import "golang.org/x/exp/constraints"
// H is a binary max-heap.
//
// `P` is the type of the priority levels, and `E` the type of the elements.
type H[P constraints.Ordered, E any] struct {
prs []P
els []E
len int
}
// Make creates a new heap.
func Make[P constraints.Ordered, E any](cap int) H[P, E] {
priorities := make([]P, cap)
elements := make([]E, cap)
h := H[P, E]{prs: priorities, els: elements}
return h
}
// Capacity returns the total capacity of the heap.
func (h *H[P, E]) Capacity() int {
return cap(h.prs)
}
// Len returns the number of items in the heap.
func (h *H[P, E]) Len() int {
return h.len
}
// CanExtract returns true if the heap has any item, otherwise false.
func (h *H[P, E]) CanExtract() bool {
return h.len != 0
}
// CanInsert returns true if the heap has unused capacity, otherwise false.
func (h *H[P, E]) CanInsert() bool {
return cap(h.prs)-h.len != 0
}
// Extract returns the current heap root, then performs a heap-down pass.
//
// If the heap is empty, it panics.
func (h *H[P, E]) Extract() (P, E) {
if !h.CanExtract() {
panic("heap is empty")
}
// extract root
priority := h.prs[0]
element := h.els[0]
// move last entry to root position
h.prs[0] = h.prs[h.len-1]
h.els[0] = h.els[h.len-1]
// clear the former last entry position,
// so as not to hold onto garbage
var emptyPriority P
var emptyElem E
h.prs[h.len-1] = emptyPriority
h.els[h.len-1] = emptyElem
// heap-down
h.len--
idx := 0
for {
left := idx<<1 + 1
right := idx<<1 + 2
largest := idx
if left < h.len && h.prs[left] > h.prs[largest] {
largest = left
}
if right < h.len && h.prs[right] > h.prs[largest] {
largest = right
}
if largest == idx {
break
}
h.prs[idx], h.prs[largest] = h.prs[largest], h.prs[idx]
h.els[idx], h.els[largest] = h.els[largest], h.els[idx]
idx = largest
}
return priority, element
}
// Insert adds an item to the heap, then performs a heap-up pass.
//
// If the heap is full, it panics.
func (h *H[P, E]) Insert(priority P, elem E) {
if !h.CanInsert() {
panic("heap is full")
}
// insert new item into last position
idx := h.len
h.prs[idx] = priority
h.els[idx] = elem
// heap-up
h.len++
for {
parent := (idx - 1) >> 1
if parent < 0 || h.prs[parent] >= h.prs[idx] {
break
}
h.prs[parent], h.prs[idx] = h.prs[idx], h.prs[parent]
h.els[parent], h.els[idx] = h.els[idx], h.els[parent]
idx = parent
}
}

131
binheap/lib_test.go Normal file

@ -0,0 +1,131 @@
package binheap_test
import (
"math/rand"
"testing"
"gogs.humancabbage.net/sam/priorityq/binheap"
)
func TestSmoke(t *testing.T) {
h := binheap.Make[int, int](10)
if h.Capacity() != 10 {
t.Errorf("expected heap capacity to be 10")
}
h.Insert(1, 1)
h.Insert(2, 2)
h.Insert(3, 3)
h.Insert(4, 4)
if h.Len() != 4 {
t.Errorf("expected heap length to be 4")
}
checkExtract := func(n int) {
_, extracted := h.Extract()
if extracted != n {
t.Errorf("expected to extract %d, got %d", n, extracted)
}
}
checkExtract(4)
checkExtract(3)
checkExtract(2)
checkExtract(1)
}
func TestInsertFullPanic(t *testing.T) {
h := binheap.Make[int, int](4)
h.Insert(1, 1)
h.Insert(2, 2)
h.Insert(3, 3)
h.Insert(4, 4)
defer func() {
if r := recover(); r == nil {
t.Errorf("expected final insert to panic")
}
}()
h.Insert(5, 5)
}
func TestExtractEmptyPanic(t *testing.T) {
h := binheap.Make[int, int](4)
defer func() {
if r := recover(); r == nil {
t.Errorf("expected extract to panic")
}
}()
h.Extract()
}
func TestRandomized(t *testing.T) {
h := binheap.Make[int, int](8192)
rs := rand.NewSource(0)
r := rand.New(rs)
// insert a bunch of random integers
for i := 0; i < h.Capacity(); i++ {
n := r.Int()
h.Insert(n, n)
}
// ensure that each extracted integer is <= the last extracted integer
var extracted []int
for h.CanExtract() {
id, item := h.Extract()
if id != item {
t.Errorf("id / item mismatch: %d %d", id, item)
}
lastIdx := len(extracted) - 1
extracted = append(extracted, item)
if lastIdx < 0 {
continue
}
if item > extracted[lastIdx] {
t.Errorf("newly extracted %d is greater than %d",
item, extracted[lastIdx])
}
}
}
func BenchmarkInsert(b *testing.B) {
h := binheap.Make[int, int](b.N)
rs := rand.NewSource(0)
r := rand.New(rs)
items := make([]int, b.N)
for i := 0; i < b.N; i++ {
items[i] = r.Int()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
h.Insert(items[i], items[i])
}
}
func BenchmarkExtract(b *testing.B) {
h := binheap.Make[int, int](b.N)
rs := rand.NewSource(0)
r := rand.New(rs)
for i := 0; i < b.N; i++ {
n := r.Int()
h.Insert(n, n)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
h.Extract()
}
}
func BenchmarkRepeatedInsertExtract(b *testing.B) {
h := binheap.Make[int, int](128)
rs := rand.NewSource(0)
r := rand.New(rs)
items := make([]int, b.N)
for i := 0; i < h.Capacity()-1; i++ {
n := r.Int()
h.Insert(n, n)
}
for i := 0; i < b.N; i++ {
items[i] = r.Int()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
h.Insert(items[i], items[i])
h.Extract()
}
}

circ/lib.go

@ -1,59 +0,0 @@
package circ
// B is a generic, non-concurrent circular FIFO buffer.
type B[T any] struct {
buf []T
len int
head int
tail int
}
// Make creates a new circular buffer.
func Make[T any](cap int) B[T] {
buf := make([]T, cap)
return B[T]{buf: buf}
}
// CanPush returns true if the buffer has space for new items.
func (b *B[T]) CanPush() bool {
return cap(b.buf)-b.len != 0
}
// CanPop returns true if the buffer has one or more items.
func (b *B[T]) CanPop() bool {
return b.len != 0
}
// PopFront returns the front-most item from the buffer.
//
// If the buffer is empty, it panics.
func (b *B[T]) PopFront() T {
if !b.CanPop() {
panic("cannot pop from empty buffer")
}
var empty T
item := b.buf[b.head]
// clear buffer slot so that we don't hold on to garbage
b.buf[b.head] = empty
b.len--
b.head++
if b.head == cap(b.buf) {
b.head = 0
}
return item
}
// PushBack adds an item to the end of the buffer.
//
// If the buffer is full, it panics.
func (b *B[T]) PushBack(value T) {
if !b.CanPush() {
panic("cannot push back to full buffer")
}
b.buf[b.tail] = value
b.len++
b.tail++
if b.tail == cap(b.buf) {
b.tail = 0
}
}

circ/lib_test.go

@ -1,67 +0,0 @@
package circ_test
import (
"testing"
"gogs.humancabbage.net/sam/priorityq/circ"
)
func TestRepeatPushPop(t *testing.T) {
t.Parallel()
cb := circ.Make[int](4)
for i := 0; i < 50; i++ {
cb.PushBack(1)
cb.PushBack(2)
cb.PushBack(3)
cb.PushBack(4)
checkPop := func(n int) {
if v := cb.PopFront(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
checkPop(1)
checkPop(2)
checkPop(3)
checkPop(4)
}
}
func TestInterleavedPushPop(t *testing.T) {
t.Parallel()
cb := circ.Make[int](4)
checkPop := func(n int) {
if v := cb.PopFront(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
cb.PushBack(1)
cb.PushBack(2)
cb.PushBack(3)
checkPop(1)
cb.PushBack(4)
cb.PushBack(5)
checkPop(2)
}
func TestEmptyPopPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("popping empty buffer did not panic")
}
}()
t.Parallel()
cb := circ.Make[int](4)
cb.PopFront()
}
func TestFullPushPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("pushing full buffer did not panic")
}
}()
t.Parallel()
cb := circ.Make[int](1)
cb.PushBack(1)
cb.PushBack(2)
}

2
go.mod

@ -1,3 +1,5 @@
module gogs.humancabbage.net/sam/priorityq
go 1.20
require golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2

2
go.sum

@ -0,0 +1,2 @@
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=

41
lib.go Normal file

@ -0,0 +1,41 @@
// Package priorityq provides generic implementations of various concurrent,
// prioritized queues.
//
// # Behavior
//
// All types of queues in this module act similarly to buffered Go channels.
//
// - They are bounded to a fixed capacity, set at construction.
// - Closing an already-closed queue, or sending to a closed queue, causes a panic.
// - Receivers can continue getting items after closure, and can use a final
// bool to determine when there are none remaining.
// - They are safe for multiple concurrent senders and receivers.
//
// # Implementation
//
// All data structures in this module use [generics], introduced in Go 1.18.
//
// All of the concurrent data structures in this module use a [sync.Mutex]
// and a few [sync.Cond] variables.
//
// [generics]: https://go.dev/blog/intro-generics
package priorityq
import (
"fmt"
)
// ErrEmpty means that an operation failed because the queue was empty.
var ErrEmpty error
// ErrFull means that an operation failed because the queue was full.
var ErrFull error
// ErrClosed means that an operation failed because the queue was closed.
var ErrClosed error
func init() {
ErrEmpty = fmt.Errorf("queue is empty")
ErrFull = fmt.Errorf("queue is full")
ErrClosed = fmt.Errorf("queue is closed")
}
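A short, illustrative sketch of the close/drain contract and the sentinel
errors described in the package comment above, using the `mq` package from
this change:

```go
package main

import (
	"fmt"

	"gogs.humancabbage.net/sam/priorityq"
	"gogs.humancabbage.net/sam/priorityq/mq"
)

func main() {
	q := mq.Make[int](2)
	q.Send(1)
	q.Send(2)

	// The low-priority buffer is full, so a non-blocking send reports ErrFull.
	if err := q.TrySend(3); err == priorityq.ErrFull {
		fmt.Println("queue full")
	}

	// After Close, receivers keep draining the remaining items; the final
	// bool turns false once the queue is both closed and empty.
	q.Close()
	for {
		v, ok := q.Recv()
		if !ok {
			break
		}
		fmt.Println(v)
	}
}
```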

201
mq/lib.go Normal file

@ -0,0 +1,201 @@
// Package mq implements a concurrent, dual-priority message queue.
//
// [Q] is similar to a buffered channel, except that senders can assign one of
// two priority levels to each item, "high" or "low." Receivers will always
// get a high-priority item ahead of any low-priority ones.
//
// For example:
//
// q := mq.Make[string](8)
// q.SendLow("world")
// q.SendHigh("hello")
// word1, _ := q.Recv()
// word2, _ := q.Recv()
// fmt.Println(word1, word2)
// q.Close()
// // Output: hello world
//
// # Implementation
//
// Each [Q] has two circular buffers, one for each priority level. Currently,
// the capacities for these are fixed and equal. If one buffer is full,
// attempts to send further items with its priority level will block
// ([Q.Send]) or fail ([Q.TrySend]).
//
// Compared to [pq.Q], the limitation to two priority levels increases performance,
// as its circular buffers are much less expensive than the heap operations of
// a traditional priority queue.
package mq
import (
"sync"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/pq"
"gogs.humancabbage.net/sam/priorityq/queue"
)
// so that godoc (kinda) works
var _ *pq.Q[int, int]
// Q is a concurrent, dual-priority message queue.
type Q[T any] struct {
*state[T]
}
// Make a new queue.
func Make[T any](cap int) Q[T] {
high := queue.Make[T](cap)
low := queue.Make[T](cap)
s := &state[T]{
high: high,
low: low,
}
s.canRecv = sync.Cond{L: &s.mu}
s.canSendHigh = sync.Cond{L: &s.mu}
s.canSendLow = sync.Cond{L: &s.mu}
return Q[T]{s}
}
type state[T any] struct {
mu sync.Mutex
high queue.Q[T]
low queue.Q[T]
canSendHigh sync.Cond
canSendLow sync.Cond
canRecv sync.Cond
closed bool
}
// Close marks the queue as closed.
//
// Attempting to close an already-closed queue results in a panic.
func (s *state[T]) Close() {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
panic("close of closed queue")
}
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv gets an item, blocking when empty until one is available.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[T]) Recv() (T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
s.canRecv.Wait()
}
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
var empty T
return empty, false
}
if s.high.CanPop() {
value := s.high.PopFront()
s.canSendHigh.Broadcast()
return value, true
}
if s.low.CanPop() {
value := s.low.PopFront()
s.canSendLow.Broadcast()
return value, true
}
}
}
// Send is an alias for SendLow.
func (s *state[T]) Send(value T) {
s.SendLow(value)
}
// SendHigh adds an item with high priority, blocking if full.
func (s *state[T]) SendHigh(value T) {
s.send(value, &s.high, &s.canSendHigh)
}
// SendLow adds an item with low priority, blocking if full.
func (s *state[T]) SendLow(value T) {
s.send(value, &s.low, &s.canSendLow)
}
// TryRecv attempts to get an item from the queue, without blocking.
//
// The error indicates whether the attempt succeeded, the queue is empty, or
// the queue is closed.
func (s *state[T]) TryRecv() (value T, err error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.high.CanPop() {
value = s.high.PopFront()
s.canSendHigh.Broadcast()
return
}
if s.low.CanPop() {
value = s.low.PopFront()
s.canSendLow.Broadcast()
return
}
if s.closed {
err = priorityq.ErrClosed
} else {
err = priorityq.ErrEmpty
}
return
}
// TrySend is an alias for TrySendLow.
func (s *state[T]) TrySend(value T) error {
return s.trySend(value, &s.low)
}
// TrySendHigh attempts to add an item with high priority, without blocking.
//
// Returns an error from the root priorityq package, or nil if successful.
func (s *state[T]) TrySendHigh(value T) error {
return s.trySend(value, &s.high)
}
// TrySendLow attempts to add an item with low priority, without blocking.
//
// Returns an error from the root priorityq package, or nil if successful.
func (s *state[T]) TrySendLow(value T) error {
return s.trySend(value, &s.low)
}
func (s *state[T]) send(value T, buf *queue.Q[T], cond *sync.Cond) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !buf.CanPush() {
cond.Wait()
}
if s.closed {
panic("send on closed queue")
}
if buf.CanPush() {
buf.PushBack(value)
s.canRecv.Broadcast()
return
}
}
}
func (s *state[T]) trySend(value T, buf *queue.Q[T]) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return priorityq.ErrClosed
}
if !buf.CanPush() {
return priorityq.ErrFull
}
buf.PushBack(value)
s.canRecv.Broadcast()
return nil
}

precise/lib_test.go → mq/lib_test.go

@ -1,16 +1,18 @@
package precise_test
package mq_test
import (
"math/rand"
"runtime"
"sync"
"testing"
"gogs.humancabbage.net/sam/priorityq/precise"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/mq"
)
func TestRecvHighFirst(t *testing.T) {
t.Parallel()
q := precise.Make[int](4)
q := mq.Make[int](4)
q.Send(1)
q.Send(2)
q.Send(3)
@ -41,14 +43,14 @@ func TestSendClosedPanic(t *testing.T) {
t.Errorf("sending to closed queue did not panic")
}
}()
q := precise.Make[int](4)
q := mq.Make[int](4)
q.Close()
q.Send(1)
}
func TestRecvClosed(t *testing.T) {
t.Parallel()
q := precise.Make[int](4)
q := mq.Make[int](4)
q.Send(1)
q.Close()
_, ok := q.Recv()
@ -61,18 +63,30 @@ func TestRecvClosed(t *testing.T) {
}
}
func TestDoubleClose(t *testing.T) {
t.Parallel()
q := mq.Make[int](4)
defer func() {
if r := recover(); r == nil {
t.Errorf("closing a closed queue did not panic")
}
}()
q.Close()
q.Close()
}
func TestTrySendRecv(t *testing.T) {
t.Parallel()
q := precise.Make[int](4)
assumeSendOk := func(n int, f func(int) bool) {
ok := f(n)
if !ok {
q := mq.Make[int](4)
assumeSendOk := func(n int, f func(int) error) {
err := f(n)
if err != nil {
t.Errorf("expected to be able to send")
}
}
assumeRecvOk := func(expected int) {
actual, ok := q.TryRecv()
if !ok {
actual, err := q.TryRecv()
if err != nil {
t.Errorf("expected to be able to receive")
}
if actual != expected {
@ -81,10 +95,10 @@ func TestTrySendRecv(t *testing.T) {
}
assumeSendOk(1, q.TrySendLow)
assumeSendOk(2, q.TrySendLow)
assumeSendOk(3, q.TrySendLow)
assumeSendOk(3, q.TrySend)
assumeSendOk(4, q.TrySendLow)
ok := q.TrySendLow(5)
if ok {
err := q.TrySendLow(5)
if err == nil {
t.Errorf("expected low buffer to be full")
}
assumeRecvOk(1)
@ -96,8 +110,8 @@ func TestTrySendRecv(t *testing.T) {
assumeSendOk(6, q.TrySendHigh)
assumeSendOk(7, q.TrySendHigh)
assumeSendOk(8, q.TrySendHigh)
ok = q.TrySendHigh(5)
if ok {
err = q.TrySendHigh(5)
if err == nil {
t.Errorf("expected high buffer to be full")
}
assumeRecvOk(5)
@ -105,15 +119,24 @@ func TestTrySendRecv(t *testing.T) {
assumeRecvOk(7)
assumeRecvOk(8)
_, ok = q.TryRecv()
if ok {
_, err = q.TryRecv()
if err != priorityq.ErrEmpty {
t.Errorf("expected queue to be empty")
}
q.Close()
_, err = q.TryRecv()
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
err = q.TrySend(5)
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
}
func TestConcProducerConsumer(t *testing.T) {
t.Parallel()
q := precise.Make[int](4)
q := mq.Make[int](4)
var wg sync.WaitGroup
produceDone := make(chan struct{})
wg.Add(2)
@ -142,7 +165,7 @@ func TestConcProducerConsumer(t *testing.T) {
}
func BenchmarkSend(b *testing.B) {
q := precise.Make[int](b.N)
q := mq.Make[int](b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Send(i)
@ -158,7 +181,7 @@ func BenchmarkSendChan(b *testing.B) {
}
func BenchmarkRecv(b *testing.B) {
q := precise.Make[int](b.N)
q := mq.Make[int](b.N)
for i := 0; i < b.N; i++ {
q.Send(i)
}
@ -180,7 +203,7 @@ func BenchmarkRecvChan(b *testing.B) {
}
func BenchmarkConcSendRecv(b *testing.B) {
q := precise.Make[int](b.N)
q := mq.Make[int](b.N)
var wg sync.WaitGroup
wg.Add(2)
start := make(chan struct{})
@ -226,3 +249,64 @@ func BenchmarkConcSendRecvChan(b *testing.B) {
close(start)
wg.Wait()
}
func BenchmarkHighContention(b *testing.B) {
q := mq.Make[int](b.N)
var wg sync.WaitGroup
start := make(chan struct{})
done := make(chan struct{})
numProducers := runtime.NumCPU()
sendsPerProducer := b.N / numProducers
wg.Add(numProducers)
for i := 0; i < numProducers; i++ {
go func() {
<-start
for i := 0; i < sendsPerProducer; i++ {
q.Send(1)
}
wg.Done()
}()
}
go func() {
ok := true
for ok {
_, ok = q.Recv()
}
close(done)
}()
b.ResetTimer()
close(start)
wg.Wait()
q.Close()
<-done
}
func BenchmarkHighContentionChan(b *testing.B) {
c := make(chan int, b.N)
var wg sync.WaitGroup
start := make(chan struct{})
done := make(chan struct{})
numProducers := runtime.NumCPU()
sendsPerProducer := b.N / numProducers
wg.Add(numProducers)
for i := 0; i < numProducers; i++ {
go func() {
<-start
for i := 0; i < sendsPerProducer; i++ {
c <- 1
}
wg.Done()
}()
}
go func() {
for n := range c {
_ = n
}
close(done)
}()
b.ResetTimer()
close(start)
wg.Wait()
close(c)
<-done
}

214
npq/lib.go Normal file

@ -0,0 +1,214 @@
// Package npq implements a concurrent, n-priority message queue.
//
// [Q] is similar to a buffered channel, except that senders can assign one of
// some fixed number of priority levels to each item. Receivers will always get
// the item with the highest priority.
//
// For example:
//
// q := npq.Make[int, string](8, 1, 2)
// q.Send(1, "world")
// q.Send(2, "hello")
// word1, _ := q.Recv()
// word2, _ := q.Recv()
// fmt.Println(word1, word2)
// q.Close()
// // Output: hello world
//
// # Implementation
//
// Each [Q] has an array of circular buffers, one for each priority level.
// Currently, the capacities for these are fixed and equal. If one buffer is
// full, attempts to send further items with its priority level will block
// ([Q.Send]) or fail ([Q.TrySend]).
//
// Compared to [mq.Q], which has only two priority levels, [Q] performs
// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
// though, this package is around 75% slower. More peculiarly, in
// HighContention, it is 7% faster.
//
// Compared to the pq package, this package is faster in the HighContention
// benchmark until around 145-150 priority levels; in ConcSendRecv, until
// around 200 levels; and in Recv, until around 750 levels.
//
// It's important to remember the memory requirement differences. A [pq.Q]
// with capacity N can store N items; a [Q] with capacity N and P priority
// levels can store N*P items.
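//
// For example, a [Q] made with capacity 1024 and 8 priority levels can hold
// up to 8*1024 = 8192 items in total, whereas a [pq.Q] with capacity 1024
// holds at most 1024.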
package npq
import (
"fmt"
"sync"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/mq"
"gogs.humancabbage.net/sam/priorityq/pq"
"gogs.humancabbage.net/sam/priorityq/queue"
"golang.org/x/exp/constraints"
)
// so that godoc (kinda) works
var _ *pq.Q[int, int]
var _ *mq.Q[int]
// Q is a concurrent, n-priority message queue.
type Q[P constraints.Integer, T any] struct {
*state[P, T]
}
// Make a new queue.
func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
numLevels := max - min + 1
levels := make([]queue.Q[T], numLevels)
for i := 0; i < int(numLevels); i++ {
levels[i] = queue.Make[T](cap)
}
s := &state[P, T]{
levels: levels,
min: min,
max: max,
}
canSend := make([]sync.Cond, numLevels)
for i := 0; i < len(canSend); i++ {
canSend[i].L = &s.mu
}
s.canRecv = sync.Cond{L: &s.mu}
s.canSend = canSend
return Q[P, T]{s}
}
type state[P constraints.Integer, T any] struct {
mu sync.Mutex
levels []queue.Q[T]
canSend []sync.Cond
canRecv sync.Cond
min P
max P
closed bool
}
// Close marks the queue as closed.
//
// Attempting to close an already-closed queue results in a panic.
func (s *state[P, T]) Close() {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
panic("close of closed queue")
}
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv gets an item, blocking when empty until one is available.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[P, T]) Recv() (T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
var available int = -1
findAvailable := func() {
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
level := &s.levels[levelIdx]
if level.CanPop() {
available = levelIdx
break
}
}
}
findAvailable()
for !s.closed && available == -1 {
s.canRecv.Wait()
findAvailable()
}
if s.closed && available == -1 {
var empty T
return empty, false
}
if available != -1 {
level := &s.levels[available]
value := level.PopFront()
s.canSend[available].Broadcast()
return value, true
}
}
}
// Send adds an item with some priority, blocking if full.
func (s *state[P, T]) Send(priority P, value T) {
s.validatePriority(priority)
s.mu.Lock()
defer s.mu.Unlock()
levelIdx := priority - s.min
level := &s.levels[levelIdx]
for {
for !s.closed && !level.CanPush() {
s.canSend[levelIdx].Wait()
}
if s.closed {
panic("send on closed queue")
}
if level.CanPush() {
level.PushBack(value)
s.canRecv.Broadcast()
return
}
}
}
// TryRecv attempts to get an item from the queue, without blocking.
//
// The error indicates whether the attempt succeeded, the queue is empty, or
// the queue is closed.
func (s *state[P, T]) TryRecv() (value T, err error) {
s.mu.Lock()
defer s.mu.Unlock()
for levelIdx := len(s.levels) - 1; levelIdx > -1; levelIdx-- {
level := &s.levels[levelIdx]
if level.CanPop() {
value = level.PopFront()
s.canSend[levelIdx].Broadcast()
return
}
}
if s.closed {
err = priorityq.ErrClosed
} else {
err = priorityq.ErrEmpty
}
return
}
// TrySend attempts to add an item with some priority, without blocking.
//
// Returns an error from the root priorityq package, or nil if successful.
func (s *state[P, T]) TrySend(priority P, value T) error {
s.validatePriority(priority)
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return priorityq.ErrClosed
}
levelIdx := priority - s.min
level := &s.levels[levelIdx]
if !level.CanPush() {
return priorityq.ErrFull
}
level.PushBack(value)
s.canRecv.Broadcast()
return nil
}
func (s *state[P, T]) validatePriority(priority P) {
if priority < s.min || priority > s.max {
panic(fmt.Errorf("priority %d out of range (%d, %d)",
priority, s.min, s.max))
}
}

277
npq/lib_test.go Normal file

@ -0,0 +1,277 @@
package npq_test
import (
"math/rand"
"runtime"
"sync"
"testing"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/npq"
)
func TestRecvHighFirst(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
q.Send(1, 1)
q.Send(1, 2)
q.Send(1, 3)
q.Send(1, 4)
q.Send(2, 5)
q.Send(2, 6)
q.Send(2, 7)
q.Send(2, 8)
checkRecv := func(n int) {
if v, _ := q.Recv(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
checkRecv(5)
checkRecv(6)
checkRecv(7)
checkRecv(8)
checkRecv(1)
checkRecv(2)
checkRecv(3)
checkRecv(4)
}
func TestSendClosedPanic(t *testing.T) {
t.Parallel()
defer func() {
if r := recover(); r == nil {
t.Errorf("sending to closed queue did not panic")
}
}()
q := npq.Make[int, int](4, 1, 2)
q.Close()
q.Send(1, 1)
}
func TestTooLowPriorityPanic(t *testing.T) {
testInvalidPriorityPanic(t, 0)
}
func TestTooHighPriorityPanic(t *testing.T) {
testInvalidPriorityPanic(t, 3)
}
func testInvalidPriorityPanic(t *testing.T, invalid int) {
t.Parallel()
defer func() {
if r := recover(); r == nil {
t.Errorf("sending with invalid priority did not panic")
}
}()
q := npq.Make[int, int](4, 1, 2)
q.Send(invalid, 1)
}
func TestRecvClosed(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
q.Send(1, 1)
q.Close()
_, ok := q.Recv()
if !ok {
t.Errorf("queue should have item to receive")
}
_, ok = q.Recv()
if ok {
t.Errorf("queue should be closed")
}
}
func TestDoubleClose(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
defer func() {
if r := recover(); r == nil {
t.Errorf("closing a closed queue did not panic")
}
}()
q.Close()
q.Close()
}
func TestTrySendRecv(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
assumeSendOk := func(n int, f func(int) error) {
err := f(n)
if err != nil {
t.Errorf("expected to be able to send")
}
}
assumeRecvOk := func(expected int) {
actual, err := q.TryRecv()
if err != nil {
t.Errorf("expected to be able to receive")
}
if actual != expected {
t.Errorf("expected %d, got %d", expected, actual)
}
}
trySendLow := func(n int) error {
return q.TrySend(1, n)
}
trySendHigh := func(n int) error {
return q.TrySend(2, n)
}
assumeSendOk(1, trySendLow)
assumeSendOk(2, trySendLow)
assumeSendOk(3, trySendLow)
assumeSendOk(4, trySendLow)
err := trySendLow(5)
if err == nil {
t.Errorf("expected low buffer to be full")
}
assumeRecvOk(1)
assumeRecvOk(2)
assumeRecvOk(3)
assumeRecvOk(4)
assumeSendOk(5, trySendHigh)
assumeSendOk(6, trySendHigh)
assumeSendOk(7, trySendHigh)
assumeSendOk(8, trySendHigh)
err = trySendHigh(5)
if err == nil {
t.Errorf("expected high buffer to be full")
}
assumeRecvOk(5)
assumeRecvOk(6)
assumeRecvOk(7)
assumeRecvOk(8)
_, err = q.TryRecv()
if err != priorityq.ErrEmpty {
t.Errorf("expected queue to be empty")
}
q.Close()
_, err = q.TryRecv()
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
err = q.TrySend(1, 5)
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
}
func TestConcProducerConsumer(t *testing.T) {
t.Parallel()
q := npq.Make[int, int](4, 1, 2)
var wg sync.WaitGroup
produceDone := make(chan struct{})
wg.Add(2)
go func() {
for i := 0; i < 10000; i++ {
q.Send(rand.Intn(2)+1, i)
}
close(produceDone)
wg.Done()
}()
go func() {
ok := true
for ok {
_, ok = q.Recv()
}
wg.Done()
}()
<-produceDone
t.Logf("producer done, closing channel")
q.Close()
wg.Wait()
}
const highPriority = 2
func BenchmarkSend(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
// randomize priorities to get amortized cost per op
ps := make([]int, b.N)
for i := 0; i < b.N; i++ {
ps[i] = rand.Intn(highPriority) + 1
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Send(ps[i], i)
}
}
func BenchmarkRecv(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
for i := 0; i < b.N; i++ {
q.Send(rand.Intn(highPriority)+1, i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Recv()
}
}
func BenchmarkConcSendRecv(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
// randomize priorities to get amortized cost per op
ps := make([]int, b.N)
for i := 0; i < b.N; i++ {
ps[i] = rand.Intn(highPriority) + 1
}
var wg sync.WaitGroup
wg.Add(2)
start := make(chan struct{})
go func() {
<-start
for i := 0; i < b.N; i++ {
q.Send(ps[i], i)
}
wg.Done()
}()
go func() {
<-start
for i := 0; i < b.N; i++ {
q.Recv()
}
wg.Done()
}()
b.ResetTimer()
close(start)
wg.Wait()
}
func BenchmarkHighContention(b *testing.B) {
q := npq.Make[int, int](b.N, 1, highPriority)
var wg sync.WaitGroup
start := make(chan struct{})
done := make(chan struct{})
numProducers := runtime.NumCPU()
sendsPerProducer := b.N / numProducers
wg.Add(numProducers)
for i := 0; i < numProducers; i++ {
go func() {
ps := make([]int, sendsPerProducer)
for i := range ps {
ps[i] = rand.Intn(highPriority) + 1
}
<-start
for i := 0; i < sendsPerProducer; i++ {
q.Send(ps[i], 1)
}
wg.Done()
}()
}
go func() {
ok := true
for ok {
_, ok = q.Recv()
}
close(done)
}()
b.ResetTimer()
close(start)
wg.Wait()
q.Close()
<-done
}

155
pq/lib.go Normal file

@ -0,0 +1,155 @@
// Package pq implements a concurrent priority queue.
//
// [Q] is similar to a buffered channel, except that senders attach to each
// item a priority, and receivers always get the highest-priority item.
//
// For example:
//
// import "gogs.humancabbage.net/sam/priorityq/pq"
// q := pq.Make[int, string](8)
// q.Send(1, "world")
// q.Send(2, "hello")
// _, word1, _ := q.Recv()
// _, word2, _ := q.Recv()
// fmt.Println(word1, word2)
// q.Close()
// // Output: hello world
//
// # Implementation
//
// Each queue has a [binary max-heap]. Sending and receiving items require
// heap-up and heap-down operations, respectively.
//
// [binary max-heap]: https://en.wikipedia.org/wiki/Binary_heap
package pq
import (
"sync"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/binheap"
"golang.org/x/exp/constraints"
)
// Q is a generic, concurrent priority queue.
type Q[P constraints.Ordered, T any] struct {
*state[P, T]
}
// Make a new queue.
func Make[P constraints.Ordered, T any](cap int) Q[P, T] {
heap := binheap.Make[P, T](cap)
s := &state[P, T]{
heap: heap,
}
s.canRecv = sync.Cond{L: &s.mu}
s.canSend = sync.Cond{L: &s.mu}
return Q[P, T]{s}
}
type state[P constraints.Ordered, T any] struct {
mu sync.Mutex
heap binheap.H[P, T]
canSend sync.Cond
canRecv sync.Cond
closed bool
}
// Close marks the queue as closed.
//
// Attempting to close an already-closed queue results in a panic.
func (s *state[P, T]) Close() {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
panic("close of closed queue")
}
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv gets an item, blocking when empty until one is available.
//
// This returns both the item itself and its assigned priority.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[P, T]) Recv() (P, T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.heap.CanExtract() {
s.canRecv.Wait()
}
if s.closed && !s.heap.CanExtract() {
var emptyP P
var emptyT T
return emptyP, emptyT, false
}
if s.heap.CanExtract() {
priority, value := s.heap.Extract()
s.canSend.Broadcast()
return priority, value, true
}
}
}
// Send adds an item with some priority, blocking if full.
func (s *state[P, T]) Send(priority P, value T) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.heap.CanInsert() {
s.canSend.Wait()
}
if s.closed {
panic("send on closed queue")
}
if s.heap.CanInsert() {
s.heap.Insert(priority, value)
s.canRecv.Broadcast()
return
}
}
}
// TryRecv attempts to get an item without blocking.
//
// This returns both the item itself and its assigned priority.
//
// The error indicates whether the attempt succeeded, the queue is empty, or
// the queue is closed.
func (s *state[P, T]) TryRecv() (priority P, value T, err error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.heap.CanExtract() {
priority, value = s.heap.Extract()
s.canSend.Broadcast()
return
}
if s.closed {
err = priorityq.ErrClosed
} else {
err = priorityq.ErrEmpty
}
return
}
// TrySend attempts to add an item with some priority, without blocking.
//
// Returns an error from the root priorityq package, or nil if successful.
func (s *state[P, T]) TrySend(priority P, value T) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return priorityq.ErrClosed
}
if !s.heap.CanInsert() {
return priorityq.ErrFull
}
s.heap.Insert(priority, value)
s.canRecv.Broadcast()
return nil
}

237
pq/lib_test.go Normal file

@ -0,0 +1,237 @@
package pq_test
import (
"math/rand"
"runtime"
"sync"
"testing"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/pq"
)
func TestRecvHighestFirst(t *testing.T) {
t.Parallel()
q := pq.Make[int, int](8)
q.Send(4, 4)
q.Send(2, 2)
q.Send(1, 1)
q.Send(5, 5)
q.Send(7, 7)
q.Send(8, 8)
q.Send(3, 3)
q.Send(6, 6)
checkRecv := func(n int) {
if _, v, _ := q.Recv(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
checkRecv(8)
checkRecv(7)
checkRecv(6)
checkRecv(5)
checkRecv(4)
checkRecv(3)
checkRecv(2)
checkRecv(1)
}
func TestSendClosedPanic(t *testing.T) {
t.Parallel()
defer func() {
if r := recover(); r == nil {
t.Errorf("sending to closed queue did not panic")
}
}()
q := pq.Make[int, int](4)
q.Close()
q.Send(1, 1)
}
func TestRecvClosed(t *testing.T) {
t.Parallel()
q := pq.Make[int, int](4)
q.Send(1, 1)
q.Close()
_, _, ok := q.Recv()
if !ok {
t.Errorf("queue should have item to receive")
}
_, _, ok = q.Recv()
if ok {
t.Errorf("queue should be closed")
}
}
func TestDoubleClose(t *testing.T) {
t.Parallel()
q := pq.Make[int, int](4)
defer func() {
if r := recover(); r == nil {
t.Errorf("closing a closed queue did not panic")
}
}()
q.Close()
q.Close()
}
func TestTrySendRecv(t *testing.T) {
t.Parallel()
q := pq.Make[int, int](4)
assumeSendOk := func(n int) {
err := q.TrySend(n, n)
if err != nil {
t.Errorf("expected to be able to send")
}
}
assumeRecvOk := func(expected int) {
_, actual, err := q.TryRecv()
if err != nil {
t.Errorf("expected to be able to receive")
}
if actual != expected {
t.Errorf("expected %d, got %d", expected, actual)
}
}
assumeSendOk(1)
assumeSendOk(2)
assumeSendOk(3)
assumeSendOk(4)
err := q.TrySend(5, 5)
if err == nil {
t.Errorf("expected queue to be full")
}
assumeRecvOk(4)
assumeRecvOk(3)
assumeRecvOk(2)
assumeRecvOk(1)
_, _, err = q.TryRecv()
if err != priorityq.ErrEmpty {
t.Errorf("expected queue to be empty")
}
q.Close()
_, _, err = q.TryRecv()
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
err = q.TrySend(1, 1)
if err != priorityq.ErrClosed {
t.Errorf("expected queue to be closed ")
}
}
func TestConcProducerConsumer(t *testing.T) {
t.Parallel()
q := pq.Make[int, int](4)
var wg sync.WaitGroup
produceDone := make(chan struct{})
wg.Add(2)
go func() {
for i := 0; i < 10000; i++ {
q.Send(rand.Int(), i)
}
close(produceDone)
wg.Done()
}()
go func() {
ok := true
for ok {
_, _, ok = q.Recv()
}
wg.Done()
}()
<-produceDone
t.Logf("producer done, closing channel")
q.Close()
wg.Wait()
}
func BenchmarkSend(b *testing.B) {
q := pq.Make[int, int](b.N)
// randomize priorities to get amortized cost per op
ps := make([]int, b.N)
for i := 0; i < b.N; i++ {
ps[i] = rand.Int()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Send(ps[i], i)
}
}
func BenchmarkRecv(b *testing.B) {
q := pq.Make[int, int](b.N)
// randomize priorities to get amortized cost per op
for i := 0; i < b.N; i++ {
q.Send(rand.Int(), i)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Recv()
}
}
func BenchmarkConcSendRecv(b *testing.B) {
q := pq.Make[int, int](b.N)
// randomize priorities to get amortized cost per op
ps := make([]int, b.N)
for i := 0; i < b.N; i++ {
ps[i] = rand.Int()
}
var wg sync.WaitGroup
wg.Add(2)
start := make(chan struct{})
go func() {
<-start
for i := 0; i < b.N; i++ {
q.Send(ps[i], i)
}
wg.Done()
}()
go func() {
<-start
for i := 0; i < b.N; i++ {
q.Recv()
}
wg.Done()
}()
b.ResetTimer()
close(start)
wg.Wait()
}
func BenchmarkHighContention(b *testing.B) {
q := pq.Make[int, int](b.N)
var wg sync.WaitGroup
start := make(chan struct{})
done := make(chan struct{})
numProducers := runtime.NumCPU()
sendsPerProducer := b.N / numProducers
wg.Add(numProducers)
for i := 0; i < numProducers; i++ {
go func() {
ps := make([]int, sendsPerProducer)
for i := 0; i < sendsPerProducer; i++ {
ps[i] = rand.Int()
}
<-start
for i := 0; i < sendsPerProducer; i++ {
q.Send(ps[i], 1)
}
wg.Done()
}()
}
go func() {
ok := true
for ok {
_, _, ok = q.Recv()
}
close(done)
}()
b.ResetTimer()
close(start)
wg.Wait()
q.Close()
<-done
}

precise/lib.go

@ -1,164 +0,0 @@
package precise
import (
"sync"
"gogs.humancabbage.net/sam/priorityq/circ"
)
// Q is a precise, concurrent priority queue.
//
// Each queue has two internal buffers, high and low. This implementation
// guarantees that when there are items in both buffers, consumers receive
// ones from the high priority buffer first.
//
// Each buffer has the same capacity, set on initial construction. Sending to
// a buffer will block if it is full, even if the other buffer has space.
type Q[T any] struct {
*state[T]
}
// Make a new priority queue.
func Make[T any](cap int) Q[T] {
high := circ.Make[T](cap)
low := circ.Make[T](cap)
s := &state[T]{
high: high,
low: low,
}
s.canRecv = sync.NewCond(&s.mu)
s.canSendHigh = sync.NewCond(&s.mu)
s.canSendLow = sync.NewCond(&s.mu)
return Q[T]{s}
}
type state[T any] struct {
mu sync.Mutex
high circ.B[T]
low circ.B[T]
canSendHigh *sync.Cond
canSendLow *sync.Cond
canRecv *sync.Cond
closed bool
}
// Close marks the queue as closed.
//
// Subsequent attempts to send will panic. Subsequent calls to Recv will
// continue to return the remaining items in the queue.
func (s *state[T]) Close() {
s.mu.Lock()
s.closed = true
s.mu.Unlock()
s.canRecv.Broadcast()
}
// Recv returns an item from the prioritized buffers, blocking if empty.
//
// The returned bool will be true if the queue still has items or is open.
// It will be false if the queue is empty and closed.
func (s *state[T]) Recv() (T, bool) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !s.high.CanPop() && !s.low.CanPop() {
s.canRecv.Wait()
}
if s.closed && !s.high.CanPop() && !s.low.CanPop() {
var empty T
return empty, false
}
if s.high.CanPop() {
value := s.high.PopFront()
s.canSendHigh.Broadcast()
return value, true
}
if s.low.CanPop() {
value := s.low.PopFront()
s.canSendLow.Broadcast()
return value, true
}
}
}
// Send is an alias for SendLow.
func (s *state[T]) Send(value T) {
s.SendLow(value)
}
// SendHigh adds an item to the high priority buffer, blocking if full.
func (s *state[T]) SendHigh(value T) {
s.send(value, &s.high, s.canSendHigh)
}
// SendLow adds an item to the low priority buffer, blocking if full.
func (s *state[T]) SendLow(value T) {
s.send(value, &s.low, s.canSendLow)
}
// TryRecv attempts to return an item from the prioritized buffers.
//
// This method does not block. If there is an item in a buffer, it returns
// true. If the buffer is empty, it returns false.
func (s *state[T]) TryRecv() (value T, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
if s.high.CanPop() {
value = s.high.PopFront()
ok = true
s.canSendHigh.Broadcast()
return
}
if s.low.CanPop() {
value = s.low.PopFront()
ok = true
s.canSendLow.Broadcast()
return
}
return
}
// TrySendHigh attempts to add an item to the high priority buffer.
//
// This method does not block. If there is space in the buffer, it returns
// true. If the buffer is full, it returns false.
func (s *state[T]) TrySendHigh(value T) bool {
return s.trySend(value, &s.high)
}
// TrySendLow attempts to add an item to the low priority buffer.
//
// This method does not block. If there is space in the buffer, it returns
// true. If the buffer is full, it returns false.
func (s *state[T]) TrySendLow(value T) bool {
return s.trySend(value, &s.low)
}
func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) {
s.mu.Lock()
defer s.mu.Unlock()
for {
for !s.closed && !buf.CanPush() {
cond.Wait()
}
if s.closed {
panic("send on closed queue")
}
if buf.CanPush() {
buf.PushBack(value)
s.canRecv.Broadcast()
return
}
}
}
func (s *state[T]) trySend(value T, buf *circ.B[T]) bool {
s.mu.Lock()
defer s.mu.Unlock()
if !buf.CanPush() {
return false
}
buf.PushBack(value)
s.canRecv.Broadcast()
return true
}

75
queue/lib.go Normal file

@ -0,0 +1,75 @@
// Package queue implements a non-concurrent queue.
//
// # Implementation
//
// [Q] is a classic ring buffer. It tracks its head, tail, and length. This
// makes determining whether the queue is full or empty trivial.
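// (The queue is empty when its length is zero, and full when its length
// equals its capacity.)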
package queue
// Q is a non-concurrent queue.
type Q[T any] struct {
buf []T
len int
head int
tail int
}
// Make creates a new queue.
func Make[T any](cap int) Q[T] {
buf := make([]T, cap)
return Q[T]{buf: buf}
}
// Capacity returns the total capacity of the queue.
func (b *Q[T]) Capacity() int {
return cap(b.buf)
}
// Len returns the number of items in the queue.
func (b *Q[T]) Len() int {
return b.len
}
// CanPush returns true if the queue has space for new items.
func (b *Q[T]) CanPush() bool {
return cap(b.buf)-b.len != 0
}
// CanPop returns true if the queue has one or more items.
func (b *Q[T]) CanPop() bool {
return b.len != 0
}
// PopFront returns the front-most item from the queue.
//
// If the queue is empty, it panics.
func (b *Q[T]) PopFront() T {
if !b.CanPop() {
panic("cannot pop from empty queue")
}
item := b.buf[b.head]
// clear queue slot so as not to hold on to garbage
var empty T
b.buf[b.head] = empty
b.len--
b.head++
if b.head == cap(b.buf) {
b.head = 0
}
return item
}
// PushBack adds an item to the end of the queue.
//
// If the queue is full, it panics.
func (b *Q[T]) PushBack(value T) {
if !b.CanPush() {
panic("cannot push back to full queue")
}
b.buf[b.tail] = value
b.len++
b.tail++
if b.tail == cap(b.buf) {
b.tail = 0
}
}

101
queue/lib_test.go Normal file

@ -0,0 +1,101 @@
package queue_test
import (
"math/rand"
"testing"
"gogs.humancabbage.net/sam/priorityq/queue"
)
func TestRepeatPushPop(t *testing.T) {
t.Parallel()
q := queue.Make[int](4)
if q.Capacity() != 4 {
t.Errorf("wrong capacity")
}
for i := 0; i < 50; i++ {
q.PushBack(1)
q.PushBack(2)
q.PushBack(3)
q.PushBack(4)
if q.Len() != 4 {
t.Errorf("wrong length")
}
checkPop := func(n int) {
if v := q.PopFront(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
checkPop(1)
checkPop(2)
checkPop(3)
checkPop(4)
}
}
func TestInterleavedPushPop(t *testing.T) {
t.Parallel()
q := queue.Make[int](4)
checkPop := func(n int) {
if v := q.PopFront(); v != n {
t.Errorf("popped %d, expected %d", v, n)
}
}
q.PushBack(1)
q.PushBack(2)
q.PushBack(3)
checkPop(1)
q.PushBack(4)
q.PushBack(5)
checkPop(2)
}
func TestEmptyPopPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("popping empty buffer did not panic")
}
}()
t.Parallel()
q := queue.Make[int](4)
q.PopFront()
}
func TestFullPushPanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("pushing full buffer did not panic")
}
}()
t.Parallel()
q := queue.Make[int](1)
q.PushBack(1)
q.PushBack(2)
}
func BenchmarkPush(b *testing.B) {
q := queue.Make[int](b.N)
rs := rand.NewSource(0)
r := rand.New(rs)
items := make([]int, b.N)
for i := 0; i < b.N; i++ {
items[i] = r.Int()
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.PushBack(items[i])
}
}
func BenchmarkPop(b *testing.B) {
q := queue.Make[int](b.N)
rs := rand.NewSource(0)
r := rand.New(rs)
for i := 0; i < b.N; i++ {
q.PushBack(r.Int())
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.PopFront()
}
}