Skip to content

Commit

Permalink
Merge pull request #6 from yahoo/count-distinct-aggregation
Browse files Browse the repository at this point in the history
Adding COUNT DISTINCT with tests
  • Loading branch information
akshaisarma authored Jan 9, 2017
2 parents 574f2d8 + 9c02071 commit 9f563ea
Show file tree
Hide file tree
Showing 35 changed files with 1,420 additions and 316 deletions.
30 changes: 16 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,11 @@ the size of the aggregation (this applies for aggregations high cardinality aggr

The current aggregation types that are supported are:

| Aggregation | Meaning |
| ----------- | ------- |
| GROUP | The resulting output would be a record containing the result of an operation for each unique group in the specified fields (only supported with no fields at this time, which groups all records) |
| LIMIT | The resulting output would be at most the number specified in size. |
| Aggregation | Meaning |
| -------------- | ------- |
| GROUP | The resulting output would be a record containing the result of an operation for each unique group in the specified fields (only supported with no fields at this time, which groups all records) |
| COUNT DISTINCT | Computes the number of distinct elements in the fields. (May be approximate) |
| LIMIT | The resulting output would be at most the number specified in size. |

We currently only support GROUP operations if there are no fields being grouped on. In other words, you get the results of the operation on all records that matched your filters.

Expand Down Expand Up @@ -226,6 +227,16 @@ Attributes for GROUP:
}
```

Attributes for COUNT DISTINCT:

```javascript
"attributes": {
"newName": "the name of the resulting count column"
}
```

Note that the new names you specify in the fields map for aggregations do not apply. You must use the attributes here to give your resulting output count column a name.

See the [examples section](#examples) for a detailed description of how to perform these aggregations.

#### Coming Soon
Expand All @@ -239,12 +250,11 @@ sublinear space. We will also use Sketches as a way to control high cardinality
the Sketching data structure to drop excess groups. It is up to the user launching Bullet to set Sketch sizes large or
small enough to satisfy the queries that will be performed on that instance of Bullet.

Using Sketches, we are working on other aggregations including but not limited to:
Using Sketches, we have implemented COUNT DISTINCT and are working on other aggregations including but not limited to:

| Aggregation | Meaning |
| -------------- | ------- |
| GROUP | We currently support GROUP with no fields (group all); grouping on specific fields will be supported soon |
| COUNT DISTINCT | Computes the number of distinct elements in the column |
| TOP K | Returns the top K most frequently appearing values in the column |
| DISTRIBUTION | Computes distributions of the elements in the column |

Expand All @@ -258,14 +268,6 @@ Attributes for TOP K:
}
```

Attributes for COUNT DISTINCT:

```javascript
"attributes": {
"newName": "the name of the resulting count column"
}
```

The attributes for the DISTRIBUTION aggregation haven't been decided yet.

