
[BUG] Memory leak with parallel_bulk on AOSS #871

Open
ArnaudParant opened this issue Dec 18, 2024 · 7 comments
Labels
bug Something isn't working

Comments

@ArnaudParant

ArnaudParant commented Dec 18, 2024

Hi,

What is the bug?

There is a large memory leak with helpers.parallel_bulk on Amazon OpenSearch Serverless.
In practice, it almost never returns the (success, info) results for the input actions and eventually dies with an OOM.

Curiously, the issue does not occur with the classic managed Amazon OpenSearch service or with Elasticsearch.

How can one reproduce the bug?

Run the code below.

What is the expected behavior?

parallel_bulk should return the results for the input actions, with no memory leak.

What is your host/environment?

Local machine indexing into Amazon OpenSearch Serverless.

Do you have any additional context?

opensearch-py==2.7.1
boto3==1.35.45

Test code

Just replace AOSS_HOST with your actual host.

import random
import string
import boto3
from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection, helpers


OS_BULK_SIZE = 1_000
OS_THREAD_COUNT = 4
OS_QUEUE_SIZE = 4
OS_INDEX_NAME = "test_oom"
AOSS_REGION = "eu-west-3"
AOSS_HOST = f"xxx.{AOSS_REGION}.aoss.amazonaws.com"

DEFAULT_ELASTICSEARCH_MAPPING = {
    "mappings": {
        "dynamic": "strict",
        "properties": {
            "id": {"type": "integer"},
            "value": {"type": "keyword"},
        },
    },
}


def get_client():
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, AOSS_REGION, "aoss")
    return OpenSearch(
        hosts=[{"host": AOSS_HOST, "port": 443}],
        http_auth=auth,
        http_compress=True,
        use_ssl=True,
        verify_certs=True,
        timeout=1,
        connection_class=RequestsHttpConnection,
        pool_maxsize=20,
    )


def generator(stop: int = 40_000_000):
    for id in range(stop):
        doc = {"id": id, "value": random.choices(string.ascii_lowercase, k=30)}
        yield {"_op_type": "index", "_index": OS_INDEX_NAME, "_id": id, "_source": doc}


def main():
    client = get_client()

    exists = client.indices.exists(OS_INDEX_NAME)

    if exists:
        client.indices.delete(index=OS_INDEX_NAME, ignore=[400, 404])
        print(f"Deleted index: {OS_INDEX_NAME}")

    client.indices.create(index=OS_INDEX_NAME, body=DEFAULT_ELASTICSEARCH_MAPPING)
    print(f"Created index: {OS_INDEX_NAME}")

    count = 1
    print("Starting insertion...")
    for _ in helpers.parallel_bulk(client, generator(), thread_count=OS_THREAD_COUNT, chunk_size=OS_BULK_SIZE, queue_size=OS_QUEUE_SIZE):
        if count % OS_BULK_SIZE == 0:
            print(f"Inserted {count} documents")
        count += 1

    print("Done")


if __name__ == "__main__":
    main()

Temporary solution

Chunk the input into large batches and run parallel_bulk once per batch, so that whatever it holds onto is released between batches:

import logging

logger = logging.getLogger(__name__)


def bulk_generator(iterable, size):
    bulk_count = 0
    count = 0
    bulk = []

    for elm in iterable:
        bulk.append(elm)
        count += 1

        if count >= size:
            bulk_count += 1
            logger.debug(f"Sending bulk {bulk_count} of {count} documents")
            yield bulk
            count = 0
            bulk = []

    if count > 0:
        bulk_count += 1
        logger.debug(f"Sending bulk {bulk_count} of {count} documents")
        yield bulk


count = 1
print("Starting insertion...")
for bulk in bulk_generator(generator(), 300_000):
    for _ in helpers.parallel_bulk(client, bulk, thread_count=OS_THREAD_COUNT, chunk_size=OS_BULK_SIZE, queue_size=OS_QUEUE_SIZE):
        if count % OS_BULK_SIZE == 0:
            print(f"Inserted {count} documents")
        count += 1

print("Done")
@ArnaudParant ArnaudParant added bug Something isn't working untriaged Need triage labels Dec 18, 2024
@dblock dblock removed the untriaged Need triage label Dec 18, 2024
@dblock
Member

dblock commented Dec 18, 2024

@ArnaudParant what's the root cause? Is the server timing out, causing the parallel requests to back up?

It would be great to turn the repro into a failing test.

@ArnaudParant
Author

Hi,

There does not seem to be an external root cause: no error is raised, and the documents are actually inserted correctly on the server side.

The repro does fail on my side when connected to AOSS, consuming all the RAM on the machine (or pod) until it OOMs.

@dblock
Member

dblock commented Dec 19, 2024

Don't lose the repro. It's likely a pool being held too long, a log, buffering, or something that should have long since been garbage collected. Try adding instrumentation to get a flame graph of what the process is using memory for. There's a bunch of ideas in https://www.geeksforgeeks.org/monitoring-memory-usage-of-a-running-python-program/; I don't have a favorite one.
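
A minimal sketch of that kind of instrumentation, using only the standard-library tracemalloc module (the helper name here is illustrative, not part of opensearch-py):

import tracemalloc


def print_top_allocations(limit=10):
    """Print the source lines currently holding the most allocated memory."""
    snapshot = tracemalloc.take_snapshot()
    for stat in snapshot.statistics("lineno")[:limit]:
        print(stat)


tracemalloc.start(25)  # keep up to 25 frames per allocation for better attribution
# ... run the parallel_bulk loop for a while, then:
print_top_allocations()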

@ArnaudParant
Author

I modified the code a bit with some prints and memory_profiler's @profile.

from memory_profiler import profile

def generator(stop: int = 40_000_000):
    count = 1

    for id in range(stop):
        doc = {"id": id, "value": random.choices(string.ascii_lowercase, k=30)}
        yield {"_op_type": "index", "_index": OS_INDEX_NAME, "_id": id, "_source": doc}

        if count % 10_000 == 0:
            print(f"Sent {count:,} documents")

        count += 1


@profile
def main():
    client = get_client()

    exists = client.indices.exists(OS_INDEX_NAME)

    if exists:
        client.indices.delete(index=OS_INDEX_NAME, ignore=[400, 404])
        print(f"Deleted index: {OS_INDEX_NAME}")

    client.indices.create(index=OS_INDEX_NAME, body=DEFAULT_ELASTICSEARCH_MAPPING)
    print(f"Created index: {OS_INDEX_NAME}")

    try:
        print("Starting insertion...")

        count = 1
        documents = generator()
        for success, info in helpers.parallel_bulk(client, documents, thread_count=4, chunk_size=4, queue_size=1_000):
            print(f"Deque: {success}, {info}")
            count += 1

    except KeyboardInterrupt:
        pass

    print("Done")

Unfortunately @profile does not work on the generator function; I am not sure whether that is because it is a generator or because it is called from a function that already uses the @profile decorator. Anyway, I added some prints to see the iterations.

$> time python test_oom.py
Deleted index: test_oom
Created index: test_oom
Starting insertion...
Sent 10,000 documents
Sent 20,000 documents
Sent 30,000 documents
Sent 40,000 documents
Sent 50,000 documents
Sent 60,000 documents
Sent 70,000 documents
Sent 80,000 documents
Sent 90,000 documents
Sent 100,000 documents
Sent 110,000 documents
Sent 120,000 documents
Sent 130,000 documents
Sent 140,000 documents
Sent 150,000 documents
Sent 160,000 documents
Sent 170,000 documents
Sent 180,000 documents
Sent 190,000 documents
Sent 200,000 documents
^CDone
Filename: /home/aparant/test_oom.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    49     50.0 MiB     50.0 MiB           1   @profile
    50                                         def main():
    51     50.5 MiB      0.5 MiB           1       client = get_client()
    52
    53     51.9 MiB      1.3 MiB           1       exists = client.indices.exists(OS_INDEX_NAME)
    54
    55     51.9 MiB      0.0 MiB           1       if exists:
    56     51.9 MiB      0.0 MiB           1           client.indices.delete(index=OS_INDEX_NAME, ignore=[400, 404])
    57     51.9 MiB      0.0 MiB           1           print(f"Deleted index: {OS_INDEX_NAME}")
    58
    59     51.9 MiB      0.0 MiB           1       client.indices.create(index=OS_INDEX_NAME, body=DEFAULT_ELASTICSEARCH_MAPPING)
    60     51.9 MiB      0.0 MiB           1       print(f"Created index: {OS_INDEX_NAME}")
    61
    62     51.9 MiB      0.0 MiB           1       try:
    63     51.9 MiB      0.0 MiB           1           print("Starting insertion...")
    64
    65     51.9 MiB      0.0 MiB           1           count = 1
    66     51.9 MiB      0.0 MiB           1           documents = generator()
    67    293.1 MiB    241.3 MiB           1           for success, info in helpers.parallel_bulk(client, documents, thread_count=4, chunk_size=4, queue_size=1_000):
    68                                                     print(f"Deque: {success}, {info}")
    69                                                     count += 1
    70
    71    293.1 MiB      0.0 MiB           1       except KeyboardInterrupt:
    72    293.1 MiB      0.0 MiB           1           pass
    73
    74    293.1 MiB      0.0 MiB           1       print("Done")



real    48m16.843s
user    2m1.290s
sys     0m12.843s

I killed the process after a while, but we can clearly see that helpers.parallel_bulk never yields any results, even though the input is being consumed.
The memory usage is not huge here, but the documents are very small; in my real use case it was much higher.
In any case, it holds onto at least the results.

On the AOSS side, we can see that the documents are ingested correctly:

GET /_cat/indices?v
health status index                           uuid                 pri rep docs.count docs.deleted store.size pri.store.size
              test_oom                        n8by8pMB7QvxdGoaCbAe             202855            0     26.7mb         26.7mb

The store size in AOSS roughly corresponds to the memory increment of parallel_bulk, so maybe it holds onto the documents as well.

@dblock
Copy link
Member

dblock commented Dec 23, 2024

Check what parallel_bulk is doing and try to modify it to narrow down what is being held onto. There should be a way to yield results incrementally and avoid holding onto them, as a permanent workaround/fix.
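
A minimal sketch of that approach: chunk the actions manually and fan them out over a bounded concurrent.futures pool built around helpers.bulk, so only the in-flight chunks are ever retained (the helper names, chunking, and pool sizes are illustrative, not opensearch-py API):

import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

from opensearchpy import helpers


def chunked(iterable, size):
    """Yield lists of at most `size` actions from the input iterable."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk


def bounded_parallel_bulk(client, actions, thread_count=4, chunk_size=1_000, max_in_flight=8):
    """Fan bulk chunks out over a thread pool while keeping only in-flight chunks in memory."""
    with ThreadPoolExecutor(max_workers=thread_count) as pool:
        in_flight = set()
        for chunk in chunked(actions, chunk_size):
            in_flight.add(pool.submit(helpers.bulk, client, chunk, stats_only=True))
            if len(in_flight) >= max_in_flight:
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()  # (successes, failures) per chunk when stats_only=True
        for future in in_flight:
            yield future.result()

Swapping this in for helpers.parallel_bulk in the repro would show whether the growth comes from results accumulating inside parallel_bulk or from something else entirely.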

@ArnaudParant
Author

ArnaudParant commented Dec 23, 2024

Sorry, I will not have time at my company to investigate this further. I just wanted to file a bug report with code to reproduce it; we are probably not the only ones hitting this issue.

For the moment, we already have a workaround (described in my first post).
Another option is to use streaming_bulk instead of parallel_bulk, for example.
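
A minimal streaming_bulk sketch, reusing the client, generator, and OS_BULK_SIZE from the repro above (single-threaded; results are yielded one per action as each chunk completes):

from opensearchpy import helpers

count = 0
for ok, item in helpers.streaming_bulk(client, generator(), chunk_size=OS_BULK_SIZE):
    count += 1
    if count % OS_BULK_SIZE == 0:
        print(f"Inserted {count} documents")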

@dblock
Member

dblock commented Dec 23, 2024

Thanks for your help @ArnaudParant!
