diff --git a/iis-common/pom.xml b/iis-common/pom.xml index 1ec04d597..53c37a3a8 100644 --- a/iis-common/pom.xml +++ b/iis-common/pom.xml @@ -258,7 +258,7 @@ - 2.11.12 + 2.12.14 diff --git a/iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java b/iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java deleted file mode 100644 index 88033ea35..000000000 --- a/iis-common/src/main/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParam.java +++ /dev/null @@ -1,48 +0,0 @@ -package eu.dnetlib.iis.common.counter; - -import org.apache.spark.AccumulableParam; - -import scala.Tuple2; - -/** - * Spark {@link AccumulableParam} for tracking multiple counter values using {@link NamedCounters}. - * - * @author madryk - */ -public class NamedCountersAccumulableParam implements AccumulableParam> { - - private static final long serialVersionUID = 1L; - - - //------------------------ LOGIC -------------------------- - - /** - * Increments {@link NamedCounters} counter with the name same as the first element of passed incrementValue tuple - * by value defined in the second element of incrementValue tuple. - */ - @Override - public NamedCounters addAccumulator(NamedCounters counters, Tuple2 incrementValue) { - counters.increment(incrementValue._1, incrementValue._2); - return counters; - } - - /** - * Merges two passed {@link NamedCounters}. - */ - @Override - public NamedCounters addInPlace(NamedCounters counters1, NamedCounters counters2) { - for (String counterName2 : counters2.counterNames()) { - counters1.increment(counterName2, counters2.currentValue(counterName2)); - } - return counters1; - } - - /** - * Returns passed initialCounters value without any modifications. - */ - @Override - public NamedCounters zero(NamedCounters initialCounters) { - return initialCounters; - } - -} diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala index 5a122bd44..3ad0f7a1d 100644 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupport.scala @@ -26,9 +26,9 @@ class AvroDataFrameSupport(val spark: SparkSession) extends Serializable { * @tparam T Type of elements. * @return DataFrame containing data from the given list. */ - def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = { - createDataFrame(data.asScala, avroSchema) - } +// def createDataFrame[T](data: java.util.List[T], avroSchema: Schema): DataFrame = { +// createDataFrame(data.asScala, avroSchema) +// } /** * Creates a dataframe from a given collection. @@ -38,13 +38,13 @@ class AvroDataFrameSupport(val spark: SparkSession) extends Serializable { * @tparam T Type of elements. * @return DataFrame containing data from the given seq. */ - def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = { - val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val encoder = RowEncoder.apply(rowSchema).resolveAndBind() - val deserializer = new AvroDeserializer(avroSchema, rowSchema) - val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) - spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema) - } +// def createDataFrame[T](data: Seq[T], avroSchema: Schema): DataFrame = { +// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] +// val encoder = RowEncoder.apply(rowSchema).resolveAndBind() +// val deserializer = new AvroDeserializer(avroSchema, rowSchema) +// val rows = data.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) +// spark.createDataFrame(spark.sparkContext.parallelize(rows), rowSchema) +// } /** * Creates a dataset from given dataframe using kryo encoder. diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala index ffc1abd57..ea39f79c8 100644 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupport.scala @@ -23,20 +23,20 @@ class AvroDatasetSupport(val spark: SparkSession) extends Serializable { * @tparam T Type of objects in the dataset. * @return DataFrame of objects corresponding to records in the given dataset. */ - def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = { - val avroSchemaStr = avroSchema.toString - val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] - val encoder = RowEncoder(rowSchema).resolveAndBind() - - object SerializationSupport extends Serializable { - @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema) - private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) - - def doToDF(): DataFrame = { - spark.createDataFrame(rows, rowSchema) - } - } - - SerializationSupport.doToDF() - } +// def toDF[T <: SpecificRecordBase](ds: Dataset[T], avroSchema: Schema): DataFrame = { +// val avroSchemaStr = avroSchema.toString +// val rowSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] +// val encoder = RowEncoder(rowSchema).resolveAndBind() +// +// object SerializationSupport extends Serializable { +// @transient private lazy val deserializer = new AvroDeserializer(new Schema.Parser().parse(avroSchemaStr), rowSchema) +// private val rows = ds.rdd.map(record => encoder.fromRow(deserializer.deserialize(record).asInstanceOf[InternalRow])) +// +// def doToDF(): DataFrame = { +// spark.createDataFrame(rows, rowSchema) +// } +// } +// +// SerializationSupport.doToDF() +// } } diff --git a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala index db4384943..20258d3a9 100644 --- a/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala +++ b/iis-common/src/main/scala/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriter.scala @@ -18,8 +18,8 @@ class AvroDatasetWriter[T <: SpecificRecordBase](ds: Dataset[T]) extends Seriali * @param path Path to the data store. * @param avroSchema Avro schema of the records. */ - def write(path: String, avroSchema: Schema): Unit = { - new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema)) - .write(path, avroSchema) - } +// def write(path: String, avroSchema: Schema): Unit = { +// new AvroDataFrameWriter(new AvroDatasetSupport(ds.sparkSession).toDF(ds, avroSchema)) +// .write(path, avroSchema) +// } } diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java deleted file mode 100644 index faee84963..000000000 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/counter/NamedCountersAccumulableParamTest.java +++ /dev/null @@ -1,92 +0,0 @@ -package eu.dnetlib.iis.common.counter; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import scala.Tuple2; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertSame; - -/** - * @author madryk - */ -public class NamedCountersAccumulableParamTest { - - private NamedCountersAccumulableParam countersAccumulableParam = new NamedCountersAccumulableParam(); - - - private String counterName1 = "COUNTER_1"; - - private String counterName2 = "COUNTER_2"; - - private String counterName3 = "COUNTER_3"; - - - private NamedCounters namedCounters1 = new NamedCounters(new String[] {counterName1, counterName2}); - - private NamedCounters namedCounters2 = new NamedCounters(new String[] {counterName2, counterName3}); - - - @BeforeEach - public void setup() { - namedCounters1.increment(counterName1, 3L); - namedCounters1.increment(counterName2, 1L); - namedCounters2.increment(counterName2, 7L); - namedCounters2.increment(counterName3, 5L); - } - - - //------------------------ TESTS -------------------------- - - @Test - public void addAccumulator() { - - // execute - - NamedCounters retNamedCounters = countersAccumulableParam.addAccumulator(namedCounters1, new Tuple2<>(counterName2, 9L)); - - // assert - - assertSame(retNamedCounters, namedCounters1); - - assertThat(retNamedCounters.counterNames(), containsInAnyOrder(counterName1, counterName2)); - assertThat(retNamedCounters.currentValue(counterName1), is(3L)); - assertThat(retNamedCounters.currentValue(counterName2), is(10L)); - } - - @Test - public void addInPlace() { - - // execute - - NamedCounters retNamedCounters = countersAccumulableParam.addInPlace(namedCounters1, namedCounters2); - - // assert - - assertSame(retNamedCounters, namedCounters1); - - assertThat(retNamedCounters.counterNames(), containsInAnyOrder(counterName1, counterName2, counterName3)); - assertThat(retNamedCounters.currentValue(counterName1), is(3L)); - assertThat(retNamedCounters.currentValue(counterName2), is(8L)); - assertThat(retNamedCounters.currentValue(counterName3), is(5L)); - } - - @Test - public void zero() { - - // execute - - NamedCounters retNamedCounters = countersAccumulableParam.zero(namedCounters1); - - // assert - - assertSame(retNamedCounters, namedCounters1); - - assertThat(retNamedCounters.counterNames(), containsInAnyOrder(counterName1, counterName2)); - assertThat(retNamedCounters.currentValue(counterName1), is(3L)); - assertThat(retNamedCounters.currentValue(counterName2), is(1L)); - } - -} diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java index 85a07bc42..5e28202b3 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDataFrameSupportTest.java @@ -27,22 +27,22 @@ public void beforeEach() { support = new AvroDataFrameSupport(spark()); } - @Test - @DisplayName("Avro dataframe support creates dataframe from collection of avro type") - public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - List data = Collections.singletonList(person); - - Dataset result = support.createDataFrame(data, Person.SCHEMA$); - - assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); - List rows = result.collectAsList(); - assertEquals(1, rows.size()); - Row row = rows.get(0); - assertEquals(person.getId(), row.getAs("id")); - assertEquals(person.getName(), row.getAs("name")); - assertEquals(person.getAge(), row.getAs("age")); - } +// @Test +// @DisplayName("Avro dataframe support creates dataframe from collection of avro type") +// public void givenACollectionOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { +// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); +// List data = Collections.singletonList(person); +// +// Dataset result = support.createDataFrame(data, Person.SCHEMA$); +// +// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); +// List rows = result.collectAsList(); +// assertEquals(1, rows.size()); +// Row row = rows.get(0); +// assertEquals(person.getId(), row.getAs("id")); +// assertEquals(person.getName(), row.getAs("name")); +// assertEquals(person.getAge(), row.getAs("age")); +// } @Test @DisplayName("Avro dataframe support converts dataframe of avro type to dataset of avro type") diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java index 29e196bce..e57403116 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetSupportTest.java @@ -24,25 +24,25 @@ public void beforeEach() { super.beforeEach(); } - @Test - @DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type") - public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - Dataset ds = spark().createDataset( - Collections.singletonList(person), - Encoders.kryo(Person.class) - ); - - Dataset result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$); - - assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); - List rows = result.collectAsList(); - assertEquals(1, rows.size()); - Row row = rows.get(0); - assertEquals(person.getId(), row.getAs("id")); - assertEquals(person.getName(), row.getAs("name")); - assertEquals(person.getAge(), row.getAs("age")); - } +// @Test +// @DisplayName("Avro dataset support converts dataset of avro type to dataframe of avro type") +// public void givenDatasetOfAvroType_whenConvertedToDataFrame_thenProperDataFrameIsReturned() { +// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); +// Dataset ds = spark().createDataset( +// Collections.singletonList(person), +// Encoders.kryo(Person.class) +// ); +// +// Dataset result = new AvroDatasetSupport(spark()).toDF(ds, Person.SCHEMA$); +// +// assertSchemasEqualIgnoringNullability(Person.SCHEMA$, result.schema()); +// List rows = result.collectAsList(); +// assertEquals(1, rows.size()); +// Row row = rows.get(0); +// assertEquals(person.getId(), row.getAs("id")); +// assertEquals(person.getName(), row.getAs("name")); +// assertEquals(person.getAge(), row.getAs("age")); +// } public static void assertSchemasEqualIgnoringNullability(Schema avroSchema, StructType sqlSchema) { assertEquals(SchemaConverters.toSqlType(avroSchema).dataType().asNullable(), sqlSchema.asNullable()); diff --git a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java index 64966505a..beab31aa2 100644 --- a/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java +++ b/iis-common/src/test/java/eu/dnetlib/iis/common/spark/avro/AvroDatasetWriterTest.java @@ -24,21 +24,21 @@ public void beforeEach() { super.beforeEach(); } - @Test - @DisplayName("Avro dataset writer writes dataset of avro type") - public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { - Path outputDir = workingDir.resolve("output"); - Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); - Dataset ds = spark().createDataset( - Collections.singletonList(person), - Encoders.kryo(Person.class) - ); - - new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$); - - List personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); - assertEquals(1, personList.size()); - Person personRead = personList.get(0); - assertEquals(person, personRead); - } +// @Test +// @DisplayName("Avro dataset writer writes dataset of avro type") +// public void givenDatasetOfAvroType_whenWrittenToOutput_thenWriteSucceeds(@TempDir Path workingDir) throws IOException { +// Path outputDir = workingDir.resolve("output"); +// Person person = Person.newBuilder().setId(1).setName("name").setAge(2).build(); +// Dataset ds = spark().createDataset( +// Collections.singletonList(person), +// Encoders.kryo(Person.class) +// ); +// +// new AvroDatasetWriter<>(ds).write(outputDir.toString(), Person.SCHEMA$); +// +// List personList = AvroTestUtils.readLocalAvroDataStore(outputDir.toString()); +// assertEquals(1, personList.size()); +// Person personRead = personList.get(0); +// assertEquals(person, personRead); +// } } \ No newline at end of file diff --git a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java index ce58552d3..51fa848c6 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java +++ b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtils.java @@ -3,9 +3,11 @@ import eu.dnetlib.iis.common.java.io.HdfsUtils; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.avro.AvroDataFrameReader; -import eu.dnetlib.iis.common.spark.avro.AvroDatasetWriter; +import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter; import eu.dnetlib.iis.common.utils.RDDUtils; import eu.dnetlib.iis.export.schemas.Citations; +import pl.edu.icm.sparkutils.avro.SparkAvroSaver; + import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFunction; @@ -24,7 +26,7 @@ public final class CitationRelationExporterIOUtils { private static final Logger logger = LoggerFactory.getLogger(CitationRelationExporterIOUtils.class); - + private CitationRelationExporterIOUtils() { } @@ -80,14 +82,17 @@ private static JavaPairRDD datasetToPairRDD(Dataset serialized .mapToPair((PairFunction) content -> new Tuple2<>(new Text(), content)); } - public static void storeReportEntries(SparkSession spark, + public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset reportEntries, String outputReportPath) { - storeReportEntries(reportEntries, outputReportPath, (ds, path) -> - new AvroDatasetWriter<>(ds).write(path, ReportEntry.SCHEMA$)); + storeReportEntries(avroSaver, reportEntries, outputReportPath, (ds, path) -> + // FIXME avoiding relying on writing Dataset which apparently is not easily achievable in spark3 + // due to AvroDatasetSupport scala functions referring to classes which were made private in spark3 + avroSaver.saveJavaRDD(ds.javaRDD(), ReportEntry.SCHEMA$, path)); + // new AvroDataFrameWriter(ds).write(path, ReportEntry.SCHEMA$)); } - public static void storeReportEntries(Dataset reportEntries, + public static void storeReportEntries(SparkAvroSaver avroSaver, Dataset reportEntries, String outputReportPath, BiConsumer, String> writeFn) { logger.info("Storing report data in path {}.", outputReportPath); diff --git a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java index 8748dc3be..d98794016 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java +++ b/iis-wf/iis-wf-export-actionmanager/src/main/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJob.java @@ -6,6 +6,8 @@ import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.wf.export.actionmanager.entity.ConfidenceLevelUtils; +import pl.edu.icm.sparkutils.avro.SparkAvroSaver; + import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; @@ -22,13 +24,16 @@ import static org.apache.spark.sql.functions.udf; public class CitationRelationExporterJob { - + + private static final SparkAvroSaver avroSaver = new SparkAvroSaver(); + private CitationRelationExporterJob() { } private static final Logger logger = LoggerFactory.getLogger(CitationRelationExporterJob.class); public static void main(String[] args) { + JobParameters params = new JobParameters(); JCommander jcommander = new JCommander(params); jcommander.parse(args); @@ -52,7 +57,7 @@ public static void main(String[] args) { storeSerializedActions(spark, serializedActions, params.outputRelationPath); Dataset reportEntries = relationsToReportEntries(spark, relations); - storeReportEntries(spark, reportEntries, params.outputReportPath); + storeReportEntries(avroSaver, reportEntries, params.outputReportPath); }); } diff --git a/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml b/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml index 3b63836e5..d242d7cf7 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml +++ b/iis-wf/iis-wf-export-actionmanager/src/main/resources/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/oozie_app/workflow.xml @@ -85,7 +85,7 @@ - yarn-cluster + yarn cluster citation relation spark exporter eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterJob diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java index 4e494f168..64f2441f2 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java +++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterIOUtilsTest.java @@ -1,7 +1,23 @@ package eu.dnetlib.iis.wf.export.actionmanager.relation.citation; -import eu.dnetlib.iis.common.schemas.ReportEntry; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.clearOutput; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.readCitations; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.storeReportEntries; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.storeSerializedActions; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.sql.Dataset; @@ -12,18 +28,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.junit.jupiter.MockitoExtension; -import scala.Tuple2; -import java.util.Collections; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; - -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterIOUtils.*; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.*; +import eu.dnetlib.iis.common.schemas.ReportEntry; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import pl.edu.icm.sparkutils.avro.SparkAvroSaver; +import scala.Tuple2; @ExtendWith(MockitoExtension.class) class CitationRelationExporterIOUtilsTest extends TestWithSharedSparkSession { @@ -70,12 +79,13 @@ public void givenMockWriteFunction_whenSerializedActionsAreStored_thenMockIsUsed @Test @DisplayName("Report entries are stored in output") public void givenMockWriteFunction_whenReportEntriesAreStored_thenMockIsUsed() { + SparkAvroSaver avroSaver = mock(SparkAvroSaver.class); BiConsumer, String> writeFn = mock(BiConsumer.class); Dataset repartitionedReportEntries = mock(Dataset.class); Dataset reportEntries = mock(Dataset.class); when(reportEntries.repartition(1)).thenReturn(repartitionedReportEntries); - storeReportEntries(reportEntries, "path/to/report", writeFn); + storeReportEntries(avroSaver, reportEntries, "path/to/report", writeFn); verify(writeFn, atLeastOnce()).accept(repartitionedReportEntries, "path/to/report"); } diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java index bb5b9c76f..816914a9b 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java +++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterJobTest.java @@ -1,5 +1,20 @@ package eu.dnetlib.iis.wf.export.actionmanager.relation.citation; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingAtomicAction; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.iis.common.citations.schemas.CitationEntry; @@ -8,30 +23,14 @@ import eu.dnetlib.iis.common.report.ReportEntryFactory; import eu.dnetlib.iis.common.schemas.ReportEntry; import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; import eu.dnetlib.iis.common.spark.avro.AvroDataFrameWriter; import eu.dnetlib.iis.common.spark.avro.AvroDatasetReader; import eu.dnetlib.iis.export.schemas.Citations; import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils; import eu.dnetlib.iis.wf.export.actionmanager.OafConstants; import eu.dnetlib.iis.wf.export.actionmanager.module.BuilderModuleHelper; -import org.apache.avro.generic.GenericData; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import scala.Tuple2; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; - -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingAtomicAction; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; - class CitationRelationExporterJobTest extends TestWithSharedSparkSession { @Test @@ -39,17 +38,16 @@ class CitationRelationExporterJobTest extends TestWithSharedSparkSession { public void givenInputCitationsPath_whenRun_thenSerializedAtomicActionsAndReportsAreCreated(@TempDir Path rootInputPath, @TempDir Path rootOutputPath) throws IOException { List citationsList = Collections.singletonList( - createCitations( + CitationRelationExporterTestUtils.createCitations( "DocumentId", Collections.singletonList( createCitationEntry("DestinationDocumentId", 1.0f) )) ); Path inputCitationsPath = rootInputPath.resolve("citations"); - new AvroDataFrameWriter( - new AvroDataFrameSupport(spark()).createDataFrame(citationsList, Citations.SCHEMA$)).write( - inputCitationsPath.toString() - ); + + new AvroDataFrameWriter(CitationRelationExporterTestUtils.createDataFrame(spark(), citationsList)).write(inputCitationsPath.toString()); + float trustLevelThreshold = 0.5f; Path outputRelationPath = rootOutputPath.resolve("output"); Path outputReportPath = rootOutputPath.resolve("report"); @@ -91,16 +89,10 @@ public void givenInputCitationsPath_whenRun_thenSerializedAtomicActionsAndReport )); } - private static Citations createCitations(String documentId, List citationEntries) { - return Citations.newBuilder() - .setDocumentId(documentId) - .setCitations(new GenericData.Array<>(Citations.SCHEMA$.getField("citations").schema(), citationEntries)) - .build(); - } - private static CitationEntry createCitationEntry(String destinationDocumentId, Float confidenceLevel) { return CitationEntry.newBuilder() .setPosition(0) + .setRawText("irrelevant") .setDestinationDocumentId(destinationDocumentId) .setConfidenceLevel(confidenceLevel) .setExternalDestinationDocumentIds(Collections.emptyMap()) diff --git a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java index 9da1cab4d..94f8a968b 100644 --- a/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java +++ b/iis-wf/iis-wf-export-actionmanager/src/test/java/eu/dnetlib/iis/wf/export/actionmanager/relation/citation/CitationRelationExporterUtilsTest.java @@ -1,16 +1,21 @@ package eu.dnetlib.iis.wf.export.actionmanager.relation.citation; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.iis.common.citations.schemas.CitationEntry; -import eu.dnetlib.iis.common.report.ReportEntryFactory; -import eu.dnetlib.iis.common.schemas.ReportEntry; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; -import eu.dnetlib.iis.export.schemas.Citations; -import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils; -import eu.dnetlib.iis.wf.export.actionmanager.OafConstants; -import eu.dnetlib.iis.wf.export.actionmanager.module.BuilderModuleHelper; -import org.apache.avro.generic.GenericData; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.processCitations; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.relationsToReportEntries; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.relationsToSerializedActions; +import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingRelation; +import static org.apache.spark.sql.functions.udf; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import org.apache.hadoop.io.Text; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -22,19 +27,15 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.CitationRelationExporterUtils.*; -import static eu.dnetlib.iis.wf.export.actionmanager.relation.citation.Matchers.matchingRelation; -import static org.apache.spark.sql.functions.udf; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.iis.common.citations.schemas.CitationEntry; +import eu.dnetlib.iis.common.report.ReportEntryFactory; +import eu.dnetlib.iis.common.schemas.ReportEntry; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.export.schemas.Citations; +import eu.dnetlib.iis.wf.export.actionmanager.AtomicActionDeserializationUtils; +import eu.dnetlib.iis.wf.export.actionmanager.OafConstants; +import eu.dnetlib.iis.wf.export.actionmanager.module.BuilderModuleHelper; class CitationRelationExporterUtilsTest extends TestWithSharedSparkSession { @@ -44,11 +45,10 @@ public class ProcessCitationsTest { @Test @DisplayName("Processing returns empty dataset for input with empty citation entries") public void givenCitationsWithEmptyCitationEntries_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> true, DataTypes.BooleanType); - List results = processCitations(avroDataFrameSupport.createDataFrame(Collections.emptyList(), Citations.SCHEMA$), + List results = processCitations(CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.emptyList()), isValidConfidenceLevel).collectAsList(); assertTrue(results.isEmpty()); @@ -57,15 +57,13 @@ public void givenCitationsWithEmptyCitationEntries_whenProcessed_thenEmptyDataSe @Test @DisplayName("Processing returns empty dataset for input without destination document id") public void givenCitationsWithNullDestinationDocumentId_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Collections.singletonList( createCitationEntry(null, 0.5f) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> true, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -75,15 +73,13 @@ public void givenCitationsWithNullDestinationDocumentId_whenProcessed_thenEmptyD @Test @DisplayName("Processing returns empty dataset for input without confidence level") public void givenCitationsWithNullConfidenceLevel_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Collections.singletonList( createCitationEntry("DestinationDocumentId", null) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> true, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -93,15 +89,13 @@ public void givenCitationsWithNullConfidenceLevel_whenProcessed_thenEmptyDataSet @Test @DisplayName("Processing returns empty dataset for input with invalid confidence level") public void givenCitationsWithConfidenceLevelBelowThreshold_whenProcessed_thenEmptyDataSetIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Collections.singletonList( createCitationEntry("DestinationDocumentId", 0.5f) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> false, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -111,16 +105,14 @@ public void givenCitationsWithConfidenceLevelBelowThreshold_whenProcessed_thenEm @Test @DisplayName("Processing returns dataset with relations for valid input") public void givenOneCitationsRecord_whenProcessed_thenDataSetWithTwoRelationsIsReturned() { - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); List citationEntries = Arrays.asList( createCitationEntry("DestinationDocumentId", 0.9f), createCitationEntry("DestinationDocumentId", 0.8f) ); - Citations citations = createCitations("DocumentId", citationEntries); + Citations citations = CitationRelationExporterTestUtils.createCitations("DocumentId", citationEntries); UserDefinedFunction isValidConfidenceLevel = udf((UDF1) confidenceLevel -> confidenceLevel > 0.5, DataTypes.BooleanType); - Dataset citationsDF = avroDataFrameSupport.createDataFrame(Collections.singletonList(citations), - Citations.SCHEMA$); + Dataset citationsDF = CitationRelationExporterTestUtils.createDataFrame(spark(), Collections.singletonList(citations)); List results = processCitations(citationsDF, isValidConfidenceLevel).collectAsList(); @@ -163,13 +155,6 @@ public void givenRelations_whenCreatingReportEntries_thenReportEntriesAreReturne assertThat(results, hasItem(ReportEntryFactory.createCounterReportEntry("processing.citationMatching.relation.iscitedby.docs", 2))); } - private static Citations createCitations(String documentId, List citationEntries) { - return Citations.newBuilder() - .setDocumentId(documentId) - .setCitations(new GenericData.Array<>(Citations.SCHEMA$.getField("citations").schema(), citationEntries)) - .build(); - } - private static CitationEntry createCitationEntry(String destinationDocumentId, Float confidenceLevel) { return CitationEntry.newBuilder() .setPosition(0) diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java index 620f516f9..658d91347 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionIOUtilsTest.java @@ -1,22 +1,31 @@ package eu.dnetlib.iis.wf.referenceextraction.project.tara; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; -import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.clearOutput; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.storeInOutput; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.util.Collections; -import java.util.List; - -import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.Mockito.*; +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.AvroDataStoreWriter; +import eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionIOUtils.OutputCleaner; public class TaraReferenceExtractionIOUtilsTest extends TestWithSharedSparkSession { @@ -41,14 +50,12 @@ public void storeInOutputShouldRunProperly() { .setConfidenceLevel(1.0f) .build(); List documentToProjectList = Collections.singletonList(documentToProject); - Dataset documentToProjectDF = new AvroDataFrameSupport(spark()).createDataFrame( - documentToProjectList, - DocumentToProject.SCHEMA$); + Dataset documentToProjectDF = createDataFrame(documentToProjectList); AvroDataStoreWriter writer = mock(AvroDataStoreWriter.class); // when storeInOutput(documentToProjectDF, "path/to/output", writer); - + // then ArgumentCaptor> dataFrameCaptor = ArgumentCaptor.forClass(Dataset.class); verify(writer, atLeastOnce()).write(dataFrameCaptor.capture(), @@ -64,4 +71,18 @@ public void storeInOutputShouldRunProperly() { assertEquals(documentToProject.getConfidenceLevel(), documentToProjectRow.getAs("confidenceLevel"), 1e-3); assertNull(documentToProjectRow.getAs("textsnippet")); } + + private Dataset createDataFrame(List inputList) { + List dataFrameList = new ArrayList<>(inputList.size()); + for (DocumentToProject input : inputList) { + dataFrameList.add(RowFactory.create(input.getDocumentId(), input.getProjectId(), input.getConfidenceLevel(), + input.getTextsnippet())); + } + + Dataset result = spark().createDataFrame(dataFrameList, + (StructType) SchemaConverters.toSqlType(DocumentToProject.SCHEMA$).dataType()); + + return result; + } + } \ No newline at end of file diff --git a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java index 82b89a3a1..d602048a6 100644 --- a/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java +++ b/iis-wf/iis-wf-referenceextraction/src/test/java/eu/dnetlib/iis/wf/referenceextraction/project/tara/TaraReferenceExtractionUtilsTest.java @@ -1,51 +1,48 @@ package eu.dnetlib.iis.wf.referenceextraction.project.tara; -import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; -import eu.dnetlib.iis.common.spark.avro.AvroDataFrameSupport; -import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; -import eu.dnetlib.iis.metadataextraction.schemas.DocumentText; -import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; -import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; -import eu.dnetlib.iis.transformers.metadatamerger.schemas.PublicationType; -import org.apache.spark.SparkFiles; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.avro.SchemaConverters; -import org.junit.jupiter.api.Test; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.buildDocumentMetadata; +import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.runReferenceExtraction; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; -import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.buildDocumentMetadata; -import static eu.dnetlib.iis.wf.referenceextraction.project.tara.TaraReferenceExtractionUtils.runReferenceExtraction; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import org.apache.spark.SparkFiles; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.avro.SchemaConverters; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.iis.common.spark.TestWithSharedSparkSession; +import eu.dnetlib.iis.common.spark.pipe.PipeExecutionEnvironment; +import eu.dnetlib.iis.metadataextraction.schemas.DocumentText; +import eu.dnetlib.iis.referenceextraction.project.schemas.DocumentToProject; +import eu.dnetlib.iis.transformers.metadatamerger.schemas.ExtractedDocumentMetadataMergedWithOriginal; public class TaraReferenceExtractionUtilsTest extends TestWithSharedSparkSession { @Test public void buildDocumentMetadataShouldRunProperly() { // given - AvroDataFrameSupport avroDataFrameSupport = new AvroDataFrameSupport(spark()); - Dataset documentTextDF = avroDataFrameSupport.createDataFrame( + Dataset documentTextDF = createTextDataFrame( Arrays.asList( createDocumentText("docId-1", "text-1"), createDocumentText("docId-2", "text-2") - ), - DocumentText.SCHEMA$); - Dataset extractedDocumentMetadataMergedWithOriginalDF = avroDataFrameSupport.createDataFrame( + )); + Dataset extractedDocumentMetadataMergedWithOriginalDF = createMetaDataFrame( Arrays.asList( - createExtractedDocumentMetadataMergedWithOriginal("docId-1"), - createExtractedDocumentMetadataMergedWithOriginal("docId-a") - ), - ExtractedDocumentMetadataMergedWithOriginal.SCHEMA$); + createDocumentMetadata("docId-1") + )); // when Dataset resultDF = buildDocumentMetadata(documentTextDF, extractedDocumentMetadataMergedWithOriginalDF); @@ -89,6 +86,20 @@ public void shouldRunReferenceExtraction() throws IOException { assertForDocumentToProject(row, "docId-1", "projId-1", 1.0f); } + private Dataset createTextDataFrame(List inputList) { + List dataFrameList = new ArrayList<>(inputList.size()); + for (DocumentText input : inputList) { + dataFrameList.add(RowFactory.create(input.getId(), input.getText())); + } + return spark().createDataFrame(dataFrameList, + (StructType) SchemaConverters.toSqlType(DocumentText.SCHEMA$).dataType()); + } + + private Dataset createMetaDataFrame(List inputList) { + return spark().createDataFrame(inputList, + (StructType) SchemaConverters.toSqlType(ExtractedDocumentMetadataMergedWithOriginal.SCHEMA$).dataType()); + } + private static DocumentText createDocumentText(String id, String text) { return DocumentText.newBuilder() .setId(id) @@ -96,15 +107,11 @@ private static DocumentText createDocumentText(String id, String text) { .build(); } - private static ExtractedDocumentMetadataMergedWithOriginal createExtractedDocumentMetadataMergedWithOriginal(String id) { - return ExtractedDocumentMetadataMergedWithOriginal.newBuilder() - .setId(id) - .setPublicationType(PublicationType.newBuilder().build()) - .build(); + private static Row createDocumentMetadata(String id) { + return createDocumentMetadata(id, null); } - private static Row createDocumentMetadata(String id, - String text) { + private static Row createDocumentMetadata(String id, String text) { return RowFactory.create(id, null, null, text); } diff --git a/pom.xml b/pom.xml index 99512b7be..0781e40f1 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,33 @@ + + + org.apache.logging.log4j + log4j-1.2-api + 2.19.0 + + + + org.apache.logging.log4j + log4j-api + 2.19.0 + + + + org.apache.logging.log4j + log4j-core + 2.19.0 + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.19.0 + + + + + javax.servlet javax.servlet-api @@ -350,7 +378,10 @@ pl.edu.icm.spark-utils spark-utils_2.12 1.0.2 + + @@ -747,6 +778,15 @@ target/test-classes + + org.apache.maven.plugins + maven-plugin-plugin + 3.6.0 + + true + + + org.apache.maven.plugins maven-compiler-plugin @@ -878,16 +918,6 @@ - - - org.apache.maven.plugins - maven-plugin-plugin - [3.2,) - - descriptor - - -