Skip to content

Commit

Permalink
Merge pull request #3 from opensciencegrid/add-disk-mem-queue
Browse files Browse the repository at this point in the history
Add ability for memory and disk to switch back and forth
  • Loading branch information
djw8605 authored Dec 20, 2021
2 parents 71ff69a + ad02719 commit 23be23d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 31 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Environment variables:
* SHOVELER_LISTEN_PORT
* SHOVELER_LISTEN_IP
* SHOVELER_VERIFY
* SHOVELER_QUEUE_DIRECTORY

Message Bus Credentials
-----------------------
Expand All @@ -52,4 +53,11 @@ From Docker, you can start the container from the OSG hub with the following com
Packet Verification
-------------------

If the `verify` option or `SHOVELER_VERIFY` env. var. is set to `true` (the default), the shoveler will perform simple verification that the incoming UDP packets conform to XRootD monitoring packets.
If the `verify` option or `SHOVELER_VERIFY` env. var. is set to `true` (the default), the shoveler will perform simple verification that the incoming UDP packets conform to XRootD monitoring packets.

Queue Design
------------

The shoveler receives UDP packets and stores them in a queue before they are sent to the message bus. Up to 100 messages are kept in memory. When the number of in-memory messages exceeds 100, the messages are written to disk in the directory configured by `SHOVELER_QUEUE_DIRECTORY` (env) or `queue_directory` (yaml). A good default is `/tmp/shoveler-queue`, though it could also go in `/var/...`. The on-disk queue is persistent across shoveler restarts.

The queue length can be monitored through the Prometheus metric `shoveler_queue_size`.
10 changes: 9 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ listen:
# packet format
verify: true

# Export metrics
# Export prometheus metrics
metrics:
enable: true
port: 8000

# Directory to store overflow of queue onto disk.
# The queue keeps up to 100 messages in memory. If the shoveler is disconnected from the message bus,
# messages beyond those 100 are stored on disk in this directory. Once the connection has been re-established,
# the queue will be emptied. The queue on disk is persistent between restarts.
queue_directory: /tmp/shoveler-queue


90 changes: 62 additions & 28 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ type MessageStruct struct {
}

// ConfirmationQueue is a FIFO queue of raw messages that is backed either by
// an in-memory list or by a persistent on-disk queue, never both at once
// during normal operation.
type ConfirmationQueue struct {
	// diskQueue is the on-disk queue opened (or created) by Init.
	diskQueue *dque.DQue
	// mutex guards every field of the queue.
	mutex sync.Mutex
	// emptyCond is a condition variable built on mutex by Init.
	emptyCond *sync.Cond
	// memQueue holds the queued []byte messages while the queue is in
	// in-memory mode.
	memQueue *list.List
	// usingDisk is true while messages are being stored in diskQueue
	// instead of memQueue.
	usingDisk bool
}

var (
	// ErrEmpty is returned when a dequeue is attempted and no message is
	// available.
	ErrEmpty = errors.New("queue is empty")
	// MaxInMemory is the in-memory queue length at which Enqueue moves all
	// queued messages to the on-disk queue.
	MaxInMemory = 100
	// LowWaterMark is the on-disk queue length below which a dequeue moves
	// the remaining on-disk messages back into memory.
	LowWaterMark = 50
)

