diff --git a/README.md b/README.md
index bd40a07..065099a 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,10 @@
 queue, using a binary max-heap. This is more general than `mq`, because it
 allows multiple levels of priority. This, of course, also makes operations
 slower.
+Lastly, the `npq` package implements a concurrent, n-priority message queue.
+It's similar to `mq`, except that it supports an arbitrary, fixed number of
+priority levels. It can outperform `pq` at up to several hundred levels.
+
 ## Benchmarks
 
 Here are some benchmark results from running on a Mac Studio/M1 Ultra.
@@ -30,6 +34,12 @@
     HighContention-20       128.2n ± 0%
     HighContentionChan-20   47.27n ± 0%
 
+    pkg: gogs.humancabbage.net/sam/priorityq/npq
+    Send-20                 13.56n ± 0%
+    Recv-20                 13.51n ± 0%
+    ConcSendRecv-20         176.3n ± 8%
+    HighContention-20       121.0n ± 0%
+
     pkg: gogs.humancabbage.net/sam/priorityq/pq
     Send-20                 18.79n ± 1%
     Recv-20                 268.1n ± 3%
diff --git a/binheap/lib.go b/binheap/lib.go
index 9e8380a..d5c903a 100644
--- a/binheap/lib.go
+++ b/binheap/lib.go
@@ -1,4 +1,4 @@
-// Package binheap implements a binary max-heap.
+// Package binheap implements a non-concurrent binary max-heap.
 //
 // # Implementation
 //
diff --git a/mq/lib.go b/mq/lib.go
index 2fbbca6..4686a21 100644
--- a/mq/lib.go
+++ b/mq/lib.go
@@ -7,8 +7,8 @@
 // For example:
 //
 //	q := mq.Make[string](8)
-//	mq.SendLow("world")
-//	mq.SendHigh("hello")
-//	word1, _ := mq.Recv()
-//	word2, _ := mq.Recv()
+//	q.SendLow("world")
+//	q.SendHigh("hello")
+//	word1, _ := q.Recv()
+//	word2, _ := q.Recv()
 //	fmt.Println(word1, word2)
@@ -17,23 +17,27 @@
 //
 // # Implementation
 //
-// Each queue has two circular buffers, one for each priority level.
-// Currently, the capacities for these are fixed and equal. If one buffer is
-// full, attempts to send further items with its priority level will block
+// Each [Q] has two circular buffers, one for each priority level. Currently,
+// the capacities for these are fixed and equal. If one buffer is full,
+// attempts to send further items with its priority level will block
 // ([Q.Send]) or fail ([Q.TrySend]).
 //
-// Compared the pq package, the limitation on priority levels increases
-// performance, as its circular buffers are much less expensive than the heap
-// operations of a traditional priority queue.
+// Compared to [pq.Q], the limitation on priority levels increases
+// performance, as its circular buffers are much less expensive than the heap
+// operations of a traditional priority queue.
 package mq
 
 import (
 	"sync"
 
 	"gogs.humancabbage.net/sam/priorityq"
+	"gogs.humancabbage.net/sam/priorityq/pq"
 	"gogs.humancabbage.net/sam/priorityq/queue"
 )
 
+// so that godoc (kinda) works
+var _ *pq.Q[int, int]
+
 // Q is a concurrent, dual-priority message queue.
 type Q[T any] struct {
 	*state[T]
diff --git a/npq/lib.go b/npq/lib.go
new file mode 100644
index 0000000..6496397
--- /dev/null
+++ b/npq/lib.go
@@ -0,0 +1,213 @@
+// Package npq implements a concurrent, n-priority message queue.
+//
+// [Q] is similar to a buffered channel, except that senders can assign one
+// of a fixed number of priority levels to each item. Receivers will always
+// get the item with the highest priority.
+//
+// For example:
+//
+//	q := npq.Make[int, string](8, 1, 2)
+//	q.Send(1, "world")
+//	q.Send(2, "hello")
+//	word1, _ := q.Recv()
+//	word2, _ := q.Recv()
+//	fmt.Println(word1, word2)
+//	q.Close()
+//	// Output: hello world
+//
+// # Implementation
+//
+// Each [Q] has an array of circular buffers, one for each priority level.
+// Currently, the capacities for these are fixed and equal. If one buffer is
+// full, attempts to send further items with its priority level will block
+// ([Q.Send]) or fail ([Q.TrySend]).
+//
+// Compared to [mq.Q], which has only two priority levels, [Q] performs
+// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
+// though, this package is around 75% slower. Curiously, in HighContention,
+// it is 7% faster.
+//
+// Compared to the pq package, this package is faster in the HighContention
+// benchmark until around 145-150 priority levels; in ConcSendRecv, until
+// around 200 levels; and in Recv, until around 750 levels.
+//
+// It's important to remember the difference in memory requirements. A [pq.Q]
+// with capacity N can store N items; a [Q] with capacity N and P priority
+// levels can store N*P items.
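+//
+// For example, a [Q] made with Make[int, int](1024, 1, 8) has eight priority
+// levels and can buffer up to 8*1024 = 8192 items in total.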
+package npq
+
+import (
+	"fmt"
+	"sync"
+
+	"gogs.humancabbage.net/sam/priorityq"
+	"gogs.humancabbage.net/sam/priorityq/mq"
+	"gogs.humancabbage.net/sam/priorityq/pq"
+	"gogs.humancabbage.net/sam/priorityq/queue"
+	"golang.org/x/exp/constraints"
+)
+
+// so that godoc (kinda) works
+var _ *pq.Q[int, int]
+var _ *mq.Q[int]
+
+// Q is a concurrent, n-priority message queue.
+type Q[P constraints.Integer, T any] struct {
+	*state[P, T]
+}
+
+// Make a new queue with the given per-level capacity and the inclusive
+// priority range [min, max].
+func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
+	numLevels := max - min + 1
+	levels := make([]queue.Q[T], numLevels)
+	for i := 0; i < int(numLevels); i++ {
+		levels[i] = queue.Make[T](cap)
+	}
+	s := &state[P, T]{
+		levels: levels,
+		min:    min,
+		max:    max,
+	}
+	canSend := make([]sync.Cond, numLevels)
+	for i := 0; i < len(canSend); i++ {
+		canSend[i].L = &s.mu
+	}
+	s.canRecv.L = &s.mu
+	s.canSend = canSend
+	return Q[P, T]{s}
+}
+
+type state[P constraints.Integer, T any] struct {
+	mu      sync.Mutex
+	levels  []queue.Q[T]
+	canSend []sync.Cond
+	canRecv sync.Cond
+	min     P
+	max     P
+	closed  bool
+}
+
+// Close marks the queue as closed.
+//
+// Attempting to close an already-closed queue results in a panic.
+func (s *state[P, T]) Close() {
+	s.mu.Lock()
+	if s.closed {
+		panic("close of closed queue")
+	}
+	s.closed = true
+	s.mu.Unlock()
+	s.canRecv.Broadcast()
+	// Wake any senders blocked on full buffers, so that they observe the
+	// close and panic rather than block forever.
+	for i := range s.canSend {
+		s.canSend[i].Broadcast()
+	}
+}
+
+// Recv gets an item, blocking when empty until one is available.
+//
+// The returned bool will be true if an item was received. It will be false
+// if the queue is closed and empty.
+func (s *state[P, T]) Recv() (T, bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for {
+		var available int = -1
+		// scan the levels from highest priority to lowest
+		findAvailable := func() {
+			for levelIdx := len(s.levels) - 1; levelIdx >= 0; levelIdx-- {
+				level := &s.levels[levelIdx]
+				if level.CanPop() {
+					available = levelIdx
+					break
+				}
+			}
+		}
+		findAvailable()
+		for !s.closed && available == -1 {
+			s.canRecv.Wait()
+			findAvailable()
+		}
+		if s.closed && available == -1 {
+			var empty T
+			return empty, false
+		}
+		if available != -1 {
+			level := &s.levels[available]
+			value := level.PopFront()
+			s.canSend[available].Broadcast()
+			return value, true
+		}
+	}
+}
+
+// Send adds an item with some priority, blocking if full.
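+//
+// Send panics if the queue is closed, or if the priority is outside the
+// range given to [Make].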
+func (s *state[P, T]) Send(priority P, value T) {
+	s.validatePriority(priority)
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	levelIdx := priority - s.min
+	level := &s.levels[levelIdx]
+	for {
+		for !s.closed && !level.CanPush() {
+			s.canSend[levelIdx].Wait()
+		}
+		if s.closed {
+			panic("send on closed queue")
+		}
+		if level.CanPush() {
+			level.PushBack(value)
+			s.canRecv.Broadcast()
+			return
+		}
+	}
+}
+
+// TryRecv attempts to get an item from the queue, without blocking.
+//
+// The error indicates whether the attempt succeeded, the queue is empty, or
+// the queue is closed.
+func (s *state[P, T]) TryRecv() (value T, err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// scan the levels from highest priority to lowest
+	for levelIdx := len(s.levels) - 1; levelIdx >= 0; levelIdx-- {
+		level := &s.levels[levelIdx]
+		if level.CanPop() {
+			value = level.PopFront()
+			s.canSend[levelIdx].Broadcast()
+			return
+		}
+	}
+	if s.closed {
+		err = priorityq.ErrClosed
+	} else {
+		err = priorityq.ErrEmpty
+	}
+
+	return
+}
+
+// TrySend attempts to add an item with some priority, without blocking.
+//
+// Returns an error from the root priorityq package, or nil if successful.
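+//
+// Unlike [Q.Send], TrySend reports a closed queue by returning
+// [priorityq.ErrClosed] instead of panicking.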
+ t.Errorf("expected %d, got %d", expected, actual) + } + } + trySendLow := func(n int) error { + return q.TrySend(1, n) + } + trySendHigh := func(n int) error { + return q.TrySend(2, n) + } + assumeSendOk(1, trySendLow) + assumeSendOk(2, trySendLow) + assumeSendOk(3, trySendLow) + assumeSendOk(4, trySendLow) + err := trySendLow(5) + if err == nil { + t.Errorf("expected low buffer to be full") + } + assumeRecvOk(1) + assumeRecvOk(2) + assumeRecvOk(3) + assumeRecvOk(4) + + assumeSendOk(5, trySendHigh) + assumeSendOk(6, trySendHigh) + assumeSendOk(7, trySendHigh) + assumeSendOk(8, trySendHigh) + err = trySendHigh(5) + if err == nil { + t.Errorf("expected high buffer to be full") + } + assumeRecvOk(5) + assumeRecvOk(6) + assumeRecvOk(7) + assumeRecvOk(8) + + _, err = q.TryRecv() + if err != priorityq.ErrEmpty { + t.Errorf("expected queue to be empty") + } + q.Close() + _, err = q.TryRecv() + if err != priorityq.ErrClosed { + t.Errorf("expected queue to be closed ") + } + err = q.TrySend(1, 5) + if err != priorityq.ErrClosed { + t.Errorf("expected queue to be closed ") + } +} + +func TestConcProducerConsumer(t *testing.T) { + t.Parallel() + q := npq.Make[int, int](4, 1, 2) + var wg sync.WaitGroup + produceDone := make(chan struct{}) + wg.Add(2) + go func() { + for i := 0; i < 10000; i++ { + q.Send(rand.Intn(2)+1, i) + } + close(produceDone) + wg.Done() + }() + go func() { + ok := true + for ok { + _, ok = q.Recv() + } + wg.Done() + }() + <-produceDone + t.Logf("producer done, closing channel") + q.Close() + wg.Wait() +} + +const highPriority = 2 + +func BenchmarkSend(b *testing.B) { + q := npq.Make[int, int](b.N, 1, highPriority) + // randomize priorities to get amortized cost per op + ps := make([]int, b.N) + for i := 0; i < b.N; i++ { + ps[i] = rand.Intn(highPriority) + 1 + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Send(ps[i], i) + } +} + +func BenchmarkRecv(b *testing.B) { + q := npq.Make[int, int](b.N, 1, highPriority) + for i := 0; i < b.N; i++ { + q.Send(rand.Intn(highPriority)+1, i) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + q.Recv() + } +} + +func BenchmarkConcSendRecv(b *testing.B) { + q := npq.Make[int, int](b.N, 1, highPriority) + // randomize priorities to get amortized cost per op + ps := make([]int, b.N) + for i := 0; i < b.N; i++ { + ps[i] = rand.Intn(highPriority) + 1 + } + var wg sync.WaitGroup + wg.Add(2) + start := make(chan struct{}) + go func() { + <-start + for i := 0; i < b.N; i++ { + q.Send(ps[i], i) + } + wg.Done() + }() + go func() { + <-start + for i := 0; i < b.N; i++ { + q.Recv() + } + wg.Done() + }() + b.ResetTimer() + close(start) + wg.Wait() +} + +func BenchmarkHighContention(b *testing.B) { + q := npq.Make[int, int](b.N, 1, highPriority) + var wg sync.WaitGroup + start := make(chan struct{}) + done := make(chan struct{}) + numProducers := runtime.NumCPU() + sendsPerProducer := b.N / numProducers + wg.Add(numProducers) + for i := 0; i < numProducers; i++ { + go func() { + ps := make([]int, sendsPerProducer) + for i := range ps { + ps[i] = rand.Intn(highPriority) + 1 + } + <-start + for i := 0; i < sendsPerProducer; i++ { + q.Send(ps[i], 1) + } + wg.Done() + }() + } + go func() { + ok := true + for ok { + _, ok = q.Recv() + } + close(done) + }() + b.ResetTimer() + close(start) + wg.Wait() + q.Close() + <-done +}