-
Notifications
You must be signed in to change notification settings - Fork 1
/
BigqueryArchive.js
113 lines (95 loc) · 4.13 KB
/
BigqueryArchive.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
const bigqueryClient = require( '@google-cloud/bigquery' );
const datastoreClient = require( './lib/DbUtility.js' );
var bigquery;
/**
 * Parse the text of a BigQuery TIMESTAMP cell into a UTC `Date`.
 *
 * Accepts both `YYYY-MM-DD HH:mm:ss[.fff]` (space separator, no zone) and
 * ISO-8601 strings that already carry a zone (`...T...Z`, `+00:00`), plus a
 * trailing ` UTC`. A `Z` is appended only when no zone designator is present:
 * a zone-less string would otherwise be parsed in the host's local timezone,
 * and appending a second `Z` would yield an Invalid Date.
 *
 * @param {string|Date} raw - Timestamp text (or a Date) read from BigQuery.
 * @returns {Date} The parsed instant; an Invalid Date if `raw` is not recognised.
 */
function parseBigqueryTimestamp( raw ) {
  if( raw instanceof Date ) {
    return new Date( raw.getTime() );
  }
  const text = String( raw ).trim().replace( / UTC$/, '' );
  // Normalise the date/time separator (space or 'T') to 'T'.
  const iso = `${ text.slice( 0, 10 ) }T${ text.slice( 11 ) }`;
  const hasZone = /(?:Z|[+-]\d{2}:?\d{2})$/i.test( iso );
  return new Date( hasZone ? iso : `${ iso }Z` );
}

/**
 * True when two timestamp-like values (Date, date string or epoch ms) denote
 * the same millisecond. Invalid values never compare equal.
 *
 * @param {*} a
 * @param {*} b
 * @returns {boolean}
 */
function isSameInstant( a, b ) {
  return new Date( a ).getTime() === new Date( b ).getTime();
}

/**
 * Incrementally archives Datastore entities of one kind into the BigQuery
 * table of the same name.
 *
 * Progress is kept on the caller-supplied `config` object:
 *  - `config.lastValue` - the largest `config.sortBy` value archived so far;
 *  - `config.batchSize` - the Datastore fetch size, grown after successful
 *    inserts and shrunk after payload-size / socket-timeout failures.
 *
 * Each `run` processes at most one batch and reports through
 * `callback(err, null)` on failure or `callback(null, count)` on success.
 */
class BigqueryArchive {
  /**
   * Create the module-wide BigQuery dataset handle used by every instance.
   * Must be called before `run`.
   *
   * @param {{ gcsProjectId: string, gcsBqDataset: string }} config
   * @returns {typeof BigqueryArchive} The class itself, for chaining.
   */
  static init( config ) {
    bigquery = bigqueryClient({ projectId: config.gcsProjectId }).dataset( config.gcsBqDataset );
    return this;
  }

  /**
   * Archive one batch for `config.kind`.
   *
   * If `config.lastValue` is unknown it is first read from BigQuery as
   * MAX(`config.sortBy`); otherwise the Datastore fetch starts immediately.
   *
   * @param {*} bigQuery - Stored on the instance only; all BigQuery calls use the
   *   handle created by `init`.
   * @param {object} config - Must provide `kind`, `schema`, `sortBy`, `batchSize`;
   *   `lastValue` and `batchSize` are updated in place.
   * @param {function(?Error, ?number)} callback - Receives `(err, null)` on failure
   *   or `(null, count)` on success.
   */
  run( bigQuery, config, callback ) {
    this.bigQuery = bigQuery;
    this.kind = config.kind;
    this.config = config;
    this.callback = callback;
    this.schema = config.schema;
    this.datastore = datastoreClient({ projectId:process.env.GCP_PROJ_ID, kind:this.kind, schema:this.schema });
    if( this.config.lastValue != null ) {
      this.updateBigqueryFromDatastore();
    } else {
      this.findAndUpdateLastValue();
    }
  }

  /**
   * Seed `config.lastValue` from MAX(`config.sortBy`) of the BigQuery table
   * (legacy-SQL table reference), then continue with the Datastore fetch.
   * An empty table (NULL max) leaves `lastValue` unset so everything is fetched.
   * Query failures and unparseable timestamps are reported via `callback`.
   */
  findAndUpdateLastValue() {
    // NOTE(review): sortBy/kind are interpolated into SQL; they must come from trusted config.
    const queryStr = `SELECT MAX(${ this.config.sortBy }) as value FROM [${ this.kind }]`;
    bigquery.query( queryStr, ( err, rows ) => {
      if( err ) {
        console.error(`${ this.kind }: Error while reading from Bigquery.`);
        this.callback( err, null );
        return;
      }
      const cell = rows && rows[0] && rows[0].value;
      if( cell ) {
        // The cell is expected to be a wrapper whose `.value` holds the timestamp
        // text; fall back to the cell itself if it is already a string/Date.
        const raw = cell.value !== undefined ? cell.value : cell;
        const parsed = parseBigqueryTimestamp( raw );
        if( Number.isNaN( parsed.getTime() ) ) {
          console.error(`${ this.kind }: Error while reading from Bigquery.`);
          this.callback( new Error( `${ this.kind }: unrecognised MAX(${ this.config.sortBy }) value from Bigquery: ${ raw }` ), null );
          return;
        }
        this.config.lastValue = parsed;
        console.log( `${ this.kind }: lastValue read from Bigquery is ${ this.config.lastValue }.` );
      }
      this.updateBigqueryFromDatastore();
    });
  }

