Create DateTimeGenerator and add it to data generator (apache#12206)
shounakmk219 authored Dec 28, 2023
1 parent a200315 commit 50912eb
Showing 6 changed files with 201 additions and 39 deletions.
@@ -22,8 +22,8 @@
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.math.IntRange;
@@ -34,6 +34,7 @@
import org.apache.pinot.controller.recommender.data.writer.CsvWriter;
import org.apache.pinot.controller.recommender.data.writer.FileWriterSpec;
import org.apache.pinot.controller.recommender.data.writer.JsonWriter;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -82,10 +83,16 @@ private static File handleOutDir(String outDir, boolean isOverrideOutDir)
return dir;
}

public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
HashMap<String, FieldSpec.DataType> dataTypes, HashMap<String, FieldSpec.FieldType> fieldTypes,
HashMap<String, TimeUnit> timeUnits, HashMap<String, Integer> cardinality, HashMap<String, IntRange> range,
HashMap<String, Map<String, Object>> pattern, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap) {
public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema) {
final List<String> columns = new LinkedList<>();
final HashMap<String, FieldSpec.DataType> dataTypes = new HashMap<>();
final HashMap<String, FieldSpec.FieldType> fieldTypes = new HashMap<>();
final HashMap<String, TimeUnit> timeUnits = new HashMap<>();

final HashMap<String, Integer> cardinality = new HashMap<>();
final HashMap<String, IntRange> range = new HashMap<>();
final HashMap<String, String> granularityMap = new HashMap<>();
final HashMap<String, String> formatMap = new HashMap<>();
for (final FieldSpec fs : schema.getAllFieldSpecs()) {
String col = fs.getName();
columns.add(col);
@@ -104,16 +111,28 @@ public static DataGeneratorSpec buildDataGeneratorSpec(Schema schema, List<String> columns,
TimeFieldSpec tfs = (TimeFieldSpec) fs;
timeUnits.put(col, tfs.getIncomingGranularitySpec().getTimeType());
break;
case DATE_TIME:
DateTimeFieldSpec dtfs = (DateTimeFieldSpec) fs;
granularityMap.put(col, dtfs.getGranularity());
formatMap.put(col, dtfs.getFormat());
break;

// forward compatibility with pattern generator
case DATE_TIME:
case COMPLEX:
break;
default:
throw new RuntimeException("Invalid field type.");
}
}
return new DataGeneratorSpec(columns, cardinality, range, pattern, mvCountMap, lengthMap, dataTypes, fieldTypes,
timeUnits);
return new DataGeneratorSpec.Builder()
.setColumns(columns)
.setDataTypeMap(dataTypes)
.setFieldTypeMap(fieldTypes)
.setTimeUnitMap(timeUnits)
.setCardinalityMap(cardinality)
.setRangeMap(range)
.setDateTimeGranularityMap(granularityMap)
.setDateTimeFormatMap(formatMap)
.build();
}
}
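
For illustration, here is a minimal sketch (not part of this commit) of how the new single-argument entry point might be driven; the schema name, column names, and format/granularity strings are assumptions.

import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;

public class BuildSpecSketch {
  public static void main(String[] args) {
    // Hypothetical schema with one dimension, one metric and one DATE_TIME column.
    Schema schema = new Schema.SchemaBuilder()
        .setSchemaName("events")
        .addSingleValueDimension("country", DataType.STRING)
        .addMetric("clicks", DataType.LONG)
        .addDateTime("eventTime", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS")
        .build();

    // The helper now derives columns, data types, field types, time units and the
    // DATE_TIME format/granularity maps directly from the schema.
    DataGeneratorSpec spec = DataGenerationHelpers.buildDataGeneratorSpec(schema);
    System.out.println(spec);
  }
}

Deriving everything from the schema removes the long parameter list that callers previously had to assemble by hand.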
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.math.IntRange;
import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -63,7 +64,11 @@ public void init(DataGeneratorSpec spec)
DataType dataType = _genSpec.getDataTypeMap().get(column);

Generator generator;
if (_genSpec.getPatternMap().containsKey(column)) {
if (_genSpec.getDateTimeFormatMap().containsKey(column)
&& _genSpec.getDateTimeGranularityMap().containsKey(column)) {
generator = new DateTimeGenerator(_genSpec.getDateTimeFormatMap().get(column),
_genSpec.getDateTimeGranularityMap().get(column));
} else if (_genSpec.getPatternMap().containsKey(column)) {
generator = GeneratorFactory
.getGeneratorFor(PatternType.valueOf(_genSpec.getPatternMap().get(column).get("type").toString()),
_genSpec.getPatternMap().get(column));
@@ -123,6 +128,12 @@ private FieldSpec buildSpec(DataGeneratorSpec genSpec, String column) {
spec = new TimeFieldSpec(new TimeGranularitySpec(dataType, genSpec.getTimeUnitMap().get(column), column));
break;

case DATE_TIME:
String format = genSpec.getDateTimeFormatMap().get(column);
String granularity = genSpec.getDateTimeGranularityMap().get(column);
spec = new DateTimeFieldSpec(column, dataType, format, granularity);
break;

default:
throw new RuntimeException("Invalid Field type.");
}
@@ -148,6 +159,9 @@ public static void main(String[] args)
Map<String, Integer> lengthMap = new HashMap<>();
List<String> columnNames = new ArrayList<>();

final Map<String, String> dateTimeFormatMap = new HashMap<>();
final Map<String, String> dateTimeGranularityMap = new HashMap<>();

int cardinalityValue = 5;
int strLength = 5;

@@ -199,7 +213,7 @@ public static void main(String[] args)
String outputDir = Paths.get(System.getProperty("java.io.tmpdir"), "csv-data").toString();
final DataGeneratorSpec spec =
new DataGeneratorSpec(columnNames, cardinality, range, template, mvCountMap, lengthMap, dataTypes, fieldTypes,
timeUnits);
timeUnits, dateTimeFormatMap, dateTimeGranularityMap);

final DataGenerator gen = new DataGenerator();
gen.init(spec);
@@ -41,6 +41,8 @@ public class DataGeneratorSpec {
private final Map<String, DataType> _dataTypeMap;
private final Map<String, FieldType> _fieldTypeMap;
private final Map<String, TimeUnit> _timeUnitMap;
private final Map<String, String> _dateTimeFormatMap;
private final Map<String, String> _dateTimeGranularityMap;

@Deprecated
private FileFormat _outputFileFormat;
@@ -74,11 +76,15 @@ public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap,
_dataTypeMap = dataTypesMap;
_fieldTypeMap = fieldTypesMap;
_timeUnitMap = timeUnitMap;

_dateTimeFormatMap = new HashMap<>();
_dateTimeGranularityMap = new HashMap<>();
}

public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap, Map<String, IntRange> rangeMap,
Map<String, Map<String, Object>> patternMap, Map<String, Double> mvCountMap, Map<String, Integer> lengthMap,
Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap) {
Map<String, DataType> dataTypesMap, Map<String, FieldType> fieldTypesMap, Map<String, TimeUnit> timeUnitMap,
Map<String, String> dateTimeFormatMap, Map<String, String> dateTimeGranularityMap) {
_columns = columns;
_cardinalityMap = cardinalityMap;
_rangeMap = rangeMap;
@@ -89,6 +95,8 @@ public DataGeneratorSpec(List<String> columns, Map<String, Integer> cardinalityMap,
_dataTypeMap = dataTypesMap;
_fieldTypeMap = fieldTypesMap;
_timeUnitMap = timeUnitMap;
_dateTimeGranularityMap = dateTimeGranularityMap;
_dateTimeFormatMap = dateTimeFormatMap;
}

public Map<String, DataType> getDataTypeMap() {
@@ -139,6 +147,14 @@ public String getOutputDir() {
return _outputDir;
}

public Map<String, String> getDateTimeFormatMap() {
return _dateTimeFormatMap;
}

public Map<String, String> getDateTimeGranularityMap() {
return _dateTimeGranularityMap;
}

@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
@@ -156,4 +172,78 @@ public String toString() {
builder.append(", output dir : " + _outputDir);
return builder.toString();
}

public static class Builder {
private List<String> _columns = new ArrayList<>();
private Map<String, Integer> _cardinalityMap = new HashMap<>();
private Map<String, IntRange> _rangeMap = new HashMap<>();
private Map<String, Map<String, Object>> _patternMap = new HashMap<>();
private Map<String, Double> _mvCountMap = new HashMap<>();
private Map<String, Integer> _lengthMap = new HashMap<>();
private Map<String, DataType> _dataTypeMap = new HashMap<>();
private Map<String, FieldType> _fieldTypeMap = new HashMap<>();
private Map<String, TimeUnit> _timeUnitMap = new HashMap<>();
private Map<String, String> _dateTimeFormatMap = new HashMap<>();
private Map<String, String> _dateTimeGranularityMap = new HashMap<>();

public DataGeneratorSpec build() {
return new DataGeneratorSpec(_columns, _cardinalityMap, _rangeMap, _patternMap, _mvCountMap, _lengthMap,
_dataTypeMap, _fieldTypeMap, _timeUnitMap, _dateTimeFormatMap, _dateTimeGranularityMap);
}

public Builder setColumns(List<String> columns) {
_columns = columns;
return this;
}

public Builder setCardinalityMap(Map<String, Integer> cardinalityMap) {
_cardinalityMap = cardinalityMap;
return this;
}

public Builder setRangeMap(Map<String, IntRange> rangeMap) {
_rangeMap = rangeMap;
return this;
}

public Builder setPatternMap(Map<String, Map<String, Object>> patternMap) {
_patternMap = patternMap;
return this;
}

public Builder setMvCountMap(Map<String, Double> mvCountMap) {
_mvCountMap = mvCountMap;
return this;
}

public Builder setLengthMap(Map<String, Integer> lengthMap) {
_lengthMap = lengthMap;
return this;
}

public Builder setDataTypeMap(Map<String, DataType> dataTypeMap) {
_dataTypeMap = dataTypeMap;
return this;
}

public Builder setFieldTypeMap(Map<String, FieldType> fieldTypeMap) {
_fieldTypeMap = fieldTypeMap;
return this;
}

public Builder setTimeUnitMap(Map<String, TimeUnit> timeUnitMap) {
_timeUnitMap = timeUnitMap;
return this;
}

public Builder setDateTimeFormatMap(Map<String, String> dateTimeFormatMap) {
_dateTimeFormatMap = dateTimeFormatMap;
return this;
}

public Builder setDateTimeGranularityMap(Map<String, String> dateTimeGranularityMap) {
_dateTimeGranularityMap = dateTimeGranularityMap;
return this;
}
}
}
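
A hedged usage sketch of the new builder with the DATE_TIME setters; the column name ("ts") and the format/granularity values are assumptions, not part of the commit.

import java.util.Collections;
import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
import org.apache.pinot.spi.data.FieldSpec;

public class BuilderSketch {
  public static void main(String[] args) {
    DataGeneratorSpec spec = new DataGeneratorSpec.Builder()
        .setColumns(Collections.singletonList("ts"))
        .setDataTypeMap(Collections.singletonMap("ts", FieldSpec.DataType.LONG))
        .setFieldTypeMap(Collections.singletonMap("ts", FieldSpec.FieldType.DATE_TIME))
        .setDateTimeFormatMap(Collections.singletonMap("ts", "1:MILLISECONDS:EPOCH"))
        .setDateTimeGranularityMap(Collections.singletonMap("ts", "1:DAYS"))
        .build();
    System.out.println(spec.getDateTimeFormatMap()); // {ts=1:MILLISECONDS:EPOCH}
  }
}

The builder avoids the telescoping constructors above: callers set only the maps they need, and every omitted map defaults to an empty HashMap.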
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.controller.recommender.data.generator;

import java.util.Date;
import java.util.Random;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.DateTimeGranularitySpec;


public class DateTimeGenerator implements Generator {

private static final int MULTIPLIER_CARDINALITY = 5;
private final DateTimeFormatSpec _formatSpec;
private final DateTimeGranularitySpec _granularitySpec;
private long _currentValue;
private Random _multiplier = new Random();

public DateTimeGenerator(String format, String granularity) {
_formatSpec = new DateTimeFormatSpec(format);
_granularitySpec = new DateTimeGranularitySpec(granularity);
}

@Override
public void init() {
_currentValue = new Date().getTime();
}

@Override
public Object next() {
_currentValue += _granularitySpec.granularityToMillis() * _multiplier.nextInt(MULTIPLIER_CARDINALITY);
return _formatSpec.fromMillisToFormat(_currentValue);
}
}
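
A quick illustrative driver for the new generator (again, not part of the commit); the format and granularity strings are assumptions that follow Pinot's "size:unit[:format]" notation.

import org.apache.pinot.controller.recommender.data.generator.DateTimeGenerator;

public class DateTimeGeneratorSketch {
  public static void main(String[] args) {
    DateTimeGenerator generator = new DateTimeGenerator("1:MILLISECONDS:EPOCH", "15:MINUTES");
    generator.init(); // seeds the current value with the wall-clock time
    for (int i = 0; i < 3; i++) {
      // Each call advances the value by 0-4 granularity steps (15 minutes each)
      // and renders it in the configured format (epoch milliseconds here).
      System.out.println(generator.next());
    }
  }
}

Because the random multiplier can be zero, successive values are monotonically non-decreasing rather than strictly increasing.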
@@ -526,9 +526,15 @@ private File generateData() {

// generate data
String outputDir = new File(_workingDir, "csv").getAbsolutePath();
DataGeneratorSpec spec =
new DataGeneratorSpec(colNames, cardinalities, new HashMap<>(), new HashMap<>(), mvCounts, lengths, dataTypes,
fieldTypes, timeUnits);
DataGeneratorSpec spec = new DataGeneratorSpec.Builder()
.setColumns(colNames)
.setCardinalityMap(cardinalities)
.setMvCountMap(mvCounts)
.setLengthMap(lengths)
.setDataTypeMap(dataTypes)
.setFieldTypeMap(fieldTypes)
.setTimeUnitMap(timeUnits)
.build();
DataGenerator dataGenerator = new DataGenerator();
try {
dataGenerator.init(spec);
@@ -20,18 +20,14 @@

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.math.IntRange;
import org.apache.pinot.controller.recommender.data.DataGenerationHelpers;
import org.apache.pinot.controller.recommender.data.generator.DataGenerator;
import org.apache.pinot.controller.recommender.data.generator.DataGeneratorSpec;
import org.apache.pinot.controller.recommender.data.generator.SchemaAnnotation;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.Schema.SchemaBuilder;
import org.apache.pinot.spi.data.TimeGranularitySpec;
@@ -124,23 +120,10 @@ public boolean execute()
}

Schema schema = Schema.fromFile(new File(_schemaFile));

List<String> columns = new LinkedList<>();
final HashMap<String, DataType> dataTypes = new HashMap<>();
final HashMap<String, FieldType> fieldTypes = new HashMap<>();
final HashMap<String, TimeUnit> timeUnits = new HashMap<>();

final HashMap<String, Integer> cardinality = new HashMap<>();
final HashMap<String, IntRange> range = new HashMap<>();
final HashMap<String, Map<String, Object>> pattern = new HashMap<>();
final HashMap<String, Double> mvCountMap = new HashMap<>();
final HashMap<String, Integer> lengthMap = new HashMap<>();

buildCardinalityRangeMaps(_schemaAnnFile, cardinality, range, pattern);

final DataGeneratorSpec spec =
DataGenerationHelpers.buildDataGeneratorSpec(schema, columns, dataTypes, fieldTypes, timeUnits, cardinality,
range, pattern, mvCountMap, lengthMap);
DataGenerationHelpers.buildDataGeneratorSpec(schema);
buildCardinalityRangeMaps(_schemaAnnFile, spec);


final DataGenerator gen = new DataGenerator();
gen.init(spec);
@@ -158,8 +141,7 @@ public boolean execute()
return true;
}

private void buildCardinalityRangeMaps(String file, HashMap<String, Integer> cardinality,
HashMap<String, IntRange> range, Map<String, Map<String, Object>> pattern)
private void buildCardinalityRangeMaps(String file, DataGeneratorSpec spec)
throws IOException {
if (file == null) {
return; // Nothing to do here.
@@ -171,11 +153,11 @@ private void buildCardinalityRangeMaps(String file, HashMap<String, Integer> cardinality,
String column = sa.getColumn();

if (sa.isRange()) {
range.put(column, new IntRange(sa.getRangeStart(), sa.getRangeEnd()));
spec.getRangeMap().put(column, new IntRange(sa.getRangeStart(), sa.getRangeEnd()));
} else if (sa.getPattern() != null) {
pattern.put(column, sa.getPattern());
spec.getPatternMap().put(column, sa.getPattern());
} else {
cardinality.put(column, sa.getCardinality());
spec.getCardinalityMap().put(column, sa.getCardinality());
}
}
}
