Skip to content

Commit

Permalink
Create data validation for the three required information
Browse files Browse the repository at this point in the history
  • Loading branch information
butkeraites-hotglue committed Aug 30, 2024
1 parent 93a16c8 commit 49e174b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ license = "Apache-2.0"

[tool.poetry.dependencies]
python = "<3.11,>=3.7.1"
target-hotglue = "^0.0.2"
target-hotglue = "^0.0.3"
requests = "^2.31.0"
backports-cached-property = "^1.0.2"
hotglue-models-crm = { git = "https://gitlab.com/hotglue/hotglue-models-crm.git", branch = "master"}
Expand Down
7 changes: 6 additions & 1 deletion target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,12 @@ def url(self, endpoint=None):
return f"{instance_url}/services/data/v{self.api_version}/{endpoint}"

def validate_input(self, record: dict):
return self.unified_schema(**record).dict()
if not record:
return {}
if isinstance(record,dict):
return self.unified_schema(**record).dict()
else:
raise Exception(f"Invalid record: {record}")

def sf_fields(self, object_type=None):
if not object_type:
Expand Down
6 changes: 6 additions & 0 deletions target_salesforce_v3/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class InvalidDealRecord(Exception):
pass


class MissingObjectInSalesforceError(Exception):
pass
40 changes: 28 additions & 12 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from datetime import datetime
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError

from target_salesforce_v3.exceptions import InvalidDealRecord, MissingObjectInSalesforceError


class MissingObjectInSalesforceError(Exception):
pass


class ContactsSink(SalesforceV3Sink):
Expand Down Expand Up @@ -391,23 +391,39 @@ class DealsSink(SalesforceV3Sink):
def reference_data(self):
params = {"q": "SELECT id, name from Account"}
response = self.request_api("GET", endpoint="query", params=params)
response = response.json()["records"]
response = response.json().get("records",{})
return [{k: v for k, v in r.items() if k in ["Id", "Name"]} for r in response]

def preprocess_record(self, record, context):
try:
has_name = record.get("title")
has_stage_name = (record.get("pipeline_stage_id") or record.get("status"))
has_close_date = record.get("close_date")
if not (has_name and has_stage_name and has_close_date):
raise InvalidDealRecord(
f'The record does not contain all the necessary fields for creating a new Opportunity: '
f'Name (title: {has_name}), '
f'StageName (pipeline_stage_id: {record.get("pipeline_stage_id")} or status: {record.get("status")}), '
f'CloseDate (close_date: {has_close_date})'
)
except Exception as exc:
return {"error": repr(exc)}
if isinstance(record.get("custom_fields"), str):
record["custom_fields"] = json.loads(record["custom_fields"])
try:
record["custom_fields"] = json.loads(record["custom_fields"])
except:
self.logger.info(f"custom_fields is not a valid Json document: {record['custom_fields']}")

record = self.validate_input(record)

stage = record.get("pipeline_stage_id")
if not stage:
stage = record.get("status") # fallback on field
record_stage = record.get("pipeline_stage_id")
if not record_stage:
record_stage = record.get("status") # fallback on field

stage = self.get_pickable(stage, "StageName", select_first=True)
record_stage = self.get_pickable(record_stage, "StageName", select_first=True)

type = record.get("type")
type = self.get_pickable(type, "Type")
record_type = record.get("type")
record_type = self.get_pickable(record_type, "Type")

if record.get("contact_external_id") and not record.get("contact_id"):
external_id = record["contact_external_id"]
Expand All @@ -426,10 +442,10 @@ def preprocess_record(self, record, context):

mapping = {
"Name": record.get("title"),
"StageName": stage,
"StageName": record_stage,
"CloseDate": record["close_date"].strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"Description": record.get("description"),
"Type": type,
"Type": record_type,
"Amount": record.get("monetary_amount"),
"Probability": record.get("win_probability"),
"LeadSource": record.get("lead_source"),
Expand Down

0 comments on commit 49e174b

Please sign in to comment.