// NewConfirmationQueue returns an initialized list.
Expand All @@ -46,19 +48,19 @@ func (cq *ConfirmationQueue) Init() *ConfirmationQueue {
qDir := path.Dir(queueDir)
segmentSize := 10000
var err error
cq.msgQueue, err = dque.NewOrOpen(qName, qDir, segmentSize, ItemBuilder)
cq.diskQueue, err = dque.NewOrOpen(qName, qDir, segmentSize, ItemBuilder)
if err != nil {
log.Panicln("Failed to create queue:", err)
}
err = cq.msgQueue.TurboOn()
err = cq.diskQueue.TurboOn()
if err != nil {
log.Errorln("Failed to turn on dque Turbo mode, the queue will be safer but much slower:", err)
}

cq.emptyCond = sync.NewCond(&cq.mutex)

// Start the metrics goroutine
cq.inMemory = list.New()
cq.memQueue = list.New()
go cq.queueMetrics()
return cq

Expand All @@ -67,7 +69,11 @@ func (cq *ConfirmationQueue) Init() *ConfirmationQueue {
// Size returns the number of messages in whichever backing store is currently
// active: the on-disk queue while usingDisk is set, the in-memory list
// otherwise. The queue mutex is held for the duration of the call.
func (cq *ConfirmationQueue) Size() int {
	cq.mutex.Lock()
	defer cq.mutex.Unlock()
	if cq.usingDisk {
		return cq.diskQueue.SizeUnsafe()
	}
	return cq.memQueue.Len()
}

// queueMetrics updates the queue size prometheus metric
Expand All @@ -93,12 +99,30 @@ func (cq *ConfirmationQueue) Enqueue(msg []byte) {
cq.mutex.Lock()
defer cq.mutex.Unlock()
// Check size of in memory queue
if cq.inMemory.Len() < MaxInMemory {
// Add to in memory queue
cq.inMemory.PushBack(msg)

// Still using in-memory
if !cq.usingDisk && (cq.memQueue.Len()+1) < MaxInMemory {
cq.memQueue.PushBack(msg)
} else if !cq.usingDisk && (cq.memQueue.Len()+1) >= MaxInMemory {
// Not using disk queue, but the next message would go over MaxInMemory
// Transfer everything to the on-disk queue
for cq.memQueue.Len() > 0 {
toEnqueue := cq.memQueue.Remove(cq.memQueue.Front()).([]byte)
err := cq.diskQueue.Enqueue(&MessageStruct{Message: toEnqueue})
if err != nil {
log.Errorln("Failed to enqueue message:", err)
}
}
// Enqueue the current
err := cq.diskQueue.Enqueue(&MessageStruct{Message: msg})
if err != nil {
log.Errorln("Failed to enqueue message:", err)
}
cq.usingDisk = true

} else {
// Add to on disk queue
err := cq.msgQueue.Enqueue(&MessageStruct{Message: msg})
// Last option is we are using disk
err := cq.diskQueue.Enqueue(&MessageStruct{Message: msg})
if err != nil {
log.Errorln("Failed to enqueue message:", err)
}
Expand All @@ -109,24 +133,34 @@ func (cq *ConfirmationQueue) Enqueue(msg []byte) {
// dequeueLocked dequeues a message, assuming the queue has already been locked
func (cq *ConfirmationQueue) dequeueLocked() ([]byte, error) {
// Check if we have a message available in the queue
if cq.inMemory.Len() == 0 {
if !cq.usingDisk && cq.memQueue.Len() == 0 {
return nil, ErrEmpty
} else if cq.usingDisk && cq.diskQueue.Size() == 0 {
return nil, ErrEmpty
}
// Remove the first element and get the value
toReturn := cq.inMemory.Remove(cq.inMemory.Front()).([]byte)

// See if we have anything on the on-disk
for cq.inMemory.Len() < MaxInMemory {
// Dequeue something from the on disk
msgStruct, err := cq.msgQueue.Dequeue()
if err == dque.ErrEmpty {
// Queue is empty
break

if !cq.usingDisk {
return cq.memQueue.Remove(cq.memQueue.Front()).([]byte), nil
} else if cq.usingDisk && (cq.diskQueue.Size()-1) >= LowWaterMark {
// If we are using disk, and the on disk size is larger than the low water mark
msgStruct, err := cq.diskQueue.Dequeue()
if err != nil {
log.Errorln("Failed to dequeue: ", err)
}
// Add the new message to the back of the in memory queue
cq.inMemory.PushBack(msgStruct.(*MessageStruct).Message)
return msgStruct.(*MessageStruct).Message, err
} else {
// Using disk, but the next enqueue makes it < LowWaterMark, transfer everything from on disk to in-memory
for cq.diskQueue.Size() > 0 {
msgStruct, err := cq.diskQueue.Dequeue()
if err != nil {
log.Errorln("Failed to dequeue: ", err)
}
cq.memQueue.PushBack(msgStruct.(*MessageStruct).Message)
}
cq.usingDisk = false
return cq.memQueue.Remove(cq.memQueue.Front()).([]byte), nil
}
return toReturn, nil

}

// Dequeue Blocking function to receive a message
Expand All @@ -151,5 +185,5 @@ func (cq *ConfirmationQueue) Dequeue() ([]byte, error) {
// Close closes the on-disk queue while holding the queue mutex and returns
// any error reported by the close.
func (cq *ConfirmationQueue) Close() error {
	cq.mutex.Lock()
	defer cq.mutex.Unlock()
	return cq.diskQueue.Close()
}
14 changes: 13 additions & 1 deletion queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,25 @@ func TestQueueLotsEntries(t *testing.T) {
queue.Enqueue([]byte(msgString))
}

assert.Equal(t, 100000, queue.Size())
//assert.Equal(t, 100000, queue.Size())
for i := 1; i <= 100000; i++ {
msgString := "test." + strconv.Itoa(i)
msg, err := queue.Dequeue()
assert.NoError(t, err)
assert.Equal(t, msgString, string(msg))
}
assert.Equal(t, 0, queue.Size())
for i := 1; i <= 100000; i++ {
msgString := "test." + strconv.Itoa(i)
queue.Enqueue([]byte(msgString))
}

assert.Equal(t, 100000, queue.Size())
for i := 1; i <= 100000; i++ {
msgString := "test." + strconv.Itoa(i)
msg, err := queue.Dequeue()
assert.NoError(t, err)
assert.Equal(t, msgString, string(msg))
}

}

0 comments on commit 23be23d

Please sign in to comment.