Skip to content

Commit

Permalink
Fixed many race conditions with Spark
Browse files Browse the repository at this point in the history
  • Loading branch information
vpinna80 committed Jul 25, 2024
1 parent ec92c92 commit 5a207c7
Show file tree
Hide file tree
Showing 110 changed files with 1,135 additions and 820 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.AbstractMap.SimpleEntry;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -57,11 +58,10 @@ public interface DataPoint extends Map<DataStructureComponent<?, ?, ?>, ScalarVa
* @param components the component whose values are used for the ordering
* @return the Comparator instance
*/
@SafeVarargs
public static <S extends ValueDomainSubset<S, D>, D extends ValueDomain> SerComparator<DataPoint> compareBy(DataStructureComponent<Identifier, ?, ?>... components)
public static <S extends ValueDomainSubset<S, D>, D extends ValueDomain> SerComparator<DataPoint> compareBy(List<DataStructureComponent<?, ?, ?>> components)
{
SerToIntBiFunction<DataPoint, DataPoint> comparator = null;
for (DataStructureComponent<Identifier, ?, ?> component: components)
for (DataStructureComponent<?, ?, ?> component: components)
{
@SuppressWarnings("unchecked")
DataStructureComponent<Identifier, S, D> c = (DataStructureComponent<Identifier, S, D>) component;
Expand All @@ -83,7 +83,12 @@ public static <S extends ValueDomainSubset<S, D>, D extends ValueDomain> SerComp
};
return comparator::applyAsInt;
}


public static <S extends ValueDomainSubset<S, D>, D extends ValueDomain> SerComparator<DataPoint> compareBy(DataStructureComponent<?, ?, ?> component)
{
return compareBy(List.of(component));
}

