Skip to content

Commit

Permalink
Data generator reorganisation (apache#12122)
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 authored Dec 26, 2023
1 parent 344eba1 commit a200315
Show file tree
Hide file tree
Showing 13 changed files with 577 additions and 150 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.controller.recommender.data;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.math.IntRange;
import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
import org.apache.pinot.controller.recommender.data.writer.AvroWriter;
import org.apache.pinot.controller.recommender.data.writer.AvroWriterSpec;
import org.apache.pinot.controller.recommender.data.writer.CsvWriter;
import org.apache.pinot.controller.recommender.data.writer.FileWriterSpec;
import org.apache.pinot.controller.recommender.data.writer.JsonWriter;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DataGenerationHelpers {

private DataGenerationHelpers() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerationHelpers.class);

public static void generateAvro(DataGenerator generator, long totalDocs, int numFiles, String outDir,
boolean isOverrideOutDir) throws Exception {
AvroWriter avroWriter = new AvroWriter();
avroWriter.init(new AvroWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles));
avroWriter.write();
}

public static void generateCsv(DataGenerator generator, long totalDocs, int numFiles, String outDir,
boolean isOverrideOutDir) throws Exception {
CsvWriter csvWriter = new CsvWriter();
csvWriter.init(new FileWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles));
csvWriter.write();
}

public static void generateJson(DataGenerator generator, long totalDocs, int numFiles, String outDir,
boolean isOverrideOutDir) throws Exception {
JsonWriter jsonWriter = new JsonWriter();
jsonWriter.init(new FileWriterSpec(generator, handleOutDir(outDir, isOverrideOutDir), totalDocs, numFiles));
jsonWriter.write();
}

private static File handleOutDir(String outDir, boolean isOverrideOutDir)
throws IOException {
File dir = new File(outDir);
if (dir.exists() && !isOverrideOutDir) {
LOGGER.error("output directory already exists, and override is set to false");
throw new RuntimeException("output directory exists");
}
if (dir.exists()) {
FileUtils.deleteDirectory(dir);
}
dir.mkdir();
return dir;
}

public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
HashMap<String, FieldSpec.DataType> dataTypes, HashMap<String, FieldSpec.FieldType> fieldTypes,
HashMap<String, TimeUnit> timeUnits, HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
HashMap<String, Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
for (final FieldSpec fs : schema.getAllFieldSpecs()) {
String col = fs.getName();
columns.add(col);
dataTypes.put(col, fs.getDataType());
fieldTypes.put(col, fs.getFieldType());

switch (fs.getFieldType()) {
case DIMENSION:
cardinality.putIfAbsent(col, 1000);
break;
case METRIC:
range.putIfAbsent(col, new IntRange(1, 1000));
break;
case TIME:
range.putIfAbsent(col, new IntRange(1, 1000));
TimeFieldSpec tfs = (TimeFieldSpec) fs;
timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType());
break;

// forward compatibility with pattern generator
case DATE_TIME:
case COMPLEX:
break;
default:
throw new RuntimeException("Invalid field type.");
}
}
return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes,
timeUnits);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
*/
package org.apache.pinot.controller.recommender.data.generator;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.IntRange;
import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
Expand All @@ -39,7 +36,6 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,7 +46,6 @@
// TODO: add DATE_TIME to the data generator
public class DataGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerator.class);
private File _outDir;

DataGeneratorSpec _genSpec;

Expand All @@ -63,17 +58,6 @@ public DataGenerator() {
public void init(DataGeneratorSpec spec)
throws IOException {
_genSpec = spec;
_outDir = new File(_genSpec.getOutputDir());
if (_outDir.exists() && !_genSpec.isOverrideOutDir()) {
LOGGER.error("output directory already exists, and override is set to false");
throw new RuntimeException("output directory exists");
}

if (_outDir.exists()) {
FileUtils.deleteDirectory(_outDir);
}

_outDir.mkdir();

for (final String column : _genSpec.getColumns()) {
DataType dataType = _genSpec.getDataTypeMap().get(column);
Expand All @@ -99,59 +83,17 @@ public void init(DataGeneratorSpec spec)
}
}

public void generateAvro(long totalDocs, int numFiles)
throws IOException {
final int numPerFiles = (int) (totalDocs / numFiles);
for (int i = 0; i < numFiles; i++) {
try (AvroWriter writer = new AvroWriter(_outDir, i, _generators, fetchSchema())) {
for (int j = 0; j < numPerFiles; j++) {
writer.writeNext();
}
}
}
}

public void generateCsv(long totalDocs, int numFiles)
throws IOException {
final int numPerFiles = (int) (totalDocs / numFiles);
for (int i = 0; i < numFiles; i++) {
try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.csv", i)))) {
writer.append(StringUtils.join(_genSpec.getColumns(), ",")).append('\n');
for (int j = 0; j < numPerFiles; j++) {
Object[] values = new Object[_genSpec.getColumns().size()];
for (int k = 0; k < _genSpec.getColumns().size(); k++) {
Object next = _generators.get(_genSpec.getColumns().get(k)).next();
values[k] = serializeIfMultiValue(next);
}
writer.append(StringUtils.join(values, ",")).append('\n');
}
}
}
}

