Skip to content

Commit

Permalink
Closes #1371: Add optional compression support in avro2json workflow
Browse files Browse the repository at this point in the history
Accepts an optional `compression_method` input parameter, which defaults to `$UNDEFINED$` (compression disabled).
  • Loading branch information
marekhorst committed Aug 3, 2022
1 parent 138ff2c commit 7790a51
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
import eu.dnetlib.iis.common.java.io.HdfsUtils;
import eu.dnetlib.iis.common.spark.JavaSparkContextFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
Expand Down Expand Up @@ -32,12 +36,21 @@ public static void main(String[] args) throws IOException {

SQLContext sqlContext = new SQLContext(sc);
Dataset<Row> input = sqlContext.read().format("com.databricks.spark.avro").load(params.input);
input.write().json(params.output);
if (isCompressionSet(params.compressionMethod)) {
input.write().option("compression", params.compressionMethod).json(params.output);
} else {
input.write().json(params.output);
}

}
}

//------------------------ PRIVATE --------------------------

/**
 * Tells whether a usable compression method was supplied.
 *
 * @param compressionMethod compression method name taken from the job parameters, may be null
 * @return {@code true} when the value is not blank and is not equal to
 *         {@link WorkflowRuntimeParameters#UNDEFINED_NONEMPTY_VALUE}, {@code false} otherwise
 */
private static boolean isCompressionSet(String compressionMethod) {
    // null, empty and whitespace-only values all mean "no compression requested"
    if (StringUtils.isBlank(compressionMethod)) {
        return false;
    }
    // the workflow-level "undefined" marker also disables compression
    boolean isUndefinedMarker = WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(compressionMethod);
    return !isUndefinedMarker;
}

@Parameters(separators = "=")
private static class AvroToRdbTransformerJobParameters {

Expand All @@ -46,6 +59,9 @@ private static class AvroToRdbTransformerJobParameters {

@Parameter(names = "-output", required = true)
private String output;

@Parameter(names = "-compressionMethod", required = false)
private String compressionMethod;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
<name>output</name>
<description>output json datastore</description>
</property>
<property>
<name>compression_method</name>
<value>$UNDEFINED$</value>
<description>output JSON datastore compression method, disabled when set to $UNDEFINED$ value</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
Expand Down Expand Up @@ -96,6 +101,7 @@
</spark-opts>
<arg>-input=${input}</arg>
<arg>-output=${output}</arg>
<arg>-compressionMethod=${compression_method}</arg>
</spark>

<ok to="end"/>
Expand Down

0 comments on commit 7790a51

Please sign in to comment.