-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.ts
107 lines (93 loc) · 3.53 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import * as dbg from 'debug';
import { config } from 'dotenv';
import * as Firebase from 'firebase';
import { Observable } from '@reactivex/rxjs';
import { createAgent, RECEIVED_CRAWL_JOBS, RECEIVED_TEST_JOBS, CRAWL_JOBS_REQUEST } from './src/agent';
import { crawler } from './src/crawler';
import { publish } from './src/publisher';
// Outside of production, run dotenv's `config()` before any environment
// variable is read by the rest of this file.
const isProduction = process.env.NODE_ENV === 'production';
if (!isProduction) {
  config();
}
// Logger for this entry point, namespaced as `hlad-main`.
const debug = dbg('hlad-main');

// Everything configurable is read from the environment once, up front.
// NOTE(review): the channel variable is spelled `API_CHANEL_ID` — confirm this
// matches the name used in the deployment configuration before renaming it.
const { FIREBASE_ID, API_URL, API_TOKEN, API_CHANEL_ID } = process.env;

// Root Firebase reference that all child lookups in this file start from.
const firebaseRef = new Firebase(`https://${FIREBASE_ID}.firebaseio.com`);

// Firebase child keys used by the pipelines in this file.
const CRAWL_JOBS_KEY = 'crawl_jobs';
const TEST_JOBS_KEY = 'test_jobs';
const TEST_RESULTS_KEY = 'test_results';
const RECIPES_KEY = 'recipes';

// Settings object handed to `publish` together with the text to publish.
const ENDPOINT_SETTINGS = {
  URL: API_URL,
  token: API_TOKEN,
  channel: API_CHANEL_ID,
};

// Only the URL is logged; the token and channel are deliberately kept out of the logs.
debug(`Publish endpoint: ${JSON.stringify(API_URL)}`);

// `share()` lets every pipeline below reuse a single subscription to the agent.
const agent$ = createAgent(firebaseRef).share();
// Stream of the recipes stored under RECIPES_KEY in Firebase, consumed by the
// crawl pipelines (`crawlJobs$`, `crawlJobRequest$`).
//
// Every subscription attaches its own Firebase 'value' listener and emits the
// snapshot value each time that listener fires. When Firebase cancels the
// listener, the stream errors with `{ type: 'RECIPES_FETCH_ERROR', error }`.
const recipes$ = Observable.create(observer => {
  const recipesRef = firebaseRef.child(RECIPES_KEY);

  // `on()` returns the registered callback, which is the handle `off()` needs.
  const onRecipes = recipesRef.on('value', recipesSnapshot => {
    const recipes = recipesSnapshot.val();
    // Serialize explicitly: interpolating the object would log "[object Object]".
    debug(`Received recipes: ${JSON.stringify(recipes)}`);
    observer.next(recipes);
  }, recipesError => {
    debug(`Error when fetching recipes: ${recipesError}`);
    observer.error({ type: 'RECIPES_FETCH_ERROR', error: recipesError });
  });

  // Teardown: detach the listener when the subscriber goes away. The crawl
  // pipelines re-subscribe through `switchMap`, so without this every new job
  // event would leave one more listener attached to Firebase.
  return () => {
    recipesRef.off('value', onRecipes);
  };
});
// Crawl pipeline for stored crawl jobs.
//
// For every RECEIVED_CRAWL_JOBS agent event that carries a payload:
//   1. subscribe to `recipes$` and run `crawler` on each emitted recipes value
//      (per the original author's note, the crawler calls the recipe URL and
//      parses the HTML response using the recipe definition),
//   2. publish the result's `lunchString` to the endpoint in `ENDPOINT_SETTINGS`,
//   3. remove the CRAWL_JOBS_KEY node from Firebase.
// NOTE(review): the meaning of the second `crawler` argument (200 here, 0 for
// test jobs) is defined in ./src/crawler — confirm there.
const crawlJobs$ = agent$
  .filter(({ payload, eventType }) => payload && eventType === RECEIVED_CRAWL_JOBS)
  .do(val => debug(`crawlJobsSource$ value: ${JSON.stringify(val)}`))
  .switchMap(
    // fire on crawlJob; a newer job event replaces the previous inner subscription
    crawlJob => recipes$.flatMap(recipesHash => crawler(recipesHash, 200))
  )
  .do((payload: any) => {
    publish(ENDPOINT_SETTINGS, payload.lunchString);
    debug(`Removing finished crawlJob ${CRAWL_JOBS_KEY}`);
    firebaseRef
      .child(CRAWL_JOBS_KEY)
      .remove((removeErr) => {
        // Log a failed removal instead of ignoring it: a job that stays in
        // Firebase would otherwise go unnoticed.
        if (removeErr) {
          debug(`${CRAWL_JOBS_KEY} remove error: ${removeErr}`);
        }
      });
  });
