Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UUID to queue store #264

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish
if _, err := p.Packet().WriteTo(&b); err != nil {
return err
}
return c.queue.Enqueue(&b)
_, err := c.queue.Enqueue(&b)
return err
}

// TerminateConnectionForTest closes the active connection (if any). This function is intended for testing only, it
Expand Down Expand Up @@ -561,7 +562,7 @@ connectionLoop:
time.Sleep(1 * time.Second)
continue
}
r, err := entry.Reader()
_, r, err := entry.Reader()
if err != nil {
c.errors.Printf("error retrieving reader for queue entry: %s", err)
if err := entry.Leave(); err != nil {
Expand Down
43 changes: 28 additions & 15 deletions autopaho/queue/file/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/eclipse/paho.golang/autopaho/queue"
"github.com/google/uuid"
)

// A queue implementation that stores all data on disk
Expand Down Expand Up @@ -131,18 +133,18 @@ func (q *Queue) WaitForEmpty() chan struct{} {
}

// Enqueue add item to the queue.
func (q *Queue) Enqueue(p io.Reader) error {
func (q *Queue) Enqueue(p io.Reader) (uuid.UUID, error) {
q.mu.Lock()
defer q.mu.Unlock()
err := q.put(p)
id, err := q.put(p)
if err == nil && q.queueEmpty {
q.queueEmpty = false
for _, c := range q.waiting {
close(c)
}
q.waiting = q.waiting[:0]
}
return err
return id, err
}

// Peek retrieves the oldest item from the queue (without removing it)
Expand All @@ -165,23 +167,24 @@ func (q *Queue) Peek() (queue.Entry, error) {
}

// put writes out an item to disk
func (q *Queue) put(p io.Reader) error {
// Use CreateTemp to generate a file with a unique name (it will be removed when packet has been transmitted)
f, err := os.CreateTemp(q.path, q.prefix+"*"+q.extension)
func (q *Queue) put(p io.Reader) (uuid.UUID, error) {
id := uuid.New()
// Create a file with a unique name, it will be removed when packet has been transmitted
f, err := os.Create(filepath.Join(q.path, q.prefix+id.String()+q.extension))
if err != nil {
return err
return uuid.Nil, err
}

if _, err = io.Copy(f, p); err != nil {
f.Close()
_ = os.Remove(f.Name()) // Attempt to remove the partial file (not much we can do if this fails)
return err
return uuid.Nil, err
}
if err = f.Close(); err != nil {
_ = os.Remove(f.Name()) // Attempt to remove the partial file (not much we can do if this fails)
return err
return uuid.Nil, err
}
return nil
return id, nil
}

// get() returns a ReadCloser that accesses the oldest file available
Expand All @@ -195,7 +198,16 @@ func (q *Queue) get() (entry, error) {
if err != nil {
return entry{}, err
}
return entry{f: f}, nil

// Extract UUID from filename
fileNameUUID := strings.TrimSuffix(strings.TrimPrefix(filepath.Base(fn), q.prefix), q.extension)
uuid, err := uuid.Parse(fileNameUUID)
if err != nil {
f.Close()
return entry{}, fmt.Errorf("failed to parse UUID from filename: %w", err)
}

return entry{f: f, uuid: uuid}, nil
}

// oldestEntry returns the filename of the oldest entry in the queue (if any - io.EOF means none)
Expand Down Expand Up @@ -238,12 +250,13 @@ func (q *Queue) oldestEntry() (string, error) {

// entry is used to return a queue entry from Peek
type entry struct {
f *os.File
f *os.File
uuid uuid.UUID
}

// Reader provides access to the file contents
func (e entry) Reader() (io.Reader, error) {
return e.f, nil
// Reader provides access to the id and file contents
func (e entry) Reader() (uuid.UUID, io.Reader, error) {
return e.uuid, e.f, nil
}

// Leave closes the entry leaving it in the queue (will be returned on subsequent calls to Peek)
Expand Down
14 changes: 10 additions & 4 deletions autopaho/queue/file/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/eclipse/paho.golang/autopaho/queue"
"github.com/google/uuid"
)

// TestFileQueue some basic tests of the queue
Expand All @@ -50,7 +51,8 @@ func TestFileQueue(t *testing.T) {
default:
}
testEntry := []byte("This is a test")
if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil {
_, err = q.Enqueue(bytes.NewReader(testEntry))
if err != nil {
t.Fatalf("error adding to queue: %s", err)
}
select {
Expand All @@ -61,7 +63,7 @@ func TestFileQueue(t *testing.T) {

const entryFormat = "Queue entry %d for testing"
for i := 0; i < 10; i++ {
if err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))); err != nil {
if _, err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))); err != nil {
t.Fatalf("error adding entry %d: %s", i, err)
}
time.Sleep(time.Nanosecond) // Short delay due to file system time resolution
Expand All @@ -78,10 +80,13 @@ func TestFileQueue(t *testing.T) {
if err != nil {
t.Fatalf("error peeking entry %d: %s", i, err)
}
r, err := entry.Reader()
id, r, err := entry.Reader()
if err != nil {
t.Fatalf("error getting reader for entry %d: %s", i, err)
}
if id == uuid.Nil {
t.Fatalf("expected non-nil UUID for entry %d", i)
}
buf := &bytes.Buffer{}
if _, err = buf.ReadFrom(r); err != nil {
t.Fatalf("error reading entry %d: %s", i, err)
Expand Down Expand Up @@ -114,7 +119,8 @@ func TestLeaveAndError(t *testing.T) {
}

testEntry := []byte("This is a test")
if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil {
_, err = q.Enqueue(bytes.NewReader(testEntry))
if err != nil {
t.Fatalf("error adding to queue: %s", err)
}

Expand Down
41 changes: 26 additions & 15 deletions autopaho/queue/memory/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@ import (
"sync"

"github.com/eclipse/paho.golang/autopaho/queue"
"github.com/google/uuid"
)

// A queue implementation that stores all data in RAM

// queueItem represents a single item in the queue
type queueItem struct {
message []byte
uniqueID uuid.UUID
}

// Queue - basic memory based queue
type Queue struct {
mu sync.Mutex
messages [][]byte
items []queueItem
waiting []chan<- struct{} // closed when something arrives in the queue
waitingForEmpty []chan<- struct{} // closed when queue is empty
}
Expand All @@ -43,7 +50,7 @@ func New() *Queue {
func (q *Queue) Wait() chan struct{} {
c := make(chan struct{})
q.mu.Lock()
if len(q.messages) > 0 {
if len(q.items) > 0 {
q.mu.Unlock()
close(c)
return c
Expand All @@ -57,7 +64,7 @@ func (q *Queue) Wait() chan struct{} {
func (q *Queue) WaitForEmpty() chan struct{} {
c := make(chan struct{})
q.mu.Lock()
if len(q.messages) == 0 {
if len(q.items) == 0 {
q.mu.Unlock()
close(c)
return c
Expand All @@ -68,42 +75,46 @@ func (q *Queue) WaitForEmpty() chan struct{} {
}

// Enqueue add item to the queue.
func (q *Queue) Enqueue(p io.Reader) error {
func (q *Queue) Enqueue(p io.Reader) (uuid.UUID, error) {
var b bytes.Buffer
_, err := b.ReadFrom(p)
if err != nil {
return fmt.Errorf("Queue.Push failed to read into buffer: %w", err)
return uuid.Nil, fmt.Errorf("Queue.Push failed to read into buffer: %w", err)
}
q.mu.Lock()
defer q.mu.Unlock()
q.messages = append(q.messages, b.Bytes())
newItem := queueItem{
message: b.Bytes(),
uniqueID: uuid.New(),
}
q.items = append(q.items, newItem)
for _, c := range q.waiting {
close(c)
}
q.waiting = q.waiting[:0]
return nil
return newItem.uniqueID, nil
}

// Peek retrieves the oldest item from the queue (without removing it)
func (q *Queue) Peek() (queue.Entry, error) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.messages) == 0 {
if len(q.items) == 0 {
return nil, queue.ErrEmpty
}
// Queue implements Entry directly (as this always references q.messages[0]
// Queue implements Entry directly (as this always references q.items[0]
return q, nil
}

// Reader implements Entry.Reader - As the entry will always be the first item in the queue this is implemented
// against Queue rather than as a separate struct.
func (q *Queue) Reader() (io.Reader, error) {
func (q *Queue) Reader() (uuid.UUID, io.Reader, error) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.messages) == 0 {
return nil, queue.ErrEmpty
if len(q.items) == 0 {
return uuid.Nil, nil, queue.ErrEmpty
}
return bytes.NewReader(q.messages[0]), nil
return q.items[0].uniqueID, bytes.NewReader(q.items[0].message), nil
}

// Leave implements Entry.Leave - the entry (will be returned on subsequent calls to Peek)
Expand All @@ -125,9 +136,9 @@ func (q *Queue) Quarantine() error {
func (q *Queue) remove() error {
q.mu.Lock()
defer q.mu.Unlock()
initialLen := len(q.messages)
initialLen := len(q.items)
if initialLen > 0 {
q.messages = q.messages[1:]
q.items = q.items[1:]
}
if initialLen <= 1 { // Queue is now, or was already, empty
for _, c := range q.waitingForEmpty {
Expand Down
24 changes: 20 additions & 4 deletions autopaho/queue/memory/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/eclipse/paho.golang/autopaho/queue"
"github.com/google/uuid"
)

// TestMemoryQueue some basic tests of the queue
Expand All @@ -45,9 +46,13 @@ func TestMemoryQueue(t *testing.T) {
default:
}
testEntry := []byte("This is a test")
if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil {
id, err := q.Enqueue(bytes.NewReader(testEntry))
if err != nil {
t.Fatalf("error adding to queue: %s", err)
}
if id == uuid.Nil {
t.Fatalf("expected non-nil UUID, got nil")
}
select {
case <-queueNotEmpty:
case <-time.After(time.Second):
Expand All @@ -56,9 +61,13 @@ func TestMemoryQueue(t *testing.T) {

const entryFormat = "Queue entry %d for testing"
for i := 0; i < 10; i++ {
if err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i)))); err != nil {
id, err := q.Enqueue(bytes.NewReader([]byte(fmt.Sprintf(entryFormat, i))))
if err != nil {
t.Fatalf("error adding entry %d: %s", i, err)
}
if id == uuid.Nil {
t.Fatalf("expected non-nil UUID for entry %d, got nil", i)
}
}

// Remove the initial "This is a test" entry
Expand All @@ -73,10 +82,13 @@ func TestMemoryQueue(t *testing.T) {
if err != nil {
t.Fatalf("error peeking entry %d: %s", i, err)
}
r, err := entry.Reader()
id, r, err := entry.Reader()
if err != nil {
t.Fatalf("error getting reader for entry %d: %s", i, err)
}
if id == uuid.Nil {
t.Fatalf("expected non-nil UUID for entry %d, got nil", i)
}
buf := &bytes.Buffer{}
if _, err = buf.ReadFrom(r); err != nil {
t.Fatalf("error reading entry %d: %s", i, err)
Expand Down Expand Up @@ -105,9 +117,13 @@ func TestLeaveAndQuarantine(t *testing.T) {
}

testEntry := []byte("This is a test")
if err := q.Enqueue(bytes.NewReader(testEntry)); err != nil {
id, err := q.Enqueue(bytes.NewReader(testEntry))
if err != nil {
t.Fatalf("error adding to queue: %s", err)
}
if id == uuid.Nil {
t.Fatalf("expected non-nil UUID, got nil")
}

// Peek and leave the entry in the queue
if entry, err := q.Peek(); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions autopaho/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package queue
import (
"errors"
"io"

"github.com/google/uuid"
)

var (
Expand All @@ -28,10 +30,10 @@ var (
// Users must call one of Leave, Remove, or Quarantine when done with the entry (and before calling Peek again)
// `Reader()` must not be called after calling Leave, Remove, or Quarantine (and any Reader previously requestes should be considered invalid)
type Entry interface {
Reader() (io.Reader, error) // Provides access to the file contents, subsequent calls may return the same reader
Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek).
Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation
Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
Reader() (uuid.UUID, io.Reader, error) // Provides access to the id, file contents, subsequent calls may return the same reader
Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek).
Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation
Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
}

// Queue provides the functionality needed to manage queued messages
Expand All @@ -40,8 +42,8 @@ type Queue interface {
// queue is empty at the time of the call)
Wait() chan struct{}

// Enqueue add item to the queue.
Enqueue(p io.Reader) error
// Enqueue add item to the queue, returns the id of the entry
Enqueue(p io.Reader) (uuid.UUID, error)

// Peek retrieves the oldest item from the queue without removing it
// Users must call one of Close, Remove, or Quarantine when done with the entry, and before calling Peek again.
Expand Down
Loading