Skip to content

Commit

Permalink
feat: debug too many shuffling promises (#7251)
Browse files Browse the repository at this point in the history
* feat: add asyncShufflingCalculation to StateTransitionOpts

* feat: add asyncShufflingCalculation to all regen / processSlots consumers

* fix: default to false for async shuffling and remove unnecessary props

* fix: remove unnecessary flags from stateTransition

* feat: implement conditional build of shuffling for prepareNextSlot

* fix: spec test bug where shufflingCache is present from BeaconChain constructor

* feat: sync build next shuffling if not queued async

* fix: use getSync to pull next shuffling correctly

* docs: add comment to prepareNextSlot

* refactor: rename StateCloneOpts to StateRegenerationOpts

* feat: pass asyncShufflingCalculation through to afterProcessEpoch and refactor conditional to run purely sync

* docs: add issue number to comment

* chore: lint
  • Loading branch information
matthewkeil authored Dec 4, 2024
1 parent 37c4287 commit dbe2188
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 70 deletions.
7 changes: 6 additions & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ export class PrepareNextSlotScheduler {
// the slot 0 of next epoch will likely use this Previous Root Checkpoint state for state transition so we transfer cache here
// the resulting state with cache will be cached in Checkpoint State Cache which is used for the upcoming block processing
// for other slots dontTransferCached=true because we don't run state transition on this state
{dontTransferCache: !isEpochTransition},
//
// Shuffling calculation will be done asynchronously when passing asyncShufflingCalculation=true. Shuffling will be queued in
// beforeProcessEpoch and should theoretically be ready immediately after the synchronous epoch transition finished and the
// event loop is free. In long periods of non-finality too many forks will cause the shufflingCache to throw an error for
// too many queued shufflings so only run async during normal epoch transition. See issue ChainSafe/lodestar#7244
{dontTransferCache: !isEpochTransition, asyncShufflingCalculation: true},
RegenCaller.precomputeEpoch
);

Expand Down
18 changes: 13 additions & 5 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ export enum RegenFnName {
getCheckpointState = "getCheckpointState",
}

export type StateCloneOpts = {
export type StateRegenerationOpts = {
dontTransferCache: boolean;
/**
* Do not queue shuffling calculation async. Forces sync JIT calculation in afterProcessEpoch if not passed as `true`
*/
asyncShufflingCalculation?: boolean;
};

export interface IStateRegenerator extends IStateRegeneratorInternal {
Expand All @@ -56,15 +60,19 @@ export interface IStateRegeneratorInternal {
* Return a valid pre-state for a beacon block
* This will always return a state in the latest viable epoch
*/
getPreState(block: BeaconBlock, opts: StateCloneOpts, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks>;
getPreState(
block: BeaconBlock,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks>;

/**
* Return a valid checkpoint state
* This will always return a state with `state.slot % SLOTS_PER_EPOCH === 0`
*/
getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks>;

Expand All @@ -74,12 +82,12 @@ export interface IStateRegeneratorInternal {
getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks>;

/**
* Return the exact state with `stateRoot`
*/
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks>;
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks>;
}
18 changes: 12 additions & 6 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ import {JobItemQueue} from "../../util/queue/index.js";
import {CheckpointHex, toCheckpointHex} from "../stateCache/index.js";
import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js";
import {RegenError, RegenErrorCode} from "./errors.js";
import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js";
import {
IStateRegenerator,
IStateRegeneratorInternal,
RegenCaller,
RegenFnName,
StateRegenerationOpts,
} from "./interface.js";
import {RegenModules, StateRegenerator} from "./regen.js";

const REGEN_QUEUE_MAX_LEN = 256;
Expand Down Expand Up @@ -86,7 +92,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
*/
getPreStateSync(
block: BeaconBlock,
opts: StateCloneOpts = {dontTransferCache: true}
opts: StateRegenerationOpts = {dontTransferCache: true}
): CachedBeaconStateAllForks | null {
const parentRoot = toRootHex(block.parentRoot);
const parentBlock = this.forkChoice.getBlockHex(parentRoot);
Expand Down Expand Up @@ -212,7 +218,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
*/
async getPreState(
block: BeaconBlock,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState});
Expand All @@ -231,7 +237,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {

async getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState});
Expand All @@ -256,7 +262,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
async getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
rCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getBlockSlotState});
Expand All @@ -268,7 +274,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
async getState(
stateRoot: RootHex,
rCaller: RegenCaller,
opts: StateCloneOpts = {dontTransferCache: true}
opts: StateRegenerationOpts = {dontTransferCache: true}
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});

Expand Down
14 changes: 7 additions & 7 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {getCheckpointFromState} from "../blocks/utils/checkpoint.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";
import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js";
import {RegenError, RegenErrorCode} from "./errors.js";
import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js";
import {IStateRegeneratorInternal, RegenCaller, StateRegenerationOpts} from "./interface.js";

export type RegenModules = {
db: IBeaconDb;
Expand Down Expand Up @@ -51,7 +51,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getPreState(
block: BeaconBlock,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
regenCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
const parentBlock = this.modules.forkChoice.getBlock(block.parentRoot);
Expand Down Expand Up @@ -84,7 +84,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
Expand All @@ -99,7 +99,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
opts: StateRegenerationOpts,
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
Expand Down Expand Up @@ -146,7 +146,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getState(
stateRoot: RootHex,
caller: RegenCaller,
opts?: StateCloneOpts,
opts?: StateRegenerationOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
Expand Down Expand Up @@ -322,7 +322,7 @@ async function processSlotsByCheckpoint(
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
opts: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks> {
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, regenCaller, opts);
if (postState.slot < slot) {
Expand All @@ -343,7 +343,7 @@ async function processSlotsToNearestCheckpoint(
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
opts: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks> {
const preSlot = preState.slot;
const postSlot = slot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -39,7 +39,7 @@ export class BlockStateCacheImpl implements BlockStateCache {
}
}

get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.head?.stateRoot === rootHex ? this.head.state : this.cache.get(rootHex);
if (!item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {RootHex} from "@lodestar/types";
import {toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -93,7 +93,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
/**
* Get a state from this cache given a state root hex.
*/
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.cache.get(rootHex);
if (!item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Epoch, RootHex, phase0} from "@lodestar/types";
import {MapDef, toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {CacheItemType, CheckpointStateCache} from "./types.js";

Expand Down Expand Up @@ -42,7 +42,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
this.maxEpochs = maxEpochs;
}

async getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null> {
async getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null> {
return this.get(cp, opts);
}

Expand All @@ -54,7 +54,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
async getOrReloadLatest(
rootHex: string,
maxEpoch: number,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | null> {
return this.getLatest(rootHex, maxEpoch, opts);
}
Expand All @@ -64,7 +64,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
return 0;
}

get(cp: CheckpointHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(cp: CheckpointHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const cpKey = toCheckpointKey(cp);
const item = this.cache.get(cpKey);
Expand Down Expand Up @@ -98,7 +98,7 @@ export class InMemoryCheckpointStateCache implements CheckpointStateCache {
/**
* Searches for the latest cached state with a `root`, starting with `epoch` and descending
*/
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {Logger, MapDef, fromHex, sleep, toHex, toRootHex} from "@lodestar/utils"
import {Metrics} from "../../metrics/index.js";
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {IClock} from "../../util/clock.js";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
import {CPStateDatastore, DatastoreKey} from "./datastore/index.js";
import {MapTracker} from "./mapMetrics.js";
Expand Down Expand Up @@ -168,7 +168,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - Get block for processing
* - Regen head state
*/
async getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null> {
async getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp, opts);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData?.clone(opts?.dontTransferCache) ?? null;
Expand Down Expand Up @@ -240,7 +240,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
*/
async getStateOrLoadDb(
cp: CheckpointHex,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
const cpKey = toCacheKey(cp);
const inMemoryState = this.get(cpKey, opts);
Expand Down Expand Up @@ -272,7 +272,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Similar to get() api without reloading from disk
*/
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
get(cpOrKey: CheckpointHex | string, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
this.metrics?.cpStateCache.lookups.inc();
const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey);
const cacheItem = this.cache.get(cpKey);
Expand Down Expand Up @@ -323,7 +323,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Searches in-memory state for the latest cached state with a `root` without reload, starting with `epoch` and descending
*/
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
Expand All @@ -349,7 +349,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
async getOrReloadLatest(
rootHex: RootHex,
maxEpoch: Epoch,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | null> {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
Expand Down
12 changes: 6 additions & 6 deletions packages/beacon-node/src/chain/stateCache/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {routes} from "@lodestar/api";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Epoch, RootHex, phase0} from "@lodestar/types";
import {StateCloneOpts} from "../regen/interface.js";
import {StateRegenerationOpts} from "../regen/interface.js";

export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};

Expand All @@ -21,7 +21,7 @@ export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
* The cache key is state root
*/
export interface BlockStateCache {
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null;
get(rootHex: RootHex, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
add(item: CachedBeaconStateAllForks): void;
setHeadState(item: CachedBeaconStateAllForks | null): void;
/**
Expand Down Expand Up @@ -60,15 +60,15 @@ export interface BlockStateCache {
*/
export interface CheckpointStateCache {
init?: () => Promise<void>;
getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null>;
getOrReload(cp: CheckpointHex, opts?: StateRegenerationOpts): Promise<CachedBeaconStateAllForks | null>;
getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null;
get(cpOrKey: CheckpointHex | string, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
add(cp: phase0.Checkpoint, state: CachedBeaconStateAllForks): void;
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null;
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateRegenerationOpts): CachedBeaconStateAllForks | null;
getOrReloadLatest(
rootHex: RootHex,
maxEpoch: Epoch,
opts?: StateCloneOpts
opts?: StateRegenerationOpts
): Promise<CachedBeaconStateAllForks | null>;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void;
Expand Down
Loading

0 comments on commit dbe2188

Please sign in to comment.