Skip to content

Commit

Permalink
Update substreams
Browse files Browse the repository at this point in the history
- add default host (optional)
- set default startblock
- improve stopBlock handling
- update example using ETH
  • Loading branch information
DenisCarriere committed Jan 31, 2023
1 parent 0f3f3df commit 1eadea5
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 51 deletions.
25 changes: 11 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,29 @@ yarn add substreams
const { Substreams, download } = require("substreams");

// User input
const host = "<FIREHOSE HOST>";
const substream = "https://ipfs.pinax.network/ipfs/QmfE7kdRAPihhvij4ej3rUM2Sp3PcXQ9rTFCQPhPGB5dr5";
const outputModules = ["map_action_traces"];
const startBlockNum = "283000000";
const stopBlockNum = "283001000";
const spkg = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.1.0/subtivity-ethereum-v0.1.0.spkg";
const outputModule = "map_block_stats";
const startBlockNum = "300000";
const stopBlockNum = "+10";

// Initialize Substreams
const substreams = new Substreams(host, {
const substreams = new Substreams(outputModule, {
startBlockNum,
stopBlockNum,
outputModules,
authorization: process.env.SUBSTREAMS_API_TOKEN
});

(async () => {
// download Substream from IPFS
const {modules, registry} = await download(substream);
const {modules, registry} = await download(spkg);

// Find Protobuf message types from registry
const ActionTraces = registry.findMessage("sf.antelope.type.v1.ActionTraces");
if ( !ActionTraces) throw new Error("Could not find ActionTraces message type");
const BlockStats = registry.findMessage("subtivity.v1.BlockStats");
if ( !BlockStats) throw new Error("Could not find BlockStats message type");

substreams.on("mapOutput", output => {
const { actionTraces } = ActionTraces.fromBinary(output.data.mapOutput.value);
for ( const actionTrace of actionTraces ) {
console.log(actionTrace);
}
const decoded = BlockStats.fromBinary(output.data.mapOutput.value);
console.log(decoded);
});

// start streaming Substream
Expand Down
27 changes: 12 additions & 15 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,29 @@
const { Substreams, download } = require("substreams");
const { Substreams, download } = require("./");

// User input
const host = "<FIREHOSE HOST>";
const substream = "https://ipfs.pinax.network/ipfs/QmfE7kdRAPihhvij4ej3rUM2Sp3PcXQ9rTFCQPhPGB5dr5";
const outputModules = ["map_action_traces"];
const startBlockNum = "283000000";
const stopBlockNum = "283001000";
const spkg = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.1.0/subtivity-ethereum-v0.1.0.spkg";
const outputModule = "map_block_stats";
const startBlockNum = "300000";
const stopBlockNum = "+10";

// Initialize Substreams
const substreams = new Substreams(host, {
const substreams = new Substreams(outputModule, {
startBlockNum,
stopBlockNum,
outputModules,
authorization: process.env.SUBSTREAMS_API_TOKEN
});

(async () => {
// download Substream from IPFS
const {modules, registry} = await download(substream);
const {modules, registry} = await download(spkg);

// Find Protobuf message types from registry
const ActionTraces = registry.findMessage("sf.antelope.type.v1.ActionTraces");
if ( !ActionTraces) throw new Error("Could not find ActionTraces message type");
const BlockStats = registry.findMessage("subtivity.v1.BlockStats");
if ( !BlockStats) throw new Error("Could not find BlockStats message type");

substreams.on("mapOutput", output => {
const { actionTraces } = ActionTraces.fromBinary(output.data.mapOutput.value);
for ( const actionTrace of actionTraces ) {
console.log(actionTrace);
}
const decoded = BlockStats.fromBinary(output.data.mapOutput.value);
console.log(decoded);
});

// start streaming Substream
Expand Down
35 changes: 35 additions & 0 deletions examples/subtivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Substreams, download } from "../src";

// User input
const spkg = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.1.0/subtivity-ethereum-v0.1.0.spkg";
const outputModule = "map_block_stats";
const startBlockNum = "300000";
const stopBlockNum = "+10";

// Initialize Substreams
const substreams = new Substreams(outputModule, {
startBlockNum,
stopBlockNum,
authorization: process.env.SUBSTREAMS_API_TOKEN
});

(async () => {
// download Substream from IPFS
const {modules, registry} = await download(spkg);

// Find Protobuf message types from registry
const BlockStats = registry.findMessage("subtivity.v1.BlockStats");
if ( !BlockStats) throw new Error("Could not find BlockStats message type");

substreams.on("mapOutput", output => {
const decoded = BlockStats.fromBinary(output.data.mapOutput.value);
console.log(decoded);
});

// start streaming Substream
await substreams.start(modules);

// end of Substream
console.log("done");
process.exit();
})();
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "substreams",
"description": "Substreams Javascript consumer",
"version": "0.2.1",
"version": "0.3.0",
"homepage": "https://github.com/pinax-network/substreams-js",
"main": "dist/index.js",
"types": "dist/index.d.js",
Expand Down
31 changes: 11 additions & 20 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export * from "./generated/sf/substreams/v1/substreams"
export * from "./utils";

// Utils
import { parseBlockData } from './utils';
import { parseBlockData, parseStopBlock } from './utils';

interface MapOutput extends ModuleOutput {
data: {
Expand Down Expand Up @@ -48,21 +48,22 @@ export class Substreams extends (EventEmitter as new () => TypedEmitter<MessageE
public client: StreamClient;

// configs
public host = "mainnet.eth.streamingfast.io:443";
public startBlockNum?: string;
public stopBlockNum?: string;
public outputModule?: string;
public outputModules?: string[];
public cursor?: string;
public startCursor?: string;
public irreversibilityCondition?: string;
public forkSteps?: ForkStep[];
public initialStoreSnapshotForModules?: string[];
public debugInitialStoreSnapshotForModules?: string[];
public productionMode?: boolean;
public productionMode = false;

private stopped = false;

constructor(host: string, outputModule?: string | string[], options: {
constructor(outputModule: string, options: {
host?: string,
startBlockNum?: string,
stopBlockNum?: string,
authorization?: string,
Expand All @@ -74,16 +75,16 @@ export class Substreams extends (EventEmitter as new () => TypedEmitter<MessageE
debugInitialStoreSnapshotForModules?: string[],
} = {}) {
super();
if ( Array.isArray(outputModule) ) this.outputModules = outputModule;
else this.outputModule = outputModule;
this.startBlockNum = options.startBlockNum;
this.stopBlockNum = options.stopBlockNum;
this.outputModule = outputModule;
this.startBlockNum = options.startBlockNum ?? "0";
this.stopBlockNum = parseStopBlock(this.startBlockNum, options.stopBlockNum);
this.startCursor = options.startCursor;
this.irreversibilityCondition = options.irreversibilityCondition;
this.forkSteps = options.forkSteps;
this.initialStoreSnapshotForModules = options.initialStoreSnapshotForModules;
this.productionMode = options.productionMode;
this.debugInitialStoreSnapshotForModules = options.debugInitialStoreSnapshotForModules;
this.productionMode = options.productionMode ?? false;
this.host = options.host ?? "mainnet.eth.streamingfast.io:443";

// Credentials
const metadata = new Metadata();
Expand All @@ -96,7 +97,7 @@ export class Substreams extends (EventEmitter as new () => TypedEmitter<MessageE
// Substream Client
this.client = new StreamClient(
new GrpcTransport({
host,
host: this.host,
channelCredentials: creds,
}),
);
Expand All @@ -115,15 +116,6 @@ export class Substreams extends (EventEmitter as new () => TypedEmitter<MessageE
if ( !Number.isInteger(startBlockNum)) throw new Error("startBlockNum must be an integer");
}

// production mode validation
if ( this.productionMode ) {
if ( !this.outputModule ) throw new Error("outputModule is required");

// development mode validation
} else {
if ( !this.outputModule && !this.outputModules?.length ) throw new Error("outputModule or outputModules is required");
}

// Setup Substream
const stream = this.client.blocks(Request.create({
modules,
Expand All @@ -138,7 +130,6 @@ export class Substreams extends (EventEmitter as new () => TypedEmitter<MessageE
this.emit("block", block);

for ( const output of block.outputs ) {
console.log(output);
if ( output.data.oneofKind == "mapOutput" ) {
const { value } = output.data.mapOutput;
if ( !value.length ) continue;
Expand Down
7 changes: 6 additions & 1 deletion src/utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import assert from 'node:assert';
import { describe, it } from 'node:test';
import { formatDate, isIpfs } from './utils';
import { formatDate, isIpfs, parseStopBlock } from './utils';

describe('substreams', () => {
it("formatDate", () => {
Expand All @@ -13,6 +13,11 @@ describe('substreams', () => {
assert.equal(isIpfs("QmUatvHNjq696qkB8SBz5VBytcEeTrM1VwFyy4Rt4Z43mX"), true);
assert.equal(isIpfs("not IPFS"), false);
});

it("parseStopBlock", async () => {
assert.equal(parseStopBlock("0", "+100"), "100");
assert.equal(parseStopBlock("400", "+100"), "500");
});
});

/**
Expand Down
7 changes: 7 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ export function getSeconds( clock?: Clock ) {

export const isIpfs = ( str: string ) => /^Qm[1-9A-Za-z]{44}$/.test(str);

export function parseStopBlock( startBlock: string, stopBlock?: string ) {
if (!stopBlock) return;
if ( stopBlock.includes("+")) return String(Number(startBlock) + Number(stopBlock));
if ( stopBlock.includes("-")) throw new Error(`stopBlock cannot be negative: ${stopBlock}`);
return stopBlock;
}

export async function download( url: string ) {
if ( isIpfs(url) ) url = `https://ipfs.pinax.network/ipfs/${url}`;
const binary = await downloadBuffer(url);
Expand Down

0 comments on commit 1eadea5

Please sign in to comment.