Skip to content

Commit

Permalink
4
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 30, 2023
1 parent efe0695 commit 74942f8
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 164 deletions.
177 changes: 177 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/mysql/DummyMysqlChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@

package org.apache.doris.mysql;

import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.net.ssl.SSLEngine;


/**
Expand All @@ -27,55 +35,224 @@
* And don't need to allocate a real ByteBuffer.
*/
public class DummyMysqlChannel extends MysqlChannel {
private static final Logger LOG = LogManager.getLogger(DummyMysqlChannel.class);

public DummyMysqlChannel() {
this.serializer = MysqlSerializer.newInstance();
}

public void setSequenceId(int sequenceId) {
try {
throw new RuntimeException("111111 setSequenceId");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
this.sequenceId = sequenceId;
}

@Override
public String getRemoteIp() {
try {
throw new RuntimeException("111111 getRemoteIp");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return "";
}

@Override
public String getRemoteHostPortString() {
try {
throw new RuntimeException("111111 getRemoteHostPortString");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return "";
}

@Override
public void close() {
try {
throw new RuntimeException("111111 close");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
}

@Override
protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException {
try {
throw new RuntimeException("111111 readAll");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return 0;
}

@Override
public ByteBuffer fetchOnePacket() throws IOException {
try {
throw new RuntimeException("111111 fetchOnePacket");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return ByteBuffer.allocate(0);
}

@Override
public void flush() throws IOException {
try {
throw new RuntimeException("111111 flush");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
}

@Override
public void sendOnePacket(ByteBuffer packet) throws IOException {
try {
throw new RuntimeException("111111 sendOnePacket");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
}

@Override
public void sendAndFlush(ByteBuffer packet) throws IOException {
try {
throw new RuntimeException("111111 sendAndFlush");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
}

@Override
public void reset() {
try {
throw new RuntimeException("111111 reset");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
}

@Override
public MysqlSerializer getSerializer() {
try {
throw new RuntimeException("111111 getSerializer");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return serializer;
}

@Override
public void setClientDeprecatedEOF() {
try {
throw new RuntimeException("111111 setClientDeprecatedEOF");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
clientDeprecatedEOF = true;
}

@Override
public boolean clientDeprecatedEOF() {
try {
throw new RuntimeException("111111 clientDeprecatedEOF");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return clientDeprecatedEOF;
}

@Override
public void initSslBuffer() {
try {
throw new RuntimeException("111111 initSslBuffer");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
// allocate buffer when needed.
this.remainingBuffer = ByteBuffer.allocate(16 * 1024);
this.remainingBuffer.flip();
this.tempBuffer = ByteBuffer.allocate(16 * 1024);
this.sslHeaderByteBuffer = ByteBuffer.allocate(SSL_PACKET_HEADER_LEN);
}

@Override
public void setSslEngine(SSLEngine sslEngine) {
try {
throw new RuntimeException("111111 setSslEngine");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
this.sslEngine = sslEngine;
decryptAppData = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize() * 2);
encryptNetData = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize() * 2);
}

@Override
public void setSslMode(boolean sslMode) {
try {
throw new RuntimeException("111111 setSslMode");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
isSslMode = sslMode;
if (isSslMode) {
// channel in ssl mode means handshake phase has finished.
isSslHandshaking = false;
}
}

@Override
public boolean isSend() {
try {
throw new RuntimeException("111111 isSend");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
return isSend;
}

@Override
public void startAcceptQuery(ConnectContext connectContext, ConnectProcessor connectProcessor) {
try {
throw new RuntimeException("111111 startAcceptQuery");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
conn.getSourceChannel().setReadListener(new ReadListener(connectContext, connectProcessor));
conn.getSourceChannel().resumeReads();
}

@Override
public void suspendAcceptQuery() {
try {
throw new RuntimeException("111111 suspendAcceptQuery");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
conn.getSourceChannel().suspendReads();
}

@Override
public void resumeAcceptQuery() {
try {
throw new RuntimeException("111111 resumeAcceptQuery");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
conn.getSourceChannel().resumeReads();
}

@Override
public void stopAcceptQuery() throws IOException {
try {
throw new RuntimeException("111111 stopAcceptQuery");
} catch (Exception e) {
LOG.warn(e.getMessage() + Arrays.toString(e.getStackTrace()));
}
conn.getSourceChannel().shutdownReads();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class MysqlChannel {
// next sequence id to receive or send
protected int sequenceId;
// channel connected with client
private StreamConnection conn;
protected StreamConnection conn;
// used to receive/send header, avoiding new this many time.
protected ByteBuffer headerByteBuffer;
protected ByteBuffer defaultBuffer;
Expand All @@ -72,12 +72,12 @@ public class MysqlChannel {

protected boolean isSslMode;
protected boolean isSslHandshaking;
private SSLEngine sslEngine;
protected SSLEngine sslEngine;

protected volatile MysqlSerializer serializer;

// mysql flag CLIENT_DEPRECATE_EOF
private boolean clientDeprecatedEOF;
protected boolean clientDeprecatedEOF;

protected MysqlChannel() {
// For DummyMysqlChannel
Expand Down
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.MysqlChannelToFlightSql;
import org.apache.doris.service.arrowflight.FlightSqlChannel;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
Expand All @@ -51,6 +51,7 @@
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -104,6 +105,7 @@ public enum ConnectType {
protected volatile String peerIdentity;
// mysql net
protected volatile MysqlChannel mysqlChannel;
protected volatile FlightSqlChannel flightSqlChannel;
// state
protected volatile QueryState state;
protected volatile long returnRows;
Expand Down Expand Up @@ -351,7 +353,8 @@ public ConnectContext(String peerIdentity) {
returnRows = 0;
isKilled = false;
sessionVariable = VariableMgr.newSessionVariable();
mysqlChannel = new MysqlChannelToFlightSql();
mysqlChannel = new DummyMysqlChannel();
flightSqlChannel = new FlightSqlChannel();
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
Expand Down Expand Up @@ -603,11 +606,16 @@ public MysqlChannel getMysqlChannel() {
return mysqlChannel;
}

public FlightSqlChannel getFlightSqlChannel() {
Preconditions.checkState(connectType.equals(ConnectType.ARROW_FLIGHT_SQL));
return flightSqlChannel;
}

public String getClientIP() {
if (connectType.equals(ConnectType.MYSQL)) {
return mysqlChannel.getRemoteHostPortString();
} else {
return "0.0.0.0"; // TODO
return flightSqlChannel.getRemoteHostPortString();
}
}

Expand Down
36 changes: 22 additions & 14 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2223,24 +2223,32 @@ private void sendFields(List<String> colNames, List<Type> types) throws IOExcept
}

public void sendResultSet(ResultSet resultSet) throws IOException {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
sendMetaData(resultSet.getMetaData());
if (context.getConnectType().equals(ConnectType.MYSQL)) {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
sendMetaData(resultSet.getMetaData());

// Send result set.
for (List<String> row : resultSet.getResultRows()) {
serializer.reset();
for (String item : row) {
if (item == null || item.equals(FeConstants.null_string)) {
serializer.writeNull();
} else {
serializer.writeLenEncodedString(item);
// Send result set.
for (List<String> row : resultSet.getResultRows()) {
serializer.reset();
for (String item : row) {
if (item == null || item.equals(FeConstants.null_string)) {
serializer.writeNull();
} else {
serializer.writeLenEncodedString(item);
}
}
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}

context.getState().setEof();
context.getState().setEof();
} else if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
context.updateReturnRows(resultSet.getResultRows().size());
context.getFlightSqlChannel().addResultSet(DebugUtil.printId(context.queryId()), context.getRunningQuery(), resultSet);
context.getState().setEof();
} else {
LOG.error("sendResultSet error connect type");
}
}

// Process show statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSQLConnectProcessor;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticsCacheKey;
Expand Down Expand Up @@ -1112,7 +1112,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
if (context.getConnectType().equals(ConnectType.MYSQL)) {
processor = new MysqlConnectProcessor(context);
} else if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
processor = new FlightSQLConnectProcessor(context);
processor = new FlightSqlConnectProcessor(context);
} else {
LOG.warn("unknown ConnectType: {}", context.getConnectType());
}
Expand Down
Loading

0 comments on commit 74942f8

Please sign in to comment.