diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java index b5e9af86ff..a1c0839b2f 100644 --- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java +++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.FileSystems; +import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java index 66fafeacda..74950c15fe 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java @@ -39,7 +39,7 @@ public class LinkisStorageConf { public static final String FILE_TYPE = CommonVars.apply( "wds.linkis.storage.file.type", - "dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql") + "dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql") .getValue(); private static volatile String[] fileTypeArr = null; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java index ba4d877f3a..d98be40337 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java @@ -91,14 +91,17 @@ public void addMetaData(MetaData metaData) throws IOException { private String compact(String[] row) { String quotationMarks = "\""; + String dealNewlineSymbolMarks = "\n"; StringBuilder rowBuilder = new StringBuilder(); for (String value : row) { - String decoratedValue = - StringUtils.isBlank(value) - ? value - : quoteRetouchEnable - ? quotationMarks + value.replaceAll(quotationMarks, "") + quotationMarks - : value; + String decoratedValue = value; + if (StringUtils.isNotBlank(value)) { + if (quoteRetouchEnable) { + decoratedValue = quotationMarks + value.replaceAll(quotationMarks, "") + quotationMarks; + } + decoratedValue = decoratedValue.replaceAll(dealNewlineSymbolMarks, " "); + logger.debug("decorateValue with input: {} output: {} ", value, decoratedValue); + } rowBuilder.append(decoratedValue).append(delimiter); } if (rowBuilder.length() > 0 && rowBuilder.toString().endsWith(delimiter)) { diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java index ad9e0ee882..6808f693ec 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java @@ -26,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL; + public enum DataType { NullType("void", 0), StringType("string", 12), @@ -63,7 +65,8 @@ public enum DataType { public static final String LOWCASE_NULL_VALUE = "null"; // TODO Change to fine-grained regular expressions(改为精细化正则表达式) - public static final Pattern DECIMAL_REGEX = Pattern.compile("^decimal\\(\\d*\\,\\d*\\)"); + public static final Pattern DECIMAL_REGEX = + Pattern.compile("^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)"); public static final Pattern SHORT_REGEX = Pattern.compile("^short.*"); public static final Pattern INT_REGEX = Pattern.compile("^int.*"); @@ -130,7 +133,11 @@ public static DataType toDataType(String dataType) { } public static Object toValue(DataType dataType, String value) { + Object result = null; + if (isLinkisNull(value)) { + return result; + } try { switch (dataType) { case NullType: @@ -187,12 +194,16 @@ public static Object toValue(DataType dataType, String value) { result = value; } } catch (Exception e) { - logger.debug("Failed to " + value + " switch to dataType:", e); + logger.debug("Failed to {} switch to dataType:", value, e); result = value; } return result; } + public static boolean isLinkisNull(String value) { + return value == null || value.equals(LINKIS_NULL); + } + public static boolean isNull(String value) { return value == null || value.equals(NULL_VALUE) || value.trim().equals(""); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java index b6badd2849..35c71295e4 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java @@ -59,6 +59,9 @@ public class Dolphin { public static final String NULL = "NULL"; public static final byte[] NULL_BYTES = "NULL".getBytes(Charset.forName("utf-8")); + public static final String LINKIS_NULL = "LINKIS_NULL"; + public static final byte[] LINKIS_NULL_BYTES = LINKIS_NULL.getBytes(Charset.forName("utf-8")); + public static final int INT_LEN = 10; public static final int FILE_EMPTY = 31; @@ -79,6 +82,14 @@ public static String getString(byte[] bytes, int start, int len) { return new String(bytes, start, len, Dolphin.CHAR_SET); } + public static String toStringValue(String value) { + if (LINKIS_NULL.equals(value)) { + return NULL; + } else { + return value; + } + } + /** * Read an integer value that converts the array to a byte of length 10 bytes * 读取整数值,该值为将数组转换为10字节长度的byte diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java index 3c82ceb523..fad0d83a12 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java @@ -20,7 +20,10 @@ public enum StorageErrorCode { /** */ - FS_NOT_INIT(53001, "please init first(请先初始化)"); + FS_NOT_INIT(53001, "please init first"), + + INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen %s"), + FS_OOM(53002, "OOM occurred while reading the file"); StorageErrorCode(int errorCode, String message) { this.code = errorCode; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java index 749ec9a24e..3047b715a0 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java @@ -99,6 +99,7 @@ public static ResultSetReader getTableResultReader(String res) { } Fs fs = FSFactory.getFs(resPath); + logger.info("Try to init Fs with path:{}", resPath.getPath()); try { fs.init(null); InputStream read = fs.read(resPath); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java index 35f7483c82..23d65e0614 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java @@ -33,7 +33,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +52,6 @@ public class StorageResultSetReader private Fs fs; private final int READ_CACHE = 1024; - private final byte[] bytes = new byte[READ_CACHE]; public StorageResultSetReader(ResultSet resultSet, InputStream inputStream) { super(resultSet, inputStream); @@ -84,21 +82,18 @@ public byte[] readLine() { return null; } - byte[] rowBuffer = new byte[0]; int len = 0; - - while (rowLen > 0 && len >= 0) { - if (rowLen > READ_CACHE) { - len = StorageUtils.readBytes(inputStream, bytes, READ_CACHE); - } else { - len = StorageUtils.readBytes(inputStream, bytes, rowLen); - } - - if (len > 0) { - rowLen -= len; - rowBuffer = Arrays.copyOf(rowBuffer, rowBuffer.length + len); - System.arraycopy(bytes, 0, rowBuffer, rowBuffer.length - len, len); - } + byte[] rowBuffer = null; + try { + rowBuffer = new byte[rowLen]; + len = StorageUtils.readBytes(inputStream, rowBuffer, READ_CACHE); + } catch (OutOfMemoryError error) { + logger.error("Result set read oom, read size {} Byte", rowLen); + throw new RuntimeException(error); + } + if (len != rowLen) { + throw new RuntimeException( + "Can't get the value of the field, maybe the IO stream has been read or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)"); } rowCount++; return rowBuffer; diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java index f13d42b0b6..c8270714ba 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java @@ -19,9 +19,6 @@ import org.apache.linkis.common.io.Record; import org.apache.linkis.storage.resultset.ResultRecord; -import org.apache.linkis.storage.utils.StorageUtils; - -import java.util.Arrays; public class TableRecord implements ResultRecord { @@ -35,10 +32,4 @@ public TableRecord(Object[] row) { public Record cloneRecord() { return new TableRecord(row.clone()); } - - public String[] tableRecordToString(String nullValue) { - return Arrays.stream(row) - .map(col -> StorageUtils.colToString(col, nullValue)) - .toArray(String[]::new); - } } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java index 7b7838fd25..7e1d6c35fe 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java @@ -52,13 +52,13 @@ public TableMetaData createMetaData(byte[] bytes) { List columns = new ArrayList<>(); for (int i = 0; i < colArray.length; i += 3) { int len = Integer.parseInt(colArray[i]); - String colName = Dolphin.getString(bytes, index, len); + String colName = Dolphin.toStringValue(Dolphin.getString(bytes, index, len)); index += len; len = Integer.parseInt(colArray[i + 1]); - String colType = Dolphin.getString(bytes, index, len); + String colType = Dolphin.toStringValue(Dolphin.getString(bytes, index, len)); index += len; len = Integer.parseInt(colArray[i + 2]); - String colComment = Dolphin.getString(bytes, index, len); + String colComment = Dolphin.toStringValue(Dolphin.getString(bytes, index, len)); index += len; columns.add(new Column(colName, DataType.toDataType(colType), colComment)); } diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java index 6abe4c56d5..5f40aa33f3 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java @@ -61,7 +61,7 @@ private byte[] lineToBytes(Object[] line) { int colByteLen = 0; int length = 0; for (Object data : line) { - byte[] bytes = data == null ? Dolphin.NULL_BYTES : Dolphin.getBytes(data); + byte[] bytes = data == null ? Dolphin.LINKIS_NULL_BYTES : Dolphin.getBytes(data); dataBytes.add(bytes); byte[] colBytes = Dolphin.getBytes(bytes.length); colIndex.add(colBytes); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java index cee72dfcd7..0ed650186d 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java @@ -135,6 +135,7 @@ static FileSource create(FsPath fsPath, InputStream is) { } static FileSplit createResultSetFileSplit(FsPath fsPath, InputStream is) { + logger.info("try create result set file split with path:{}", fsPath.getPath()); ResultSet resultset = ResultSetFactory.getInstance().getResultSetByPath(fsPath); ResultSetReader resultsetReader = ResultSetReaderFactory.getResultSetReader(resultset, is); return new FileSplit(resultsetReader, resultset.resultSetType()); diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java index d8562b7c5b..fb064a8f4f 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java @@ -17,6 +17,7 @@ package org.apache.linkis.storage.source; +import org.apache.linkis.storage.domain.Dolphin; import org.apache.linkis.storage.resultset.table.TableRecord; import org.apache.linkis.storage.utils.StorageUtils; @@ -36,9 +37,18 @@ record -> { .map( r -> { if (r == null || r.equals("NULL")) { - return nullValue; + if (nullValue.equals(Dolphin.LINKIS_NULL)) { + return r; + } else { + return nullValue; + } } else if (r.equals("")) { - return getParams().getOrDefault("nullValue", ""); + String emptyValue = getParams().getOrDefault("nullValue", ""); + if (emptyValue.equals(Dolphin.LINKIS_NULL)) { + return ""; + } else { + return nullValue; + } } else if (r instanceof Double) { return StorageUtils.doubleToString((Double) r); } else {