Skip to content

Commit

Permalink
Improve slot schedule building
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Sep 14, 2023
1 parent 40d394f commit 29a4671
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 19 deletions.
10 changes: 8 additions & 2 deletions cmd/radiance/car/createcar/cmd-create-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,15 @@ func run(c *cobra.Command, args []string) {
klog.Infof("Started processing DB #%d from slot %d", dbIdex, slotMeta.Slot)
}
if dbIdex != latestDB {
klog.Infof("Switched to DB #%d; started processing new DB from slot %d (prev: %d)", dbIdex, slotMeta.Slot, latestSlot)
msg := fmt.Sprintf("Switched to DB #%d; started processing new DB from slot %d", dbIdex, slotMeta.Slot)
if latestSlot > 0 {
msg += fmt.Sprintf(" (prev: %d)", latestSlot)
} else {
msg += fmt.Sprintf(" (no prev)")
}
klog.Info(msg)
// TODO: warn if we skipped slots
if slotMeta.Slot > latestSlot+1 {
if latestSlot > 0 && slotMeta.Slot > latestSlot+1 {
klog.Warningf(
"Detected skipped slots %d to %d after DB switch (last slot of previous DB: %d); started processing new DB from slot %d",
latestSlot+1,
Expand Down
86 changes: 69 additions & 17 deletions pkg/blockstore/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (schedule *TraversalSchedule) init(
}
}
}()
officialEpochStart, _ := slotedges.CalcEpochLimits(epoch)

activationSlot := cloneUint64Ptr(nextRevisionActivationSlot)
var prevProcessedSlot *uint64
Expand All @@ -432,6 +433,21 @@ func (schedule *TraversalSchedule) init(
for hi := range reversedHandles {
handle := reversedHandles[hi]

logErrorf := func(format string, args ...interface{}) {
// format error and add context (db name)
klog.Error(
fmt.Errorf(format, args...),
fmt.Sprintf(" current_db=%s", handle.DB.DB.Name()),
)
}
logInfof := func(format string, args ...interface{}) {
// format error and add context (db name)
klog.Info(
fmt.Sprintf(format, args...),
fmt.Sprintf(" current_db=%s", handle.DB.DB.Name()),
)
}

var lastOfPReviousDB *uint64
if hi > 0 {
prevSlots := schedule.schedule[hi-1].slots
Expand All @@ -440,12 +456,13 @@ func (schedule *TraversalSchedule) init(
}
}
klog.Infof(
("wanted=%d, prevProcessed=%v; lastOfPreviousDB=%v; starting with db %s"),
"Starting with db %s, wanted=%d, prevProcessed=%v; lastOfPreviousDB=%v;",
handle.DB.DB.Name(),
wanted,
uint64OrNil(prevProcessedSlot),
uint64OrNil(lastOfPReviousDB),
handle.DB.DB.Name(),
)

var slots []uint64
opts := getReadOptions()
defer putReadOptions(opts)
Expand All @@ -458,12 +475,15 @@ func (schedule *TraversalSchedule) init(
key := MakeSlotKey(wanted)
iter.Seek(key[:])
if !iter.Valid() {
klog.Errorf(("seeked to slot %d but got invalid (not found); this probably means that you need to add the next DB too."), wanted)
logErrorf(
"seeked to slot %d but got invalid (not found): moving down to next DB",
wanted,
)
break slotLoop
}
gotRoot, ok := ParseSlotKey(iter.Key().Data())
if !ok {
return fmt.Errorf("Invalid slot key: %x", iter.Key().Data())
return fmt.Errorf("found invalid slot key in DB %s: %x", handle.DB.DB.Name(), iter.Key().Data())
}
if prevProcessedSlot != nil && gotRoot == *prevProcessedSlot {
// slots = append(slots, wanted)
Expand All @@ -472,48 +492,68 @@ func (schedule *TraversalSchedule) init(
iter.Prev()
gotRoot, ok = ParseSlotKey(iter.Key().Data())
if !ok {
return fmt.Errorf("Invalid slot key: %x", iter.Key().Data())
return fmt.Errorf("found invalid slot key in DB %s: %x", handle.DB.DB.Name(), iter.Key().Data())
}
}
}

if start != 0 && gotRoot < start-1 {
// inlude one more slot
klog.Infof(("reached slot %d which is lower than the low bound %d (OK)"), gotRoot, start)
logInfof(
"reached slot %d which is lower than the low bound %d (OK)",
gotRoot,
start,
)
break slotLoop
}
// Check what we got (might be a different gotRoot than what we wanted):
recovered := false
if gotRoot != wanted {
klog.Errorf(("seeked to slot %d but got slot %d; trying recovery..."), wanted, gotRoot)
logErrorf(
"seeked to slot %d but got slot %d; trying to recover missing root...",
wanted,
gotRoot,
)
// The wanted root was not found in the CfRoot; if it exists in the CfMeta, we can still use it.
_, err := handle.DB.GetSlotMeta(wanted)
if err == nil {
klog.Infof(("recovery worked: recovered the wanted slot %d"), wanted)
logInfof("recovery successful: recovered the wanted slot %d", wanted)
recovered = true
if _, ok := recoveries[wanted]; ok {
klog.Infof(
logInfof(
"Root slot already recovered: %d",
wanted,
)
}
recoveries[wanted] = struct{}{}
gotRoot = wanted
} else {
// recovery failed.
// recovery failed:
if prevProcessedSlot != nil {
// We really wanted that slot because it was the parent of the previous slot.
// If we can't find it, we can't continue.
return fmt.Errorf("Failed to get meta for slot %d (parent of %d): %s", wanted, err, *prevProcessedSlot)
return fmt.Errorf(
"db %q: failed to get meta for slot %d (parent of %d): %s",
handle.DB.DB.Name(),
wanted,
*prevProcessedSlot,
err,
)
}
// This is fine because the first wanted slot was a guess, and it wasn't found (and the DB returned a different slot).
// We can just use the slot we got.
klog.Infof(("recovery failed: wanted %d (which doesn't exist), but can only get %d"), wanted, gotRoot)
logInfof(
"recovery failed: wanted %d (which doesn't exist), but can only get %d",
wanted,
gotRoot,
)
if wanted == stop && gotRoot < stop {
// We wanted the last slot, but we got a smaller slot.
// This means that we can't be sure the DB is complete.
// We can't continue.
return fmt.Errorf(
"Wanted to get slot %d but got %d instead, which is smaller than the stop slot %d (and we can't be sure the DB is complete)",
"db %q: wanted to get slot %d but got %d instead, which is smaller than the stop slot %d (and we can't be sure the DB is complete)",
handle.DB.DB.Name(),
wanted,
gotRoot,
stop,
Expand All @@ -525,10 +565,10 @@ func (schedule *TraversalSchedule) init(
// now get the meta for this slot
meta, err := handle.DB.GetSlotMeta(gotRoot)
if err != nil {
return fmt.Errorf("Failed to get meta for slot %d: %s", gotRoot, err)
return fmt.Errorf("db %q: failed to get meta for slot %d: %s", handle.DB.DB.Name(), gotRoot, err)
}
if meta == nil {
return fmt.Errorf("Meta for slot %d is nil", gotRoot)
return fmt.Errorf("db %q: meta for slot %d is nil", handle.DB.DB.Name(), gotRoot)
}

// If we have a shred revision, we need to check if the slot is in the range of the next revision activation.
Expand All @@ -550,7 +590,7 @@ func (schedule *TraversalSchedule) init(
if err == nil {
// Success!
} else {
klog.Warningf(
logInfof(
"recovered slot %d from DB %s is missing data (moving down to another DB): %s",
gotRoot,
handle.DB.DB.Name(),
Expand All @@ -566,7 +606,7 @@ func (schedule *TraversalSchedule) init(
// Has all the data except for what's the parent slot.
// We skip adding it here, and hope we will find it in the next DB.
// We also assume that if meta.ParentSlot == math.MaxUint64, then this is the end of the DB.
break
break slotLoop
}
slots = append(slots, gotRoot)
if gotRoot == 0 {
Expand All @@ -578,6 +618,12 @@ func (schedule *TraversalSchedule) init(
// }

prevProcessedSlot = &gotRoot
{
// if prevProcessedSlot is already in the previous Epoch, we can stop.
if prevProcessedSlot != nil && *prevProcessedSlot < officialEpochStart {
break slotLoop
}
}
wanted = meta.ParentSlot
}

Expand All @@ -591,6 +637,12 @@ func (schedule *TraversalSchedule) init(
handle: handle,
slots: unique(slots),
})
{
// if prevProcessedSlot is already in the previous Epoch, we can stop.
if prevProcessedSlot != nil && *prevProcessedSlot < officialEpochStart {
break
}
}
}
// reverse the schedule so that it is in ascending order
reverse(schedule.schedule)
Expand Down

0 comments on commit 29a4671

Please sign in to comment.