Merge pull request #5 from yahoo/micro-batching-for-raw
Adding microbatching support and setting for RAW
akshaisarma authored Jan 5, 2017
2 parents c3b03a4 + 3772137 commit 574f2d8
Showing 15 changed files with 365 additions and 136 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -735,7 +735,7 @@ Storm topologies are generally launched with "fat" jars (jar-with-dependencies),
</plugin>
```

### Storm 1.0 and below
### Storm versions below 1.0

Since the package prefix changed from `backtype.storm` to `org.apache.storm` in Storm 1.0, you will need the storm-0.10 version of Bullet if
your Storm cluster is not yet at 1.0 or higher. Change your dependency to:
2 changes: 1 addition & 1 deletion pom.xml
@@ -4,7 +4,7 @@

<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-storm</artifactId>
<version>0.0.4-SNAPSHOT</version>
<version>0.1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>bullet-storm</name>

1 change: 1 addition & 0 deletions src/main/java/com/yahoo/bullet/BulletConfig.java
@@ -45,6 +45,7 @@ public class BulletConfig extends Config {
public static final String SPECIFICATION_MAX_DURATION = "rule.max.duration";
public static final String AGGREGATION_DEFAULT_SIZE = "rule.aggregation.default.size";
public static final String AGGREGATION_MAX_SIZE = "rule.aggregation.max.size";
public static final String RAW_AGGREGATION_MICRO_BATCH_SIZE = "rule.aggregation.raw.micro.batch.size";

public static final String RECORD_INJECT_TIMESTAMP = "record.inject.timestamp.enable";
public static final String RECORD_INJECT_TIMESTAMP_KEY = "record.inject.timestamp.key";
2 changes: 2 additions & 0 deletions src/main/java/com/yahoo/bullet/Topology.java
@@ -152,6 +152,7 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
Long maxDuration = (Long) config.get(BulletConfig.SPECIFICATION_MAX_DURATION);
Long defaultSize = (Long) config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE);
Long maxSize = (Long) config.get(BulletConfig.AGGREGATION_MAX_SIZE);
Integer microBatchSize = ((Number) config.get(BulletConfig.RAW_AGGREGATION_MICRO_BATCH_SIZE)).intValue();
Integer tickInterval = ((Number) config.get(BulletConfig.TICK_INTERVAL_SECS)).intValue();

builder.setSpout(TopologyConstants.DRPC_COMPONENT, new DRPCSpout(function), drpcSpoutParallelism)
@@ -202,6 +203,7 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
stormConfig.put(BulletConfig.SPECIFICATION_MAX_DURATION, maxDuration);
stormConfig.put(BulletConfig.AGGREGATION_DEFAULT_SIZE, defaultSize);
stormConfig.put(BulletConfig.AGGREGATION_MAX_SIZE, maxSize);
stormConfig.put(BulletConfig.RAW_AGGREGATION_MICRO_BATCH_SIZE, microBatchSize);

// Metrics
Boolean enableMetrics = (Boolean) config.get(BulletConfig.TOPOLOGY_METRICS_ENABLE);
GroupData.java
@@ -156,6 +156,7 @@ private void addToRecord(Map.Entry<GroupOperation, Number> metric, BulletRecord
record.setDouble(getResultName(operation), calculateAvg(value, operation.getField()));
break;
case COUNT_FIELD:
// Internal use only for AVG. Not exposed.
break;
case MIN:
case MAX:
@@ -201,9 +202,10 @@ public static String getResultName(GroupOperation operation) {
* @return A reified object or null if not successful.
*/
public static GroupData fromBytes(byte[] data) {
try {
try (
ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bis);
) {
return (GroupData) ois.readObject();
} catch (IOException | ClassNotFoundException | RuntimeException e) {
log.error("Could not reify a GroupData from raw data {}", data);
@@ -219,11 +221,11 @@ public static GroupData fromBytes(byte[] data) {
* @return the serialized byte[] or null if not successful.
*/
public static byte[] toBytes(GroupData metric) {
try {
try (
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
) {
oos.writeObject(metric);
oos.close();
return bos.toByteArray();
} catch (IOException | RuntimeException e) {
log.error("Could not serialize given GroupData contents", metric.metrics);
128 changes: 75 additions & 53 deletions src/main/java/com/yahoo/bullet/operations/aggregations/Raw.java
@@ -5,50 +5,41 @@
*/
package com.yahoo.bullet.operations.aggregations;

import com.yahoo.bullet.BulletConfig;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.record.BulletRecord;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Implements the LIMIT operation on multiple raw {@link BulletRecord}.
*
* A call to {@link #getSerializedAggregation()} will <strong>only return and remove</strong> the oldest consumed
* record. It does <strong>not</strong> serialize the entire aggregation. This is because the strategy is currently
* meant to be used in two contexts or modes:
* A call to {@link #getSerializedAggregation()} will return and remove the current collection of records, which
* is a {@link List} of {@link BulletRecord}.
*
* 1) Micro-Batch: For each {@link #consume(BulletRecord)} call, a call to {@link #getSerializedAggregation()} will
* return the successful serialized consumed record. {@link #isMicroBatch()} will return true till the record is
* serialized. Further calls to get the serialized record will return null till the next successful
* consumption. Checks for micro batching will also return false.
* A call to {@link #combine(byte[])} with the result of {@link #getSerializedAggregation()} will combine records from
* the {@link List} till the aggregation size is reached.
*
* This {@link Strategy} will consume till the specified aggregation size is reached.
*
* To reiterate, {@link #getSerializedAggregation()} <strong>only</strong> returns <strong>once</strong> the serialized
* byte[] representing the oldest successfully consumed {@link BulletRecord}. In other words, the size of the
* micro-batch is <strong>one</strong> record. This is done in order to not store too many records when simply
* consuming.
*
* 2) Combining: For each {@link #combine(byte[])} call (where each byte[] should be a single record because that is
* the result of {@link #getSerializedAggregation()}), a call to {@link #getAggregation()} will return a {@link List}
* combined so far. Further calls will <strong>return null</strong>. This {@link Strategy} will also only
* combine till the specified aggregation size is reached. In this mode, the strategy is not doing micro-batching.
*
* If you mix and match these calls, the aggregation will work as intended (consume and combine till the aggregation
* size is reached) but will be inefficient when serializing. Serializing will still only serialize the oldest consumed
* record and will be an O(n) operation where n is the number of records consumed/combined so far.
* This {@link Strategy} will only consume or combine till the specified aggregation size is reached.
*
*/
@Slf4j
public class Raw implements Strategy {
public static final Integer DEFAULT_MICRO_BATCH_SIZE = 1;
private List<BulletRecord> aggregate = new ArrayList<>();

private Integer size;
private int consumed = 0;
private int combined = 0;
private int microBatchSize = DEFAULT_MICRO_BATCH_SIZE;

/**
* Constructor that takes in an {@link Aggregation}. The size of the aggregation is used as a LIMIT
@@ -58,6 +49,8 @@ public class Raw implements Strategy {
*/
public Raw(Aggregation aggregation) {
size = aggregation.getSize();
microBatchSize = ((Number) aggregation.getConfiguration().getOrDefault(BulletConfig.RAW_AGGREGATION_MICRO_BATCH_SIZE,
DEFAULT_MICRO_BATCH_SIZE)).intValue();
}

@Override
@@ -68,7 +61,7 @@ public boolean isAcceptingData() {
@Override
public boolean isMicroBatch() {
// A micro-batch is ready once at least microBatchSize records have been collected
return aggregate.size() > 0;
return aggregate.size() >= microBatchSize;
}

@Override
@@ -81,56 +74,85 @@ public void consume(BulletRecord data) {
}

/**
* In the case of a Raw aggregation, serializing means returning the serialized version of the last
* {@link BulletRecord} seen. Once the data has been serialized, further calls to obtain it again
* will result in nulls. In other words, this method can only return a valid byte[] once per
* successful consume call.
* Since {@link #getSerializedAggregation()} returns a serialized {@link List} of {@link BulletRecord}, this method
* consumes such a list. If the deserialized List would take the number of aggregated records above the aggregation
* size, only as many records from the head of the List as are needed to reach the size are combined.
*
* @return the serialized byte[] representing the last {@link BulletRecord} or null if it could not.
* @param serializedAggregation A serialized {@link List} of {@link BulletRecord}.
*/
@Override
public byte[] getSerializedAggregation() {
if (aggregate.isEmpty()) {
return null;
public void combine(byte[] serializedAggregation) {
if (!isAcceptingData() || serializedAggregation == null) {
return;
}
// This call is cheap if we are sticking to the consume -> getSerializedAggregation pattern, since removing a
// single element from the list does not do an array copy.
BulletRecord batch = aggregate.remove(0);
try {
return batch.getAsByteArray();
} catch (IOException ioe) {
log.error("Could not serialize BulletRecord", batch);
List<BulletRecord> batch = read(serializedAggregation);
if (batch.isEmpty()) {
return;
}
int batchSize = batch.size();
int maximumLeft = size - aggregate.size();
if (batchSize <= maximumLeft) {
aggregate.addAll(batch);
combined += batchSize;
} else {
aggregate.addAll(batch.subList(0, maximumLeft));
combined += maximumLeft;
}
return null;
}

/**
* Since {@link #getSerializedAggregation()} returns a single serialized {@link BulletRecord}, this method consumes
* a single serialized {@link BulletRecord}. It also stops combining if more than the maximum allowed by the
* {@link Aggregation} has been reached.
* In the case of a Raw aggregation, serializing means returning the serialized {@link List} of
* {@link BulletRecord} seen since the last call to this method. Once the data has been serialized, further calls
* to obtain it again without calling {@link #consume(BulletRecord)} or {@link #combine(byte[])} will result in
* nulls.
*
* @param serializedAggregation A serialized {@link BulletRecord}.
* @return the serialized byte[] representing the {@link List} of {@link BulletRecord} or null if it could not.
*/
@Override
public void combine(byte[] serializedAggregation) {
if (!isAcceptingData() || serializedAggregation == null) {
return;
public byte[] getSerializedAggregation() {
if (aggregate.isEmpty()) {
return null;
}
combined++;
aggregate.add(new BulletRecord(serializedAggregation));
List<BulletRecord> batch = aggregate;
aggregate = new ArrayList<>();
return write(batch);
}

/**
* Gets the combined records so far.
* Gets the aggregated records so far since the last call to {@link #getSerializedAggregation()}.
*
* @return a List of the combined {@link BulletRecord} so far. The List has a size that is at most the maximum
* specified by the {@link Aggregation}.
*/
@Override
public List<BulletRecord> getAggregation() {
// Guaranteed to be <= size
List<BulletRecord> batch = aggregate;
aggregate = new ArrayList<>();
return batch;
return aggregate;
}

private byte[] write(List<BulletRecord> batch) {
try (
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)
) {
oos.writeObject(batch);
return bos.toByteArray();
} catch (IOException ioe) {
log.error("Could not serialize batch {}", batch);
log.error("Exception was ", ioe);
}
return null;
}

private List<BulletRecord> read(byte[] batch) {
try (
ByteArrayInputStream bis = new ByteArrayInputStream(batch);
ObjectInputStream ois = new ObjectInputStream(bis)
) {
return (List<BulletRecord>) ois.readObject();
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.error("Could not deserialize batch {}", batch);
log.error("Exception was ", e);
}
return Collections.emptyList();
}
}
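
Taken together, the new contract is: the Filter Bolt side consumes records until `isMicroBatch()` reports that the configured micro-batch size has been reached, emits the whole batch as one serialized `List`, and the combining side merges such batches until the query's aggregation size is satisfied. Below is a minimal usage sketch of that flow; the setters on `Aggregation`, the no-arg `BulletRecord` constructor, and the sketch class itself are assumptions not shown in this diff, so treat it as an illustration rather than code from this commit.

```java
package com.yahoo.bullet.operations.aggregations;

import com.yahoo.bullet.BulletConfig;
import com.yahoo.bullet.parsing.Aggregation;
import com.yahoo.bullet.record.BulletRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RawMicroBatchSketch {
    public static void main(String[] args) {
        // Assumed setters on Aggregation; the diff only shows getSize() and getConfiguration().
        Aggregation aggregation = new Aggregation();
        aggregation.setSize(2);
        Map<String, Object> configuration = new HashMap<>();
        configuration.put(BulletConfig.RAW_AGGREGATION_MICRO_BATCH_SIZE, 2);
        aggregation.setConfiguration(configuration);

        // Filter Bolt side: collect records until the micro-batch size is reached.
        Raw filterSide = new Raw(aggregation);
        filterSide.consume(new BulletRecord()); // assumes a no-arg BulletRecord constructor
        boolean ready = filterSide.isMicroBatch(); // false: 1 record < micro-batch size of 2
        filterSide.consume(new BulletRecord());
        ready = filterSide.isMicroBatch(); // true: 2 records >= micro-batch size of 2

        // Emitting serializes the entire List<BulletRecord> and resets the collection.
        byte[] batch = filterSide.getSerializedAggregation();

        // Join Bolt side: combine serialized batches until the aggregation size (2 here)
        // is reached; surplus records at the tail of a batch are dropped.
        Raw joinSide = new Raw(aggregation);
        joinSide.combine(batch);
        List<BulletRecord> result = joinSide.getAggregation(); // at most 2 records
    }
}
```

Keeping the default micro-batch size at 1 preserves the old record-at-a-time behavior, while larger values let high-throughput topologies trade query latency for fewer emits.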
Strategy.java
@@ -29,6 +29,7 @@ default boolean isAcceptingData() {
default boolean isMicroBatch() {
return false;
}

/**
* Consumes a single {@link BulletRecord} into the aggregation.
*
12 changes: 11 additions & 1 deletion src/main/resources/bullet_defaults.yaml
@@ -50,7 +50,7 @@ topology.tick.interval.secs: 5
topology.join.bolt.error.tick.timeout: 3

# This is the number of ticks for which a rule will be buffered past its expiry in order to wait for
# aggregations to trickle in from the filter bolts.
# aggregations to trickle in from the Filter Bolts.
topology.join.bolt.rule.tick.timeout: 3

# The default duration in milliseconds for a rule if one has not been specified.
@@ -65,6 +65,16 @@ rule.aggregation.default.size: 1
# The maximum number of records that will be aggregated per rule. Anything greater will be clamped to this value.
rule.aggregation.max.size: 30

# The maximum number of records that will be collected in the Filter Bolt before they are emitted - i.e. a micro-batch.
# Leaving this at 1 emits your raw aggregation records as soon as they are received in the Filter Bolt. This makes
# your raw aggregation query complete faster when the total number of matched records across all the Filter Bolts
# exceeds the number of records your query is looking for, but no single Filter Bolt finds enough records to satisfy
# the query on its own. Since the records are emitted immediately, the Join Bolt can terminate your query as soon as
# the total number of records has been received, instead of waiting for each Filter Bolt to fill its micro-batch.
# If you set this too high (for example, higher than the query size), you will wait the entire duration of the query
# plus the number of ticks specified in topology.join.bolt.rule.tick.timeout. For instance, with a query size of 5 and
# a micro-batch size of 10, a Filter Bolt that has matched all 5 records would still never fill its micro-batch.
rule.aggregation.raw.micro.batch.size: 1

# Enable logging meta information in the results. Configured metadata will be added to the meta section of the
# results: {"meta": {}, "records": []}
result.metadata.enable: true
40 changes: 34 additions & 6 deletions src/test/java/com/yahoo/bullet/TestHelpers.java
@@ -10,8 +10,13 @@
import com.yahoo.bullet.record.BulletRecord;
import org.testng.Assert;

import java.io.IOException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

public class TestHelpers {
@@ -33,11 +38,34 @@ public static void assertJSONEquals(String actual, String expected) {
Assert.assertEquals(first, second);
}

public static byte[] getByteArray(BulletRecord record) {
try {
return record.getAsByteArray();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
public static byte[] getListBytes(BulletRecord... records) {
List<BulletRecord> asList = new ArrayList<>();
for (BulletRecord record : records) {
asList.add(record);
}
return serialize(asList);
}

public static byte[] serialize(Object o) {
try (
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
) {
oos.writeObject(o);
return bos.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static Object deserialize(byte[] o) {
try (
ByteArrayInputStream bis = new ByteArrayInputStream(o);
ObjectInputStream ois = new ObjectInputStream(bis);
) {
return ois.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
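
For reference, here is a hedged sketch of how these helpers might be exercised in a test; the test class, the no-arg `BulletRecord` constructor, and the assertion are illustrative assumptions, not part of this commit.

```java
package com.yahoo.bullet;

import com.yahoo.bullet.record.BulletRecord;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.List;

public class TestHelpersUsageTest {
    @Test
    public void testListRoundTrip() {
        BulletRecord record = new BulletRecord(); // assumes a no-arg constructor
        // getListBytes wraps the records in a List and Java-serializes it, which is the
        // same format Raw#getSerializedAggregation now produces for a micro-batch.
        byte[] bytes = TestHelpers.getListBytes(record, record);
        @SuppressWarnings("unchecked")
        List<BulletRecord> roundTripped = (List<BulletRecord>) TestHelpers.deserialize(bytes);
        Assert.assertEquals(roundTripped.size(), 2);
    }
}
```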