Major update to be up to date with substreams-sink
maoueh committed Mar 21, 2024
1 parent 7e4ea33 commit bb3b9de
Showing 32 changed files with 371 additions and 1,264 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
@@ -32,7 +32,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v2
with:
go-version: 1.19.x
go-version: 1.22.x

- uses: actions/cache@v2
with:
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -17,7 +17,7 @@ jobs:
if: "${{ !startsWith(github.event.head_commit.message, 'GitBook: [#') }}"
strategy:
matrix:
go-version: [1.19.x]
go-version: [1.22.x]
os: [ubuntu-latest]
steps:
- name: Set up Go
101 changes: 0 additions & 101 deletions .goreleaser.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -7,4 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

* Updated to

* `graphload run` now accepts `+<count>` like `+10_000` to run for 10 000 blocks relative to the module's start block.
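
The +<count> form described in the changelog entry above resolves a stop block relative to the module's start block. The following is a minimal, hypothetical sketch of that resolution in Go; the helper name and argument handling are assumptions, not the sink's actual code.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveStopBlock is a hypothetical helper (not the sink's actual code)
// showing how a "+<count>" argument such as "+10_000" can be resolved
// relative to the module's start block. Underscores act as digit separators.
func resolveStopBlock(arg string, moduleStartBlock uint64) (uint64, error) {
	if !strings.HasPrefix(arg, "+") {
		// No "+" prefix: treat the value as an absolute stop block.
		return strconv.ParseUint(strings.ReplaceAll(arg, "_", ""), 10, 64)
	}

	raw := strings.ReplaceAll(strings.TrimPrefix(arg, "+"), "_", "")
	count, err := strconv.ParseUint(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid relative block count %q: %w", arg, err)
	}

	return moduleStartBlock + count, nil
}

func main() {
	stop, err := resolveStopBlock("+10_000", 12_369_621)
	if err != nil {
		panic(err)
	}

	fmt.Println(stop) // 12379621
}
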
7 changes: 6 additions & 1 deletion bundler/encoder.go
@@ -3,12 +3,17 @@ package bundler
import (
"encoding/json"
"fmt"
"github.com/golang/protobuf/proto"

"google.golang.org/protobuf/proto"
)

type Encoder func(proto.Message) ([]byte, error)

func JSONLEncode(message proto.Message) ([]byte, error) {
return JSONLEncodeAny(message)
}

func JSONLEncodeAny(message any) ([]byte, error) {
buf := []byte{}
data, err := json.Marshal(message)
if err != nil {
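
The new JSONLEncode above keeps the proto.Message-typed entry point and simply delegates to JSONLEncodeAny, which accepts any value. Below is a self-contained, hedged sketch of what JSON Lines encoding amounts to; the names are standalone and the trailing newline is an assumption, not necessarily the repository's exact behavior.

package main

import (
	"encoding/json"
	"fmt"
)

// jsonlEncodeAny is an illustrative sketch of JSON Lines encoding: marshal
// the value and terminate it with a newline so each message lands on its own
// line. The trailing-newline detail is an assumption about the real function.
func jsonlEncodeAny(message any) ([]byte, error) {
	data, err := json.Marshal(message)
	if err != nil {
		return nil, fmt.Errorf("json marshal: %w", err)
	}

	return append(data, '\n'), nil
}

func main() {
	line, err := jsonlEncodeAny(map[string]any{"id": "0x123", "block": 17})
	if err != nil {
		panic(err)
	}

	fmt.Print(string(line))
}
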
15 changes: 8 additions & 7 deletions bundler/stats.go
@@ -1,10 +1,11 @@
package bundler

import (
"time"

"github.com/streamingfast/bstream"
"github.com/streamingfast/dmetrics"
"go.uber.org/zap"
"time"
)

type boundaryStats struct {
@@ -63,11 +64,11 @@ func (s *boundaryStats) Log() []zap.Field {
zap.Duration("boundary_process_duration", s.boundaryProcessTime),
zap.Duration("upload_duration", s.uploadedDuration),
zap.Duration("data_process_duration", s.procesingDataTime),
zap.Float64("avg_upload_dur", s.avgUploadDuration.Average()),
zap.Float64("total_upload_dur", s.avgUploadDuration.Total()),
zap.Float64("avg_boundary_process_dur", s.avgBoundaryProcessDuration.Average()),
zap.Float64("total_boundary_process_dur", s.avgBoundaryProcessDuration.Total()),
zap.Float64("avg_data_process_dur", s.avgDataProcessDuration.Average()),
zap.Float64("total_data_process_dur", s.avgDataProcessDuration.Total()),
zap.Duration("avg_upload_dur", s.avgUploadDuration.Average()),
zap.Duration("total_upload_dur", s.avgUploadDuration.Total()),
zap.Duration("avg_boundary_process_dur", s.avgBoundaryProcessDuration.Average()),
zap.Duration("total_boundary_process_dur", s.avgBoundaryProcessDuration.Total()),
zap.Duration("avg_data_process_dur", s.avgDataProcessDuration.Average()),
zap.Duration("total_data_process_dur", s.avgDataProcessDuration.Total()),
}
}
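
The switch above from zap.Float64 to zap.Duration suggests the dmetrics averages now return time.Duration values, and it makes the fields render as human-readable durations. The small standalone illustration below is not the repository's code; a plain time.Duration stands in for the dmetrics counters.

package main

import (
	"time"

	"go.uber.org/zap"
)

// Standalone illustration of zap.Float64 vs zap.Duration for the same value:
// zap.Duration renders "1.5s"-style output instead of a bare number.
func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	avgUpload := 1500 * time.Millisecond

	logger.Info("boundary stats",
		zap.Float64("avg_upload_dur_seconds", avgUpload.Seconds()), // logs 1.5
		zap.Duration("avg_upload_dur", avgUpload),                  // logs 1.5s
	)
}
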
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
@@ -21,8 +21,8 @@ import (
)

const (
SUPPORTED_MODULE_TYPE = "sf.substreams.entity.v1.EntityChanges"
LEGACY_MODULE_TYPE = "substreams.entity.v1.EntityChanges"
SUPPORTED_MODULE_TYPE = "sf.substreams.sink.entity.v1.EntityChanges"
LEGACY_MODULE_TYPE = "sf.substreams.entity.v1.EntityChanges"
)

var SinkRunCmd = Command(sinkRunE,
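
The constants above now point at the new sf.substreams.sink.entity.v1 Protobuf package while keeping the previous package name as a legacy fallback. The sketch below shows one hypothetical way a sink could validate a module's output type against them; the helper and the proto: prefix handling are assumptions, not the command's actual logic.

package main

import (
	"fmt"
	"strings"
)

// Hypothetical sketch (not the actual command code) of checking that the
// selected module outputs the entity-changes type this sink understands,
// while still accepting the legacy Protobuf package name.
const (
	SUPPORTED_MODULE_TYPE = "sf.substreams.sink.entity.v1.EntityChanges"
	LEGACY_MODULE_TYPE    = "sf.substreams.entity.v1.EntityChanges"
)

func validateModuleType(outputType string) error {
	// Substreams manifests usually express output types as "proto:<message>".
	outputType = strings.TrimPrefix(outputType, "proto:")

	switch outputType {
	case SUPPORTED_MODULE_TYPE, LEGACY_MODULE_TYPE:
		return nil
	default:
		return fmt.Errorf("unsupported module output type %q, expected %q (or legacy %q)",
			outputType, SUPPORTED_MODULE_TYPE, LEGACY_MODULE_TYPE)
	}
}

func main() {
	fmt.Println(validateModuleType("proto:sf.substreams.sink.entity.v1.EntityChanges")) // <nil>
}
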
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions csvprocessor/entity.go
@@ -3,8 +3,8 @@ package csvprocessor
import (
"fmt"

pbentity "github.com/streamingfast/substreams-graph-load/pb/entity/v1"
"github.com/streamingfast/substreams-graph-load/schema"
pbentity "github.com/streamingfast/substreams-sink-entity-changes/pb/sf/substreams/sink/entity/v1"
)

const FieldTypeBigint = "Bigint"
@@ -46,7 +46,7 @@ func (e *Entity) ValidateFields(desc *schema.EntityDesc) error {
}

func newEntity(in *EntityChangeAtBlockNum, desc *schema.EntityDesc) (*Entity, error) {
if in.EntityChange.Operation == pbentity.EntityChange_DELETE {
if in.EntityChange.Operation == pbentity.EntityChange_OPERATION_DELETE {
return nil, nil
}

10 changes: 5 additions & 5 deletions csvprocessor/processor.go
@@ -13,8 +13,8 @@ import (
"github.com/streamingfast/dstore"
"github.com/streamingfast/logging"
"github.com/streamingfast/shutter"
pbentity "github.com/streamingfast/substreams-graph-load/pb/entity/v1"
"github.com/streamingfast/substreams-graph-load/schema"
pbentity "github.com/streamingfast/substreams-sink-entity-changes/pb/sf/substreams/sink/entity/v1"
"go.uber.org/zap"
)

@@ -235,7 +235,7 @@ func (p *Processor) processEntityFile(ctx context.Context, filename string) erro
prev, found := p.entities[ch.EntityChange.ID]

switch ch.EntityChange.Operation {
case pbentity.EntityChange_CREATE:
case pbentity.EntityChange_OPERATION_CREATE:
if found {
return fmt.Errorf("@%d got CREATE on entity %q but it already exists since block %d", ch.BlockNum, ch.EntityChange.ID, prev.StartBlock)
}
@@ -252,7 +252,7 @@ func (p *Processor) processEntityFile(ctx context.Context, filename string) erro
}
p.entities[ch.EntityChange.ID] = newEnt

case pbentity.EntityChange_UPDATE:
case pbentity.EntityChange_OPERATION_UPDATE:
if p.entityDesc.Immutable {
if err := newEnt.ValidateFields(p.entityDesc); err != nil {
return fmt.Errorf("@%d during UPDATE to an immutable entity: %w", ch.BlockNum, err)
@@ -282,7 +282,7 @@ func (p *Processor) processEntityFile(ctx context.Context, filename string) erro
prev.Update(newEnt)
p.entities[ch.EntityChange.ID] = prev

case pbentity.EntityChange_DELETE:
case pbentity.EntityChange_OPERATION_DELETE:
if p.entityDesc.Immutable {
return fmt.Errorf("entity %q got deleted but should be immutable", ch.EntityChange.ID)
}
@@ -295,7 +295,7 @@ func (p *Processor) processEntityFile(ctx context.Context, filename string) erro
}
delete(p.entities, ch.EntityChange.ID)

case pbentity.EntityChange_FINAL:
case pbentity.EntityChange_OPERATION_FINAL:
if p.entityDesc.Immutable {
continue
}
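
The only change in the switch above is the enum value names, which gained an OPERATION_ prefix in the new sf.substreams.sink.entity.v1 package. The sketch below is an illustrative mock, not the generated code; the summaries paraphrase what the processor does with each operation per the diff above.

package main

import "fmt"

// Illustrative mock of the renamed operation enum; the real values live in
// github.com/streamingfast/substreams-sink-entity-changes/pb/sf/substreams/sink/entity/v1.
type Operation int32

const (
	Operation_OPERATION_UNSPECIFIED Operation = iota
	Operation_OPERATION_CREATE
	Operation_OPERATION_UPDATE
	Operation_OPERATION_DELETE
	Operation_OPERATION_FINAL
)

func describe(op Operation) string {
	switch op {
	case Operation_OPERATION_CREATE:
		return "start tracking a new entity"
	case Operation_OPERATION_UPDATE:
		return "merge the change into the tracked entity"
	case Operation_OPERATION_DELETE:
		return "stop tracking the entity (rejected for immutable entities)"
	case Operation_OPERATION_FINAL:
		return "finalize the entity's current row (skipped for immutable entities)"
	default:
		return "unknown operation"
	}
}

func main() {
	fmt.Println(describe(Operation_OPERATION_UPDATE))
}
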
@@ -19,17 +19,17 @@ main() {
fi

pushd "$ROOT" &> /dev/null
go install -ldflags "-X main.Version=$version -X main.Commit=$commit -X main.IsDirty=$dirty" ./cmd/graphload
go install -ldflags "-X main.Version=$version -X main.Commit=$commit -X main.IsDirty=$dirty" ./cmd/substreams-sink-graph-load
popd &> /dev/null

if [[ $KILL_AFTER != "" ]]; then
${GOPATH:-$HOME/go}/bin/graphload "$@" &
${GOPATH:-$HOME/go}/bin/substreams-sink-graph-load "$@" &
active_pid=$!

sleep $KILL_AFTER
kill -s TERM $active_pid &> /dev/null || true
else
exec ${GOPATH:-$HOME/go}/bin/graphload "$@"
exec ${GOPATH:-$HOME/go}/bin/substreams-sink-graph-load "$@"
fi
}

2 changes: 1 addition & 1 deletion devel/uniswap-v3/start.sh
@@ -8,7 +8,7 @@ main() {
set -e

out="${OUT_DIR}:-"$ROOT/csv"}"
sink="$ROOT/../graphload"
sink="$ROOT/../substreams-sink-graph-load"

echo dsn
set -x