### Constraints
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<storm.version>1.0.2</storm.version>
<storm.metrics.version>1.0.2</storm.metrics.version>
<bullet.record.version>0.0.3</bullet.record.version>
<sketches.version>0.8.3</sketches.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -89,6 +90,11 @@
<artifactId>bullet-record</artifactId>
<version>${bullet.record.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>${sketches.version}</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/com/yahoo/bullet/BulletConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Arrays.asList;

@Slf4j
public class BulletConfig extends Config {
Expand Down Expand Up @@ -45,8 +51,15 @@ public class BulletConfig extends Config {
public static final String SPECIFICATION_MAX_DURATION = "rule.max.duration";
public static final String AGGREGATION_DEFAULT_SIZE = "rule.aggregation.default.size";
public static final String AGGREGATION_MAX_SIZE = "rule.aggregation.max.size";
public static final String AGGREGATION_COMPOSITE_FIELD_SEPARATOR = "rule.aggregation.composite.field.separator";

public static final String RAW_AGGREGATION_MICRO_BATCH_SIZE = "rule.aggregation.raw.micro.batch.size";

public static final String COUNT_DISTINCT_AGGREGATION_SKETCH_ENTRIES = "rule.aggregation.count.distinct.sketch.entries";
public static final String COUNT_DISTINCT_AGGREGATION_SKETCH_SAMPLING = "rule.aggregation.count.distinct.sketch.sampling";
public static final String COUNT_DISTINCT_AGGREGATION_SKETCH_FAMILY = "rule.aggregation.count.distinct.sketch.family";
public static final String COUNT_DISTINCT_AGGREGATION_SKETCH_RESIZE_FACTOR = "rule.aggregation.count.distinct.sketch.resize.factor";

public static final String RECORD_INJECT_TIMESTAMP = "record.inject.timestamp.enable";
public static final String RECORD_INJECT_TIMESTAMP_KEY = "record.inject.timestamp.key";

Expand All @@ -55,6 +68,18 @@ public class BulletConfig extends Config {
public static final String RESULT_METADATA_METRICS_CONCEPT_KEY = "name";
public static final String RESULT_METADATA_METRICS_NAME_KEY = "key";

public static final String RESULT_METADATA_METRICS_MAPPING = "result.metadata.metrics.mapping";

/**
 * The setting keys that {@link #getBulletSettingsOnly()} leaves out of its result.
 *
 * Declared {@code final} so that the reference cannot be reassigned by other classes.
 */
public static final Set<String> TOPOLOGY_SUBMISSION_SETTINGS =
    new HashSet<>(asList(DRPC_SPOUT_PARALLELISM, DRPC_SPOUT_CPU_LOAD, DRPC_SPOUT_MEMORY_ON_HEAP_LOAD,
                         DRPC_SPOUT_MEMORY_OFF_HEAP_LOAD, PREPARE_BOLT_PARALLELISM, PREPARE_BOLT_CPU_LOAD,
                         PREPARE_BOLT_MEMORY_ON_HEAP_LOAD, PREPARE_BOLT_MEMORY_OFF_HEAP_LOAD,
                         FILTER_BOLT_PARALLELISM, FILTER_BOLT_CPU_LOAD, FILTER_BOLT_MEMORY_ON_HEAP_LOAD,
                         FILTER_BOLT_MEMORY_OFF_HEAP_LOAD, JOIN_BOLT_PARALLELISM, JOIN_BOLT_CPU_LOAD,
                         JOIN_BOLT_MEMORY_ON_HEAP_LOAD, JOIN_BOLT_MEMORY_OFF_HEAP_LOAD,
                         RETURN_BOLT_PARALLELISM, RETURN_BOLT_CPU_LOAD, RETURN_BOLT_MEMORY_ON_HEAP_LOAD,
                         RETURN_BOLT_MEMORY_OFF_HEAP_LOAD, TOPOLOGY_SCHEDULER, TOPOLOGY_FUNCTION,
                         TOPOLOGY_NAME, TOPOLOGY_WORKERS, TOPOLOGY_DEBUG, TOPOLOGY_METRICS_ENABLE));
/**
* Constructor that loads specific file augmented with defaults.
*
Expand All @@ -73,4 +98,13 @@ public BulletConfig(String file) throws IOException {
public BulletConfig() throws IOException {
super();
}

/**
* Gets all the settings except those whose keys are listed in {@link #TOPOLOGY_SUBMISSION_SETTINGS}.
* The filtering itself is delegated to {@code getAllBut}.
*
* @return A {@link Map} of the remaining settings.
*/
public Map<String, Object> getBulletSettingsOnly() {
return getAllBut(Optional.of(TOPOLOGY_SUBMISSION_SETTINGS));
}
}
35 changes: 3 additions & 32 deletions src/main/java/com/yahoo/bullet/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
Objects.requireNonNull(recordComponent);
Objects.requireNonNull(builder);

String name = (String) config.get(BulletConfig.TOPOLOGY_NAME);
String function = (String) config.get(BulletConfig.TOPOLOGY_FUNCTION);

Number drpcSpoutParallelism = (Number) config.get(BulletConfig.DRPC_SPOUT_PARALLELISM);
Expand Down Expand Up @@ -148,11 +149,6 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
Number returnBoltMemoryOnHeapLoad = (Number) config.get(BulletConfig.RETURN_BOLT_MEMORY_ON_HEAP_LOAD);
Number returnBoltMemoryOffHeapLoad = (Number) config.get(BulletConfig.RETURN_BOLT_MEMORY_OFF_HEAP_LOAD);

Long defaultDuration = (Long) config.get(BulletConfig.SPECIFICATION_DEFAULT_DURATION);
Long maxDuration = (Long) config.get(BulletConfig.SPECIFICATION_MAX_DURATION);
Long defaultSize = (Long) config.get(BulletConfig.AGGREGATION_DEFAULT_SIZE);
Long maxSize = (Long) config.get(BulletConfig.AGGREGATION_MAX_SIZE);
Integer microBatchSize = ((Number) config.get(BulletConfig.RAW_AGGREGATION_MICRO_BATCH_SIZE)).intValue();
Integer tickInterval = ((Number) config.get(BulletConfig.TICK_INTERVAL_SECS)).intValue();

builder.setSpout(TopologyConstants.DRPC_COMPONENT, new DRPCSpout(function), drpcSpoutParallelism)
Expand Down Expand Up @@ -198,41 +194,16 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
Number workers = (Number) config.get(BulletConfig.TOPOLOGY_WORKERS);
stormConfig.setNumWorkers(workers.intValue());

// Defaults for Rules
stormConfig.put(BulletConfig.SPECIFICATION_DEFAULT_DURATION, defaultDuration);
stormConfig.put(BulletConfig.SPECIFICATION_MAX_DURATION, maxDuration);
stormConfig.put(BulletConfig.AGGREGATION_DEFAULT_SIZE, defaultSize);
stormConfig.put(BulletConfig.AGGREGATION_MAX_SIZE, maxSize);
stormConfig.put(BulletConfig.RAW_AGGREGATION_MICRO_BATCH_SIZE, microBatchSize);

// Metrics
Boolean enableMetrics = (Boolean) config.get(BulletConfig.TOPOLOGY_METRICS_ENABLE);
if (enableMetrics) {
stormConfig.registerMetricsConsumer(LoggingMetricsConsumer.class);
stormConfig.put(Config.TOPOLOGY_WORKER_METRICS, METRICS);
}

// Inject timestamp into record. Only applies to raw records (not aggregations)
Boolean shouldInjectTimestamp = (Boolean) config.get(BulletConfig.RECORD_INJECT_TIMESTAMP);
String timestampKey = (String) config.get(BulletConfig.RECORD_INJECT_TIMESTAMP_KEY);
stormConfig.put(BulletConfig.RECORD_INJECT_TIMESTAMP, shouldInjectTimestamp);
stormConfig.put(BulletConfig.RECORD_INJECT_TIMESTAMP_KEY, timestampKey);

// Join Bolt Error Tick Timeout (how many ticks will an error be buffered before it is dropped)
Number joinBoltErrorTickTimeout = (Number) config.get(BulletConfig.JOIN_BOLT_ERROR_TICK_TIMEOUT);
stormConfig.put(BulletConfig.JOIN_BOLT_ERROR_TICK_TIMEOUT, joinBoltErrorTickTimeout);
// Put the rest of the Bullet settings without checking their types
stormConfig.putAll(config.getBulletSettingsOnly());

// Join Bolt Rule Tick Timeout (how many ticks a rule will be buffered post expiry before it is dropped)
Number joinBoltRuleTickTimeout = (Number) config.get(BulletConfig.JOIN_BOLT_RULE_TICK_TIMEOUT);
stormConfig.put(BulletConfig.JOIN_BOLT_RULE_TICK_TIMEOUT, joinBoltRuleTickTimeout);

// Metadata
Boolean enableMetadata = (Boolean) config.get(BulletConfig.RESULT_METADATA_ENABLE);
List<Map<String, String>> metadataConfig = (List<Map<String, String>>) config.get(BulletConfig.RESULT_METADATA_METRICS);
stormConfig.put(BulletConfig.RESULT_METADATA_ENABLE, enableMetadata);
stormConfig.put(BulletConfig.RESULT_METADATA_METRICS, metadataConfig);

String name = (String) config.get(BulletConfig.TOPOLOGY_NAME);
StormSubmitter.submitTopology(name, stormConfig, builder.createTopology());
}

Expand Down
60 changes: 16 additions & 44 deletions src/main/java/com/yahoo/bullet/drpc/JoinBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import com.yahoo.bullet.BulletConfig;
import com.yahoo.bullet.parsing.Error;
import com.yahoo.bullet.parsing.ParsingException;
import com.yahoo.bullet.record.BulletRecord;
import com.yahoo.bullet.result.Clip;
import com.yahoo.bullet.result.Metadata;
import com.yahoo.bullet.result.Metadata.Concept;
import com.yahoo.bullet.tracing.AggregationRule;
import lombok.extern.slf4j.Slf4j;
import org.apache.storm.task.OutputCollector;
Expand All @@ -25,16 +25,11 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;

@Slf4j
public class JoinBolt extends RuleBolt<AggregationRule> {
public static final String JOIN_STREAM = Utils.DEFAULT_STREAM_ID;
Expand All @@ -43,17 +38,13 @@ public class JoinBolt extends RuleBolt<AggregationRule> {
public static final int DEFAULT_ERROR_TICKOUT = 3;
/** This is the default number of ticks for which we will buffer a rule post expiry. */
public static final int DEFAULT_RULE_TICKOUT = 3;
public static final Set<String> CONCEPTS = new HashSet<>(asList(Metadata.RULE_ID, Metadata.RULE_BODY, Metadata.CREATION_TIME, Metadata.TERMINATION_TIME));

private Map<Long, Tuple> activeReturns;
// For doing a LEFT OUTER JOIN between Rules and ReturnInfo if the Rule has validation issues
private RotatingMap<Long, Clip> bufferedErrors;
// For doing a LEFT OUTER JOIN between Rules and intermediate aggregation, if the aggregations are lagging.
private RotatingMap<Long, AggregationRule> bufferedRules;

private boolean isMetadataEnabled = false;
private Map<String, String> metadataKeys;

/**
* Default constructor.
*/
Expand All @@ -75,34 +66,15 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll

activeReturns = new HashMap<>();

Number errorTickoutNumber = (Number) configuration.getOrDefault(BulletConfig.JOIN_BOLT_ERROR_TICK_TIMEOUT, DEFAULT_ERROR_TICKOUT);
Number errorTickoutNumber = (Number) configuration.getOrDefault(BulletConfig.JOIN_BOLT_ERROR_TICK_TIMEOUT,
DEFAULT_ERROR_TICKOUT);
int errorTickout = errorTickoutNumber.intValue();
bufferedErrors = new RotatingMap<>(errorTickout);

Number ruleTickoutNumber = (Number) configuration.getOrDefault(BulletConfig.JOIN_BOLT_RULE_TICK_TIMEOUT, DEFAULT_RULE_TICKOUT);
Number ruleTickoutNumber = (Number) configuration.getOrDefault(BulletConfig.JOIN_BOLT_RULE_TICK_TIMEOUT,
DEFAULT_RULE_TICKOUT);
int ruleTickout = ruleTickoutNumber.intValue();
bufferedRules = new RotatingMap<>(ruleTickout);

isMetadataEnabled = (Boolean) configuration.getOrDefault(BulletConfig.RESULT_METADATA_ENABLE, false);
if (isMetadataEnabled) {
log.info("Metadata collection is enabled");
metadataKeys = getMetadataMetrics(configuration);
}
}

/**
 * Builds the mapping from each configured metadata concept to the key it should be reported under.
 * Entries whose concept is not in {@code CONCEPTS} are ignored.
 *
 * @param configuration The configuration map to read {@link BulletConfig#RESULT_METADATA_METRICS} from.
 * @return A {@link Map} of concept to the configured key; empty if nothing matching was configured.
 */
private static Map<String, String> getMetadataMetrics(Map configuration) {
    Map<String, String> registered = new HashMap<>();
    Object configured = configuration.getOrDefault(BulletConfig.RESULT_METADATA_METRICS, emptyList());
    for (Map entry : (List<Map>) configured) {
        // Both values are read (and cast) up front, before the concept is checked.
        String concept = (String) entry.get(BulletConfig.RESULT_METADATA_METRICS_CONCEPT_KEY);
        String name = (String) entry.get(BulletConfig.RESULT_METADATA_METRICS_NAME_KEY);
        if (!CONCEPTS.contains(concept)) {
            continue;
        }
        log.info("Enabling logging of {} as {} in metadata", concept, name);
        registered.put(concept, name);
    }
    return registered;
}

@Override
Expand Down Expand Up @@ -242,10 +214,10 @@ private void emit(Long id, AggregationRule rule, Tuple returnTuple) {
Objects.requireNonNull(returnTuple);

// TODO Anchor this tuple to all tuples that caused its emission : rule tuple, return tuple, data tuple(s)
List<BulletRecord> records = rule.getData();
Metadata meta = getMetadata(id, rule);
emit(Clip.of(records).add(meta), returnTuple);
int emitted = records.size();
Clip records = rule.getData();
records.add(getMetadata(id, rule));
emit(records, returnTuple);
int emitted = records.getRecords().size();
log.info("Rule {} has been satisfied with {} records. Cleaning up...", id, emitted);
rulesMap.remove(id);
bufferedRules.remove(id);
Expand All @@ -260,20 +232,20 @@ private void emit(Clip clip, Tuple returnTuple) {
}

private Metadata getMetadata(Long id, AggregationRule rule) {
if (!isMetadataEnabled) {
if (metadataKeys.isEmpty()) {
return null;
}
Metadata meta = new Metadata();
consumeRegisteredConcept(Metadata.RULE_ID, (k) -> meta.add(k, id));
consumeRegisteredConcept(Metadata.RULE_BODY, (k) -> meta.add(k, rule.toString()));
consumeRegisteredConcept(Metadata.CREATION_TIME, (k) -> meta.add(k, rule.getStartTime()));
consumeRegisteredConcept(Metadata.TERMINATION_TIME, (k) -> meta.add(k, rule.getLastAggregationTime()));
consumeRegisteredConcept(Concept.RULE_ID, (k) -> meta.add(k, id));
consumeRegisteredConcept(Concept.RULE_BODY, (k) -> meta.add(k, rule.toString()));
consumeRegisteredConcept(Concept.CREATION_TIME, (k) -> meta.add(k, rule.getStartTime()));
consumeRegisteredConcept(Concept.TERMINATION_TIME, (k) -> meta.add(k, rule.getLastAggregationTime()));
return meta;
}

private void consumeRegisteredConcept(String concept, Consumer<String> action) {
private void consumeRegisteredConcept(Concept concept, Consumer<String> action) {
// Only consume the concept if we have a key for it: i.e. it was registered
String key = metadataKeys.get(concept);
String key = metadataKeys.get(concept.getName());
if (key != null) {
action.accept(key);
}
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/com/yahoo/bullet/drpc/RuleBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package com.yahoo.bullet.drpc;

import com.yahoo.bullet.BulletConfig;
import com.yahoo.bullet.result.Metadata;
import com.yahoo.bullet.tracing.AbstractRule;
import lombok.extern.slf4j.Slf4j;
import org.apache.storm.Config;
Expand All @@ -13,6 +15,8 @@
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -23,6 +27,8 @@ public abstract class RuleBolt<R extends AbstractRule> implements IRichBolt {
protected int tickInterval;
protected Map configuration;
protected OutputCollector collector;
protected Map<String, String> metadataKeys;

// TODO consider a rotating map with multilevels and reinserts upon rotating instead for scalability
protected Map<Long, R> rulesMap;

Expand All @@ -43,9 +49,20 @@ public RuleBolt() {

/**
* Stores a copy of the configuration and the collector, creates the rules map and loads the
* metadata concept names. If any concept names were found, they are put back into the copied
* configuration under {@link BulletConfig#RESULT_METADATA_METRICS_MAPPING}.
*
* @param stormConf The configuration map handed to this bolt.
* @param context The {@link TopologyContext}; not used in this method.
* @param collector The {@link OutputCollector} to store for later use.
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
// NOTE(review): this assignment is immediately overwritten by the copy made below.
this.configuration = stormConf;
// stormConf is not modifiable. Need to make a copy.
this.configuration = new HashMap<>(stormConf);
this.collector = collector;
rulesMap = new LinkedHashMap<>();

// Get the names for all the known Concepts
metadataKeys = Metadata.getConceptNames(configuration, new HashSet<>(Metadata.KNOWN_CONCEPTS));
if (!metadataKeys.isEmpty()) {
log.info("Metadata collection is enabled");
log.info("Collecting metadata for these concepts:\n{}", metadataKeys);
// Add all metadataKeys back to the configuration for reuse, so there is no need to refetch them on every new rule
configuration.put(BulletConfig.RESULT_METADATA_METRICS_MAPPING, metadataKeys);
}

}

/**
Expand Down
Loading

0 comments on commit 9f563ea

Please sign in to comment.