Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-45866][SQL] Fix for Reuse of Exchange in AQE not happening when DPP filters are pushed down to the underlying Scan (like iceberg) #49152

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

ahshahid
Copy link

@ahshahid ahshahid commented Dec 11, 2024

What changes were proposed in this pull request?

The main change in this PR is to augment the trait of SupportsRuntimeV2Filtering by adding two new methods
`default boolean equalToIgnoreRuntimeFilters(Scan other) {
return this.equals(other);
}

default int hashCodeIgnoreRuntimeFilters() {
return this.hashCode();
}`

which the underlying V2 Scan should implement.
The BatchScanExec also gets modified accordingly where it invokes this two methods to check the equality of the Scan.

Pls note that this PR includes code of 2 other PRs too

  1. SPARK-45658
    This PR though not required per se, but is good to have for correctness ( & my other PR for broadcast var pushdown relies on this fix)
  2. SPARK-45926
    This PR is necessary to reproduce the issue and hence its code is needed for this PR to show the issue.

Also for this test to pass the code of DataSourceV2Relation.computeStats should disable throwing assertion error in testing, as that is a separate bug which gets hit, when the bug test for this PR is run.. So for now to allow the bug test to pass, I have modified the Utils.isTesting to avoid hitting the IllegalStateException

Why are the changes needed?

This change is need IMO to fix the issue of re-use of exchange not happening when DPP filters are pushed to the scan level.
The issue is this:
In certain types of queries for eg TPCDS Query 14b, the reuse of exchange does not happen in AQE , resulting in perf degradation.
The spark TPCDS tests are unable to catch the problem, because the InMemoryScan used for testing do not implement the equals & hashCode correctly , in the sense , that they do take into account the pushed down run time filters.

In concrete Scan implementations, for eg iceberg's SparkBatchQueryScan , the equality check , apart from other things, also involves Runtime Filters pushed ( which is correct).

Below is description of how this issue surfaces.
For a given stage being materialized, just before materialization starts, the run time filters are confined to the BatchScanExec level.
Only when the actual RDD corresponding to the BatchScanExec, is being evaluated, do the runtime filters get pushed to the underlying Scan.

Now if a new stage is created and it checks in the stageCache using its canonicalized plan to see if a stage can be reused, it fails to find the r-usable stage even if the stage exists, because the canonicalized spark plan present in the stage cache, has now the run time filters pushed to the Scan , so the incoming canonicalized spark plan does not match the key as their underlying scans differ . that is incoming spark plan's scan does not have runtime filters , while the canonicalized spark plan present as key in the stage cache has the scan with runtime filters pushed.

The fix as I have worked is to provide, two methods in the SupportsRuntimeV2Filtering interface ,
default boolean equalToIgnoreRuntimeFilters(Scan other)

{ return this.equals(other); }
default int hashCodeIgnoreRuntimeFilters()

{ return this.hashCode(); }
In the BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then instead of batch.equals, it should call scan.equalToIgnoreRuntimeFilters

And the underlying Scan implementations should provide equality which excludes run time filters.

Similarly the hashCode of BatchScanExec, should use scan.hashCodeIgnoreRuntimeFilters instead of ( batch.hashCode).

Does this PR introduce any user-facing change?

No. But the respective DataSourceV2Relations may need to augment their code.

How was this patch tested?

Added bug test for the same.

Was this patch authored or co-authored using generative AI tooling?

No

…ilters are pushed down to the underlying Scan (like iceberg)
@github-actions github-actions bot added the SQL label Dec 11, 2024
@ahshahid
Copy link
Author

To see the bug, you can do any of the following:

  1. Comment out the implementation of equalsIgnoreRuntimeFilter and hashCodeIgnoreRuntimeFilters from InMemoryV2FilterBatchScan.
    or
  2. Take the BatchScanExec from master and update the code and run the test

@ahshahid
Copy link
Author

ahshahid commented Dec 12, 2024

Pls note:
Actual source code changes are relatively few only 3 - 4 files modified. rest is newly added test resources to reproduce the issue. may be possible to write a similar bug test with minimal test resources. Can be done after review.

@ahshahid ahshahid changed the title [SARK-45866][SQL] Fix for Reuse of Exchange in AQE not happening when DPP filters are pushed down to the underlying Scan (like iceberg) [SPARK-45866][SQL] Fix for Reuse of Exchange in AQE not happening when DPP filters are pushed down to the underlying Scan (like iceberg) Feb 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant