Skip to content

Commit

Permalink
Use sqlite to get data from a node (#75)
Browse files Browse the repository at this point in the history
* Use sqlite instead of grpc to get layers, rewards and accounts

* Remove poolsize param from mongodb flag

* Update go.mod

* Fix layers queue and sync

* Bump go version in Dockerfile

* Get ATXs at the start of the epoch

* Calculate circulation

* Use corresponding collections to get account summary instead of ledger

* Sort items by layer

* Add total rewards endpoint

* Mock sql methods

* Add target epoch field to ATX

* Calculate account last activity field

* Sort accounts by recent activity

* Fix unit tests

* Update go.mod

* Update CI workflow
  • Loading branch information
kacpersaw authored Nov 20, 2023
1 parent 34a9da1 commit 8aa6d3e
Show file tree
Hide file tree
Showing 42 changed files with 1,239 additions and 371 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: CI

env:
go-version: '1.20'
go-version: '1.21.1'

# Trigger the workflow on all pull requests, and on push to specific branches
on:
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.apiserver
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.20-alpine3.17 AS build
FROM golang:1.21.1-alpine3.17 AS build
WORKDIR /src
COPY . .
RUN apk add --no-cache gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.collector
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.20-alpine3.17 AS build
FROM golang:1.21.1-alpine3.17 AS build
WORKDIR /src
COPY . .
RUN apk add --no-cache gcc musl-dev
Expand Down
28 changes: 22 additions & 6 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"context"
"fmt"
"github.com/spacemeshos/address"
"os"
"os/signal"
"syscall"
"time"

"github.com/spacemeshos/explorer-backend/collector"
"github.com/spacemeshos/explorer-backend/collector/sql"
"github.com/spacemeshos/explorer-backend/storage"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/urfave/cli/v2"
"os"
"os/signal"
"syscall"
"time"
)

