Skip to content

Commit

Permalink
Use pointers; add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Aug 5, 2023
1 parent 06fbd2b commit c1e1822
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 61 deletions.
7 changes: 4 additions & 3 deletions cmd/radiance/car/createcar/cmd-create-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ func run(c *cobra.Command, args []string) {

// Open blockstores
dbPaths := *flagDBs
handles := make([]blockstore.WalkHandle, len(*flagDBs))
handles := make([]*blockstore.WalkHandle, len(*flagDBs))
for i := range handles {
var err error
handles[i] = &blockstore.WalkHandle{}
handles[i].DB, err = blockstore.OpenReadOnly(dbPaths[i])
if err != nil {
klog.Exitf("Failed to open blockstore at %s: %s", dbPaths[i], err)
Expand All @@ -153,7 +154,7 @@ func run(c *cobra.Command, args []string) {
// Print the slots range in each DB:
err = schedule.Each(
c.Context(),
func(dbIndex int, db blockstore.WalkHandle, slots []uint64) error {
func(dbIndex int, db *blockstore.WalkHandle, slots []uint64) error {
if len(slots) == 0 {
return nil
}
Expand Down Expand Up @@ -248,7 +249,7 @@ func run(c *cobra.Command, args []string) {

err = iter.Iterate(
c.Context(),
func(dbIdex int, h blockstore.WalkHandle, slot uint64, shredRevision int) error {
func(dbIdex int, h *blockstore.WalkHandle, slot uint64, shredRevision int) error {
if *flagRequireFullEpoch && slotedges.CalcEpochForSlot(slot) != epoch {
return nil
}
Expand Down
27 changes: 10 additions & 17 deletions cmd/radiance/car/createcar/dataframe.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ func CreateAndStoreFrames(
first := frames[0]
// otherwise, link them together, backwards
rest := frames[1:]
revertSlice(rest)
reverse(rest)

split := splitSlice(rest, NumNextLinks)

previousLinks := make([]datamodel.Link, 0)
for _, chunk := range split {
links := make([]datamodel.Link, len(chunk))
for j, frame := range chunk {
reverseLinkSlice(previousLinks)
reverse(previousLinks)
frame.Next = douplePointerLinkSlice(previousLinks)
dataFrameNode, err := frameToDatamodelNode(frame)
if err != nil {
Expand All @@ -171,12 +171,19 @@ func CreateAndStoreFrames(
previousLinks = links
}

reverseLinkSlice(previousLinks)
reverse(previousLinks)
first.Next = douplePointerLinkSlice(previousLinks)

return first, nil
}

func reverse[T any](x []T) {
for i := len(x)/2 - 1; i >= 0; i-- {
opp := len(x) - 1 - i
x[i], x[opp] = x[opp], x[i]
}
}

func douplePointerLinkSlice(l ipldbindcode.List__Link) **ipldbindcode.List__Link {
return pointerToPointerLinkSlice(&l)
}
Expand Down Expand Up @@ -227,20 +234,6 @@ const (
NumNextLinks = 5 // how many links to store in each dataFrame
)

func revertSlice(slice []*ipldbindcode.DataFrame) {
for i := len(slice)/2 - 1; i >= 0; i-- {
opp := len(slice) - 1 - i
slice[i], slice[opp] = slice[opp], slice[i]
}
}

func reverseLinkSlice(slice []datamodel.Link) {
for i := len(slice)/2 - 1; i >= 0; i-- {
opp := len(slice) - 1 - i
slice[i], slice[opp] = slice[opp], slice[i]
}
}

func splitSlice[T comparable](slice []T, size int) [][]T {
var chunks [][]T
for size < len(slice) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/radiance/car/createcar/multistage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import (
type blockWorker struct {
slotMeta *radianceblockstore.SlotMeta
CIDSetter func(slot uint64, cid []byte) error
handle blockstore.WalkHandle
handle *blockstore.WalkHandle
done func(numTx uint64)
}

func newBlockWorker(
slotMeta *radianceblockstore.SlotMeta,
CIDSetter func(slot uint64, cid []byte) error,
h blockstore.WalkHandle,
h *blockstore.WalkHandle,
done func(uint64),
) *blockWorker {
return &blockWorker{
Expand Down Expand Up @@ -301,7 +301,7 @@ func (cw *Multistage) getConcurrency() int {
// OnSlotFromDB is called when a block is received.
// This MUST be called in order of the slot number.
func (cw *Multistage) OnSlotFromDB(
h blockstore.WalkHandle,
h *blockstore.WalkHandle,
slotMeta *radianceblockstore.SlotMeta,
) error {
cw.waitExecuted.Add(1)
Expand Down
77 changes: 41 additions & 36 deletions pkg/blockstore/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func (s TraversalSchedule) NewIterator(limit uint64) *ScheduleIterator {
// TraversalSchedule.Iterate iterates over the schedule.
func (i *ScheduleIterator) Iterate(
ctx context.Context,
f func(dbIdex int, h WalkHandle, slot uint64, shredRevision int) error,
f func(dbIdex int, h *WalkHandle, slot uint64, shredRevision int) error,
) error {
numDone := uint64(0)
return i.schedule.EachSlot(
ctx,
func(dbIdex int, h WalkHandle, slot uint64, shredRevision int) error {
func(dbIdex int, h *WalkHandle, slot uint64, shredRevision int) error {
if err := f(dbIdex, h, slot, shredRevision); err != nil {
return err
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (s TraversalSchedule) LastSlot() (uint64, bool) {
// Each iterates over each DB in the schedule.
func (s TraversalSchedule) Each(
ctx context.Context,
f func(dbIndex int, db WalkHandle, slots []uint64) error,
f func(dbIndex int, db *WalkHandle, slots []uint64) error,
) error {
for dbIndex, db := range s.schedule {
if ctx.Err() != nil {
Expand All @@ -95,7 +95,7 @@ func (s TraversalSchedule) Each(
// EachSlot iterates over each slot in the schedule.
func (s TraversalSchedule) EachSlot(
ctx context.Context,
f func(dbIdex int, h WalkHandle, slot uint64, shredRevision int) error,
f func(dbIdex int, h *WalkHandle, slot uint64, shredRevision int) error,
) error {
if err := s.initStatsTracker(ctx); err != nil {
return err
Expand All @@ -107,7 +107,7 @@ func (s TraversalSchedule) EachSlot(
activationSlot := cloneUint64Ptr(nextRevisionActivationSlot)
return s.Each(
ctx,
func(dbIndex int, db WalkHandle, slots []uint64) error {
func(dbIndex int, db *WalkHandle, slots []uint64) error {
for _, slot := range slots {
if ctx.Err() != nil {
return ctx.Err()
Expand Down Expand Up @@ -205,7 +205,7 @@ func (s TraversalSchedule) SatisfiesEpochEdges(epoch uint64) error {
}

type DBtoSlots struct {
handle WalkHandle
handle *WalkHandle
slots []uint64
}

Expand All @@ -230,7 +230,7 @@ func (d DBtoSlots) LastSlot() (uint64, bool) {
}

// DBtoSlots.DBHandle returns the DBHandle.
func (d DBtoSlots) DBHandle() WalkHandle {
func (d DBtoSlots) DBHandle() *WalkHandle {
return d.handle
}

Expand Down Expand Up @@ -291,7 +291,7 @@ func uint64OrNil(x *uint64) any {
func NewSchedule(
epoch uint64,
requireFullEpoch bool,
handles []WalkHandle,
handles []*WalkHandle,
shredRevision int,
nextRevisionActivationSlot uint64,
) (*TraversalSchedule, error) {
Expand Down Expand Up @@ -341,8 +341,8 @@ func (s *TraversalSchedule) Close() error {
return nil
}

func (s *TraversalSchedule) getHandles() []WalkHandle {
var out []WalkHandle
func (s *TraversalSchedule) getHandles() []*WalkHandle {
var out []*WalkHandle
for _, db := range s.schedule {
out = append(out, db.handle)
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func (m *TraversalSchedule) GetSlotMetaFromAnyDB(slot uint64) (*SlotMeta, error)
return nil, fmt.Errorf("meta for slot %d not found in any DB", slot)
}

func slotEdges(handles []WalkHandle) (low, high uint64) {
func slotEdges(handles []*WalkHandle) (low, high uint64) {
if len(handles) == 0 {
return 0, 0
}
Expand All @@ -402,19 +402,16 @@ func (schedule *TraversalSchedule) init(
epoch uint64,
start,
stop uint64,
handles []WalkHandle,
handles []*WalkHandle,
shredRevision int,
nextRevisionActivationSlot *uint64,
) error {
schedule.shredRevision = shredRevision
schedule.nextShredRevisionActivationSlot = cloneUint64Ptr(nextRevisionActivationSlot)

reversedHandles := make([]WalkHandle, len(handles))
reversedHandles := make([]*WalkHandle, len(handles))
copy(reversedHandles, handles)
for i := len(reversedHandles)/2 - 1; i >= 0; i-- {
opp := len(reversedHandles) - 1 - i
reversedHandles[i], reversedHandles[opp] = reversedHandles[opp], reversedHandles[i]
}
reverse(reversedHandles)

overrides := make(map[uint64]uint64)
defer func() {
Expand All @@ -428,8 +425,13 @@ func (schedule *TraversalSchedule) init(

activationSlot := cloneUint64Ptr(nextRevisionActivationSlot)
var prevProcessedSlot *uint64
wanted := stop // NOTE: this is a guess; if not found, we will get the next bigger slot
for hi, h := range reversedHandles {
// Let's start with the last DB, last slot.
// We will work our way backwards from highest to lowest slot,
// from most recent to oldest DB.
wanted := stop // NOTE: this `stop` is a guess; if not found, we will get the next bigger slot
for hi := range reversedHandles {
handle := reversedHandles[hi]

var lastOfPReviousDB *uint64
if hi > 0 {
prevSlots := schedule.schedule[hi-1].slots
Expand All @@ -442,23 +444,22 @@ func (schedule *TraversalSchedule) init(
wanted,
uint64OrNil(prevProcessedSlot),
uint64OrNil(lastOfPReviousDB),
h.DB.DB.Name(),
handle.DB.DB.Name(),
)
var slots []uint64
opts := getReadOptions()
defer putReadOptions(opts)
// Open Next database
iter := h.DB.DB.NewIteratorCF(opts, h.DB.CfRoot)
iter := handle.DB.DB.NewIteratorCF(opts, handle.DB.CfRoot)
defer iter.Close()

// now get the meta for the parent slot
slotLoop:
for {
// FIND THE FIRST SLOT IN THE FIRST DB, ROOT
key := MakeSlotKey(wanted)
iter.Seek(key[:])
if !iter.Valid() {
klog.Infof(("seeked to slot %d but got invalid"), wanted)
break
klog.Errorf(("seeked to slot %d but got invalid (not found); this probably means that you need to add the next DB too."), wanted)
break slotLoop
}
gotRoot, ok := ParseSlotKey(iter.Key().Data())
if !ok {
Expand All @@ -478,13 +479,13 @@ func (schedule *TraversalSchedule) init(
if start != 0 && gotRoot < start-1 {
// inlude one more slot
klog.Infof(("reached slot %d which is lower than the low bound %d (OK)"), gotRoot, start)
break
break slotLoop
}
// Check what we got (might be a different gotRoot than what we wanted):
if gotRoot != wanted {
klog.Errorf(("seeked to slot %d but got slot %d; trying override"), wanted, gotRoot)
// The wanted root was not found in the CfRoot; if it exists in the CfMeta, we can still use it.
_, err := h.DB.GetSlotMeta(wanted)
_, err := handle.DB.GetSlotMeta(wanted)
if err == nil {
klog.Infof(("override worked: %d -> %d (recovered missing root)"), gotRoot, wanted)
if existingOverride, ok := overrides[gotRoot]; ok {
Expand Down Expand Up @@ -520,7 +521,7 @@ func (schedule *TraversalSchedule) init(
}

// now get the meta for this slot
meta, err := h.DB.GetSlotMeta(gotRoot)
meta, err := handle.DB.GetSlotMeta(gotRoot)
if err != nil {
return fmt.Errorf("Failed to get meta for slot %d: %s", gotRoot, err)
}
Expand All @@ -536,7 +537,7 @@ func (schedule *TraversalSchedule) init(
}

if false {
_, err = h.DB.GetEntries(meta, shredRevision)
_, err = handle.DB.GetEntries(meta, shredRevision)
if err == nil {
// Success!
} else {
Expand All @@ -552,7 +553,7 @@ func (schedule *TraversalSchedule) init(
}
slots = append(slots, gotRoot)
if gotRoot == 0 {
break
break slotLoop
}
// if gotRoot == meta.ParentSlot {
// // TODO: is this correct?
Expand All @@ -566,22 +567,26 @@ func (schedule *TraversalSchedule) init(
Uint64Ascending(slots)

if !uint64SlicesAreEqual(slots, unique(slots)) {
panic("slots are not unique")
panic(fmt.Errorf("slots from DB %s contains duplicates", handle.DB.DB.Name()))
}

schedule.schedule = append(schedule.schedule, DBtoSlots{
handle: h,
handle: handle,
slots: unique(slots),
})
}
// reverse the schedule so that it is in ascending order
for i := len(schedule.schedule)/2 - 1; i >= 0; i-- {
opp := len(schedule.schedule) - 1 - i
schedule.schedule[i], schedule.schedule[opp] = schedule.schedule[opp], schedule.schedule[i]
}
reverse(schedule.schedule)
return nil
}

func reverse[T any](x []T) {
for i := len(x)/2 - 1; i >= 0; i-- {
opp := len(x) - 1 - i
x[i], x[opp] = x[opp], x[i]
}
}

func greenBG(s string) string {
return blackFG("\033[48;5;2m" + s + "\033[0m")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/blockstore/walk-handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type WalkHandle struct {
}

// sortWalkHandles detects bounds of each DB and sorts handles.
func sortWalkHandles(h []WalkHandle, shredRevision int, nextRevisionActivationSlot *uint64) error {
func sortWalkHandles(h []*WalkHandle, shredRevision int, nextRevisionActivationSlot *uint64) error {
for i, db := range h {
// Find lowest and highest available slot in DB.
start, err := getLowestCompletedSlot(db.DB, shredRevision, nextRevisionActivationSlot)
Expand All @@ -27,7 +27,7 @@ func sortWalkHandles(h []WalkHandle, shredRevision int, nextRevisionActivationSl
if err != nil {
return err
}
h[i] = WalkHandle{
h[i] = &WalkHandle{
Start: start,
Stop: stop,
DB: db.DB,
Expand Down

0 comments on commit c1e1822

Please sign in to comment.