Skip to content

Commit

Permalink
Update the state archive to use a defined structure
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Oct 23, 2024
1 parent 588fd9c commit 3c5ecef
Show file tree
Hide file tree
Showing 17 changed files with 382 additions and 202 deletions.
223 changes: 144 additions & 79 deletions packages/beacon-node/src/chain/historicalState/historicalState.ts
Original file line number Diff line number Diff line change
@@ -1,110 +1,120 @@
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {Slot} from "@lodestar/types";
import {Logger} from "@lodestar/logger";
import {BeaconConfig} from "@lodestar/config";
import {BeaconConfig, ChainForkConfig} from "@lodestar/config";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {formatBytes} from "@lodestar/utils";
import {IBeaconDb} from "../../db/interface.js";
import {HistoricalStateRegenMetrics, IBinaryDiffCodec, HistoricalStateSlotType} from "./types.js";
import {HistoricalStateRegenMetrics, IStateDiffCodec, HistoricalStateStorageType} from "./types.js";
import {replayBlocks} from "./utils/blockReplay.js";
import {HierarchicalLayers} from "./utils/hierarchicalLayers.js";
import {XDelta3Codec} from "./utils/xdelta3.js";
import {getDiffState} from "./utils/diff.js";
import {getDiffStateArchive} from "./utils/diff.js";
import {StateArchiveMode} from "../archiver/interface.js";
import {
computeDiffArchive,
getLastStoredStateArchive,
StateArchiveSSZType,
stateArchiveToStateBytes,
stateBytesToStateArchive,
} from "./utils/stateArchive.js";

export const codec: IBinaryDiffCodec = new XDelta3Codec();
export const codec: IStateDiffCodec = new XDelta3Codec();

type HStateOperationOptions = {
db: IBeaconDb;
config: BeaconConfig;
logger: Logger;
hierarchicalLayers: HierarchicalLayers;
metrics?: HistoricalStateRegenMetrics;
stateArchiveMode: StateArchiveMode;
};

