From e945c3b63bd64e6cbbf9697d26a6a755ce742a59 Mon Sep 17 00:00:00 2001
From: Vinicius Mesel
Date: Fri, 17 Nov 2023 16:48:19 -0300
Subject: [PATCH] Adds support for field_meta/stream_meta on the catalog.json (#6)

---------

Co-authored-by: Hassan Syyid
---
 tap_hubspot_beta/client_base.py | 33 +++++++++++++++++++++++++++++++++
 tap_hubspot_beta/streams.py     |  7 +++++--
 tap_hubspot_beta/tap.py         | 24 ++++++++++++++++++++++++
 3 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/tap_hubspot_beta/client_base.py b/tap_hubspot_beta/client_base.py
index c915b90..d64411a 100644
--- a/tap_hubspot_beta/client_base.py
+++ b/tap_hubspot_beta/client_base.py
@@ -28,6 +28,39 @@ class hubspotStream(RESTStream):
     properties_url = None
     page_size = 100
 
+    stream_metadata = {}
+    fields_metadata = {}
+    object_type = None
+
+    def load_fields_metadata(self):
+        """Fetch property definitions for this stream and cache them by name.
+
+        Best-effort: when the stream has no ``properties_url``, the request
+        fails, or the API answers with a non-200 status, ``fields_metadata``
+        is left untouched and the reason is logged.
+        """
+        if not self.properties_url:
+            self.logger.info(f"Skipping fields_meta for {self.name} stream, because there is no properties_url set")
+            return
+
+        try:
+            req = requests.get(
+                f"{self.url_base}{self.properties_url}",
+                headers=self.authenticator.auth_headers or {},
+                # Without a timeout a stalled connection blocks discovery forever.
+                timeout=60,
+            )
+        except requests.RequestException as exc:
+            self.logger.warning(f"Skipping fields_meta for {self.name} stream: {exc}")
+            return
+
+        if req.status_code != 200:
+            self.logger.info(f"Skipping fields_meta for {self.name} stream")
+            return
+
+        # Index the property definitions by name for per-field lookup.
+        self.fields_metadata = {v["name"]: v for v in req.json()}
+
     def _request(
         self, prepared_request: requests.PreparedRequest, context: Optional[dict]
     ) -> requests.Response:
diff --git a/tap_hubspot_beta/streams.py b/tap_hubspot_beta/streams.py
index 7c9edfa..3c8ab17 100644
--- a/tap_hubspot_beta/streams.py
+++ b/tap_hubspot_beta/streams.py
@@ -64,6 +64,7 @@ class EngagementStream(hubspotV1Stream):
     primary_keys = ["id"]
     replication_key = None
     page_size = 250
+    properties_url = "properties/v2/engagements/properties"
 
     schema = th.PropertiesList(
         th.Property("id", th.IntegerType),
@@ -543,6 +544,7 @@ class FormSubmissionsStream(hubspotV1Stream):
     # NOTE: There is no primary_key for this stream
     replication_key = "submittedAt"
     path = "/form-integrations/v1/submissions/forms/{form_id}"
+    properties_url = "properties/v2/form_submissions/properties"
 
     schema = th.PropertiesList(
         th.Property("form_id", th.StringType),
@@ -773,6 +775,7 @@ class CompaniesStream(ObjectSearchV3):
     """Companies Stream"""
 
     name = "companies"
+    object_type = "companies"
     path = "crm/v3/objects/companies/search"
     replication_key_filter = "hs_lastmodifieddate"
     properties_url = "properties/v1/companies/properties"
@@ -1295,7 +1298,7 @@ class PostalMailStream(ObjectSearchV3):
     path = "crm/v3/objects/postal_mail/search"
     primary_keys = ["id"]
     replication_key_filter = "hs_lastmodifieddate"
-    properties_url = "properties/v1/postal_mail/properties"
+    properties_url = "properties/v2/postal_mail/properties"
 
     schema = th.PropertiesList(
         th.Property("id", th.StringType),
@@ -1320,7 +1323,7 @@ class CommunicationsStream(ObjectSearchV3):
     path = "crm/v3/objects/communications/search"
     primary_keys = ["id"]
     replication_key_filter = "hs_lastmodifieddate"
-    properties_url = "properties/v1/communications/properties"
+    properties_url = "properties/v2/communications/properties"
 
     schema = th.PropertiesList(
         th.Property("id", th.StringType),
diff --git a/tap_hubspot_beta/tap.py b/tap_hubspot_beta/tap.py
index e62aa0c..d96e69a 100644
--- a/tap_hubspot_beta/tap.py
+++ b/tap_hubspot_beta/tap.py
@@ -122,6 +122,30 @@ def discover_streams(self) -> List[Stream]:
         """Return a list of discovered streams."""
         return [stream_class(tap=self) for stream_class in STREAM_TYPES]
 
+    @property
+    def catalog_dict(self) -> dict:
+        """Get catalog dictionary.
+
+        Every stream entry gets an empty ``stream_meta`` dict. For streams
+        exposing ``load_fields_metadata``, each schema property also gets a
+        ``field_meta`` entry ({} when nothing is known about the field).
+
+        Returns:
+            The tap's catalog as a dict
+        """
+        catalog = super().catalog_dict
+        streams = self.streams
+        for entry in catalog["streams"]:
+            # Stream instance backing this catalog entry.
+            stream = streams[entry["tap_stream_id"]]
+            entry["stream_meta"] = {}
+            if not hasattr(stream, "load_fields_metadata"):
+                continue
+            stream.load_fields_metadata()
+            for field, field_schema in entry["schema"]["properties"].items():
+                field_schema["field_meta"] = stream.fields_metadata.get(field, {})
+        return catalog
+
 
 if __name__ == "__main__":
     Taphubspot.cli()