priorityq/npq/lib.go

215 lines
5.2 KiB
Go

// Package npq implements a concurrent, n-priority message queue.
//
// [Q] is similar to a buffered channel, except that senders can assign one of
// a fixed number of priority levels to each item. Receivers will always get
// the item with the highest priority.
//
// For example:
//
// q := npq.Make[int, string](8, 1, 2)
// q.Send(1, "world")
// q.Send(2, "hello")
// word1, _ := q.Recv()
// word2, _ := q.Recv()
// fmt.Println(word1, word2)
// q.Close()
// // Output: hello world
//
// # Implementation
//
// Each [Q] has an array of circular buffers, one for each priority level.
// Currently, the capacities for these are fixed and equal. If one buffer is
// full, attempts to send further items with its priority level will block
// ([Q.Send]) or fail ([Q.TrySend]).
//
// Compared to [mq.Q], which has only two priority levels, [Q] performs
// similarly in the Send and Recv benchmarks. In the ConcSendRecv benchmark,
// though, this package is around 75% slower. Even more peculiar, however, in
// HighContention, it is 7% faster.
//
// Compared to the pq package, this package is faster in the HighContention
// benchmark until around 145-150 priority levels; in ConcSendRecv, until
// around 200 levels; and in Recv, until around 750 levels.
//
// It's important to remember the memory requirement differences. A [pq.Q]
// with capacity N can store N items; a [Q] with capacity N and P priority
// levels can store N*P items.
package npq
import (
"fmt"
"sync"
"gogs.humancabbage.net/sam/priorityq"
"gogs.humancabbage.net/sam/priorityq/mq"
"gogs.humancabbage.net/sam/priorityq/pq"
"gogs.humancabbage.net/sam/priorityq/queue"
"golang.org/x/exp/constraints"
)
// Blank references to pq and mq keep both packages imported, so that the
// [pq.Q] and [mq.Q] links in the package comment (kinda) work in godoc.
var _ *pq.Q[int, int]
var _ *mq.Q[int]
// Q is a concurrent, n-priority message queue.
//
// P is the integer type used for priorities and T is the type of the items.
// A Q embeds a pointer to its state, so copies of a Q refer to the same
// underlying queue.
type Q[P constraints.Integer, T any] struct {
*state[P, T]
}
// Make a new queue.
//
// cap is the capacity of each priority level's buffer; min and max are the
// lowest and highest valid priorities, inclusive. The queue therefore has
// max-min+1 levels.
//
// Make panics if max is less than min, or if the range is so wide that a
// level index cannot be computed in P or the level count does not fit in an
// int.
func Make[P constraints.Integer, T any](cap int, min P, max P) Q[P, T] {
	if max < min {
		panic(fmt.Errorf("invalid priority range (%d, %d)", min, max))
	}
	// Count the levels as an int rather than as a P: max-min+1 evaluated in P
	// wraps for narrow types, e.g. a uint8 range of (0, 255) would yield zero
	// levels.
	span := max - min
	numLevels := int(span) + 1
	// Reject ranges that cannot be indexed safely:
	//   - span < 0: the subtraction overflowed a signed P, so the level index
	//     (priority - min) computed by senders would overflow as well;
	//   - numLevels <= 0: the count overflowed int;
	//   - P(numLevels-1) != span: converting span to int truncated it.
	if span < 0 || numLevels <= 0 || P(numLevels-1) != span {
		panic(fmt.Errorf("priority range (%d, %d) is too wide", min, max))
	}

	levels := make([]queue.Q[T], numLevels)
	for i := range levels {
		levels[i] = queue.Make[T](cap)
	}
	s := &state[P, T]{
		levels:  levels,
		canSend: make([]sync.Cond, numLevels),
		min:     min,
		max:     max,
	}
	// Every condition variable shares the state's single mutex.
	s.canRecv.L = &s.mu
	for i := range s.canSend {
		s.canSend[i].L = &s.mu
	}
	return Q[P, T]{s}
}
// state holds the data behind a [Q]; a Q embeds a pointer to it.
type state[P constraints.Integer, T any] struct {
// mu is held while the levels and the closed flag are read or modified,
// and is the Locker of every condition variable below.
mu sync.Mutex
// levels holds one buffer per priority level, indexed by priority - min.
levels []queue.Q[T]
// canSend has one condition per level; it is broadcast when an item is
// popped from that level.
canSend []sync.Cond
// canRecv is broadcast when an item is pushed and when the queue is closed.
canRecv sync.Cond
// min and max are the lowest and highest valid priorities, inclusive.
min P
max P
// closed is set by Close.
closed bool
}
// Close marks the queue as closed.
//
// Receivers blocked in Recv are woken so that they can return once the queue
// is empty. Senders blocked in Send are woken as well; they observe the
// closed flag and panic with "send on closed queue" instead of staying
// blocked until some receiver happens to drain their level.
//
// Attempting to close an already-closed queue results in a panic.
func (s *state[P, T]) Close() {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		panic("close of closed queue")
	}
	s.closed = true
	s.mu.Unlock()

	s.canRecv.Broadcast()
	// Index rather than range by value: a sync.Cond must not be copied.
	for i := range s.canSend {
		s.canSend[i].Broadcast()
	}
}
// Recv gets an item, blocking when empty until one is available.
//
// Levels are scanned from the highest index down, so the item returned is
// always one with the highest priority currently queued.
//
// The returned bool will be true if an item was received. It will be false,
// alongside the zero value of T, if the queue is empty and closed.
func (s *state[P, T]) Recv() (T, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for {
		for i := len(s.levels) - 1; i >= 0; i-- {
			lvl := &s.levels[i]
			if !lvl.CanPop() {
				continue
			}
			item := lvl.PopFront()
			// A slot was freed on this level; wake any senders waiting on it.
			s.canSend[i].Broadcast()
			return item, true
		}
		// Nothing is queued on any level.
		if s.closed {
			var zero T
			return zero, false
		}
		s.canRecv.Wait()
	}
}
// Send adds an item with some priority, blocking if full.
//
// It panics if priority is outside the queue's range, or if the queue is
// closed when the send is attempted or when a blocked sender wakes up.
func (s *state[P, T]) Send(priority P, value T) {
	s.validatePriority(priority)
	s.mu.Lock()
	defer s.mu.Unlock()
	idx := priority - s.min
	target := &s.levels[idx]
	for {
		if s.closed {
			panic("send on closed queue")
		}
		if target.CanPush() {
			target.PushBack(value)
			// Wake receivers waiting for an item.
			s.canRecv.Broadcast()
			return
		}
		// The level is full; wait until an item is popped from it.
		s.canSend[idx].Wait()
	}
}
// TryRecv attempts to get an item from the queue, without blocking.
//
// On success the error is nil. Otherwise the value is the zero value of T and
// the error is priorityq.ErrClosed if the queue is closed, or
// priorityq.ErrEmpty if it is merely empty.
func (s *state[P, T]) TryRecv() (value T, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Scan from the highest level down so higher priorities win.
	for i := len(s.levels) - 1; i >= 0; i-- {
		lvl := &s.levels[i]
		if !lvl.CanPop() {
			continue
		}
		item := lvl.PopFront()
		s.canSend[i].Broadcast()
		return item, nil
	}
	if s.closed {
		return value, priorityq.ErrClosed
	}
	return value, priorityq.ErrEmpty
}
// TrySend attempts to add an item with some priority, without blocking.
//
// It returns priorityq.ErrClosed if the queue is closed, priorityq.ErrFull if
// the level for this priority has no room, or nil if the item was added.
// It panics if priority is outside the queue's range.
func (s *state[P, T]) TrySend(priority P, value T) error {
	s.validatePriority(priority)
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return priorityq.ErrClosed
	}
	target := &s.levels[priority-s.min]
	if target.CanPush() {
		target.PushBack(value)
		// Wake receivers waiting for an item.
		s.canRecv.Broadcast()
		return nil
	}
	return priorityq.ErrFull
}
// validatePriority panics with a descriptive error unless priority lies
// within the inclusive range from s.min to s.max.
func (s *state[P, T]) validatePriority(priority P) {
	if s.min <= priority && priority <= s.max {
		return
	}
	panic(fmt.Errorf("priority %d out of range (%d, %d)",
		priority, s.min, s.max))
}