Skip to content

Commit

Permalink
Pandora can send previous block information while resubscribed (#112)
Browse files Browse the repository at this point in the history
- Fix pending block subscription api

Co-authored-by: meta-bot <[email protected]>
Co-authored-by: Atif Anowar <[email protected]>
  • Loading branch information
3 people authored Nov 26, 2021
1 parent 2addfe8 commit e6b5eef
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
4 changes: 4 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,10 @@ func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*t
return fb.bc.GetHeaderByHash(hash), nil
}

func (fb *filterBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) {
return nil, nil
}

func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
number := rawdb.ReadHeaderNumber(fb.db, hash)
if number == nil {
Expand Down
42 changes: 29 additions & 13 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,33 @@ func (api *PublicFilterAPI) NewPendingBlockHeaders(ctx context.Context, pendingF
go func() {
// sending previous blocks to orchestrator
sender := func(windowStart, windowEnd uint64) {
log.Debug("sending header to orchestrator", "windowStart", windowStart, "windowEnd", windowEnd)
for i := windowStart; i <= windowEnd; i++ {
temphead, err := api.backend.HeaderByNumber(ctx, rpc.BlockNumber(i))
temphead, err := api.backend.BlockByNumber(ctx, rpc.BlockNumber(i))
if temphead != nil && err == nil {
// if block exists and no error occurred then send it
log.Debug("Notifying to orchestrator", "block number", temphead.Number.Uint64())
notifier.Notify(rpcSub.ID, temphead)
log.Debug("Notifying to orchestrator", "block number", temphead.NumberU64())
notifier.Notify(rpcSub.ID, temphead.Header())
} else {
log.Debug("sending failed", "consideredBlockNumber", i, "tempHead", temphead, "err", err)
}
}
}
sendByHash := func(start uint64, blockHash common.Hash) {
log.Info("sending header to orchestrator", "start", start, "blockHash", blockHash)
var headerArray []*types.Header
for header, _ := api.backend.HeaderByHash(ctx, blockHash); header != nil; header, _ = api.backend.HeaderByHash(ctx, header.ParentHash) {
headerArray = append(headerArray, header)
if header.Number.Uint64() == start {
break
}
}
log.Debug("sendByHash found headers of length", "len", len(headerArray))
for i := len(headerArray) - 1; i >= 0; i-- {
log.Debug("Notifying to orchestrator", "block number", headerArray[i].Number.Uint64())
notifier.Notify(rpcSub.ID, headerArray[i])
}
}
// we are starting from block 1 because block 0 has no pandora extra data info. so sending it will create an error
windowStart, windowEnd := uint64(1), uint64(1)
header, _ := api.backend.HeaderByHash(ctx, pendingFilter.FromBlockHash)
Expand Down Expand Up @@ -273,22 +291,20 @@ func (api *PublicFilterAPI) NewPendingBlockHeaders(ctx context.Context, pendingF
// If anything goes wrong remove it.
headers := make(chan *types.Header)
headersSub := api.events.SubscribePendingHeads(headers)
firstTime := true

for {
select {
case h := <-headers:
if firstTime {
// entered into the running phase. for the first time we will do checking and send previous blocks.
firstTime = false
if windowEnd+1 < h.Number.Uint64() {
// not consecutive. so send them first
windowStart = windowEnd + 1
windowEnd = h.Number.Uint64()
sender(windowStart, windowEnd)
}
if h != nil && windowEnd+1 < h.Number.Uint64() {
windowStart = windowEnd + 1
windowEnd = h.Number.Uint64() - 1
log.Debug("previous block information are not sent. so resending", "windowStart", windowStart, "windowEnd", windowEnd)
sendByHash(windowStart, h.ParentHash)
}
notifier.Notify(rpcSub.ID, h)
if h != nil {
windowEnd = h.Number.Uint64()
}
case rpcErr := <-rpcSub.Err():
log.Debug("error found in rpc subscription", "error", rpcErr)
headersSub.Unsubscribe()
Expand Down
1 change: 1 addition & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Backend interface {
ChainDb() ethdb.Database
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
GetPendingHeadsSince(ctx context.Context, from common.Hash) []*types.Header
Expand Down
4 changes: 4 additions & 0 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (b *testBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*type
return rawdb.ReadHeader(b.db, hash, *number), nil
}

func (b *testBackend) BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error) {
return nil, nil
}

func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
if number := rawdb.ReadHeaderNumber(b.db, hash); number != nil {
return rawdb.ReadReceipts(b.db, hash, *number, params.TestChainConfig), nil
Expand Down

0 comments on commit e6b5eef

Please sign in to comment.