Skip to content

Commit

Permalink
Use in-memory slot registry
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Sep 5, 2023
1 parent 4db65cd commit fd9d854
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 571 deletions.
74 changes: 63 additions & 11 deletions cmd/radiance/car/createcar/multistage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -23,7 +22,6 @@ import (
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
concurrently "github.com/tejzpr/ordered-concurrently/v3"
"go.firedancer.io/radiance/cmd/radiance/car/createcar/registry"
"go.firedancer.io/radiance/pkg/blockstore"
radianceblockstore "go.firedancer.io/radiance/pkg/blockstore"
firecar "go.firedancer.io/radiance/pkg/ipld/car"
Expand Down Expand Up @@ -157,7 +155,7 @@ type Multistage struct {

storageCar *carHandle

reg *registry.Registry
reg *InMemorySlotRegistry

workerInputChan chan concurrently.WorkFunction
waitExecuted *sync.WaitGroup
Expand All @@ -167,6 +165,64 @@ type Multistage struct {
numWrittenObjects *atomic.Uint64
}

type InMemorySlotRegistry struct {
mu sync.RWMutex
registry map[uint64]SlotEntry
}

type SlotEntry struct {
Slot uint64
CID []byte
}

// func(slot uint64, cid []byte) error
func (r *InMemorySlotRegistry) SetCID(slot uint64, cid []byte) error {
r.mu.Lock()
defer r.mu.Unlock()
r.registry[slot] = SlotEntry{
Slot: slot,
CID: clone(cid),
}
return nil
}

func clone(b []byte) []byte {
if b == nil {
return nil
}
c := make([]byte, len(b))
copy(c, b)
return c
}

// GetAll
func (r *InMemorySlotRegistry) GetAll() ([]SlotEntry, error) {
r.mu.RLock()
defer r.mu.RUnlock()
out := make([]SlotEntry, 0, len(r.registry))
for i := range r.registry {
out = append(out, r.registry[i])
}
return out, nil
}

func (r *InMemorySlotRegistry) GetCID(slot uint64) (*cid.Cid, error) {
r.mu.RLock()
defer r.mu.RUnlock()
entry, ok := r.registry[slot]
if !ok {
return nil, fmt.Errorf("slot %d not found", slot)
}
l, c, err := cid.CidFromBytes(entry.CID)
if err != nil {
return nil, fmt.Errorf("failed to parse CID: %w", err)
}
if l != 36 {
return nil, fmt.Errorf("CID length mismatch: expected %d, got %d", 36, l)
}
return &c, nil
}

// - [x] in stage 1, we create a CAR file for all the slots
// - there's a registry of all slots that have been written, and their CIDs (very important)
// - [x] in stage 2, we add the missing parts of the DAG (same CAR file).
Expand Down Expand Up @@ -245,11 +301,8 @@ func NewMultistage(
}

{
registryFilepath := filepath.Join(finalCARFilepath + ".registry.bin")
cidLen := 36
reg, err := registry.New(registryFilepath, cidLen)
if err != nil {
return nil, fmt.Errorf("failed to create registry: %w", err)
reg := &InMemorySlotRegistry{
registry: make(map[uint64]SlotEntry),
}
cw.reg = reg
}
Expand Down Expand Up @@ -653,8 +706,8 @@ func (cw *Multistage) FinalizeDAG(
}
allSlots := make([]uint64, 0, len(allRegistered))
for _, slot := range allRegistered {
if slot.CID == nil || len(slot.CID) == 0 || !slot.Status.Is(registry.SlotStatusIncluded) {
continue
if slot.CID == nil || len(slot.CID) == 0 {
panic(fmt.Errorf("slot %d has no CID", slot.Slot))
}
allSlots = append(allSlots, slot.Slot)
}
Expand Down Expand Up @@ -700,7 +753,6 @@ func (cw *Multistage) FinalizeDAG(
klog.Infof("Replaced root in CAR with CID of epoch %d", epoch)
}

cw.reg.Destroy()
return epochRootLink, slotRecap, err
}

Expand Down
Loading

0 comments on commit fd9d854

Please sign in to comment.