From 02618b63fb152eb0045ede67a87d911e53dd07cc Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 25 Apr 2023 15:48:58 +0200 Subject: [PATCH 1/2] Break notifiers into a map so that only other requests under the same root are notified --- lib/graph_gateway.go | 83 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index 833fbbc..b4552e9 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -83,6 +83,12 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } +type notifiersForCid struct { + lk sync.RWMutex + deleted int8 + notifiers []Notifier +} + type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -90,8 +96,7 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - lk sync.RWMutex - notifiers map[Notifier]struct{} + notifiers sync.Map // cid -> notifiersForCid metrics *GraphGatewayMetrics } @@ -152,7 +157,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts .. routing: vs, namesys: ns, bstore: bs, - notifiers: make(map[Notifier]struct{}), + notifiers: sync.Map{}, metrics: registerGraphGatewayMetrics(), }, nil } @@ -242,6 +247,18 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } +func (api *GraphGateway) getRootOfPath(path string) string { + pth, err := ipfspath.ParsePath(path) + if err != nil { + return path + } + if pth.IsJustAKey() { + return pth.Segments()[0] + } else { + return pth.Segments()[1] + } +} + /* Implementation iteration plan: @@ -263,9 +280,25 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con metrics: api.metrics, } - api.lk.Lock() - api.notifiers[exch] = struct{}{} - api.lk.Unlock() + notifierKey := api.getRootOfPath(path) + var notifier *notifiersForCid + for { + notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForCid{notifiers: []Notifier{}}) + if n, ok := notifiers.(*notifiersForCid); ok { + n.lk.Lock() + // could have been deleted after our load. try again. + if n.deleted != 0 { + n.lk.Unlock() + continue + } + notifier = n + n.notifiers = append(n.notifiers, exch) + n.lk.Unlock() + break + } else { + return nil, nil, errors.New("failed to get notifier") + } + } go func(metrics *GraphGatewayMetrics) { defer func() { @@ -297,7 +330,7 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return err } metrics.carBlocksFetchedMetric.Inc() - api.notifyAllOngoingRequests(ctx, blk) + api.notifyOngoingRequests(ctx, notifierKey, blk) } }) if err != nil { @@ -317,21 +350,37 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con } return blkgw, func() { - api.lk.Lock() - delete(api.notifiers, exch) - api.lk.Unlock() + notifier.lk.Lock() + for i, e := range notifier.notifiers { + if e == exch { + notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...) + break + } + } + if len(notifier.notifiers) == 0 { + notifier.deleted = 1 + api.notifiers.Delete(notifierKey) + } + notifier.lk.Unlock() }, nil } -func (api *GraphGateway) notifyAllOngoingRequests(ctx context.Context, blks ...blocks.Block) { - api.lk.RLock() - for n := range api.notifiers { - err := n.NotifyNewBlocks(ctx, blks...) - if err != nil { - graphLog.Errorw("notifyAllOngoingRequests failed", "error", err) +func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { + if notifiers, ok := api.notifiers.Load(key); ok { + notifier, ok := notifiers.(*notifiersForCid) + if !ok { + graphLog.Errorw("notifyOngoingRequests failed", "error", "could not get notifier") + return + } + notifier.lk.RLock() + for _, n := range notifier.notifiers { + err := n.NotifyNewBlocks(ctx, blks...) + if err != nil { + graphLog.Errorw("notifyAllOngoingRequests failed", "error", err) + } } + notifier.lk.RUnlock() } - api.lk.RUnlock() } type fileCloseWrapper struct { From 36f029e7bae61c3be4c85b2df5e5c5cf4dd8b94e Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 26 Apr 2023 17:38:46 +0200 Subject: [PATCH 2/2] =?UTF-8?q?chore:=20notifiersForCid=20=E2=86=92=20noti?= =?UTF-8?q?fiersForRootCid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/graph_gateway.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index b4552e9..5fa642d 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -83,7 +83,9 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } -type notifiersForCid struct { +// notifiersForRootCid is used for reducing lock contention by only notifying +// exchanges related to the same content root CID +type notifiersForRootCid struct { lk sync.RWMutex deleted int8 notifiers []Notifier @@ -96,7 +98,7 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - notifiers sync.Map // cid -> notifiersForCid + notifiers sync.Map // cid -> notifiersForRootCid metrics *GraphGatewayMetrics } @@ -281,10 +283,10 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con } notifierKey := api.getRootOfPath(path) - var notifier *notifiersForCid + var notifier *notifiersForRootCid for { - notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForCid{notifiers: []Notifier{}}) - if n, ok := notifiers.(*notifiersForCid); ok { + notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForRootCid{notifiers: []Notifier{}}) + if n, ok := notifiers.(*notifiersForRootCid); ok { n.lk.Lock() // could have been deleted after our load. try again. if n.deleted != 0 { @@ -367,16 +369,16 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { if notifiers, ok := api.notifiers.Load(key); ok { - notifier, ok := notifiers.(*notifiersForCid) + notifier, ok := notifiers.(*notifiersForRootCid) if !ok { - graphLog.Errorw("notifyOngoingRequests failed", "error", "could not get notifier") + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", "could not get notifiersForRootCid") return } notifier.lk.RLock() for _, n := range notifier.notifiers { err := n.NotifyNewBlocks(ctx, blks...) if err != nil { - graphLog.Errorw("notifyAllOngoingRequests failed", "error", err) + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", err) } } notifier.lk.RUnlock()