var (
Expand All @@ -29,6 +29,7 @@ var (
testnetBoolFlag bool
syncFromLayerFlag int
syncMissingLayersBoolFlag bool
sqlitePathStringFlag string
)

var flags = []cli.Flag{
Expand Down Expand Up @@ -87,6 +88,14 @@ var flags = []cli.Flag{
Value: true,
EnvVars: []string{"SPACEMESH_SYNC_MISSING_LAYERS"},
},
&cli.StringFlag{
Name: "sqlite",
Usage: "Path to node sqlite file",
Required: false,
Destination: &sqlitePathStringFlag,
Value: "explorer.sql",
EnvVars: []string{"SPACEMESH_SQLITE"},
},
}

func main() {
Expand All @@ -110,8 +119,15 @@ func main() {
return err
}

db, err := sql.Setup(sqlitePathStringFlag)
if err != nil {
log.Info("SQLite storage open error %v", err)
return err
}
dbClient := &sql.Client{}

c := collector.NewCollector(nodePublicAddressStringFlag, nodePrivateAddressStringFlag,
syncMissingLayersBoolFlag, syncFromLayerFlag, mongoStorage)
syncMissingLayersBoolFlag, syncFromLayerFlag, mongoStorage, db, dbClient)
mongoStorage.AccountUpdater = c

sigs := make(chan os.Signal, 1)
Expand Down
33 changes: 17 additions & 16 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package collector
import (
"context"
"errors"
"github.com/spacemeshos/explorer-backend/collector/sql"
"github.com/spacemeshos/go-spacemesh/common/types"
sql2 "github.com/spacemeshos/go-spacemesh/sql"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/keepalive"
"time"
Expand All @@ -16,23 +19,26 @@ import (
)

const (
streamType_node_SyncStatus int = 1
streamType_mesh_Layer int = 2
streamType_globalState int = 3
streamType_mesh_Malfeasance int = 4
streamType_node_SyncStatus int = 1
//streamType_mesh_Layer int = 2
streamType_globalState int = 2
streamType_mesh_Malfeasance int = 3

streamType_count int = 4
streamType_count int = 3
)

type Listener interface {
OnNetworkInfo(genesisId string, genesisTime uint64, epochNumLayers uint32, maxTransactionsPerSecond uint64, layerDuration uint64, postUnitSize uint64)
OnNodeStatus(connectedPeers uint64, isSynced bool, syncedLayer uint32, topLayer uint32, verifiedLayer uint32)
OnLayer(layer *pb.Layer)
OnAccount(account *pb.Account)
OnAccounts(accounts []*types.Account)
OnReward(reward *pb.Reward)
OnMalfeasanceProof(proof *pb.MalfeasanceProof)
OnTransactionReceipt(receipt *pb.TransactionReceipt)
GetLastLayer(parent context.Context) uint32
LayersInQueue() int
IsLayerInQueue(layer *pb.Layer) bool
GetEpochNumLayers() uint32
}

type Collector struct {
Expand All @@ -42,6 +48,8 @@ type Collector struct {
syncFromLayerFlag uint32

listener Listener
db *sql2.Database
dbClient sql.DatabaseClient

nodeClient pb.NodeServiceClient
meshClient pb.MeshServiceClient
Expand All @@ -59,15 +67,16 @@ type Collector struct {
notify chan int
}

func NewCollector(nodePublicAddress string, nodePrivateAddress string,
syncMissingLayersFlag bool, syncFromLayerFlag int, listener Listener) *Collector {
func NewCollector(nodePublicAddress string, nodePrivateAddress string, syncMissingLayersFlag bool, syncFromLayerFlag int, listener Listener, db *sql2.Database, dbClient sql.DatabaseClient) *Collector {
return &Collector{
apiPublicUrl: nodePublicAddress,
apiPrivateUrl: nodePrivateAddress,
syncMissingLayersFlag: syncMissingLayersFlag,
syncFromLayerFlag: uint32(syncFromLayerFlag),
listener: listener,
notify: make(chan int),
db: db,
dbClient: dbClient,
}
}

Expand Down Expand Up @@ -125,14 +134,6 @@ func (c *Collector) Run() error {
return nil
})

g.Go(func() error {
err := c.layersPump()
if err != nil {
return errors.Join(errors.New("cannot start sync layers pump"), err)
}
return nil
})

g.Go(func() error {
err := c.globalStatePump()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/spacemeshos/explorer-backend/collector"
"github.com/spacemeshos/go-spacemesh/sql"
"os"
"testing"
"time"
Expand Down Expand Up @@ -49,13 +50,16 @@ func TestMain(m *testing.M) {
os.Exit(1)
}

sqlDb, err := sql.Open("file:test.db?cache=shared&mode=memory", sql.WithConnections(16), sql.WithMigrations(nil))
seed := testseed.GetServerSeed()
generator = testseed.NewSeedGenerator(seed)
if err = generator.GenerateEpoches(10); err != nil {
fmt.Println("failed to generate epochs", err)
os.Exit(1)
}

dbClient := &testseed.Client{SeedGen: generator}

node, err = testserver.CreateFakeSMNode(generator.FirstLayerTime, generator, seed)
if err != nil {
fmt.Println("failed to generate fake node", err)
Expand Down Expand Up @@ -83,7 +87,8 @@ func TestMain(m *testing.M) {
}()

collectorApp = collector.NewCollector(fmt.Sprintf("localhost:%d", node.NodePort),
fmt.Sprintf("localhost:%d", privateNode.NodePort), false, 0, storageDB)
fmt.Sprintf("localhost:%d", privateNode.NodePort), false,
0, storageDB, sqlDb, dbClient)
storageDB.AccountUpdater = collectorApp
defer storageDB.Close()
go collectorApp.Run()
Expand Down
47 changes: 1 addition & 46 deletions collector/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,55 +60,10 @@ func (c *Collector) globalStatePump() error {
return err
}
item := response.GetDatum()
if account := item.GetAccountWrapper(); account != nil {
c.listener.OnAccount(account)
} else if reward := item.GetReward(); reward != nil {
if reward.Layer.Number > c.syncFromLayerFlag {
c.listener.OnReward(reward)
}
} else if receipt := item.GetReceipt(); receipt != nil {
if receipt := item.GetReceipt(); receipt != nil {
if receipt.Layer.Number > c.syncFromLayerFlag {
c.listener.OnTransactionReceipt(receipt)
}
}
}
}

/*
func (c *Collector) transactionsStatePump() error {
var req empty.Empty
log.Info("Start global state transactions state pump")
defer func() {
c.notify <- -streamType_global_TransactionState
log.Info("Stop global state transactions state pump")
}()
c.notify <- +streamType_global_TransactionState
stream, err := c.globalClient.TransactionStateStream(context.Background(), &req)
if err != nil {
log.Error("cannot get global state transactions state: %v", err)
return err
}
for {
txState, err := stream.Recv()
if err == io.EOF {
return err
}
if err != nil {
log.Error("cannot receive TransactionState: %v", err)
return err
}
log.Info("TransactionState: %v, %v", txState.GetId(), txState.GetState())
var id sm.TransactionID
copy(id[:], txState.GetId().GetId())
c.history.AddTransactionState(&id, txState.GetState());
}
return nil
}
*/
Loading

0 comments on commit 8aa6d3e

Please sign in to comment.