diff --git a/eth/filters/api.go b/eth/filters/api.go index bd53847fc6de..14291b3ff915 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -233,10 +233,10 @@ func (api *PublicFilterAPI) NewPendingBlockHeaders(ctx context.Context, pendingF temphead, err := api.backend.BlockByNumber(ctx, rpc.BlockNumber(i)) if temphead != nil && err == nil { // if block exists and no error occurred then send it - log.Debug("Notifying to orchestrator", "block number", temphead.NumberU64()) + log.Info("Notifying to orchestrator", "block number", temphead.NumberU64()) notifier.Notify(rpcSub.ID, temphead.Header()) } else { - log.Debug("sending failed", "consideredBlockNumber", i, "tempHead", temphead, "err", err) + log.Info("sending failed", "consideredBlockNumber", i, "tempHead", temphead, "err", err) } } } @@ -245,13 +245,13 @@ func (api *PublicFilterAPI) NewPendingBlockHeaders(ctx context.Context, pendingF var headerArray []*types.Header for header, _ := api.backend.HeaderByHash(ctx, blockHash); header != nil; header, _ = api.backend.HeaderByHash(ctx, header.ParentHash) { headerArray = append(headerArray, header) - if header.Number.Uint64() == start { + if header.Number.Uint64() <= start { break } } - log.Debug("sendByHash found headers of length", "len", len(headerArray)) + log.Info("sendByHash found headers of length", "len", len(headerArray)) for i := len(headerArray) - 1; i >= 0; i-- { - log.Debug("Notifying to orchestrator", "block number", headerArray[i].Number.Uint64()) + log.Info("Notifying to orchestrator", "block number", headerArray[i].Number.Uint64()) notifier.Notify(rpcSub.ID, headerArray[i]) } } @@ -265,27 +265,8 @@ func (api *PublicFilterAPI) NewPendingBlockHeaders(ctx context.Context, pendingF if header != nil { windowEnd = header.Number.Uint64() } - log.Debug("NewPendingBlockHeaders sending", "start", windowStart, "end", windowEnd) + log.Info("NewPendingBlockHeaders sending", "start", windowStart, "end", windowEnd) sender(windowStart, windowEnd) - // first send all available pending headers from the pending queue - pendingHeaders := api.backend.GetPendingHeadsSince(ctx, pendingFilter.FromBlockHash) - for _, pendingHeader := range pendingHeaders { - log.Debug("pending headers are sending first", "from", pendingFilter.FromBlockHash, "header hash", pendingHeader.Hash()) - notifier.Notify(rpcSub.ID, pendingHeader) - } - - if len(pendingHeaders) > 0 { - penHeaderStart := pendingHeaders[0].Number.Uint64() - if windowEnd+1 < penHeaderStart { - // not consecutive. so more blocks are finalized in the mean time. send them - windowStart = windowEnd + 1 - tempHead, _ := api.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if tempHead != nil { - windowEnd = tempHead.Number.Uint64() - } - sender(windowStart, windowEnd) - } - } // accept PendingHeaderChannelSize concurrent pending headers and send them // If anything goes wrong remove it. @@ -295,16 +276,18 @@ func (api *PublicFilterAPI) NewPendingBlockHeaders(ctx context.Context, pendingF for { select { case h := <-headers: - if h != nil && windowEnd+1 < h.Number.Uint64() { + if h == nil { + log.Info("header is nil so orchestrator can't be notified") + continue + } + if windowEnd+1 < h.Number.Uint64() { windowStart = windowEnd + 1 - windowEnd = h.Number.Uint64() - 1 - log.Debug("previous block information are not sent. so resending", "windowStart", windowStart, "windowEnd", windowEnd) + log.Info("previous block information are not sent. so resending", "windowStart", windowStart, "windowEnd", h.Number.Uint64()-1) sendByHash(windowStart, h.ParentHash) } + windowEnd = h.Number.Uint64() notifier.Notify(rpcSub.ID, h) - if h != nil { - windowEnd = h.Number.Uint64() - } + log.Info("Successfully notify the orchestrator", "header", *h, "windowEnd", windowEnd) case rpcErr := <-rpcSub.Err(): log.Debug("error found in rpc subscription", "error", rpcErr) headersSub.Unsubscribe()