
BigQuery Pushdown filtering on Spark 3.4.2 #1207

Open
sid-habu opened this issue Apr 3, 2024 · 11 comments
Labels
enhancement New feature or request

Comments


sid-habu commented Apr 3, 2024

I have a BigQuery table foo with a DATE column bar_date. I am trying to query this table in Spark 3.4.2 using the spark-bigquery-with-dependencies:0.30.0 connector.

I am unable to get pushdown filtering to work: the physical plan shows PushedFilters: [], and the connector pulls in all the data from BQ before doing the filtering in Spark.

Below is my code. I even tried enabling BigQueryConnectorUtils.enablePushdownSession(spark) but found that it isn't supported yet for Spark 3.4+

// Enabling pushdown session for BigQuery in Spark 3.4.2 throws an error
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1000

//BigQueryConnectorUtils.enablePushdownSession(spark)

spark.sqlContext.read
  ......
  .option("table", "foo")
  .option("viewsEnabled", "true")
  .load()
  .filter(col("bar_date") >= "2024-01-01" && col("bar_date") <= "2024-03-31")

Physical plan after stripping the table name and requiredColumns. The filter list is empty in the plan as-is:

DirectBigQueryRelation: |Querying table ......, parameters sent from Spark:|requiredColumns=[.......],|filters=[]

I am sure I am missing something trivial as I expect this simple filtering to be pushed down to BigQuery.

isha97 (Member) commented Apr 3, 2024

@sid-habu Currently, pushdown is only supported up to Spark 3.3.

sid-habu (Author) commented Apr 3, 2024

@sid-habu Currently, pushdown is only supported up to Spark 3.3.

@isha97 In that case, can you please confirm that, as a workaround, passing in a raw query will execute the filtering in BigQuery?

spark.sqlContext.read
  ......
  .option("viewsEnabled", "true")
  .option("query", "select x,y from foo where bar_date >= '2024-01-01' && bar_date <= '2024-03-31'")
  .load()

tom-s-powell (Contributor) commented

@isha97 Is it true that filters aren't supported in Spark 3.4+? I realise BigQueryConnectorUtils.enablePushdownSession doesn't work, but with Spark 3.4.1 and a Spark filter applied, I see that DirectBigQueryRelation receives a compiled filter, which is passed as a rowRestriction in ReadSession.TableReadOptions in ReadSessionCreator.

However, when I look in the BigQuery console and at the project history, I see a SELECT * FROM <table-name> query being executed, not one that has the filters. I don't know whether this is standard behaviour of the BigQuery Storage Read API, though.

This is when using spark-bigquery-with-dependencies_2.12.

davidrabinowitz (Member) commented

@tom-s-powell Filter pushdown is enabled by default in all the connector flavors and cannot be disabled. You can track the usage of filters in the application log - both the filters the connector gets from Spark and the compiled filter are logged at the INFO level. Notice that you cannot track the connector usage by looking at the jobs in the project history, as there are no jobs - the data is read directly from the table using the BigQuery Storage Read API via the mentioned ReadSession. Query jobs are created on two occasions only - when reading from views, and when reading from queries (spark.read.format("bigquery").load("SELECT ...")).
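
For illustration, a minimal sketch (not from the thread) of surfacing those INFO-level messages, assuming an existing SparkSession named spark; the table name and filter are placeholders taken from this issue:

import org.apache.spark.sql.functions.col

// Raise the log level so the connector's INFO-level messages (the filters it
// receives from Spark and the compiled row restriction) become visible.
spark.sparkContext.setLogLevel("INFO")

val df = spark.read
  .format("bigquery")
  .option("table", "foo") // placeholder table from this issue
  .load()
  .filter(col("bar_date") >= "2024-01-01" && col("bar_date") <= "2024-03-31")

// Trigger an actual read; the pushed filters should then show up in the application log.
df.count()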

Regarding BigQueryConnectorUtils.enablePushdownSession() - this belongs to the unreleased query pushdown feature. A more apt name for the method would be enableQueryPushdownSession(), and it will be changed soon.

tom-s-powell (Contributor) commented Apr 30, 2024

I see, thank you. So in the case of load("SELECT ...") or .option("query", "SELECT ...").load(), how does filter pushdown work? Are filters not pushed down when creating the temporary tables, but pushed down when consuming from them?

One use-case we have for using query is time travel, something like SELECT * FROM <table-name> FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(%s). In this instance, is there no pushdown? Is there filter/column pushdown on the result?

EDIT: One other question would be around limits, and whether that is a pushdown capability.

davidrabinowitz (Member) commented

When loading from query (load("SELECT ...") or .option("query", "SELECT ...").load()), the connector creates a BigQuery query job and executes the query as given. Further reads from the DataFrame are served from the temporary table that materializes the data, and additional filters are pushed into the ReadSession if needed.

In the SELECT * FROM <table-name> FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(%s) case there are no filters or column projection, therefore the created results table is an exact snapshot of the table at the given timestamp. Any reads from the dataframe should push both filters and column projection into the ReadSession aimed at the temporary table.
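
To make that concrete, a rough sketch, assuming an existing SparkSession named spark and placeholder project/dataset/table names and timestamp: the query option triggers the BigQuery query job and temporary table described above, and the subsequent select/filter are what should be pushed into the ReadSession against that table.

import org.apache.spark.sql.functions.col

// The time-travel query runs as a BigQuery query job; its result is materialized
// into a temporary table.
val snapshot = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("query",
    "SELECT * FROM `my_project.my_dataset.foo` FOR SYSTEM_TIME AS OF TIMESTAMP_MILLIS(1711929600000)")
  .load()

// Column projection and row restriction on the result; these should be pushed
// into the ReadSession that reads the temporary table.
val slice = snapshot
  .select("x", "y", "bar_date")
  .filter(col("bar_date") >= "2024-01-01")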

If the question is whether we can push down df.head(20) as the equivalent of SELECT ... LIMIT 20 then the answer is not yet, as the BigQuery Storage Read API does not provide this capability at the moment. A workaround is to run load("SELECT ... LIMIT ...") but this solution is less flexible as the retrieved rows are fixed.
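
And a sketch of the LIMIT workaround mentioned above, with the same placeholder names; the row cap is baked into the SQL text, which is why it stays fixed for the lifetime of the DataFrame:

// LIMIT cannot currently be pushed through the Storage Read API, so it goes into
// the query itself and runs as a BigQuery query job.
val top20 = spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("query", "SELECT x, y FROM `my_project.my_dataset.foo` LIMIT 20")
  .load()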

tom-s-powell (Contributor) commented

Thanks for the explanation. And there's no way of using the BigQuery Storage Read API to query time travel without creating a temporary table? I'm assuming there's cost associated with that.

The other case we have is for partitioned tables. We have had reports that partitions are not pruned, but I assumed that is because we are using query and thus filters aren't pushed to the job creating the temporary table. Is there a solution here, or would the filter need to be included in the query?

parthshyara commented

this belongs to the unreleased feature of query pushdown

@davidrabinowitz @isha97 Is there a timeline for when this feature will be released for Spark 3.5? We actually have a strong use case for this requirement, and without it we're incurring huge unnecessary costs. I've already seen a couple of pending/closed issues related to this feature request.

parthshyara commented

An additional question related to the previous comment: is pushdown only supported on tables and not views, i.e. if viewsEnabled is set to true, will the pushdown not be applied?
(For the purpose of this question, assume spark-3.3-bigquery is used which has pushdown implemented)

parthshyara commented

@davidrabinowitz Following up on the previous questions.

anilpuliyeril commented

Hi @davidrabinowitz,

Are there any plans to add support for Spark 3.4 and 3.5? If so, could you please share the timeline or any progress updates on this?

Thanks!
