Skip to content

Commit

Permalink
Removing local grouping for reducing data skew
Browse files Browse the repository at this point in the history
  • Loading branch information
akshaisarma committed Dec 14, 2016
1 parent 1a024b3 commit e05481a
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions src/main/java/com/yahoo/bullet/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
.setMemoryLoad(drpcSpoutMemoryOnHeapLoad, drpcSpoutMemoryOffHeapLoad);

builder.setBolt(TopologyConstants.PREPARE_COMPONENT, new PrepareRequest(), prepareBoltParallelism)
.localOrShuffleGrouping(TopologyConstants.DRPC_COMPONENT)
.shuffleGrouping(TopologyConstants.DRPC_COMPONENT)
.setCPULoad(prepareBoltCPULoad)
.setMemoryLoad(prepareBoltMemoryOnHeapLoad, prepareBoltMemoryOffHeapLoad);

// Hook in the source of the BulletRecords
builder.setBolt(TopologyConstants.FILTER_COMPONENT, new FilterBolt(recordComponent, tickInterval), filterBoltParallelism)
.localOrShuffleGrouping(recordComponent)
.shuffleGrouping(recordComponent)
.allGrouping(TopologyConstants.PREPARE_COMPONENT, TopologyConstants.ARGS_STREAM)
.setCPULoad(filterBoltCPULoad)
.setMemoryLoad(filterBoltMemoryOnheapLoad, filterBoltMemoryOffHeapLoad);
Expand All @@ -178,7 +178,7 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB
.setMemoryLoad(joinBoltMemoryOnHeapLoad, joinBoltMemoryOffHeapLoad);

builder.setBolt(TopologyConstants.RETURN_COMPONENT, new ReturnResults(), returnBoltParallelism)
.localOrShuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM)
.shuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM)
.setCPULoad(returnBoltCPULoad)
.setMemoryLoad(returnBoltMemoryOnHeapLoad, returnBoltMemoryOffHeapLoad);

Expand Down

0 comments on commit e05481a

Please sign in to comment.