Skip to content

Commit

Permalink
Merge pull request #444 from data-integrations/fix/Cloudsql_PostgresSQL
Browse files Browse the repository at this point in the history
Changes in Postgres_Precision_Fix - Sync up with Releases
  • Loading branch information
itsankit-google authored Sep 20, 2023
2 parents 67bf43b + 09a5ace commit 9c006c6
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
| double precision | double | |
| integer | int | |
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
| numeric(with 0 precision) | string | |
| real | float | |
| smallint | int | |
| text | string | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
| double precision | double | |
| integer | int | |
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
| numeric(with 0 precision) | string | |
| real | float | |
| smallint | int | |
| smallserial | int | |
Expand Down
1 change: 1 addition & 0 deletions postgresql-plugin/docs/Postgres-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
| double precision | double | |
| integer | int | |
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
| numeric(with 0 precision) | string | |
| real | float | |
| smallint | int | |
| text | string | |
Expand Down
1 change: 1 addition & 0 deletions postgresql-plugin/docs/Postgres-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format
| double precision | double | |
| integer | int | |
| numeric(precision, scale)/decimal(precision, scale) | decimal | |
| numeric(with 0 precision) | string | |
| real | float | |
| smallint | int | |
| smallserial | int | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,46 +63,50 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB
String columnTypeName = metadata.getColumnTypeName(columnIndex);
if (isUseSchema(metadata, columnIndex)) {
setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex);
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
return;
}
if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) {
Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR);
if (timestamp != null) {
ZonedDateTime zonedDateTime = OffsetDateTime.of(timestamp.toLocalDateTime(), OffsetDateTime.now().getOffset())
.atZoneSameInstant(ZoneId.of("UTC"));
.atZoneSameInstant(ZoneId.of("UTC"));
Schema nonNullableSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
field.getSchema().getNonNullable() : field.getSchema();
setZonedDateTimeBasedOnOuputSchema(recordBuilder, nonNullableSchema.getLogicalType(),
field.getName(), zonedDateTime);
field.getName(), zonedDateTime);
} else {
recordBuilder.set(field.getName(), null);
}
} else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
return;
}
if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) {
OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class);
if (timestamp != null) {
recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC")));
} else {
recordBuilder.set(field.getName(), null);
}
} else {
int columnType = metadata.getColumnType(columnIndex);
if (columnType == Types.NUMERIC) {
Schema nonNullableSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
int precision = metadata.getPrecision(columnIndex);
if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
// When output schema is set to String for precision less numbers
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
} else if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) {
BigDecimal orgValue = resultSet.getBigDecimal(columnIndex);
if (orgValue != null) {
BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString())
.setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
recordBuilder.setDecimal(field.getName(), decimalValue);
}
}
return;
}
int columnType = metadata.getColumnType(columnIndex);
if (columnType == Types.NUMERIC) {
Schema nonNullableSchema = field.getSchema().isNullable() ?
field.getSchema().getNonNullable() : field.getSchema();
int precision = metadata.getPrecision(columnIndex);
if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) {
// When output schema is set to String for precision less numbers
recordBuilder.set(field.getName(), resultSet.getString(columnIndex));
return;
}
BigDecimal orgValue = resultSet.getBigDecimal(columnIndex);
if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType()) && orgValue != null) {
BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString())
.setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN);
recordBuilder.setDecimal(field.getName(), decimalValue);
return;
}
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
}
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
}

private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordBuilder,
Expand All @@ -121,12 +125,8 @@ private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordB
private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException {
String columnTypeName = metadata.getColumnTypeName(columnIndex);
// If the column Type Name is present in the String mapped PostgreSQL types then return true.
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName)
|| PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex))) {
return true;
}

return false;
return (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName)
|| PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex)));
}

private Object createPGobject(String type, String value, ClassLoader classLoader) throws SQLException {
Expand All @@ -152,16 +152,14 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema,
if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnType.getTypeName()) ||
PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(columnType.getType())) {
stmt.setObject(sqlIndex, createPGobject(columnType.getTypeName(),
record.get(fieldName),
stmt.getClass().getClassLoader()));
record.get(fieldName),
stmt.getClass().getClassLoader()));
return;
}
if (columnType.getType() == Types.NUMERIC && record.get(fieldName) != null &&
fieldSchema.getType() == Schema.Type.STRING) {
stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName)));
return;
} else if (columnType.getType() == Types.NUMERIC) {
if (record.get(fieldName) != null) {
if (fieldSchema.getType() == Schema.Type.STRING) {
stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName)));
return;
}
}
}

super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);
Expand Down

0 comments on commit 9c006c6

Please sign in to comment.