api_sync_v1.py
"""Workflow to sync saved CSV output data
to version 1 of the open skills API database
"""
import os
from datetime import datetime
import alembic.config
from airflow.operators import BaseOperator
from config import config
from skills_utils.time import datetime_to_year_quarter, datetime_to_quarter
from utils.db import get_apiv1_dbengine as get_db
from api_sync.v1 import (
    load_jobs_master,
    load_alternate_titles,
    load_jobs_unusual_titles,
    load_skills_master,
    load_skills_importance,
    load_geo_title_counts,
    load_title_counts,
)
from utils.dags import QuarterlySubDAG

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2011, 1, 1),
}


def define_api_sync(main_dag_name):
    dag = QuarterlySubDAG(main_dag_name, 'api_v1_sync')

    output_folder = config.get('output_folder', 'output')

    table_files = {
        'jobs_master': 'job_titles_master_table.tsv',
        'skills_master': 'skills_master_table.tsv',
        'interesting_jobs': 'interesting_job_titles.tsv',
        'skill_importance': 'ksas_importance.tsv',
        'geo_title_count': 'geo_title_count_{}.csv',
        'title_count': 'title_count_{}.csv',
    }

    def full_path(filename):
        # An OUTPUT_FOLDER environment variable overrides the configured output folder.
        output_folder = os.environ.get('OUTPUT_FOLDER', None)
        if not output_folder:
            output_folder = config.get('output_folder', 'output')
        return os.path.join(output_folder, filename)

    class JobMaster(BaseOperator):
        def execute(self, context):
            engine = get_db()
            load_jobs_master(full_path(table_files['jobs_master']), engine)

    class SkillMaster(BaseOperator):
        def execute(self, context):
            engine = get_db()
            load_skills_master(full_path(table_files['skills_master']), engine)

    class JobAlternateTitles(BaseOperator):
        def execute(self, context):
            engine = get_db()
            load_alternate_titles(full_path(table_files['jobs_master']), engine)

    class JobUnusualTitles(BaseOperator):
        def execute(self, context):
            engine = get_db()
            load_jobs_unusual_titles(full_path(table_files['interesting_jobs']), engine)

    class SkillImportance(BaseOperator):
        def execute(self, context):
            engine = get_db()
            load_skills_importance(full_path(table_files['skill_importance']), engine)

    class GeoTitleCounts(BaseOperator):
        def execute(self, context):
            # Counts are partitioned by quarter, so both the input filename and
            # the load call are keyed to the task's execution date.
            year, quarter = datetime_to_year_quarter(context['execution_date'])
            quarter_string = datetime_to_quarter(context['execution_date'])
            engine = get_db()
            load_geo_title_counts(
                filename=full_path(table_files['geo_title_count']).format(quarter_string),
                year=year,
                quarter=quarter,
                db_engine=engine,
            )

    class TitleCounts(BaseOperator):
        def execute(self, context):
            year, quarter = datetime_to_year_quarter(context['execution_date'])
            quarter_string = datetime_to_quarter(context['execution_date'])
            engine = get_db()
            load_title_counts(
                filename=full_path(table_files['title_count']).format(quarter_string),
                year=year,
                quarter=quarter,
                db_engine=engine,
            )

    class SchemaUpgrade(BaseOperator):
        def execute(self, context):
            # Bring the API v1 database schema up to date before any loads run.
            alembic.config.main(argv=['--raiseerr', 'upgrade', 'head'])

    schema_upgrade = SchemaUpgrade(task_id='schema_upgrade', dag=dag)
    job_master = JobMaster(task_id='job_master', dag=dag)
    skill_master = SkillMaster(task_id='skill_master', dag=dag)
    alternate_titles = JobAlternateTitles(task_id='alternate_titles', dag=dag)
    unusual_titles = JobUnusualTitles(task_id='unusual_titles', dag=dag)
    skill_importance = SkillImportance(task_id='skill_importance', dag=dag)
    geo_title_counts = GeoTitleCounts(task_id='geo_title_counts', dag=dag)
    title_counts = TitleCounts(task_id='title_counts', dag=dag)

    # Dependent tables load only after their master tables.
    alternate_titles.set_upstream(job_master)
    unusual_titles.set_upstream(job_master)
    skill_importance.set_upstream(job_master)
    skill_importance.set_upstream(skill_master)

    # Every load task waits for the schema upgrade.
    all_tasks = [
        job_master,
        skill_master,
        alternate_titles,
        unusual_titles,
        skill_importance,
        geo_title_counts,
        title_counts,
    ]
    for task in all_tasks:
        task.set_upstream(schema_upgrade)

    return dag
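

# Usage sketch (illustrative only; these names are not defined in this module):
# a parent DAG definition would typically call define_api_sync() and attach the
# returned sub-DAG with Airflow's SubDagOperator, along these lines:
#
#     from airflow.operators.subdag_operator import SubDagOperator
#
#     api_sync_dag = define_api_sync('quarterly_pipeline')   # hypothetical parent DAG name
#     api_sync_task = SubDagOperator(
#         subdag=api_sync_dag,
#         task_id='api_v1_sync',   # matches the sub-DAG name used above
#         dag=parent_dag,          # hypothetical parent DAG object
#     )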