From 4052afa31dc00c316e115d1b2b0585a1f2f418ef Mon Sep 17 00:00:00 2001 From: Sam Fredrickson Date: Tue, 28 Feb 2023 20:33:22 -0800 Subject: [PATCH] Initial commit. --- LICENSE | 19 ++++ README.md | 36 +++++++ circ/lib.go | 59 ++++++++++++ circ/lib_test.go | 67 +++++++++++++ go.mod | 3 + go.sum | 0 precise/lib.go | 164 +++++++++++++++++++++++++++++++ precise/lib_test.go | 228 ++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 576 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 circ/lib.go create mode 100644 circ/lib_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 precise/lib.go create mode 100644 precise/lib_test.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f31272a --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright 2023 Samuel Fredrickson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..f91a9e9 --- /dev/null +++ b/README.md @@ -0,0 +1,36 @@ +# priorityq - generic priority queue in Go + +This module was inspired by [a reddit post][reddit] wherein /u/zandery23 asked +how to implement a priority queue in Go. A fantastic solution was [provided by +/u/Ploobers][sol]. That's probably right for 99 out of 100 use cases, but it's +not completely precise. + +Particularly, the second select block does not guarantee that an item from the +priority queue will be taken if there is also an item in the regular queue. + +```go +select { +case job := <-mq.priorityQueue: + // ... +case job := <-mq.regularQueue: + // ... +// ... +} +``` + +From the [Go Language Specification][go_select]: + +> If one or more of the communications can proceed, a single one that can +> proceed is chosen via a uniform pseudo-random selection. + +Thus, it is possible for the second case to be chosen even if the first case is +also ready. + +The `precise` package in this module implements a concurrent priority queue that +guarantees receipt of a high-priority items before low-priority ones. This is +primarily a fun exercise, I cannot recommend that anyone actually use this in a +real project. + +[reddit]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/ +[sol]: https://www.reddit.com/r/golang/comments/11drc17/worker_pool_reading_from_two_channels_one_chan/jabfvkh/ +[go_select]: https://go.dev/ref/spec#Select_statements diff --git a/circ/lib.go b/circ/lib.go new file mode 100644 index 0000000..b14ccaf --- /dev/null +++ b/circ/lib.go @@ -0,0 +1,59 @@ +package circ + +// B is a generic, non-concurrent circular FIFO buffer. +type B[T any] struct { + buf []T + len int + head int + tail int +} + +// Make creates a new circular buffer. +func Make[T any](cap int) B[T] { + buf := make([]T, cap) + return B[T]{buf: buf} +} + +// CanPush returns true if the buffer has space for new items. +func (b *B[T]) CanPush() bool { + return cap(b.buf)-b.len != 0 +} + +// CanPop returns true if the buffer has one or more items. +func (b *B[T]) CanPop() bool { + return b.len != 0 +} + +// PopFront returns the front-most item from the buffer. +// +// If the buffer is empty, it panics. +func (b *B[T]) PopFront() T { + if !b.CanPop() { + panic("cannot pop from empty buffer") + } + var empty T + item := b.buf[b.head] + // clear buffer slot so that we don't hold on to garbage + b.buf[b.head] = empty + b.len-- + b.head++ + if b.head == cap(b.buf) { + b.head = 0 + } + return item +} + +// PushBack adds an item to the end of the buffer. +// +// If the buffer is full, it panics. +func (b *B[T]) PushBack(value T) { + if !b.CanPush() { + panic("cannot push back to full buffer") + } + b.buf[b.tail] = value + b.len++ + b.tail++ + if b.tail == cap(b.buf) { + b.tail = 0 + } +} diff --git a/circ/lib_test.go b/circ/lib_test.go new file mode 100644 index 0000000..624dd20 --- /dev/null +++ b/circ/lib_test.go @@ -0,0 +1,67 @@ +package circ_test + +import ( + "testing" + + "gogs.humancabbage.net/sam/priorityq/circ" +) + +func TestRepeatPushPop(t *testing.T) { + t.Parallel() + cb := circ.Make[int](4) + for i := 0; i < 50; i++ { + cb.PushBack(1) + cb.PushBack(2) + cb.PushBack(3) + cb.PushBack(4) + checkPop := func(n int) { + if v := cb.PopFront(); v != n { + t.Errorf("popped %d, expected %d", v, n) + } + } + checkPop(1) + checkPop(2) + checkPop(3) + checkPop(4) + } +} + +func TestInterleavedPushPop(t *testing.T) { + t.Parallel() + cb := circ.Make[int](4) + checkPop := func(n int) { + if v := cb.PopFront(); v != n { + t.Errorf("popped %d, expected %d", v, n) + } + } + cb.PushBack(1) + cb.PushBack(2) + cb.PushBack(3) + checkPop(1) + cb.PushBack(4) + cb.PushBack(5) + checkPop(2) +} + +func TestEmptyPopPanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("popping empty buffer did not panic") + } + }() + t.Parallel() + cb := circ.Make[int](4) + cb.PopFront() +} + +func TestFullPushPanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("pushing full buffer did not panic") + } + }() + t.Parallel() + cb := circ.Make[int](1) + cb.PushBack(1) + cb.PushBack(2) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3b1a761 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module gogs.humancabbage.net/sam/priorityq + +go 1.20 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/precise/lib.go b/precise/lib.go new file mode 100644 index 0000000..171f1fb --- /dev/null +++ b/precise/lib.go @@ -0,0 +1,164 @@ +package precise + +import ( + "sync" + + "gogs.humancabbage.net/sam/priorityq/circ" +) + +// Q is a precise, concurrent priority queue. +// +// Each queue has two internal buffers, high and low. This implementation +// guarantees that when there are items in both buffers, consumers receive +// ones from the high priority buffer first. +// +// Each buffer has the same capacity, set on initial construction. Sending to +// a buffer will block if it is full, even if the other buffer has space. +type Q[T any] struct { + *state[T] +} + +// Make a new priority queue. +func Make[T any](cap int) Q[T] { + high := circ.Make[T](cap) + low := circ.Make[T](cap) + s := &state[T]{ + high: high, + low: low, + } + s.canRecv = sync.NewCond(&s.mu) + s.canSendHigh = sync.NewCond(&s.mu) + s.canSendLow = sync.NewCond(&s.mu) + return Q[T]{s} +} + +type state[T any] struct { + mu sync.Mutex + high circ.B[T] + low circ.B[T] + canSendHigh *sync.Cond + canSendLow *sync.Cond + canRecv *sync.Cond + closed bool +} + +// Close marks the queue as closed. +// +// Subsequent attempts to send will panic. Subsequent calls to Recv will +// continue to return the remaining items in the queue. +func (s *state[T]) Close() { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + s.canRecv.Broadcast() +} + +// Recv returns an item from the prioritized buffers, blocking if empty. +// +// The returned bool will be true if the queue still has items or is open. +// It will be false if the queue is empty and closed. +func (s *state[T]) Recv() (T, bool) { + s.mu.Lock() + defer s.mu.Unlock() + for { + for !s.closed && !s.high.CanPop() && !s.low.CanPop() { + s.canRecv.Wait() + } + if s.closed && !s.high.CanPop() && !s.low.CanPop() { + var empty T + return empty, false + } + if s.high.CanPop() { + value := s.high.PopFront() + s.canSendHigh.Broadcast() + return value, true + } + if s.low.CanPop() { + value := s.low.PopFront() + s.canSendLow.Broadcast() + return value, true + } + } +} + +// Send is an alias for SendLow. +func (s *state[T]) Send(value T) { + s.SendLow(value) +} + +// SendHigh adds an item to the high priority buffer, blocking if full. +func (s *state[T]) SendHigh(value T) { + s.send(value, &s.high, s.canSendHigh) +} + +// SendLow adds an item to the low priority buffer, blocking if full. +func (s *state[T]) SendLow(value T) { + s.send(value, &s.low, s.canSendLow) +} + +// TryRecv attempts to return an item from the prioritized buffers. +// +// This method does not block. If there is an item in a buffer, it returns +// true. If the buffer is empty, it returns false. +func (s *state[T]) TryRecv() (value T, ok bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.high.CanPop() { + value = s.high.PopFront() + ok = true + s.canSendHigh.Broadcast() + return + } + if s.low.CanPop() { + value = s.low.PopFront() + ok = true + s.canSendLow.Broadcast() + return + } + return +} + +// TrySendHigh attempts to add an item to the high priority buffer. +// +// This method does not block. If there is space in the buffer, it returns +// true. If the buffer is full, it returns false. +func (s *state[T]) TrySendHigh(value T) bool { + return s.trySend(value, &s.high) +} + +// TrySendLow attempts to add an item to the low priority buffer. +// +// This method does not block. If there is space in the buffer, it returns +// true. If the buffer is full, it returns false. +func (s *state[T]) TrySendLow(value T) bool { + return s.trySend(value, &s.low) +} + +func (s *state[T]) send(value T, buf *circ.B[T], cond *sync.Cond) { + s.mu.Lock() + defer s.mu.Unlock() + for { + if s.closed { + panic("send on closed queue") + } + for !buf.CanPush() { + cond.Wait() + } + if buf.CanPush() { + buf.PushBack(value) + s.canRecv.Broadcast() + return + } + } +} + +func (s *state[T]) trySend(value T, buf *circ.B[T]) bool { + s.mu.Lock() + defer s.mu.Unlock() + if !buf.CanPush() { + return false + } + buf.PushBack(value) + s.canRecv.Broadcast() + return true +} diff --git a/precise/lib_test.go b/precise/lib_test.go new file mode 100644 index 0000000..53c449b --- /dev/null +++ b/precise/lib_test.go @@ -0,0 +1,228 @@ +package precise_test + +import ( + "math/rand" + "sync" + "testing" + + "gogs.humancabbage.net/sam/priorityq/precise" +) + +func TestRecvHighFirst(t *testing.T) { + t.Parallel() + q := precise.Make[int](4) + q.Send(1) + q.Send(2) + q.Send(3) + q.Send(4) + q.SendHigh(5) + q.SendHigh(6) + q.SendHigh(7) + q.SendHigh(8) + checkRecv := func(n int) { + if v, _ := q.Recv(); v != n { + t.Errorf("popped %d, expected %d", v, n) + } + } + checkRecv(5) + checkRecv(6) + checkRecv(7) + checkRecv(8) + checkRecv(1) + checkRecv(2) + checkRecv(3) + checkRecv(4) +} + +func TestSendClosedPanic(t *testing.T) { + t.Parallel() + defer func() { + if r := recover(); r == nil { + t.Errorf("sending to closed queue did not panic") + } + }() + q := precise.Make[int](4) + q.Close() + q.Send(1) +} + +func TestRecvClosed(t *testing.T) { + t.Parallel() + q := precise.Make[int](4) + q.Send(1) + q.Close() + _, ok := q.Recv() + if !ok { + t.Errorf("queue should have item to receive") + } + _, ok = q.Recv() + if ok { + t.Errorf("queue should be closed") + } +} + +func TestTrySendRecv(t *testing.T) { + t.Parallel() + q := precise.Make[int](4) + assumeSendOk := func(n int, f func(int) bool) { + ok := f(n) + if !ok { + t.Errorf("expected to be able to send") + } + } + assumeRecvOk := func(expected int) { + actual, ok := q.TryRecv() + if !ok { + t.Errorf("expected to be able to receive") + } + if actual != expected { + t.Errorf("expected %d, got %d", expected, actual) + } + } + assumeSendOk(1, q.TrySendLow) + assumeSendOk(2, q.TrySendLow) + assumeSendOk(3, q.TrySendLow) + assumeSendOk(4, q.TrySendLow) + ok := q.TrySendLow(5) + if ok { + t.Errorf("expected low buffer to be full") + } + assumeRecvOk(1) + assumeRecvOk(2) + assumeRecvOk(3) + assumeRecvOk(4) + + assumeSendOk(5, q.TrySendHigh) + assumeSendOk(6, q.TrySendHigh) + assumeSendOk(7, q.TrySendHigh) + assumeSendOk(8, q.TrySendHigh) + ok = q.TrySendHigh(5) + if ok { + t.Errorf("expected high buffer to be full") + } + assumeRecvOk(5) + assumeRecvOk(6) + assumeRecvOk(7) + assumeRecvOk(8) + + _, ok = q.TryRecv() + if ok { + t.Errorf("expected queue to be empty") + } +} + +func TestConcProducerConsumer(t *testing.T) { + t.Parallel() + q := precise.Make[int](4) + var wg sync.WaitGroup + produceDone := make(chan struct{}) + wg.Add(2) + go func() { + for i := 0; i < 10000; i++ { + if rand.Intn(2) == 0 { + q.Send(i) + } else { + q.SendHigh(i) + } + } + close(produceDone) + wg.Done() + }() + go func() { + ok := true + for ok { + _, ok = q.Recv() + } + wg.Done() + }() + <-produceDone + t.Logf("producer done, closing channel") + q.Close() + wg.Wait() +} + +func BenchmarkSend(b *testing.B) { + q := precise.Make[int](b.N) + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Send(i) + } +} + +func BenchmarkSendChan(b *testing.B) { + c := make(chan int, b.N) + b.ResetTimer() + for i := 0; i < b.N; i++ { + c <- i + } +} + +func BenchmarkRecv(b *testing.B) { + q := precise.Make[int](b.N) + for i := 0; i < b.N; i++ { + q.Send(i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Recv() + } +} + +func BenchmarkRecvChan(b *testing.B) { + c := make(chan int, b.N) + for i := 0; i < b.N; i++ { + c <- i + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + <-c + } +} + +func BenchmarkConcSendRecv(b *testing.B) { + q := precise.Make[int](b.N) + var wg sync.WaitGroup + wg.Add(2) + start := make(chan struct{}) + go func() { + <-start + for i := 0; i < b.N; i++ { + q.Send(i) + } + wg.Done() + }() + go func() { + <-start + for i := 0; i < b.N; i++ { + q.Recv() + } + wg.Done() + }() + b.ResetTimer() + close(start) + wg.Wait() +} + +func BenchmarkConcSendRecvChan(b *testing.B) { + c := make(chan int, b.N) + var wg sync.WaitGroup + wg.Add(2) + start := make(chan struct{}) + go func() { + <-start + for i := 0; i < b.N; i++ { + c <- i + } + wg.Done() + }() + go func() { + <-start + for i := 0; i < b.N; i++ { + <-c + } + wg.Done() + }() + b.ResetTimer() + close(start) + wg.Wait() +}