From 6f1a1d91eca749e67bd10a5afe88f3a101219059 Mon Sep 17 00:00:00 2001 From: Marek Horst Date: Wed, 27 Sep 2023 10:41:27 +0200 Subject: [PATCH] Closes #1426: Run IIS experiments by relying on spark 3.4 version WIP. Commenting out avro related methods from scala sources which were relying on avro deserializer class which was made private in spark3. Aligning sources with those changes by changing the way dataframes are constructed from collections of avro records and other refactoring required to compile IIS sources successfully. This does not mean the code is already operational, some tests fail and still need to be fixed. Upgrading logging system dependencies to match sharelib log4j dependencies version. Upgrading maven-plugin-plugin version to solve build bug induced by upgraded log4j version. --- iis-common/pom.xml | 2 +- .../NamedCountersAccumulableParam.java | 48 ---------- .../spark/avro/AvroDataFrameSupport.scala | 20 ++-- .../spark/avro/AvroDatasetSupport.scala | 32 +++---- .../common/spark/avro/AvroDatasetWriter.scala | 8 +- .../NamedCountersAccumulableParamTest.java | 92 ------------------- .../spark/avro/AvroDataFrameSupportTest.java | 32 +++---- .../spark/avro/AvroDatasetSupportTest.java | 38 ++++---- .../spark/avro/AvroDatasetWriterTest.java | 34 +++---- .../CitationRelationExporterIOUtils.java | 17 ++-- .../citation/CitationRelationExporterJob.java | 9 +- .../relation/citation/oozie_app/workflow.xml | 2 +- .../CitationRelationExporterIOUtilsTest.java | 38 +++++--- .../CitationRelationExporterJobTest.java | 48 ++++------ .../CitationRelationExporterUtilsTest.java | 83 +++++++---------- .../TaraReferenceExtractionIOUtilsTest.java | 51 +++++++--- .../TaraReferenceExtractionUtilsTest.java | 73 ++++++++------- pom.xml | 52 ++++++++--- 18 files changed, 297 insertions(+), 382 deletions(-) delete mode 100644 iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java delete mode 100644 
iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java diff --git a/iis-common/pom.xml b/iis-common/pom.xml index 1ec04d597..53c37a3a8 100644 --- a/iis-common/pom.xml +++ b/iis-common/pom.xml @@ -258,7 +258,7 @@ - 2.11.12 + 2.12.14 diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java b/iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java deleted file mode 100644 index 88033ea35..000000000 --- a/iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java +++ /dev/null @@ -1,48 +0,0 @@ -package eu.dnetlib.iis.common.counter; - -import org.apache.spark.AccumulableParam; - -import scala.Tuple2; - -/** - * Spark {@link AccumulableParam} for tracking multiple counter values using {@link NamedCounters}. - * - * @author madryk - */ -public class NamedCountersAccumulableParam implements AccumulableParam> { - - private static final long serialVersionUID = 1L; - - - //------------------------ LOGIC -------------------------- - - /** - * Increments {@link NamedCounters} counter with the name same as the first element of passed incrementValue tuple - * by value defined in the second element of incrementValue tuple. - */ - @Override - public NamedCounters addAccumulator(NamedCounters counters, Tuple2 incrementValue) { - counters.increment(incrementValue._1, incrementValue._2); - return counters; - } - - /** - * Merges two passed {@link NamedCounters}. - */ - @Override - public NamedCounters addInPlace(NamedCounters counters1, NamedCounters counters2) { - for (String counterName2 : counters2.counterNames()) { - counters1.increment(counterName2, counters2.currentValue(counterName2)); - } - return counters1; - } - - /** - * Returns passed initialCounters value without any modifications. 
- */ - @Override - public NamedCounters zero(NamedCounters initialCounters) { - return initialCounters; - } - -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala index 5a122bd44..3ad0f7a1d 100644 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala @@ -26,9 +26,9 @@ class AvroDataFrameSupport(val spark: SparkSession) extends Serializable { * @tparam T Type of elements. * @return DataFrame containing data from the given list. */ - def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = { - createDataFrame(data.asScala, avroSchema) - } +// def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = { +// createDataFrame(data.asScala, avroSchema) +// } /** * Creates a dataframe from a given collection. @@ -38,13 +38,13 @@ class AvroDataFrameSupport(val spark: SparkSession) extends Serializable { * @tparam T Type of elements. * @return DataFrame containing data from the given seq. 
*/ - def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = { - val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val encoder = RowEncoder.apply(rowSchema).resolveAndBind() - val deserializer = new AvroDeserializer(avroSchema, rowSchema) - val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) - spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema) - } +// def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = { +// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] +// val encoder = RowEncoder.apply(rowSchema).resolveAndBind() +// val deserializer = new AvroDeserializer(avroSchema, rowSchema) +// val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) +// spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema) +// } /** * Creates a dataset from given dataframe using kryo encoder. diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala index ffc1abd57..ea39f79c8 100644 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala @@ -23,20 +23,20 @@ class AvroDatasetSupport(val spark: SparkSession) extends Serializable { * @tparam T Type of objects in the dataset. * @return DataFrame of objects corresponding to records in the given dataset. 
*/ - def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = { - val avroSchemaStr = avroSchema.toString - val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val encoder = RowEncoder(rowSchema).resolveAndBind() - - object SerializationSupport extends Serializable { - @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema) - private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) - - def doToDF(): DataFrame = { - spark.createDataFrame(rows, rowSchema) - } - } - - SerializationSupport.doToDF() - } +// def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = { +// val avroSchemaStr = avroSchema.toString +// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] +// val encoder = RowEncoder(rowSchema).resolveAndBind() +// +// object SerializationSupport extends Serializable { +// @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema) +// private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) +// +// def doToDF(): DataFrame = { +// spark.createDataFrame(rows, rowSchema) +// } +// } +// +// SerializationSupport.doToDF() +// } } diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala index db4384943..20258d3a9 100644 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala @@ -18,8 +18,8 @@ class AvroDatasetWriter[T <: SpecificRecordBase](ds: Dataset[T]) extends Seriali * @param path Path to the data store. * @param avroSchema Avro schema of the records. 
*/ - def write(path: String, avroSchema: Schema): Unit = { - new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema)) - .write(path, avroSchema) - } +// def write(path: String, avroSchema: Schema): Unit = { +// new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema)) +// .write(path, avroSchema) +// } } diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java deleted file mode 100644 index faee84963..000000000 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package eu.dnetlib.iis.common.counter; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import scala.Tuple2; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertSame; - -/** - * @author madryk - */ -public class NamedCountersAccumulableParamTest { - - private NamedCountersAccumulableParam countersAccumulableParam = new NamedCountersAccumulableParam(); - - - private String counterName1 = "COUNTER_1"; - - private String counterName2 = "COUNTER_2"; - - private String counterName3 = "COUNTER_3"; - - - private NamedCounters namedCounters1 = new NamedCounters(new String[] {counterName1, counterName2}); - - private NamedCounters namedCounters2 = new NamedCounters(new String[] {counterName2, counterName3}); - - - @BeforeEach - public void setup() { - namedCounters1.increment(counterName1, 3L); - namedCounters1.increment(counterName2, 1L); - namedCounters2.increment(counterName2, 7L); - namedCounters2.increment(counterName3, 5L); - } - - - //------------------------ TESTS -------------------------- - - @Test - public void addAccumulator() { - - // 
execute - - NamedCounters retNamedCounters = countersAccumulableParam.addAccumulator(namedCounters1, new Tuple2<>(counterName2, 9L)); - - // assert - - assertSame(retNamedCounters, namedCounters1); - - assertThat(retNamedCounters.counterNames(), containsInAnyOrder(counterName1, counterName2)); - assertThat(retNamedCounters.currentValue(counterName1), is(3L)); - assertThat(retNamedCounters.currentValue(counterName2), is(10L)); - } - - @Test - public void addInPlace() { - - // execute - - NamedCounters retNamedCounters = countersAccumulableParam.addInPlace(namedCounters1, namedCounters2); - - // assert - - assertSame(retNamedCounters, namedCounters1); - - assertThat(retNamedCounters.counterNames(), containsInAnyOrder(counterName1, counterName2, counterName3)); - assertThat(retNamedCounters.currentValue(counterName1), is(3L)); - assertThat(retNamedCounters.currentValue(counterName2), is(8L)); - assertThat(retNamedCounters.currentValue(counterName3), is(5L)); - } - - @Test - public void zero() { - - // execute - - NamedCounters retNamedCounters = countersAccumulableParam.zero(namedCounters1); - - // assert - - assertSame(retNamedCounters, namedCounters1); - - assertThat(retNamedCounters.counterNames(), containsInAnyOrder(counterName1, counterName2)); - assertThat(retNamedCounters.currentValue(counterName1), is(3L)); - assertThat(retNamedCounters.currentValue(counterName2), is(1L)); - } - -} diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java index 85a07bc42..5e28202b3 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java @@ -27,22 +27,22 @@ public void beforeEach() { support = new AvroDataFrameSupport(spark()); } - @Test - @DisplayName("Avro dataframe support creates dataframe from collection 
of avro type") - public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - List data = Collections.singletonList(person); - - Dataset result = support.createDataFrame(data, Person.SCHEMA$); - - assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); - List rows = result.collectAsList(); - assertEquals(1, rows.size()); - Row row = rows.get(0); - assertEquals(person.getId(), row.getAs("id")); - assertEquals(person.getName(), row.getAs("name")); - assertEquals(person.getAge(), row.getAs("age")); - } +// @Test +// @DisplayName("Avro dataframe support creates dataframe from collection of avro type") +// public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { +// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); +// List data = Collections.singletonList(person); +// +// Dataset result = support.createDataFrame(data, Person.SCHEMA$); +// +// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); +// List rows = result.collectAsList(); +// assertEquals(1, rows.size()); +// Row row = rows.get(0); +// assertEquals(person.getId(), row.getAs("id")); +// assertEquals(person.getName(), row.getAs("name")); +// assertEquals(person.getAge(), row.getAs("age")); +// } @Test @DisplayName("Avro dataframe support converts dataframe of avro type to dataset of avro type") diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java index 29e196bce..e57403116 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java @@ -24,25 +24,25 @@ public void beforeEach() { super.beforeEach(); } - @Test - 
@DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type") - public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - Dataset ds = spark().createDataset( - Collections.singletonList(person), - Encoders.kryo(Person.class) - ); - - Dataset result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$); - - assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); - List rows = result.collectAsList(); - assertEquals(1, rows.size()); - Row row = rows.get(0); - assertEquals(person.getId(), row.getAs("id")); - assertEquals(person.getName(), row.getAs("name")); - assertEquals(person.getAge(), row.getAs("age")); - } +// @Test +// @DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type") +// public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { +// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); +// Dataset ds = spark().createDataset( +// Collections.singletonList(person), +// Encoders.kryo(Person.class) +// ); +// +// Dataset result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$); +// +// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); +// List rows = result.collectAsList(); +// assertEquals(1, rows.size()); +// Row row = rows.get(0); +// assertEquals(person.getId(), row.getAs("id")); +// assertEquals(person.getName(), row.getAs("name")); +// assertEquals(person.getAge(), row.getAs("age")); +// } public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) { assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable()); diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java 
b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java index 64966505a..beab31aa2 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java @@ -24,21 +24,21 @@ public void beforeEach() { super.beforeEach(); } - @Test - @DisplayName("Avro dataset writer writes dataset of avro type") - public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { - Path outputDir = workingDir.resolve("output"); - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - Dataset ds = spark().createDataset( - Collections.singletonList(person), - Encoders.kryo(Person.class) - ); - - new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$); - - List personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); - assertEquals(1, personList.size()); - Person personRead = personList.get(0); - assertEquals(person, personRead); - } +// @Test +// @DisplayName("Avro dataset writer writes dataset of avro type") +// public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { +// Path outputDir = workingDir.resolve("output"); +// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); +// Dataset ds = spark().createDataset( +// Collections.singletonList(person), +// Encoders.kryo(Person.class) +// ); +// +// new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$); +// +// List personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); +// assertEquals(1, personList.size()); +// Person personRead = personList.get(0); +// assertEquals(person, personRead); +// } } \ No newline at end of file diff --git 
a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java index ce58552d3..51fa848c6 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java +++ b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java @@ -3,9 +3,11 @@ import eu.dnetlib.iis.common.java.io.HdfsUtils; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.avro.AvroDataFrameReader; -import eu.dnetlib.iis.common.spark.avro.AvroDatasetWriter; +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter; import eu.dnetlib.iis.common.utils.RDDUtils; import eu.dnetlib.iis.export.schemas.Citations; +import pl.edu.icm.sparkutils.avro.SparkAvroSaver; + import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFunction; @@ -24,7 +26,7 @@ public final class CitationRelationExporterIOUtils { private static final Logger logger = LoggerFactory.getLogger(CitationRelationExporterIOUtils.class); - + private CitationRelationExporterIOUtils() { } @@ -80,14 +82,17 @@ private static JavaPairRDD datasetToPairRDD(Dataset serialized .mapToPair((PairFunction) content -> new Tuple2<>(new Text(), content)); } - public static void storeReportEntries(SparkSession spark, + public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset reportEntries, String outputReportPath) { - storeReportEntries(reportEntries, outputReportPath, (ds, path) -> - new AvroDatasetWriter<>(ds).write(path, ReportEntry.SCHEMA$)); + storeReportEntries(avroSaver, reportEntries, outputReportPath, (ds, path) -> + // FIXME avoiding relying 
on writing Dataset which apparently is not easily achievable in spark3 + // due to AvroDatasetSupport scala functions referring to classes which were made private in spark3 + avroSaver.saveJavaRDD(ds.javaRDD(), ReportEntry.SCHEMA$, path)); + // new AvroDataFrameWriter(ds).write(path, ReportEntry.SCHEMA$)); } - public static void storeReportEntries(Dataset reportEntries, + public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset reportEntries, String outputReportPath, BiConsumer, String> writeFn) { logger.info("Storing report data in path {}.", outputReportPath); diff --git a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java index 8748dc3be..d98794016 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java +++ b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java @@ -6,6 +6,8 @@ import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.wf.export.actionmanager.entity.ConfidenceLevelUtils; +import pl.edu.icm.sparkutils.avro.SparkAvroSaver; + import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; @@ -22,13 +24,16 @@ import static org.apache.spark.sql.functions.udf; public class CitationRelationExporterJob { - + + private static final SparkAvroSaver avroSaver = new SparkAvroSaver(); + private CitationRelationExporterJob() { } private static final Logger logger = LoggerFactory.getLogger(CitationRelationExporterJob.class); public static void main(String[] args) { + JobParameters params = new JobParameters(); JCommander jcommander = new 
JCommander(params); jcommander.parse(args); @@ -52,7 +57,7 @@ public static void main(String[] args) { storeSerializedActions(spark, serializedActions, params.outputRelationPath); Dataset reportEntries = relationsToReportEntries(spark, relations); - storeReportEntries(spark, reportEntries, params.outputReportPath); + storeReportEntries(avroSaver, reportEntries, params.outputReportPath); }); } diff --git a/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml b/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml index 3b63836e5..d242d7cf7 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml @@ -85,7 +85,7 @@ - yarn-cluster + yarn cluster citation relation spark exporter eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterJob diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java index 4e494f168..64f2441f2 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java +++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java @@ -1,7 +1,23 @@ package eu.dnetlib.iis.wf.export.actionmanager.relation.citation; -import eu.dnetlib.iis.common.schemas.ReportEntry; -import 
eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.clearOutput; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.readCitations; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.storeReportEntries; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.storeSerializedActions; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.sql.Dataset; @@ -12,18 +28,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.junit.jupiter.MockitoExtension; -import scala.Tuple2; -import java.util.Collections; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; - -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.*; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.*; +import eu.dnetlib.iis.common.schemas.ReportEntry; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import 
pl.edu.icm.sparkutils.avro.SparkAvroSaver; +import scala.Tuple2; @ExtendWith(MockitoExtension.class) class CitationRelationExporterIOUtilsTest extends TestWithSharedSparkSession { @@ -70,12 +79,13 @@ public void givenMockWriteFunction_whenSerializedActionsAreStored_thenMockIsUsed @Test @DisplayName("Report entries are stored in output") public void givenMockWriteFunction_whenReportEntriesAreStored_thenMockIsUsed() { + SparkAvroSaver avroSaver = mock(SparkAvroSaver.class); BiConsumer, String> writeFn = mock(BiConsumer.class); Dataset repartitionedReportEntries = mock(Dataset.class); Dataset reportEntries = mock(Dataset.class); when(reportEntries.repartition(1)).thenReturn(repartitionedReportEntries); - storeReportEntries(reportEntries, "path/to/report", writeFn); + storeReportEntries(avroSaver, reportEntries, "path/to/report", writeFn); verify(writeFn, atLeastOnce()).accept(repartitionedReportEntries, "path/to/report"); } diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java index bb5b9c76f..816914a9b 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java +++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java @@ -1,5 +1,20 @@ package eu.dnetlib.iis.wf.export.actionmanager.relation.citation; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingAtomicAction; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Path; +import 
java.util.Collections; +import java.util.List; + +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.iis.common.citations.schemas.CitationEntry; @@ -8,30 +23,14 @@ import eu.dnetlib.iis.common.report.ReportEntryFactory; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter; import eu.dnetlib.iis.common.spark.avro.AvroDatasetReader; import eu.dnetlib.iis.export.schemas.Citations; import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils; import eu.dnetlib.iis.wf.export.actionmanager.OafConstants; import eu.dnetlib.iis.wf.export.actionmanager.module.BuilderModuleHelper; -import org.apache.avro.generic.GenericData; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import scala.Tuple2; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; - -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingAtomicAction; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; - class CitationRelationExporterJobTest extends TestWithSharedSparkSession { @Test @@ -39,17 +38,16 @@ class CitationRelationExporterJobTest extends TestWithSharedSparkSession { public void givenInputCitationsPath_whenRun_thenSerializedAtomicActionsAndReportsAreCreated(@TempDir Path rootInputPath, @TempDir Path rootOutputPath) throws IOException { List citationsList = Collections.singletonList( - 
createCitations( + CitationRelationExporterTestUtils.createCitations( "DocumentId", Collections.singletonList( createCitationEntry("DestinationDocumentId", 1.0f) )) ); Path inputCitationsPath = rootInputPath.resolve("citations"); - new AvroDataFrameWriter( - new AvroDataFrameSupport(spark()).createDataFrame(citationsList, Citations.SCHEMA$)).write( - inputCitationsPath.toString() - ); + + new AvroDataFrameWriter(CitationRelationExporterTestUtils.createDataFrame(spark(), citationsList)).write(inputCitationsPath.toString()); + float trustLevelThreshold = 0.5f; Path outputRelationPath = rootOutputPath.resolve("output"); Path outputReportPath = rootOutputPath.resolve("report"); @@ -91,16 +89,10 @@ public void givenInputCitationsPath_whenRun_thenSerializedAtomicActionsAndReport )); } - private static Citations createCitations(String documentId, List citationEntries) { - return Citations.newBuilder() - .setDocumentId(documentId) - .setCitations(new GenericData.Array<>(Citations.SCHEMA$.getField("citations").schema(), citationEntries)) - .build(); - } - private static CitationEntry createCitationEntry(String destinationDocumentId, Float confidenceLevel) { return CitationEntry.newBuilder() .setPosition(0) + .setRawText("irrelevant") .setDestinationDocumentId(destinationDocumentId) .setConfidenceLevel(confidenceLevel) .setExternalDestinationDocumentIds(Collections.emptyMap()) diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java index 9da1cab4d..94f8a968b 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java +++ 
b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java @@ -1,16 +1,21 @@ package eu.dnetlib.iis.wf.export.actionmanager.relation.citation; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.iis.common.citations.schemas.CitationEntry; -import eu.dnetlib.iis.common.report.ReportEntryFactory; -import eu.dnetlib.iis.common.schemas.ReportEntry; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; -import eu.dnetlib.iis.export.schemas.Citations; -import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils; -import eu.dnetlib.iis.wf.export.actionmanager.OafConstants; -import eu.dnetlib.iis.wf.export.actionmanager.module.BuilderModuleHelper; -import org.apache.avro.generic.GenericData; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.processCitations; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.relationsToReportEntries; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.relationsToSerializedActions; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingRelation; +import static org.apache.spark.sql.functions.udf; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import org.apache.hadoop.io.Text; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -22,19 +27,15 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; 
-import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.*; -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingRelation; -import static org.apache.spark.sql.functions.udf; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.iis.common.citations.schemas.CitationEntry; +import eu.dnetlib.iis.common.report.ReportEntryFactory; +import eu.dnetlib.iis.common.schemas.ReportEntry; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.export.schemas.Citations; +import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils; +import eu.dnetlib.iis.wf.export.actionmanager.OafConstants; +import eu.dnetlib.iis.wf.export.actionmanager.module.BuilderModuleHelper; class CitationRelationExporterUtilsTest extends TestWithSharedSparkSession { @@ -44,11 +45,10 @@ public class ProcessCitationsTest { @Test @DisplayName("Processing returns empty dataset for input with empty citation entries") public void givenCitationsWithEmptyCitationEntries_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> true, DataTypes.BooleanType); - List results = processCitations(avroDataFrameSupport.createDataFrame(Collections.emptyList(), Citations.SCHEMA$), + List results = processCitations(CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.emptyList()), isValidConfidenceLevel).collectAsList(); assertTrue(results.isEmpty()); 
@@ -57,15 +57,13 @@ public void givenCitationsWithEmptyCitationEntries_whenProcessed_thenEmptyDataSe @Test @DisplayName("Processing returns empty dataset for input without destination document id") public void givenCitationsWithNullDestinationDocumentId_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Collections.singletonList( createCitationEntry(null, 0.5f) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> true, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -75,15 +73,13 @@ public void givenCitationsWithNullDestinationDocumentId_whenProcessed_thenEmptyD @Test @DisplayName("Processing returns empty dataset for input without confidence level") public void givenCitationsWithNullConfidenceLevel_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Collections.singletonList( createCitationEntry("DestinationDocumentId", null) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> true, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = 
CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -93,15 +89,13 @@ public void givenCitationsWithNullConfidenceLevel_whenProcessed_thenEmptyDataSet @Test @DisplayName("Processing returns empty dataset for input with invalid confidence level") public void givenCitationsWithConfidenceLevelBelowThreshold_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Collections.singletonList( createCitationEntry("DestinationDocumentId", 0.5f) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> false, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -111,16 +105,14 @@ public void givenCitationsWithConfidenceLevelBelowThreshold_whenProcessed_thenEm @Test @DisplayName("Processing returns dataset with relations for valid input") public void givenOneCitationsRecord_whenProcessed_thenDataSetWithTwoRelationsIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Arrays.asList( createCitationEntry("DestinationDocumentId", 0.9f), createCitationEntry("DestinationDocumentId", 0.8f) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction 
isValidConfidenceLevel = udf((UDF1) confidenceLevel -> confidenceLevel > 0.5, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -163,13 +155,6 @@ public void givenRelations_whenCreatingReportEntries_thenReportEntriesAreReturne assertThat(results, hasItem(ReportEntryFactory.createCounterReportEntry("processing.citationMatching.relation.iscitedby.docs", 2))); } - private static Citations createCitations(String documentId, List citationEntries) { - return Citations.newBuilder() - .setDocumentId(documentId) - .setCitations(new GenericData.Array<>(Citations.SCHEMA$.getField("citations").schema(), citationEntries)) - .build(); - } - private static CitationEntry createCitationEntry(String destinationDocumentId, Float confidenceLevel) { return CitationEntry.newBuilder() .setPosition(0) diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java index 620f516f9..658d91347 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java @@ -1,22 +1,31 @@ package eu.dnetlib.iis.wf.referenceextraction.project.tara; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; -import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; 
+import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.clearOutput; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.storeInOutput; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.Mockito.*; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.AvroDataStoreWriter; +import eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.OutputCleaner; public class TaraReferenceExtractionIOUtilsTest extends TestWithSharedSparkSession { @@ -41,14 +50,12 @@ public void storeInOutputShouldRunProperly() { .setConfidenceLevel(1.0f) .build(); List documentToProjectList = Collections.singletonList(documentToProject); - Dataset documentToProjectDF = new AvroDataFrameSupport(spark()).createDataFrame( - documentToProjectList, - 
DocumentToProject.SCHEMA$); + Dataset documentToProjectDF = createDataFrame(documentToProjectList); AvroDataStoreWriter writer = mock(AvroDataStoreWriter.class); // when storeInOutput(documentToProjectDF, "path/to/output", writer); - + // then ArgumentCaptor> dataFrameCaptor = ArgumentCaptor.forClass(Dataset.class); verify(writer, atLeastOnce()).write(dataFrameCaptor.capture(), @@ -64,4 +71,18 @@ public void storeInOutputShouldRunProperly() { assertEquals(documentToProject.getConfidenceLevel(), documentToProjectRow.getAs("confidenceLevel"), 1e-3); assertNull(documentToProjectRow.getAs("textsnippet")); } + + private Dataset createDataFrame(List inputList) { + List dataFrameList = new ArrayList<>(inputList.size()); + for (DocumentToProject input : inputList) { + dataFrameList.add(RowFactory.create(input.getDocumentId(), input.getProjectId(), input.getConfidenceLevel(), + input.getTextsnippet())); + } + + Dataset result = spark().createDataFrame(dataFrameList, + (StructType) SchemaConverters.toSqlType(DocumentToProject.SCHEMA$).dataType()); + + return result; + } + } \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java index 82b89a3a1..d602048a6 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java @@ -1,51 +1,48 @@ package eu.dnetlib.iis.wf.referenceextraction.project.tara; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; -import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; -import 
eu.dnetlib.iis.metadataextraction.schemas.DocumentText; -import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; -import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; -import eu.dnetlib.iis.transformers.metadatamerger.schemas.PublicationType; -import org.apache.spark.SparkFiles; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.avro.SchemaConverters; -import org.junit.jupiter.api.Test; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.buildDocumentMetadata; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.runReferenceExtraction; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; -import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.buildDocumentMetadata; -import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.runReferenceExtraction; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import org.apache.spark.SparkFiles; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; +import 
eu.dnetlib.iis.metadataextraction.schemas.DocumentText; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; public class TaraReferenceExtractionUtilsTest extends TestWithSharedSparkSession { @Test public void buildDocumentMetadataShouldRunProperly() { // given - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); - Dataset documentTextDF = avroDataFrameSupport.createDataFrame( + Dataset documentTextDF = createTextDataFrame( Arrays.asList( createDocumentText("docId-1", "text-1"), createDocumentText("docId-2", "text-2") - ), - DocumentText.SCHEMA$); - Dataset extractedDocumentMetadataMergedWithOriginalDF = avroDataFrameSupport.createDataFrame( + )); + Dataset extractedDocumentMetadataMergedWithOriginalDF = createMetaDataFrame( Arrays.asList( - createExtractedDocumentMetadataMergedWithOriginal("docId-1"), - createExtractedDocumentMetadataMergedWithOriginal("docId-a") - ), - ExtractedDocumentMetadataMergedWithOriginal.SCHEMA$); + createDocumentMetadata("docId-1") + )); // when Dataset resultDF = buildDocumentMetadata(documentTextDF, extractedDocumentMetadataMergedWithOriginalDF); @@ -89,6 +86,20 @@ public void shouldRunReferenceExtraction() throws IOException { assertForDocumentToProject(row, "docId-1", "projId-1", 1.0f); } + private Dataset createTextDataFrame(List inputList) { + List dataFrameList = new ArrayList<>(inputList.size()); + for (DocumentText input : inputList) { + dataFrameList.add(RowFactory.create(input.getId(), input.getText())); + } + return spark().createDataFrame(dataFrameList, + (StructType) SchemaConverters.toSqlType(DocumentText.SCHEMA$).dataType()); + } + + private Dataset createMetaDataFrame(List inputList) { + return spark().createDataFrame(inputList, + (StructType) SchemaConverters.toSqlType(ExtractedDocumentMetadataMergedWithOriginal.SCHEMA$).dataType()); + } + private static 
DocumentText createDocumentText(String id, String text) { return DocumentText.newBuilder() .setId(id) @@ -96,15 +107,11 @@ private static DocumentText createDocumentText(String id, String text) { .build(); } - private static ExtractedDocumentMetadataMergedWithOriginal createExtractedDocumentMetadataMergedWithOriginal(String id) { - return ExtractedDocumentMetadataMergedWithOriginal.newBuilder() - .setId(id) - .setPublicationType(PublicationType.newBuilder().build()) - .build(); + private static Row createDocumentMetadata(String id) { + return createDocumentMetadata(id, null); } - private static Row createDocumentMetadata(String id, - String text) { + private static Row createDocumentMetadata(String id, String text) { return RowFactory.create(id, null, null, text); } diff --git a/pom.xml b/pom.xml index 99512b7be..0781e40f1 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,33 @@ + + + org.apache.logging.log4j + log4j-1.2-api + 2.19.0 + + + + org.apache.logging.log4j + log4j-api + 2.19.0 + + + + org.apache.logging.log4j + log4j-core + 2.19.0 + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.19.0 + + + + + javax.servlet javax.servlet-api @@ -350,7 +378,10 @@ pl.edu.icm.spark-utils spark-utils_2.12 1.0.2 + + @@ -747,6 +778,15 @@ target/test-classes + + org.apache.maven.plugins + maven-plugin-plugin + 3.6.0 + + true + + + org.apache.maven.plugins maven-compiler-plugin @@ -878,16 +918,6 @@ - - - org.apache.maven.plugins - maven-plugin-plugin - [3.2,) - - descriptor - - -