Skip to content

Commit

Permalink
refactor: manyIter now channels
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Nov 30, 2023
1 parent 72cc034 commit 27a543b
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions server_routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type parallelRouter struct {
}

func (r parallelRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
return find(r.routers, func(ri server.ContentRouter) (iter.ResultIter[types.Record], error) {
return find(ctx, r.routers, func(ri server.ContentRouter) (iter.ResultIter[types.Record], error) {
return ri.FindProviders(ctx, key, limit)
})
}
Expand All @@ -63,12 +63,12 @@ func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapW
}

func (r parallelRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) {
return find(r.routers, func(ri server.ContentRouter) (iter.ResultIter[*types.PeerRecord], error) {
return find(ctx, r.routers, func(ri server.ContentRouter) (iter.ResultIter[*types.PeerRecord], error) {
return ri.FindPeers(ctx, pid, limit)
})
}

func find[T any](routers []server.ContentRouter, call func(server.ContentRouter) (iter.ResultIter[T], error)) (iter.ResultIter[T], error) {
func find[T any](ctx context.Context, routers []server.ContentRouter, call func(server.ContentRouter) (iter.ResultIter[T], error)) (iter.ResultIter[T], error) {
switch len(routers) {
case 0:
return iter.ToResultIter(iter.FromSlice([]T{})), nil
Expand All @@ -94,36 +94,56 @@ func find[T any](routers []server.ContentRouter, call func(server.ContentRouter)
}

// Otherwise return manyIter with remaining iterators.
return &manyIter[T]{it: its}, nil
return newManyIter(ctx, its), nil
}

type manyIter[T any] struct {
it []iter.ResultIter[T]
next int
ctx context.Context
its []iter.ResultIter[T]
nextCh chan int
next int
}

func (mi *manyIter[T]) Next() bool {
for i, it := range mi.it {
if it.Next() {
mi.next = i
return true
}
func newManyIter[T any](ctx context.Context, its []iter.ResultIter[T]) *manyIter[T] {
nextCh := make(chan int)

for i, it := range its {
go func(ch chan int, it iter.ResultIter[T], index int) {
for it.Next() {
ch <- index
}
}(nextCh, it, i)
}

mi.next = -1
return false
return &manyIter[T]{
ctx: ctx,
its: its,
nextCh: nextCh,
next: -1,
}
}

func (mi *manyIter[T]) Next() bool {
select {
case i := <-mi.nextCh:
mi.next = i
return true
case <-mi.ctx.Done():
mi.next = -1
return false
}
}

func (mi *manyIter[T]) Val() iter.Result[T] {
if mi.next == -1 {
return iter.Result[T]{Err: errors.New("no next value")}
}
return mi.it[mi.next].Val()
return mi.its[mi.next].Val()
}

func (mi *manyIter[T]) Close() error {
var err error
for _, it := range mi.it {
for _, it := range mi.its {
err = errors.Join(err, it.Close())
}
return err
Expand Down

0 comments on commit 27a543b

Please sign in to comment.