SNOW-1460351: [v1.17 - v1.18] Can't create dataframe "withcolumn" when column value can have varying length #1725

Closed
jeromesubs opened this issue Jun 3, 2024 · 3 comments
Labels
bug Something isn't working local testing Local Testing issues/PRs triaged


@jeromesubs

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

    Python 3.11.8 (main, Feb 26 2024, 21:39:34) [GCC 11.2.0]

  2. What are the Snowpark Python and pandas versions in the environment?

    pandas==2.2.1
    snowflake-snowpark-python==1.18.0

  3. What did you do?

    Create a dataframe.
    Create a second dataframe by adding a new column with a conditional expression; the column value is one of two strings of different lengths.

  4. What did you expect to see?

    I would expect the second dataframe to be created without error

Here is the code:

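```python
from datetime import date
from unittest import TestCase

import snowflake.snowpark.functions as f
import snowflake.snowpark.types as T
from snowflake.snowpark import Session


class TestBr135(TestCase):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Local testing mode: DataFrame operations run in the in-process mock engine
        self.__session = Session.builder.config('local_testing', True).create()

    def test_br135(self):
        input_data = [
            ("SSSS", 12344322, date(2019, 12, 22), date(2019, 12, 30))
        ]

        schema = T.StructType([
            T.StructField('COLUMN1', T.StringType()),
            T.StructField('COLUMN2', T.IntegerType()),
            T.StructField('DATE1', T.DateType()),
            T.StructField('DATE2', T.DateType())
        ])

        df_input = self.__session.create_dataframe(input_data, schema)

        # Combine Column conditions with `&` (Python's `and` does not build a SQL AND)
        # and call is_not_null() instead of referencing the bound method
        df_result = df_input.withColumn(
            "NEWCOLUMN",
            f.when(
                f.col("DATE1").is_not_null()
                & f.col("DATE2").is_not_null()
                & (f.datediff("day", f.col("DATE1"), f.col("DATE2")) <= 365),
                f.lit("String Length"),  # 13 characters
            ).otherwise(f.lit("String with different length")),  # 28 characters
        )

        df_result.show(50)
```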

The test crashes on the `show` call, but the error is already visible when inspecting the `df_result` properties in a debugger:
"CaseWhen expressions have conflicting data types: StringType(13) != StringType(28)"

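The failing check can be modeled without Snowpark. In local testing, a string literal gets a sized type inferred from its value, so the two branches become `StringType(13)` and `StringType(28)`, and the 1.17/1.18 `CaseWhen` branch compares them with plain equality. The sketch below is a simplified, hypothetical model: `SizedString`, `strict_check`, and `widening_check` are illustrative names, not Snowpark APIs. `widening_check` shows one assumed way to reconcile the types (keep the longer length); it is not necessarily how 1.19.0 fixed the issue.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SizedString:
    """Hypothetical stand-in for a string type that records a maximum length."""
    length: Optional[int] = None  # None means "unsized"


def strict_check(a: SizedString, b: SizedString) -> SizedString:
    # Models the equality comparison in the 1.17/1.18 CaseWhen branch:
    # any difference, including only the length, counts as a conflict.
    if a != b:
        raise TypeError(f"conflicting data types: {a} != {b}")
    return a


def widening_check(a: SizedString, b: SizedString) -> SizedString:
    # Assumed alternative: string types are compatible regardless of length;
    # the result keeps the longer length, and an unsized operand wins.
    if a.length is None or b.length is None:
        return SizedString(None)
    return SizedString(max(a.length, b.length))


then_type = SizedString(len("String Length"))                 # length 13
else_type = SizedString(len("String with different length"))  # length 28

try:
    strict_check(then_type, else_type)
except TypeError as exc:
    # prints: conflicting data types: SizedString(length=13) != SizedString(length=28)
    print(exc)

print(widening_check(then_type, else_type))  # prints: SizedString(length=28)
```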
@jeromesubs jeromesubs added bug Something isn't working local testing Local Testing issues/PRs needs triage Initial RCA is required labels Jun 3, 2024
@github-actions github-actions bot changed the title [v1.17 - v1.18] Can't create dataframe "withcolumn" when column can have varying length SNOW-1460351: [v1.17 - v1.18] Can't create dataframe "withcolumn" when column can have varying length Jun 3, 2024
@jeromesubs
Author

Here is the stack trace:

/br135_test.py:44:


../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/_internal/telemetry.py:145: in wrap
    result = func(*args, **kwargs)
../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/dataframe.py:3176: in show
    self._show_string(
../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/dataframe.py:3294: in _show_string
    result, meta = self._session._conn.get_result_and_metadata(
../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/mock/_connection.py:806: in get_result_and_metadata
    res = execute_mock_plan(plan, plan.expr_to_alias)
../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/mock/_plan.py:708: in execute_mock_plan
    column_series = calculate_expression(
../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/mock/_plan.py:1693: in calculate_expression
    return calculate_expression(exp.child, input_data, analyzer, expr_to_alias)


exp = <snowflake.snowpark._internal.analyzer.expression.CaseWhen object at 0x7f283d6541d0>
input_data = "COLUMN1" "COLUMN2" "DATE1" "DATE2"
0 SSSS 12344322 2019-12-22 2019-12-30
analyzer = <snowflake.snowpark.mock._analyzer.MockAnalyzer object at 0x7f283d61b650>
expr_to_alias = {}

def calculate_expression(
    exp: Expression,
    input_data: Union[TableEmulator, ColumnEmulator],
    analyzer: "MockAnalyzer",
    expr_to_alias: Dict[str, str],
    *,
    keep_literal: bool = False,
) -> ColumnEmulator:
    """
    Returns the calculated expression evaluated based on input table/column
    setting keep_literal to true returns Python datatype
    setting keep_literal to false returns a ColumnEmulator wrapping the Python datatype of a Literal
    """
    import numpy as np

    if isinstance(exp, Attribute):
        try:
            return input_data[expr_to_alias.get(exp.expr_id, exp.name)]
        except KeyError:
            # expr_id maps to the projected name, but input_data might still have the exp.name
            # dealing with the KeyError here, this happens in case df.union(df)
            # TODO: check SNOW-831880 for more context
            return input_data[exp.name]
    if isinstance(exp, (UnresolvedAttribute, Attribute)):
        if exp.is_sql_text:
            analyzer.session._conn.log_not_supported_error(
                external_feature_name="SQL Text Expression",
                internal_feature_name=type(exp).__name__,
                parameters_info={"exp.is_sql_text": str(exp.is_sql_text)},
                raise_error=NotImplementedError,
            )
        try:
            return input_data[exp.name]
        except KeyError:
            raise SnowparkLocalTestingException(f"invalid identifier {exp.name}")
    if isinstance(exp, (UnresolvedAlias, Alias)):
        return calculate_expression(exp.child, input_data, analyzer, expr_to_alias)
    if isinstance(exp, FunctionExpression):
        return handle_function_expression(exp, input_data, analyzer, expr_to_alias)
    if isinstance(exp, ListAgg):
        lhs = calculate_expression(exp.col, input_data, analyzer, expr_to_alias)
        lhs.sf_type = ColumnType(StringType(), exp.col.nullable)
        return _MOCK_FUNCTION_IMPLEMENTATION_MAP["listagg"](
            lhs,
            is_distinct=exp.is_distinct,
            delimiter=exp.delimiter,
        )
    if isinstance(exp, IsNull):
        child_column = calculate_expression(
            exp.child, input_data, analyzer, expr_to_alias
        )
        return ColumnEmulator(
            data=[bool(data is None) for data in child_column],
            sf_type=ColumnType(BooleanType(), True),
        )
    if isinstance(exp, IsNotNull):
        child_column = calculate_expression(
            exp.child, input_data, analyzer, expr_to_alias
        )
        return ColumnEmulator(
            data=[bool(data is not None) for data in child_column],
            sf_type=ColumnType(BooleanType(), True),
        )
    if isinstance(exp, IsNaN):
        child_column = calculate_expression(
            exp.child, input_data, analyzer, expr_to_alias
        )
        res = []
        for data in child_column:
            if data is None:
                res.append(None)
            else:
                try:
                    res.append(math.isnan(data))
                except TypeError:
                    res.append(False)
        return ColumnEmulator(
            data=res, dtype=object, sf_type=ColumnType(BooleanType(), True)
        )
    if isinstance(exp, Not):
        child_column = calculate_expression(
            exp.child, input_data, analyzer, expr_to_alias
        ).astype(bool)
        return ~child_column
    if isinstance(exp, UnresolvedAttribute):
        return analyzer.analyze(exp, expr_to_alias)
    if isinstance(exp, Literal):
        if not keep_literal:
            if isinstance(exp.datatype, StringType):
                # in live session, literal of string type will have size auto inferred
                exp.datatype = StringType(len(exp.value))
            res = ColumnEmulator(
                data=[exp.value for _ in range(len(input_data))],
                sf_type=ColumnType(exp.datatype, False),
                dtype=object,
            )
            res.index = input_data.index
            return res
        return exp.value
    if isinstance(exp, BinaryExpression):
        left = fix_drift_between_column_sf_type_and_dtype(
            calculate_expression(exp.left, input_data, analyzer, expr_to_alias)
        )
        right = fix_drift_between_column_sf_type_and_dtype(
            calculate_expression(exp.right, input_data, analyzer, expr_to_alias)
        )
        # TODO: Address mixed type calculation here. For instance Snowflake allows to add a date to a number, but
        #  pandas doesn't allow. Type coercion will address it.
        if isinstance(exp, Multiply):
            new_column = left * right
        elif isinstance(exp, Divide):
            new_column = left / right
        elif isinstance(exp, Add):
            new_column = left + right
        elif isinstance(exp, Subtract):
            new_column = left - right
        elif isinstance(exp, Remainder):
            new_column = left % right
        elif isinstance(exp, Pow):
            new_column = left**right
        elif isinstance(exp, EqualTo):
            new_column = left == right
            if left.hasnans and right.hasnans:
                new_column[
                    left.apply(lambda x: x is None) & right.apply(lambda x: x is None)
                ] = True
                new_column[
                    left.apply(lambda x: x is not None and np.isnan(x))
                    & right.apply(lambda x: x is not None and np.isnan(x))
                ] = True
                # NaN == NaN evaluates to False in pandas, but True in Snowflake
                new_column[new_column.isna() | new_column.isnull()] = False
        elif isinstance(exp, NotEqualTo):
            new_column = left != right
        elif isinstance(exp, GreaterThanOrEqual):
            new_column = left >= right
        elif isinstance(exp, GreaterThan):
            new_column = left > right
        elif isinstance(exp, LessThanOrEqual):
            new_column = left <= right
        elif isinstance(exp, LessThan):
            new_column = left < right
        elif isinstance(exp, And):
            new_column = (
                (left & right)
                if isinstance(input_data, TableEmulator) or not input_data
                else (left & right) & input_data
            )
        elif isinstance(exp, Or):
            new_column = (
                (left | right)
                if isinstance(input_data, TableEmulator) or not input_data
                else (left | right) & input_data
            )
        elif isinstance(exp, EqualNullSafe):
            either_isna = left.isna() | right.isna() | left.isnull() | right.isnull()
            both_isna = (left.isna() & right.isna()) | (left.isnull() & right.isnull())
            new_column = ColumnEmulator(
                [False] * len(left),
                dtype=bool,
                sf_type=ColumnType(BooleanType(), False),
            )
            new_column[either_isna] = False
            new_column[~either_isna] = left[~either_isna] == right[~either_isna]
            new_column[both_isna] = True
        elif isinstance(exp, BitwiseOr):
            new_column = left | right
        elif isinstance(exp, BitwiseXor):
            new_column = left ^ right
        elif isinstance(exp, BitwiseAnd):
            new_column = left & right
        else:
            analyzer.session._conn.log_not_supported_error(
                external_feature_name=f"Binary Expression {type(exp).__name__}",
                internal_feature_name=type(exp).__name__,
                raise_error=NotImplementedError,
            )
        return new_column
    if isinstance(exp, UnaryMinus):
        res = calculate_expression(exp.child, input_data, analyzer, expr_to_alias)
        return -res
    if isinstance(exp, RegExp):
        lhs = calculate_expression(exp.expr, input_data, analyzer, expr_to_alias)
        raw_pattern = calculate_expression(
            exp.pattern, input_data, analyzer, expr_to_alias
        )
        arguments = TableEmulator({"LHS": lhs, "PATTERN": raw_pattern})

        def _match_pattern(row) -> bool:
            input_str = row["LHS"]
            raw_pattern = row["PATTERN"]
            _pattern = (
                f"^{raw_pattern}" if not raw_pattern.startswith("^") else raw_pattern
            )
            _pattern = f"{_pattern}$" if not _pattern.endswith("$") else _pattern

            try:
                re.compile(_pattern)
            except re.error:
                raise SnowparkLocalTestingException(
                    f"Invalid regular expression {raw_pattern}"
                )

            return bool(re.match(_pattern, input_str))

        result = arguments.apply(_match_pattern, axis=1)
        result.sf_type = ColumnType(BooleanType(), True)
        return result
    if isinstance(exp, Like):
        lhs = calculate_expression(exp.expr, input_data, analyzer, expr_to_alias)
        pattern = convert_wildcard_to_regex(
            str(
                calculate_expression(exp.pattern, input_data, analyzer, expr_to_alias)[
                    0
                ]
            )
        )
        result = lhs.str.match(pattern)
        result.sf_type = ColumnType(BooleanType(), True)
        return result
    if isinstance(exp, InExpression):
        lhs = calculate_expression(exp.columns, input_data, analyzer, expr_to_alias)
        res = ColumnEmulator([False] * len(lhs), dtype=object)
        res.sf_type = ColumnType(BooleanType(), True)
        for val in exp.values:
            rhs = calculate_expression(val, input_data, analyzer, expr_to_alias)
            if isinstance(lhs, ColumnEmulator):
                if isinstance(rhs, ColumnEmulator):
                    res = res | lhs.isin(rhs)
                elif isinstance(rhs, TableEmulator):
                    res = res | lhs.isin(rhs.iloc[:, 0])
                else:
                    analyzer.session._conn.log_not_supported_error(
                        external_feature_name=f"IN expression with type {type(rhs).__name__} on the right",
                        internal_feature_name=type(exp).__name__,
                        parameters_info={"rhs": type(rhs).__name__},
                        raise_error=NotImplementedError,
                    )
            else:
                exists = lhs.apply(tuple, 1).isin(rhs.apply(tuple, 1))
                exists.sf_type = ColumnType(BooleanType(), False)
                res = res | exists
        return res
    if isinstance(exp, ScalarSubquery):
        return execute_mock_plan(exp.plan, expr_to_alias)
    if isinstance(exp, MultipleExpression):
        res = TableEmulator()
        for e in exp.expressions:
            res[analyzer.analyze(e, expr_to_alias)] = calculate_expression(
                e, input_data, analyzer, expr_to_alias
            )
        return res
    if isinstance(exp, Cast):
        column = calculate_expression(exp.child, input_data, analyzer, expr_to_alias)
        if isinstance(exp.to, DateType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_date"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, TimeType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_time"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, TimestampType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_timestamp"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, DecimalType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_decimal"](
                column,
                precision=exp.to.precision,
                scale=exp.to.scale,
                try_cast=exp.try_,
            )
        elif isinstance(
            exp.to, _IntegralType
        ):  # includes ByteType, ShortType, IntegerType, LongType
            res = _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_decimal"](
                column, try_cast=exp.try_
            )
            res.set_sf_type(ColumnType(exp.to, nullable=column.sf_type.nullable))
            return res
        elif isinstance(exp.to, BinaryType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_binary"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, BooleanType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_boolean"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, StringType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_char"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, (DoubleType, FloatType)):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_double"](
                column, try_cast=exp.try_
            )
        elif isinstance(exp.to, MapType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_object"](column)
        elif isinstance(exp.to, ArrayType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_array"](column)
        elif isinstance(exp.to, VariantType):
            return _MOCK_FUNCTION_IMPLEMENTATION_MAP["to_variant"](column)
        else:
            analyzer.session._conn.log_not_supported_error(
                external_feature_name=f"Cast to {type(exp.to).__name__}",
                internal_feature_name=type(exp).__name__,
                parameters_info={"exp.to": type(exp.to).__name__},
                raise_error=NotImplementedError,
            )
    if isinstance(exp, CaseWhen):
        remaining = input_data
        output_data = ColumnEmulator([None] * len(input_data))
        for case in exp.branches:
            if len(remaining) == 0:
                break
            condition = calculate_expression(
                case[0], input_data, analyzer, expr_to_alias
            ).fillna(value=False)
            value = calculate_expression(case[1], input_data, analyzer, expr_to_alias)

            true_index = remaining[condition].index
            output_data[true_index] = value[true_index]
            remaining = remaining[~remaining.index.isin(true_index)]

            if output_data.sf_type:
                if (
                    not isinstance(output_data.sf_type.datatype, NullType)
                    and output_data.sf_type != value.sf_type
                ):
                    raise SnowparkLocalTestingException(
                        f"CaseWhen expressions have conflicting data types: {output_data.sf_type} != {value.sf_type}"
                    )
            else:
                output_data.sf_type = value.sf_type

        if len(remaining) > 0 and exp.else_value:
            value = calculate_expression(
                exp.else_value, remaining, analyzer, expr_to_alias
            )
            output_data[remaining.index] = value[remaining.index]
            if output_data.sf_type:
                if (
                    not isinstance(output_data.sf_type.datatype, NullType)
                    and output_data.sf_type.datatype != value.sf_type.datatype
                ):
                    raise SnowparkLocalTestingException(
                        f"CaseWhen expressions have conflicting data types: {output_data.sf_type.datatype} != {value.sf_type.datatype}"
                    )

E snowflake.snowpark.mock.exceptions.SnowparkLocalTestingException: CaseWhen expressions have conflicting data types: StringType(13) != StringType(28)

../../../../miniconda3/envs/venv311/lib/python3.11/site-packages/snowflake/snowpark/mock/_plan.py:2003: SnowparkLocalTestingException

@sfc-gh-aling sfc-gh-aling added triaged and removed needs triage Initial RCA is required labels Jun 3, 2024
@sfc-gh-aling
Contributor

Thanks for reaching out! We will take a look at this ASAP.

@sfc-gh-aling sfc-gh-aling self-assigned this Jun 3, 2024
@jeromesubs jeromesubs changed the title SNOW-1460351: [v1.17 - v1.18] Can't create dataframe "withcolumn" when column can have varying length SNOW-1460351: [v1.17 - v1.18] Can't create dataframe "withcolumn" when column value can have varying length Jun 4, 2024
@sfc-gh-aling
Contributor

Hey @jeromesubs, we have fixed the issue in the latest release, 1.19.0. Please upgrade and let us know how it goes!
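For test suites that might still run against an older install, a small version guard can make the dependency on the fix explicit. This is a sketch under assumptions: the helper names are hypothetical, the 1.19.0 threshold comes from this thread, and the parsing only handles plain numeric release strings such as `1.18.0` (any non-numeric component and everything after it is ignored).

```python
from importlib.metadata import PackageNotFoundError, version


def parse_release(ver: str) -> tuple:
    """Return the leading numeric components, e.g. "1.18.0" -> (1, 18, 0)."""
    parts = []
    for piece in ver.split("."):
        if not piece.isdigit():
            break  # stop at a suffixed component such as "0rc1"
        parts.append(int(piece))
    return tuple(parts)


def has_casewhen_fix(ver: str) -> bool:
    # Per this thread, the CaseWhen string-length fix shipped in 1.19.0.
    return parse_release(ver) >= (1, 19)


def installed_snowpark_has_fix() -> bool:
    # Reads the installed distribution's version; False if it is not installed.
    try:
        return has_casewhen_fix(version("snowflake-snowpark-python"))
    except PackageNotFoundError:
        return False
```

In a test module, `installed_snowpark_has_fix()` could be passed to `unittest.skipUnless(...)` on the repro test so that it is skipped, rather than failing, on 1.17 and 1.18.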
