Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PDI-20182] Add LONG VARCHAR and LONG VARBINARY to Vertica bulk loade… #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,10 @@ private ColumnSpec getColumnSpecFromField( ValueMetaInterface inputValueMeta, Va
} else if ( targetColumnTypeName.equals( "NUMERIC" ) ) {
return new ColumnSpec( ColumnSpec.PrecisionScaleWidthType.NUMERIC, targetValueMeta.getLength(), targetValueMeta
.getPrecision() );
} else if ( targetColumnTypeName.equals( "LONG VARCHAR" ) ) {
return new ColumnSpec( ColumnSpec.VariableWidthType.LONG_VARCHAR, targetValueMeta.getLength() );
} else if ( targetColumnTypeName.equals( "LONG VARBINARY" ) ) {
return new ColumnSpec( ColumnSpec.VariableWidthType.LONG_VARBINARY, targetValueMeta.getLength() );
}
throw new IllegalArgumentException( "Column type " + targetColumnTypeName + " not supported." ); //$NON-NLS-1$
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private ConstantWidthType( ColumnType type, int bytes ) {
}

public enum VariableWidthType {
VARCHAR( ColumnType.VARCHAR ), VARBINARY( ColumnType.VARBINARY );
VARCHAR( ColumnType.VARCHAR ), VARBINARY( ColumnType.VARBINARY ), LONG_VARCHAR( ColumnType.LONG_VARCHAR ), LONG_VARBINARY( ColumnType.LONG_VARBINARY );

private final ColumnType type;
private final int bytes = -1;
Expand Down Expand Up @@ -130,11 +130,11 @@ public ColumnSpec( ConstantWidthType constantWidthType ) {
this.maxLength = constantWidthType.bytes;
}

/**
 * Creates a column specification for a variable-width column type.
 *
 * @param variableWidthType the variable-width type whose {@code type} and {@code bytes} are copied into this spec
 * @param maxlength the value stored as this spec's {@code maxLength}
 */
// NOTE(review): this is a diff hunk shown without +/- markers; the paired signature and
// maxLength-assignment lines look like the before/after sides of a parameter rename
// (maxlenght -> maxlength) — confirm against the real file.
public ColumnSpec( VariableWidthType variableWidthType, int maxlenght ) {
public ColumnSpec( VariableWidthType variableWidthType, int maxlength ) {
this.type = variableWidthType.type;
// VariableWidthType declares its bytes field as -1.
this.bytes = variableWidthType.bytes;
this.scale = 0;
this.maxLength = maxlenght;
this.maxLength = maxlength;
}

public void setCharBuffer( CharBuffer charBuffer ) {
Expand Down Expand Up @@ -271,6 +271,7 @@ public void encode( ValueMetaInterface valueMeta, Object value ) throws Characte
this.mainBuffer.putLong( TimeUnit.MILLISECONDS.toMicros( milliSeconds ) );
break;
case VARBINARY:
case LONG_VARBINARY:
sizePosition = this.mainBuffer.position();
this.mainBuffer.putInt( 0 );
prevPosition = this.mainBuffer.position();
Expand All @@ -281,6 +282,7 @@ public void encode( ValueMetaInterface valueMeta, Object value ) throws Characte
case NUMERIC:
// Numeric is encoded as VARCHAR. COPY statement uses it as a FILLER column for Vertica itself
// to convert into internal NUMERIC data format.
case LONG_VARCHAR:
case VARCHAR:
this.charBuffer.clear();
this.charEncoder.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
package org.pentaho.di.verticabulkload.nativebinary;

/**
 * Column types referenced by {@code ColumnSpec} and {@code StreamEncoder} when they switch on a
 * column's type.
 */
public enum ColumnType {
// NOTE(review): this is a diff hunk shown without +/- markers; the two constant lists below look
// like the before/after sides of the change, the second adding LONG_VARBINARY and LONG_VARCHAR —
// confirm against the real file.
INTEGER, BOOLEAN, FLOAT, CHAR, VARCHAR, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, INTERVAL, BINARY, VARBINARY, NUMERIC
INTEGER, BOOLEAN, FLOAT, CHAR, VARCHAR, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, INTERVAL, BINARY, VARBINARY, NUMERIC, LONG_VARBINARY, LONG_VARCHAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class StreamEncoder {
private static final byte BYTE_CR = (byte) 0x0D;

private static final int MAX_CHAR_LENGTH = 65000;

//https://docs.vertica.com/24.1.x/en/sql-reference/data-types/long-data-types/
private static final int MAX_LONG_VARCHAR_LENGTH = 32000000;

private static final int MAXIMUM_BUFFER_SIZE = Integer.MAX_VALUE - 8;

Expand Down Expand Up @@ -73,7 +76,9 @@ public StreamEncoder( List<ColumnSpec> columns, PipedInputStream inputStream ) t

this.charset = Charset.forName( "UTF-8" );

CharBuffer charBuffer = CharBuffer.allocate( MAX_CHAR_LENGTH );
boolean usingLongVarchar = columns.stream().anyMatch( cs -> ColumnType.LONG_VARCHAR.equals( cs.type ) );

CharBuffer charBuffer = CharBuffer.allocate( usingLongVarchar ? MAX_LONG_VARCHAR_LENGTH : MAX_CHAR_LENGTH );
CharsetEncoder charEncoder = charset.newEncoder();

this.pipedOutputStream = new PipedOutputStream( inputStream );
Expand All @@ -83,9 +88,11 @@ public StreamEncoder( List<ColumnSpec> columns, PipedInputStream inputStream ) t

for ( ColumnSpec column : columns ) {
switch ( column.type ) {
case LONG_VARBINARY:
case VARBINARY:
this.rowMaxSize += 4; // consider data size bytes for variable length field
break;
case LONG_VARCHAR:
case VARCHAR:
this.rowMaxSize += 4; // consider data size bytes for variable length field
case NUMERIC:
Expand Down