Skip to content

Commit

Permalink
Merge pull request #49 from cloudsufi/patch/err_71005
Browse files Browse the repository at this point in the history
[PLUGIN-1773] Add a check for error 71005
  • Loading branch information
psainics committed Apr 4, 2024
2 parents 880b960 + 722b581 commit da72150
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
15 changes: 12 additions & 3 deletions src/main/java/io/cdap/plugin/sfmc/sink/DataExtensionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@
import com.exacttarget.fuelsdk.internal.UpdateRequest;
import com.exacttarget.fuelsdk.internal.UpdateResponse;
import com.exacttarget.fuelsdk.internal.UpdateResult;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Wrapper around an ETClient that understands objects at the level that the plugin cares about.
*/
public class DataExtensionClient {
private static final Set<Integer> PK_ERROR_CODES = ImmutableSet.of(2, 71005);
private final ETClient client;
private final String dataExtensionKey;

Expand Down Expand Up @@ -204,9 +207,7 @@ public List<ETResult<ETDataExtensionRow>> upsert(List<ETDataExtensionRow> rows)

List<ETDataExtensionRow> toUpdate = new ArrayList<>();
for (ETResult<ETDataExtensionRow> row : inserts.getResults()) {
// super hacky to check the error message... but there is no better way
if (row.getStatus() == ETResult.Status.ERROR && row.getErrorCode() == 2 && row.getErrorMessage() != null &&
row.getErrorMessage().toLowerCase().contains("primary key")) {
if (isPrimaryKeyError(row)) {
ETDataExtensionRow failed = row.getObject();
ETDataExtensionRow copy = new ETDataExtensionRow();
copy.setDataExtensionKey(failed.getDataExtensionKey());
Expand All @@ -226,6 +227,14 @@ public List<ETResult<ETDataExtensionRow>> upsert(List<ETDataExtensionRow> rows)
return result;
}

boolean isPrimaryKeyError(ETResult<ETDataExtensionRow> row) {
// super hacky to check the error message... but there is no better way
// error code 2 and 71005 are "Primary key violation"
return row.getStatus() == ETResult.Status.ERROR && row.getErrorCode() != null &&
PK_ERROR_CODES.contains(row.getErrorCode()) && row.getErrorMessage() != null &&
row.getErrorMessage().toLowerCase().contains("primary key");
}

private <T> T call(SFMCCall<T> callable) throws ETSdkException {
ClassLoader oldClassloader = Thread.currentThread().getContextClassLoader();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,5 +277,18 @@ public void testUpsert() throws Exception {
DataExtensionClient dataExtensionClient = Mockito.spy(new DataExtensionClient(client, dataExtensionKey));
Assert.assertNotNull(dataExtensionClient.upsert(row));
}

@Test
public void testIsPrimaryKeyError() {
ETResult<ETDataExtensionRow> mockRow = Mockito.mock(ETResult.class);
Mockito.when(mockRow.getStatus()).thenReturn(ETResult.Status.ERROR);
Mockito.when(mockRow.getErrorCode()).thenReturn(71005);
Mockito.when(mockRow.getErrorMessage()).thenReturn("This is Primary Key Violation on colum abc");
ETClient client = Mockito.mock(ETClient.class);
String dataExtensionKey = "DE";
DataExtensionClient dataExtensionClient = new DataExtensionClient(client, dataExtensionKey);
boolean result = dataExtensionClient.isPrimaryKeyError(mockRow);
Assert.assertTrue(result);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class MarketingCloudSourceTest {
private static final String SOAP_ENDPOINT = "soapEndPoint";
private MarketingCloudSource marketingCloudSource;
private MarketingCloudSourceConfig marketingCloudSourceConfig;
private MarketingCloudClient client;

@Before
public void initialize() throws ETSdkException {
Expand All @@ -83,6 +84,16 @@ public void initialize() throws ETSdkException {
.setObjectList(null)
.build();
marketingCloudSource = new MarketingCloudSource(marketingCloudSourceConfig);
client = PowerMockito.mock(MarketingCloudClient.class);
ETResponse<ETDataExtension> etResponse = Mockito.mock(ETResponse.class);
List<ETDataExtension> etDataExtensions = new ArrayList<>();
ETDataExtension etDataExtension = new ETDataExtension();
etDataExtension.setKey("DE");
etDataExtension.setName("DE");
etDataExtensions.add(etDataExtension);
List<? extends ETApiObject> etApiObjects = etDataExtensions;
PowerMockito.when(client.retrieveDataExtensionKeys()).thenReturn(etResponse);
Mockito.doReturn(etApiObjects).when(etResponse).getObjects();
}

@Test
Expand Down Expand Up @@ -113,7 +124,6 @@ public void testConfigurePipeline() throws Exception {
list.add(sObjectInfo);
MockFailureCollector mockFailureCollector = new MockFailureCollector();
MockPipelineConfigurer mockPipelineConfigurer = new MockPipelineConfigurer(null, plugins);
MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class);
ETClient etClient = PowerMockito.mock(ETClient.class);
PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient);
PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client);
Expand Down Expand Up @@ -163,7 +173,6 @@ public void testPrepareRun() throws Exception {
columns.add(column);
MarketingCloudObjectInfo sObjectInfo = Mockito.mock(MarketingCloudObjectInfo.class);
list.add(sObjectInfo);
MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class);
ETClient etClient = PowerMockito.mock(ETClient.class);
PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient);
PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client);
Expand Down Expand Up @@ -199,7 +208,6 @@ public void testConfigurePipelineWMapReduce() throws Exception {
list.add(sObjectInfo);
MockFailureCollector mockFailureCollector = new MockFailureCollector();
MockPipelineConfigurer mockPipelineConfigurer = new MockPipelineConfigurer(null, plugins);
MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class);
ETClient etClient = PowerMockito.mock(ETClient.class);
PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient);
PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client);
Expand All @@ -226,7 +234,6 @@ public void testPrepareRunMarketingObjectInfo() throws Exception {
MarketingCloudObjectInfo marketingCloudObjectInfo = new MarketingCloudObjectInfo(object, tables);
Collection<MarketingCloudObjectInfo> list = new ArrayList<>();
list.add(marketingCloudObjectInfo);
MarketingCloudClient client = PowerMockito.mock(MarketingCloudClient.class);
ETClient etClient = PowerMockito.mock(ETClient.class);
PowerMockito.whenNew(ETClient.class).withArguments(Mockito.anyString()).thenReturn(etClient);
PowerMockito.whenNew(MarketingCloudClient.class).withArguments(Mockito.any()).thenReturn(client);
Expand Down

0 comments on commit da72150

Please sign in to comment.