Skip to content

Commit

Permalink
optimize storage module
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Oct 1, 2023
1 parent f11d239 commit a77aab0
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class LinkisStorageConf {
public static final String FILE_TYPE =
CommonVars.apply(
"wds.linkis.storage.file.type",
"dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql")
"dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql")
.getValue();

private static volatile String[] fileTypeArr = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ public void addMetaData(MetaData metaData) throws IOException {

private String compact(String[] row) {
String quotationMarks = "\"";
String dealNewlineSymbolMarks = "\n";
StringBuilder rowBuilder = new StringBuilder();
for (String value : row) {
String decoratedValue =
StringUtils.isBlank(value)
? value
: quoteRetouchEnable
? quotationMarks + value.replaceAll(quotationMarks, "") + quotationMarks
: value;
String decoratedValue = value;
if (StringUtils.isNotBlank(value)) {
if (quoteRetouchEnable) {
decoratedValue = quotationMarks + value.replaceAll(quotationMarks, "") + quotationMarks;
}
decoratedValue = decoratedValue.replaceAll(dealNewlineSymbolMarks, " ");
logger.debug("decorateValue with input: {} output: {} ", value, decoratedValue);
}
rowBuilder.append(decoratedValue).append(delimiter);
}
if (rowBuilder.length() > 0 && rowBuilder.toString().endsWith(delimiter)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL;

public enum DataType {
NullType("void", 0),
StringType("string", 12),
Expand Down Expand Up @@ -63,7 +65,8 @@ public enum DataType {
public static final String LOWCASE_NULL_VALUE = "null";

// TODO Change to fine-grained regular expressions(改为精细化正则表达式)
public static final Pattern DECIMAL_REGEX = Pattern.compile("^decimal\\(\\d*\\,\\d*\\)");
public static final Pattern DECIMAL_REGEX =
Pattern.compile("^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)");

public static final Pattern SHORT_REGEX = Pattern.compile("^short.*");
public static final Pattern INT_REGEX = Pattern.compile("^int.*");
Expand Down Expand Up @@ -130,7 +133,11 @@ public static DataType toDataType(String dataType) {
}

public static Object toValue(DataType dataType, String value) {

Object result = null;
if (isLinkisNull(value)) {
return result;
}
try {
switch (dataType) {
case NullType:
Expand Down Expand Up @@ -187,12 +194,16 @@ public static Object toValue(DataType dataType, String value) {
result = value;
}
} catch (Exception e) {
logger.debug("Failed to " + value + " switch to dataType:", e);
logger.debug("Failed to {} switch to dataType:", value, e);
result = value;
}
return result;
}

public static boolean isLinkisNull(String value) {
return value == null || value.equals(LINKIS_NULL);
}

public static boolean isNull(String value) {
return value == null || value.equals(NULL_VALUE) || value.trim().equals("");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class Dolphin {
public static final String NULL = "NULL";
public static final byte[] NULL_BYTES = "NULL".getBytes(Charset.forName("utf-8"));

public static final String LINKIS_NULL = "LINKIS_NULL";
public static final byte[] LINKIS_NULL_BYTES = LINKIS_NULL.getBytes(Charset.forName("utf-8"));

public static final int INT_LEN = 10;

public static final int FILE_EMPTY = 31;
Expand All @@ -79,6 +82,14 @@ public static String getString(byte[] bytes, int start, int len) {
return new String(bytes, start, len, Dolphin.CHAR_SET);
}

public static String toStringValue(String value) {
if (LINKIS_NULL.equals(value)) {
return NULL;
} else {
return value;
}
}

/**
* Read an integer value that converts the array to a byte of length 10 bytes
* 读取整数值,该值为将数组转换为10字节长度的byte
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
public enum StorageErrorCode {

/** */
FS_NOT_INIT(53001, "please init first(请先初始化)");
FS_NOT_INIT(53001, "please init first"),

INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"),
FS_OOM(53002, "OOM occurred while reading the file");

StorageErrorCode(int errorCode, String message) {
this.code = errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static ResultSetReader getTableResultReader(String res) {
}

Fs fs = FSFactory.getFs(resPath);
logger.info("Try to init Fs with path:{}", resPath.getPath());
try {
fs.init(null);
InputStream read = fs.read(resPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,7 +52,6 @@ public class StorageResultSetReader<K extends MetaData, V extends Record>
private Fs fs;

private final int READ_CACHE = 1024;
private final byte[] bytes = new byte[READ_CACHE];

public StorageResultSetReader(ResultSet<K, V> resultSet, InputStream inputStream) {
super(resultSet, inputStream);
Expand Down Expand Up @@ -84,21 +82,18 @@ public byte[] readLine() {
return null;
}

byte[] rowBuffer = new byte[0];
int len = 0;

while (rowLen > 0 && len >= 0) {
if (rowLen > READ_CACHE) {
len = StorageUtils.readBytes(inputStream, bytes, READ_CACHE);
} else {
len = StorageUtils.readBytes(inputStream, bytes, rowLen);
}

if (len > 0) {
rowLen -= len;
rowBuffer = Arrays.copyOf(rowBuffer, rowBuffer.length + len);
System.arraycopy(bytes, 0, rowBuffer, rowBuffer.length - len, len);
}
byte[] rowBuffer = null;
try {
rowBuffer = new byte[rowLen];
len = StorageUtils.readBytes(inputStream, rowBuffer, READ_CACHE);
} catch (OutOfMemoryError error) {
logger.error("Result set read oom, read size {} Byte", rowLen);
throw new RuntimeException(error);
}
if (len != rowLen) {
throw new RuntimeException(
"Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)");
}
rowCount++;
return rowBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

import org.apache.linkis.common.io.Record;
import org.apache.linkis.storage.resultset.ResultRecord;
import org.apache.linkis.storage.utils.StorageUtils;

import java.util.Arrays;

public class TableRecord implements ResultRecord {

Expand All @@ -35,10 +32,4 @@ public TableRecord(Object[] row) {
public Record cloneRecord() {
return new TableRecord(row.clone());
}

public String[] tableRecordToString(String nullValue) {
return Arrays.stream(row)
.map(col -> StorageUtils.colToString(col, nullValue))
.toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public TableMetaData createMetaData(byte[] bytes) {
List<Column> columns = new ArrayList<>();
for (int i = 0; i < colArray.length; i += 3) {
int len = Integer.parseInt(colArray[i]);
String colName = Dolphin.getString(bytes, index, len);
String colName = Dolphin.toStringValue(Dolphin.getString(bytes, index, len));
index += len;
len = Integer.parseInt(colArray[i + 1]);
String colType = Dolphin.getString(bytes, index, len);
String colType = Dolphin.toStringValue(Dolphin.getString(bytes, index, len));
index += len;
len = Integer.parseInt(colArray[i + 2]);
String colComment = Dolphin.getString(bytes, index, len);
String colComment = Dolphin.toStringValue(Dolphin.getString(bytes, index, len));
index += len;
columns.add(new Column(colName, DataType.toDataType(colType), colComment));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private byte[] lineToBytes(Object[] line) {
int colByteLen = 0;
int length = 0;
for (Object data : line) {
byte[] bytes = data == null ? Dolphin.NULL_BYTES : Dolphin.getBytes(data);
byte[] bytes = data == null ? Dolphin.LINKIS_NULL_BYTES : Dolphin.getBytes(data);
dataBytes.add(bytes);
byte[] colBytes = Dolphin.getBytes(bytes.length);
colIndex.add(colBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ static FileSource create(FsPath fsPath, InputStream is) {
}

static FileSplit createResultSetFileSplit(FsPath fsPath, InputStream is) {
logger.info("try create result set file split with path:{}", fsPath.getPath());
ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath);
ResultSetReader resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, is);
return new FileSplit(resultsetReader, resultset.resultSetType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.linkis.storage.source;

import org.apache.linkis.storage.domain.Dolphin;
import org.apache.linkis.storage.resultset.table.TableRecord;
import org.apache.linkis.storage.utils.StorageUtils;

Expand All @@ -36,9 +37,18 @@ record -> {
.map(
r -> {
if (r == null || r.equals("NULL")) {
return nullValue;
if (nullValue.equals(Dolphin.LINKIS_NULL)) {
return r;
} else {
return nullValue;
}
} else if (r.equals("")) {
return getParams().getOrDefault("nullValue", "");
String emptyValue = getParams().getOrDefault("nullValue", "");
if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
return "";
} else {
return nullValue;
}
} else if (r instanceof Double) {
return StorageUtils.doubleToString((Double) r);
} else {
Expand Down

0 comments on commit a77aab0

Please sign in to comment.