  /**
   * Fetch the next batch from Datastore (ascending by `config.sortBy`, at most
   * `config.batchSize` entities, `>= config.lastValue` when known) and pass the
   * not-yet-archived entities to `insertInBigQuery`.
   *
   * The filter is inclusive, so the row archived last normally comes back
   * first; it is skipped only when a filter was applied AND its sort value
   * equals `lastValue`. Otherwise the first row is new data and is archived.
   * When nothing new remains, `callback(null, n)` reports the number of
   * entities fetched (0, or 1 for the lone boundary row).
   */
  updateBigqueryFromDatastore() {
    const { sortBy, lastValue, batchSize } = this.config;
    console.log( `${ this.kind }: lastValue after which entities needs to be fetched from Datastore is ${ lastValue }.` );
    const filter = [];
    if( lastValue != null ) {
      filter.push([ sortBy, '>=', new Date( lastValue ) ]);
    }
    this.datastore.query( filter, null, null, batchSize, [ sortBy ] ).then( ( updates ) => {
      const data = updates.data;
      console.log( `${ this.kind }: Found ${ data.length } new additions/updations.` );
      const startsAtBoundary = lastValue != null && data.length > 0 && isSameInstant( data[0][ sortBy ], lastValue );
      const fresh = startsAtBoundary ? data.slice( 1 ) : data;
      if( fresh.length === 0 ) {
        this.callback( null, data.length );
        return;
      }
      if( startsAtBoundary ) {
        console.log( `${ this.kind }: Removed First Entity ` + JSON.stringify( data.slice( 0, 1 ) ) );
      }
      this.insertInBigQuery( fresh, fresh.length );
    }).catch( ( err ) => {
      console.error(`${ this.kind }: Error while querying on datastore.`);
      this.callback( err, null );
    });
  }

  /**
   * Stream `entities` into the BigQuery table named after the kind.
   *
   * Each row's insertId is the entity's `schema.primaryKey` value (BigQuery
   * uses insertId for best-effort de-duplication of retried rows).
   * On success `config.lastValue` advances to the last entity's sort value,
   * `config.batchSize` grows by 25 (capped at 1000) and `callback(null, rowCount)`
   * fires. On failure `callback(err, null)` fires; for a payload-too-large or
   * socket-timeout error `config.batchSize` first shrinks by 25 (floor 2 -
   * presumably so a batch still fits the skipped boundary row plus one new row).
   *
   * @param {object[]} entities - Non-empty, ascending by `config.sortBy`.
   * @param {number} updateCount - Unused; kept for interface compatibility.
   */
  insertInBigQuery( entities, updateCount ) {
    console.log( `${ this.kind }: Inserting ${ entities.length } entities to BQ.` );
    const rows = entities.map( ( entity ) => ({ insertId: entity[ this.schema.primaryKey ], json: entity }) );
    bigquery.table( this.kind ).insert( rows, { raw: true }, ( err ) => {
      if( !err ) {
        this.config.lastValue = new Date( entities[ entities.length - 1 ][ this.config.sortBy ] );
        console.log( `${ this.kind }: ${ rows.length } records inserted !` );
        this.config.batchSize = Math.min( this.config.batchSize + 25, 1000 );
        this.callback( null, rows.length );
        return;
      }
      console.error(`${ this.kind }: Error while inserting In BigQuery`);
      if( err.name === 'PartialFailureError' ) {
        for( const failedRow of err.errors || [] ) {
          for( const rowError of failedRow.errors || [] ) {
            // Per the insertAll docs, 'stopped' rows were valid but skipped because another row failed.
            if( rowError.reason !== 'stopped' ) {
              console.error( `${ this.kind }: ${ JSON.stringify( failedRow.row ) }` );
              console.error( `${ this.kind }: ${ JSON.stringify( rowError.reason ) }` );
              console.error( `${ this.kind }: ${ JSON.stringify( rowError.debugInfo ) }` );
              console.error( `${ this.kind }: ${ JSON.stringify( rowError.message ) }` );
            }
          }
        }
      } else if( err.message === 'Request payload size exceeds the limit: 10485760 bytes.' || err.code === 'ESOCKETTIMEDOUT' ) {
        this.config.batchSize = Math.max( this.config.batchSize - 25, 2 );
      }
      this.callback( err, null );
    });
  }
}
module.exports = BigqueryArchive;