diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java index 88547cc75721..d3587336da73 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java @@ -25,6 +25,14 @@ public class CsvWriter extends FileWriter { + private String _headers; + + @Override + public void init(WriterSpec spec) { + super.init(spec); + _headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ","); + } + @Override protected String generateRow(DataGenerator generator) { Map row = generator.nextRow(); @@ -38,6 +46,12 @@ protected String generateRow(DataGenerator generator) { return StringUtils.join(values, ","); } + @Override + protected void preprocess(java.io.FileWriter writer) + throws Exception { + writer.append(_headers).append('\n'); + } + private Object serializeIfMultiValue(Object obj) { if (obj instanceof List) { return StringUtils.join((List) obj, ";"); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java index b74c2ebbdebd..f02f245ced42 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java @@ -20,7 +20,6 @@ import java.io.File; import java.util.Objects; -import org.apache.commons.lang.StringUtils; import org.apache.pinot.controller.recommender.data.generator.DataGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +28,7 @@ public abstract class FileWriter implements Writer { private static final Logger LOGGER = LoggerFactory.getLogger(FileWriter.class); - private FileWriterSpec _spec; + protected FileWriterSpec _spec; @Override public void init(WriterSpec spec) { _spec = (FileWriterSpec) spec; @@ -38,21 +37,29 @@ public void init(WriterSpec spec) { @Override public void write() throws Exception { - final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles()); - final String headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ","); + long totalDocs = _spec.getTotalDocs(); + final long docsPerFile = (long) Math.ceil((double) totalDocs / _spec.getNumFiles()); final String extension = getExtension() == null ? "" : String.format(".%s", getExtension()); - for (int i = 0; i < _spec.getNumFiles(); i++) { + long ingestedDocs = 0; + int fileIndex = 0; + while (ingestedDocs < totalDocs) { try (java.io.FileWriter writer = - new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", i, extension)))) { - writer.append(headers).append('\n'); - for (int j = 0; j < numPerFiles; j++) { + new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", fileIndex, extension)))) { + preprocess(writer); + for (int j = 0; j < docsPerFile && ingestedDocs < totalDocs; j++) { String appendString = generateRow(_spec.getGenerator()); writer.append(appendString).append('\n'); + ingestedDocs++; } } + fileIndex++; } } + protected void preprocess(java.io.FileWriter writer) + throws Exception { + } + protected String getExtension() { return null; }