From 1ca985e7cc1c2e8bf1b87c958c709afa6b09c064 Mon Sep 17 00:00:00 2001 From: Valentino Pinna Date: Wed, 25 Oct 2023 10:26:27 +0200 Subject: [PATCH] Fixes to CSV and SDMX Environments --- .../impl/environment/CSVFileEnvironment.java | 8 +++-- .../vtl/impl/environment/SDMXEnvironment.java | 2 +- vtl-envs/vtl-spark/pom.xml | 4 +++ .../environment/spark/DataPointEncoder.java | 5 ++-- .../impl/environment/spark/SparkDataSet.java | 29 +++++++++---------- .../environment/spark/SparkEnvironment.java | 5 ++-- .../impl/environment/spark/SparkUtils.java | 24 +-------------- 7 files changed, 29 insertions(+), 48 deletions(-) diff --git a/vtl-envs/vtl-envbase/src/main/java/it/bancaditalia/oss/vtl/impl/environment/CSVFileEnvironment.java b/vtl-envs/vtl-envbase/src/main/java/it/bancaditalia/oss/vtl/impl/environment/CSVFileEnvironment.java index c356f8f7c..9b1aee650 100644 --- a/vtl-envs/vtl-envbase/src/main/java/it/bancaditalia/oss/vtl/impl/environment/CSVFileEnvironment.java +++ b/vtl-envs/vtl-envbase/src/main/java/it/bancaditalia/oss/vtl/impl/environment/CSVFileEnvironment.java @@ -64,6 +64,7 @@ import it.bancaditalia.oss.vtl.config.ConfigurationManager; import it.bancaditalia.oss.vtl.config.ConfigurationManagerFactory; import it.bancaditalia.oss.vtl.environment.Environment; +import it.bancaditalia.oss.vtl.exceptions.VTLMissingComponentsException; import it.bancaditalia.oss.vtl.exceptions.VTLNestedException; import it.bancaditalia.oss.vtl.impl.environment.util.CSVParseUtils; import it.bancaditalia.oss.vtl.impl.environment.util.ProgressWindow; @@ -188,10 +189,13 @@ protected Stream streamFileName(String fileName, String alias) } else { + DataSetMetadata structure = maybeStructure; + metadata = Arrays.stream(reader.readLine().split(",")) - .map(maybeStructure::getComponent) - .map(Optional::get) + .map(toEntryWithValue(maybeStructure::getComponent)) + .map(e -> e.getValue().orElseThrow(() -> new VTLMissingComponentsException(e.getKey(), structure))) .collect(toList()); + masks = metadata.stream() .map(toEntryWithValue(DataStructureComponent::getDomain)) .map(keepingKey(ValueDomainSubset::getName)) diff --git a/vtl-envs/vtl-sdmxenv/src/main/java/it/bancaditalia/oss/vtl/impl/environment/SDMXEnvironment.java b/vtl-envs/vtl-sdmxenv/src/main/java/it/bancaditalia/oss/vtl/impl/environment/SDMXEnvironment.java index 300b9ee4f..6b4495275 100644 --- a/vtl-envs/vtl-sdmxenv/src/main/java/it/bancaditalia/oss/vtl/impl/environment/SDMXEnvironment.java +++ b/vtl-envs/vtl-sdmxenv/src/main/java/it/bancaditalia/oss/vtl/impl/environment/SDMXEnvironment.java @@ -329,7 +329,7 @@ public synchronized DataPoint next() builder.add(c, c.cast(StringValue.of(a.getCode()))); } - DataStructureComponent measure = structure.getComponents(Measure.class).iterator().next(); + DataStructureComponent measure = structure.getMeasures().iterator().next(); builder.add(measure, DoubleValue.of(parseDouble(obs.getMeasureValue(measure.getName())))); TemporalAccessor holder = parser.getValue().queryFrom(parser.getKey().parse(obs.getDimensionValue())); ScalarValue value; diff --git a/vtl-envs/vtl-spark/pom.xml b/vtl-envs/vtl-spark/pom.xml index a5b889b71..e4921a7ed 100644 --- a/vtl-envs/vtl-spark/pom.xml +++ b/vtl-envs/vtl-spark/pom.xml @@ -96,6 +96,10 @@ org.codehaus.mojo flatten-maven-plugin + + net.alchim31.maven + scala-maven-plugin + org.apache.maven.plugins maven-gpg-plugin diff --git a/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/DataPointEncoder.java b/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/DataPointEncoder.java index 4d5e6f451..832d2d41b 100644 --- a/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/DataPointEncoder.java +++ b/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/DataPointEncoder.java @@ -22,7 +22,6 @@ import static it.bancaditalia.oss.vtl.impl.environment.spark.SparkEnvironment.LineageSparkUDT; import static it.bancaditalia.oss.vtl.impl.environment.spark.SparkUtils.createStructFromComponents; import static it.bancaditalia.oss.vtl.impl.environment.spark.SparkUtils.getScalarFor; -import static it.bancaditalia.oss.vtl.impl.environment.spark.SparkUtils.sorter; import static java.util.stream.Collectors.joining; import java.io.Serializable; @@ -70,7 +69,7 @@ public DataPointEncoder(Set> dataStruc { structure = dataStructure instanceof DataSetMetadata ? (DataSetMetadata) dataStructure : new DataStructureBuilder(dataStructure).build(); components = structure.toArray(new DataStructureComponent[structure.size()]); - Arrays.sort(components, SparkUtils::sorter); + Arrays.sort(components, DataStructureComponent::byNameAndRole); List fields = new ArrayList<>(createStructFromComponents(components)); StructType schemaNoLineage = new StructType(fields.toArray(new StructField[components.length])); rowEncoderNoLineage = Encoders.row(schemaNoLineage); @@ -249,7 +248,7 @@ public DataPoint combine(DataPoint other, SerBiFunction membershipMeasure = membershipStructure.getComponents(Measure.class).iterator().next(); + DataStructureComponent membershipMeasure = membershipStructure.getMeasures().iterator().next(); Dataset newDF = dataFrame; if (!getMetadata().contains(membershipMeasure)) @@ -256,9 +253,9 @@ public DataSet subspace(Map lineageOperator, SerFunction, ? extends ScalarValue>> operator) { - final Set> originalIDs = getMetadata().getComponents(Identifier.class); - if (!metadata.getComponents(Identifier.class).containsAll(originalIDs)) - throw new VTLInvariantIdentifiersException("map", originalIDs, metadata.getComponents(Identifier.class)); + final Set> originalIDs = getMetadata().getIDs(); + if (!metadata.getIDs().containsAll(originalIDs)) + throw new VTLInvariantIdentifiersException("map", originalIDs, metadata.getIDs()); LOGGER.trace("Creating dataset by mapping from {} to {}", getMetadata(), metadata); @@ -270,7 +267,7 @@ public DataSet mapKeepingKeys(DataSetMetadata metadata, SerFunction sorter(a.getKey(), b.getKey())) + .sorted((a, b) -> DataStructureComponent.byNameAndRole(a.getKey(), b.getKey())) .map(Entry::getValue) .map(ScalarValue::get) .collect(toArray(new Object[map.size() + 1])); @@ -315,7 +312,7 @@ public DataSet flatmapKeepingKeys(DataSetMetadata metadata, SerFunction[] comps = resultEncoder.components; - Set> ids = getMetadata().getComponents(Identifier.class); + Set> ids = getMetadata().getIDs(); StructType schema = resultEncoder.schema; Dataset flattenedDf = dataFrame.flatMap((FlatMapFunction) row -> { @@ -350,7 +347,7 @@ public DataSet filteredMappedJoin(DataSetMetadata metadata, DataSet other, SerBi { SparkDataSet sparkOther = other instanceof SparkDataSet ? ((SparkDataSet) other) : new SparkDataSet(session, other.getMetadata(), other); - List commonIDs = getComponents(Identifier.class).stream() + List commonIDs = getMetadata().getIDs().stream() .filter(other.getMetadata()::contains) .map(DataStructureComponent::getName) .collect(toList()); @@ -440,7 +437,7 @@ public DataSet analytic(SerFunction lineageOp, // Sort by dest component @SuppressWarnings("unchecked") Entry, DataStructureComponent>[] compArray = (Entry, DataStructureComponent>[]) components.entrySet().toArray(new Entry[components.size()]); - Arrays.sort(compArray, (e1, e2) -> sorter(e1.getValue(), e2.getValue())); + Arrays.sort(compArray, (e1, e2) -> DataStructureComponent.byNameAndRole(e1.getValue(), e2.getValue())); // Create the udafs to generate each dest component Map destComponents = new HashMap<>(); @@ -569,11 +566,11 @@ public Stream streamByKeys(Set[] sortedKeys = (DataStructureComponent[]) keys.stream() - .sorted(byName()) + .sorted(DataStructureComponent::byName) .collect(toArray(new DataStructureComponent[keys.size()])); Column[] groupingCols = keys.stream() - .sorted(byName()) + .sorted(DataStructureComponent::byName) .map(DataStructureComponent::getName) .map(functions::col) .collect(toArray(new Column[keys.size()])); @@ -593,7 +590,7 @@ public Stream streamByKeys(Set for fill_time_series List> resultComponents = getMetadata().stream() - .sorted(SparkUtils::sorter) + .sorted(DataStructureComponent::byNameAndRole) .collect(toList()); // Use kryo encoder hoping that the class has been registered beforehand @@ -641,7 +638,7 @@ public DataSet union(SerFunction lineageOp, List ot .get(); // remove duplicates and add lineage - Column[] ids = getColumnsFromComponents(getMetadata().getComponents(Identifier.class)).toArray(new Column[0]); + Column[] ids = getColumnsFromComponents(getMetadata().getIDs()).toArray(new Column[0]); Column[] cols = getColumnsFromComponents(getMetadata()).toArray(new Column[getMetadata().size()]); Column lineage = new Column(Literal.create(LineageSparkUDT.serialize(LineageExternal.of("Union")), LineageSparkUDT)); result = result.withColumn("__index", first("__index").over(partitionBy(ids).orderBy(result.col("__index")))) @@ -656,7 +653,7 @@ public DataSet union(SerFunction lineageOp, List ot public DataSet setDiff(DataSet other) { SparkDataSet sparkOther = other instanceof SparkDataSet ? ((SparkDataSet) other) : new SparkDataSet(session, other.getMetadata(), other); - List ids = getMetadata().getComponents(Identifier.class).stream().map(DataStructureComponent::getName).collect(toList()); + List ids = getMetadata().getIDs().stream().map(DataStructureComponent::getName).collect(toList()); Dataset result = dataFrame.join(sparkOther.dataFrame, asScala((Iterable) ids).toSeq(), "leftanti"); Column[] cols = getColumnsFromComponents(getMetadata()).toArray(new Column[getMetadata().size() + 1]); diff --git a/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkEnvironment.java b/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkEnvironment.java index 8f5a31654..668627b62 100644 --- a/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkEnvironment.java +++ b/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkEnvironment.java @@ -77,7 +77,6 @@ import it.bancaditalia.oss.vtl.impl.types.lineage.LineageImpl; import it.bancaditalia.oss.vtl.impl.types.lineage.LineageNode; import it.bancaditalia.oss.vtl.impl.types.lineage.LineageSet; -import it.bancaditalia.oss.vtl.model.data.ComponentRole.Identifier; import it.bancaditalia.oss.vtl.model.data.DataSet; import it.bancaditalia.oss.vtl.model.data.DataSetMetadata; import it.bancaditalia.oss.vtl.model.data.DataStructureComponent; @@ -311,7 +310,7 @@ else if (field.dataType() instanceof IntegerType) Column[] converters = Arrays.stream(normalizedNames, 0, normalizedNames.length) .map(structure::getComponent) .map(Optional::get) - .sorted(SparkUtils::sorter) + .sorted(DataStructureComponent::byNameAndRole) .map(c -> udf(repr -> mapValue(c, repr.toString(), masks.get(c)).get(), types.get(c)) .apply(sourceDataFrame.col(newToOldNames.get(c.getName()))) .as(c.getName(), getMetadataFor(c))) @@ -323,7 +322,7 @@ else if (field.dataType() instanceof IntegerType) converters[converters.length - 1] = lit(serializedLineage).alias("$lineage$"); Dataset converted = sourceDataFrame.select(converters); - Column[] ids = getColumnsFromComponents(structure.getComponents(Identifier.class)).toArray(new Column[0]); + Column[] ids = getColumnsFromComponents(structure.getIDs()).toArray(new Column[0]); return new SparkDataSet(session, structure, converted.repartition(ids)); } } diff --git a/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkUtils.java b/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkUtils.java index 599527697..42110b06b 100644 --- a/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkUtils.java +++ b/vtl-envs/vtl-spark/src/main/java/it/bancaditalia/oss/vtl/impl/environment/spark/SparkUtils.java @@ -49,8 +49,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.spark.sql.Column; @@ -220,31 +218,11 @@ public static List getColumnsFromComponents(Collection List structHelper(Stream> stream, SerFunction, F> mapper) { return stream - .sorted(SparkUtils::sorter) + .sorted(DataStructureComponent::byNameAndRole) .map(mapper) .collect(toList()); } - public static int sorter(DataStructureComponent c1, DataStructureComponent c2) - { - if (c1.is(Attribute.class) && !c2.is(Attribute.class)) - return 1; - else if (c1.is(Identifier.class) && !c2.is(Identifier.class)) - return -1; - else if (c1.is(Measure.class) && c2.is(Identifier.class)) - return 1; - else if (c1.is(Measure.class) && c2.is(Attribute.class)) - return -1; - - String n1 = c1.getName(), n2 = c2.getName(); - Pattern pattern = Pattern.compile("^(.+?)(\\d+)$"); - Matcher m1 = pattern.matcher(n1), m2 = pattern.matcher(n2); - if (m1.find() && m2.find() && m1.group(1).equals(m2.group(1))) - return Integer.compare(Integer.parseInt(m1.group(2)), Integer.parseInt(m2.group(2))); - else - return n1.compareTo(n2); - } - private SparkUtils() { }