diff --git a/proxy/archive.go b/proxy/archive.go index a49cc46..8298c7b 100644 --- a/proxy/archive.go +++ b/proxy/archive.go @@ -22,6 +22,8 @@ var ( errArchivePublicRequest = errors.New("public RPC request should not reach archive") ArchiveRequestTimeout = time.Second * 30 + + ArchiveWorkerQueueSize = 10000 ) type ArchiveQueue struct { @@ -39,7 +41,7 @@ func (aq *ArchiveQueue) Run() { if aq.workerCount > 0 { workerCount = aq.workerCount } - workersQueue := make(chan *ParsedRequest) + workersQueue := make(chan *ParsedRequest, ArchiveWorkerQueueSize) for w := 0; w < workerCount; w++ { worker := &archiveQueueWorker{ log: aq.log.With(slog.Int("worker", w)), diff --git a/proxy/receiver_proxy.go b/proxy/receiver_proxy.go index 7e6481e..73bb477 100644 --- a/proxy/receiver_proxy.go +++ b/proxy/receiver_proxy.go @@ -23,6 +23,8 @@ var ( replacementNonceSize = 4096 replacementNonceTTL = time.Second * 5 * 12 + + ReceiverProxyWorkerQueueSize = 10000 ) type replacementNonceKey struct { @@ -138,7 +140,7 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) { } }) - shareQeueuCh := make(chan *ParsedRequest) + shareQeueuCh := make(chan *ParsedRequest, ReceiverProxyWorkerQueueSize) updatePeersCh := make(chan []ConfighubBuilder) prx.shareQueue = shareQeueuCh prx.updatePeers = updatePeersCh @@ -153,7 +155,7 @@ func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) { } go queue.Run() - archiveQueueCh := make(chan *ParsedRequest) + archiveQueueCh := make(chan *ParsedRequest, ReceiverProxyWorkerQueueSize) archiveFlushCh := make(chan struct{}) prx.archiveQueue = archiveQueueCh prx.archiveFlushQueue = archiveFlushCh diff --git a/proxy/sharing.go b/proxy/sharing.go index 07edf1f..345d50b 100644 --- a/proxy/sharing.go +++ b/proxy/sharing.go @@ -10,8 +10,8 @@ import ( ) var ( - jobBufferSize = 4096 - requestTimeout = time.Second * 10 + ShareWorkerQueueSize = 10000 + requestTimeout = time.Second * 10 ) type ShareQueue struct { @@ -33,7 +33,7 @@ type shareQueuePeer struct { func newShareQueuePeer(name string, client rpcclient.RPCClient) shareQueuePeer { return shareQueuePeer{ - ch: make(chan *ParsedRequest, jobBufferSize), + ch: make(chan *ParsedRequest, ShareWorkerQueueSize), name: name, client: client, }