Skip to content

Commit

Permalink
Adds support for field_meta/stream_meta on the catalog.json (#6)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Hassan Syyid <[email protected]>
  • Loading branch information
vmesel and hsyyid authored Nov 17, 2023
1 parent 6b23ad9 commit e945c3b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
21 changes: 21 additions & 0 deletions tap_hubspot_beta/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@ class hubspotStream(RESTStream):
properties_url = None
page_size = 100

stream_metadata = {}
fields_metadata = {}
object_type = None
fields_metadata = {}

def load_fields_metadata(self):
if not self.properties_url:
self.logger.info(f"Skipping fields_meta for {self.name} stream, because there is no properties_url set")
return

req = requests.get(
f"{self.url_base}{self.properties_url}",
headers = self.authenticator.auth_headers or {},
)

if req.status_code != 200:
self.logger.info(f"Skipping fields_meta for {self.name} stream")
return

self.fields_metadata = {v["name"]: v for v in req.json()}

def _request(
self, prepared_request: requests.PreparedRequest, context: Optional[dict]
) -> requests.Response:
Expand Down
7 changes: 5 additions & 2 deletions tap_hubspot_beta/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class EngagementStream(hubspotV1Stream):
primary_keys = ["id"]
replication_key = None
page_size = 250
properties_url = "properties/v2/engagements/properties"

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
Expand Down Expand Up @@ -543,6 +544,7 @@ class FormSubmissionsStream(hubspotV1Stream):
# NOTE: There is no primary_key for this stream
replication_key = "submittedAt"
path = "/form-integrations/v1/submissions/forms/{form_id}"
properties_url = "properties/v2/form_submissions/properties"

schema = th.PropertiesList(
th.Property("form_id", th.StringType),
Expand Down Expand Up @@ -773,6 +775,7 @@ class CompaniesStream(ObjectSearchV3):
"""Companies Stream"""

name = "companies"
object_type = "companies"
path = "crm/v3/objects/companies/search"
replication_key_filter = "hs_lastmodifieddate"
properties_url = "properties/v1/companies/properties"
Expand Down Expand Up @@ -1295,7 +1298,7 @@ class PostalMailStream(ObjectSearchV3):
path = "crm/v3/objects/postal_mail/search"
primary_keys = ["id"]
replication_key_filter = "hs_lastmodifieddate"
properties_url = "properties/v1/postal_mail/properties"
properties_url = "properties/v2/postal_mail/properties"

schema = th.PropertiesList(
th.Property("id", th.StringType),
Expand All @@ -1320,7 +1323,7 @@ class CommunicationsStream(ObjectSearchV3):
path = "crm/v3/objects/communications/search"
primary_keys = ["id"]
replication_key_filter = "hs_lastmodifieddate"
properties_url = "properties/v1/communications/properties"
properties_url = "properties/v2/communications/properties"

schema = th.PropertiesList(
th.Property("id", th.StringType),
Expand Down
18 changes: 18 additions & 0 deletions tap_hubspot_beta/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,24 @@ def discover_streams(self) -> List[Stream]:
"""Return a list of discovered streams."""
return [stream_class(tap=self) for stream_class in STREAM_TYPES]

@property
def catalog_dict(self) -> dict:
"""Get catalog dictionary.
Returns:
The tap's catalog as a dict
"""
catalog = super().catalog_dict
streams = self.streams
for stream in catalog["streams"]:
stream_class = streams[stream["tap_stream_id"]]
stream["stream_meta"] = {}
if hasattr(stream_class, "load_fields_metadata"):
stream_class.load_fields_metadata()
for field in stream["schema"]["properties"]:
stream["schema"]["properties"][field]["field_meta"] = stream_class.fields_metadata.get(field, {})
return catalog


if __name__ == "__main__":
Taphubspot.cli()

0 comments on commit e945c3b

Please sign in to comment.