From e05481a831e16357082ec836d3d3c7741b3caa7f Mon Sep 17 00:00:00 2001 From: Akshai Sarma Date: Wed, 14 Dec 2016 15:47:29 -0800 Subject: [PATCH] Removing local grouping for reducing data skew --- src/main/java/com/yahoo/bullet/Topology.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/yahoo/bullet/Topology.java b/src/main/java/com/yahoo/bullet/Topology.java index b45a6c8f..18485030 100644 --- a/src/main/java/com/yahoo/bullet/Topology.java +++ b/src/main/java/com/yahoo/bullet/Topology.java @@ -159,13 +159,13 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB .setMemoryLoad(drpcSpoutMemoryOnHeapLoad, drpcSpoutMemoryOffHeapLoad); builder.setBolt(TopologyConstants.PREPARE_COMPONENT, new PrepareRequest(), prepareBoltParallelism) - .localOrShuffleGrouping(TopologyConstants.DRPC_COMPONENT) + .shuffleGrouping(TopologyConstants.DRPC_COMPONENT) .setCPULoad(prepareBoltCPULoad) .setMemoryLoad(prepareBoltMemoryOnHeapLoad, prepareBoltMemoryOffHeapLoad); // Hook in the source of the BulletRecords builder.setBolt(TopologyConstants.FILTER_COMPONENT, new FilterBolt(recordComponent, tickInterval), filterBoltParallelism) - .localOrShuffleGrouping(recordComponent) + .shuffleGrouping(recordComponent) .allGrouping(TopologyConstants.PREPARE_COMPONENT, TopologyConstants.ARGS_STREAM) .setCPULoad(filterBoltCPULoad) .setMemoryLoad(filterBoltMemoryOnheapLoad, filterBoltMemoryOffHeapLoad); @@ -178,7 +178,7 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB .setMemoryLoad(joinBoltMemoryOnHeapLoad, joinBoltMemoryOffHeapLoad); builder.setBolt(TopologyConstants.RETURN_COMPONENT, new ReturnResults(), returnBoltParallelism) - .localOrShuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM) + .shuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM) .setCPULoad(returnBoltCPULoad) .setMemoryLoad(returnBoltMemoryOnHeapLoad, returnBoltMemoryOffHeapLoad);