Add verbose pipeline parameter to output each processor's execution details #16843
base: main
Branch force-pushed from d5a2c4c to d931750.
…etails Signed-off-by: Junwei Dai <[email protected]>
Signed-off-by: Junwei Dai <[email protected]> Conflicts: CHANGELOG.md
Signed-off-by: Junwei Dai <[email protected]>
Signed-off-by: Junwei Dai <[email protected]>
Signed-off-by: Junwei Dai <[email protected]>
Signed-off-by: Junwei Dai <[email protected]>
2. Use the existing xcontentUtil to read. 3. Move the processor execution key to ProcessorExecutionDetail. Signed-off-by: Junwei Dai <[email protected]>
Signed-off-by: Junwei Dai <[email protected]>
Branch force-pushed from a805cd9 to 021ecec.
Good start. A few questions:
- Search pipelines can be applied in three ways: as the default pipeline, as a temporary pipeline defined in the search request, and through a query parameter. Before applying the verbose pipeline parameter, we should check whether a search pipeline is actually applied to the search request. You mentioned this check in resolvePipeline in the RFC, but I don't see it in the PR.
- Once the check is in place, we need three more tests covering verbose pipeline execution for each of the ways mentioned above.
We are touching a few core search files. @msfroh, can you take a look at it too?
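A minimal Java sketch of the requested check, covering all three ways a pipeline can be supplied. The precedence used here (temporary pipeline in the request body, then the query parameter, then the index default) and the "_none" no-op id are assumptions, and PipelineResolutionSketch, Source and resolve are hypothetical names rather than the actual SearchPipelineService API.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch: pick a search pipeline from one of three sources and
 * reject verbose mode when none was found. Not the OpenSearch implementation.
 */
final class PipelineResolutionSketch {
    // Assumed id of the no-op pipeline used when nothing is configured.
    static final String NOOP_PIPELINE_ID = "_none";

    enum Source { REQUEST_BODY, QUERY_PARAM, INDEX_DEFAULT, NONE }

    record ResolvedPipeline(String id, Source source) {}

    static ResolvedPipeline resolve(
        Optional<String> requestBodyPipeline,
        Optional<String> queryParamPipeline,
        Map<String, String> indexDefaults,
        String index,
        boolean verbosePipeline
    ) {
        ResolvedPipeline resolved;
        if (requestBodyPipeline.isPresent()) {
            // Temporary pipeline defined inside the search request.
            resolved = new ResolvedPipeline(requestBodyPipeline.get(), Source.REQUEST_BODY);
        } else if (queryParamPipeline.isPresent()) {
            // Named pipeline passed as a query parameter.
            resolved = new ResolvedPipeline(queryParamPipeline.get(), Source.QUERY_PARAM);
        } else if (indexDefaults.containsKey(index)) {
            // Default pipeline configured on the target index.
            resolved = new ResolvedPipeline(indexDefaults.get(index), Source.INDEX_DEFAULT);
        } else {
            resolved = new ResolvedPipeline(NOOP_PIPELINE_ID, Source.NONE);
        }
        // The check applies after resolution, so it covers every source equally.
        if (verbosePipeline && NOOP_PIPELINE_ID.equals(resolved.id())) {
            throw new IllegalArgumentException("The 'verbose pipeline' option requires a search pipeline to be defined.");
        }
        return resolved;
    }
}
```

Each of the three requested tests could then exercise one branch with verbose mode enabled.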
@@ -302,6 +305,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
    if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
        searchPipeline = in.readOptionalString();
    }
    if (in.getVersion().onOrAfter(Version.CURRENT)) {
This is the general BWC dance we do for such cases, so we don't need an issue to track it. @junweid62 can take care of it by following these steps:
- Do this on main with onOrAfter(Version.V_3_0_0). Get it merged.
- Open a manual backport to 2.x that uses onOrAfter(Version.V_2_19_0). Don't merge it right away.
- Before merging the backport to 2.x, open another PR on main to change the check to onOrAfter(Version.V_2_19_0).
- Merge the backport PR.
- Merge the main version update PR.
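The version gating in these steps can be sketched in plain Java. VersionSketch and its numeric ids are hypothetical stand-ins for org.opensearch.Version, and a boolean written with DataOutputStream stands in for the StreamOutput/StreamInput calls; the point is only that the writer and the reader must apply the same gate.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical sketch of version-gated wire serialization for a new field.
 * The ids below are illustrative, not OpenSearch's real version ids.
 */
final class BwcSerializationSketch {
    record VersionSketch(int id) {
        boolean onOrAfter(VersionSketch other) {
            return id >= other.id;
        }
    }

    static final VersionSketch V_2_19_0 = new VersionSketch(2_19_00);
    static final VersionSketch V_3_0_0 = new VersionSketch(3_00_00);

    // Gate used by the first PR on main; lowered to V_2_19_0 later.
    static final VersionSketch VERBOSE_PIPELINE_SINCE = V_3_0_0;

    static byte[] write(VersionSketch wireVersion, boolean verbosePipeline) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Only peers that understand the field receive it.
            if (wireVersion.onOrAfter(VERBOSE_PIPELINE_SINCE)) {
                out.writeBoolean(verbosePipeline);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static boolean read(VersionSketch wireVersion, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Mirrors the writer's gate; older peers default to false.
            return wireVersion.onOrAfter(VERBOSE_PIPELINE_SINCE) && in.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If a 2.19 node wrote the field while main still read it only from 3.0 onward, the stream would desynchronize. That is why main can lower its gate to V_2_19_0 only once the backport writes the field, and why the two PRs are merged back to back.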
server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java (outdated, resolved)
        return;
    }

    if (data instanceof List) {
We could also have a case where data instanceof Map. Also, did we try all the core processors to find such cases?
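One way to cover the Map case alongside List is to normalize the data recursively before it is stored. This is a hypothetical sketch: ExecutionDataNormalizer is not part of the PR, and the toString() fallback for types it does not model is an assumption about what is acceptable in verbose output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: turn processor input/output into plain maps, lists and
 * scalars so every shape (List, Map, scalar, other) is handled explicitly.
 */
final class ExecutionDataNormalizer {
    static Object normalize(Object data) {
        if (data == null || data instanceof String || data instanceof Number || data instanceof Boolean) {
            return data; // already a plain value
        }
        if (data instanceof Map<?, ?> map) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                // Keys are stringified; insertion order is preserved.
                out.put(String.valueOf(entry.getKey()), normalize(entry.getValue()));
            }
            return out;
        }
        if (data instanceof List<?> list) {
            List<Object> out = new ArrayList<>(list.size());
            for (Object item : list) {
                out.add(normalize(item));
            }
            return out;
        }
        // Fallback for any type the sketch does not model.
        return data.toString();
    }
}
```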
server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java (resolved)
server/src/main/java/org/opensearch/search/pipeline/ProcessorExecutionDetail.java (outdated, resolved)
 */
@SuppressWarnings("unchecked")
public List<ProcessorExecutionDetail> getProcessorExecutionDetails() {
    Object details = attributes.get(PROCESSOR_EXECUTION_DETAILS_KEY);
Why keep it as an Object here rather than directly as a List<ProcessorExecutionDetail>, since that's how we added it in the first place? That would remove the check below.
The reason I originally used an Object check was to account for potential flexibility in how values might be stored in attributes. Since attributes is a generic map, I wanted to ensure that even if PROCESSOR_EXECUTION_DETAILS_KEY was accidentally associated with a non-List value, the code would handle it gracefully without throwing a ClassCastException. The instanceof check provided that extra layer of safety.
However, after reviewing the usage of PROCESSOR_EXECUTION_DETAILS_KEY, I confirmed that it is always associated with a List, so I simplified the code as suggested.
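A sketch of the simplified, typed approach: if the context is the only writer of the key, the stored value can be created as a list up front and cast once. ProcessingContextSketch, the key string and the use of String in place of ProcessorExecutionDetail are all simplifications for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a context that keeps execution details under a fixed
 * key in a generic attribute map. String stands in for ProcessorExecutionDetail.
 */
final class ProcessingContextSketch {
    static final String PROCESSOR_EXECUTION_DETAILS_KEY = "processorExecutionDetails";

    private final Map<String, Object> attributes = new HashMap<>();

    /** The only writer of the key, so the stored value is always a List<String>. */
    void addDetail(String detail) {
        getDetails().add(detail);
    }

    @SuppressWarnings("unchecked")
    List<String> getDetails() {
        // computeIfAbsent guarantees the key maps to a list created here,
        // so the cast is safe as long as nothing else writes to this key.
        return (List<String>) attributes.computeIfAbsent(PROCESSOR_EXECUTION_DETAILS_KEY, k -> new ArrayList<String>());
    }
}
```

A dedicated typed field on the context would remove the unchecked cast entirely, at the cost of not reusing the generic attribute map.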
server/src/test/java/org/opensearch/action/search/SearchResponseTests.java (outdated, resolved)
Signed-off-by: Junwei Dai <[email protected]>
Let's make sure to include the status, plus any associated error message, at per-processor granularity as well.
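A hypothetical shape for a per-processor record that carries a status and an optional error message. ExecutionDetailSketch, DetailStatus and the factory methods are illustrative names, not the PR's ProcessorExecutionDetail.

```java
import java.util.Objects;

/** Hypothetical outcome of a single processor run. */
enum DetailStatus { SUCCESS, FAIL }

/**
 * Hypothetical per-processor execution record; errorMessage is null on success.
 */
record ExecutionDetailSketch(String processorName, long durationMillis, DetailStatus status, String errorMessage) {
    ExecutionDetailSketch {
        Objects.requireNonNull(processorName, "processorName");
        Objects.requireNonNull(status, "status");
    }

    static ExecutionDetailSketch success(String processorName, long durationMillis) {
        return new ExecutionDetailSketch(processorName, durationMillis, DetailStatus.SUCCESS, null);
    }

    static ExecutionDetailSketch failure(String processorName, long durationMillis, Exception cause) {
        // Keep only the message so the detail stays easy to serialize.
        return new ExecutionDetailSketch(processorName, durationMillis, DetailStatus.FAIL, cause.getMessage());
    }
}
```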
Signed-off-by: Junwei Dai <[email protected]>
2. Refactor error message. Signed-off-by: Junwei Dai <[email protected]>
Signed-off-by: Junwei Dai <[email protected]>
❌ Gradle check result for 7276e6c: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
server/src/main/java/org/opensearch/search/pipeline/Pipeline.java (outdated, resolved)
server/src/main/java/org/opensearch/search/pipeline/PipelineProcessingContext.java (outdated, resolved)
server/src/main/java/org/opensearch/search/pipeline/Pipeline.java (outdated, resolved)
2. Removed redundant logic for a cleaner and simpler implementation. Signed-off-by: Junwei Dai <[email protected]>
Hi @msfroh, thank you for your detailed feedback. I have addressed all your comments and made the suggested changes. Please take a look and let me know if any further adjustments are needed.
Signed-off-by: Junwei Dai <[email protected]>
❌ Gradle check result for 2c5759d: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: Junwei Dai <[email protected]>
❌ Gradle check result for 2c16992: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: Junwei Dai <[email protected]>
Almost there! Good work @junweid62
import org.opensearch.core.action.ActionListener;

/**
 * Wrapper for SearchRequestProcessor to track execution details.
Suggested change:
-  * Wrapper for SearchRequestProcessor to track execution details.
+  * Wrapper for SearchRequestProcessor to track execution details.
+  *
+  * @opensearch.internal
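The tracking wrapper is a decorator: it delegates to the wrapped processor and records the processor name and elapsed time on each call. In this sketch a String stands in for SearchRequest and a UnaryOperator for SearchRequestProcessor; TrackingWrapperSketch and Detail are hypothetical, and a real wrapper would also have to handle the asynchronous ActionListener path.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

/**
 * Hypothetical decorator that times a wrapped request-transforming step and
 * appends a detail entry for every call. Not the PR's actual wrapper class.
 */
final class TrackingWrapperSketch {
    record Detail(String processorName, long durationMillis) {}

    private final String name;
    private final UnaryOperator<String> wrapped;
    private final List<Detail> details;

    TrackingWrapperSketch(String name, UnaryOperator<String> wrapped, List<Detail> details) {
        this.name = name;
        this.wrapped = wrapped;
        this.details = details;
    }

    String process(String request) {
        long start = System.nanoTime();
        String result = wrapped.apply(request); // delegate to the real processor
        long took = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        details.add(new Detail(name, took));
        return result;
    }
}
```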
expectThrows(
    IllegalArgumentException.class,
    () -> searchPipelineService.resolvePipeline(searchRequest, indexNameExpressionResolver)
);
Add an assert for the message as well, capturing the exception returned by expectThrows as e:
assertTrue(e.getMessage(), e.getMessage().contains("The 'verbose pipelines' option requires a search pipeline to be defined."));
 * @param wrappedProcessor the actual processor to be wrapped
 */
public TrackingSearchResponseProcessorWrapper(SearchResponseProcessor wrappedProcessor) {
    if (wrappedProcessor == null) {
We could add a test for the null case here.
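A self-contained sketch of the requested null-case test. The Wrapper class, its error message and the small expectThrows helper are hypothetical; in the actual test suite the framework's own expectThrows would be used instead of the helper.

```java
import java.util.function.UnaryOperator;

/**
 * Hypothetical wrapper that rejects a null delegate, plus a minimal
 * expectThrows-style helper so the check can be exercised without a framework.
 */
final class NullCheckSketch {
    static final class Wrapper {
        private final UnaryOperator<String> wrapped;

        Wrapper(UnaryOperator<String> wrapped) {
            if (wrapped == null) {
                throw new IllegalArgumentException("Wrapped processor cannot be null.");
            }
            this.wrapped = wrapped;
        }

        String process(String request) {
            return wrapped.apply(request);
        }
    }

    /** Runs the action and returns the thrown exception if it has the expected type. */
    static <T extends Throwable> T expectThrows(Class<T> expected, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t, t);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " to be thrown");
    }
}
```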
@@ -156,7 +160,7 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque
    long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
    afterRequestProcessor(processor, took);
    onRequestProcessorFailed(processor);
-   if (processor.isIgnoreFailure()) {
+   if (processor.isIgnoreFailure() || r.source().verbosePipeline()) {
Why this extra check for verbosePipeline?
return pipeline.transformResponseListener(this, ActionListener.wrap(response -> {
    // Extract processor execution details
    List<ProcessorExecutionDetail> details = requestContext.getProcessorExecutionDetails();
    logger.info("it is going to be executed in [{}]", details);
Leftover?
@@ -426,6 +425,9 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx
            pipeline = pipelineHolder.pipeline;
        }
    }
    if (searchRequest.source() != null && searchRequest.source().verbosePipeline() && pipeline.getId().equals(NOOP_PIPELINE_ID)) {
Suggested change:
- if (searchRequest.source() != null && searchRequest.source().verbosePipeline() && pipeline.getId().equals(NOOP_PIPELINE_ID)) {
+ if (searchRequest.source() != null && searchRequest.source().verbosePipeline() && NOOP_PIPELINE_ID.equals(pipelineId) == false) {
@@ -426,6 +425,9 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest, IndexNameEx
            pipeline = pipelineHolder.pipeline;
        }
    }
    if (searchRequest.source() != null && searchRequest.source().verbosePipeline() && pipeline.getId().equals(NOOP_PIPELINE_ID)) {
        throw new IllegalArgumentException("The 'verbose pipelines' option requires a search pipeline to be defined.");
Suggested change:
- throw new IllegalArgumentException("The 'verbose pipelines' option requires a search pipeline to be defined.");
+ throw new IllegalArgumentException("The 'verbose pipeline' option requires a search pipeline to be defined.");
 * @opensearch.internal
 * @since 2.19.0
 */
@PublicApi(since = "2.19.0")
This should be an internal API, which is what is stated on L293.
Jan 16 revision:
When the verbose pipeline mode is enabled, if a processor encounters an error during execution, the search process will not be interrupted. Instead, the error will be documented in the processor's execution details (e.g., in ProcessorExecutionDetail) and the remaining search process will proceed as normal.
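The revised behavior can be sketched as a processor loop in which a failure is recorded and then either rethrown or skipped; this is also what the processor.isIgnoreFailure() || r.source().verbosePipeline() condition in the transformRequest diff is for. VerbosePipelineSketch, Step and Detail are hypothetical names, and a String stands in for the search request.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/**
 * Hypothetical sketch: in verbose mode a failing processor is recorded and
 * skipped, and the remaining processors still run. Otherwise, unless the
 * processor ignores failures, the exception propagates as before.
 */
final class VerbosePipelineSketch {
    record Step(String name, boolean ignoreFailure, UnaryOperator<String> fn) {}

    record Detail(String name, String status, String error) {}

    static String run(List<Step> steps, String request, boolean verbose, List<Detail> details) {
        String current = request;
        for (Step step : steps) {
            try {
                current = step.fn().apply(current);
                if (verbose) {
                    details.add(new Detail(step.name(), "success", null));
                }
            } catch (RuntimeException e) {
                if (verbose) {
                    // Document the error instead of interrupting the search.
                    details.add(new Detail(step.name(), "fail", e.getMessage()));
                }
                if (!step.ignoreFailure() && !verbose) {
                    throw e; // normal mode: the search fails
                }
                // Otherwise keep the last good value and continue.
            }
        }
        return current;
    }
}
```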
Description
Related RFC: #16705
This PR introduces enhancements to OpenSearch's search pipeline functionality, focusing on improving the traceability and debugging of search request and response transformations. It addresses the increasing complexity of search pipeline processors by implementing verbose mode support, which provides detailed insights into processor execution.
Adds Verbose Mode for Search Pipelines:
- verbose_pipeline parameter added to search requests, defaulting to false.
Improves Pipeline Debugging:
Supports All Pipeline Configurations:
Test Framework Enhancements:
Example output with request processor filter_query and response processors rename_field and sort:
Related Issues
Resolves #14745
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.