diff --git a/core/src/datayoga_core/blocks/relational/utils.py b/core/src/datayoga_core/blocks/relational/utils.py index 98cac95c..89d53393 100644 --- a/core/src/datayoga_core/blocks/relational/utils.py +++ b/core/src/datayoga_core/blocks/relational/utils.py @@ -5,6 +5,8 @@ import sqlalchemy as sa from datayoga_core.connection import Connection from datayoga_core.context import Context +from sqlalchemy.engine.interfaces import DBAPIConnection +from sqlalchemy.pool import ConnectionPoolEntry logger = logging.getLogger("dy") @@ -71,9 +73,29 @@ def get_engine(connection_name: str, context: Context, autocommit: bool = True) pool_pre_ping=True, **extra) + if db_type == DbType.ORACLE: + # add NLS_DATE_FORMAT for Oracle to explicitly set the date format to ISO + sa.event.listen(engine, "connect", alter_session_on_oracle_connect) + return engine, db_type +def alter_session_on_oracle_connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry): + """SQLAlchemy event listener to alter the Oracle session settings upon connection. + + This callback function is intended to be used with SQLAlchemy's `connect` event. + It alters the Oracle session settings by setting the `NLS_DATE_FORMAT` for the session + to ensure that dates are formatted as 'YYYY-MM-DD HH24:MI:SS'. + + Args: + dbapi_connection (DBAPIConnection): The raw DB-API connection object provided by the Oracle driver. + connection_record (ConnectionPoolEntry): A record associated with the connection, managed by SQLAlchemy. + """ + cursor = dbapi_connection.cursor() + cursor.execute("ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'") + cursor.close() + + def construct_table_reference(table: sa.Table, with_brackets: bool = False) -> str: """Constructs a table reference string. diff --git a/integration-tests/common/db_utils.py b/integration-tests/common/db_utils.py index a3819a33..a1b8bd2c 100644 --- a/integration-tests/common/db_utils.py +++ b/integration-tests/common/db_utils.py @@ -1,8 +1,8 @@ from typing import Any, Dict, Optional from datayoga_core.blocks.relational.utils import DEFAULT_DRIVERS, DbType -from sqlalchemy import (Column, Integer, String, Table, create_engine, inspect, - text) +from sqlalchemy import (Column, DateTime, Integer, String, Table, + create_engine, inspect, text) from sqlalchemy.engine import Engine from sqlalchemy.orm import declarative_base from testcontainers.core.generic import (ADDITIONAL_TRANSIENT_ERRORS, @@ -107,7 +107,8 @@ def create_emp_table(engine: Engine, schema_name: Optional[str]): Column("full_name", String(50)), Column("country", String(50)), Column("address", String(50)), - Column("gender", String(1)) + Column("gender", String(1)), + Column("date_of_birth", DateTime()) ] Table("emp", base.metadata, *columns, schema=schema_name) diff --git a/integration-tests/common/redis_utils.py b/integration-tests/common/redis_utils.py index 9398c21e..f67042eb 100644 --- a/integration-tests/common/redis_utils.py +++ b/integration-tests/common/redis_utils.py @@ -61,6 +61,7 @@ def add_to_emp_stream(redis_client: Redis): "_id": 1, "fname": "john", "lname": "doe", + "date_of_birth": "2012-12-13", "country_code": 972, "country_name": "israel", "credit_card": "1234-1234-1234-1234", @@ -75,6 +76,7 @@ def add_to_emp_stream(redis_client: Redis): "_id": 2, "fname": "jane", "lname": "doe", + "date_of_birth": "2011-11-13", "country_code": 972, "country_name": "israel", "credit_card": "1000-2000-3000-4000", diff --git a/integration-tests/resources/jobs/tests/redis_to_db2.dy.yaml b/integration-tests/resources/jobs/tests/redis_to_db2.dy.yaml index 684e1f6a..ae4e34e0 100644 --- a/integration-tests/resources/jobs/tests/redis_to_db2.dy.yaml +++ b/integration-tests/resources/jobs/tests/redis_to_db2.dy.yaml @@ -18,6 +18,7 @@ steps: country: concat([country_code, ' - ', upper(country_name)]) gender: gender addresses: addresses + date_of_birth: date_of_birth language: jmespath - uses: relational.write with: @@ -30,6 +31,7 @@ steps: - FULL_NAME: full_name - country - gender + - date_of_birth - uses: relational.write with: connection: db2-hr diff --git a/integration-tests/resources/jobs/tests/redis_to_mysql.dy.yaml b/integration-tests/resources/jobs/tests/redis_to_mysql.dy.yaml index 9fb9aa74..73261f2c 100644 --- a/integration-tests/resources/jobs/tests/redis_to_mysql.dy.yaml +++ b/integration-tests/resources/jobs/tests/redis_to_mysql.dy.yaml @@ -18,6 +18,7 @@ steps: country: concat([country_code, ' - ', upper(country_name)]) gender: gender addresses: addresses + date_of_birth: date_of_birth language: jmespath - uses: relational.write with: @@ -31,6 +32,7 @@ steps: - full_name: fullname - country - gender + - date_of_birth - uses: relational.write with: connection: mysql-hr diff --git a/integration-tests/resources/jobs/tests/redis_to_oracle.dy.yaml b/integration-tests/resources/jobs/tests/redis_to_oracle.dy.yaml index 77a8c6b3..bcffdee9 100644 --- a/integration-tests/resources/jobs/tests/redis_to_oracle.dy.yaml +++ b/integration-tests/resources/jobs/tests/redis_to_oracle.dy.yaml @@ -18,6 +18,7 @@ steps: country: concat([country_code, ' - ', upper(country_name)]) gender: gender addresses: addresses + date_of_birth: date_of_birth language: jmespath - uses: relational.write with: @@ -31,6 +32,7 @@ steps: - full_name - country - gender + - date_of_birth - uses: relational.write with: connection: oracle-hr diff --git a/integration-tests/resources/jobs/tests/redis_to_pg.dy.yaml b/integration-tests/resources/jobs/tests/redis_to_pg.dy.yaml index 06e89f48..a7b9c0a9 100644 --- a/integration-tests/resources/jobs/tests/redis_to_pg.dy.yaml +++ b/integration-tests/resources/jobs/tests/redis_to_pg.dy.yaml @@ -18,6 +18,7 @@ steps: country: concat([country_code, ' - ', upper(country_name)]) gender: gender addresses: addresses + date_of_birth: date_of_birth language: jmespath - uses: relational.write with: @@ -31,6 +32,7 @@ steps: - full_name - country - gender + - date_of_birth - uses: relational.write with: connection: psql-hr diff --git a/integration-tests/test_redis_to_relational.py b/integration-tests/test_redis_to_relational.py index 8d7b5375..eb00ec15 100644 --- a/integration-tests/test_redis_to_relational.py +++ b/integration-tests/test_redis_to_relational.py @@ -1,3 +1,4 @@ +import datetime import logging from contextlib import suppress from typing import Optional @@ -77,7 +78,9 @@ def check_results(engine: Engine, schema_name: Optional[str]): assert second_employee is not None assert second_employee["full_name"] == "Jane Doe" assert second_employee["country"] == "972 - ISRAEL" + assert second_employee["date_of_birth"] == datetime.datetime(2011, 11, 13, 0, 0) assert second_employee["gender"] == "F" + # address was not in the inserted record. verify that additional columns are set to null assert second_employee["address"] is None