Skip to content

Commit

Permalink
feat: handle zipped and JSON data better
Browse files Browse the repository at this point in the history
Closes #24
  • Loading branch information
theburningmonk committed Oct 14, 2019
1 parent e9182b2 commit 4dd02a2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 24 deletions.
30 changes: 25 additions & 5 deletions src/commands/tail-kinesis.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
const _ = require("lodash");
const zlib = require("zlib");
const AWS = require("aws-sdk");
const {Command, flags} = require("@oclif/command");
const {checkVersion} = require("../lib/version-check");
require("colors");

// let isZipped;

class TailKinesisCommand extends Command {
async run() {
const {flags} = this.parse(TailKinesisCommand);
Expand Down Expand Up @@ -101,11 +104,7 @@ const pollKinesis = async (streamName, shardIds) => {
}).promise();

if (!_.isEmpty(resp.Records)) {
resp.Records.forEach(record => {
const timestamp = new Date().toJSON().grey.bold.bgWhite;
const data = Buffer.from(record.Data, "base64").toString("utf-8");
console.log(timestamp, "\n", data);
});
resp.Records.forEach(show);
}

shardIterator = resp.NextShardIterator;
Expand All @@ -117,4 +116,25 @@ const pollKinesis = async (streamName, shardIds) => {
console.log("stopped");
};

const show = (record) => {
const timestamp = new Date().toJSON().grey.bold.bgWhite;
console.log(timestamp);

const buffer = Buffer.from(record.Data, "base64");

let data;
try {
data = zlib.gunzipSync(buffer).toString("utf8");
} catch (_error) {
data = buffer.toString("utf8");
}

try {
const obj = JSON.parse(data);
console.log(JSON.stringify(obj, undefined, 2));
} catch (_error) {
console.log(data);
}
};

module.exports = TailKinesisCommand;
79 changes: 60 additions & 19 deletions test/commands/tail-kinesis.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const _ = require("lodash");
const {expect, test} = require("@oclif/test");
const zlib = require("zlib");
const AWS = require("aws-sdk");
const Promise = require("bluebird");

Expand All @@ -18,12 +19,6 @@ const consoleLog = jest.fn();
console.log = consoleLog;

beforeEach(() => {
mockDescribeStream.mockReset();
mockGetShardIterator.mockReset();
mockGetRecords.mockReset();
consoleLog.mockReset();
mockOpenStdin.mockReset();

mockGetShardIterator.mockReturnValue({
promise: () => Promise.resolve({
ShardIterator: "iterator"
Expand All @@ -35,6 +30,14 @@ beforeEach(() => {
});
});

afterEach(() => {
mockDescribeStream.mockReset();
mockGetShardIterator.mockReset();
mockGetRecords.mockReset();
consoleLog.mockReset();
mockOpenStdin.mockReset();
});

describe("tail-kinesis", () => {
describe("when the stream has one shard", () => {
beforeEach(() => {
Expand All @@ -50,11 +53,8 @@ describe("tail-kinesis", () => {
expect(ctx.stdout).to.contain("checking Kinesis stream [stream-dev] in [us-east-1]");
expect(ctx.stdout).to.contain("polling Kinesis stream [stream-dev] (1 shards)...");

// unfortunately, ctx.stdout doesn't seem to capture the messages published by console.log
// hence this workaround...
const logMessages = _.flatMap(consoleLog.mock.calls, call => call);
expect(logMessages).to.contain("message 1");
expect(logMessages).to.contain("message 2");
thenMessageIsLogged("message 1");
thenMessageIsLogged("message 2");
});
});

Expand All @@ -74,16 +74,55 @@ describe("tail-kinesis", () => {
expect(ctx.stdout).to.contain("checking Kinesis stream [stream-dev] in [us-east-1]");
expect(ctx.stdout).to.contain("polling Kinesis stream [stream-dev] (2 shards)...");

// unfortunately, ctx.stdout doesn't seem to capture the messages published by console.log
// hence this workaround...
const logMessages = _.flatMap(consoleLog.mock.calls, call => call);
expect(logMessages).to.contain("message 1");
expect(logMessages).to.contain("message 2");
expect(logMessages).to.contain("message 3");
thenMessageIsLogged("message 1");
thenMessageIsLogged("message 2");
thenMessageIsLogged("message 3");
});
});

describe("when the records are zipped", () => {
beforeEach(() => {
givenDescribeStreamsReturns(["shard01"]);
givenGetRecordsReturns(["message 1", "message 2"], true);
givenGetRecordsAlwaysReturns([]);
});

test
.stdout()
.command(["tail-kinesis", "-n", "stream-dev", "-r", "us-east-1"])
.it("displays unzipped messages in the console", () => {
thenMessageIsLogged("message 1");
thenMessageIsLogged("message 2");
});
});

describe("when the records are JSON", () => {
const data1 = { message: 42 };
const data2 = { foo: "bar" };

beforeEach(() => {
givenDescribeStreamsReturns(["shard01"]);
givenGetRecordsReturns([JSON.stringify(data1), JSON.stringify(data2)]);
givenGetRecordsAlwaysReturns([]);
});

test
.stdout()
.command(["tail-kinesis", "-n", "stream-dev", "-r", "us-east-1"])
.it("displays prettified JSON messages in the console", () => {
thenMessageIsLogged(JSON.stringify(data1, undefined, 2));
thenMessageIsLogged(JSON.stringify(data2, undefined, 2));
});
});
});

function thenMessageIsLogged(message) {
// unfortunately, ctx.stdout doesn't seem to capture the messages published by console.log
// hence this workaround...
const logMessages = _.flatMap(consoleLog.mock.calls, call => call).join("\n");
expect(logMessages).to.contain(message);
}

function givenDescribeStreamsReturns(shardIds) {
mockDescribeStream.mockReturnValueOnce({
promise: () => Promise.resolve({
Expand All @@ -96,11 +135,13 @@ function givenDescribeStreamsReturns(shardIds) {
});
};

function givenGetRecordsReturns(messages) {
function givenGetRecordsReturns(messages, zip = false) {
mockGetRecords.mockReturnValueOnce({
promise: () => Promise.resolve({
Records: messages.map(msg => ({
Data: Buffer.from(msg, "utf-8").toString("base64")
Data: zip
? zlib.gzipSync(Buffer.from(msg, "utf-8")).toString("base64")
: Buffer.from(msg, "utf-8").toString("base64")
})),
NextIterator: "iterator more"
})
Expand Down

0 comments on commit 4dd02a2

Please sign in to comment.