Skip to content

Commit

Permalink
cap queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
dvush committed Nov 5, 2024
1 parent a5b85b7 commit 42a14c7
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
4 changes: 3 additions & 1 deletion proxy/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (
errArchivePublicRequest = errors.New("public RPC request should not reach archive")

ArchiveRequestTimeout = time.Second * 30

ArchiveWorkerQueueSize = 10000
)

type ArchiveQueue struct {
Expand All @@ -39,7 +41,7 @@ func (aq *ArchiveQueue) Run() {
if aq.workerCount > 0 {
workerCount = aq.workerCount
}
workersQueue := make(chan *ParsedRequest)
workersQueue := make(chan *ParsedRequest, ArchiveWorkerQueueSize)
for w := 0; w < workerCount; w++ {
worker := &archiveQueueWorker{
log: aq.log.With(slog.Int("worker", w)),
Expand Down
6 changes: 4 additions & 2 deletions proxy/receiver_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (

replacementNonceSize = 4096
replacementNonceTTL = time.Second * 5 * 12

ReceiverProxyWorkerQueueSize = 10000
)

type replacementNonceKey struct {
Expand Down Expand Up @@ -138,7 +140,7 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) {
}
})

shareQeueuCh := make(chan *ParsedRequest)
shareQeueuCh := make(chan *ParsedRequest, ReceiverProxyWorkerQueueSize)
updatePeersCh := make(chan []ConfighubBuilder)
prx.shareQueue = shareQeueuCh
prx.updatePeers = updatePeersCh
Expand All @@ -153,7 +155,7 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) {
}
go queue.Run()

archiveQueueCh := make(chan *ParsedRequest)
archiveQueueCh := make(chan *ParsedRequest, ReceiverProxyWorkerQueueSize)
archiveFlushCh := make(chan struct{})
prx.archiveQueue = archiveQueueCh
prx.archiveFlushQueue = archiveFlushCh
Expand Down
6 changes: 3 additions & 3 deletions proxy/sharing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

var (
jobBufferSize = 4096
requestTimeout = time.Second * 10
ShareWorkerQueueSize = 10000
requestTimeout = time.Second * 10
)

type ShareQueue struct {
Expand All @@ -33,7 +33,7 @@ type shareQueuePeer struct {

func newShareQueuePeer(name string, client rpcclient.RPCClient) shareQueuePeer {
return shareQueuePeer{
ch: make(chan *ParsedRequest, jobBufferSize),
ch: make(chan *ParsedRequest, ShareWorkerQueueSize),
name: name,
client: client,
}
Expand Down

0 comments on commit 42a14c7

Please sign in to comment.