Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple stream sync #30

Merged
merged 2 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,77 @@ Here is the default value:
When you define http_headers config value, the default value is nullified.
So you should redefine "User-Agent" and "Content-type" when you need them.

## Multiple streams

tap-rest-api supports settings for multiple streams.

- `url` is a string that sets the default URL for all streams.
- `urls` is a dictionary to overwrite the default `url` for the specified stream ID given as the dictionary key
- `{stream}` can be used as parameter in URL.
- `timestamp_key`, `datetime_key`, and `index_key` can each be set either as a string or as a dictionary. If the stream ID exists as a key in one of the dictionaries, that entry is used. If not, the key falls back to the one defined as a string, with priority `timestamp_key` > `datetime_key` > `index_key`.
- Active streams must be defined as a comma-separated list of stream IDs, either in the config file or on the command line via `--stream <streams>`.
- Streams must be registered in catalog file with `selected: true` ([example](https://github.com/anelendata/tap-rest-api/blob/master/examples/usgs/catalog/earthquakes.json))

Here is an example for the [Chargify API](https://developers.chargify.com/docs/api-docs):

```
{
"url": "https://{{ subdomain }}.chargify.com/{stream}.json?direction=asc&per_page={items_per_page}&page={current_page_one_base}&date_field={datetime_key}&start_datetime={start_datetime}",
"urls": {
"events": "https://{{ subdomain }}.chargify.com/events.json?direction=asc&per_page={items_per_page}&page={current_page_one_base}&date_field=created_at&since_id={start_index}",
"price_points": "https://{{ subdomain }}.chargify.com/products_price_points.json?direction=asc&per_page={items_per_page}&page={current_page_one_base}&filter[date_field]=updated_at&filter[start_datetime]={start_datetime}&filter[end_datetime]={end_datetime}",
"segments": "https://{{ subdomain }}.chargify.com/components/{{ component_id }}/price_points/{{ price_point_id }}/segments.json?per_page={items_per_page}&page={current_page_one_base}",
"statements": "https://{{ subdomain }}.chargify.com/statements.json?direction=asc&per_page={items_per_page}&page={current_page_one_base}&sort=created_at",
"transactions": "https://{{ subdomain }}.chargify.com/transactions.json?direction=asc&per_page={items_per_page}&page={current_page_one_base}&since_id={start_index}&order_by=id",
"customers_meta": "https://{{ subdomain }}.chargify.com/customers/metadata.json?direction=asc&date_field=updated_at&per_page={items_per_page}&page={current_page_one_base}&with_deleted=true&start_datetime={start_datetime}&end_datetime={end_datetime}",
"subscriptions_meta": "https://{{ subdomain }}.chargify.com/subscriptions/metadata.json?direction=asc&date_field=updated_at&per_page={items_per_page}&page={current_page_one_base}&with_deleted=true&start_datetime={start_datetime}&end_datetime={end_datetime}"
},
"streams": "components,coupons,customers,events,invoices,price_points,products,product_families,subscriptions,subscriptions_components,transactions",
"auth_method": "basic",
"username": "{{ api_key }}",
"password": "x",
"record_list_level": {
"customers_meta": "$.metadata[*]",
"invoices": "$.invoices[*]",
"price_points": "$.price_points[*]",
"segments": "$.segments[*]",
"subscriptions_components": "$.subscriptions_components[*]",
"subscriptions_meta": "$.metadata[*]"
},
"record_level": {
"components": "$.component",
"coupons": "$.coupon",
"customers": "$.customer",
"events": "$.event",
"product_families": "$.product_family",
"products": "$.product",
"statements": "$.statement",
"subscriptions": "$.subscription",
"transactions": "$.transaction"
},
"datetime_key": {
"components": "updated_at",
"coupons": "updated_at",
"customers": "updated_at",
"invoices": "updated_at",
"price_points": "updated_at",
"product_families": "updated_at",
"products": "updated_at",
"subscriptions": "updated_at",
"subscriptions_components": "updated_at"
},
"index_key": {
"events": "id",
"transactions": "id",
"segments": "id",
"statements": "id",
"customers_meta": "id",
"subscriptions_meta": "id"
},
"items_per_page": 200
}
```

## State

This tap emits [state](https://github.com/singer-io/getting-started/blob/master/docs/CONFIG_AND_STATE.md#state-file).
Expand Down
13 changes: 12 additions & 1 deletion tap_rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,20 @@ def parse_args(spec_file, required_config_keys):

# Capture additional args
for arg in SPEC["args"].keys():
type_list = SPEC["args"][arg]["type"]
type_ = None
if isinstance(type_list, list):
for t in type_list:
if t.lower() in ["object", "array"]:
continue
type_ = t
else:
type_ = type_list
if not type_:
raise Exception("Config spec exception at {arg}")
parser.add_argument(
"--" + arg,
type=TYPES[SPEC["args"][arg]["type"]],
type=TYPES[type_],
default=SPEC["args"][arg].get("default"),
help=SPEC["args"][arg].get("help"),
required=SPEC["args"][arg].get("required", False))
Expand Down
12 changes: 6 additions & 6 deletions tap_rest_api/default_spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@

"timestamp_key":
{
"type": "string",
"type": ["string", "object"],
"default": null,
"help": "POSIX timestamp key(column) name. If this is not null, timestamp_key is ignored"
"help": "POSIX timestamp key(column) name. If this is not null, the datetime_key and index_key defaults are ignored. Use dictionary to specify per stream."
},
"start_timestamp":
{
Expand All @@ -88,9 +88,9 @@

"datetime_key":
{
"type": "string",
"type": ["string", "object"],
"default": null,
"help": "Datetime key(column) name. If this is not null, timestamp_key is ignored"
"help": "Datetime key(column) name. Used as the default when timestamp_key is not set; takes priority over index_key. Use dictionary to specify per stream."
},
"start_datetime":
{
Expand All @@ -107,9 +107,9 @@

"index_key":
{
"type": "string",
"type": ["string", "object"],
"default": null,
"help": "Index key (column) name"
"help": "Index key (column) name. Use dictionary to specify per stream."
},
"start_index":
{
Expand Down
149 changes: 101 additions & 48 deletions tap_rest_api/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ def parse_datetime_tz(datetime_str, default_tz_offset=0):
def human_readable(bookmark_type, t):
    """
    Return a log-friendly representation of a bookmark value.

    Values are returned unchanged unless bookmark_type is "timestamp" and t
    is not None. In that case the result is a string of the form
    "<t> (<local datetime>)".

    Raises Exception when bookmark_type is "timestamp" but t cannot be
    converted by datetime.datetime.fromtimestamp().
    """
    if t is None or bookmark_type != "timestamp":
        return t

    try:
        ds = datetime.datetime.fromtimestamp(t)
    except (TypeError, ValueError, OverflowError, OSError) as err:
        # fromtimestamp() raises TypeError for non-numeric input and
        # ValueError/OverflowError/OSError for out-of-range values.
        raise Exception(
            f"bookmark type is set to timestamp, but the value {t} "
            "isn't timestamp") from err
    return f"{t} ({ds})"


Expand Down Expand Up @@ -82,23 +85,50 @@ def get_record_list(raw_data, record_list_level):
return data


def get_bookmark_type(config):
if config.get("timestamp_key"):
return "timestamp"
if config.get("datetime_key"):
return "datetime"
if config.get("index_key"):
return "index"
def get_bookmark_type_and_key(config, stream):
    """
    Resolve the bookmark type and the record key used for a stream.

    Each of the config values timestamp_key, datetime_key, and index_key may
    be a string (default for every stream) or a dictionary keyed by stream
    ID. A dictionary entry for the given stream always wins over string
    defaults. Among string defaults, and among dictionary entries, the
    priority is timestamp_key > datetime_key > index_key.

    Returns a tuple (bookmark_type, bookmark_key) where bookmark_type is one
    of "timestamp", "datetime", or "index".

    Raises KeyError when no key can be resolved for the stream.
    """
    default = None
    # Tuple order encodes the priority between bookmark types.
    for bm_type in ("timestamp", "datetime", "index"):
        keys = config.get(bm_type + "_key")
        if not keys:
            continue
        if isinstance(keys, dict):
            if keys.get(stream):
                # A per-stream entry overrides any string default, even one
                # of a higher-priority type seen earlier.
                return bm_type, keys[stream]
        elif isinstance(keys, str) and default is None:
            # Remember only the highest-priority string default.
            default = (bm_type, keys)

    if default is not None:
        return default

    raise KeyError(
        "You need to set timestamp_key, datetime_key, or index_key "
        f"for stream {stream}")


def get_streams_to_sync(streams, state):
'''Get the streams to sync'''
current_stream = singer.get_currently_syncing(state)
result = streams
result = dict(streams)

if current_stream:
for key in result.keys():
for key in streams.keys():
if result[key].tap_stream_id != current_stream:
result.pop(key, None)

Expand Down Expand Up @@ -133,8 +163,9 @@ def get_start(config, state, tap_stream_id, bookmark_key):
up when timestamp_key is set but start_timestamp is not set.
"""
current_bookmark = singer.get_bookmark(state, tap_stream_id, bookmark_key)
bookmark_type, _ = get_bookmark_type_and_key(config, tap_stream_id)
if current_bookmark is None:
if config.get("timestamp_key"):
if bookmark_type == "timestamp":
if (not config.get("start_timestamp") and
not config.get("start_datetime")):
raise KeyError("timestamp_key is set but neither " +
Expand All @@ -145,38 +176,38 @@ def get_start(config, state, tap_stream_id, bookmark_key):
config["start_datetime"]).timestamp()
else:
current_bookmark = get_float_timestamp(current_bookmark)
elif config.get("datetime_key"):
elif bookmark_type == "datetime":
if not config.get("start_datetime"):
raise KeyError(
"datetime_key is set but start_datetime is not set")
current_bookmark = config.get("start_datetime")
elif config.get("index_key"):
elif bookmark_type == "index":
if config.get("start_index") is None:
raise KeyError("index_key is set but start_index is not set")
current_bookmark = config.get("start_index")

return current_bookmark


def get_end(config, tap_stream_id):
    """
    Resolve the end-of-range bookmark value for a stream.

    The bookmark type is looked up with get_bookmark_type_and_key():
    - timestamp: end_timestamp if set; otherwise end_datetime parsed into a
      POSIX timestamp (kept for human convenience, since a datetime is more
      readable); otherwise the current time as a timestamp.
    - datetime: end_datetime if set and non-empty; otherwise the current time
      in ISO format.
    - index: end_index as configured (None when it is not set).
    """
    bookmark_type, _ = get_bookmark_type_and_key(config, tap_stream_id)

    if bookmark_type == "index":
        end_from_config = config.get("end_index")
    elif bookmark_type == "datetime":
        end_from_config = (config.get("end_datetime")
                           or datetime.datetime.now().isoformat())
    elif bookmark_type == "timestamp":
        end_from_config = config.get("end_timestamp")
        if end_from_config is None:
            end_datetime = config.get("end_datetime")
            end_from_config = (
                datetime.datetime.now().timestamp()
                if end_datetime is None
                else dateutil.parser.parse(end_datetime).timestamp())
    return end_from_config

Expand All @@ -195,18 +226,19 @@ def get_float_timestamp(ts):
return value


def get_last_update(config, record, current):
def get_last_update(config, tap_stream_id, record, current):
last_update = current
if config.get("timestamp_key"):
value = _get_jsonpath(record, config["timestamp_key"])[0]
bookmark_type, bookmark_key = get_bookmark_type_and_key(config, tap_stream_id)
if bookmark_type == "timestamp":
value = _get_jsonpath(record, bookmark_key)[0]
if value:
value = get_float_timestamp(value)
if value > current:
last_update = value
else:
KeyError("timestamp_key not found in the record")
elif config.get("datetime_key"):
value = _get_jsonpath(record, config["datetime_key"])[0]
elif bookmark_type == "datetime":
value = _get_jsonpath(record, bookmark_key)[0]
if not value:
KeyError("datetime_key not found in the record")

Expand All @@ -215,8 +247,8 @@ def get_last_update(config, record, current):

if record_datetime > current_datetime:
last_update = record_datetime.isoformat()
elif config.get("index_key"):
current_index = str(_get_jsonpath(record, config["index_key"])[0])
elif bookmark_type == "index":
current_index = str(_get_jsonpath(record, bookmark_key)[0])
LOGGER.debug("Last update will be updated from %s to %s" %
(last_update, current_index))
# When index is an integer, it's dangerous to compare 9 and 10 as
Expand All @@ -242,27 +274,48 @@ def get_last_update(config, record, current):


def get_init_endpoint_params(config, state, tap_stream_id):
params = config
bookmark_type, bookmark_key = get_bookmark_type_and_key(config, tap_stream_id)
params = dict(config)
start = get_start(config, state, tap_stream_id, "last_update")
end = get_end(config)

if config.get("timestamp_key"):
params.update({"start_timestamp": start})
params.update({"end_timestamp": end})
params.update({"start_datetime":
datetime.datetime.fromtimestamp(start).isoformat()})
params.update({"end_datetime":
datetime.datetime.fromtimestamp(end).isoformat()})
elif config.get("datetime_key"):
params.update({"start_datetime": start})
params.update({"end_datetime": end})
params.update({"start_timestamp":
dateutil.parser.parse(start).timestamp()})
params.update({"end_timestamp":
dateutil.parser.parse(end).timestamp()})
elif config.get("index_key"):
params.update({"start_index": start})
params.update({"end_index": end})
end = get_end(config, tap_stream_id)
if bookmark_type == "timestamp":
start_datetime = datetime.datetime.fromtimestamp(start).isoformat()
end_datetime = datetime.datetime.fromtimestamp(end).isoformat()
params.update({
"start_timestamp": start,
"end_timestamp": end,
"start_datetime": start_datetime,
"end_datetime": end_datetime,
"start_date": start_datetime[0:10],
"end_date": end_datetime[0:10],
"timestamp_key": bookmark_key,
})
elif bookmark_type == "datetime":
params.update({
"start_timestamp": dateutil.parser.parse(start).timestamp(),
"end_timestamp": dateutil.parser.parse(end).timestamp(),
"start_datetime": start,
"end_datetime": end,
"start_date": start[0:10],
"end_date": end[0:10],
"datetime_key": bookmark_key,
})
elif bookmark_type == "index":
start_datetime = config.get("start_datetime")
start_date = start_datetime[0:10] if start_datetime else None
end_datetime = config.get("end_datetime")
if not end_datetime:
end_datetime = datetime.datetime.utcnow().isoformat()
end_date = end_datetime[0:10] if end_datetime else None
params.update({
"start_datetime": start_datetime,
"end_datetime": end_datetime,
"start_date": start_date,
"end_date": end_date,
"start_index": start,
"end_index": end,
"index_key": bookmark_key,
})

params.update(
{
Expand Down
Loading