From 09a5ace2b883058eb98be3227aa1581a78030c3a Mon Sep 17 00:00:00 2001 From: Shubhangi-cs Date: Mon, 4 Sep 2023 13:42:01 +0530 Subject: [PATCH] Changes in Postgres_Precision_Fix - Sync up releases --- .../docs/CloudSQLPostgreSQL-batchsink.md | 1 + .../docs/CloudSQLPostgreSQL-batchsource.md | 1 + postgresql-plugin/docs/Postgres-batchsink.md | 1 + .../docs/Postgres-batchsource.md | 1 + .../plugin/postgres/PostgresDBRecord.java | 74 +++++++++---------- 5 files changed, 40 insertions(+), 38 deletions(-) diff --git a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md index 4f2107e96..338a67c9e 100644 --- a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md +++ b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsink.md @@ -150,6 +150,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format | double precision | double | | | integer | int | | | numeric(precision, scale)/decimal(precision, scale) | decimal | | +| numeric(with 0 precision) | string | | | real | float | | | smallint | int | | | text | string | | diff --git a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md index c13fb06fe..8d9ad7171 100644 --- a/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md +++ b/cloudsql-postgresql-plugin/docs/CloudSQLPostgreSQL-batchsource.md @@ -174,6 +174,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format | double precision | double | | | integer | int | | | numeric(precision, scale)/decimal(precision, scale) | decimal | | +| numeric(with 0 precision) | string | | | real | float | | | smallint | int | | | smallserial | int | | diff --git a/postgresql-plugin/docs/Postgres-batchsink.md b/postgresql-plugin/docs/Postgres-batchsink.md index 9e1e8404f..b8a996463 100644 --- a/postgresql-plugin/docs/Postgres-batchsink.md +++ b/postgresql-plugin/docs/Postgres-batchsink.md @@ -79,6 +79,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format | double precision | double | | | integer | int | | | numeric(precision, scale)/decimal(precision, scale) | decimal | | +| numeric(with 0 precision) | string | | | real | float | | | smallint | int | | | text | string | | diff --git a/postgresql-plugin/docs/Postgres-batchsource.md b/postgresql-plugin/docs/Postgres-batchsource.md index 8bd018baf..af359022d 100644 --- a/postgresql-plugin/docs/Postgres-batchsource.md +++ b/postgresql-plugin/docs/Postgres-batchsource.md @@ -110,6 +110,7 @@ Please, refer to PostgreSQL data types documentation to figure out proper format | double precision | double | | | integer | int | | | numeric(precision, scale)/decimal(precision, scale) | decimal | | +| numeric(with 0 precision) | string | | | real | float | | | smallint | int | | | smallserial | int | | diff --git a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java index 8524aae23..070f905da 100644 --- a/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java +++ b/postgresql-plugin/src/main/java/io/cdap/plugin/postgres/PostgresDBRecord.java @@ -63,46 +63,50 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB String columnTypeName = metadata.getColumnTypeName(columnIndex); if (isUseSchema(metadata, columnIndex)) { setFieldAccordingToSchema(resultSet, recordBuilder, field, columnIndex); - } else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) { + return; + } + if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamp")) { Timestamp timestamp = resultSet.getTimestamp(columnIndex, DBUtils.PURE_GREGORIAN_CALENDAR); if (timestamp != null) { ZonedDateTime zonedDateTime = OffsetDateTime.of(timestamp.toLocalDateTime(), OffsetDateTime.now().getOffset()) - .atZoneSameInstant(ZoneId.of("UTC")); + .atZoneSameInstant(ZoneId.of("UTC")); Schema nonNullableSchema = field.getSchema().isNullable() ? - field.getSchema().getNonNullable() : field.getSchema(); + field.getSchema().getNonNullable() : field.getSchema(); setZonedDateTimeBasedOnOuputSchema(recordBuilder, nonNullableSchema.getLogicalType(), - field.getName(), zonedDateTime); + field.getName(), zonedDateTime); } else { recordBuilder.set(field.getName(), null); } - } else if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) { + return; + } + if (sqlType == Types.TIMESTAMP && columnTypeName.equalsIgnoreCase("timestamptz")) { OffsetDateTime timestamp = resultSet.getObject(columnIndex, OffsetDateTime.class); if (timestamp != null) { recordBuilder.setTimestamp(field.getName(), timestamp.atZoneSameInstant(ZoneId.of("UTC"))); } else { recordBuilder.set(field.getName(), null); } - } else { - int columnType = metadata.getColumnType(columnIndex); - if (columnType == Types.NUMERIC) { - Schema nonNullableSchema = field.getSchema().isNullable() ? - field.getSchema().getNonNullable() : field.getSchema(); - int precision = metadata.getPrecision(columnIndex); - if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) { - // When output schema is set to String for precision less numbers - recordBuilder.set(field.getName(), resultSet.getString(columnIndex)); - } else if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) { - BigDecimal orgValue = resultSet.getBigDecimal(columnIndex); - if (orgValue != null) { - BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString()) - .setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN); - recordBuilder.setDecimal(field.getName(), decimalValue); - } - } + return; + } + int columnType = metadata.getColumnType(columnIndex); + if (columnType == Types.NUMERIC) { + Schema nonNullableSchema = field.getSchema().isNullable() ? + field.getSchema().getNonNullable() : field.getSchema(); + int precision = metadata.getPrecision(columnIndex); + if (precision == 0 && Schema.Type.STRING.equals(nonNullableSchema.getType())) { + // When output schema is set to String for precision less numbers + recordBuilder.set(field.getName(), resultSet.getString(columnIndex)); + return; + } + BigDecimal orgValue = resultSet.getBigDecimal(columnIndex); + if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType()) && orgValue != null) { + BigDecimal decimalValue = new BigDecimal(orgValue.toPlainString()) + .setScale(nonNullableSchema.getScale(), RoundingMode.HALF_EVEN); + recordBuilder.setDecimal(field.getName(), decimalValue); return; } - setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); } + setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); } private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordBuilder, @@ -121,12 +125,8 @@ private void setZonedDateTimeBasedOnOuputSchema(StructuredRecord.Builder recordB private static boolean isUseSchema(ResultSetMetaData metadata, int columnIndex) throws SQLException { String columnTypeName = metadata.getColumnTypeName(columnIndex); // If the column Type Name is present in the String mapped PostgreSQL types then return true. - if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName) - || PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex))) { - return true; - } - - return false; + return (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnTypeName) + || PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(metadata.getColumnType(columnIndex))); } private Object createPGobject(String type, String value, ClassLoader classLoader) throws SQLException { @@ -152,16 +152,14 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, if (PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES_NAMES.contains(columnType.getTypeName()) || PostgresSchemaReader.STRING_MAPPED_POSTGRES_TYPES.contains(columnType.getType())) { stmt.setObject(sqlIndex, createPGobject(columnType.getTypeName(), - record.get(fieldName), - stmt.getClass().getClassLoader())); + record.get(fieldName), + stmt.getClass().getClassLoader())); + return; + } + if (columnType.getType() == Types.NUMERIC && record.get(fieldName) != null && + fieldSchema.getType() == Schema.Type.STRING) { + stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName))); return; - } else if (columnType.getType() == Types.NUMERIC) { - if (record.get(fieldName) != null) { - if (fieldSchema.getType() == Schema.Type.STRING) { - stmt.setBigDecimal(sqlIndex, new BigDecimal((String) record.get(fieldName))); - return; - } - } } super.writeNonNullToDB(stmt, fieldSchema, fieldName, fieldIndex);