diff --git a/src/main/java/com/yahoo/bullet/Topology.java b/src/main/java/com/yahoo/bullet/Topology.java index b45a6c8f..18485030 100644 --- a/src/main/java/com/yahoo/bullet/Topology.java +++ b/src/main/java/com/yahoo/bullet/Topology.java @@ -159,13 +159,13 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB .setMemoryLoad(drpcSpoutMemoryOnHeapLoad, drpcSpoutMemoryOffHeapLoad); builder.setBolt(TopologyConstants.PREPARE_COMPONENT, new PrepareRequest(), prepareBoltParallelism) - .localOrShuffleGrouping(TopologyConstants.DRPC_COMPONENT) + .shuffleGrouping(TopologyConstants.DRPC_COMPONENT) .setCPULoad(prepareBoltCPULoad) .setMemoryLoad(prepareBoltMemoryOnHeapLoad, prepareBoltMemoryOffHeapLoad); // Hook in the source of the BulletRecords builder.setBolt(TopologyConstants.FILTER_COMPONENT, new FilterBolt(recordComponent, tickInterval), filterBoltParallelism) - .localOrShuffleGrouping(recordComponent) + .shuffleGrouping(recordComponent) .allGrouping(TopologyConstants.PREPARE_COMPONENT, TopologyConstants.ARGS_STREAM) .setCPULoad(filterBoltCPULoad) .setMemoryLoad(filterBoltMemoryOnheapLoad, filterBoltMemoryOffHeapLoad); @@ -178,7 +178,7 @@ public static void submit(BulletConfig config, String recordComponent, TopologyB .setMemoryLoad(joinBoltMemoryOnHeapLoad, joinBoltMemoryOffHeapLoad); builder.setBolt(TopologyConstants.RETURN_COMPONENT, new ReturnResults(), returnBoltParallelism) - .localOrShuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM) + .shuffleGrouping(TopologyConstants.JOIN_COMPONENT, TopologyConstants.JOIN_STREAM) .setCPULoad(returnBoltCPULoad) .setMemoryLoad(returnBoltMemoryOnHeapLoad, returnBoltMemoryOffHeapLoad);