Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Jan 1, 2025
1 parent de57672 commit 9e219a9
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,10 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -451,20 +449,20 @@ public FlightInfo getFlightInfoCatalogs(final CommandGetCatalogs request, final

@Override
public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) {
try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(Schemas.GET_CATALOGS_SCHEMA,
rootAllocator)) {
listener.start(vectorSchemaRoot);
vectorSchemaRoot.allocateNew();
VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name");
// Only show Internal Catalog, which is consistent with `jdbc:mysql`.
// Otherwise, if the configured ExternalCatalog cannot be connected,
// `catalog.getAllDbs()` will be stuck and wait until the timeout period ends.
catalogNameVector.setSafe(0, new Text("internal"));
vectorSchemaRoot.setRowCount(1);
listener.putNext();
listener.completed();
} catch (final Exception ex) {
handleStreamException(ex, "", listener);
try {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
FlightSqlSchemaHelper flightSqlSchemaHelper = new FlightSqlSchemaHelper(connectContext);
final Schema schema = Schemas.GET_CATALOGS_SCHEMA;

try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) {
listener.start(vectorSchemaRoot);
vectorSchemaRoot.allocateNew();
flightSqlSchemaHelper.getCatalogs(vectorSchemaRoot);
listener.putNext();
listener.completed();
}
} catch (final Exception e) {
handleStreamException(e, "", listener);
}
}

