
[BUG] Bulk upsert does not behave like a single Upsert, with an ingestion pipeline #10864

Open
acidul opened this issue Oct 23, 2023 · 3 comments
Labels
bug (Something isn't working) · good first issue (Good for newcomers) · Indexing (Indexing, Bulk Indexing and anything related to indexing)

Comments

acidul commented Oct 23, 2023

Describe the bug
A single upsert works as expected with an ingest pipeline.
But the same operation inside a bulk upsert does not give the same result.

To Reproduce
Steps to reproduce the behavior:

  1. Create an ingest pipeline that computes a duration between two dates:
PUT _ingest/pipeline/pipeline-duration
{
  "description": "This pipeline complete data with begin, end and duration",
  "processors": [
    {
      "script": {
        "description": "Old duration",
        "lang": "painless",
        "source": "ctx.old_duration = 'Old duration was : '+ctx.event_duration;"
      }
    },
    {
      "script": {
        "description": "Begin computation",
        "lang": "painless",
        "params": { "_source": "" }, 
        "source": """
          if (ctx.event_begin == null || ((long) ctx.event_min) < ((long) ctx.event_begin)) { 
            ctx.event_begin = (long)ctx.event_min; 
          } else { 
            ctx.event_min = ctx.event_begin 
          } 
        """
      }
    },
    {
      "script": {
        "description": "End computation",
        "lang": "painless",
        "params": { "_source": "" }, 
        "source": """
          if (ctx.event_end == null || ((long) ctx.event_max) > ((long) ctx.event_end)) { 
            ctx.event_end = (long)ctx.event_max; 
          } else { 
            ctx.event_max = ctx.event_end 
          } 
        """
      }
    },
    {
      "script": {
        "description": "Duration computation",
        "lang": "painless",
        "source": "ctx.event_duration = (long)(ctx['event_max']-ctx['event_min']);"
      }
    }
  ]
}
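
As a sanity check, the pipeline can be exercised in isolation with the simulate API (the sample values below are illustrative, not part of the original report):

POST _ingest/pipeline/pipeline-duration/_simulate
{
  "docs": [
    {
      "_source": {
        "event_min": 1,
        "event_max": 2,
        "event_name": "occurrence_1"
      }
    }
  ]
}

On a fresh document like this, the scripts should set event_begin to 1, event_end to 2, event_duration to 1, and old_duration to "Old duration was : null".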

  2. Create an index that uses this pipeline as its final_pipeline:
PUT index-duration
{
  "aliases": {},
  "mappings": {
    "properties": {
      "event_min": {
        "type": "float"
      },
      "event_max": {
        "type": "float"
      },
      "event_begin": {
        "type": "date",
        "format": "epoch_millis"
      },
      "event_end": {
        "type": "date",
        "format": "epoch_millis"
      },
      "event_duration": {
        "type": "long"
      },
      "event_name": {
        "type": "keyword"
      },
      "old_duration": {
        "type": "keyword"
      }
    }
  },
  "settings": {
    "index": {
      "final_pipeline": "pipeline-duration"
    }
  }
}
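
To confirm the pipeline is attached to the index, the settings can be checked (filter_path just trims the response to the relevant key):

GET index-duration/_settings?filter_path=*.settings.index.final_pipeline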

  3. Execute the following two update operations:
POST index-duration/_update/doc_duration
{
  "doc" : {
    "event_min": 1,
    "event_max": 2,
	"event_name": "occurrence_1"
  },
  "doc_as_upsert": true
} 

Then a second time:

POST index-duration/_update/doc_duration
{
  "doc" : {
    "event_min": 3,
    "event_max": 5,
    "event_name": "occurrence_2"
  },
  "doc_as_upsert": true
} 

Then check the document:
GET index-duration/_doc/doc_duration
It gives:

  "_source": {
    "event_end": 5,
    "old_duration": "Old duration was : 1",
    "event_duration": 4,
    "event_min": 1,
    "event_name": "occurrence_2",
    "event_max": 5,
    "event_begin": 1
  }
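
This result is consistent with the update API merging the partial doc into the stored document first, and only then running the final pipeline on the merged source. Simulating the pipeline against what that merged, pre-pipeline source would look like (event_min/event_max/event_name from the second update, event_begin/event_end/event_duration from the state after the first; old_duration omitted for brevity) reproduces the same fields:

POST _ingest/pipeline/pipeline-duration/_simulate
{
  "docs": [
    {
      "_source": {
        "event_min": 3,
        "event_max": 5,
        "event_name": "occurrence_2",
        "event_begin": 1,
        "event_end": 2,
        "event_duration": 1
      }
    }
  ]
}

Here event_min is reset to 1 (the earlier event_begin), event_end becomes 5, and event_duration becomes 4, matching the _source above.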

  4. Now execute the same update operations, but embedded in a bulk:
POST _bulk
{ "update": { "_index": "index-duration", "_id": "doc_duration_issue" } }
{ "doc" : { "event_min": 1, "event_max": 2, "event_name": "occurrence_1"},"doc_as_upsert": true} 

Then a second time:

POST _bulk
{ "update": { "_index": "index-duration", "_id": "doc_duration_issue" } }
{ "doc" : { "event_min": 3, "event_max": 5, "event_name": "occurrence_2"},"doc_as_upsert": true} 

Then check the document:
GET index-duration/_doc/doc_duration_issue
It gives:

  "_source": {
    "event_end": 5,
    "old_duration": "Old duration was : null",
    "event_duration": 2,
    "event_min": 3,
    "event_name": "occurrence_2",
    "event_max": 5,
    "event_begin": 3
  }
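
The divergence matches the pipeline having run on the second partial doc alone, before any merge with the stored document. Simulating the pipeline against just that partial doc reproduces every field that differs:

POST _ingest/pipeline/pipeline-duration/_simulate
{
  "docs": [
    {
      "_source": {
        "event_min": 3,
        "event_max": 5,
        "event_name": "occurrence_2"
      }
    }
  ]
}

This yields event_begin 3, event_end 5, event_duration 2, and old_duration "Old duration was : null", exactly the values the bulk upsert produced.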

Expected behavior
Bulk upsert and single upsert should behave the same way when an ingest pipeline is involved.
We expect to get the same values as for doc_duration:

  "_source": {
    "event_end": 5,
    "old_duration": "Old duration was : 1",
    "event_duration": 4,
    "event_min": 1,
    "event_name": "occurrence_2",
    "event_max": 5,
    "event_begin": 1
  }

Plugins
odfe-node1 opensearch-alerting 2.11.0.0
odfe-node1 opensearch-anomaly-detection 2.11.0.0
odfe-node1 opensearch-asynchronous-search 2.11.0.0
odfe-node1 opensearch-cross-cluster-replication 2.11.0.0
odfe-node1 opensearch-custom-codecs 2.11.0.0
odfe-node1 opensearch-geospatial 2.11.0.0
odfe-node1 opensearch-index-management 2.11.0.0
odfe-node1 opensearch-job-scheduler 2.11.0.0
odfe-node1 opensearch-knn 2.11.0.0
odfe-node1 opensearch-ml 2.11.0.0
odfe-node1 opensearch-neural-search 2.11.0.0
odfe-node1 opensearch-notifications 2.11.0.0
odfe-node1 opensearch-notifications-core 2.11.0.0
odfe-node1 opensearch-observability 2.11.0.0
odfe-node1 opensearch-performance-analyzer 2.11.0.0
odfe-node1 opensearch-reports-scheduler 2.11.0.0
odfe-node1 opensearch-security 2.11.0.0
odfe-node1 opensearch-security-analytics 2.11.0.0
odfe-node1 opensearch-sql 2.11.0.0

Host/Environment (please complete the following information):

  • docker image: opensearchproject/opensearch:2.11.0

Additional context
This issue is not exactly the same as the one posted in #2607, which is why I opened a new one.

@acidul acidul added bug Something isn't working untriaged labels Oct 23, 2023
msfroh (Collaborator) commented Oct 23, 2023

Interesting... I suspect that the root cause may be the same as #2607, but this is another good test case.

Hopefully we can address both issues with one fix.

@msfroh msfroh added the good first issue Good for newcomers label Oct 24, 2023
gaobinlong (Collaborator) commented

It looks a little bit complicated. The update API transforms the updateRequest into a new indexRequest if the document exists, and the new doc in that indexRequest is the result of merging the partial doc with the existing document, see:

return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());

After that, the new indexRequest is sent to TransportBulkAction, which then executes the pipeline.

However, the bulk API doesn't do that transformation before executing the pipeline, so the behaviors differ. We may not be able to transform the updateRequest in TransportBulkAction, because the method updateHelper.prepare() can only be called at the shard level.

Another finding is that the pipeline-execution behavior also differs between upsert and doc_as_upsert in the bulk API, because of this line:

indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();

When the document with ID 1 exists and doc_as_upsert is true, the pipeline is executed on the partial doc {"x":3, "y":5}:

curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "update": { "_index": "test1", "_id": "1" } }
{ "doc" : {"x":3, "y":5}, "doc_as_upsert":true}
'

But when upsert is set, the pipeline is executed on the upsert doc {"x":1}, and nothing in the stored document changes, because that upsert doc is discarded once the document is found to exist:

curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "update": { "_index": "test1", "_id": "1" } }
{ "doc" : {"x":3, "y":5}, "upsert":{"x":1}}
'

@anasalkouz anasalkouz added the Indexing Indexing, Bulk Indexing and anything related to indexing label Jan 4, 2024
peternied (Member) commented

[Triage]
@acidul Thanks for filing this bug
