Skip to content

Commit

Permalink
change identifier, add pr 502 506
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Nov 6, 2024
1 parent 7afddab commit 33093eb
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,11 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) {
open();
}
TException ex = null;
TScanOpenResult result = null;
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to openScanner {}.", attempt, routing);
try {
TScanOpenResult result = client.openScanner(openParams);
result = client.openScanner(openParams);
if (result == null) {
logger.warn("Open scanner result from {} is null.", routing);
continue;
Expand All @@ -155,12 +156,28 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) {
result.getStatus().getErrorMsgs());
continue;
}
logger.info(
"OpenScanner success for Doris BE '{}' with contextId '{}' for tablets '{}'.",
routing,
result.getContextId(),
openParams.tablet_ids);
return result;
} catch (TException e) {
logger.warn("Open scanner from {} failed.", routing, e);
ex = e;
}
}
if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) {
logger.error(
ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE,
routing,
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
throw new DorisInternalException(
routing.toString(),
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
}
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
throw new ConnectedFailedException(routing.toString(), ex);
}
Expand Down Expand Up @@ -244,7 +261,10 @@ public void closeScanner(TScanCloseParams closeParams) {
logger.warn("Close scanner from {} failed.", routing, e);
}
}
logger.info("CloseScanner to Doris BE '{}' success.", routing);
logger.info(
"CloseScanner to Doris BE '{}' success for contextId {} ",
routing,
closeParams.getContextId());
close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,20 @@ public String toString() {
+ '\''
+ '}';
}

public String toStringWithoutPlan() {
return "PartitionDefinition{"
+ "database='"
+ database
+ '\''
+ ", table='"
+ table
+ '\''
+ ", beAddress='"
+ beAddress
+ '\''
+ ", tabletIds="
+ tabletIds
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor
lock.lock();
try {
while (currentCacheBytes.get() >= maxBlockedBytes) {
checkFlushException();
LOG.info(
"Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}",
currentCacheBytes.get(),
Expand Down Expand Up @@ -486,11 +487,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
putBuilder.setLabel(label + "_" + retry);
reason = respContent.getMessage();
} else {
String errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
String errMsg = null;
if (StringUtils.isBlank(respContent.getMessage())
&& StringUtils.isBlank(respContent.getErrorURL())) {
// sometimes stream load will not return message
errMsg =
String.format(
"stream load error, response is %s",
loadResult);
throw new DorisBatchLoadException(errMsg);
} else {
errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
}
throw new DorisBatchLoadException(errMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public Optional<DorisSourceSplit> getNext(@Nullable String hostname) {
}

@Override
public void addSplits(Collection<DorisSourceSplit> splits) {
LOG.info("Adding splits: {}", splits);
splits.addAll(splits);
public void addSplits(Collection<DorisSourceSplit> newSplits) {
LOG.info("Adding splits: {}", newSplits);
splits.addAll(newSplits);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public void run() {
} catch (InterruptedException e) {
throw new DorisRuntimeException(e);
}
} else {
LOG.info(
"Async scan finished , tablets: {}, offset: {}",
partition.getTabletIds(),
offset);
}
}
} finally {
Expand Down Expand Up @@ -245,6 +250,11 @@ public boolean hasNext() {
eos.set(nextResult.isEos());
if (!eos.get()) {
rowBatch = new RowBatch(nextResult, schema).readArrow();
} else {
LOG.info(
"Scan finished, tablets: {}, offset: {}",
partition.getTabletIds(),
offset);
}
}
hasNext = !eos.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static ValueReader createReader(
DorisReadOptions readOptions,
Logger logger)
throws DorisException {
logger.info("create reader for partition: {}", partition);
logger.info("create reader for partition: {}", partition.toStringWithoutPlan());
if (readOptions.getUseFlightSql()) {
return new DorisFlightValueReader(
partition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
@PublicEvolving
public class DorisConfigOptions {

public static final String IDENTIFIER = "selectdb-preview";
public static final String IDENTIFIER = "doris";
// common option
public static final ConfigOption<String> FENODES =
ConfigOptions.key("fenodes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) {
public int getSplitNumber() {
return splitNumber;
}

@Override
public String toString() {
return String.format(
"DorisTableInputSplit: %s.%s,id=%s,be=%s,tablets=%s",
partition.getDatabase(),
partition.getTable(),
splitNumber,
partition.getBeAddress(),
partition.getTabletIds());
}
}

0 comments on commit 33093eb

Please sign in to comment.