Expand Down Expand Up @@ -547,7 +545,6 @@ public FlightInfo getFlightInfoPrimaryKeys(final CommandGetPrimaryKeys request,
@Override
public void getStreamPrimaryKeys(final CommandGetPrimaryKeys command, final CallContext context,
final ServerStreamListener listener) {

throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPrimaryKeys unimplemented").toRuntimeException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.doris.thrift.TListTableStatusResult;
import org.apache.doris.thrift.TTableStatus;

import com.google.common.base.Preconditions;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.sql.FlightSqlColumnMetadata;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas;
Expand Down Expand Up @@ -63,103 +62,27 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FlightSqlSchemaHelper {
private static final Logger LOG = LogManager.getLogger(FlightSqlSchemaHelper.class);
private final ConnectContext ctx;
private final FrontendServiceImpl impl;
private boolean includeSchema;
private String catalogFilterPattern;
private String dbSchemaFilterPattern;
private String tableNameFilterPattern;
private List<String> tableTypesList;
private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList());
private String catalogFilterPattern = null;
private String dbSchemaFilterPattern = null;
private String tableNameFilterPattern = null;
private List<String> tableTypesList = null;

public FlightSqlSchemaHelper(ConnectContext context) {
ctx = context;
impl = new FrontendServiceImpl(ExecuteEnv.getInstance());
}

/**
* Set in the Tables request object the parameter that user passed via CommandGetTables.
*/
public void setParameterForGetTables(CommandGetTables command) {
includeSchema = command.getIncludeSchema();
// Only show Internal Catalog, which is consistent with `jdbc:mysql`.
// Otherwise, if the configured ExternalCatalog cannot be connected,
// `catalog.getAllDbs()` will be stuck and wait until the timeout period ends.
catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal";
dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null;
if (command.hasTableNameFilterPattern()) {
if (command.getTableNameFilterPattern().contains(".")) {
Preconditions.checkState(command.getTableNameFilterPattern().split("\\.", -1).length == 2);
dbSchemaFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[0];
tableNameFilterPattern = command.getTableNameFilterPattern().split("\\.", -1)[1];
} else {
tableNameFilterPattern = command.getTableNameFilterPattern();
}
} else {
tableNameFilterPattern = null;
}
tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList();
}

/**
* Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas.
*/
public void setParameterForGetDbSchemas(CommandGetDbSchemas command) {
catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal";
dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null;
}

/**
* Call FrontendServiceImpl->getDbNames.
*/
private TGetDbsResult getDbNames() throws TException {
TGetDbsParams getDbsParams = new TGetDbsParams();
getDbsParams.setCatalog(catalogFilterPattern);
if (dbSchemaFilterPattern != null) {
getDbsParams.setPattern(dbSchemaFilterPattern);
}
getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.getDbNames(getDbsParams);
}

/**
* Call FrontendServiceImpl->listTableStatus.
*/
private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException {
TGetTablesParams getTablesParams = new TGetTablesParams();
getTablesParams.setDb(dbName);
if (!catalogName.isEmpty()) {
getTablesParams.setCatalog(catalogName);
}
if (tableNameFilterPattern != null) {
getTablesParams.setPattern(tableNameFilterPattern);
}
if (tableTypesList != null) {
getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported.
}
getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.listTableStatus(getTablesParams);
}

/**
* Call FrontendServiceImpl->describeTables.
*/
private TDescribeTablesResult describeTables(String dbName, String catalogName, List<String> tablesName)
throws TException {
TDescribeTablesParams describeTablesParams = new TDescribeTablesParams();
describeTablesParams.setDb(dbName);
if (!catalogName.isEmpty()) {
describeTablesParams.setCatalog(catalogName);
}
describeTablesParams.setTablesName(tablesName);
describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.describeTables(describeTablesParams);
}
private static final byte[] EMPTY_SERIALIZED_SCHEMA = getSerializedSchema(Collections.emptyList());

/**
* Convert Doris data type to an arrowType.
Expand Down Expand Up @@ -252,6 +175,93 @@ private static Map<String, String> createFlightSqlColumnMetadata(final String db
return columnMetadataBuilder.build().getMetadataMap();
}

protected static byte[] getSerializedSchema(List<Field> fields) {
if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) {
fields = Collections.emptyList();
} else if (fields == null) {
return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length);
}

final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream();
final Schema schema = new Schema(fields);

try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema);
} catch (final IOException e) {
throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e);
}

return columnOutputStream.toByteArray();
}

/**
* Set in the Tables request object the parameter that user passed via CommandGetTables.
*/
public void setParameterForGetTables(CommandGetTables command) {
includeSchema = command.getIncludeSchema();
catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal";
dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null;
tableNameFilterPattern = command.hasTableNameFilterPattern() ? command.getTableNameFilterPattern() : null;
tableTypesList = command.getTableTypesList().isEmpty() ? null : command.getTableTypesList();
}

/**
* Set in the Schemas request object the parameter that user passed via CommandGetDbSchemas.
*/
public void setParameterForGetDbSchemas(CommandGetDbSchemas command) {
catalogFilterPattern = command.hasCatalog() ? command.getCatalog() : "internal";
dbSchemaFilterPattern = command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null;
}

/**
* Call FrontendServiceImpl->getDbNames.
*/
private TGetDbsResult getDbNames() throws TException {
TGetDbsParams getDbsParams = new TGetDbsParams();
if (catalogFilterPattern != null) {
getDbsParams.setCatalog(catalogFilterPattern);
}
if (dbSchemaFilterPattern != null) {
getDbsParams.setPattern(dbSchemaFilterPattern);
}
getDbsParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.getDbNames(getDbsParams);
}

/**
* Call FrontendServiceImpl->listTableStatus.
*/
private TListTableStatusResult listTableStatus(String dbName, String catalogName) throws TException {
TGetTablesParams getTablesParams = new TGetTablesParams();
getTablesParams.setDb(dbName);
if (!catalogName.isEmpty()) {
getTablesParams.setCatalog(catalogName);
}
if (tableNameFilterPattern != null) {
getTablesParams.setPattern(tableNameFilterPattern);
}
if (tableTypesList != null) {
getTablesParams.setType(tableTypesList.get(0)); // currently only one type is supported.
}
getTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.listTableStatus(getTablesParams);
}

/**
* Call FrontendServiceImpl->describeTables.
*/
private TDescribeTablesResult describeTables(String dbName, String catalogName, List<String> tablesName)
throws TException {
TDescribeTablesParams describeTablesParams = new TDescribeTablesParams();
describeTablesParams.setDb(dbName);
if (!catalogName.isEmpty()) {
describeTablesParams.setCatalog(catalogName);
}
describeTablesParams.setTablesName(tablesName);
describeTablesParams.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
return impl.describeTables(describeTablesParams);
}

/**
* Construct <tableName, List<ArrowType>>
*/
Expand Down Expand Up @@ -300,23 +310,23 @@ private Map<String, List<Field>> buildTableToFields(String dbName, TDescribeTabl
return tableToFields;
}

protected static byte[] getSerializedSchema(List<Field> fields) {
if (EMPTY_SERIALIZED_SCHEMA == null && fields == null) {
fields = Collections.emptyList();
} else if (fields == null) {
return Arrays.copyOf(EMPTY_SERIALIZED_SCHEMA, EMPTY_SERIALIZED_SCHEMA.length);
}
/**
* for FlightSqlProducer Schemas.GET_CATALOGS_SCHEMA
*/
public void getCatalogs(VectorSchemaRoot vectorSchemaRoot) throws TException {
VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name");

final ByteArrayOutputStream columnOutputStream = new ByteArrayOutputStream();
final Schema schema = new Schema(fields);
Set<String> catalogsSet = new LinkedHashSet<>();
catalogsSet.add("internal"); // An ordered Set with "internal" first.
TGetDbsResult getDbsResult = getDbNames();
catalogsSet.addAll(getDbsResult.getCatalogs());

try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(columnOutputStream)), schema);
} catch (final IOException e) {
throw new RuntimeException("IO Error when serializing schema '" + schema + "'.", e);
int catalogIndex = 0;
for (String catalog : catalogsSet) {
catalogNameVector.setSafe(catalogIndex, new Text(catalog));
catalogIndex++;
}

return columnOutputStream.toByteArray();
vectorSchemaRoot.setRowCount(catalogIndex);
}

/**
Expand Down Expand Up @@ -368,14 +378,7 @@ public void getTables(VectorSchemaRoot vectorSchemaRoot) throws TException {
for (TTableStatus tableStatus : listTableStatusResult.getTables()) {
catalogNameVector.setSafe(tablesCount, new Text(catalogName));
schemaNameVector.setSafe(tablesCount, new Text(dbName));
// DBeaver uses `Arrow Flight SQL JDBC Driver Core [16.1.0]` driver to connect to Doris.
// The metadata only shows one layer of `Tables`. All tables will be displayed together,
// so the database name and table name are spelled together for distinction.
// When DBeaver uses `MySQL Connector/J [mysql-connector-j-8.2.0` driver to connect to Doris,
// the metadata will show two layers of `Databases - Tables`.
//
// TODO, show two layers of original data `Databases - Tables` in DBeaver.
tableNameVector.setSafe(tablesCount, new Text(dbName + "." + tableStatus.getName()));
tableNameVector.setSafe(tablesCount, new Text(tableStatus.getName()));
tableTypeVector.setSafe(tablesCount, new Text(tableStatus.getType()));
if (includeSchema) {
List<Field> fields = tableToFields.get(tableStatus.getName());
Expand Down

0 comments on commit 9e219a9

Please sign in to comment.