diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f82f33d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,56 @@
+*.class
+.*.swp
+.beamer
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# Intellij Files & Dir #
+*.iml
+*.ipr
+*.iws
+atlassian-ide-plugin.xml
+out/
+.DS_Store
+./lib/
+.idea
+
+# Gradle Files & Dir #
+build/
+.gradle/
+.stickyStorage
+.build/
+target/
+
+# Node log
+npm-*.log
+logs/
+.nux_enabled
+.nux_dashboard
+
+# Singlenode and test data files.
+/templates/
+/artifacts/
+/data/
+/data-fabric-tests/data/
+
+# gateway test leftover
+/gateway/data/
+/watchdog/data/
+
+# Checkstyle report
+examples/checkstyle_report.xml
+
+# Examples Stuff
+dependency-reduced-pom.xml
+
+# Hive db Stuff
+derby.log
+
+# generated config files
+/cdap-web-app/conf/generated
+/cdap-client-tests/conf/generated
+
+# generated by docs build
+*.pyc
diff --git a/README.md b/README.md
index d70b9ef..6f9daa9 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,85 @@
-# cobol-to-avro-transform
+Cobol to Avro Converter
+===================================
+
+
+[![Build Status](https://travis-ci.org/hydrator/cobol-to-avro-transform.svg?branch=develop)](https://travis-ci.org/hydrator/cobol-to-avro-transform) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
+
+Cobol to Avro Converter accepts a Cobol Copybook and converts it into an Avro schema. The generated Avro schema
+is used to convert the records in the Cobol data file into Apache Avro format.
+
+Usage Notes
+-----------
+
+The plugin accepts a Cobol Copybook as a configuration to generate an Avro schema and use that schema to convert
+records into Avro format. Code in the Copybook can be pasted in the "Copybook" property. Avro schema corresponding to
+the Copybook can be retrieved using the "Get Schema" button. "Code Format" property is used to specify the code format
+in the Copybook, which can either be FIXED_FORMAT or FREE_FORMAT. Charset used to read the data can be specified using
+"Charset" property, which defaults to "IBM01140". A Cobol data file can contain variable-length logical records, in which case
+each record consists of a record descriptor word (RDW) followed by the data. The plugin can be configured to work with
+variable-length records by setting the property "Records start with Record Descriptor Word" to true.
+
+Cobol to Avro Converter is usually used with the WholeFileReader source plugin. WholeFileReader reads the entire data file and passes it
+to the converter as an array of bytes. The name of the field containing the Cobol records as an array of bytes can be configured.
+
+Plugin Configuration
+--------------------
+
+| Configuration | Required | Default | Description |
+| :------------ | :------: | :----- | :---------- |
+| **Copybook** | **Y** | N/A | Specifies the Cobol Copybook for which an Avro schema needs to be generated. |
+| **Code Format** | **N** | FIXED_FORMAT | Specifies the format of the Copybook source code. |
+| **Charset** | **N** | IBM01140 | Specifies the EBCDIC charset used to read the data. |
+| **Record Descriptor Word** | **N** | True | Specifies whether the data file contains variable-length records. |
+
+
+Build
+-----
+To build this plugin:
+
+```
+ mvn clean package
+```
+
+The build will create a .jar and .json file under the ``target`` directory.
+These files can be used to deploy your plugins.
+
+Deployment
+----------
+You can deploy your plugins using the CDAP CLI:
+
+    > load artifact <target/plugin.jar> config-file <target/plugin.json>
+
+## Mailing Lists
+
+CDAP User Group and Development Discussions:
+
+* `cdap-user@googlegroups.com`
+
+The *cdap-user* mailing list is primarily for users using the product to develop
+applications or building plugins for applications. You can expect questions from
+users, release announcements, and any other discussions that we think will be helpful
+to the users.
+
+## Slack Channel
+
+CDAP Slack Channel: http://cdap-users.herokuapp.com/
+
+
+## License and Trademarks
+
+Copyright © 2017 Cask Data, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+in compliance with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+either express or implied. See the License for the specific language governing permissions
+and limitations under the License.
+
+Cask is a trademark of Cask Data, Inc. All rights reserved.
+
+Apache, Apache HBase, and HBase are trademarks of The Apache Software Foundation. Used with
+permission. No endorsement by The Apache Software Foundation is implied by the use of these marks.
\ No newline at end of file
diff --git a/docs/CobolRecordConverter-transform.md b/docs/CobolRecordConverter-transform.md
new file mode 100644
index 0000000..0543d29
--- /dev/null
+++ b/docs/CobolRecordConverter-transform.md
@@ -0,0 +1,30 @@
+# Cobol to Avro Converter
+
+Description
+-----------
+Cobol to Avro Converter accepts a Cobol Copybook and converts it into an Avro schema. The generated Avro schema
+is used to convert the records in the Cobol data file into Apache Avro format.
+
+
+Use Case
+--------
+
+Many of the world’s largest and most critical industries - healthcare, finance, insurance, retail, etc. - still generate
+a huge majority of their data on the mainframe. Storing data on the mainframe involves a high cost of maintenance, and
+requires expertise and special tools to perform data analysis. Offloading the data to Apache Hadoop saves cost; however, the main
+challenge is the lack of connectivity between the mainframe and Apache Hadoop. Cobol to Avro Converter provides such connectivity.
+It converts mainframe data into Avro format using the schema specified by the Copybook.
+
+
+Properties
+----------
+
+**copybook:** The Cobol copybook source code
+
+**codeFormat:** Code format associated with the copybook source code
+
+**charset:** The EBCDIC Charset used to read the data
+
+**rdw:** Specifies whether the Cobol record starts with Record Descriptor Word
+
+**fieldName:** Name of the field containing Cobol records in the form of an array of bytes
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..bba9d09
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,217 @@
+
+ 4.0.0
+ co.cask
+ cobol-to-avro-transform
+ jar
+ 1.0-SNAPSHOT
+
+ 1.7.7
+ 4.1.0
+ 18.0
+ 3.0.7
+ 0.4.2
+ 1.2.3
+ UTF-8
+
+
+ widgets
+ docs
+
+
+ [3.3.0,10.0.0-SNAPSHOT)
+
+
+
+ system:cdap-etl-batch,
+ system:cdap-data-pipeline,
+ system:cdap-data-streams
+
+
+
+ ${project.basedir}
+
+
+
+
+ co.cask.cdap
+ cdap-api
+ ${cdap.version}
+
+
+ co.cask.cdap
+ cdap-etl-api
+ ${cdap.version}
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+ org.codehaus.janino
+ janino
+ 3.0.7
+
+
+ com.legsem.legstar
+ legstar.avro.cob2avro
+ ${legstar.avro.version}
+
+
+ org.slf4j
+ *
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ ch.qos.logback
+ logback-core
+ ${logback.version}
+ test
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+ test
+
+
+ co.cask.cdap
+ cdap-formats
+ ${cdap.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ 1.7
+
+
+ create-artifact-config
+ prepare-package
+
+
+
+
+
+
+ run
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.1
+
+
+ 1.7
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 2.5.4
+ true
+
+
+ <_exportcontents>
+ co.cask.*;
+ com.google.common.*;
+ com.legstar.*
+
+ *;inline=false;scope=compile
+ true
+ lib
+
+
+
+
+ package
+
+ bundle
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/main/java/co/cask/cobol/CopybookReader.java b/src/main/java/co/cask/cobol/CopybookReader.java
new file mode 100644
index 0000000..74f296e
--- /dev/null
+++ b/src/main/java/co/cask/cobol/CopybookReader.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright © 2017 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.cobol;
+
+import com.github.jknack.handlebars.Handlebars;
+import com.github.jknack.handlebars.Template;
+import com.google.common.base.Joiner;
+import com.google.common.io.ByteSource;
+import com.google.common.io.CharSource;
+import com.google.common.io.Closeables;
+import com.google.common.io.Resources;
+import com.legstar.avro.cob2avro.io.AbstractZosDatumReader;
+import com.legstar.avro.cob2avro.io.ZosVarDatumReader;
+import com.legstar.avro.cob2avro.io.ZosVarRdwDatumReader;
+import com.legstar.avro.translator.Xsd2AvroTranslator;
+import com.legstar.avro.translator.Xsd2AvroTranslatorException;
+import com.legstar.base.context.EbcdicCobolContext;
+import com.legstar.base.generator.Xsd2CobolTypesModelBuilder;
+import com.legstar.base.type.CobolType;
+import com.legstar.base.type.composite.CobolComplexType;
+import com.legstar.cob2xsd.Cob2Xsd;
+import com.legstar.cob2xsd.Cob2XsdConfig;
+import com.legstar.cob2xsd.antlr.RecognizerException;
+import com.legstar.cobol.model.CobolDataItem;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.ws.commons.schema.XmlSchema;
+import org.apache.ws.commons.schema.XmlSchemaCollection;
+import org.apache.ws.commons.schema.XmlSchemaSerializer;
+import org.codehaus.janino.JavaSourceClassLoader;
+import org.codehaus.janino.util.resource.Resource;
+import org.codehaus.janino.util.resource.ResourceFinder;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+
+/**
+ * This class helps parse a Cobol Copybook and decode an EBCDIC-encoded dataset.
+ */
+public class CopybookReader {
+
+ private final Schema avroSchema;
+ private final ClassLoader cobolTypeClassLoader;
+
+ public CopybookReader(CharSource copybookSource, Properties cobolConfig) throws IOException {
+ Cob2Xsd cob2xsd = new Cob2Xsd(new Cob2XsdConfig(cobolConfig));
+
+ try (Reader reader = copybookSource.openStream()) {
+ // Parse the copybook
+ List cobolDataItems = parseCopybook(cob2xsd, reader);
+
+ // Generate XML schema from the copybook
+ XmlSchema xmlSchema = new XmlSchemaCollection().read(
+ cob2xsd.emitXsd(cobolDataItems, "co.cask.cobol").getSchemaDocument());
+
+ // Convert XML schema to Avro schema
+ Schema avroSchema = translate(xmlSchema);
+
+ // Generate the CobolType classes ClassLoader
+ this.cobolTypeClassLoader = createCobolTypesClassLoader(xmlSchema, "co.cask.cobol");
+ this.avroSchema = avroSchema;
+ } catch (RecognizerException e) {
+ throw new IOException("Failed to parse cobol copybook: " + System.lineSeparator()
+ + Joiner.on(System.lineSeparator()).join(cob2xsd.getErrorHistory()), e);
+ } catch (XmlSchemaSerializer.XmlSchemaSerializerException | Xsd2AvroTranslatorException e) {
+ throw new IOException("Failed to generate Avro schema from cobol copybook", e);
+ }
+ }
+
+ /**
+ * Returns the Avro schema created from the Cobol copybook.
+ *
+ * @return the Avro record {@link Schema} generated from the copybook.
+ */
+ public Schema getSchema() {
+ return avroSchema;
+ }
+
+ /**
+ * Creates a {@link AbstractZosDatumReader} for reading Ebcdic encoded dataset into Avro {@link GenericRecord}.
+ *
+ * @param source The {@link ByteSource} for the dataset
+ * @param charset The charset used to create EBCDIC COBOL context
+ * @param hasRecordDescriptorWord {@code true} for data that has the record descriptor word prefix for each record;
+ * {@code false} otherwise
+ * @return A {@link AbstractZosDatumReader} for reading
+ * @throws IOException If failed to create the reader
+ */
+ public AbstractZosDatumReader createRecordReader(ByteSource source, String charset,
+ boolean hasRecordDescriptorWord) throws IOException {
+ String cobolTypeClassName = avroSchema.getNamespace() + "." + avroSchema.getName();
+ CobolComplexType cobolType;
+ try {
+ cobolType = (CobolComplexType) cobolTypeClassLoader.loadClass(cobolTypeClassName).newInstance();
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ // This shouldn't happen since we generated the class
+ throw new IOException("Failed to instantiate instance of class " + cobolTypeClassName, e);
+ }
+
+ long size = source.size();
+ InputStream is = source.openBufferedStream();
+ try {
+ if (hasRecordDescriptorWord) {
+ return new ZosVarRdwDatumReader<>(is, size, new EbcdicCobolContext(charset), cobolType, avroSchema);
+ }
+ return new ZosVarDatumReader<>(is, size, new EbcdicCobolContext(charset), cobolType, avroSchema);
+ } catch (IOException e) {
+ Closeables.closeQuietly(is);
+ throw e;
+ }
+ }
+
+ private List parseCopybook(Cob2Xsd cob2xsd, Reader reader) throws RecognizerException {
+ Cob2XsdConfig config = cob2xsd.getConfig();
+
+ List cobolDataItems = new ArrayList<>();
+ for (CobolDataItem item : cob2xsd.toModel(reader)) {
+ if (config.ignoreOrphanPrimitiveElements() && item.getChildren().isEmpty()) {
+ continue;
+ }
+ cobolDataItems.add(item);
+ }
+
+ // If the copybook is empty, the list would be empty.
+ // If the copybook has top level record, the size of the list would be 1.
+ if (cobolDataItems.size() <= 1) {
+ return cobolDataItems;
+ }
+
+ // If the copybook doesn't have top level record, insert one
+ CobolDataItem item = new CobolDataItem(1, "GENERATED-TOP-RECORD");
+ item.setChildren(cobolDataItems);
+
+ cobolDataItems = new ArrayList<>();
+ cobolDataItems.add(item);
+
+ return cobolDataItems;
+ }
+
+ /**
+ * Translates a {@link XmlSchema} into a Avro {@link Schema}.
+ *
+ * @param xmlSchema the {@link XmlSchema} to translate from
+ * @return an Avro record {@link Schema}
+ * @throws Xsd2AvroTranslatorException if translation failed
+ */
+ private Schema translate(XmlSchema xmlSchema) throws Xsd2AvroTranslatorException {
+ Xsd2AvroTranslator avroTranslator = new Xsd2AvroTranslator();
+ return new Schema.Parser().parse(avroTranslator.translate(xmlSchema, "co.cask.cobol", "schema"));
+ }
+
+ /**
+ * Creates a {@link ClassLoader} for loading {@link CobolType} classes that can be used for reading data encoded using
+ * Cobol copybook.
+ *
+ * @param xmlSchema The {@link XmlSchema} representation of the Cobol copybook.
+ * @param classPackage Name of the java package for the generated classes to be located in.
+ * @return a {@link ClassLoader} for loading {@link CobolType} classes
+ * @throws IOException if failed to create the ClassLoader
+ */
+ private ClassLoader createCobolTypesClassLoader(XmlSchema xmlSchema, String classPackage) throws IOException {
+ final Map sources = generateCobolTypes(xmlSchema, classPackage);
+ final long lastModified = System.currentTimeMillis();
+ return new JavaSourceClassLoader(getClass().getClassLoader(), new ResourceFinder() {
+
+ @Nullable
+ @Override
+ public Resource findResource(final String resourceName) {
+ String className = resourceName.replace('/', '.').substring(0, resourceName.length() - ".java".length());
+ final String sourceCode = sources.get(className);
+ if (sourceCode == null) {
+ return null;
+ }
+ return new Resource() {
+ @Override
+ public InputStream open() throws IOException {
+ return new ByteArrayInputStream(sourceCode.getBytes("UTF-8"));
+ }
+
+ @Override
+ public String getFileName() {
+ return resourceName;
+ }
+
+ @Override
+ public long lastModified() {
+ return lastModified;
+ }
+ };
+ }
+ }, null);
+ }
+
+ /**
+ * Generates the source code of different {@link CobolType} that can be used for reading data encoded using
+ * Cobol copybook.
+ *
+ * @param xmlSchema The {@link XmlSchema} representation of the Cobol copybook.
+ * @param classPackage Name of the java package for the generated classes to be located in.
+ * @return A {@link Map} from class name to class source code
+ * @throws IOException if failed to generate the classes
+ */
+ private Map generateCobolTypes(XmlSchema xmlSchema, String classPackage) throws IOException {
+ URL resource = getClass().getClassLoader().getResource("java.class.hbs");
+ if (resource == null) {
+ // This shouldn't happen
+ throw new IllegalStateException("Resource not found: java.class.hbs");
+ }
+
+ Handlebars handlebars = new Handlebars();
+ Template template = handlebars.compileInline(Resources.toString(resource, StandardCharsets.UTF_8));
+ Map model = new Xsd2CobolTypesModelBuilder().build(xmlSchema);
+
+ Map sources = new HashMap<>();
+ for (Map.Entry entry : model.entrySet()) {
+ String className = entry.getKey();
+ Map config = new HashMap<>();
+ config.put("target_package_name", classPackage);
+ config.put("class_name", className);
+ config.put("root_type_name", entry.getKey());
+ config.put("root_cobol_name", entry.getValue().cobolName);
+ config.put("complex_types", entry.getValue().complexTypes);
+ config.put("choice_types", entry.getValue().choiceTypes);
+ sources.put(classPackage + "." + className, template.apply(config));
+ }
+ return sources;
+ }
+}
diff --git a/src/main/java/co/cask/common/AvroConverter.java b/src/main/java/co/cask/common/AvroConverter.java
new file mode 100644
index 0000000..a265d83
--- /dev/null
+++ b/src/main/java/co/cask/common/AvroConverter.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright © 2017 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.common;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.collections.map.HashedMap;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Helper class to convert Avro types into CDAP types
+ */
+public final class AvroConverter {
+
+ private static final Function SCHEMA_CONVERTER =
+ new Function() {
+ @Override
+ public Schema apply(org.apache.avro.Schema input) {
+ return fromAvroSchema(input);
+ }
+ };
+
+ private static final Function FIELD_CONVERTER =
+ new Function() {
+ @Override
+ public Schema.Field apply(org.apache.avro.Schema.Field input) {
+ return Schema.Field.of(input.name(), SCHEMA_CONVERTER.apply(input.schema()));
+ }
+ };
+
+
+ /**
+ * Creates a CDAP {@link Schema} from an avro {@link org.apache.avro.Schema}.
+ */
+ public static Schema fromAvroSchema(org.apache.avro.Schema avroSchema) {
+ switch (avroSchema.getType()) {
+ case NULL:
+ return Schema.of(Schema.Type.NULL);
+ case BOOLEAN:
+ return Schema.of(Schema.Type.BOOLEAN);
+ case INT:
+ return Schema.of(Schema.Type.INT);
+ case LONG:
+ return Schema.of(Schema.Type.LONG);
+ case FLOAT:
+ return Schema.of(Schema.Type.FLOAT);
+ case DOUBLE:
+ return Schema.of(Schema.Type.DOUBLE);
+ case STRING:
+ return Schema.of(Schema.Type.STRING);
+ case BYTES:
+ return Schema.of(Schema.Type.BYTES);
+ case FIXED:
+ return Schema.of(Schema.Type.BYTES);
+ case ENUM:
+ return Schema.enumWith(avroSchema.getEnumSymbols());
+ case ARRAY:
+ return Schema.arrayOf(fromAvroSchema(avroSchema.getElementType()));
+ case MAP:
+ return Schema.mapOf(Schema.of(Schema.Type.STRING), fromAvroSchema(avroSchema.getValueType()));
+ case RECORD:
+ return Schema.recordOf(avroSchema.getName(), Iterables.transform(avroSchema.getFields(), FIELD_CONVERTER));
+ case UNION:
+ return Schema.unionOf(Iterables.transform(avroSchema.getTypes(), SCHEMA_CONVERTER));
+ }
+
+ // This shouldn't happen.
+ throw new IllegalArgumentException("Unsupported Avro schema type " + avroSchema.getType());
+ }
+
+ /**
+ * Creates a {@link StructuredRecord} from a {@link GenericRecord}.
+ *
+ * @param record the {@link GenericRecord}
+ * @param schema the {@link Schema} of the {@link StructuredRecord} to create
+ * @return a new {@link StructuredRecord}
+ */
+ public static StructuredRecord fromAvroRecord(GenericRecord record, Schema schema) {
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+ org.apache.avro.Schema avroSchema = record.getSchema();
+ for (Schema.Field field : schema.getFields()) {
+ String name = field.getName();
+ builder.set(name, fromAvroValue(field.getSchema(), avroSchema.getField(name).schema(), record.get(name)));
+ }
+ return builder.build();
+ }
+
+ @Nullable
+ private static Object fromAvroValue(Schema schema, org.apache.avro.Schema avroSchema, Object value) {
+ switch (schema.getType()) {
+ case NULL:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.NULL);
+ return null;
+ case BOOLEAN:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.BOOLEAN);
+ return value;
+ case INT:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.INT);
+ return value;
+ case LONG:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.LONG);
+ return value;
+ case FLOAT:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.FLOAT);
+ return value;
+ case DOUBLE:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.DOUBLE);
+ return value;
+ case BYTES:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.BYTES);
+ return value;
+ case STRING:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.STRING);
+ return value.toString();
+ case ENUM:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.ENUM);
+ return value.toString();
+ case ARRAY:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.ARRAY);
+ return convertAvroArray(schema.getComponentSchema(), avroSchema.getElementType(), (GenericArray) value);
+ case MAP:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.MAP);
+ Preconditions.checkArgument(schema.getMapSchema().getKey().getType() == Schema.Type.STRING);
+ return convertAvroMap(schema.getMapSchema().getValue(), avroSchema.getValueType(), (Map) value);
+ case RECORD:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.RECORD);
+ return fromAvroRecord((GenericRecord) value, schema);
+ case UNION:
+ Preconditions.checkArgument(avroSchema.getType() == org.apache.avro.Schema.Type.UNION);
+ return convertUnion(schema, avroSchema, value);
+ }
+ throw new IllegalArgumentException("Unsupported schema type " + schema.getType());
+ }
+
+ private static Collection> convertAvroArray(Schema elementSchema,
+ org.apache.avro.Schema avroElementSchema, GenericArray> array) {
+ List