Skip to content

Commit

Permalink
Merge pull request data-integrations#165 from cloudsufi/bug-schema-de…
Browse files Browse the repository at this point in the history
…tection

[PLUGIN-1775] Fix schema detection when the "enable quoted values" option is used
  • Loading branch information
vikasrathee-cs authored Apr 19, 2024
2 parents 0497946 + d07e35e commit b0019d4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.format.delimited.common.DataTypeDetectorStatusKeeper;
import io.cdap.plugin.format.delimited.common.DataTypeDetectorUtils;
import io.cdap.plugin.format.delimited.input.SplitQuotesIterator;
import io.cdap.plugin.http.source.batch.HttpBatchSourceConfig;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

/**
Expand All @@ -39,7 +42,7 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter
try {
for (int rowIndex = 0; rowIndex < sampleSize && rawStringPerLine.hasNext(); rowIndex++) {
line = rawStringPerLine.next();
rowValue = line.split(delimiter, -1);
rowValue = getRowValues(line, config.getEnableQuotesValues(), delimiter);
if (rowIndex == 0) {
columnNames = DataTypeDetectorUtils.setColumnNames(line, config.getCsvSkipFirstRow(),
config.getEnableQuotesValues(), delimiter);
Expand All @@ -61,4 +64,23 @@ public static Schema detectSchema(HttpBatchSourceConfig config, String delimiter
new HashMap<>(), columnNames, dataTypeDetectorStatusKeeper);
return Schema.recordOf("text", fields);
}

/**
 * Splits a single raw line into its individual column values.
 *
 * <p>With quoted values disabled, the line is split via {@link String#split(String, int)} using the
 * delimiter as the pattern and a limit of {@code -1}, so trailing empty values are kept. With quoted
 * values enabled, splitting is delegated to {@link SplitQuotesIterator} and every value it yields is
 * collected in iteration order.
 *
 * @param rawLine line to split into column values
 * @param enableQuotedValues whether the line may contain quoted values
 * @param delimiter delimiter separating the values within the line
 * @return array holding every column value found in the given line
 */
public static String[] getRowValues(String rawLine, boolean enableQuotedValues, String delimiter) {
  if (enableQuotedValues) {
    List<String> values = new ArrayList<>();
    Iterator<String> quoteAwareSplits = new SplitQuotesIterator(rawLine, delimiter, null, false);
    while (quoteAwareSplits.hasNext()) {
      values.add(quoteAwareSplits.next());
    }
    return values.toArray(new String[0]);
  }
  // Plain split; the delimiter is passed to String#split unchanged, i.e. it is interpreted as a regex.
  return rawLine.split(delimiter, -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DelimitedSchemaDetectorTest {
RawStringPerLine rawStringPerLineIterator;
HttpBatchSourceConfig configSkipHeaderTrue;
HttpBatchSourceConfig configSkipHeaderFalse;
HttpBatchSourceConfig configSkipHeaderTrueAndQuotesEnabled;
String csvDelimiter = ",";
String tsvDelimiter = "\t";

Expand All @@ -46,6 +47,8 @@ public void setUp() {
MockitoAnnotations.initMocks(this);
configSkipHeaderTrue = HttpBatchSourceConfig.builder().setCsvSkipFirstRow("true").build();
configSkipHeaderFalse = HttpBatchSourceConfig.builder().setCsvSkipFirstRow("false").build();
configSkipHeaderTrueAndQuotesEnabled = HttpBatchSourceConfig.builder().setCsvSkipFirstRow("true")
.setEnableQuotesValues(true).build();
expectedSchemaWithHeaders = Schema.recordOf("text",
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("age", Schema.of(Schema.Type.INT)),
Expand Down Expand Up @@ -101,4 +104,24 @@ public void testDetectSchemaTsvHeader() throws IOException {
Assert.assertEquals(expectedSchemaWithHeaders, schema);
}

/**
 * Feeds a header line and two data lines, one of which wraps a delimiter-containing value in quotes,
 * through schema detection with quoted values enabled and expects the regular header-based schema.
 */
@Test
public void testDetectSchemaWithQuotesEnabled() throws IOException {
  String headerLine = "name,age,isIndian,country";
  // The first value is quoted and contains the delimiter itself.
  String quotedLine = "\"raj,singh\",29,true,india";
  String trailingEmptyLine = "rahul,30,false,";
  // Three lines are available; the fourth hasNext() call reports the end of input.
  Mockito.when(rawStringPerLineIterator.hasNext()).thenReturn(true, true, true, false);
  Mockito.when(rawStringPerLineIterator.next()).thenReturn(headerLine, quotedLine, trailingEmptyLine);
  Schema detectedSchema = DelimitedSchemaDetector.detectSchema(
    configSkipHeaderTrueAndQuotesEnabled, csvDelimiter, rawStringPerLineIterator, null);
  Assert.assertEquals(expectedSchemaWithHeaders, detectedSchema);
}

/**
 * Feeds the same three lines as the quotes-enabled test through schema detection, but with a config
 * that does not enable quoted values, and expects the detected schema to differ from the regular
 * header-based schema.
 */
@Test
public void testDetectSchemaWithQuotesDisabled() throws IOException {
  String headerLine = "name,age,isIndian,country";
  // The first value is quoted and contains the delimiter itself.
  String quotedLine = "\"raj,singh\",29,true,india";
  String trailingEmptyLine = "rahul,30,false,";
  // Three lines are available; the fourth hasNext() call reports the end of input.
  Mockito.when(rawStringPerLineIterator.hasNext()).thenReturn(true, true, true, false);
  Mockito.when(rawStringPerLineIterator.next()).thenReturn(headerLine, quotedLine, trailingEmptyLine);
  Schema detectedSchema = DelimitedSchemaDetector.detectSchema(
    configSkipHeaderTrue, csvDelimiter, rawStringPerLineIterator, null);
  Assert.assertNotEquals(expectedSchemaWithHeaders, detectedSchema);
}

}

0 comments on commit b0019d4

Please sign in to comment.