public void generateJson(long totalDocs, int numFiles)
throws IOException {
final int numPerFiles = (int) (totalDocs / numFiles);
final ObjectMapper mapper = new ObjectMapper();
for (int i = 0; i < numFiles; i++) {
try (FileWriter writer = new FileWriter(new File(_outDir, String.format("output_%d.json", i)))) {
for (int j = 0; j < numPerFiles; j++) {
Map<String, Object> row = new HashMap<>();
for (int k = 0; k < _genSpec.getColumns().size(); k++) {
String key = _genSpec.getColumns().get(k);
row.put(key, _generators.get(key).next());
}
writer.append(mapper.writeValueAsString(row)).append('\n');
}
}
}
}

private Object serializeIfMultiValue(Object obj) {
if (obj instanceof List) {
return StringUtils.join((List) obj, ";");
/*
* Returns a LinkedHashMap of columns and their respective generated values.
* This ensures that the entries are ordered as per the column list
*
* */
public Map<String, Object> nextRow() {
Map<String, Object> row = new LinkedHashMap<>();
for (String key : _genSpec.getColumns()) {
row.put(key, _generators.get(key).next());
}
return obj;
return row;
}

public Schema fetchSchema() {
Expand Down Expand Up @@ -193,7 +135,7 @@ private FieldSpec buildSpec(DataGeneratorSpec genSpec, String column) {
}

public static void main(String[] args)
throws IOException {
throws Exception {

final Map<String, DataType> dataTypes = new HashMap<>();
final Map<String, FieldType> fieldTypes = new HashMap<>();
Expand Down Expand Up @@ -257,11 +199,11 @@ public static void main(String[] args)
String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString();
final DataGeneratorSpec spec =
new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap, dataTypes, fieldTypes,
timeUnits, FileFormat.CSV, outputDir, true);
timeUnits);

final DataGenerator gen = new DataGenerator();
gen.init(spec);
gen.generateCsv(100, 1);
DataGenerationHelpers.generateCsv(gen, 100, 1, outputDir, true);
System.out.println("CSV data is generated under: " + outputDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ public class DataGeneratorSpec {
private final Map<String, FieldType> _fieldTypeMap;
private final Map<String, TimeUnit> _timeUnitMap;

private final FileFormat _outputFileFormat;
private final String _outputDir;
private final boolean _overrideOutDir;

@Deprecated
private FileFormat _outputFileFormat;
@Deprecated
private String _outputDir;
@Deprecated
private boolean _overrideOutDir;

@Deprecated
public DataGeneratorSpec() {
this(new ArrayList<String>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new HashMap<>(), new HashMap<>(), new HashMap<>(), FileFormat.AVRO, "/tmp/dataGen", true);
}

@Deprecated
public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
Expand All @@ -71,6 +76,21 @@ public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityM
_timeUnitMap = timeUnitMap;
}

public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap) {
_columns = columns;
_cardinalityMap = cardinalityMap;
_rangeMap = rangeMap;
_patternMap = patternMap;
_mvCountMap = mvCountMap;
_lengthMap = lengthMap;

_dataTypeMap = dataTypesMap;
_fieldTypeMap = fieldTypesMap;
_timeUnitMap = timeUnitMap;
}

public Map<String, DataType> getDataTypeMap() {
return _dataTypeMap;
}
Expand Down
Loading

0 comments on commit a200315

Please sign in to comment.