public static SerBinaryOperator<DataPoint> combiner(SerBiFunction<DataPoint, DataPoint, Lineage> lineageCombiner)
{
return (dp1, dp2) -> dp1.combine(dp2, lineageCombiner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -223,20 +224,7 @@ public <A, T, TT> Stream<T> streamByKeys(Set<DataStructureComponent<Identifier,
*/
public <T extends Map<DataStructureComponent<?, ?, ?>, ScalarValue<?, ?, ?, ?>>> DataSet aggregate(DataSetMetadata structure,
Set<DataStructureComponent<Identifier, ?, ?>> keys, SerCollector<DataPoint, ?, T> groupCollector,
SerBiFunction<T, Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>, DataPoint> finisher);

/**
* Creates a new DataSet by applying a window function over multiple source components of this DataSet.
* Each application can produce one or more results that will be exploded into multiple datapoints.
* @param components A map from source components to result components
* @param clause The clause specifying the window
* @param extractors extractors that extract a value from a datapoint
* @param collectors Collectors that compute the intermediate results for each source component and window
* @param finishers Finishers to transform intermediate results and partition keys into a collection of new values
*
* @param <TT> The intermediate result of a single analytic function on one window
* @return The dataset result of the analytic invocation
*/
SerBiFunction<T, Entry<Lineage[], Map<DataStructureComponent<Identifier, ?, ?>, ScalarValue<?, ?, ?, ?>>>, DataPoint> finisher);

/**
* Creates a new DataSet by applying a window function over a component of this DataSet.
Expand All @@ -261,12 +249,6 @@ public <T, TT> DataSet analytic(SerFunction<DataPoint, Lineage> lineageOp, DataS
DataStructureComponent<?, ?, ?> destComp, WindowClause clause, SerFunction<DataPoint, T> extractor,
SerCollector<T, ?, TT> collector, SerBiFunction<TT, T, Collection<ScalarValue<?, ?, ?, ?>>> finisher);

public default DataSet analytic(SerFunction<DataPoint, Lineage> lineageOp, DataStructureComponent<?, ?, ?> component,
WindowClause clause, SerCollector<ScalarValue<?, ?, ?, ?>, ?, ScalarValue<?, ?, ?, ?>> collector)
{
return analytic(lineageOp, component, component, clause, null, collector, null);
}

/**
* Creates a new DataSet as the union of this and other datasets.
* The datasets must have the same structure, and duplicated datapoints are taken from the leftmost operand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,11 @@ public interface ValueDomain extends Serializable
*
* @return the representation type for this domain
*/
public Class<? extends Serializable> getRepresentation();
public Class<? extends Serializable> getRepresentation();


/**
* @return the engine representation class for values belonging to this domain
*/
public Class<?> getValueClass();
}
90 changes: 90 additions & 0 deletions vtl-api/src/main/java/it/bancaditalia/oss/vtl/util/Holder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright © 2020 Banca D'Italia
*
* Licensed under the EUPL, Version 1.2 (the "License");
* You may not use this work except in compliance with the
* License.
* You may obtain a copy of the License at:
*
* https://joinup.ec.europa.eu/sites/default/files/custom-page/attachment/2020-03/EUPL-1.2%20EN.txt
*
* Unless required by applicable law or agreed to in
* writing, software distributed under the License is
* distributed on an "AS IS" basis,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied.
*
* See the License for the specific language governing
* permissions and limitations under the License.
*/
package it.bancaditalia.oss.vtl.util;

import static java.util.Objects.requireNonNull;

import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.function.BinaryOperator;

/**
* Simplified version of AtomicReference (to be used with Spark)
* @param <V>
*/
public class Holder<V> implements Serializable
{
private static final long serialVersionUID = 1L;
private static final VarHandle VALUE;

static {
try
{
MethodHandles.Lookup l = MethodHandles.lookup();
VALUE = l.findVarHandle(Holder.class, "value", Object.class);
}
catch (ReflectiveOperationException e)
{
throw new ExceptionInInitializerError(e);
}
}

public final Class<?> repr;
private volatile V value;

public Holder(Class<?> repr)
{
this.repr = requireNonNull(repr);
}

public V get()
{
return value;
}

public void set(V newValue)
{
value = newValue;
}

public boolean compareAndSet(V expectedValue, V newValue)
{
return VALUE.compareAndSet(this, expectedValue, newValue);
}

public V accumulateAndGet(V x, BinaryOperator<V> accumulatorFunction)
{
V prev = get(), next = null;
for (boolean haveNext = false;;)
{
if (!haveNext)
next = accumulatorFunction.apply(prev, x);
if (VALUE.weakCompareAndSet(this, prev, next))
return next;
haveNext = (prev == (prev = get()));
}
}

public String toString()
{
return String.valueOf(get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@
*/
package it.bancaditalia.oss.vtl.util;

import java.io.Serializable;
import java.util.Comparator;
import java.util.function.BinaryOperator;

@FunctionalInterface
public interface SerBinaryOperator<T> extends BinaryOperator<T>, SerBiFunction<T, T, T>
{
public static <T> SerBinaryOperator<T> minBy(Comparator<? super T> comparator)
public static <T, C extends Comparator<? super T> & Serializable> SerBinaryOperator<T> minBy(C comparator)
{
SerToIntBiFunction<T, T> fn = comparator::compare;
return (a, b) -> fn.applyAsInt(a, b) <= 0 ? a : b;
}

public static <T> SerBinaryOperator<T> maxBy(Comparator<? super T> comparator)
public static <T, C extends Comparator<? super T> & Serializable> SerBinaryOperator<T> maxBy(C comparator)
{
SerToIntBiFunction<T, T> fn = comparator::compare;
return (a, b) -> fn.applyAsInt(a, b) >= 0 ? a : b;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SerCollector<T, A, R> implements Collector<T, A, R>, Serializable
private final EnumSet<Characteristics> characteristics;

public static <T, A, R> SerCollector<T, A, R> of(SerSupplier<A> supplier, SerBiConsumer<A, T> accumulator,
SerBinaryOperator<A> combiner, SerFunction<A, R> finisher, Set<Characteristics> characteristics)
SerBinaryOperator<A> combiner, SerFunction<A, R> finisher, EnumSet<Characteristics> characteristics)
{
return new SerCollector<>(supplier, accumulator, combiner, finisher, characteristics);
}
Expand All @@ -52,7 +52,7 @@ public static <T, A> SerCollector<T, A, A> of(SerSupplier<A> supplier, SerBiCons
}

protected SerCollector(SerSupplier<A> supplier, SerBiConsumer<A, T> accumulator, SerBinaryOperator<A> combiner,
SerFunction<A, R> finisher, Set<Characteristics> characteristics)
SerFunction<A, R> finisher, EnumSet<Characteristics> characteristics)
{
this.supplier = supplier;
this.accumulator = accumulator;
Expand Down
Loading

0 comments on commit 5a207c7

Please sign in to comment.