Skip to content

Commit

Permalink
[dbs-leipzig#1559] add help functions
Browse files Browse the repository at this point in the history
  • Loading branch information
alwba committed Aug 2, 2022
1 parent 90c6180 commit 807ec2d
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.gradoop.temporal.model.impl.operators.metric.functions;

/**
* Enum for defining an aggregate type.
*/
public enum AggregateType {
/**
* Minimum aggregation.
*/
MIN,
/**
* Maximum aggregation.
*/
MAX,
/**
* Average aggregation.
*/
AVG
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/**
* Reduce function to extract all timestamps where the degree of a vertex changes.
*/
public class ExtractAllTimePointsReduce implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple1<Long>> {

public ExtractAllTimePointsReduce() {
}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable, Collector<Tuple1<Long>> collector) throws Exception {
SortedSet<Long> timePoints = new TreeSet<>();

for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
timePoints.addAll(tuple.f1.keySet());
}

for (Long timePoint: timePoints) {
collector.collect(new Tuple1<>(timePoint));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.*;

/**
* A group reduce function that merges all Tuples (vId, degreeTree) to a dataset of tuples (time, aggDegree)
* that represents the aggregated degree value for the whole graph at the given time.
*/
public class GroupDegreeTreesToAggregateDegrees
implements GroupReduceFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>, Tuple2<Long, Integer>> {

/**
* The aggregate type to use (min,max,avg).
*/
private final AggregateType aggregateType;
/**
* The timestamps where at least one vertex degree changes.
*/
private final SortedSet<Long> timePoints;

/**
* Creates an instance of this group reduce function.
*
* @param aggregateType the aggregate type to use (min,max,avg).
*/
public GroupDegreeTreesToAggregateDegrees(AggregateType aggregateType, DataSet<Tuple1<Long>> timePoints) {
this.aggregateType = aggregateType;

List<Tuple1<Long>> tuples;
try {
tuples = timePoints.collect();
this.timePoints = null;

for (int i = 0; i < timePoints.count(); i = i + 1) {
this.timePoints.add(tuples.get(i).getField(0));
}
} catch (Exception e) {
throw new RuntimeException(e);
}

}

@Override
public void reduce(Iterable<Tuple2<GradoopId, TreeMap<Long, Integer>>> iterable,
Collector<Tuple2<Long, Integer>> collector) throws Exception {

// init necessary maps and set
HashMap<GradoopId, TreeMap<Long, Integer>> degreeTrees = new HashMap<>();
HashMap<GradoopId, Integer> vertexDegrees = new HashMap<>();

// convert the iterables to a hashmap and remember all possible timestamps
for (Tuple2<GradoopId, TreeMap<Long, Integer>> tuple : iterable) {
degreeTrees.put(tuple.f0, tuple.f1);
}

int numberOfVertices = degreeTrees.size();

// Add default times
timePoints.add(Long.MIN_VALUE);

for (Long timePoint : timePoints) {
// skip last default time
if (Long.MAX_VALUE == timePoint) {
continue;
}
// Iterate over all vertices
for (Map.Entry<GradoopId, TreeMap<Long, Integer>> entry : degreeTrees.entrySet()) {
// Make sure the vertex is registered in the current vertexDegrees capture
if (!vertexDegrees.containsKey(entry.getKey())) {
vertexDegrees.put(entry.getKey(), 0);
}

// Check if timestamp is in tree, if not, take the lower key
if (entry.getValue().containsKey(timePoint)) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(timePoint));
} else {
Long lowerKey = entry.getValue().lowerKey(timePoint);
if (lowerKey != null) {
vertexDegrees.put(entry.getKey(), entry.getValue().get(lowerKey));
}
}
}

// Here, every tree with this time point is iterated. Now we need to aggregate for the current time.
Optional<Integer> opt;
switch (aggregateType) {
case MIN:
opt = vertexDegrees.values().stream().reduce(Math::min);
opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer)));
break;
case MAX:
opt = vertexDegrees.values().stream().reduce(Math::max);
opt.ifPresent(integer -> collector.collect(new Tuple2<>(timePoint, integer)));
break;
case AVG:
opt = vertexDegrees.values().stream().reduce(Math::addExact);
opt.ifPresent(integer -> collector.collect(
new Tuple2<>(timePoint, (int) Math.ceil((double) integer / (double) numberOfVertices))));
break;
default:
throw new IllegalArgumentException("Aggregate type not specified.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;

import java.util.Map;
import java.util.TreeMap;

/**
* Replaces the degree tree, that just stores the degree changes for each time, with a degree tree that
* stores the actual degree of the vertex at that time.
*/
@FunctionAnnotation.ForwardedFields("f0")
public class TransformDeltaToAbsoluteDegreeTree
implements MapFunction<Tuple2<GradoopId, TreeMap<Long, Integer>>,
Tuple2<GradoopId, TreeMap<Long, Integer>>> {

/**
* To reduce object instantiations.
*/
private TreeMap<Long, Integer> absoluteDegreeTree;

@Override
public Tuple2<GradoopId, TreeMap<Long, Integer>> map(
Tuple2<GradoopId, TreeMap<Long, Integer>> vIdTreeMapTuple) throws Exception {
// init the degree and the temporal tree
int degree = 0;
absoluteDegreeTree = new TreeMap<>();

// aggregate the degrees
for (Map.Entry<Long, Integer> entry : vIdTreeMapTuple.f1.entrySet()) {
degree += entry.getValue();
absoluteDegreeTree.put(entry.getKey(), degree);
}
vIdTreeMapTuple.f1 = absoluteDegreeTree;
return vIdTreeMapTuple;
}
}

0 comments on commit 807ec2d

Please sign in to comment.