Skip to content

Commit

Permalink
Merge pull request #2 from airflow-plugins/develop-rc0.0.2
Browse files Browse the repository at this point in the history
Develop rc0.0.2
  • Loading branch information
Ben authored Jan 25, 2018
2 parents 324094f + 0c92c7e commit 7bb458d
Show file tree
Hide file tree
Showing 16 changed files with 1,903 additions and 1,258 deletions.
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,35 @@
# marketo_plugin
# Plugin - Marketo to S3

This plugin moves data from the [Marketo](http://developers.marketo.com/rest-api/) API to S3 based on the specified object

## Hooks
### MarketoHook
This hook handles the authentication and request to Marketo. This extends the HttpHook.

### S3Hook
[Core Airflow S3Hook](https://pythonhosted.org/airflow/_modules/S3_hook.html) with the standard boto dependency.

## Operators
### MarketoToS3Operator
This operator composes the logic for this plugin. It fetches the Marketo specified object and saves the result in a S3 Bucket, under a specified key, in
njson format. The parameters it can accept include the following.

#### NOTE: The only currently supported + tested output format is json. There are references to avro in this code but support for that format had to be delayed.

`marketo_conn_id` The Airflow connection id used to store the Marketo credentials.
`marketo_endpoint` The endpoint to retreive data for. Possible values include:
- activities
- campaigns
- leads
- programs
- lead_lists
- `start_at` The starting date parameter. Ignored by all endpoints but leads.
- `end_at` The ending date parameter. Ignored by all endpoints but leads.
- `payload` Payload variables -- all are optional
- `output_format` The output format of the data. Possible values include:
- json
- avro
Defaults to json.
- `s3_conn_id` The Airflow connection id used to store the S3 credentials.
- `s3_bucket` The S3 bucket to be used to store the Marketo data.
- `s3_key` The S3 key to be used to store the Marketo data.
97 changes: 75 additions & 22 deletions operators/marketo_to_s3_operator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.models import BaseOperator, SkipMixin

from MarketoPlugin.hooks.marketo_hook import MarketoHook
from MarketoPlugin.schemas import marketo_schema
from MarketoPlugin.schemas._schema import schema

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

import json
from time import sleep
import boa
import json
import logging
from tempfile import NamedTemporaryFile
from time import sleep
from csv import reader


class MarketoToS3Operator(BaseOperator):
class MarketoToS3Operator(BaseOperator, SkipMixin):
"""
NOTE: The only currently supported + tested output format is json.
There are references to avro in this code but support for that format
had to be delayed.
Marketo to S3 Operator
:param marketo_conn_id: The Airflow connection id used to store
the Marketo credentials.
Expand All @@ -35,6 +41,12 @@ class MarketoToS3Operator(BaseOperator):
endpoints but leads.
:type end_at: Isoformat timestamp
:param payload: Payload variables -- all are optional
:param output_format: The output format of the data. Possible
values include:
- json
- avro
Defaults to json.
:type output_format string
:param s3_conn_id: The Airflow connection id used to store
the S3 credentials.
:type s3_conn_id: string
Expand All @@ -57,17 +69,19 @@ def __init__(self,
s3_conn_id,
s3_bucket,
s3_key,
output_format='json',
start_at=None,
end_at=None,
payload={},
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.marketo_conn_id = marketo_conn_id
self.endpoint = endpoint
self.endpoint = endpoint.lower()
self.s3_conn_id = s3_conn_id
self.s3_bucket = s3_bucket
self.s3_key = s3_key
self.output_format = output_format.lower()
self.start_at = start_at
self.end_at = end_at
self.payload = payload
Expand All @@ -80,6 +94,8 @@ def __init__(self,

raise Exception('Specified endpoint not currently supported.')

if self.output_format not in ('json'):
raise Exception('Specified output format not currently supported.')
def execute(self, context):
self.token = (MarketoHook(http_conn_id=self.marketo_conn_id)
.run(self.methodMapper('auth'))
Expand Down Expand Up @@ -114,7 +130,6 @@ def execute(self, context):
job = post_hook.run(self.methodMapper('leads_create'),
data=json.dumps(request),
token=self.token).json()
print(job)
export_id = [e['exportId'] for e in job['result']][0]

status = [e['status'] for e in post_hook.run('bulk/v1/leads/export/{0}/enqueue.json'.format(export_id),
Expand All @@ -127,20 +142,39 @@ def execute(self, context):

output = get_hook.run('bulk/v1/leads/export/{0}/file.json'.format(export_id),
token=self.token).text

output = output.split('\n')
headers = output.pop(0).split(',')
del output[0]
headers = [boa.constrict(header) for header in headers]
output = [row.split(',') for row in output]
output = [row for row in reader(output)]
output = [dict(zip(headers, row)) for row in output]
schema = getattr(marketo_schema, self.endpoint)
marketo_schema = schema[self.endpoint]
field_names = []
for field in schema['fields']:
for field in marketo_schema['fields']:
field_names.append(field['name'])
print('DIFF: ' + str(set(headers) - set(field_names)))
logging.info('DIFF: ' + str(set(headers) - set(field_names)))
else:
output = self.paginate_data()
logging.info(len('Output Length: ' + str(output)))

if len(output) == 0 or output is None:
logging.info("No records pulled from Marketo.")
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
logging.info('Skipping downstream tasks...')
logging.debug("Downstream task_ids %s", downstream_tasks)

if downstream_tasks:
self.skip(context['dag_run'],
context['ti'].execution_date,
downstream_tasks)

self.outputManager(output, self.s3_key, self.s3_bucket)
return True
else:
self.outputManager(self.nullify_output(output),
self.s3_key,
self.s3_bucket,
self.output_format)

def methodMapper(self, endpoint):
"""
Expand Down Expand Up @@ -215,22 +249,41 @@ def make_request(http_conn_id,
output = [{boa.constrict(k): v for k, v in i.items()} for i in output]
return output

def outputManager(self, output, key, bucket):
schema = avro.schema.Parse(json.dumps(getattr(marketo_schema,
self.endpoint)))
writer = DataFileWriter(open("{0}.avro".format(self.endpoint), "wb"),
DatumWriter(),
schema)
for record in output:
writer.append(record)
def outputManager(self, output, key, bucket, output_format='json'):
if output_format == 'avro':
avro_schema = avro.schema.Parse(json.dumps(schema[self.endpoint]))
writer = DataFileWriter(open("{0}.avro".format(self.endpoint), "wb"),
DatumWriter(),
avro_schema)
for record in output:
writer.append(record)

writer.close()

output_file = "{0}.avro".format(self.endpoint)

elif output_format == 'json':
tmp = NamedTemporaryFile("w")

writer.close()
for row in output:
tmp.write(json.dumps(row) + '\n')

tmp.flush()

output_file = tmp.name

s3 = S3Hook(s3_conn_id=self.s3_conn_id)

s3.load_file(
filename="{0}.avro".format(self.endpoint),
filename=output_file,
key=self.s3_key,
bucket_name=self.s3_bucket,
replace=True
)

def nullify_output(self, output):
for record in output:
for k, v in record.items():
if v == 'null':
record[k] = None
return output
Binary file removed schemas/__pycache__/__init__.cpython-34.pyc
Binary file not shown.
Binary file removed schemas/__pycache__/_schema.cpython-34.pyc
Binary file not shown.
Binary file removed schemas/__pycache__/activities.cpython-34.pyc
Binary file not shown.
Binary file removed schemas/__pycache__/campaigns.cpython-34.pyc
Binary file not shown.
Binary file removed schemas/__pycache__/lead_lists.cpython-34.pyc
Binary file not shown.
Binary file removed schemas/__pycache__/leads.cpython-34.pyc
Binary file not shown.
Binary file removed schemas/__pycache__/marketo_schema.cpython-34.pyc
Binary file not shown.
12 changes: 12 additions & 0 deletions schemas/_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from MarketoPlugin.schemas.activities import activities
from MarketoPlugin.schemas.campaigns import campaigns
from MarketoPlugin.schemas.lead_lists import lead_lists
from MarketoPlugin.schemas.leads import leads
from MarketoPlugin.schemas.programs import programs


schema = {'activities': activities,
'campaigns': campaigns,
'lead_lists': lead_lists,
'leads': leads,
'programs': programs}
43 changes: 43 additions & 0 deletions schemas/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
activities = {"name": "activities",
"type": "record",
"fields": [{"name": "activity_date",
"type": ["null", "string"],
"default": "null"},
{"name": "activity_type_id",
"type": ["null", "int"],
"default": "null"},
{"name": "attributes",
"type": {
"type": "array",
"items": [
{"name": "attribute",
"type": "record",
"fields": [
{"name": "name",
"type": ["null", "string"],
"default": "null"},
{"name": "value",
"type": ["null", "string"],
"default": "null"}
]}]}
},
{"name": "campaign_id",
"type": ["null", "int"],
"default": "null"},
{"name": "id",
"type": ["null", "int"],
"default": "null"},
{"name": "lead_id",
"type": ["null", "int"],
"default": "null"},
{"name": "marketo_guid",
"type": ["null", "string"],
"default": "null"},
{"name": "primary_attribute_value",
"type": ["null", "string"],
"default": "null"},
{"name": "primary_attribute_value_id",
"type": ["null", "int"],
"default": "null"}
]
}
31 changes: 31 additions & 0 deletions schemas/campaigns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
campaigns = {"name": "campaigns",
"type": "record",
"fields": [{"name": "active",
"type": ["null", "boolean"],
"default": "null"},
{"name": "created_at",
"type": ["null", "string"],
"default": "null"},
{"name": "description",
"type": ["null", "string"],
"default": "null"},
{"name": "name",
"type": ["null", "string"],
"default": "null"},
{"name": "program_id",
"type": ["null", "int"],
"default": "null"},
{"name": "program_name",
"type": ["null", "string"],
"default": "null"},
{"name": "type",
"type": ["null", "string"],
"default": "null"},
{"name": "updated_at",
"type": ["null", "string"],
"default": "null"},
{"name": "workspace_name",
"type": ["null", "string"],
"default": "null"}
]
}
24 changes: 24 additions & 0 deletions schemas/lead_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
lead_lists = {"name": "lead_lists",
"type": "record",
"fields": [{"name": "created_at",
"type": ["null", "string"],
"default": "null"},
{"name": "description",
"type": ["null", "string"],
"default": "null"},
{"name": "id",
"type": ["null", "int"],
"default": "null"},
{"name": "name",
"type": ["null", "string"],
"default": "null"},
{"name": "program_name",
"type": ["null", "string"],
"default": "null"},
{"name": "updated_at",
"type": ["null", "string"],
"default": "null"},
{"name": "workspace_name",
"type": ["null", "string"],
"default": "null"}
]}
Loading

0 comments on commit 7bb458d

Please sign in to comment.