Issue with BigQuery counts in Spark after writing to table #1290

Closed
rchynoweth opened this issue Sep 5, 2024 · 7 comments

rchynoweth commented Sep 5, 2024

I am having an issue getting accurate counts when reading/writing to BigQuery from Databricks after installing the connector.

Connector Version: spark-3.5-bigquery-0.39.1.jar
Apache Spark 3.5.0
Scala 2.12
Databricks 14.3LTS

Code to replicate:

from pyspark.sql import Row

# Create a list of Rows
data = [
    Row(col1="value1_1", col2="value1_2", col3="value1_3"),
    Row(col1="value2_1", col2="value2_2", col3="value2_3"),
    Row(col1="value3_1", col2="value3_2", col3="value3_3"),
    Row(col1="value4_1", col2="value4_2", col3="value4_3"),
    Row(col1="value5_1", col2="value5_2", col3="value5_3"),
    Row(col1="value6_1", col2="value6_2", col3="value6_3"),
    Row(col1="value7_1", col2="value7_2", col3="value7_3"),
    Row(col1="value8_1", col2="value8_2", col3="value8_3"),
    Row(col1="value9_1", col2="value9_2", col3="value9_3"),
    Row(col1="value10_1", col2="value10_2", col3="value10_3")
]

# Create DataFrame
df = spark.createDataFrame(data)

# count table
(spark.read
  .format("bigquery")
  .option("table", 'mydataset.test_read_write_table')
  .option("project", 'myproject')
  .load()
  ).count()

### prints 50

# write 10 rows to table
(df.write
  .mode('append')
  .format("bigquery")
  .option("project", 'myproject')
  .option("writeMethod", "direct")
  .save('mydataset.test_read_write_table')
)

# check count again
(spark.read
  .format("bigquery")
  .option("table", 'mydataset.test_read_write_table')
  .option("project", 'myproject')
  .load()
).count()
### prints 50 but should be 60

When I check the BQ table, the new rows are there, but they are not reflected in my DataFrame. If I use the query option instead of the table option and run "select count(1) from mydataset.test_read_write_table", the counts are accurate. This looks like a caching problem. I tried setting the cacheExpirationTimeInMinutes option to 0, but that did not seem to work. However, if I set it to a positive integer, the count does become correct once that time has elapsed.
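For anyone wanting to try the query-based workaround described above, here is a minimal sketch. The helper names (`count_query`, `count_via_query`) and the `materialization_dataset` argument are hypothetical. It assumes the connector's documented requirement that `viewsEnabled` and `materializationDataset` be set when the `query` option is used, which may differ between connector versions.

```python
def count_query(table: str) -> str:
    """Build the SQL count that is passed through the `query` option."""
    return f"SELECT COUNT(1) AS row_count FROM {table}"


def count_via_query(spark, project: str, table: str, materialization_dataset: str) -> int:
    """Count rows by running SQL in BigQuery instead of loading the table.

    `spark` is an existing SparkSession with the BigQuery connector installed.
    `materialization_dataset` is an assumed dataset where the connector is
    allowed to create the temporary result table.
    """
    df = (
        spark.read.format("bigquery")
        .option("project", project)
        # Assumption: query reads need views enabled plus a materialization dataset.
        .option("viewsEnabled", "true")
        .option("materializationDataset", materialization_dataset)
        .option("query", count_query(table))
        .load()
    )
    # The result is a single row with a single aliased column.
    return df.first()["row_count"]
```

A call would look like `count_via_query(spark, "myproject", "mydataset.test_read_write_table", "mydataset")`, where the last argument is a placeholder for a dataset you can write to.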

vishalkarve15 self-assigned this Sep 10, 2024

agrawal-siddharth (Collaborator) commented:

What happens if you omit the first count check? I'm trying to narrow down if there is an issue with the spark.read being called twice.

rchynoweth (Author) commented:

Interesting. The last count is still incorrect even if I omit the first count. To summarize, I did the following:

  1. Ran a count in the BQ Console to determine the existing row count (140).
  2. Created a DataFrame with 10 rows.
  3. Wrote the 10 rows to BQ via direct write.
  4. Ran spark.read with a count. It returned the stale row count (140).
  5. Ran a count in the BQ Console, which returned the correct row count (150).

However, after a couple of minutes, the DataFrame count also returned the correct value.
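To put a number on "a couple of minutes", the steps above could be followed by a small polling loop. This is only a sketch: `seconds_until_count` is a hypothetical helper, and the timeout and interval defaults are arbitrary. The clock and sleep functions are injectable so the loop can be exercised without waiting.

```python
import time
from typing import Callable, Optional


def seconds_until_count(
    read_count: Callable[[], int],
    expected: int,
    timeout_s: float = 600.0,
    interval_s: float = 15.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[float]:
    """Poll `read_count` until it returns `expected`.

    Returns the elapsed seconds at the first matching read, or None if the
    timeout is reached before the count matches.
    """
    start = clock()
    while True:
        if read_count() == expected:
            return clock() - start
        if clock() - start >= timeout_s:
            return None
        sleep(interval_s)
```

With the connector, `read_count` could be something like `lambda: spark.read.format("bigquery").option("table", "mydataset.test_read_write_table").option("project", "myproject").load().count()`, called with `expected=150` right after the write.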

vishalkarve15 added a commit to vishalkarve15/spark-bigquery-connector that referenced this issue Sep 16, 2024
vishalkarve15 added a commit that referenced this issue Oct 24, 2024
* Issue #1290: Stopped using metadata for optimized count path

* add changes, fix test

* handle requirePartitionFilter

* increase timeout for presubmit

MichalBogoryja commented:

Hi @rchynoweth,
I've encountered a similar issue. It looks like, since spark-BQ connector 0.34.0, the enableReadSessionCaching property defaults to "true" (and the corresponding readSessionCacheDurationMins defaults to 5 minutes). I'm not 100% sure, but in my case, if you read the same table again within 5 minutes (even if it changed in the meantime), Spark reads the data from the cache rather than from BQ. I tried to clear the cache, with no success. Setting this property to "false" when starting your Spark session should give you the correct count.
That said, an option to refresh the table should be added.
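A minimal sketch of setting this at session level rather than per read follows. It relies on the connector's documented convention that any option can be supplied as Spark configuration under the `spark.datasource.bigquery.` prefix. The helper names (`as_spark_conf`, `apply_to_builder`) are hypothetical, and whether disabling the cache actually resolves the stale count is this comment's report, not something verified here.

```python
BIGQUERY_CONF_PREFIX = "spark.datasource.bigquery."


def as_spark_conf(options: dict) -> dict:
    """Prefix connector options so they can be passed as Spark configuration."""
    conf = {}
    for key, value in options.items():
        # Spark configuration values are strings; render booleans as true/false.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        conf[BIGQUERY_CONF_PREFIX + key] = text
    return conf


def apply_to_builder(builder, options: dict):
    """Apply the prefixed options to a SparkSession builder and return it."""
    for key, value in as_spark_conf(options).items():
        builder = builder.config(key, value)
    return builder
```

Usage would be along the lines of `apply_to_builder(SparkSession.builder, {"enableReadSessionCaching": False}).getOrCreate()`. On a managed cluster where the session already exists, the same prefixed key would presumably go into the cluster's Spark configuration instead.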

rchynoweth (Author) commented:

@MichalBogoryja - do you have to set it in the Spark settings rather than in the DataFrame read options? It doesn't work for me when I set it on the DataFrame read.

vishalkarve15 (Contributor) commented Nov 20, 2024

This has been fixed, and the fix will be available in the next release. In the meantime, you can test it using the nightly build, for example: gs://spark-lib-nightly-snapshots/spark-3.5-bigquery-nightly-snapshot.jar

vishalkarve15 (Contributor) commented:

Fixed in 0.41.1
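To check whether an installed jar already includes the fix, a small version comparison can help. This sketch assumes the jar file name ends in `-<major>.<minor>.<patch>.jar`, as in the `spark-3.5-bigquery-0.39.1.jar` from the original report. The helper names are hypothetical, and snapshot builds without a numeric version are reported as unknown.

```python
import re
from typing import Optional, Tuple

FIXED_IN = (0, 41, 1)  # release named in this thread as containing the fix


def connector_version(jar_name: str) -> Optional[Tuple[int, int, int]]:
    """Extract the trailing x.y.z version from a connector jar file name."""
    match = re.search(r"-(\d+)\.(\d+)\.(\d+)\.jar$", jar_name)
    if match is None:
        return None
    major, minor, patch = match.groups()
    return (int(major), int(minor), int(patch))


def has_count_fix(jar_name: str) -> Optional[bool]:
    """True or False when the version is known, None when it cannot be parsed."""
    version = connector_version(jar_name)
    if version is None:
        return None
    return version >= FIXED_IN
```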

rchynoweth (Author) commented:

Awesome. Thanks @vishalkarve15 !
