diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..33df56b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +benchresults.txt +coverage diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9e00c1a --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +.PHONY: test +test: + go test -count=1 -coverprofile=coverage ./... + +.PHONY: bench +bench: + go test -bench=. ./... + +.PHONY: longbench +longbench: + go test -count=30 -bench=. ./... | tee benchresults.txt diff --git a/README.md b/README.md index 453f019..bd40a07 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,52 @@ # priorityq - generic prioritized queues in Go +In Go, the builtin buffered channels provide a concurrent FIFO queue for +passing messages between goroutines. Sometimes, however, it's convenient to be +able to assign priority levels to messages, so that they get delivered to +consumers more promptly. + +The `mq` package in this module implements a concurrent, dual-priority message +queue that guarantees receipt of a high-priority items before low-priority +ones. There is a pattern using two channels and select statements to achieve +similar functionality, but it's not exactly equivalent. (See the +[Background](#background) section for more details.) + +Additionally, the `pq` package implements a concurrent, traditional priority +queue, using a binary max-heap. This is more general than `mq`, because it +allows multiple levels of priority. This, of course, also makes operations +slower. + +## Benchmarks + +Here are some benchmark results from running on a Mac Studio/M1 Ultra. + + pkg: gogs.humancabbage.net/sam/priorityq/mq + Send-20 13.93n ± 0% + SendChan-20 13.19n ± 0% + Recv-20 13.64n ± 1% + RecvChan-20 13.29n ± 1% + ConcSendRecv-20 97.60n ± 1% + ConcSendRecvChan-20 171.8n ± 5% + HighContention-20 128.2n ± 0% + HighContentionChan-20 47.27n ± 0% + + pkg: gogs.humancabbage.net/sam/priorityq/pq + Send-20 18.79n ± 1% + Recv-20 268.1n ± 3% + ConcSendRecv-20 199.2n ± 2% + HighContention-20 440.0n ± 1% + + pkg: gogs.humancabbage.net/sam/priorityq/binheap + Insert-20 11.92n ± 0% + Extract-20 261.6n ± 2% + RepeatedInsertExtract-20 25.68n ± 1% + + pkg: gogs.humancabbage.net/sam/priorityq/circ + Push-20 2.196n ± 1% + Pop-20 2.187n ± 0% + +## Background + This module was inspired by [a reddit post][reddit] wherein /u/zandery23 asked how to implement a prioritized message queue in Go. A fantastic solution was [provided by /u/Ploobers][sol]. That's probably right for 99 out of 100 use @@ -26,16 +73,6 @@ From the [Go Language Specification][go_select]: Thus, it is possible for the second case to be chosen even if the first case is also ready. -The `mq` package in this module implements a concurrent, prioritized message -queue that guarantees receipt of a high-priority items before low-priority -ones. This is primarily a fun exercise, I cannot recommend that anyone -actually use this in a real project. - -Additionally, the `pq` package implements a concurrent priority queue, using a -binary max-heap. This is more general than `mq`, because it allows multiple -levels of priority, instead of just "high" and "low". This, of course, also -makes operations slower. - [reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/ [sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/ [go_select]: https://go.dev/ref/spec#Select_statements diff --git a/binheap/lib.go b/binheap/lib.go index 1c93e23..9e8380a 100644 --- a/binheap/lib.go +++ b/binheap/lib.go @@ -1,103 +1,115 @@ // Package binheap implements a binary max-heap. +// +// # Implementation +// +// [H] is parameterized over two types, one for the priority levels, one for +// the elements. Internally, there are two equally-sized buffers for these +// types. Re-heaping operations swap corresponding entries in these buffers +// in lock-step. package binheap import "golang.org/x/exp/constraints" // H is a binary max-heap. // -// `I` is the type of the priority IDs, and `E` the type of the elements. -type H[I constraints.Ordered, E any] struct { - heap []I - elems []E - len int +// `P` is the type of the priority levels, and `E` the type of the elements. +type H[P constraints.Ordered, E any] struct { + prs []P + els []E + len int } // Make creates a new heap. -func Make[I constraints.Ordered, E any](cap int) H[I, E] { - heap := make([]I, cap) - elems := make([]E, cap) - h := H[I, E]{heap: heap, elems: elems} +func Make[P constraints.Ordered, E any](cap int) H[P, E] { + priorities := make([]P, cap) + elements := make([]E, cap) + h := H[P, E]{prs: priorities, els: elements} return h } // Capacity returns the total capacity of the heap. -func (h *H[I, E]) Capacity() int { - return cap(h.heap) +func (h *H[P, E]) Capacity() int { + return cap(h.prs) } // Len returns the number of items in the heap. -func (h *H[I, E]) Len() int { +func (h *H[P, E]) Len() int { return h.len } // CanExtract returns true if the heap has any item, otherwise false. -func (h *H[I, E]) CanExtract() bool { +func (h *H[P, E]) CanExtract() bool { return h.len != 0 } // CanInsert returns true if the heap has unused capacity, otherwise false. -func (h *H[I, E]) CanInsert() bool { - return cap(h.heap)-h.len != 0 +func (h *H[P, E]) CanInsert() bool { + return cap(h.prs)-h.len != 0 } // Extract returns the current heap root, then performs a heap-down pass. // // If the heap is empty, it panics. -func (h *H[I, E]) Extract() (I, E) { +func (h *H[P, E]) Extract() (P, E) { if !h.CanExtract() { panic("heap is empty") } - - id := h.heap[0] - elem := h.elems[0] - var emptyId I + // extract root + priority := h.prs[0] + element := h.els[0] + // move last entry to root position + h.prs[0] = h.prs[h.len-1] + h.els[0] = h.els[h.len-1] + // clear the former last entry position, + // so as not to hold onto garbage + var emptyPriority P var emptyElem E - h.heap[0] = h.heap[h.len-1] - h.elems[0] = h.elems[h.len-1] - h.heap[h.len-1] = emptyId - h.elems[h.len-1] = emptyElem + h.prs[h.len-1] = emptyPriority + h.els[h.len-1] = emptyElem + // heap-down h.len-- idx := 0 for { - left := idx*2 + 1 - right := idx*2 + 2 + left := idx<<1 + 1 + right := idx<<1 + 2 largest := idx - if left < h.len && h.heap[left] > h.heap[largest] { + if left < h.len && h.prs[left] > h.prs[largest] { largest = left } - if right < h.len && h.heap[right] > h.heap[largest] { + if right < h.len && h.prs[right] > h.prs[largest] { largest = right } if largest == idx { break } - h.heap[idx], h.heap[largest] = h.heap[largest], h.heap[idx] - h.elems[idx], h.elems[largest] = h.elems[largest], h.elems[idx] + h.prs[idx], h.prs[largest] = h.prs[largest], h.prs[idx] + h.els[idx], h.els[largest] = h.els[largest], h.els[idx] idx = largest } - return id, elem + return priority, element } // Insert adds an item to the heap, then performs a heap-up pass. // // If the heap is full, it panics. -func (h *H[I, E]) Insert(id I, elem E) { +func (h *H[P, E]) Insert(priority P, elem E) { if !h.CanInsert() { panic("heap is full") } - + // insert new item into last position idx := h.len - h.heap[idx] = id - h.elems[idx] = elem + h.prs[idx] = priority + h.els[idx] = elem + // heap-up h.len++ for { - parent := (idx - 1) / 2 - if parent == idx || h.heap[parent] >= h.heap[idx] { + parent := (idx - 1) >> 1 + if parent < 0 || h.prs[parent] >= h.prs[idx] { break } - h.heap[parent], h.heap[idx] = h.heap[idx], h.heap[parent] - h.elems[parent], h.elems[idx] = h.elems[idx], h.elems[parent] + h.prs[parent], h.prs[idx] = h.prs[idx], h.prs[parent] + h.els[parent], h.els[idx] = h.els[idx], h.els[parent] idx = parent } } diff --git a/binheap/lib_test.go b/binheap/lib_test.go index f4d1115..89b4ae0 100644 --- a/binheap/lib_test.go +++ b/binheap/lib_test.go @@ -82,3 +82,50 @@ func TestRandomized(t *testing.T) { } } } + +func BenchmarkInsert(b *testing.B) { + h := binheap.Make[int, int](b.N) + rs := rand.NewSource(0) + r := rand.New(rs) + items := make([]int, b.N) + for i := 0; i < b.N; i++ { + items[i] = r.Int() + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Insert(items[i], items[i]) + } +} + +func BenchmarkExtract(b *testing.B) { + h := binheap.Make[int, int](b.N) + rs := rand.NewSource(0) + r := rand.New(rs) + for i := 0; i < b.N; i++ { + n := r.Int() + h.Insert(n, n) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Extract() + } +} + +func BenchmarkRepeatedInsertExtract(b *testing.B) { + h := binheap.Make[int, int](128) + rs := rand.NewSource(0) + r := rand.New(rs) + items := make([]int, b.N) + for i := 0; i < h.Capacity()-1; i++ { + n := r.Int() + h.Insert(n, n) + } + for i := 0; i < b.N; i++ { + items[i] = r.Int() + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + h.Insert(items[i], items[i]) + h.Extract() + } +} diff --git a/circ/lib.go b/circ/lib.go index 0172089..e97a5ea 100644 --- a/circ/lib.go +++ b/circ/lib.go @@ -9,12 +9,22 @@ type B[T any] struct { tail int } -// Make creates a new circular buffer. +// Make creates a new buffer. func Make[T any](cap int) B[T] { buf := make([]T, cap) return B[T]{buf: buf} } +// Capacity returns the total capacity of the buffer. +func (b *B[T]) Capacity() int { + return cap(b.buf) +} + +// Len returns the number of items in the buffer. +func (b *B[T]) Len() int { + return b.len +} + // CanPush returns true if the buffer has space for new items. func (b *B[T]) CanPush() bool { return cap(b.buf)-b.len != 0 @@ -32,9 +42,9 @@ func (b *B[T]) PopFront() T { if !b.CanPop() { panic("cannot pop from empty buffer") } - var empty T item := b.buf[b.head] - // clear buffer slot so that we don't hold on to garbage + // clear buffer slot so as not to hold on to garbage + var empty T b.buf[b.head] = empty b.len-- b.head++ diff --git a/circ/lib_test.go b/circ/lib_test.go index 624dd20..fa1625b 100644 --- a/circ/lib_test.go +++ b/circ/lib_test.go @@ -1,6 +1,7 @@ package circ_test import ( + "math/rand" "testing" "gogs.humancabbage.net/sam/priorityq/circ" @@ -9,11 +10,17 @@ import ( func TestRepeatPushPop(t *testing.T) { t.Parallel() cb := circ.Make[int](4) + if cb.Capacity() != 4 { + t.Errorf("wrong capacity") + } for i := 0; i < 50; i++ { cb.PushBack(1) cb.PushBack(2) cb.PushBack(3) cb.PushBack(4) + if cb.Len() != 4 { + t.Errorf("wrong length") + } checkPop := func(n int) { if v := cb.PopFront(); v != n { t.Errorf("popped %d, expected %d", v, n) @@ -65,3 +72,30 @@ func TestFullPushPanic(t *testing.T) { cb.PushBack(1) cb.PushBack(2) } + +func BenchmarkPush(b *testing.B) { + cb := circ.Make[int](b.N) + rs := rand.NewSource(0) + r := rand.New(rs) + items := make([]int, b.N) + for i := 0; i < b.N; i++ { + items[i] = r.Int() + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cb.PushBack(items[i]) + } +} + +func BenchmarkPop(b *testing.B) { + cb := circ.Make[int](b.N) + rs := rand.NewSource(0) + r := rand.New(rs) + for i := 0; i < b.N; i++ { + cb.PushBack(r.Int()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cb.PopFront() + } +} diff --git a/lib.go b/lib.go index 1a998c2..5fb8687 100644 --- a/lib.go +++ b/lib.go @@ -20,3 +20,22 @@ // // [generics]: https://go.dev/blog/intro-generics package priorityq + +import ( + "fmt" +) + +// ErrEmpty means that an operation failed because the queue was empty. +var ErrEmpty error + +// ErrFull means that an operation failed because the queue was full. +var ErrFull error + +// ErrClosed means that an operation failed because the queue was closed. +var ErrClosed error + +func init() { + ErrEmpty = fmt.Errorf("queue is empty") + ErrFull = fmt.Errorf("queue is full") + ErrClosed = fmt.Errorf("queue is closed") +} diff --git a/mq/lib.go b/mq/lib.go index a843167..b1cc221 100644 --- a/mq/lib.go +++ b/mq/lib.go @@ -12,7 +12,7 @@ // word1, _ := mq.Recv() // word2, _ := mq.Recv() // fmt.Println(word1, word2) -// pq.Close() +// q.Close() // // Output: hello world // // # Implementation @@ -30,6 +30,7 @@ package mq import ( "sync" + "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/circ" ) @@ -46,9 +47,9 @@ func Make[T any](cap int) Q[T] { high: high, low: low, } - s.canRecv = sync.NewCond(&s.mu) - s.canSendHigh = sync.NewCond(&s.mu) - s.canSendLow = sync.NewCond(&s.mu) + s.canRecv = sync.Cond{L: &s.mu} + s.canSendHigh = sync.Cond{L: &s.mu} + s.canSendLow = sync.Cond{L: &s.mu} return Q[T]{s} } @@ -56,9 +57,9 @@ type state[T any] struct { mu sync.Mutex high circ.B[T] low circ.B[T] - canSendHigh *sync.Cond - canSendLow *sync.Cond - canRecv *sync.Cond + canSendHigh sync.Cond + canSendLow sync.Cond + canRecv sync.Cond closed bool } @@ -110,51 +111,55 @@ func (s *state[T]) Send(value T) { // SendHigh adds an item with high priority, blocking if full. func (s *state[T]) SendHigh(value T) { - s.send(value, &s.high, s.canSendHigh) + s.send(value, &s.high, &s.canSendHigh) } // SendLow adds an item with low buffer, blocking if full. func (s *state[T]) SendLow(value T) { - s.send(value, &s.low, s.canSendLow) + s.send(value, &s.low, &s.canSendLow) } // TryRecv attempts to get an item from the queue, without blocking. // -// If the attempt succeeds, the returned bool is true. Otherwise, it is false. -func (s *state[T]) TryRecv() (value T, ok bool) { +// The error indicates whether the attempt succeeded, the queue is empty, or +// the queue is closed. +func (s *state[T]) TryRecv() (value T, err error) { s.mu.Lock() defer s.mu.Unlock() if s.high.CanPop() { value = s.high.PopFront() - ok = true s.canSendHigh.Broadcast() return } if s.low.CanPop() { value = s.low.PopFront() - ok = true s.canSendLow.Broadcast() return } + if s.closed { + err = priorityq.ErrClosed + } else { + err = priorityq.ErrEmpty + } return } // TrySend is an alias for TrySendLow. -func (s *state[T]) TrySend(value T) bool { +func (s *state[T]) TrySend(value T) error { return s.trySend(value, &s.low) } // TrySendHigh attempts to add an item with high priority, without blocking. // -// If the attempt succeeds, the returned bool is true. Otherwise, it is false. -func (s *state[T]) TrySendHigh(value T) bool { +// Returns an error from the root priorityq package, or nil if successful. +func (s *state[T]) TrySendHigh(value T) error { return s.trySend(value, &s.high) } // TrySendLow attempts to add an item with low priority, without blocking. // -// If the attempt succeeds, the returned bool is true. Otherwise, it is false. -func (s *state[T]) TrySendLow(value T) bool { +// Returns an error from the root priorityq package, or nil if successful. +func (s *state[T]) TrySendLow(value T) error { return s.trySend(value, &s.low) } @@ -176,13 +181,16 @@ func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) { } } -func (s *state[T]) trySend(value T, buf *circ.B[T]) bool { +func (s *state[T]) trySend(value T, buf *circ.B[T]) error { s.mu.Lock() defer s.mu.Unlock() + if s.closed { + return priorityq.ErrClosed + } if !buf.CanPush() { - return false + return priorityq.ErrFull } buf.PushBack(value) s.canRecv.Broadcast() - return true + return nil } diff --git a/mq/lib_test.go b/mq/lib_test.go index 80d4ce5..5b3eb0a 100644 --- a/mq/lib_test.go +++ b/mq/lib_test.go @@ -6,6 +6,7 @@ import ( "sync" "testing" + "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/mq" ) @@ -77,15 +78,15 @@ func TestDoubleClose(t *testing.T) { func TestTrySendRecv(t *testing.T) { t.Parallel() q := mq.Make[int](4) - assumeSendOk := func(n int, f func(int) bool) { - ok := f(n) - if !ok { + assumeSendOk := func(n int, f func(int) error) { + err := f(n) + if err != nil { t.Errorf("expected to be able to send") } } assumeRecvOk := func(expected int) { - actual, ok := q.TryRecv() - if !ok { + actual, err := q.TryRecv() + if err != nil { t.Errorf("expected to be able to receive") } if actual != expected { @@ -94,10 +95,10 @@ func TestTrySendRecv(t *testing.T) { } assumeSendOk(1, q.TrySendLow) assumeSendOk(2, q.TrySendLow) - assumeSendOk(3, q.TrySendLow) + assumeSendOk(3, q.TrySend) assumeSendOk(4, q.TrySendLow) - ok := q.TrySendLow(5) - if ok { + err := q.TrySendLow(5) + if err == nil { t.Errorf("expected low buffer to be full") } assumeRecvOk(1) @@ -109,8 +110,8 @@ func TestTrySendRecv(t *testing.T) { assumeSendOk(6, q.TrySendHigh) assumeSendOk(7, q.TrySendHigh) assumeSendOk(8, q.TrySendHigh) - ok = q.TrySendHigh(5) - if ok { + err = q.TrySendHigh(5) + if err == nil { t.Errorf("expected high buffer to be full") } assumeRecvOk(5) @@ -118,10 +119,19 @@ func TestTrySendRecv(t *testing.T) { assumeRecvOk(7) assumeRecvOk(8) - _, ok = q.TryRecv() - if ok { + _, err = q.TryRecv() + if err != priorityq.ErrEmpty { t.Errorf("expected queue to be empty") } + q.Close() + _, err = q.TryRecv() + if err != priorityq.ErrClosed { + t.Errorf("expected queue to be closed ") + } + err = q.TrySend(5) + if err != priorityq.ErrClosed { + t.Errorf("expected queue to be closed ") + } } func TestConcProducerConsumer(t *testing.T) { diff --git a/pq/lib.go b/pq/lib.go index 50922fe..d69dcf5 100644 --- a/pq/lib.go +++ b/pq/lib.go @@ -12,7 +12,7 @@ // _, word1, _ := pq.Recv() // _, word2, _ := pq.Recv() // fmt.Println(word1, word2) -// pq.Close() +// q.Close() // // Output: hello world // // # Implementation @@ -26,6 +26,7 @@ package pq import ( "sync" + "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/binheap" "golang.org/x/exp/constraints" ) @@ -41,16 +42,16 @@ func Make[P constraints.Ordered, T any](cap int) Q[P, T] { s := &state[P, T]{ heap: heap, } - s.canRecv = sync.NewCond(&s.mu) - s.canSend = sync.NewCond(&s.mu) + s.canRecv = sync.Cond{L: &s.mu} + s.canSend = sync.Cond{L: &s.mu} return Q[P, T]{s} } type state[P constraints.Ordered, T any] struct { mu sync.Mutex heap binheap.H[P, T] - canSend *sync.Cond - canRecv *sync.Cond + canSend sync.Cond + canRecv sync.Cond closed bool } @@ -116,16 +117,21 @@ func (s *state[P, T]) Send(priority P, value T) { // // This returns both the item itself and the its assigned priority. // -// If the attempt succeeds, the returned bool is true. Otherwise, it is false. -func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) { +// The error indicates whether the attempt succeeded, the queue is empty, or +// the queue is closed. +func (s *state[P, T]) TryRecv() (priority P, value T, err error) { s.mu.Lock() defer s.mu.Unlock() if s.heap.CanExtract() { priority, value = s.heap.Extract() - ok = true s.canSend.Broadcast() return } + if s.closed { + err = priorityq.ErrClosed + } else { + err = priorityq.ErrEmpty + } return } @@ -133,13 +139,16 @@ func (s *state[P, T]) TryRecv() (priority P, value T, ok bool) { // // This method does not block. If there is space in the buffer, it returns // true. If the buffer is full, it returns false. -func (s *state[P, T]) TrySend(priority P, value T) bool { +func (s *state[P, T]) TrySend(priority P, value T) error { s.mu.Lock() defer s.mu.Unlock() + if s.closed { + return priorityq.ErrClosed + } if !s.heap.CanInsert() { - return false + return priorityq.ErrFull } s.heap.Insert(priority, value) s.canRecv.Broadcast() - return true + return nil } diff --git a/pq/lib_test.go b/pq/lib_test.go index e91323d..6dc1dc9 100644 --- a/pq/lib_test.go +++ b/pq/lib_test.go @@ -6,6 +6,7 @@ import ( "sync" "testing" + "gogs.humancabbage.net/sam/priorityq" "gogs.humancabbage.net/sam/priorityq/pq" ) @@ -78,14 +79,14 @@ func TestTrySendRecv(t *testing.T) { t.Parallel() q := pq.Make[int, int](4) assumeSendOk := func(n int) { - ok := q.TrySend(n, n) - if !ok { + err := q.TrySend(n, n) + if err != nil { t.Errorf("expected to be able to send") } } assumeRecvOk := func(expected int) { - _, actual, ok := q.TryRecv() - if !ok { + _, actual, err := q.TryRecv() + if err != nil { t.Errorf("expected to be able to receive") } if actual != expected { @@ -96,8 +97,8 @@ func TestTrySendRecv(t *testing.T) { assumeSendOk(2) assumeSendOk(3) assumeSendOk(4) - ok := q.TrySend(5, 5) - if ok { + err := q.TrySend(5, 5) + if err == nil { t.Errorf("expected queue to be full") } assumeRecvOk(4) @@ -105,10 +106,19 @@ func TestTrySendRecv(t *testing.T) { assumeRecvOk(2) assumeRecvOk(1) - _, _, ok = q.TryRecv() - if ok { + _, _, err = q.TryRecv() + if err != priorityq.ErrEmpty { t.Errorf("expected queue to be empty") } + q.Close() + _, _, err = q.TryRecv() + if err != priorityq.ErrClosed { + t.Errorf("expected queue to be closed ") + } + err = q.TrySend(1, 1) + if err != priorityq.ErrClosed { + t.Errorf("expected queue to be closed ") + } } func TestConcProducerConsumer(t *testing.T) {