Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Break notifiers into a map so that only other requests under the same…
Browse files Browse the repository at this point in the history
… root are notified
  • Loading branch information
willscott committed Apr 25, 2023
1 parent 52a5a5c commit 02618b6
Showing 1 changed file with 66 additions and 17 deletions.
83 changes: 66 additions & 17 deletions lib/graph_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,20 @@ type Notifier interface {
NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error
}

type notifiersForCid struct {
lk sync.RWMutex
deleted int8
notifiers []Notifier
}

type GraphGateway struct {
fetcher CarFetcher
blockFetcher exchange.Fetcher
routing routing.ValueStore
namesys namesys.NameSystem
bstore blockstore.Blockstore

lk sync.RWMutex
notifiers map[Notifier]struct{}
notifiers sync.Map // cid -> notifiersForCid
metrics *GraphGatewayMetrics
}

Expand Down Expand Up @@ -152,7 +157,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts ..
routing: vs,
namesys: ns,
bstore: bs,
notifiers: make(map[Notifier]struct{}),
notifiers: sync.Map{},
metrics: registerGraphGatewayMetrics(),
}, nil
}
Expand Down Expand Up @@ -242,6 +247,18 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics {
}
}

func (api *GraphGateway) getRootOfPath(path string) string {
pth, err := ipfspath.ParsePath(path)
if err != nil {
return path
}
if pth.IsJustAKey() {
return pth.Segments()[0]
} else {
return pth.Segments()[1]
}
}

/*
Implementation iteration plan:
Expand All @@ -263,9 +280,25 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
metrics: api.metrics,
}

api.lk.Lock()
api.notifiers[exch] = struct{}{}
api.lk.Unlock()
notifierKey := api.getRootOfPath(path)
var notifier *notifiersForCid
for {
notifiers, _ := api.notifiers.LoadOrStore(notifierKey, &notifiersForCid{notifiers: []Notifier{}})
if n, ok := notifiers.(*notifiersForCid); ok {
n.lk.Lock()
// could have been deleted after our load. try again.
if n.deleted != 0 {
n.lk.Unlock()
continue
}
notifier = n
n.notifiers = append(n.notifiers, exch)
n.lk.Unlock()
break
} else {
return nil, nil, errors.New("failed to get notifier")
}
}

go func(metrics *GraphGatewayMetrics) {
defer func() {
Expand Down Expand Up @@ -297,7 +330,7 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
return err
}
metrics.carBlocksFetchedMetric.Inc()
api.notifyAllOngoingRequests(ctx, blk)
api.notifyOngoingRequests(ctx, notifierKey, blk)
}
})
if err != nil {
Expand All @@ -317,21 +350,37 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
}

return blkgw, func() {
api.lk.Lock()
delete(api.notifiers, exch)
api.lk.Unlock()
notifier.lk.Lock()
for i, e := range notifier.notifiers {
if e == exch {
notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...)
break
}
}
if len(notifier.notifiers) == 0 {
notifier.deleted = 1
api.notifiers.Delete(notifierKey)
}
notifier.lk.Unlock()
}, nil
}

func (api *GraphGateway) notifyAllOngoingRequests(ctx context.Context, blks ...blocks.Block) {
api.lk.RLock()
for n := range api.notifiers {
err := n.NotifyNewBlocks(ctx, blks...)
if err != nil {
graphLog.Errorw("notifyAllOngoingRequests failed", "error", err)
func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) {
if notifiers, ok := api.notifiers.Load(key); ok {
notifier, ok := notifiers.(*notifiersForCid)
if !ok {
graphLog.Errorw("notifyOngoingRequests failed", "error", "could not get notifier")
return
}
notifier.lk.RLock()
for _, n := range notifier.notifiers {
err := n.NotifyNewBlocks(ctx, blks...)
if err != nil {
graphLog.Errorw("notifyAllOngoingRequests failed", "error", err)
}
}
notifier.lk.RUnlock()
}
api.lk.RUnlock()
}

type fileCloseWrapper struct {
Expand Down

0 comments on commit 02618b6

Please sign in to comment.