Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support to send attachments #28

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,8 @@ def lookup_fields_dict(self):
@property
def name(self):
return self.stream_name

not_searchable_by_mail = ["ContentVersion"]

def get_fields_for_object(self, object_type):
"""Check if Salesforce has an object type and fetches its fields."""
Expand Down Expand Up @@ -883,6 +885,10 @@ def preprocess_record(self, record, context):
self.logger.info("Skipping record, because it was not found on Salesforce.")
return {}

# add field to link attachments
if self.name == "ContentVersion":
fields.update({"LinkedEntityId": {"createable": True}})

# keep only available fields and that are creatable or updatable
# NOTE: we need to keep relations (__r, xId)
record = {k:v for k,v in record.items() if k.endswith("__r") or fields.get(k+"Id") or (fields.get(k) and (fields[k]["createable"] or fields[k]["updateable"] or k.lower() in ["id", "externalid"]))}
Expand All @@ -907,7 +913,7 @@ def preprocess_record(self, record, context):
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
elif self.config.get("lookup_by_email", True):
elif self.config.get("lookup_by_email", True) and self.name not in self.not_searchable_by_mail:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]
Expand Down Expand Up @@ -954,7 +960,11 @@ def upsert_record(self, record, context):
if record == {}:
self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.")
return

# for files pop object id to link the file
linked_object_id = record.pop("LinkedEntityId", None) if self.name == "ContentVersion" else None

# get object fields
fields_desc = self.sf_fields_description(object_type=object_type)

possible_update_fields = []
Expand Down Expand Up @@ -999,6 +1009,7 @@ def upsert_record(self, record, context):
return object_id, True, state_updates

id = response.json().get("id")
self.link_attachment_to_object(id, linked_object_id)
self.logger.info(f"{object_type} updated with id: {id}")
return id, True, state_updates
except Exception as e:
Expand All @@ -1011,6 +1022,7 @@ def upsert_record(self, record, context):
response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])})
id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
self.link_attachment_to_object(id, linked_object_id)
return id, True, state_updates
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")
Expand All @@ -1022,6 +1034,7 @@ def upsert_record(self, record, context):
response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} created with id: {id}")
self.link_attachment_to_object(id, linked_object_id)
return id, True, state_updates
except Exception as e:
if "INVALID_FIELD_FOR_INSERT_UPDATE" in str(e):
Expand All @@ -1047,3 +1060,31 @@ def upsert_record(self, record, context):
self.logger.exception(f"Error encountered while creating {object_type}")
raise e


def link_attachment_to_object(self, file_id, linked_object_id):
if self.name == "ContentVersion" and not linked_object_id:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this is called when linked_object_id is None and self.name != "ContentVersion"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thank you!

self.logger.info(f"Object id not found to link file with id {file_id}")
return

try:
# get contentdocumentid
content_endpoint = "query"
params = {"q": f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{file_id}'"}
content_document_id = self.request_api("GET", endpoint=content_endpoint, params=params)
content_document_id = content_document_id.json()
if content_document_id.get("records"):
content_document_id = content_document_id["records"][0]["ContentDocumentId"]
else:
raise Exception(f"Failed while trying to link file {file_id} and object {linked_object_id} because ContentDocumentId was not found")

endpoint = "sobjects/ContentDocumentLink"
record = {
"ContentDocumentId": content_document_id,
"LinkedEntityId": linked_object_id,
"ShareType": "V"
}
response = self.request_api("POST", endpoint=endpoint, request_data=record)
self.logger.info(f"File with id {file_id} succesfully linked to object with id {linked_object_id}. Link id {response.json()['id']}")
except Exception as e:
self.logger.info(f"Failed while trying to link file {file_id} and object {linked_object_id}")
raise e
Loading