From 722b581cefe4139fa889b7fdfbcde06c0cfab53e Mon Sep 17 00:00:00 2001 From: psainics Date: Wed, 27 Mar 2024 13:22:47 +0530 Subject: [PATCH] Add a check for error 71005 --- .../plugin/sfmc/sink/DataExtensionClient.java | 15 ++++++++++++--- .../plugin/sfmc/sink/DataExtensionClientTest.java | 13 +++++++++++++ .../sfmc/source/MarketingCloudSourceTest.java | 15 +++++++++++---- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java b/src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java index d78771e..7bd0aba 100644 --- a/src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java +++ b/src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java @@ -38,6 +38,7 @@ import com.exacttarget.fuelsdk.internal.UpdateRequest; import com.exacttarget.fuelsdk.internal.UpdateResponse; import com.exacttarget.fuelsdk.internal.UpdateResult; +import com.google.common.collect.ImmutableSet; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; @@ -45,12 +46,14 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** * Wrapper around an ETClient that understands objects at the level that the plugin cares about. */ public class DataExtensionClient { + private static final Set PK_ERROR_CODES = ImmutableSet.of(2, 71005); private final ETClient client; private final String dataExtensionKey; @@ -204,9 +207,7 @@ public List> upsert(List rows) List toUpdate = new ArrayList<>(); for (ETResult row : inserts.getResults()) { - // super hacky to check the error message... but there is no better way - if (row.getStatus() == ETResult.Status.ERROR && row.getErrorCode() == 2 && row.getErrorMessage() != null && - row.getErrorMessage().toLowerCase().contains("primary key")) { + if (isPrimaryKeyError(row)) { ETDataExtensionRow failed = row.getObject(); ETDataExtensionRow copy = new ETDataExtensionRow(); copy.setDataExtensionKey(failed.getDataExtensionKey()); @@ -226,6 +227,14 @@ public List> upsert(List rows) return result; } + boolean isPrimaryKeyError(ETResult row) { + // super hacky to check the error message... but there is no better way + // error code 2 and 71005 are "Primary key violation" + return row.getStatus() == ETResult.Status.ERROR && row.getErrorCode() != null && + PK_ERROR_CODES.contains(row.getErrorCode()) && row.getErrorMessage() != null && + row.getErrorMessage().toLowerCase().contains("primary key"); + } + private T call(SFMCCall callable) throws ETSdkException { ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader(); try { diff --git a/src/test/java/io/cdap/plugin/sfmc/sink/DataExtensionClientTest.java b/src/test/java/io/cdap/plugin/sfmc/sink/DataExtensionClientTest.java index 89f3fe7..90f6f79 100644 --- a/src/test/java/io/cdap/plugin/sfmc/sink/DataExtensionClientTest.java +++ b/src/test/java/io/cdap/plugin/sfmc/sink/DataExtensionClientTest.java @@ -277,5 +277,18 @@ public void testUpsert() throws Exception { DataExtensionClient dataExtensionClient = Mockito.spy(new DataExtensionClient(client, dataExtensionKey)); Assert.assertNotNull(dataExtensionClient.upsert(row)); } + + @Test + public void testIsPrimaryKeyError() { + ETResult mockRow = Mockito.mock(ETResult.class); + Mockito.when(mockRow.getStatus()).thenReturn(ETResult.Status.ERROR); + Mockito.when(mockRow.getErrorCode()).thenReturn(71005); + Mockito.when(mockRow.getErrorMessage()).thenReturn("This is Primary Key Violation on colum abc"); + ETClient client = Mockito.mock(ETClient.class); + String dataExtensionKey = "DE"; + DataExtensionClient dataExtensionClient = new DataExtensionClient(client, dataExtensionKey); + boolean result = dataExtensionClient.isPrimaryKeyError(mockRow); + Assert.assertTrue(result); + } } diff --git a/src/test/java/io/cdap/plugin/sfmc/source/MarketingCloudSourceTest.java b/src/test/java/io/cdap/plugin/sfmc/source/MarketingCloudSourceTest.java index a9cf0ea..60d98e9 100644 --- a/src/test/java/io/cdap/plugin/sfmc/source/MarketingCloudSourceTest.java +++ b/src/test/java/io/cdap/plugin/sfmc/source/MarketingCloudSourceTest.java @@ -65,6 +65,7 @@ public class MarketingCloudSourceTest { private static final String SOAP_ENDPOINT = "soapEndPoint"; private MarketingCloudSource marketingCloudSource; private MarketingCloudSourceConfig marketingCloudSourceConfig; + private MarketingCloudClient client; @Before public void initialize() throws ETSdkException { @@ -83,6 +84,16 @@ public void initialize() throws ETSdkException { .setObjectList(null) .build(); marketingCloudSource = new MarketingCloudSource(marketingCloudSourceConfig); + client = PowerMockito.mock(MarketingCloudClient.class); + ETResponse etResponse = Mockito.mock(ETResponse.class); + List etDataExtensions = new ArrayList<>(); + ETDataExtension etDataExtension = new ETDataExtension(); + etDataExtension.setKey("DE"); + etDataExtension.setName("DE"); + etDataExtensions.add(etDataExtension); + List etApiObjects = etDataExtensions; + PowerMockito.when(client.retrieveDataExtensionKeys()).thenReturn(etResponse); + Mockito.doReturn(etApiObjects).when(etResponse).getObjects(); } @Test @@ -113,7 +124,6 @@ public void testConfigurePipeline() throws Exception { list.add(sObjectInfo); MockFailureCollector mockFailureCollector = new MockFailureCollector(); MockPipelineConfigurer mockPipelineConfigurer = new MockPipelineConfigurer(null, plugins); - MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class); ETClient etClient = PowerMockito.mock(ETClient.class); PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient); PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client); @@ -163,7 +173,6 @@ public void testPrepareRun() throws Exception { columns.add(column); MarketingCloudObjectInfo sObjectInfo = Mockito.mock(MarketingCloudObjectInfo.class); list.add(sObjectInfo); - MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class); ETClient etClient = PowerMockito.mock(ETClient.class); PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient); PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client); @@ -199,7 +208,6 @@ public void testConfigurePipelineWMapReduce() throws Exception { list.add(sObjectInfo); MockFailureCollector mockFailureCollector = new MockFailureCollector(); MockPipelineConfigurer mockPipelineConfigurer = new MockPipelineConfigurer(null, plugins); - MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class); ETClient etClient = PowerMockito.mock(ETClient.class); PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient); PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client); @@ -226,7 +234,6 @@ public void testPrepareRunMarketingObjectInfo() throws Exception { MarketingCloudObjectInfo marketingCloudObjectInfo = new MarketingCloudObjectInfo(object, tables); Collection list = new ArrayList<>(); list.add(marketingCloudObjectInfo); - MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class); ETClient etClient = PowerMockito.mock(ETClient.class); PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient); PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client);