Skip to content

Commit

Permalink
Add anyMessage decoded MapOutputs + include protobufs
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Mar 28, 2023
1 parent eee5786 commit ba73642
Show file tree
Hide file tree
Showing 28 changed files with 2,522 additions and 32 deletions.
25 changes: 7 additions & 18 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const { Substreams, download, unpack } = require("./");
const { Substreams, download } = require("./");

// User parameters
const url = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.2.0/subtivity-ethereum-v0.2.0.spkg";
const outputModule = "map_block_stats";
const startBlockNum = "300000";
const url = "https://github.com/streamingfast/substreams-ethereum-quickstart/releases/download/1.0.0/substreams-ethereum-quickstart-v1.0.0.spkg";
const outputModule = "map_block";
const startBlockNum = "12292922";
const stopBlockNum = "+10";

(async () => {
Expand All @@ -16,33 +16,22 @@ const stopBlockNum = "+10";
stopBlockNum,
authorization: process.env.SUBSTREAMS_API_TOKEN
});

// Find Protobuf message types from registry
const { registry } = unpack(spkg);
const BlockStats = registry.findMessage("subtivity.v1.BlockStats");
if ( !BlockStats) throw new Error("Could not find BlockStats message type");

// first block received
substreams.on("start", (cursor, clock) => {
console.log({status: "start", cursor, clock});
});

// on every map output received
substreams.on("mapOutput", (output, clock) => {
const decoded = BlockStats.fromBinary(output.data.value.value);
console.log({decoded, clock});
// stream of decoded MapOutputs
substreams.on("anyMessage", (message) => {
console.log({message});
});

// end of stream
substreams.on("end", (cursor, clock) => {
console.log({status: "end", cursor, clock});
});

// head block time drift
substreams.on("head_block_time_drift", (seconds) => {
console.log({head_block_time_drift: seconds});
});

// start streaming Substream
substreams.start();
})();
4 changes: 2 additions & 2 deletions examples/subtivity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const authorization = process.env.SUBSTREAMS_API_TOKEN;
substreams.on("start", (cursor, clock) => {
console.log({status: "start", cursor, clock});
});
substreams.on("mapOutput", (mapOutput, clock) => {
console.log({mapOutput, clock});
substreams.on("anyMessage", (message, clock) => {
console.log({message, clock});
});
substreams.start();
})();
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "substreams",
"description": "Substreams Javascript consumer",
"version": "0.4.2",
"version": "0.5.0",
"homepage": "https://github.com/pinax-network/substreams-js",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
101 changes: 101 additions & 0 deletions proto/pinax/substreams/sink/prometheus/v1/prometheus.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
syntax = "proto3";

package pinax.substreams.sink.prometheus.v1;

// Vector of Prometheus metrics
message PrometheusOperations {
repeated PrometheusOperation operations = 1;
}

message PrometheusOperation {
string name = 1; // Name of the Prometheus metric
map<string, string> labels = 2; // Labels represents a collection of label name -> value mappings.
oneof operation {
GaugeOp gauge = 3;
CounterOp counter = 4;
HistogramOp histogram = 5;
SummaryOp summary = 6;
}
}

message GaugeOp {
enum Operation {
// Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
OPERATION_UNSPECIFIED = 0;
// Inc increments the Gauge by 1. Use Add to increment it by arbitrary values.
OPERATION_INC = 1;
// Add adds the given value to the Gauge. (The value can be negative, resulting in a decrease of the Gauge.)
OPERATION_ADD = 2; // float
// Set sets the Gauge to an arbitrary value.
OPERATION_SET = 3; // float
// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary values.
OPERATION_DEC = 4;
// Sub subtracts the given value from the Gauge. (The value can be negative, resulting in an increase of the Gauge.)
OPERATION_SUB = 5; // float
// SetToCurrentTime sets the Gauge to the current Unix time in seconds.
OPERATION_SET_TO_CURRENT_TIME = 6;
// Remove metrics for the given label values
OPERATION_REMOVE = 7;
// Reset gauge values
OPERATION_RESET = 8;
}
Operation operation = 1;
double value = 2; // Value (Float) to be used in the operation
}

message CounterOp {
enum Operation {
// Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
OPERATION_UNSPECIFIED = 0;
// Increments the Counter by 1.
OPERATION_INC = 1;
// Adds an arbitrary value to a Counter. (Returns an error if the value is < 0.)
OPERATION_ADD = 2; // float
// Remove metrics for the given label values
OPERATION_REMOVE = 7;
// Reset counter values
OPERATION_RESET = 8;
}
Operation operation = 1;
double value = 2; // Value (Float) to be used in the operation
}

