SNOW-1348700: Error on .filter for specific filters #1440

Closed
sfc-gh-pkommini opened this issue Apr 26, 2024 · 2 comments
Labels: bug (Something isn't working), local testing (Local Testing issues/PRs), needs triage (Initial RCA is required)

sfc-gh-pkommini commented Apr 26, 2024

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

Python 3.10.11

  2. What are the Snowpark Python and pandas versions in the environment?

pandas==2.2.2
pandas-stubs==2.2.1.240316
snowflake-snowpark-python==1.15.0

  3. What did you do?

We have the function below, which queries a Snowflake table. For each filter that is specified, we apply it with an additional .filter call; if a filter is unspecified, we skip it.

The problem is that when the states, cost_centers, or employee_names filters are specified, we get the error shown below the function definition.

When we don't specify those filters, everything works as expected.
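The conditional-filter pattern described above (apply a filter only when its list is non-empty, and let a NULL marker also match missing values, as the cost_centers and employee_names branches do) can be sketched in plain Python over a list of dict rows. This is only an illustration under stated assumptions: `apply_optional_filter` is a hypothetical helper, the rows stand in for the Snowflake table, `None` plays the role of SQL NULL, and `NULL_SENTINEL` is a hypothetical stand-in for the function's `NULL_STRING` constant, whose real value is not shown in the issue.

```python
from typing import Any, Optional

# Hypothetical stand-in for the issue's NULL_STRING constant (real value not shown).
NULL_SENTINEL = "null"

Row = dict[str, Any]


def apply_optional_filter(rows: list[Row], column: str, wanted: Optional[list[str]]) -> list[Row]:
    """Keep rows whose lower-cased `column` value is in `wanted`.

    A missing or empty `wanted` list means the filter is unspecified, so all rows pass.
    If NULL_SENTINEL is among the values, rows whose `column` is None also match.
    """
    values = [v.lower() for v in (wanted or [])]
    if not values:
        return rows  # filter not specified: skip it entirely
    match_null = NULL_SENTINEL in values
    values = [v for v in values if v != NULL_SENTINEL]

    def keep(row: Row) -> bool:
        cell = row.get(column)
        if cell is None:
            # Mirrors SQL: lower(NULL) IN (...) is never true, so only an explicit
            # IS NULL branch (the sentinel) can keep the row.
            return match_null
        return cell.lower() in values

    return [row for row in rows if keep(row)]


rows: list[Row] = [
    {"COST_CENTER": "Cost-Center-1", "NAME": "name-4"},
    {"COST_CENTER": None, "NAME": "name-5"},
    {"COST_CENTER": "cost-center-2", "NAME": None},
]
filters = {"cost_centers": ["cost-center-1", "null"]}

result = apply_optional_filter(rows, "COST_CENTER", filters.get("cost_centers"))
result = apply_optional_filter(result, "NAME", filters.get("employee_names"))  # unspecified
print([r["NAME"] for r in result])  # ['name-4', 'name-5']
```

Each call returns a new list, much like each Snowpark .filter returns a new DataFrame, so unspecified filters simply leave the intermediate result unchanged.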

This is the test setup:

    # REVIEW_DATA table mock
    review_data_df = session.create_dataframe(
        data=REVIEW_DATA_DF_SAMPLE_1,
        schema=StructType(
            [
                StructField("ID", LongType()),
                StructField("REVIEW_ID", LongType()),
                StructField("USERNAME", StringType()),
                StructField("NAME", StringType()),
                StructField("EMAIL", StringType()),
                StructField("SYSTEM_NAME", StringType()),
                StructField("ROLE", StringType()),
                StructField("COST_CENTER", StringType()),
                StructField("TEAM", StringType()),
                StructField("MANAGER_NAME", StringType()),
                StructField("MANAGER_EMAIL", StringType()),
                StructField("APPROVER_USEREMAIL", StringType()),
                StructField("TIMESTAMP", TimestampType()),
                StructField("ACTION", StringType()),
                StructField("COMMENTS", StringType()),
                StructField("LOOKBACK_WHYGRANTTHEN_ID", LongType()),
                StructField("LOOKBACK_WHYREVOKENOW_ID", LongType()),
                StructField("STATE", StringType()),
                StructField("STATE_COMMENTS", StringType()),
                StructField("UPDATED_AT", TimestampType()),
                StructField("UPDATED_BY", StringType()),
                StructField("COMPLY_APPROVER", StringType()),
                StructField("COMPLY_ACTION_TIMESTAMP", TimestampType()),
                StructField("ORIGINAL_MGR_EMAIL", StringType()),
                StructField("RBAC_ALIGNMENT", StringType()),
                StructField("PREV_QUARTER_ACTION", StringType()),
            ]
        ),
    )
    review_data_df.write.save_as_table(
        table_name=REVIEW_DATA_TABLE, mode="overwrite", table_type="temporary"
    )  # type: ignore

    # UAR_REVIEW_SUBMISSIONS table mock
    review_submissions_df = session.create_dataframe(
        data=REVIEW_SUBMISSIONS_DF_SAMPLE_1,
        schema=StructType(
            [
                StructField("ID", LongType()),
                StructField("USERNAME", StringType()),
                StructField("NAME", StringType()),
                StructField("EMAIL", StringType()),
                StructField("SYSTEM_NAME", StringType()),
                StructField("ROLE", StringType()),
                StructField("COST_CENTER", StringType()),
                StructField("TEAM", StringType()),
                StructField("MANAGER_NAME", StringType()),
                StructField("MANAGER_EMAIL", StringType()),
                StructField("APPROVER_USEREMAIL", StringType()),
                StructField("TIMESTAMP", TimestampType()),
                StructField("ACTION", StringType()),
                StructField("COMMENTS", StringType()),
                StructField("LOOKBACK_WHYGRANTTHEN_ID", LongType()),
                StructField("LOOKBACK_WHYREVOKENOW_ID", LongType()),
                StructField("STATE", StringType()),
                StructField("STATE_COMMENTS", StringType()),
                StructField("UPDATED_AT", TimestampType()),
                StructField("UPDATED_BY", StringType()),
                StructField("COMPLY_APPROVER", StringType()),
                StructField("COMPLY_ACTION_TIMESTAMP", TimestampType()),
                StructField("ORIGINAL_MGR_EMAIL", StringType()),
                StructField("RBAC_ALIGNMENT", StringType()),
                StructField("PREV_QUARTER_ACTION", StringType()),
            ]
        ),
    )
    review_submissions_df.write.save_as_table(
        table_name=REVIEW_SUBMISSIONS_LATEST_VIEW,
        mode="overwrite",
        table_type="temporary",
    )  # type: ignore

    lookback_answers_df = session.create_dataframe(
        data=LOOKBACK_ANSWERS_DF_SAMPLE_1,
        schema=StructType(
            [
                StructField("ID", LongType()),
                StructField("LOOKBACK_QUESTION", StringType()),
                StructField("LOOKBACK_ANS_CHOICE", StringType()),
                StructField("HELP_TEXT", StringType()),
                StructField("UPDATED_BY", StringType()),
                StructField("UPDATED_AT", TimestampType()),
                StructField("SORT_ORDER", LongType()),
            ]
        ),
    )
    lookback_answers_df.write.save_as_table(
        table_name=LOOKBACK_ANSWERS_TABLE, mode="overwrite", table_type="temporary"
    )  # type: ignore

    mock_dt = datetime(2023, 5, 17, 9, 30)
    mock_datetime = mocker.patch(
        "uar.manager_app_utils.manager_app_models.pending_reviews.datetime"
    )
    mock_datetime.now.return_value = mock_dt

This is the test:

    expected_df_states_filtered = pd.DataFrame(
        data=[
            [
                14,
                "[email protected]",
                "name-4",
                "uname-4",
                "role-4",
                "system-1",
                "[email protected]",
                DATETIME_SAMPLE_1,
                "[email protected]",
                None,
                None,
                None,
                DATETIME_SAMPLE_1,
                None,
                None,
                "returned",
                None,
                "Exception",
                "cost-center-1",
                "Approved/[email protected]",
                None,
                None,
                1,
            ]
        ],
        columns=GET_REVIEWS_FOR_USER_SCHEMA,
    )
    actual_df_states_filtered = get_reviews_for_user(
        manager_email="[email protected]",
        filter={"states": ["returned"]},
        page_size=DEFAULT_PAGE_SIZE,
        page_number=0,
    )
    assert_frame_equal(expected_df_states_filtered, actual_df_states_filtered)

This is the function being tested:

def get_reviews_for_user(
    manager_email: str,
    filter: dict,
    page_size: int = DEFAULT_PAGE_SIZE,
    page_number: int = 0,
) -> Any:
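    """Query the Snowflake data cloud.

    Args:
        manager_email (str): Email of the line manager whose reviews are queried.
        filter (dict): Optional filters keyed by systems, roles, cost_centers,
            states, employee_emails, usernames, employee_names and last_action.
        page_size (int): Number of rows per page.
        page_number (int): Page number used to compute the offset.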

    Returns:
        pd.DataFrame: Returns reviews that a line manager needs to review for a user access review process.
    """
    session = db.init_connection("snowflake")
    offset = get_offset(
        page_size,
        page_number,
    )

    df_reviews = session.table(REVIEW_DATA_TABLE)
    df_review_submissions = session.table(REVIEW_SUBMISSIONS_LATEST_VIEW).filter(
        (col("manager_email") == lit(manager_email))
    )

    df_lookback_answers = session.table(LOOKBACK_ANSWERS_TABLE)
    df_reviews_joined = (
        df_reviews.join(
            right=df_review_submissions,
            on="id",
            how="left",
            rsuffix="_submitted",
        )
        .join(
            df_lookback_answers,
            col("lookback_whygrantthen_id") == df_lookback_answers["id"],
            join_type="left",
            rsuffix="_wt",
        )
        .join(
            df_lookback_answers,
            col("lookback_whyrevokenow_id") == df_lookback_answers["id"],
            join_type="left",
            rsuffix="_wn",
        )
    )

    df_reviews_filtered = df_reviews_joined.filter(
        (col("MANAGER_EMAIL") == lit(manager_email))
        & (
            (
                col("STATE").is_null() & col("STATE_SUBMITTED").is_null()
            )
            | (
                (col("STATE") == lit("returned"))
                & (col("STATE_SUBMITTED") == lit("submitted"))
                & (col("UPDATED_AT") > col("updated_at_submitted"))
            )
            | (
                (col("state") == lit("returned"))
                & (col("state_submitted").is_null())
                & (col("updated_at_submitted").is_null())
            )
        )
    )

    systems = list(map(lambda s: s.lower(), filter.get("systems", [])))
    roles = list(map(lambda s: s.lower(), filter.get("roles", [])))
    cost_centers = list(map(lambda s: s.lower(), filter.get("cost_centers", [])))
    states = list(map(lambda s: s.lower() if s else None, filter.get("states", [])))
    employee_emails = list(map(lambda s: s.lower(), filter.get("employee_emails", [])))
    usernames = list(map(lambda s: s.lower(), filter.get("usernames", [])))
    employee_names = list(map(lambda s: s.lower(), filter.get("employee_names", [])))
    last_action = filter.get("last_action", None)

    if systems:
        df_reviews_filtered = df_reviews_filtered.filter(
            lower(col("SYSTEM_NAME")).in_(systems)
        )

    if roles:
        df_reviews_filtered = df_reviews_filtered.filter(lower(col("ROLE")).in_(roles))

    if cost_centers:
        cost_centers_check_null = False
        if NULL_STRING in cost_centers:
            cost_centers_check_null = True
            cost_centers.remove(NULL_STRING)
        if cost_centers_check_null and cost_centers:
            df_reviews_filtered = df_reviews_filtered.filter(
                col("COST_CENTER").is_null()
                | lower(col("COST_CENTER")).in_(cost_centers)
            )
        elif cost_centers_check_null and not cost_centers:
            df_reviews_filtered = df_reviews_filtered.filter(
                col("COST_CENTER").is_null()
            )
        else:
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("COST_CENTER")).in_(cost_centers)
            )

    if states:
        if None in states:
            states.remove(None)
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("STATE")).in_(states) | col("STATE").is_null()
            )
        else:
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("STATE")).in_(states)
            )

    if usernames:
        df_reviews_filtered = df_reviews_filtered.filter(
            lower(col("USERNAME")).in_(usernames)
        )

    if employee_emails:
        df_reviews_filtered = df_reviews_filtered.filter(
            lower(col("EMAIL")).in_(employee_emails)
        )

    if employee_names:
        employee_names_check_null = False
        if NULL_STRING in employee_names:
            employee_names_check_null = True
            employee_names.remove(NULL_STRING)
        if employee_names_check_null and employee_names:
            df_reviews_filtered = df_reviews_filtered.filter(
                col("NAME").is_null() | lower(col("NAME")).in_(employee_names)
            )
        elif employee_names_check_null and not employee_names:
            df_reviews_filtered = df_reviews_filtered.filter(col("NAME").is_null())
        else:
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("NAME")).in_(employee_names)
            )

    if last_action:
        if last_action in REVIEW_ACTIONS:
            last_action = f"{last_action.lower()}%"
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("PREV_QUARTER_ACTION")).like(lit(last_action))
            )
        else:  # Case of New Review
            df_reviews_filtered = df_reviews_filtered.filter(
                col("PREV_QUARTER_ACTION").is_null()
            )

    df_reviews_selected = (
        df_reviews_filtered.select(
            [
                col("id").alias("review_data_id"),
            ]
            + [col(c) for c in REVIEWS_FOR_USER_DATA_COLUMNS]
            + [
                when(
                    col("prev_quarter_action").is_null(),
                    cast(lit("new"), StringType()),
                )
                .when(
                    lower(col("prev_quarter_action")).like("approved/keep%"),
                    cast(lit("approved"), StringType()),
                )
                .otherwise(col("prev_quarter_action"))
                .alias("last_action")
            ]
            + [
                col("lookback_ans_choice").alias("Why was this originally Granted?"),
                col("lookback_ans_choice_wn").alias("Why are you revoking this Role?"),
                count("*").over().alias("FULL_COUNT"),
            ]
        )
        .order_by([col("review_data_id")], ascending=True)
        .limit(
            n=page_size,
            offset=offset,
        )
    )
    return df_reviews_selected

A second version of the function, with inline comments:

# @st.cache_data(ttl=7200)
def get_reviews_for_user(
    manager_email: str,
    filter: dict,
    page_size: int = DEFAULT_PAGE_SIZE,
    page_number: int = 0,
) -> Any:
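    """Query the Snowflake data cloud.

    Args:
        manager_email (str): Email of the line manager whose reviews are queried.
        filter (dict): Optional filters keyed by systems, roles, cost_centers,
            states, employee_emails, usernames, employee_names and last_action.
        page_size (int): Number of rows per page.
        page_number (int): Page number used to compute the offset.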

    Returns:
        pd.DataFrame: Results from the cases table returned as a data frame.
    """
    session = db.init_connection("snowflake")
    offset = get_offset(
        page_size,
        page_number,
    )

    df_reviews = session.table(REVIEW_DATA_TABLE)
    df_review_submissions = session.table(REVIEW_SUBMISSIONS_LATEST_VIEW).filter(
        (col("manager_email") == lit(manager_email))
    )

    df_lookback_answers = session.table(LOOKBACK_ANSWERS_TABLE)
    df_reviews_joined = (
        df_reviews.join(
            right=df_review_submissions,
            on="id",
            how="left",
            rsuffix="_submitted",
        )
        .join(
            df_lookback_answers,
            col("lookback_whygrantthen_id") == df_lookback_answers["id"],
            join_type="left",
            rsuffix="_wt",
        )
        .join(
            df_lookback_answers,
            col("lookback_whyrevokenow_id") == df_lookback_answers["id"],
            join_type="left",
            rsuffix="_wn",
        )
    )

    df_reviews_filtered = df_reviews_joined.filter(
        (col("MANAGER_EMAIL") == lit(manager_email))
        & (
            (  # Show those that are null in both REVIEW_DATA and REVIEW_SUBMISSIONS
                col("STATE").is_null() & col("STATE_SUBMITTED").is_null()
            )
            | (  # Or also show those that have state:
                # "returned" in REVIEW_DATA & "submitted" in REVIEW_SUBMISSIONS
                # & a later entry in REVIEW_DATA than in REVIEW_SUBMISSIONS
                (col("STATE") == lit("returned"))
                & (col("STATE_SUBMITTED") == lit("submitted"))
                & (col("UPDATED_AT") > col("updated_at_submitted"))
            )
            | (  # Or also show those that have state:
                # "returned" in REVIEW_DATA & NULL in UAR_REVIEW_SUBMISSIONS.
                # This condition should never happen, but we handle it
                # to be extra cautious.
                (col("state") == lit("returned"))
                & (col("state_submitted").is_null())
                & (col("updated_at_submitted").is_null())
            )
        )
    )

    systems = list(map(lambda s: s.lower(), filter.get("systems", [])))
    roles = list(map(lambda s: s.lower(), filter.get("roles", [])))
    cost_centers = list(map(lambda s: s.lower(), filter.get("cost_centers", [])))
    states = list(map(lambda s: s.lower() if s else None, filter.get("states", [])))
    employee_emails = list(map(lambda s: s.lower(), filter.get("employee_emails", [])))
    usernames = list(map(lambda s: s.lower(), filter.get("usernames", [])))
    employee_names = list(map(lambda s: s.lower(), filter.get("employee_names", [])))
    last_action = filter.get("last_action", None)

    if systems:
        df_reviews_filtered = df_reviews_filtered.filter(
            lower(col("SYSTEM_NAME")).in_(systems)
        )

    if roles:
        df_reviews_filtered = df_reviews_filtered.filter(lower(col("ROLE")).in_(roles))

    if cost_centers:
        cost_centers_check_null = False
        if NULL_STRING in cost_centers:
            cost_centers_check_null = True
            cost_centers.remove(NULL_STRING)
        if cost_centers_check_null and cost_centers:
            df_reviews_filtered = df_reviews_filtered.filter(
                col("COST_CENTER").is_null()
                | lower(col("COST_CENTER")).in_(cost_centers)
            )
        elif cost_centers_check_null and not cost_centers:
            df_reviews_filtered = df_reviews_filtered.filter(
                col("COST_CENTER").is_null()
            )
        else:
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("COST_CENTER")).in_(cost_centers)
            )

    if states:
        if None in states:
            states.remove(None)
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("STATE")).in_(states) | col("STATE").is_null()
            )
        else:
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("STATE")).in_(states)
            )

    if usernames:
        df_reviews_filtered = df_reviews_filtered.filter(
            lower(col("USERNAME")).in_(usernames)
        )

    if employee_emails:
        df_reviews_filtered = df_reviews_filtered.filter(
            lower(col("EMAIL")).in_(employee_emails)
        )

    if employee_names:
        employee_names_check_null = False
        if NULL_STRING in employee_names:
            employee_names_check_null = True
            employee_names.remove(NULL_STRING)
        if employee_names_check_null and employee_names:
            df_reviews_filtered = df_reviews_filtered.filter(
                col("NAME").is_null() | lower(col("NAME")).in_(employee_names)
            )
        elif employee_names_check_null and not employee_names:
            df_reviews_filtered = df_reviews_filtered.filter(col("NAME").is_null())
        else:
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("NAME")).in_(employee_names)
            )

    if last_action:
        if last_action in REVIEW_ACTIONS:
            last_action = f"{last_action.lower()}%"
            df_reviews_filtered = df_reviews_filtered.filter(
                lower(col("PREV_QUARTER_ACTION")).like(lit(last_action))
            )
        else:  # Case of New Review
            df_reviews_filtered = df_reviews_filtered.filter(
                col("PREV_QUARTER_ACTION").is_null()
            )

    df_reviews_selected = (
        df_reviews_filtered.select(
            [
                col("id").alias("review_data_id"),
            ]
            + [col(c) for c in REVIEWS_FOR_USER_DATA_COLUMNS]
            + [
                when(
                    col("prev_quarter_action").is_null(), cast(lit("new"), StringType())
                )
                .when(
                    lower(col("prev_quarter_action")) == "approved/keep",
                    cast(lit("approved"), StringType()),
                )
                .otherwise(col("prev_quarter_action"))
                .alias("last_action")
            ]
            + [
                col("lookback_ans_choice").alias("Why was this originally Granted?"),
                col("lookback_ans_choice_wn").alias("Why are you revoking this Role?"),
                count("*").over().alias("FULL_COUNT"),
            ]
        )
        .order_by([col("review_data_id")], ascending=True)
        .limit(
            n=page_size,
            offset=offset,
        )
    )
    return df_reviews_selected
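The base filter in both versions keeps a review row for the manager when one of three state combinations holds. As a plain-Python reference (for example, to work out by hand which sample rows a test should expect), the predicate might be written as below. Assumptions: `is_pending_review` is a hypothetical name, `None` stands in for SQL NULL, the `MANAGER_EMAIL` check is left out, and because a SQL comparison involving NULL is never true, the timestamp comparison only succeeds when both timestamps are present.

```python
from datetime import datetime
from typing import Optional


def is_pending_review(
    state: Optional[str],
    state_submitted: Optional[str],
    updated_at: Optional[datetime],
    updated_at_submitted: Optional[datetime],
) -> bool:
    # 1. No state in either REVIEW_DATA or the submissions view.
    if state is None and state_submitted is None:
        return True
    # 2. "returned" in REVIEW_DATA, "submitted" in the view, and REVIEW_DATA is newer.
    #    UPDATED_AT > updated_at_submitted is NULL (not true) if either side is NULL.
    if (
        state == "returned"
        and state_submitted == "submitted"
        and updated_at is not None
        and updated_at_submitted is not None
        and updated_at > updated_at_submitted
    ):
        return True
    # 3. "returned" in REVIEW_DATA with no matching submission (defensive case).
    return state == "returned" and state_submitted is None and updated_at_submitted is None


earlier, later = datetime(2023, 5, 1), datetime(2023, 5, 17)
print(is_pending_review("returned", "submitted", later, earlier))
```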

Error:

E               pandas.errors.IndexingError: Unalignable boolean Series provided as indexer (index of the boolean Series and of the indexed object do not match).

Full Trace:

uar/manager_app_utils/manager_app_models/pending_reviews.py:562: in get_reviews_for_user
    df_reviews_filtered.collect(),
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/_internal/telemetry.py:144: in wrap
    result = func(*args, **kwargs)
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/dataframe.py:597: in collect
    return self._internal_collect_with_tag_no_telemetry(
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/dataframe.py:645: in _internal_collect_with_tag_no_telemetry
    return self._session._conn.execute(
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/mock/_connection.py:567: in execute
    res = execute_mock_plan(plan)
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/mock/_plan.py:577: in execute_mock_plan
    from_df = execute_mock_plan(from_, expr_to_alias)
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/mock/_plan.py:606: in execute_mock_plan
    column_series = calculate_expression(
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/mock/_plan.py:1463: in calculate_expression
    return calculate_expression(exp.child, input_data, analyzer, expr_to_alias)
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/mock/_plan.py:1737: in calculate_expression
    true_index = remaining[condition].index
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/snowflake/snowpark/mock/_snowflake_data_type.py:248: in __getitem__
    result = super().__getitem__(item)
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/pandas/core/frame.py:4093: in __getitem__
    return self._getitem_bool_array(key)
/opt/homebrew/Caskroom/miniconda/base/envs/snowpark/lib/python3.10/site-packages/pandas/core/frame.py:4149: in _getitem_bool_array
    key = check_bool_indexer(self.index, key)
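The last frames of the trace show pandas rejecting a boolean mask inside the mock's CASE WHEN evaluation (`remaining[condition]`). pandas raises this `IndexingError` when the mask is a boolean Series whose index does not cover the index of the frame being indexed. The sketch below reproduces the same exception in plain pandas; it illustrates the pandas behavior only and does not claim to show exactly how the Snowpark local-testing mock built its mask. `reproduce_unalignable_mask` is a hypothetical name.

```python
import warnings

import pandas as pd
from pandas.errors import IndexingError


def reproduce_unalignable_mask() -> tuple[list[int], str]:
    # A frame whose index has gaps, as can happen after earlier rows were filtered out.
    remaining = pd.DataFrame(
        {"prev_quarter_action": ["Approved/Keep", "Revoke", "approved/keep-2"]},
        index=[3, 7, 9],
    )

    # Mask computed from the frame itself: its index is [3, 7, 9], so it aligns.
    aligned = remaining["prev_quarter_action"].str.lower().str.startswith("approved/keep")
    kept = remaining[aligned].index.tolist()

    # Same boolean values, but with a fresh 0..n-1 index sharing no labels with the frame.
    misaligned = aligned.reset_index(drop=True)
    try:
        with warnings.catch_warnings():
            # pandas may warn that the key will be reindexed before it raises.
            warnings.simplefilter("ignore", UserWarning)
            _ = remaining[misaligned]
        error_name = "no error"
    except IndexingError as exc:
        error_name = type(exc).__name__
    return kept, error_name


print(reproduce_unalignable_mask())  # ([3, 9], 'IndexingError')
```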
  4. What did you expect to see?

We expected filtered results for that query.

@sfc-gh-pkommini sfc-gh-pkommini added bug Something isn't working needs triage Initial RCA is required local testing Local Testing issues/PRs labels Apr 26, 2024
@github-actions github-actions bot changed the title Error on `.filter for specific filters SNOW-1348700: Error on `.filter for specific filters Apr 26, 2024
@sfc-gh-pkommini sfc-gh-pkommini changed the title SNOW-1348700: Error on `.filter for specific filters SNOW-1348700: Error on .filter for specific filters Apr 26, 2024
@sfc-gh-stan sfc-gh-stan self-assigned this Apr 26, 2024
@sfc-gh-pkommini (Author)

Hi @sfc-gh-stan, I commented out parts of this query one at a time and narrowed the problem down to the exact piece of code that causes it.

The select statement is the issue:

    df_reviews_selected = (
        df_reviews_filtered.select(
            [
                col("id").alias("review_data_id"),
            ]
            + [col(c) for c in REVIEWS_FOR_USER_DATA_COLUMNS]
            + [
                when(
                    col("prev_quarter_action").is_null(),
                    cast(lit("new"), StringType()),
                )
                .when(
                    lower(col("prev_quarter_action")).like("approved/keep%"),
                    cast(lit("approved"), StringType()),
                )
                .otherwise(col("prev_quarter_action"))
                .alias("last_action")
            ]
            + [
                col("lookback_ans_choice").alias("Why was this originally Granted?"),
                col("lookback_ans_choice_wn").alias("Why are you revoking this Role?"),
                count("*").over().alias("FULL_COUNT"),
            ]
        )
        .order_by([col("review_data_id")], ascending=True)
        .limit(
            n=page_size,
            offset=offset,
        )
    )

Specifically, it is this column: when it is disabled or simplified, the test for the whole query passes with the given data, but when it is left as shown above, the test errors out:

            + [
                when(
                    col("prev_quarter_action").is_null(),
                    cast(lit("new"), StringType()),
                )
                .when(
                    lower(col("prev_quarter_action")).like("approved/keep%"),
                    cast(lit("approved"), StringType()),
                )
                .otherwise(col("prev_quarter_action"))
                .alias("last_action")
            ]

When the above piece of code is replaced with the following code within the select statement, everything works and the test passes:

            + [
                col("prev_quarter_action").alias("last_action")
            ]

So I think the chained when calls are causing the issue. Could you test a simple when().when().otherwise().alias() expression and see whether you can reproduce the error?
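For a minimal repro, it helps to pin down the expected output of the chained expression first. In Snowpark, `when(...).when(...).otherwise(...)` becomes a SQL CASE WHEN, where the first matching branch wins, and `LIKE 'approved/keep%'` on the lower-cased value is a prefix match. A plain-Python reference for the `last_action` column could serve as a test oracle; `expected_last_action` is a hypothetical name and `None` stands in for SQL NULL.

```python
from typing import Optional


def expected_last_action(prev_quarter_action: Optional[str]) -> str:
    # Branch 1: when(col("prev_quarter_action").is_null(), "new")
    if prev_quarter_action is None:
        return "new"
    # Branch 2: lower(...).like("approved/keep%"), a prefix match after lower-casing
    if prev_quarter_action.lower().startswith("approved/keep"):
        return "approved"
    # otherwise(col("prev_quarter_action")): pass the original value through unchanged
    return prev_quarter_action


for value in [None, "Approved/Keep", "Revoke"]:
    print(value, "->", expected_last_action(value))
```

Note that the second version of the function uses == "approved/keep" instead of like, which only matches the exact string, not values that merely start with it.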

@sfc-gh-jrose (Contributor)

A fix for this was released in v1.19.0
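Because the fix shipped in v1.19.0, a test suite that relies on chained when under local testing could check the installed Snowpark version first. A minimal standard-library sketch follows; `has_case_when_fix` and `installed_has_case_when_fix` are hypothetical helpers, and any pre-release or local suffix on the version string is ignored.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Release that, per the maintainer's comment, contains the fix.
FIXED_IN = (1, 19, 0)


def parse_release(v: str) -> tuple[int, int, int]:
    """Return (major, minor, patch) from a version string, ignoring any suffix."""
    m = re.match(r"(\d+)(?:\.(\d+))?(?:\.(\d+))?", v)
    if m is None:
        raise ValueError(f"unrecognized version string: {v!r}")
    major, minor, patch = (int(g) if g is not None else 0 for g in m.groups())
    return (major, minor, patch)


def has_case_when_fix(v: str) -> bool:
    return parse_release(v) >= FIXED_IN


def installed_has_case_when_fix() -> bool:
    try:
        return has_case_when_fix(version("snowflake-snowpark-python"))
    except PackageNotFoundError:
        return False  # Snowpark is not installed in this environment


print(has_case_when_fix("1.15.0"), has_case_when_fix("1.19.0"))
```

In a pytest suite, this could feed pytest.mark.skipif(not installed_has_case_when_fix(), reason="..."); upgrading to 1.19.0 or later is what actually resolves the error.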
