Skip to content

Commit

Permalink
[core] Clean constants, caseSensitive, loader in Catalog (#4721)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 16, 2024
1 parent 683fa19 commit cdd5bb7
Show file tree
Hide file tree
Showing 79 changed files with 320 additions and 325 deletions.
6 changes: 0 additions & 6 deletions docs/content/maintenance/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ Options for paimon catalog.

{{< generated/catalog_configuration >}}

### FilesystemCatalogOptions

Options for Filesystem catalog.

{{< generated/file_system_catalog_configuration >}}

### HiveCatalogOptions

Options for Hive catalog.
Expand Down
2 changes: 1 addition & 1 deletion docs/content/program-api/flink-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public class WriteCdcToTable {
Identifier identifier = Identifier.create("my_db", "T");
Options catalogOptions = new Options();
catalogOptions.set("warehouse", "/path/to/warehouse");
Catalog.Loader catalogLoader =
CatalogLoader catalogLoader =
() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
Table table = catalogLoader.load().getTable(identifier);

Expand Down
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>allow-upper-case</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
</tr>
<tr>
<td><h5>cache-enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down Expand Up @@ -74,6 +68,12 @@
<td>Integer</td>
<td>Controls the maximum number of snapshots per table that are cached in the catalog.</td>
</tr>
<tr>
<td><h5>case-sensitive</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog is case-sensitive.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
12 changes: 6 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@
<td>String</td>
<td>Specify the file name prefix of data files.</td>
</tr>
<tr>
<td><h5>data-file.thin-mode</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
</tr>
<tr>
<td><h5>delete-file.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -864,12 +870,6 @@
<td>Integer</td>
<td>Default spill compression zstd level. For higher compression rates, it can be configured to 9, but the read and write speed will significantly decrease.</td>
</tr>
<tr>
<td><h5>data-file.thin-mode</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable data file thin mode to avoid duplicate columns storage.</td>
</tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public class ArrowBundleRecords implements BundleRecords {

private final VectorSchemaRoot vectorSchemaRoot;
private final RowType rowType;
private final boolean allowUpperCase;
private final boolean caseSensitive;

public ArrowBundleRecords(
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean allowUpperCase) {
VectorSchemaRoot vectorSchemaRoot, RowType rowType, boolean caseSensitive) {
this.vectorSchemaRoot = vectorSchemaRoot;
this.rowType = rowType;
this.allowUpperCase = allowUpperCase;
this.caseSensitive = caseSensitive;
}

public VectorSchemaRoot getVectorSchemaRoot() {
Expand All @@ -52,7 +52,7 @@ public long rowCount() {

@Override
public Iterator<InternalRow> iterator() {
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, allowUpperCase);
ArrowBatchReader arrowBatchReader = new ArrowBatchReader(rowType, caseSensitive);
return arrowBatchReader.readBatch(vectorSchemaRoot).iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;

/** Utilities for creating Arrow objects. */
public class ArrowUtils {

Expand All @@ -66,13 +68,13 @@ public static VectorSchemaRoot createVectorSchemaRoot(
}

public static VectorSchemaRoot createVectorSchemaRoot(
RowType rowType, BufferAllocator allocator, boolean allowUpperCase) {
RowType rowType, BufferAllocator allocator, boolean caseSensitive) {
List<Field> fields =
rowType.getFields().stream()
.map(
f ->
toArrowField(
allowUpperCase ? f.name() : f.name().toLowerCase(),
toLowerCaseIfNeed(f.name(), caseSensitive),
f.id(),
f.type(),
0))
Expand All @@ -81,9 +83,9 @@ public static VectorSchemaRoot createVectorSchemaRoot(
}

public static FieldVector createVector(
DataField dataField, BufferAllocator allocator, boolean allowUpperCase) {
DataField dataField, BufferAllocator allocator, boolean caseSensitive) {
return toArrowField(
allowUpperCase ? dataField.name() : dataField.name().toLowerCase(),
toLowerCaseIfNeed(dataField.name(), caseSensitive),
dataField.id(),
dataField.type(),
0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@
import java.util.Iterator;
import java.util.List;

import static org.apache.paimon.utils.StringUtils.toLowerCaseIfNeed;

/** Reader from a {@link VectorSchemaRoot} to paimon rows. */
public class ArrowBatchReader {

private final InternalRowSerializer internalRowSerializer;
private final VectorizedColumnBatch batch;
private final Arrow2PaimonVectorConverter[] convertors;
private final RowType projectedRowType;
private final boolean allowUpperCase;
private final boolean caseSensitive;

public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
public ArrowBatchReader(RowType rowType, boolean caseSensitive) {
this.internalRowSerializer = new InternalRowSerializer(rowType);
ColumnVector[] columnVectors = new ColumnVector[rowType.getFieldCount()];
this.convertors = new Arrow2PaimonVectorConverter[rowType.getFieldCount()];
Expand All @@ -53,7 +55,7 @@ public ArrowBatchReader(RowType rowType, boolean allowUpperCase) {
for (int i = 0; i < columnVectors.length; i++) {
this.convertors[i] = Arrow2PaimonVectorConverter.construct(rowType.getTypeAt(i));
}
this.allowUpperCase = allowUpperCase;
this.caseSensitive = caseSensitive;
}

public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
Expand All @@ -63,8 +65,7 @@ public Iterable<InternalRow> readBatch(VectorSchemaRoot vsr) {
for (int i = 0; i < dataFields.size(); ++i) {
try {
String fieldName = dataFields.get(i).name();
Field field =
arrowSchema.findField(allowUpperCase ? fieldName : fieldName.toLowerCase());
Field field = arrowSchema.findField(toLowerCaseIfNeed(fieldName, caseSensitive));
int idx = arrowSchema.getFields().indexOf(field);
mapping[i] = idx;
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public class ArrowFormatCWriter implements AutoCloseable {
private final ArrowSchema schema;
private final ArrowFormatWriter realWriter;

public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) {
this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, allowUpperCase);
public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive);
RootAllocator allocator = realWriter.getAllocator();
array = ArrowArray.allocateNew(allocator);
schema = ArrowSchema.allocateNew(allocator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public class ArrowFormatWriter implements AutoCloseable {
private final RootAllocator allocator;
private int rowId;

public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean allowUpperCase) {
public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
allocator = new RootAllocator();

vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, allowUpperCase);
vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, caseSensitive);

fieldWriters = new ArrowFieldWriter[rowType.getFieldCount()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,12 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table in the catalog are cached.");

public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
ConfigOptions.key("allow-upper-case")
public static final ConfigOption<Boolean> CASE_SENSITIVE =
ConfigOptions.key("case-sensitive")
.booleanType()
.noDefaultValue()
.withDescription(
"Indicates whether this catalog allow upper case, "
+ "its default value depends on the implementation of the specific catalog.");
.withFallbackKeys("allow-upper-case")
.withDescription("Indicates whether this catalog is case-sensitive.");

public static final ConfigOption<Boolean> SYNC_ALL_PROPERTIES =
ConfigOptions.key("sync-all-properties")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ public static String quote(String str) {
return "`" + str + "`";
}

public static String caseSensitiveConversion(String str, boolean allowUpperCase) {
return allowUpperCase ? str : str.toLowerCase();
public static String toLowerCaseIfNeed(String str, boolean caseSensitive) {
return caseSensitive ? str : str.toLowerCase();
}

public static boolean isNumeric(final CharSequence cs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@

import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
Expand All @@ -82,7 +81,7 @@ protected AbstractCatalog(FileIO fileIO) {

protected AbstractCatalog(FileIO fileIO, Options options) {
this.fileIO = fileIO;
this.tableDefaultOptions = Catalog.tableDefaultOptions(options.toMap());
this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}

Expand Down Expand Up @@ -123,11 +122,6 @@ protected boolean lockEnabled() {
return catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
}

@Override
public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
}

protected boolean allowCustomTablePath() {
return false;
}
Expand Down Expand Up @@ -559,8 +553,9 @@ protected void checkNotSystemDatabase(String database) {
}

protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
Catalog.validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
Catalog.validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
CatalogUtils.validateCaseInsensitive(
caseSensitive(), "Database", identifier.getDatabaseName());
CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
}

private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
Expand All @@ -578,7 +573,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
}

protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
}

private void validateAutoCreateClose(Map<String, String> options) {
Expand Down
Loading

0 comments on commit cdd5bb7

Please sign in to comment.