message SummaryOp {
enum Operation {
// Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
OPERATION_UNSPECIFIED = 0;
// Observe adds a single observation to the summary.
// Observations are usually positive or zero.
// Negative observations are accepted but prevent current versions of Prometheus from properly detecting counter resets in the sum of observations
OPERATION_OBSERVE = 1;
// Start a timer. Calling the returned function will observe the duration in seconds in the summary.
OPERATION_START_TIMER = 2;
// Remove metrics for the given label values
OPERATION_REMOVE = 7;
// Reset counter values
OPERATION_RESET = 8;
}
Operation operation = 1;
double value = 2; // Value (Float) to be used in the operation
}

message HistogramOp {
enum Operation {
// Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
OPERATION_UNSPECIFIED = 0;
// Observe adds a single observation to the histogram.
// Observations are usually positive or zero.
// Negative observations are accepted but prevent current versions of Prometheus from properly detecting counter resets in the sum of observations.
OPERATION_OBSERVE = 1;
// Start a timer. Calling the returned function will observe the duration in seconds in the summary.
OPERATION_START_TIMER = 2;
// Initialize the metrics for the given combination of labels to zero
OPERATION_ZERO = 3;
// Remove metrics for the given label values
OPERATION_REMOVE = 7;
// Reset counter values
OPERATION_RESET = 8;
}
Operation operation = 1;
double value = 2; // Value (Float) to be used in the operation
}
33 changes: 33 additions & 0 deletions proto/pinax/substreams/sink/winston/v1/winston.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
syntax = "proto3";

package pinax.substreams.sink.winston.v1;

option go_package = "github.com/pinax-network/substreams-sink-winston/pb;pbkv";

// Vector of Winston Logging messages
message LoggerOperations {
repeated LoggerOperation operations = 1;
}

message LoggerOperation {
string service = 1;
LoggingLevels level = 2;
string message = 3;
map<string, string> meta = 4;
}

// Each level is given a specific integer priority.
// The higher the priority the more important the message is considered to be,
// and the lower the corresponding integer priority.
// For example, as specified exactly in RFC5424 the syslog levels are prioritized from 0 to 7 (highest to lowest).
enum LoggingLevels {
// UNSPECIFIED = 0; // Unspecified: default value
EMERG = 0; // Emergency: system is unusable
ALERT = 1; // Alert: action must be taken immediately
CRIT = 2; // Critical: critical conditions
ERROR = 3; // Error: error conditions
WARNING = 4; // Warning: warning conditions
NOTICE = 5; // Notice: normal but significant condition
INFO = 6; // Informational: informational messages
DEBUG = 7; // Debug: debug-level messages
}
29 changes: 29 additions & 0 deletions proto/sf/substreams/sink/database/v1/database.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
syntax = "proto3";

package sf.substreams.sink.database.v1;

option go_package = "github.com/streamingfast/substreams-database-change/pb;pbdatabase";

message DatabaseChanges {
repeated TableChange table_changes = 1;
}

message TableChange {
string table = 1;
string pk = 2;
uint64 ordinal = 3;
enum Operation {
UNSET = 0; // Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 4;
repeated Field fields = 5;
}

message Field {
string name = 1;
string new_value = 2;
string old_value = 3;
}
46 changes: 46 additions & 0 deletions proto/sf/substreams/sink/entity/v1/entity.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
syntax = "proto3";

package sf.substreams.entity.v1;

message EntityChanges {
repeated EntityChange entity_changes = 5;
}

message EntityChange {
string entity = 1;
string id = 2;
uint64 ordinal = 3;
enum Operation {
UNSET = 0; // Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
CREATE = 1;
UPDATE = 2;
DELETE = 3;
}
Operation operation = 4;
repeated Field fields = 5;
}

message Value {
oneof typed {
int32 int32 = 1;
string bigdecimal = 2;
string bigint = 3;
string string = 4;
string bytes = 5;
bool bool = 6;

//reserved 7 to 9; // For future types

Array array = 10;
}
}

message Array {
repeated Value value = 1;
}

message Field {
string name = 1;
optional Value new_value = 3;
optional Value old_value = 5;
}
21 changes: 21 additions & 0 deletions proto/sf/substreams/sink/kv/v1/kv.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
syntax = "proto3";

package sf.substreams.sink.kv.v1;

option go_package = "github.com/streamingfast/substreams-sink-kv/pb;pbkv";

message KVOperations {
repeated KVOperation operations = 1;
}

message KVOperation {
string key = 1;
bytes value = 2;
uint64 ordinal = 3;
enum Type {
UNSET = 0; // Protobuf default should not be used, this is used so that the consume can ensure that the value was actually specified
SET = 1;
DELETE = 2;
}
Type type = 4;
}
Loading

0 comments on commit ba73642

Please sign in to comment.