diff --git a/pom.xml b/pom.xml
index d309125..9ac0219 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,19 +21,19 @@
<groupId>io.cdap.plugin</groupId>
<artifactId>cobol-to-avro-transform</artifactId>
<packaging>jar</packaging>
- <version>1.2.0-SNAPSHOT</version>
+ <version>1.3.0-SNAPSHOT</version>
<name>Cobol to Avro Transform</name>
<avro.version>1.7.7</avro.version>
- <cdap.version>6.0.0-SNAPSHOT</cdap.version>
+ <cdap.version>6.1.0-SNAPSHOT</cdap.version>
<guava.version>18.0</guava.version>
3.0.7
0.4.2
<logback.version>1.2.3</logback.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <app.parents>system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT),
- system:cdap-data-pipeline[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</app.parents>
+ <app.parents>system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT),
+ system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</app.parents>
@@ -91,6 +91,12 @@
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.cdap.cdap</groupId>
+ <artifactId>hydrator-test</artifactId>
+ <version>${cdap.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-formats</artifactId>
@@ -125,8 +131,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.8</source>
+ <target>1.8</target>
</configuration>
@@ -137,9 +143,7 @@
<_exportcontents>
- io.cdap.plugin.*;
- com.google.common.*;
- com.legstar.*
+ io.cdap.plugin.cobol.*;
</_exportcontents>
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency>
<Embed-Transitive>true</Embed-Transitive>
diff --git a/src/main/java/io/cdap/plugin/cobol/CobolRecordConverter.java b/src/main/java/io/cdap/plugin/cobol/CobolRecordConverter.java
index d1598e7..4934646 100644
--- a/src/main/java/io/cdap/plugin/cobol/CobolRecordConverter.java
+++ b/src/main/java/io/cdap/plugin/cobol/CobolRecordConverter.java
@@ -17,31 +17,24 @@
package io.cdap.plugin.cobol;
import com.legstar.avro.cob2avro.io.AbstractZosDatumReader;
-import com.legstar.cob2xsd.Cob2XsdConfig;
import io.cdap.cdap.api.annotation.Description;
-import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
-import io.cdap.cdap.api.plugin.EndpointPluginContext;
-import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.common.AvroConverter;
import io.cdap.plugin.common.StreamByteSource;
-import io.cdap.plugin.common.StreamCharSource;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-import javax.annotation.Nullable;
/**
* {@link Transform} plugin to convert COBOL data files into StructuredRecords.
@@ -52,31 +45,37 @@
public class CobolRecordConverter extends Transform<StructuredRecord, StructuredRecord> {
private static final Logger LOG = LoggerFactory.getLogger(CobolRecordConverter.class);
- private final Config config;
+ private final CobolRecordConverterConfig config;
- public CobolRecordConverter(Config config) {
+ public CobolRecordConverter(CobolRecordConverterConfig config) {
this.config = config;
}
private CopybookReader copybookReader;
- private Schema avroSchema;
- private io.cdap.cdap.api.data.schema.Schema schema;
+ private Schema schema;
@Override
public void initialize(TransformContext context) throws Exception {
super.initialize(context);
- Properties properties = new Properties();
- properties.setProperty(Cob2XsdConfig.CODE_FORMAT, config.getCodeFormat());
- StreamCharSource streamCharSource
- = new StreamCharSource(new ByteArrayInputStream(config.copybook.getBytes(StandardCharsets.UTF_8)));
- copybookReader = new CopybookReader(streamCharSource, properties);
- this.avroSchema = copybookReader.getSchema();
- this.schema = AvroConverter.fromAvroSchema(avroSchema);
+ this.copybookReader = config.getCopybookReader();
+ this.schema = config.getOutputSchemaAndValidate(copybookReader);
+ }
+
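+ /**
+ * Validates the configuration at deploy time and propagates the schema
+ * derived from the copybook as the stage output schema.
+ */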
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+
+ FailureCollector failureCollector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+ Schema inputSchema = pipelineConfigurer.getStageConfigurer().getInputSchema();
+ Schema outputSchema = config.getOutputSchemaAndValidate(failureCollector, inputSchema);
+ failureCollector.getOrThrowException();
+
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(outputSchema);
}
@Override
public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
- byte[] body = input.get(config.fieldName);
+ byte[] body = input.get(config.getContentFieldName());
StreamByteSource source = new StreamByteSource(new ByteArrayInputStream(body), body.length);
try (AbstractZosDatumReader<GenericRecord> reader = copybookReader.createRecordReader(source, config.getCharset(),
config.hasRDW())) {
@@ -86,67 +85,4 @@ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
}
}
}
-
- class GetSchemaRequest {
- public String copybook;
- @Nullable
- public String codeFormat;
-
- private String getCodeFormat() {
- return codeFormat == null ? Cob2XsdConfig.CodeFormat.FIXED_FORMAT.name() : codeFormat;
- }
- }
-
- /**
- * Endpoint method to get the output schema given copybook.
- *
- * @param request {@link GetSchemaRequest} containing information about the cobol copybook.
- * @param pluginContext context to create plugins
- * @return schema of fields
- * @throws IOException if there are any errors converting schema
- */
- @javax.ws.rs.Path("outputSchema")
- public io.cdap.cdap.api.data.schema.Schema getSchema(GetSchemaRequest request,
- EndpointPluginContext pluginContext) throws IOException {
- Properties properties = new Properties();
- properties.setProperty(Cob2XsdConfig.CODE_FORMAT, request.getCodeFormat());
- StreamCharSource streamCharSource
- = new StreamCharSource(new ByteArrayInputStream(request.copybook.getBytes(StandardCharsets.UTF_8)));
- CopybookReader reader = new CopybookReader(streamCharSource, properties);
- Schema avroSchema = reader.getSchema();
- return AvroConverter.fromAvroSchema(avroSchema);
- }
-
- public static final class Config extends PluginConfig {
- @Description("COBOL Copybook")
- @Macro
- private String copybook;
-
- @Description("CodeFormat in the Copybook")
- @Nullable
- private String codeFormat;
-
- @Description("Charset used to read the data. Default Charset is 'IBM01140'.")
- @Nullable
- private String charset;
-
- @Description("Records start with Record Descriptor Word")
- @Nullable
- private Boolean rdw;
-
- @Description("Name of the field containing COBOL records")
- private String fieldName;
-
- public String getCodeFormat() {
- return codeFormat == null ? Cob2XsdConfig.CodeFormat.FIXED_FORMAT.name() : codeFormat;
- }
-
- public String getCharset() {
- return charset == null ? "IBM01140" : charset;
- }
-
- public boolean hasRDW() {
- return rdw == null ? true : rdw;
- }
- }
}
diff --git a/src/main/java/io/cdap/plugin/cobol/CobolRecordConverterConfig.java b/src/main/java/io/cdap/plugin/cobol/CobolRecordConverterConfig.java
new file mode 100644
index 0000000..b44f267
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/cobol/CobolRecordConverterConfig.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cobol;
+
+import com.legstar.cob2xsd.Cob2XsdConfig;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.AvroConverter;
+import io.cdap.plugin.common.StreamCharSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import javax.annotation.Nullable;
+
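+/**
+ * {@link PluginConfig} for the {@link CobolRecordConverter} transform plugin.
+ */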
+public class CobolRecordConverterConfig extends PluginConfig {
+ public static final String PROPERTY_COPYBOOK = "copybook";
+ public static final String PROPERTY_CONTENT_FIELD_NAME = "contentFieldName";
+ public static final String PROPERTY_CHARSET = "charset";
+
+ @Name(PROPERTY_COPYBOOK)
+ @Description("COBOL Copybook")
+ @Macro
+ private final String copybook;
+
+ @Description("CodeFormat in the Copybook")
+ @Nullable
+ private final String codeFormat;
+
+ @Name(PROPERTY_CHARSET)
+ @Description("Charset used to read the data. Default Charset is 'IBM01140'.")
+ @Nullable
+ private final String charset;
+
+ @Description("Records start with Record Descriptor Word")
+ @Nullable
+ private final Boolean rdw;
+
+ @Name(PROPERTY_CONTENT_FIELD_NAME)
+ @Description("Name of the field containing COBOL records")
+ private final String contentFieldName;
+
+ public CobolRecordConverterConfig(String copybook, @Nullable String codeFormat, @Nullable String charset,
+ @Nullable Boolean rdw, String contentFieldName) {
+ this.copybook = copybook;
+ this.codeFormat = codeFormat;
+ this.charset = charset;
+ this.rdw = rdw;
+ this.contentFieldName = contentFieldName;
+ }
+
+ private CobolRecordConverterConfig(Builder builder) {
+ this.copybook = builder.copybook;
+ this.codeFormat = builder.codeFormat;
+ this.charset = builder.charset;
+ this.rdw = builder.rdw;
+ this.contentFieldName = builder.contentFieldName;
+ }
+
+ public String getCopybook() {
+ return copybook;
+ }
+
+ public String getCodeFormat() {
+ return codeFormat == null ? Cob2XsdConfig.CodeFormat.FIXED_FORMAT.name() : codeFormat;
+ }
+
+ public String getCharset() {
+ return charset == null ? "IBM01140" : charset;
+ }
+
+ public boolean hasRDW() {
+ return rdw == null ? true : rdw;
+ }
+
+ public String getContentFieldName() {
+ return contentFieldName;
+ }
+
+ public byte[] getCopybookBytes() {
+ return copybook.getBytes(StandardCharsets.UTF_8);
+ }
+
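+ /**
+ * Validates the content field and charset against the input schema, then parses the
+ * copybook and returns the resulting output schema. All validation errors are
+ * reported through the given {@link FailureCollector}.
+ */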
+ public Schema getOutputSchemaAndValidate(FailureCollector failureCollector, Schema inputSchema) {
+ Schema.Field contentField = inputSchema.getField(contentFieldName);
+ if (contentField == null) {
+ failureCollector.addFailure(String.format("Field '%s' is not present in input schema.", contentFieldName),
+ null).withConfigProperty(PROPERTY_CONTENT_FIELD_NAME)
+ .withInputSchemaField(PROPERTY_CONTENT_FIELD_NAME, null);
+ } else {
+ Schema contentFieldSchema = contentField.getSchema();
+
+ if (contentFieldSchema.isNullable()) {
+ contentFieldSchema = contentFieldSchema.getNonNullable();
+ }
+
+ if (contentFieldSchema.getLogicalType() != null || contentFieldSchema.getType() != Schema.Type.BYTES) {
+ failureCollector.addFailure(String.format("Field '%s' must be of type 'bytes' but is of type '%s'.",
+ contentField.getName(), contentFieldSchema.getDisplayName()),
+ null).withConfigProperty(PROPERTY_CONTENT_FIELD_NAME)
+ .withInputSchemaField(contentFieldName, null);
+ }
+ }
+
+ if (!Charset.isSupported(getCharset())) {
+ failureCollector.addFailure(String.format("The charset name '%s' is not supported by your java environment.",
+ getCharset()),
+ "Make sure you have lib/charsets.jar in your jre.")
+ .withConfigProperty(PROPERTY_CHARSET);
+ // if above failed, we cannot proceed to copybook parsing.
+ throw failureCollector.getOrThrowException();
+ }
+
+ CopybookReader copybookReader;
+ try {
+ copybookReader = getCopybookReader();
+ } catch (Exception ex) {
+ failureCollector.addFailure(String.format("Error while reading the copybook: '%s'", ex.getMessage()),
+ "Make sure the copybook format is correct.")
+ .withConfigProperty(PROPERTY_COPYBOOK)
+ .withStacktrace(ex.getStackTrace());
+ throw failureCollector.getOrThrowException();
+ }
+
+ try {
+ return getOutputSchemaAndValidate(copybookReader);
+ } catch (Exception ex) {
+ failureCollector.addFailure(String.format("Error while generating schema from the copybook: '%s'",
+ ex.getMessage()), null)
+ .withConfigProperty(PROPERTY_COPYBOOK)
+ .withStacktrace(ex.getStackTrace());
+ throw failureCollector.getOrThrowException();
+ }
+ }
+
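+ /**
+ * Creates a {@link CopybookReader} for the configured copybook and code format.
+ */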
+ public CopybookReader getCopybookReader() throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(Cob2XsdConfig.CODE_FORMAT, getCodeFormat());
+ StreamCharSource streamCharSource
+ = new StreamCharSource(new ByteArrayInputStream(getCopybookBytes()));
+ return new CopybookReader(streamCharSource, properties);
+ }
+
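+ /**
+ * Derives the Avro schema from the copybook and converts it to a CDAP schema.
+ */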
+ public Schema getOutputSchemaAndValidate(CopybookReader copybookReader) {
+ org.apache.avro.Schema avroSchema = copybookReader.getSchema();
+ return AvroConverter.fromAvroSchema(avroSchema);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
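+ /**
+ * Creates a builder pre-populated with the values of an existing config; handy
+ * for deriving slightly modified configs in tests.
+ */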
+ public static Builder builder(CobolRecordConverterConfig copy) {
+ return new Builder()
+ .setCopybook(copy.getCopybook())
+ .setCodeFormat(copy.getCodeFormat())
+ .setCharset(copy.getCharset())
+ .setRdw(copy.hasRDW())
+ .setContentFieldName(copy.getContentFieldName());
+ }
+
+ public static final class Builder {
+ private String copybook;
+ private String codeFormat;
+ private String charset;
+ private Boolean rdw;
+ private String contentFieldName;
+
+ public Builder setCopybook(String copybook) {
+ this.copybook = copybook;
+ return this;
+ }
+
+ public Builder setCodeFormat(String codeFormat) {
+ this.codeFormat = codeFormat;
+ return this;
+ }
+
+ public Builder setCharset(String charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public Builder setRdw(Boolean rdw) {
+ this.rdw = rdw;
+ return this;
+ }
+
+ public Builder setContentFieldName(String contentFieldName) {
+ this.contentFieldName = contentFieldName;
+ return this;
+ }
+
+ private Builder() {
+ }
+
+ public CobolRecordConverterConfig build() {
+ return new CobolRecordConverterConfig(this);
+ }
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/cobol/CobolRecordConverterConfigTest.java b/src/test/java/io/cdap/plugin/cobol/CobolRecordConverterConfigTest.java
new file mode 100644
index 0000000..2e001df
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/cobol/CobolRecordConverterConfigTest.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright © 2017-2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.cobol;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.validation.CauseAttributes;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+
+public class CobolRecordConverterConfigTest {
+ private static final String MOCK_STAGE = "mockStage";
+ private static final Schema VALID_SCHEMA =
+ Schema.recordOf("schema",
+ Schema.Field.of("body", Schema.nullableOf(Schema.of(Schema.Type.BYTES))));
+
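+ // Sample copybook describing a customer record with a variable-length
+ // TRANSACTION array (OCCURS DEPENDING ON TRANSACTION-NBR).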
+ private static final String copybook = " 01 CUSTOMER-DATA.\n" +
+ " 05 CUSTOMER-ID PIC 9(6).\n" +
+ " 05 PERSONAL-DATA.\n" +
+ " 10 CUSTOMER-NAME PIC X(20).\n" +
+ " 10 CUSTOMER-ADDRESS PIC X(20).\n" +
+ " 10 CUSTOMER-PHONE PIC X(8).\n" +
+ " 05 TRANSACTIONS.\n" +
+ " 10 TRANSACTION-NBR PIC 9(9) COMP.\n" +
+ " 10 TRANSACTION OCCURS 0 TO 5\n" +
+ " DEPENDING ON TRANSACTION-NBR.\n" +
+ " 15 TRANSACTION-DATE PIC X(8).\n" +
+ " 15 FILLER REDEFINES TRANSACTION-DATE.\n" +
+ " 20 TRANSACTION-DAY PIC X(2).\n" +
+ " 20 FILLER PIC X.\n" +
+ " 20 TRANSACTION-MONTH PIC X(2).\n" +
+ " 20 FILLER PIC X.\n" +
+ " 20 TRANSACTION-YEAR PIC X(2).\n" +
+ " 15 TRANSACTION-AMOUNT PIC S9(13)V99 COMP-3.\n" +
+ " 15 TRANSACTION-COMMENT PIC X(9).";
+
+ private static final CobolRecordConverterConfig VALID_CONFIG =
+ new CobolRecordConverterConfig(copybook, "FIXED_FORMAT",
+ "IBM01140", true, "body");
+
+ @Test
+ public void testValidConfig() {
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ VALID_CONFIG.getOutputSchemaAndValidate(failureCollector, VALID_SCHEMA);
+ Assert.assertTrue(failureCollector.getValidationFailures().isEmpty());
+ }
+
+ @Test
+ public void testContentFieldOfWrongType() {
+ Schema schema = Schema.recordOf("schema",
+ Schema.Field.of("body", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ VALID_CONFIG.getOutputSchemaAndValidate(failureCollector, schema);
+ assertValidationFailed(failureCollector, CobolRecordConverterConfig.PROPERTY_CONTENT_FIELD_NAME);
+ }
+
+ @Test
+ public void testContentFieldDoesNotExist() {
+ CobolRecordConverterConfig config = CobolRecordConverterConfig.builder(VALID_CONFIG)
+ .setContentFieldName("nonExisting")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ config.getOutputSchemaAndValidate(failureCollector, VALID_SCHEMA);
+ assertValidationFailed(failureCollector, CobolRecordConverterConfig.PROPERTY_CONTENT_FIELD_NAME);
+ }
+
+ @Test
+ public void testInvalidCharset() {
+ CobolRecordConverterConfig config = CobolRecordConverterConfig.builder(VALID_CONFIG)
+ .setCharset("nonExisting")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ try {
+ config.getOutputSchemaAndValidate(failureCollector, VALID_SCHEMA);
+ Assert.fail("ValidationException exception was expected, but not thrown.");
+ } catch (ValidationException ex) {
+ }
+
+ assertValidationFailed(failureCollector, CobolRecordConverterConfig.PROPERTY_CHARSET);
+ }
+
+ @Test
+ public void testInvalidCopybook() {
+ CobolRecordConverterConfig config = CobolRecordConverterConfig.builder(VALID_CONFIG)
+ .setCopybook("invalidCopybook")
+ .build();
+
+ MockFailureCollector failureCollector = new MockFailureCollector(MOCK_STAGE);
+ try {
+ config.getOutputSchemaAndValidate(failureCollector, VALID_SCHEMA);
+ Assert.fail("ValidationException exception was expected, but not thrown.");
+ } catch (ValidationException ex) {
+ }
+
+ assertValidationFailed(failureCollector, CobolRecordConverterConfig.PROPERTY_COPYBOOK);
+ }
+
+ private static void assertValidationFailed(MockFailureCollector failureCollector, String paramName) {
+ List<ValidationFailure> failureList = failureCollector.getValidationFailures();
+
+ Assert.assertEquals(1, failureList.size());
+ ValidationFailure failure = failureList.get(0);
+ List<ValidationFailure.Cause> causeList = getCauses(failure, CauseAttributes.STAGE_CONFIG);
+ Assert.assertEquals(1, causeList.size());
+ ValidationFailure.Cause cause = causeList.get(0);
+ Assert.assertEquals(paramName, cause.getAttribute(CauseAttributes.STAGE_CONFIG));
+ }
+
+ @Nonnull
+ private static List<ValidationFailure.Cause> getCauses(ValidationFailure failure, String attribute) {
+ return failure.getCauses()
+ .stream()
+ .filter(cause -> cause.getAttribute(attribute) != null)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/widgets/CobolRecordConverter-transform.json b/widgets/CobolRecordConverter-transform.json
index 14db757..87d0cff 100644
--- a/widgets/CobolRecordConverter-transform.json
+++ b/widgets/CobolRecordConverter-transform.json
@@ -14,14 +14,11 @@
"widget-attributes": {
"rows": "4"
- },
- "plugin-function": {
- "method": "POST",
- "widget": "outputSchema",
- "output-property": "schema",
- "plugin-method": "outputSchema",
- "required-fields": ["copybook"],
- "missing-required-fields-message": "'Copybook' is required to fetch schema."
- }
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Name of the field containing COBOL records",
+ "name": "contentFieldName"
},
{
"widget-type": "select",
@@ -55,11 +52,6 @@
"default": "true"
}
- },
- {
- "widget-type": "textbox",
- "label": "Name of the field containing COBOL records",
- "name": "fieldName"
- }
+ }
]
}
],