// On-demand crawl pipeline: every CRAWL_JOBS_REQUEST agent event triggers a
// crawl of the current recipes and publishes each result's `lunchString`.
// Unlike `crawlJobs$`, no payload is required and nothing is removed from Firebase.
const crawlJobRequest$ = agent$
  .filter(event => event.eventType === CRAWL_JOBS_REQUEST)
  .do(event => {
    debug(`crawlJobRequest$ value: ${JSON.stringify(event)}`);
  })
  .switchMap(() => {
    // A newer request replaces the previous inner subscription.
    const crawlResults$ = recipes$.flatMap(recipesHash => crawler(recipesHash, 200));
    return crawlResults$;
  })
  .do((crawlResult: any) => {
    const { lunchString } = crawlResult;
    publish(ENDPOINT_SETTINGS, lunchString);
  });
// Logic for testing recipes.
//
// For every RECEIVED_TEST_JOBS agent event that carries a payload, run `crawler`
// on that payload, push the outcome under TEST_RESULTS_KEY and, once the result
// has been stored, remove the matching entry under TEST_JOBS_KEY.
const testJobs$ = agent$
  .filter(({ payload, eventType }) => payload && eventType === RECEIVED_TEST_JOBS)
  .flatMap(({ payload }) => crawler(payload, 0))
  .do((result: any) => {
    // The first key of `result.recipe` identifies the test job. Guard against a
    // result without a recipe so a single bad result cannot throw here and
    // terminate the whole stream.
    const recipe = result && result.recipe;
    const recipeKeys: string[] = recipe ? Object.keys(recipe) : [];
    const firebaseKey = recipeKeys[0];
    if (!firebaseKey) {
      debug('Missing testJob firebaseKey!');
      return;
    }
    debug(`Processing testJob: ${firebaseKey}`);
    firebaseRef
      .child(TEST_RESULTS_KEY)
      .push()
      .set({
        pendingTestResultKey: firebaseKey, // pendingTestResultKey is used by hlad-ui for pairing requests
        result: result.lunchString,
      }, (resultsErr) => {
        if (resultsErr) {
          // Keep the test job in place when its result could not be stored.
          debug(`${TEST_RESULTS_KEY} error: ${resultsErr}`);
        } else {
          debug(`Removing testJob: ${firebaseKey}`);
          firebaseRef
            .child(`${TEST_JOBS_KEY}/${firebaseKey}`)
            .remove((removeErr) => {
              // Log a failed removal instead of ignoring it.
              if (removeErr) {
                debug(`${TEST_JOBS_KEY} remove error: ${removeErr}`);
              }
            });
        }
      });
  });
// Start everything: merge the three pipelines into one stream and subscribe,
// which is what actually activates them. Values that have a truthy `payload`
// are logged after processing.
const allJobs$ = Observable.merge(crawlJobs$, crawlJobRequest$, testJobs$);

allJobs$
  .filter((processed: any) => processed.payload)
  .do(processed => {
    debug(`Value after processing: ${JSON.stringify(processed)}`);
  })
  .subscribe();