From a20031528da3c29371f3122eccf745061905a00d Mon Sep 17 00:00:00 2001 From: Shounak kulkarni Date: Tue, 26 Dec 2023 13:21:16 +0530 Subject: [PATCH] Data generator reorganisation (#12122) --- .../data/DataGenerationHelpers.java | 119 ++++++++++++++++++ .../data/generator/DataGenerator.java | 88 +++---------- .../data/generator/DataGeneratorSpec.java | 28 ++++- .../{generator => writer}/AvroWriter.java | 72 ++++++++--- .../data/writer/AvroWriterSpec.java | 56 +++++++++ .../recommender/data/writer/CsvWriter.java | 52 ++++++++ .../recommender/data/writer/FileWriter.java | 74 +++++++++++ .../data/writer/FileWriterSpec.java | 47 +++++++ .../recommender/data/writer/JsonWriter.java | 43 +++++++ .../recommender/data/writer/Writer.java | 49 ++++++++ .../recommender/data/writer/WriterSpec.java | 34 +++++ .../provisioning/MemoryEstimator.java | 5 +- .../admin/command/GenerateDataCommand.java | 60 +-------- 13 files changed, 577 insertions(+), 150 deletions(-) create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java rename pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/{generator => writer}/AvroWriter.java (59%) create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java new file mode 100644 index 000000000000..b4017abfa81f --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/DataGenerationHelpers.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.controller.recommender.data; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.math.IntRange; +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; +import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec; +import org.apache.pinot.controller.recommender.data.writer.AvroWriter; +import org.apache.pinot.controller.recommender.data.writer.AvroWriterSpec; +import org.apache.pinot.controller.recommender.data.writer.CsvWriter; +import org.apache.pinot.controller.recommender.data.writer.FileWriterSpec; +import org.apache.pinot.controller.recommender.data.writer.JsonWriter; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.TimeFieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class DataGenerationHelpers { + + private DataGenerationHelpers() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerationHelpers.class); + + public static void generateAvro(DataGenerator generator, long totalDocs, int numFiles, String outDir, + boolean isOverrideOutDir) throws Exception { + AvroWriter avroWriter = new AvroWriter(); + avroWriter.init(new AvroWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles)); + avroWriter.write(); + } + + public static void generateCsv(DataGenerator generator, long totalDocs, int numFiles, String outDir, + boolean isOverrideOutDir) throws Exception { + CsvWriter csvWriter = new CsvWriter(); + csvWriter.init(new FileWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles)); + csvWriter.write(); + } + + public static void generateJson(DataGenerator generator, long totalDocs, int numFiles, String outDir, + boolean isOverrideOutDir) throws Exception { + JsonWriter jsonWriter = new JsonWriter(); + jsonWriter.init(new FileWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles)); + jsonWriter.write(); + } + + private static File handleOutDir(String outDir, boolean isOverrideOutDir) + throws IOException { + File dir = new File(outDir); + if (dir.exists() && !isOverrideOutDir) { + LOGGER.error("output directory already exists, and override is set to false"); + throw new RuntimeException("output directory exists"); + } + if (dir.exists()) { + FileUtils.deleteDirectory(dir); + } + dir.mkdir(); + return dir; + } + + public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List columns, + HashMap dataTypes, HashMap fieldTypes, + HashMap timeUnits, HashMap cardinality, HashMap range, + HashMap> pattern, Map mvCountMap, Map lengthMap) { + for (final FieldSpec fs : schema.getAllFieldSpecs()) { + String col = fs.getName(); + columns.add(col); + dataTypes.put(col, fs.getDataType()); + fieldTypes.put(col, fs.getFieldType()); + + switch (fs.getFieldType()) { + case DIMENSION: + cardinality.putIfAbsent(col, 1000); + break; + case METRIC: + range.putIfAbsent(col, new IntRange(1, 1000)); + break; + case TIME: + range.putIfAbsent(col, new IntRange(1, 1000)); + TimeFieldSpec tfs = (TimeFieldSpec) fs; + timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType()); + break; + + // forward compatibility with pattern generator + case DATE_TIME: + case COMPLEX: + break; + default: + throw new RuntimeException("Invalid field type."); + } + } + return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes, + timeUnits); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java index 8069813a0295..4c818846ede2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGenerator.java @@ -18,19 +18,16 @@ */ package org.apache.pinot.controller.recommender.data.generator; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.IntRange; +import org.apache.pinot.controller.recommender.data.DataGenerationHelpers; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -39,7 +36,6 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.spi.data.TimeGranularitySpec; -import org.apache.pinot.spi.data.readers.FileFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +46,6 @@ // TODO: add DATE_TIME to the data generator public class DataGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerator.class); - private File _outDir; DataGeneratorSpec _genSpec; @@ -63,17 +58,6 @@ public DataGenerator() { public void init(DataGeneratorSpec spec) throws IOException { _genSpec = spec; - _outDir = new File(_genSpec.getOutputDir()); - if (_outDir.exists() && !_genSpec.isOverrideOutDir()) { - LOGGER.error("output directory already exists, and override is set to false"); - throw new RuntimeException("output directory exists"); - } - - if (_outDir.exists()) { - FileUtils.deleteDirectory(_outDir); - } - - _outDir.mkdir(); for (final String column : _genSpec.getColumns()) { DataType dataType = _genSpec.getDataTypeMap().get(column); @@ -99,59 +83,17 @@ public void init(DataGeneratorSpec spec) } } - public void generateAvro(long totalDocs, int numFiles) - throws IOException { - final int numPerFiles = (int) (totalDocs / numFiles); - for (int i = 0; i < numFiles; i++) { - try (AvroWriter writer = new AvroWriter(_outDir, i, _generators, fetchSchema())) { - for (int j = 0; j < numPerFiles; j++) { - writer.writeNext(); - } - } - } - } - - public void generateCsv(long totalDocs, int numFiles) - throws IOException { - final int numPerFiles = (int) (totalDocs / numFiles); - for (int i = 0; i < numFiles; i++) { - try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.csv", i)))) { - writer.append(StringUtils.join(_genSpec.getColumns(), ",")).append('\n'); - for (int j = 0; j < numPerFiles; j++) { - Object[] values = new Object[_genSpec.getColumns().size()]; - for (int k = 0; k < _genSpec.getColumns().size(); k++) { - Object next = _generators.get(_genSpec.getColumns().get(k)).next(); - values[k] = serializeIfMultiValue(next); - } - writer.append(StringUtils.join(values, ",")).append('\n'); - } - } - } - } - - public void generateJson(long totalDocs, int numFiles) - throws IOException { - final int numPerFiles = (int) (totalDocs / numFiles); - final ObjectMapper mapper = new ObjectMapper(); - for (int i = 0; i < numFiles; i++) { - try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.json", i)))) { - for (int j = 0; j < numPerFiles; j++) { - Map row = new HashMap<>(); - for (int k = 0; k < _genSpec.getColumns().size(); k++) { - String key = _genSpec.getColumns().get(k); - row.put(key, _generators.get(key).next()); - } - writer.append(mapper.writeValueAsString(row)).append('\n'); - } - } - } - } - - private Object serializeIfMultiValue(Object obj) { - if (obj instanceof List) { - return StringUtils.join((List) obj, ";"); + /* + * Returns a LinkedHashMap of columns and their respective generated values. + * This ensures that the entries are ordered as per the column list + * + * */ + public Map nextRow() { + Map row = new LinkedHashMap<>(); + for (String key : _genSpec.getColumns()) { + row.put(key, _generators.get(key).next()); } - return obj; + return row; } public Schema fetchSchema() { @@ -193,7 +135,7 @@ private FieldSpec buildSpec(DataGeneratorSpec genSpec, String column) { } public static void main(String[] args) - throws IOException { + throws Exception { final Map dataTypes = new HashMap<>(); final Map fieldTypes = new HashMap<>(); @@ -257,11 +199,11 @@ public static void main(String[] args) String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString(); final DataGeneratorSpec spec = new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap, dataTypes, fieldTypes, - timeUnits, FileFormat.CSV, outputDir, true); + timeUnits); final DataGenerator gen = new DataGenerator(); gen.init(spec); - gen.generateCsv(100, 1); + DataGenerationHelpers.generateCsv(gen, 100, 1, outputDir, true); System.out.println("CSV data is generated under: " + outputDir); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java index d89ea6696897..f64a7a984bd8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/DataGeneratorSpec.java @@ -42,15 +42,20 @@ public class DataGeneratorSpec { private final Map _fieldTypeMap; private final Map _timeUnitMap; - private final FileFormat _outputFileFormat; - private final String _outputDir; - private final boolean _overrideOutDir; - + @Deprecated + private FileFormat _outputFileFormat; + @Deprecated + private String _outputDir; + @Deprecated + private boolean _overrideOutDir; + + @Deprecated public DataGeneratorSpec() { this(new ArrayList(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), FileFormat.AVRO, "/tmp/dataGen", true); } + @Deprecated public DataGeneratorSpec(List columns, Map cardinalityMap, Map rangeMap, Map> patternMap, Map mvCountMap, Map lengthMap, Map dataTypesMap, Map fieldTypesMap, Map timeUnitMap, @@ -71,6 +76,21 @@ public DataGeneratorSpec(List columns, Map cardinalityM _timeUnitMap = timeUnitMap; } + public DataGeneratorSpec(List columns, Map cardinalityMap, Map rangeMap, + Map> patternMap, Map mvCountMap, Map lengthMap, + Map dataTypesMap, Map fieldTypesMap, Map timeUnitMap) { + _columns = columns; + _cardinalityMap = cardinalityMap; + _rangeMap = rangeMap; + _patternMap = patternMap; + _mvCountMap = mvCountMap; + _lengthMap = lengthMap; + + _dataTypeMap = dataTypesMap; + _fieldTypeMap = fieldTypesMap; + _timeUnitMap = timeUnitMap; + } + public Map getDataTypeMap() { return _dataTypeMap; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java similarity index 59% rename from pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java rename to pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java index 92bcf3be4dd8..1295b8991dcc 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/generator/AvroWriter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.controller.recommender.data.generator; +package org.apache.pinot.controller.recommender.data.writer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.util.Map; +import java.util.Objects; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -32,20 +33,13 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class AvroWriter implements Closeable { - private final Map _generatorMap; - private final org.apache.avro.Schema _avroSchema; - private final DataFileWriter _recordWriter; - - public AvroWriter(File baseDir, int index, Map generatorMap, Schema schema) - throws IOException { - _generatorMap = generatorMap; - _avroSchema = getAvroSchema(schema); - _recordWriter = new DataFileWriter<>(new GenericDatumWriter(_avroSchema)); - _recordWriter.create(_avroSchema, new File(baseDir, "part-" + index + ".avro")); - } +public class AvroWriter implements Writer { + private static final Logger LOGGER = LoggerFactory.getLogger(AvroWriter.class); + private AvroWriterSpec _spec; public static org.apache.avro.Schema getAvroSchema(Schema schema) { ObjectNode avroSchema = JsonUtils.newObjectNode(); @@ -62,12 +56,56 @@ public static org.apache.avro.Schema getAvroSchema(Schema schema) { return new org.apache.avro.Schema.Parser().parse(avroSchema.toString()); } - public void writeNext() + @Override + public void init(WriterSpec spec) { + _spec = (AvroWriterSpec) spec; + } + + @Override + public void write() throws IOException { - GenericData.Record nextRecord = new GenericData.Record(_avroSchema); - for (String column : _generatorMap.keySet()) { - nextRecord.put(column, _generatorMap.get(column).next()); + final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles()); + for (int i = 0; i < _spec.getNumFiles(); i++) { + try (AvroRecordAppender appender = new AvroRecordAppender( + new File(_spec.getBaseDir(), "part-" + i + ".avro"), getAvroSchema(_spec.getSchema()))) { + for (int j = 0; j < numPerFiles; j++) { + appender.append(_spec.getGenerator().nextRow()); + } + } + } + } + + @Override + public void cleanup() { + File baseDir = new File(_spec.getBaseDir().toURI()); + for (File file : Objects.requireNonNull(baseDir.listFiles())) { + if (!file.delete()) { + LOGGER.error("Unable to delete file {}", file.getAbsolutePath()); + } + } + if (!baseDir.delete()) { + LOGGER.error("Unable to delete directory {}", baseDir.getAbsolutePath()); } + } +} + +class AvroRecordAppender implements Closeable { + private final DataFileWriter _recordWriter; + private final org.apache.avro.Schema _avroSchema; + + public AvroRecordAppender(File file, org.apache.avro.Schema avroSchema) + throws IOException { + _avroSchema = avroSchema; + _recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(_avroSchema)); + _recordWriter.create(_avroSchema, file); + } + + public void append(Map record) + throws IOException { + GenericData.Record nextRecord = new GenericData.Record(_avroSchema); + record.forEach((column, value) -> { + nextRecord.put(column, record.get(column)); + }); _recordWriter.append(nextRecord); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java new file mode 100644 index 000000000000..a9ce2711308d --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/AvroWriterSpec.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import java.io.File; +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; +import org.apache.pinot.spi.data.Schema; + + +public class AvroWriterSpec extends WriterSpec { + + private final File _baseDir; + private final long _totalDocs; + private final int _numFiles; + private final Schema _schema; + + public AvroWriterSpec(DataGenerator generator, File baseDir, long totalDocs, int numFiles) { + super(generator); + _baseDir = baseDir; + _totalDocs = totalDocs; + _numFiles = numFiles; + _schema = generator.fetchSchema(); + } + + public Schema getSchema() { + return _schema; + } + + public long getTotalDocs() { + return _totalDocs; + } + + public int getNumFiles() { + return _numFiles; + } + + public File getBaseDir() { + return _baseDir; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java new file mode 100644 index 000000000000..88547cc75721 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/CsvWriter.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import java.util.List; +import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; + + +public class CsvWriter extends FileWriter { + @Override + protected String generateRow(DataGenerator generator) { + Map row = generator.nextRow(); + int colCount = row.size(); + Object[] values = new Object[colCount]; + int index = 0; + for (String key : row.keySet()) { + values[index] = serializeIfMultiValue(row.get(key)); + index++; + } + return StringUtils.join(values, ","); + } + + private Object serializeIfMultiValue(Object obj) { + if (obj instanceof List) { + return StringUtils.join((List) obj, ";"); + } + return obj; + } + + @Override + protected String getExtension() { + return "csv"; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java new file mode 100644 index 000000000000..b74c2ebbdebd --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import java.io.File; +import java.util.Objects; +import org.apache.commons.lang.StringUtils; +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class FileWriter implements Writer { + private static final Logger LOGGER = LoggerFactory.getLogger(FileWriter.class); + + private FileWriterSpec _spec; + @Override + public void init(WriterSpec spec) { + _spec = (FileWriterSpec) spec; + } + + @Override + public void write() + throws Exception { + final int numPerFiles = (int) (_spec.getTotalDocs() / _spec.getNumFiles()); + final String headers = StringUtils.join(_spec.getGenerator().nextRow().keySet(), ","); + final String extension = getExtension() == null ? "" : String.format(".%s", getExtension()); + for (int i = 0; i < _spec.getNumFiles(); i++) { + try (java.io.FileWriter writer = + new java.io.FileWriter(new File(_spec.getBaseDir(), String.format("output_%d%s", i, extension)))) { + writer.append(headers).append('\n'); + for (int j = 0; j < numPerFiles; j++) { + String appendString = generateRow(_spec.getGenerator()); + writer.append(appendString).append('\n'); + } + } + } + } + + protected String getExtension() { + return null; + } + + @Override + public void cleanup() { + File baseDir = new File(_spec.getBaseDir().toURI()); + for (File file : Objects.requireNonNull(baseDir.listFiles())) { + if (!file.delete()) { + LOGGER.error("Unable to delete file {}", file.getAbsolutePath()); + } + } + if (!baseDir.delete()) { + LOGGER.error("Unable to delete directory {}", baseDir.getAbsolutePath()); + } + } + + protected abstract String generateRow(DataGenerator generator); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java new file mode 100644 index 000000000000..a7ef8cd49655 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/FileWriterSpec.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import java.io.File; +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; + + +public class FileWriterSpec extends WriterSpec { + private final File _baseDir; + private final long _totalDocs; + private final int _numFiles; + public FileWriterSpec(DataGenerator generator, File baseDir, long totalDocs, int numFiles) { + super(generator); + _baseDir = baseDir; + _totalDocs = totalDocs; + _numFiles = numFiles; + } + + public File getBaseDir() { + return _baseDir; + } + + public long getTotalDocs() { + return _totalDocs; + } + + public int getNumFiles() { + return _numFiles; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java new file mode 100644 index 000000000000..f867df9f2953 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/JsonWriter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; + + +public class JsonWriter extends FileWriter { + @Override + protected String generateRow(DataGenerator generator) { + Map row = generator.nextRow(); + final ObjectMapper mapper = new ObjectMapper(); + try { + return mapper.writeValueAsString(row); + } catch (JsonProcessingException e) { + throw new RuntimeException("Issue while processing the json entry.", e); + } + } + + @Override + protected String getExtension() { + return "json"; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java new file mode 100644 index 000000000000..b64aef01ae28 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/Writer.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; + + +/** + * Interface to write data to a datasource. + * Implementations of this interface should use {@link DataGenerator} to generate new rows + * and write those to the respective datasource. + */ +public interface Writer { + + /** + * Initialise the Writer + * @param spec {@link WriterSpec} object which contains {@link DataGenerator} object + * used to generate rows to write to the data source + */ + void init(WriterSpec spec); + + /** + * Writes the generated rows to the specified datasource + * @throws Exception + */ + void write() + throws Exception; + + /** + * Cleanup the data written to the datasource + */ + void cleanup(); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java new file mode 100644 index 000000000000..0a82601fa7ee --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/data/writer/WriterSpec.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.recommender.data.writer; + +import org.apache.pinot.controller.recommender.data.generator.DataGenerator; + + +public class WriterSpec { + private final DataGenerator _generator; + + public WriterSpec(DataGenerator generator) { + _generator = generator; + } + + public DataGenerator getGenerator() { + return _generator; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java index 9873d17ad816..3c19db74b61b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/realtime/provisioning/MemoryEstimator.java @@ -31,6 +31,7 @@ import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.controller.recommender.data.DataGenerationHelpers; import org.apache.pinot.controller.recommender.data.generator.DataGenerator; import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec; import org.apache.pinot.controller.recommender.io.metadata.DateTimeFieldSpecMetadata; @@ -527,11 +528,11 @@ private File generateData() { String outputDir = new File(_workingDir, "csv").getAbsolutePath(); DataGeneratorSpec spec = new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new HashMap<>(), mvCounts, lengths, dataTypes, - fieldTypes, timeUnits, FileFormat.CSV, outputDir, true); + fieldTypes, timeUnits); DataGenerator dataGenerator = new DataGenerator(); try { dataGenerator.init(spec); - dataGenerator.generateCsv(_numberOfRows, 1); + DataGenerationHelpers.generateCsv(dataGenerator, _numberOfRows, 1, outputDir, true); File outputFile = Paths.get(outputDir, "output_0.csv").toFile(); LOGGER.info("Successfully generated data file: {}", outputFile); return outputFile; diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java index faded1dd49b3..9a98f652fb8e 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GenerateDataCommand.java @@ -26,17 +26,15 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.math.IntRange; +import org.apache.pinot.controller.recommender.data.DataGenerationHelpers; import org.apache.pinot.controller.recommender.data.generator.DataGenerator; import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec; import org.apache.pinot.controller.recommender.data.generator.SchemaAnnotation; -import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.FieldSpec.FieldType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.Schema.SchemaBuilder; -import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.spi.data.TimeGranularitySpec; -import org.apache.pinot.spi.data.readers.FileFormat; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.tools.Command; import org.slf4j.Logger; @@ -141,18 +139,18 @@ public boolean execute() buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern); final DataGeneratorSpec spec = - buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, range, pattern, - mvCountMap, lengthMap); + DataGenerationHelpers.buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality, + range, pattern, mvCountMap, lengthMap); final DataGenerator gen = new DataGenerator(); gen.init(spec); if (FORMAT_AVRO.equalsIgnoreCase(_format)) { - gen.generateAvro(_numRecords, _numFiles); + DataGenerationHelpers.generateAvro(gen, _numRecords, _numFiles, _outDir, _overwrite); } else if (FORMAT_CSV.equalsIgnoreCase(_format)) { - gen.generateCsv(_numRecords, _numFiles); + DataGenerationHelpers.generateCsv(gen, _numRecords, _numFiles, _outDir, _overwrite); } else if (FORMAT_JSON.equalsIgnoreCase(_format)) { - gen.generateJson(_numRecords, _numFiles); + DataGenerationHelpers.generateJson(gen, _numRecords, _numFiles, _outDir, _overwrite); } else { throw new IllegalArgumentException(String.format("Invalid output format '%s'", _format)); } @@ -182,52 +180,6 @@ private void buildCardinalityRangeMaps(String file, HashMap car } } - private DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List columns, - HashMap dataTypes, HashMap fieldTypes, HashMap timeUnits, - HashMap cardinality, HashMap range, - HashMap> pattern, Map mvCountMap, Map lengthMap) { - for (final FieldSpec fs : schema.getAllFieldSpecs()) { - String col = fs.getName(); - - columns.add(col); - dataTypes.put(col, fs.getDataType()); - fieldTypes.put(col, fs.getFieldType()); - - switch (fs.getFieldType()) { - case DIMENSION: - if (cardinality.get(col) == null) { - cardinality.put(col, 1000); - } - break; - - case METRIC: - if (!range.containsKey(col)) { - range.put(col, new IntRange(1, 1000)); - } - break; - - case TIME: - if (!range.containsKey(col)) { - range.put(col, new IntRange(1, 1000)); - } - TimeFieldSpec tfs = (TimeFieldSpec) fs; - timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType()); - break; - - // forward compatibility with pattern generator - case DATE_TIME: - case COMPLEX: - break; - - default: - throw new RuntimeException("Invalid field type."); - } - } - - return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes, - timeUnits, FileFormat.AVRO, _outDir, _overwrite); - } - public static void main(String[] args) throws IOException { SchemaBuilder schemaBuilder = new SchemaBuilder();