From 9e219a911705a8cc1127bf6a2fd11ec3fb8c2aee Mon Sep 17 00:00:00 2001 From: Zou Xinyi Date: Thu, 2 Jan 2025 00:12:40 +0800 Subject: [PATCH] 3 --- .../arrowflight/DorisFlightSqlProducer.java | 31 ++- .../arrowflight/FlightSqlSchemaHelper.java | 215 +++++++++--------- 2 files changed, 123 insertions(+), 123 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 59676639ad18b2..154fd9f0b6b83c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -72,12 +72,10 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; -import org.apache.arrow.vector.util.Text; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -451,20 +449,20 @@ public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final @Override public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) { - try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA, - rootAllocator)) { - listener.start(vectorSchemaRoot); - vectorSchemaRoot.allocateNew(); - VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); - // Only show Internal Catalog, which is consistent with `jdbc:mysql`. - // Otherwise, if the configured ExternalCatalog cannot be connected, - // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. - catalogNameVector.setSafe(0, new Text("internal")); - vectorSchemaRoot.setRowCount(1); - listener.putNext(); - listener.completed(); - } catch (final Exception ex) { - handleStreamException(ex, "", listener); + try { + ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); + FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext); + final Schema schema = Schemas.GET_CATALOGS_SCHEMA; + + try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + listener.start(vectorSchemaRoot); + vectorSchemaRoot.allocateNew(); + flightSqlSchemaHelper.getCatalogs(vectorSchemaRoot); + listener.putNext(); + listener.completed(); + } + } catch (final Exception e) { + handleStreamException(e, "", listener); } } @@ -547,7 +545,6 @@ public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request, @Override public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context, final ServerStreamListener listener) { - throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java index eec1c47d9c21ca..577d46d9073ab0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlSchemaHelper.java @@ -31,7 +31,6 @@ import org.apache.doris.thrift.TListTableStatusResult; import org.apache.doris.thrift.TTableStatus; -import com.google.common.base.Preconditions; import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; import org.apache.arrow.flight.sql.FlightSqlColumnMetadata; import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas; @@ -63,103 +62,27 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class FlightSqlSchemaHelper { private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class); private final ConnectContext ctx; private final FrontendServiceImpl impl; private boolean includeSchema; - private String catalogFilterPattern; - private String dbSchemaFilterPattern; - private String tableNameFilterPattern; - private List tableTypesList; - private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); + private String catalogFilterPattern = null; + private String dbSchemaFilterPattern = null; + private String tableNameFilterPattern = null; + private List tableTypesList = null; public FlightSqlSchemaHelper(ConnectContext context) { ctx = context; impl = new FrontendServiceImpl(ExecuteEnv.getInstance()); } - /** - * Set in the Tables request object the parameter that user passed via CommandGetTables. - */ - public void setParameterForGetTables(CommandGetTables command) { - includeSchema = command.getIncludeSchema(); - // Only show Internal Catalog, which is consistent with `jdbc:mysql`. - // Otherwise, if the configured ExternalCatalog cannot be connected, - // `catalog.getAllDbs()` will be stuck and wait until the timeout period ends. - catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; - dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; - if (command.hasTableNameFilterPattern()) { - if (command.getTableNameFilterPattern().contains(".")) { - Preconditions.checkState(command.getTableNameFilterPattern().split("\\.", -1).length == 2); - dbSchemaFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[0]; - tableNameFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[1]; - } else { - tableNameFilterPattern = command.getTableNameFilterPattern(); - } - } else { - tableNameFilterPattern = null; - } - tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); - } - - /** - * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. - */ - public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { - catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; - dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; - } - - /** - * Call FrontendServiceImpl->getDbNames. - */ - private TGetDbsResult getDbNames() throws TException { - TGetDbsParams getDbsParams = new TGetDbsParams(); - getDbsParams.setCatalog(catalogFilterPattern); - if (dbSchemaFilterPattern != null) { - getDbsParams.setPattern(dbSchemaFilterPattern); - } - getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - return impl.getDbNames(getDbsParams); - } - - /** - * Call FrontendServiceImpl->listTableStatus. - */ - private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { - TGetTablesParams getTablesParams = new TGetTablesParams(); - getTablesParams.setDb(dbName); - if (!catalogName.isEmpty()) { - getTablesParams.setCatalog(catalogName); - } - if (tableNameFilterPattern != null) { - getTablesParams.setPattern(tableNameFilterPattern); - } - if (tableTypesList != null) { - getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. - } - getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - return impl.listTableStatus(getTablesParams); - } - - /** - * Call FrontendServiceImpl->describeTables. - */ - private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) - throws TException { - TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); - describeTablesParams.setDb(dbName); - if (!catalogName.isEmpty()) { - describeTablesParams.setCatalog(catalogName); - } - describeTablesParams.setTablesName(tablesName); - describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); - return impl.describeTables(describeTablesParams); - } + private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList()); /** * Convert Doris data type to an arrowType. @@ -252,6 +175,93 @@ private static Map createFlightSqlColumnMetadata(final String db return columnMetadataBuilder.build().getMetadataMap(); } + protected static byte[] getSerializedSchema(List fields) { + if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { + fields = Collections.emptyList(); + } else if (fields == null) { + return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); + } + + final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); + final Schema schema = new Schema(fields); + + try { + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); + } catch (final IOException e) { + throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + } + + return columnOutputStream.toByteArray(); + } + + /** + * Set in the Tables request object the parameter that user passed via CommandGetTables. + */ + public void setParameterForGetTables(CommandGetTables command) { + includeSchema = command.getIncludeSchema(); + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + tableNameFilterPattern = command.hasTableNameFilterPattern() ? command.getTableNameFilterPattern() : null; + tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList(); + } + + /** + * Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas. + */ + public void setParameterForGetDbSchemas(CommandGetDbSchemas command) { + catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal"; + dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null; + } + + /** + * Call FrontendServiceImpl->getDbNames. + */ + private TGetDbsResult getDbNames() throws TException { + TGetDbsParams getDbsParams = new TGetDbsParams(); + if (catalogFilterPattern != null) { + getDbsParams.setCatalog(catalogFilterPattern); + } + if (dbSchemaFilterPattern != null) { + getDbsParams.setPattern(dbSchemaFilterPattern); + } + getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.getDbNames(getDbsParams); + } + + /** + * Call FrontendServiceImpl->listTableStatus. + */ + private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException { + TGetTablesParams getTablesParams = new TGetTablesParams(); + getTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + getTablesParams.setCatalog(catalogName); + } + if (tableNameFilterPattern != null) { + getTablesParams.setPattern(tableNameFilterPattern); + } + if (tableTypesList != null) { + getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported. + } + getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.listTableStatus(getTablesParams); + } + + /** + * Call FrontendServiceImpl->describeTables. + */ + private TDescribeTablesResult describeTables(String dbName, String catalogName, List tablesName) + throws TException { + TDescribeTablesParams describeTablesParams = new TDescribeTablesParams(); + describeTablesParams.setDb(dbName); + if (!catalogName.isEmpty()) { + describeTablesParams.setCatalog(catalogName); + } + describeTablesParams.setTablesName(tablesName); + describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift()); + return impl.describeTables(describeTablesParams); + } + /** * Construct > */ @@ -300,23 +310,23 @@ private Map> buildTableToFields(String dbName, TDescribeTabl return tableToFields; } - protected static byte[] getSerializedSchema(List fields) { - if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) { - fields = Collections.emptyList(); - } else if (fields == null) { - return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length); - } + /** + * for FlightSqlProducer Schemas.GET_CATALOGS_SCHEMA + */ + public void getCatalogs(VectorSchemaRoot vectorSchemaRoot) throws TException { + VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name"); - final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream(); - final Schema schema = new Schema(fields); + Set catalogsSet = new LinkedHashSet<>(); + catalogsSet.add("internal"); // An ordered Set with "internal" first. + TGetDbsResult getDbsResult = getDbNames(); + catalogsSet.addAll(getDbsResult.getCatalogs()); - try { - MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema); - } catch (final IOException e) { - throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e); + int catalogIndex = 0; + for (String catalog : catalogsSet) { + catalogNameVector.setSafe(catalogIndex, new Text(catalog)); + catalogIndex++; } - - return columnOutputStream.toByteArray(); + vectorSchemaRoot.setRowCount(catalogIndex); } /** @@ -368,14 +378,7 @@ public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException { for (TTableStatus tableStatus : listTableStatusResult.getTables()) { catalogNameVector.setSafe(tablesCount, new Text(catalogName)); schemaNameVector.setSafe(tablesCount, new Text(dbName)); - // DBeaver uses `Arrow Flight SQL JDBC Driver Core [16.1.0]` driver to connect to Doris. - // The metadata only shows one layer of `Tables`. All tables will be displayed together, - // so the database name and table name are spelled together for distinction. - // When DBeaver uses `MySQL Connector/J [mysql-connector-j-8.2.0` driver to connect to Doris, - // the metadata will show two layers of `Databases - Tables`. - // - // TODO, show two layers of original data `Databases - Tables` in DBeaver. - tableNameVector.setSafe(tablesCount, new Text(dbName + "." + tableStatus.getName())); + tableNameVector.setSafe(tablesCount, new Text(tableStatus.getName())); tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType())); if (includeSchema) { List fields = tableToFields.get(tableStatus.getName());