From 93f1b406df999756ea967d7663fa4c8f66a256a8 Mon Sep 17 00:00:00 2001 From: bob-sl Date: Tue, 8 Jun 2021 10:45:52 -0400 Subject: [PATCH] Update to use streamer package --- external-bridge.js | 2 +- package-lock.json | 8 ++++ package.json | 1 + stream.js | 97 ---------------------------------------------- 4 files changed, 10 insertions(+), 98 deletions(-) delete mode 100644 stream.js diff --git a/external-bridge.js b/external-bridge.js index e9ef321..cc04454 100644 --- a/external-bridge.js +++ b/external-bridge.js @@ -1,5 +1,5 @@ const utils = require('./utils'); -const streamer = require('./stream'); +const streamer = require('@steem-monsters/splinterlands-tx-streamer'); const db = require('@splinterlands/pg-querybuilder'); const interface = require('@splinterlands/hive-interface'); diff --git a/package-lock.json b/package-lock.json index a8d0840..be84fb3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36,6 +36,14 @@ "pg": "^8.5.1" } }, + "@steem-monsters/splinterlands-tx-streamer": { + "version": "1.0.3", + "resolved": "https://npm.pkg.github.com/download/@steem-monsters/splinterlands-tx-streamer/1.0.3/8f0766b3183f8bc90f6bcac9fab0c310fa5f33c84e9373907855b6ab29883ddb", + "integrity": "sha512-xUkUVdLtncFK1z5PweRlzjpxX277Uh3Sfknr1X2JcN2GGqPKrq7pxhbadl9pClhHK2tH4xiZwDw7ppTPCfv7kg==", + "requires": { + "request": "^2.88.2" + } + }, "accepts": { "version": "1.3.7", "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz", diff --git a/package.json b/package.json index ac9d9ce..2a365c1 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,7 @@ "dependencies": { "@splinterlands/hive-interface": "^2.1.3", "@splinterlands/pg-querybuilder": "^1.2.0", + "@steem-monsters/splinterlands-tx-streamer": "^1.0.3", "express": "^4.17.1", "request": "^2.88.2" } diff --git a/stream.js b/stream.js deleted file mode 100644 index db5ab07..0000000 --- a/stream.js +++ /dev/null @@ -1,97 +0,0 @@ -const fs = require('fs'); -const utils = require('./utils'); - -let cb = null; -let _last_block = null; -let _is_streaming = false; -let _options = { - state_file_name: 'sl-state.json', - game_api_url: 'http://localhost:3000' -}; - -async function start(callback, options) { - cb = callback; - _options = Object.assign(_options, options); - let last_block = await loadState(); - utils.log(`Streamer starting from block: ${last_block || 'HEAD'}. Op Types: [${!options.types || options.types.length == 0 ? 'All' : options.types}]`); - _is_streaming = true; - getNextBlock(last_block); -} - -async function getNextBlock(last_block) { - let cur_block_num = await utils.getHeadBlockNum().catch(err => { - utils.log(`Error loading last block: ${err}!`, 1, 'Red'); - return null; - }); - - if(!cur_block_num) { - setTimeout(() => getNextBlock(last_block), 1000); - return; - } - - let head_block = cur_block_num - (_options.blocks_behind_head || 0); - - if(!last_block || isNaN(last_block)) - last_block = head_block - 1; - - // We are 20+ blocks behind! - if(head_block >= last_block + 20) - utils.log('Streaming is ' + (head_block - last_block) + ' blocks behind!', 1, 'Red'); - - // If we have a new block, process it - while(head_block > last_block) { - try { - await processBlock(last_block); - saveState(last_block); - last_block++; - } catch (err) { - utils.log(`Error loading block: ${last_block}, Error: ${err}!`, 1, 'Red'); - break; - } - } - - // Attempt to load the next block after a 1 second delay (or faster if we're behind and need to catch up) - setTimeout(() => getNextBlock(last_block), 1000); -} - -async function processBlock(block_num) { - utils.log(`Processing block [${block_num}]`, block_num % 1000 == 0 ? 1 : 4); - let transactions = await utils.getBlock(block_num); - - utils.log(`Processing ${transactions.length} transactions...`, 4); - - if(!transactions) - return; - - for(let i = 0; i < transactions.length; i++) { - try { - if(cb) { - if(!_options.types || _options.types.length == 0 || _options.types.includes(transactions[i].type)) - cb(transactions[i]); - } - } catch(err) { utils.log(`Error processing transaction [${block.transaction_ids[i]}]: ${err}`, 1, 'Red'); } - } -} - -async function loadState() { - // Check if state has been saved to disk, in which case load it - if (fs.existsSync(_options.state_file_name)) { - let state = JSON.parse(fs.readFileSync(_options.state_file_name)); - utils.log('Restored saved state: ' + JSON.stringify(state)); - return state.last_block; - } -} - -function saveState(last_block) { - _last_block = last_block; - - // Save the last block read to disk - fs.writeFile(_options.state_file_name, JSON.stringify({ last_block }), function (err) { - if (err) - utils.log(err); - }); -} - -function getStatus() { return { streaming: _is_streaming, last_block: _last_block }; } - -module.exports = { start, getStatus };