diff --git a/src/query/__test__/full_primary_key_spec.ts b/src/query/__test__/full_primary_key_spec.ts index 988e06a..e5032c3 100644 --- a/src/query/__test__/full_primary_key_spec.ts +++ b/src/query/__test__/full_primary_key_spec.ts @@ -209,6 +209,77 @@ describe("FullPrimaryKey", () => { expect(res.records[1].title).to.eq("abc"); }); }); + + describe("#scanAll", () => { + it("should find all items", async () => { + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "abc", + }, + }).promise(); + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "abd", + }, + }).promise(); + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "aba", + }, + }).promise(); + + const res = await primaryKey.scanAll({}); + + expect(res.records.length).to.eq(3); + }); + + it("should find all items with parallelize", async () => { + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "abc", + }, + }).promise(); + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "abd", + }, + }).promise(); + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "aba", + }, + }).promise(); + await Card.metadata.connection.documentClient.put({ + TableName: Card.metadata.name, + Item: { + id: 10, + title: "ccc", + }, + }).promise(); + + const res = await primaryKey.scanAll({ parallelize: 3 }); + + expect(res.records.length).to.eq(4); + // Ordered by range key since it's "scan" + expect(res.records[0].title).to.eq("aba"); + expect(res.records[1].title).to.eq("abc"); + expect(res.records[2].title).to.eq("abd"); + expect(res.records[3].title).to.eq("ccc"); + }); + }); + describe("#update", () => { it("should be able to update items", async () => { await primaryKey.update(10, "abc", { count: ["ADD", 1] }); diff --git a/src/query/__test__/global_secondary_index_spec.ts b/src/query/__test__/global_secondary_index_spec.ts index c5c9f3b..5f1dbf1 100644 --- a/src/query/__test__/global_secondary_index_spec.ts +++ b/src/query/__test__/global_secondary_index_spec.ts @@ -7,6 +7,14 @@ import { Table } from '../../table'; @Decorator.Table({ name: "prod-Card" }) class Card extends Table { + static create(id: number, title: string, count: number) { + const record = new this(); + record.id = id; + record.title = title; + record.count = count; + return record; + } + @Decorator.Attribute() public id: number; @@ -24,6 +32,9 @@ class Card extends Table { @Decorator.FullGlobalSecondaryIndex('title', 'id') static readonly fullTitleIndex: Query.FullGlobalSecondaryIndex; + + @Decorator.Writer() + static readonly writer: Query.Writer; } describe("HashGlobalSecondaryIndex", () => { @@ -37,28 +48,11 @@ describe("HashGlobalSecondaryIndex", () => { describe("#query", () => { it("should find items", async () => { - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 10, - title: "abc", - }, - }).promise(); - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 11, - title: "abd", - }, - }).promise(); - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 12, - title: "abd", - }, - }).promise(); - + await Card.writer.batchPut([ + Card.create(10, "abc", 1), + Card.create(11, "abd", 1), + Card.create(12, "abd", 1), + ]); const res = await Card.hashTitleIndex.query("abd"); expect(res.records.length).to.eq(2); @@ -66,6 +60,25 @@ describe("HashGlobalSecondaryIndex", () => { expect(res.records[1].id).to.eq(11); }); }); + + describe("#scanAll", () => { + it("should find all items", async () => { + await Card.writer.batchPut([ + Card.create(10, "abc", 1), + Card.create(11, "abd", 1), + Card.create(12, "abd", 1), + Card.create(13, "abd", 1), + Card.create(14, "abe", 1), + Card.create(15, "abe", 1), + ]); + const res = await Card.hashTitleIndex.scanAll({ + scanBatchSize: 1, + parallelize: 3, + }); + + expect(res.records.length).to.eq(6); + }); + }); }); describe("FullGlobalSecondaryIndex", () => { @@ -79,36 +92,12 @@ describe("FullGlobalSecondaryIndex", () => { describe("#query", () => { it("should find items", async () => { - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 10, - title: "abc", - }, - }).promise(); - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 11, - title: "abd", - }, - }).promise(); - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 12, - title: "abd", - }, - }).promise(); - await Card.metadata.connection.documentClient.put({ - TableName: Card.metadata.name, - Item: { - id: 13, - title: "abd", - }, - }).promise(); - - + await Card.writer.batchPut([ + Card.create(10, "abc", 1), + Card.create(11, "abd", 1), + Card.create(12, "abd", 1), + Card.create(13, "abd", 1), + ]); const res = await Card.fullTitleIndex.query({ hash: "abd", range: [">=", 12], @@ -120,4 +109,22 @@ describe("FullGlobalSecondaryIndex", () => { expect(res.records[1].id).to.eq(12); }); }); + + describe("#scanAll", () => { + it("should find all items", async () => { + await Card.writer.batchPut([ + Card.create(10, "abc", 1), + Card.create(11, "abd", 1), + Card.create(12, "abd", 1), + Card.create(13, "abd", 1), + Card.create(14, "abe", 1), + Card.create(15, "abe", 1), + ]); + const res = await Card.fullTitleIndex.scanAll({ + scanBatchSize: 1, + parallelize: 3, + }); + expect(res.records.length).to.eq(6); + }); + }); }); diff --git a/src/query/__test__/hash_primary_key_spec.ts b/src/query/__test__/hash_primary_key_spec.ts index cc3ce4c..815343d 100644 --- a/src/query/__test__/hash_primary_key_spec.ts +++ b/src/query/__test__/hash_primary_key_spec.ts @@ -91,6 +91,64 @@ describe("HashPrimaryKey", () => { }); }); + describe("#scanAll", () => { + it("should find all items", async () => { + async function createCard() { + const card = new Card(); + card.id = faker.random.number(); + await card.save(); + return card; + } + + const cards = [ + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + await createCard(), + ]; + + const res = await primaryKey.scanAll({ scanBatchSize: 1 }); + + const ids = _.sortBy( + res.records, + (item) => item.id, + ).map((c) => c.id); + + expect(ids).to.deep.eq( _.sortBy(cards.map((c) => c.id), (i) => i) ); + }); + + it("should find all items with parallelize", async () => { + async function createCard() { + const card = new Card(); + card.id = faker.random.number(); + await card.save(); + return card; + } + + const cards = [ + await createCard(), + await createCard(), + await createCard(), + await createCard(), + ]; + + const res = await primaryKey.scanAll({ parallelize: 3 }); + + const ids = _.sortBy( + res.records, + (item) => item.id, + ).map((c) => c.id); + + expect(ids).to.deep.eq( _.sortBy(cards.map((c) => c.id), (i) => i) ); + }); + }); describe("#batchGet", async () => { it ("should return results in order", async () => { diff --git a/src/query/__test__/local_secondary_index_spec.ts b/src/query/__test__/local_secondary_index_spec.ts index 8baa952..6551ed6 100644 --- a/src/query/__test__/local_secondary_index_spec.ts +++ b/src/query/__test__/local_secondary_index_spec.ts @@ -64,4 +64,23 @@ describe("LocalSecondaryIndex", () => { expect(res.records[1].count).to.eq(3); }); }); + + describe("#scanAll", () => { + it("should find all items", async () => { + await Card.writer.batchPut([ + Card.create(10, "abc", 1), + Card.create(11, "abd", 2), + Card.create(12, "abd", 3), + Card.create(13, "abd", 4), + Card.create(14, "abe", 5), + Card.create(15, "abe", 6), + ]); + const res = await Card.countIndex.scanAll({ + scanBatchSize: 1, + parallelize: 3, + }); + + expect(res.records.length).to.eq(6); + }); + }); }); diff --git a/src/query/full_primary_key.ts b/src/query/full_primary_key.ts index 364b892..cb3bca3 100644 --- a/src/query/full_primary_key.ts +++ b/src/query/full_primary_key.ts @@ -9,6 +9,7 @@ import * as Query from './query'; import { batchGetFull, batchGetTrim } from "./batch_get"; import { batchWrite } from "./batch_write"; +import { scanAll } from "./scan_all"; const HASH_KEY_REF = "#hk"; const HASH_VALUE_REF = ":hkv"; @@ -186,6 +187,22 @@ export class FullPrimaryKey { }; } + async scanAll(options: { + parallelize?: number; + scanBatchSize?: number; + }) { + const res = await scanAll( + this.tableClass.metadata.connection.documentClient, + this.tableClass.metadata.name, + options, + ); + + return { + records: res.map((item) => Codec.deserialize(this.tableClass, item)), + count: res.length, + }; + } + async update( hashKey: HashKeyType, sortKey: RangeKeyType, diff --git a/src/query/global_secondary_index.ts b/src/query/global_secondary_index.ts index a5cb462..a6a6b13 100644 --- a/src/query/global_secondary_index.ts +++ b/src/query/global_secondary_index.ts @@ -7,6 +7,8 @@ import * as Codec from '../codec'; import * as Metadata from '../metadata'; import * as Query from './query'; +import { scanAll } from './scan_all'; + const HASH_KEY_REF = "#hk"; const HASH_VALUE_REF = ":hkv"; @@ -67,6 +69,22 @@ export class FullGlobalSecondaryIndex Codec.deserialize(this.tableClass, item)), + count: res.length, + }; + } } export class HashGlobalSecondaryIndex { @@ -103,4 +121,24 @@ export class HashGlobalSecondaryIndex { consumedCapacity: result.ConsumedCapacity, }; } + + async scanAll(options: { + parallelize?: number; + scanBatchSize?: number; + }) { + if (options.parallelize && options.parallelize < 1) { + throw new Error("Parallelize value at scanAll always positive number"); + } + + const res = await scanAll( + this.tableClass.metadata.connection.documentClient, + this.tableClass.metadata.name, + options, + ); + + return { + records: res.map((item) => Codec.deserialize(this.tableClass, item)), + count: res.length, + }; + } } diff --git a/src/query/hash_primary_key.ts b/src/query/hash_primary_key.ts index c5f1004..6794ae6 100644 --- a/src/query/hash_primary_key.ts +++ b/src/query/hash_primary_key.ts @@ -1,4 +1,5 @@ import { DynamoDB } from 'aws-sdk'; +import * as _ from 'lodash'; import * as Codec from '../codec'; import * as Metadata from '../metadata'; @@ -6,6 +7,7 @@ import { ITable, Table } from '../table'; import { batchGetFull, batchGetTrim } from "./batch_get"; import { batchWrite } from "./batch_write"; +import { scanAll } from './scan_all'; export class HashPrimaryKey { constructor( @@ -66,6 +68,22 @@ export class HashPrimaryKey { }; } + async scanAll(options: { + parallelize?: number; + scanBatchSize?: number; + }) { + const res = await scanAll( + this.tableClass.metadata.connection.documentClient, + this.tableClass.metadata.name, + options, + ); + + return { + records: res.map((item) => Codec.deserialize(this.tableClass, item)), + count: res.length, + }; + } + async batchGet(keys: HashKeyType[]) { const res = await batchGetTrim( this.tableClass.metadata.connection.documentClient, diff --git a/src/query/local_secondary_index.ts b/src/query/local_secondary_index.ts index 95c8eec..0626607 100644 --- a/src/query/local_secondary_index.ts +++ b/src/query/local_secondary_index.ts @@ -7,6 +7,8 @@ import * as Codec from '../codec'; import * as Metadata from '../metadata'; import * as Query from './query'; +import { scanAll } from './scan_all'; + const HASH_KEY_REF = "#hk"; const HASH_VALUE_REF = ":hkv"; @@ -67,4 +69,20 @@ export class LocalSecondaryIndex { consumedCapacity: result.ConsumedCapacity, }; } + + async scanAll(options: { + parallelize?: number; + scanBatchSize?: number; + }) { + const res = await scanAll( + this.tableClass.metadata.connection.documentClient, + this.tableClass.metadata.name, + options, + ); + + return { + records: res.map((item) => Codec.deserialize(this.tableClass, item)), + count: res.length, + }; + } } diff --git a/src/query/scan_all.ts b/src/query/scan_all.ts new file mode 100644 index 0000000..14debba --- /dev/null +++ b/src/query/scan_all.ts @@ -0,0 +1,51 @@ +import { DynamoDB } from "aws-sdk"; +import * as _ from "lodash"; + +export async function scanAll( + documentClient: DynamoDB.DocumentClient, + tableName: string, + options: { + parallelize?: number; + scanBatchSize?: number; + }, +) { + if (options.parallelize && options.parallelize < 1) { + throw new Error("Parallelize value at scanAll always positive number"); + } + const buffer: DynamoDB.AttributeMap[] = []; + const totalSegments = options.parallelize || 1; + let scanners = _.times(totalSegments) + .map((i) => ({ + scanner: async (exclusiveStartKey?: DynamoDB.DocumentClient.Key) => + await documentClient.scan({ + TableName: tableName, + Limit: options.scanBatchSize, + ExclusiveStartKey: exclusiveStartKey, + ReturnConsumedCapacity: "TOTAL", + TotalSegments: totalSegments, + Segment: i, + }).promise(), + key: undefined as DynamoDB.DocumentClient.Key | undefined, + })); + + do { + const results = await Promise.all( + _.chain(scanners) + .map(async ({ scanner, key }) => await scanner(key)) + .value(), + ); + buffer.push( + ..._.chain(results) + .map(({ Items }) => (Items || [])) + .flatten() + .value(), + ); + scanners = scanners + .map(({ scanner }, index) => ({ + scanner, key: results[index].LastEvaluatedKey, + })) + .filter(({ key }) => !!key ); + } while(scanners.length); + + return buffer; +}