Question - Resample #191
Comments
Hi,

public class AggregatorTask {
    private static final Logger LOG = LoggerFactory.getLogger(AggregatorTask.class);

    public static void main(String[] args) throws IOException {
        ...
        // create context
        SparkConf conf = new SparkConf().setAppName("Aggregate Time Data");
        JavaSparkContext context = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(context);
        ZoneId zoneId = ZoneId.systemDefault();
        Dataset<Row> df = ... // load data

        // create DateTimeIndex (timestamps for data)
        // assume 1s interval
        ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1496268001000L), zoneId);
        ZonedDateTime end = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1496354399000L), zoneId);
        DateTimeIndex dtIndex = DateTimeIndexFactory.uniformFromInterval(start, end, new SecondFrequency(1));

        // create time series
        TimeSeriesRDD<String> meterDataRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, df, "timestamp",
                "symbol", "value");

        // cache data in memory
        meterDataRdd.cache();

        // fill gaps using linear interpolation
        meterDataRdd = meterDataRdd.fill("linear");

        // resample data
        resampleAndSave(meterDataRdd, start, end, new MinuteFrequency(5), outputDir, pathSuffix + "\\5m");
        resampleAndSave(meterDataRdd, start, end, new HourFrequency(1), outputDir, pathSuffix + "\\1h");
    }

    private static void resampleAndSave(TimeSeriesRDD<String> meterDataRdd, ZonedDateTime start,
            ZonedDateTime end, Frequency frequency, String outputDirectory, String subDirectory) {
        // resample data to given interval
        DateTimeIndex targetIndex = DateTimeIndexFactory.uniformFromInterval(start, end, frequency);
        TimeSeriesRDD<String> resampled = meterDataRdd.resample(targetIndex, new PowerResampler(),
                false, false);

        // save data to file
        resampled.saveAsCsv(outputDirectory + "\\" + subDirectory);
    }
}

public class PowerResampler implements Function3<double[], Object, Object, Object>, Serializable {
    @Override
    public Object apply(double[] values, Object startIndex, Object endIndex) {
        // perform numerical integration (trapezoidal rule over the window)
        int start = (int) startIndex;
        int end = (int) endIndex;
        double result = 0;
        // 1s sample interval expressed in hours
        double h = 1.0 / 3600.0;
        for (int i = start; i < end - 1; i++) {
            double a = values[i];
            double c = values[i + 1];
            result += (a + c) / 2.0 * h;
        }
        return result;
    }

    // curried() and tupled() are not needed here, so they are left unimplemented
    @Override
    public Function1<double[], Function1<Object, Function1<Object, Object>>> curried() {
        return null;
    }

    @Override
    public Function1<Tuple3<double[], Object, Object>, Object> tupled() {
        return null;
    }
}
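The integration loop in PowerResampler can be tried outside Spark. The following is a minimal sketch of the same trapezoidal sum, not part of the original comment: the class name TrapezoidSketch, the method integrate, and the sample data are hypothetical, and it assumes evenly spaced samples with an exclusive end index.

```java
import java.util.Arrays;

// Hypothetical stand-alone version of the trapezoidal sum used in PowerResampler.
class TrapezoidSketch {

    // Integrates values[start] .. values[end - 1] with the trapezoidal rule.
    // Assumes samples are evenly spaced, stepHours apart, and that end is exclusive.
    static double integrate(double[] values, int start, int end, double stepHours) {
        double result = 0.0;
        for (int i = start; i < end - 1; i++) {
            // area of the trapezoid between sample i and sample i + 1
            result += (values[i] + values[i + 1]) / 2.0 * stepHours;
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up data: a constant 3600 W load sampled once per second for one hour.
        double[] watts = new double[3601];
        Arrays.fill(watts, 3600.0);

        // Step is 1 second expressed in hours, so the result is in watt-hours.
        double wattHours = integrate(watts, 0, watts.length, 1.0 / 3600.0);
        System.out.println("Energy (Wh): " + wattHours);
    }
}
```

Note that with the loop bound end - 1, the interval between the last sample of one window and the first sample of the next window is not counted in either window, assuming the end index passed by the resampler is exclusive.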
Hi,
I was trying to use the resample functionality. It is implemented in Scala, but it has not been added to the Java API (JavaTimeSeriesRDD). How can I add it?