export async function getHistoricalState(
{slot, archiveMode}: {slot: Slot; archiveMode: StateArchiveMode},
slot: Slot,
{
stateArchiveMode,
db,
logger,
config,
metrics,
hierarchicalLayers,
pubkey2index,
}: {
config: BeaconConfig;
db: IBeaconDb;
pubkey2index: PubkeyIndexMap;
logger: Logger;
hierarchicalLayers: HierarchicalLayers;
metrics?: HistoricalStateRegenMetrics;
}
}: HStateOperationOptions & {pubkey2index: PubkeyIndexMap}
): Promise<Uint8Array | null> {
const regenTimer = metrics?.regenTime.startTimer();
const epoch = computeEpochAtSlot(slot);
const strategy = hierarchicalLayers.getSlotType(slot, archiveMode);
logger.verbose("Fetching state archive", {strategy, slot, epoch});
const slotType = hierarchicalLayers.getStorageType(slot, stateArchiveMode);
logger.verbose("Fetching state archive", {slotType, slot, epoch});

switch (strategy) {
case HistoricalStateSlotType.Full: {
switch (slotType) {
case HistoricalStateStorageType.Full: {
const loadStateTimer = metrics?.loadSnapshotStateTime.startTimer();
const state = await db.stateArchive.getBinary(slot);
loadStateTimer?.();
regenTimer?.({strategy: HistoricalStateSlotType.Full});
regenTimer?.({strategy: HistoricalStateStorageType.Full});
return state;
}

case HistoricalStateSlotType.Snapshot: {
case HistoricalStateStorageType.Snapshot: {
const loadStateTimer = metrics?.loadSnapshotStateTime.startTimer();
const state = await db.stateArchive.getBinary(slot);
const stateArchive = await db.stateArchive.getBinary(slot);

const state = stateArchive
? stateArchiveToStateBytes(StateArchiveSSZType.deserialize(stateArchive), config)
: null;

loadStateTimer?.();
regenTimer?.({strategy: HistoricalStateSlotType.Snapshot});
regenTimer?.({strategy: HistoricalStateStorageType.Snapshot});
return state;
}
case HistoricalStateSlotType.Diff: {
const {diffStateBytes: diffState} = await getDiffState(
case HistoricalStateStorageType.Diff: {
const {stateArchive} = await getDiffStateArchive(
{slot, skipSlotDiff: false},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
);
regenTimer?.({strategy: HistoricalStateSlotType.Diff});
regenTimer?.({strategy: HistoricalStateStorageType.Diff});

return diffState;
return stateArchive ? stateArchiveToStateBytes(stateArchive, config) : null;
}
case HistoricalStateSlotType.BlockReplay: {
const {diffStateBytes, diffSlots} = await getDiffState(
case HistoricalStateStorageType.BlockReplay: {
const {stateArchive, diffSlots} = await getDiffStateArchive(
{slot, skipSlotDiff: false},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
);

if (!diffStateBytes) {
regenTimer?.({strategy: HistoricalStateSlotType.BlockReplay});
if (!stateArchive) {
regenTimer?.({strategy: HistoricalStateStorageType.BlockReplay});
return null;
}

const state = replayBlocks(
{toSlot: slot, lastFullSlot: diffSlots[diffSlots.length - 1], lastFullStateBytes: diffStateBytes},
{
toSlot: slot,
lastFullSlot: diffSlots[diffSlots.length - 1],
lastFullStateBytes: stateArchiveToStateBytes(stateArchive, config),
},
{config, db, metrics, pubkey2index}
);

regenTimer?.({strategy: HistoricalStateSlotType.BlockReplay});
regenTimer?.({strategy: HistoricalStateStorageType.BlockReplay});

return state;
}
}
}

export async function putHistoricalState(
{slot, archiveMode, stateBytes}: {slot: Slot; archiveMode: StateArchiveMode; stateBytes: Uint8Array},
{
db,
logger,
metrics,
hierarchicalLayers,
}: {
db: IBeaconDb;
logger: Logger;
metrics?: HistoricalStateRegenMetrics;
hierarchicalLayers: HierarchicalLayers;
}
slot: Slot,
stateBytes: Uint8Array,
{db, logger, metrics, hierarchicalLayers, stateArchiveMode, config}: HStateOperationOptions
): Promise<void> {
const epoch = computeEpochAtSlot(slot);
const strategy = hierarchicalLayers.getSlotType(slot, archiveMode);
logger.info("Archiving historical state", {epoch, slot, strategy});
const storageType = hierarchicalLayers.getStorageType(slot, stateArchiveMode);
logger.info("Archiving historical state", {epoch, slot, slotType: storageType});

switch (strategy) {
case HistoricalStateSlotType.Full: {
switch (storageType) {
case HistoricalStateStorageType.Full: {
metrics?.stateSnapshotSize.set(stateBytes.byteLength);
await db.stateArchive.putBinary(slot, stateBytes);
logger.verbose("State stored as full", {
Expand All @@ -115,38 +125,40 @@ export async function putHistoricalState(
break;
}

case HistoricalStateSlotType.Snapshot: {
case HistoricalStateStorageType.Snapshot: {
const stateArchiveBytes = StateArchiveSSZType.serialize(stateBytesToStateArchive(stateBytes, config));
await db.stateArchive.putBinary(slot, stateArchiveBytes);

metrics?.stateSnapshotSize.set(stateBytes.byteLength);
await db.stateArchive.putBinary(slot, stateBytes);
logger.verbose("State stored as snapshot", {
epoch,
slot,
snapshotSize: formatBytes(stateBytes.byteLength),
snapshotSize: formatBytes(stateArchiveBytes.byteLength),
});
break;
}
case HistoricalStateSlotType.Diff: {
const {diffStateBytes: diffState} = await getDiffState(
case HistoricalStateStorageType.Diff: {
const {stateArchive: diffStateArchive} = await getDiffStateArchive(
{slot, skipSlotDiff: true},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
{db, metrics, logger, hierarchicalLayers, codec}
);

if (!diffState) return;
if (!diffStateArchive) return;

const diff = codec.compute(diffState, stateBytes);
await db.stateArchive.putBinary(slot, diff);

metrics?.stateDiffSize.set(diff.byteLength);
const diffArchiveBytes = StateArchiveSSZType.serialize(
computeDiffArchive(diffStateArchive, stateBytesToStateArchive(stateBytes, config), codec)
);
await db.stateArchive.putBinary(slot, diffArchiveBytes);

metrics?.stateDiffSize.set(diffArchiveBytes.byteLength);
logger.verbose("State stored as diff", {
epoch,
slot,
baseSize: formatBytes(diffState.byteLength),
diffSize: formatBytes(diff.byteLength),
diffSize: formatBytes(diffArchiveBytes.byteLength),
});
break;
}
case HistoricalStateSlotType.BlockReplay: {
case HistoricalStateStorageType.BlockReplay: {
logger.verbose("Skipping storage of historical state for block replay", {
epoch,
slot,
Expand All @@ -163,53 +175,106 @@ export async function getLastStoredState({
metrics,
logger,
archiveMode,
forkConfig,
}: {
db: IBeaconDb;
hierarchicalLayers: HierarchicalLayers;
archiveMode: StateArchiveMode;
forkConfig: ChainForkConfig;
metrics?: HistoricalStateRegenMetrics;
logger?: Logger;
}): Promise<{stateBytes: Uint8Array | null; slot: Slot | null}> {
const lastStoredDiffSlot = await db.stateArchive.lastKey();
const lastStoredSnapshotSlot = await db.stateArchive.lastKey();
if (archiveMode === StateArchiveMode.Frequency) {
const lastStoredSlot = await db.stateArchive.lastKey();
return {stateBytes: lastStoredSlot ? await db.stateArchive.getBinary(lastStoredSlot) : null, slot: lastStoredSlot};
}

logger?.info("Last archived state slots", {snapshot: lastStoredSnapshotSlot, diff: lastStoredDiffSlot});
const lastStoredDiffArchive = await getLastStoredStateArchive({db, snapshot: false});
const lastStoredSnapshotArchive = await getLastStoredStateArchive({db, snapshot: true});

if (lastStoredDiffSlot === null && lastStoredSnapshotSlot === null) {
if (!lastStoredDiffArchive && !lastStoredSnapshotArchive) {
logger?.verbose("State archive db is empty");
return {stateBytes: null, slot: null};
}

const lastStoredSlot = Math.max(lastStoredDiffSlot ?? 0, lastStoredSnapshotSlot ?? 0);
const strategy = hierarchicalLayers.getSlotType(lastStoredSlot, archiveMode);
logger?.verbose("Loading the last archived state", {strategy, slot: lastStoredSlot});
if (!lastStoredSnapshotArchive) {
logger?.verbose("State archive db does not contain any snapshot state");
// TODO: Need to clean the stateArchive db
return {stateBytes: null, slot: null};
}

logger?.info("Last archived state slots", {
snapshot: lastStoredSnapshotArchive?.slot,
diff: lastStoredDiffArchive?.slot,
});

switch (strategy) {
case HistoricalStateSlotType.Full:
return {stateBytes: await db.stateArchive.getBinary(lastStoredSlot), slot: lastStoredSlot};
case HistoricalStateSlotType.Snapshot:
const lastStoredSlot = Math.max(lastStoredDiffArchive?.slot ?? 0, lastStoredSnapshotArchive.slot ?? 0);
const storageType = hierarchicalLayers.getStorageType(lastStoredSlot, archiveMode);
logger?.verbose("Loading the last archived state", {storageType, slot: lastStoredSlot});

switch (storageType) {
case HistoricalStateStorageType.Full: {
return {stateBytes: await db.stateArchive.getBinary(lastStoredSlot), slot: lastStoredSlot};
case HistoricalStateSlotType.Diff: {
if (lastStoredSlot === lastStoredSnapshotSlot) {
}
case HistoricalStateStorageType.Snapshot: {
const stateArchive = await db.stateArchive.getBinary(lastStoredSlot);

return {
stateBytes: stateArchive
? stateArchiveToStateBytes(StateArchiveSSZType.deserialize(stateArchive), forkConfig)
: null,
slot: lastStoredSlot,
};
}
case HistoricalStateStorageType.Diff: {
if (lastStoredSlot === lastStoredSnapshotArchive.slot) {
logger?.warn("Last archived snapshot is not at expected epoch boundary, possibly because of checkpoint sync.");
return {stateBytes: await db.stateArchive.getBinary(lastStoredSlot), slot: lastStoredSlot};
}

const {diffStateBytes} = await getDiffState(
const diffStateArchive = await getDiffStateArchive(
{slot: lastStoredSlot, skipSlotDiff: false},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
);

if (!diffStateArchive.stateArchive) throw new Error("Can not compute the last stored state");

return {
stateBytes: diffStateBytes,
stateBytes: stateArchiveToStateBytes(diffStateArchive.stateArchive, forkConfig),
slot: lastStoredSlot,
};
}
case HistoricalStateSlotType.BlockReplay:
if (lastStoredSlot === lastStoredSnapshotSlot) {
case HistoricalStateStorageType.BlockReplay:
if (lastStoredSlot === lastStoredSnapshotArchive.slot) {
logger?.warn("Last archived snapshot is not at expected epoch boundary, possibly because of checkpoint sync.");
return {stateBytes: await db.stateArchive.getBinary(lastStoredSlot), slot: lastStoredSlot};
return {stateBytes: stateArchiveToStateBytes(lastStoredSnapshotArchive, forkConfig), slot: lastStoredSlot};
}

throw new Error(`Unexpected stored slot for a non epoch slot=${lastStoredSlot}`);
}
}

/**
 * Remove state archives that cannot be read in the current archive format.
 *
 * Does nothing unless `archiveMode` is `StateArchiveMode.Differential`. In that mode the
 * entry stored under the last key of `db.stateArchive` is probed with
 * `StateArchiveSSZType.deserialize`. If the probe throws, every key yielded by
 * `db.stateArchive.keysStream()` is deleted so that subsequent writes start from a clean
 * repository.
 *
 * NOTE(review): only the last stored entry is probed; this presumes all entries share one
 * format — confirm against the archiver's write path.
 *
 * @param db - Beacon database whose `stateArchive` repository is inspected and, if needed, emptied.
 * @param archiveMode - Configured state archive mode.
 * @param logger - Optional logger; receives one info message right before the cleanup starts.
 * @returns Resolves once the probe (and the cleanup, if triggered) has finished.
 */
export async function migrateStateArchive({
  db,
  archiveMode,
  logger,
}: {db: IBeaconDb; archiveMode: StateArchiveMode; logger?: Logger}): Promise<void> {
  if (archiveMode !== StateArchiveMode.Differential) return;

  const lastStoredSlot = await db.stateArchive.lastKey();
  // Explicit null/undefined check: slot 0 is a valid key and must not be mistaken for an empty repository.
  if (lastStoredSlot == null) return;

  const archiveBytes = await db.stateArchive.getBinary(lastStoredSlot);
  if (!archiveBytes) return;

  try {
    // Used purely as a format probe; the decoded value is discarded.
    StateArchiveSSZType.deserialize(archiveBytes);
  } catch {
    logger?.info("Found that stateArchiveMode was switch recently. Cleaning up state archives to store new format.");
    for await (const slot of db.stateArchive.keysStream()) {
      await db.stateArchive.delete(slot);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ export class HistoricalStateRegen implements IHistoricalStateRegen {
}

async storeHistoricalState(slot: number, stateBytes: Uint8Array): Promise<void> {
return this.api.storeHistoricalState(slot, this.stateArchiveMode, stateBytes);
return this.api.storeHistoricalState(slot, stateBytes, this.stateArchiveMode);
}
}
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/historicalState/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {EpochTransitionStep, StateCloneSource, StateHashTreeRootSource} from "@lodestar/state-transition";
import {RegistryMetricCreator} from "../../metrics/index.js";
import {HistoricalStateRegenMetrics, RegenErrorType, HistoricalStateSlotType} from "./types.js";
import {HistoricalStateRegenMetrics, RegenErrorType, HistoricalStateStorageType} from "./types.js";

export function getMetrics(metricsRegister: RegistryMetricCreator): HistoricalStateRegenMetrics {
return {
Expand Down Expand Up @@ -88,7 +88,7 @@ export function getMetrics(metricsRegister: RegistryMetricCreator): HistoricalSt
registerValidatorStatuses: () => {},

// historical state regen metrics
regenTime: metricsRegister.histogram<{strategy: HistoricalStateSlotType}>({
regenTime: metricsRegister.histogram<{strategy: HistoricalStateStorageType}>({
name: "lodestar_historical_state_regen_time_seconds",
help: "Time to regenerate a historical state in seconds",
// Historical state regen can take up to 3h as of Aug 2024
Expand Down
Loading

0 comments on commit 3c5ecef

Please sign in to comment.