diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 067dcca0c6..1815006cd4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -357,7 +357,7 @@ public void sort() throws IOException { } Preconditions.checkArgument(buffers.get(bufferIndex) != null, "block should not be empty"); //TODO: fix per item being passed. - span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), (1024*1024), + span = new SortSpan((ByteBuffer)buffers.get(bufferIndex).clear(), items, perItem, ConfigUtils.getIntermediateOutputKeyComparator(this.conf)); } else { // queue up the sort diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 84ec143808..3955676076 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -858,6 +858,32 @@ public void testWithLargeKeyValueWithMinBlockSize() throws IOException { basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20); } + @Test + public void testWithLargeRecordAndLowMemory() throws IOException { + this.numOutputs = 1; + this.initialAvailableMem = 1 * 1024 * 1024; + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, true); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB, 1); + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, numOutputs, initialAvailableMem); + + // Set the record size to exceed 2k to trigger bug described in TEZ-4542. + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 3072; i++) { + builder.append("1"); + } + Text value = new Text(builder.toString()); + long size = 50 * 1024 * 1024; + while (size > 0) { + Text key = RandomTextGenerator.generateSentence(); + sorter.write(key, value); + size -= key.getLength(); + } + + sorter.flush(); + sorter.close(); + verifyOutputPermissions(outputContext.getUniqueIdentifier()); + } + private void verifyOutputPermissions(String spillId) throws IOException { String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;