Skip to content

Commit

Permalink
Merge pull request data-integrations#246 from data-integrations/null-…
Browse files Browse the repository at this point in the history
…in-non-null-fix

Add option to allow null values in non-null custom fields in sObjects
  • Loading branch information
vanathi-g authored May 16, 2024
2 parents 695cfb4 + e8bfa13 commit 87c6fd9
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 10 deletions.
39 changes: 32 additions & 7 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,28 @@ public static String normalizeAvroName(String name) {
* @throws ConnectionException if unable to connect to Salesforce
*/
public static Schema getSchema(AuthenticatorCredentials credentials, SObjectDescriptor sObjectDescriptor)
throws ConnectionException {
return getSchema(credentials, sObjectDescriptor, false);
}

/**
* Connects to Salesforce and obtains description of sObjects needed to determine schema field types.
* Based on this information, creates schema for the fields used in sObject descriptor.
*
* @param credentials connection credentials
* @param sObjectDescriptor sObject descriptor
* @param setAllCustomFieldsNullable set all custom fields nullable by default
* @return CDAP schema
* @throws ConnectionException if unable to connect to Salesforce
*/
public static Schema getSchema(AuthenticatorCredentials credentials, SObjectDescriptor sObjectDescriptor,
boolean setAllCustomFieldsNullable)
throws ConnectionException {
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
SObjectsDescribeResult describeResult = SObjectsDescribeResult.of(partnerConnection,
sObjectDescriptor.getName(), sObjectDescriptor.getFeaturedSObjects());

return getSchemaWithFields(sObjectDescriptor, describeResult);
return getSchemaWithFields(sObjectDescriptor, describeResult, setAllCustomFieldsNullable);
}

/**
Expand Down Expand Up @@ -190,12 +206,20 @@ public static void checkCompatibility(Schema actualSchema, Schema providedSchema

public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
SObjectsDescribeResult describeResult) {
return getSchemaWithFields(sObjectDescriptor, describeResult, Collections.emptyList());
return getSchemaWithFields(sObjectDescriptor, describeResult, Collections.emptyList(), false);
}

public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
SObjectsDescribeResult describeResult,
boolean setAllCustomFieldsNullable) {
return getSchemaWithFields(sObjectDescriptor, describeResult,
Collections.emptyList(), setAllCustomFieldsNullable);
}

public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
SObjectsDescribeResult describeResult,
List<String> topLevelParents) {
List<String> topLevelParents,
boolean setAllCustomFieldsNullable) {
List<Schema.Field> schemaFields = new ArrayList<>();

for (SObjectDescriptor.FieldDescriptor fieldDescriptor : sObjectDescriptor.getFields()) {
Expand All @@ -214,7 +238,7 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
fieldDescriptor.getFullName(), parentsPath));
}

fieldSchema = createFieldSchema(field, fieldDescriptor.hasParents());
fieldSchema = createFieldSchema(field, fieldDescriptor.hasParents(), setAllCustomFieldsNullable);
}

Schema queryFieldSchema = functionType.getSchema(fieldSchema);
Expand Down Expand Up @@ -250,7 +274,7 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
*/
for (SObjectDescriptor childSObject : sObjectDescriptor.getChildSObjects()) {
Schema childSchema = getSchemaWithFields(childSObject, describeResult,
Collections.singletonList(sObjectDescriptor.getName()));
Collections.singletonList(sObjectDescriptor.getName()), setAllCustomFieldsNullable);

String childName = normalizeAvroName(childSObject.getName());
Schema.Field childField = Schema.Field.of(childName,
Expand All @@ -264,8 +288,9 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,

// Setting all the child columns as Nullable as in child object these fields can be mandatory but its reference
// object in parent class can be null.
private static Schema createFieldSchema(Field field, boolean isChild) {
private static Schema createFieldSchema(Field field, boolean isChild, boolean setAllCustomFieldsNullable) {
Schema fieldSchema = SALESFORCE_TYPE_TO_CDAP_SCHEMA.getOrDefault(field.getType(), DEFAULT_SCHEMA);
return field.isNillable() || isChild ? Schema.nullableOf(fieldSchema) : fieldSchema;
return field.isNillable() || isChild || (setAllCustomFieldsNullable && field.isCustom()) ?
Schema.nullableOf(fieldSchema) : fieldSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,20 +190,32 @@ public void transform(KeyValue<Schema, Map<String, String>> input,
}

/**
* Get Salesforce schema by query.
* Get Salesforce schema by query
*
* @param config Salesforce Source Batch config
* @return schema calculated from query
*/
public static Schema getSchema(SalesforceSourceConfig config, OAuthInfo oAuthInfo) {
return getSchema(config, oAuthInfo, false);
}

/**
* Get Salesforce schema by query, with the option to allow null values in non-nullable custom fields
*
* @param config Salesforce Source Batch config
* @param setAllCustomFieldsNullable set all custom fields nullable by default
* @return schema calculated from query
*/
public static Schema getSchema(SalesforceSourceConfig config, OAuthInfo oAuthInfo,
boolean setAllCustomFieldsNullable) {
String query = config.getQuery(System.currentTimeMillis(), oAuthInfo);
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
try {
AuthenticatorCredentials credentials = new AuthenticatorCredentials(oAuthInfo,
config.getConnection().getConnectTimeout(),
config.getConnection().getReadTimeout(),
config.getConnection().getProxyUrl());
return SalesforceSchemaUtil.getSchema(credentials, sObjectDescriptor);
return SalesforceSchemaUtil.getSchema(credentials, sObjectDescriptor, setAllCustomFieldsNullable);
} catch (ConnectionException e) {
String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.cdap.plugin.salesforce;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.sforce.soap.partner.Field;
import com.sforce.soap.partner.FieldType;
import io.cdap.cdap.api.data.format.StructuredRecord;
Expand Down Expand Up @@ -166,6 +168,52 @@ public void testGetSchemaWithFields() {
new org.apache.avro.Schema.Parser().parse(actualSchema.toString());
}

@Test
public void testSchemaWithSetAllCustomFieldsNullable() {
String objectName = "CustomTable";

List<SObjectDescriptor.FieldDescriptor> fieldDescriptors = Stream
.of("Name", "Value", "CreatedDate")
.map(name -> getFieldWithType(name, FieldType.anyType, false))
.map(SObjectDescriptor.FieldDescriptor::new)
.collect(Collectors.toList());
fieldDescriptors.add(new SObjectDescriptor.FieldDescriptor(
Collections.singletonList("Name"), null, SalesforceFunctionType.NONE));
fieldDescriptors.add(new SObjectDescriptor.FieldDescriptor(
Collections.singletonList("Value"), null, SalesforceFunctionType.NONE));
fieldDescriptors.add(new SObjectDescriptor.FieldDescriptor(
Collections.singletonList("CreatedDate"), null, SalesforceFunctionType.NONE));
SObjectDescriptor sObjectDescriptor = new SObjectDescriptor(objectName, fieldDescriptors, ImmutableList.of());

Map<String, Field> objectFields = new LinkedHashMap<>();
objectFields.put("Name", getCustomFieldWithType("Name", FieldType.string, false));
objectFields.put("Value", getCustomFieldWithType("Value", FieldType.currency, false));
objectFields.put("CreatedDate", getCustomFieldWithType("CreatedDate", FieldType.date, false));
SObjectsDescribeResult describeResult = SObjectsDescribeResult.of(ImmutableMap.of(objectName, objectFields));

// Testing case with flag setAllCustomFieldsNullable = true
Schema expectedSchema = Schema.recordOf("output",
Schema.Field.of("Name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("Value", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("CreatedDate", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE)))
);

Schema actualSchema = SalesforceSchemaUtil.getSchemaWithFields(sObjectDescriptor, describeResult, true);

Assert.assertEquals(expectedSchema, actualSchema);

// Testing case with flag setAllCustomFieldsNullable = false
expectedSchema = Schema.recordOf("output",
Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("Value", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("CreatedDate", Schema.of(Schema.LogicalType.DATE))
);

actualSchema = SalesforceSchemaUtil.getSchemaWithFields(sObjectDescriptor, describeResult, false);

Assert.assertEquals(expectedSchema, actualSchema);
}

@Test
public void testValidateSupportedFieldSchemas() {
Schema schema = Schema.recordOf("schema",
Expand Down Expand Up @@ -262,6 +310,13 @@ public void testCheckCompatibilityIncorrectNullability() {
SalesforceSchemaUtil.checkCompatibility(actualSchema, providedSchema);
}

private Field getCustomFieldWithType(String name, FieldType type, boolean isNillable) {
Field customField = getFieldWithType(name, type, isNillable);
customField.setCustom(true);
return customField;

}

private Field getFieldWithType(String name, FieldType type, boolean isNillable) {
Field field = new Field();
field.setName(name);
Expand All @@ -270,7 +325,7 @@ private Field getFieldWithType(String name, FieldType type, boolean isNillable)

return field;
}

@Test
public void testSourceSchemaNotNullIfConnectionMacroAndImportManually() {
Schema schema = Schema.recordOf("output",
Expand Down

0 comments on commit 87c6fd9

Please sign in to comment.