Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffered insertions #61

Merged
merged 17 commits into from
Nov 2, 2023
Merged

Buffered insertions #61

merged 17 commits into from
Nov 2, 2023

Conversation

JulienR1
Copy link
Contributor

@JulienR1 JulienR1 commented Oct 31, 2023

Adds in-memory batching to the sink.
The pqueue has been removed in favor of a single promise that inserts data while more data is stored locally asynchronously.

The behavior can be configured in .env or with the CLI using MAX_BUFFER_SIZE and INSERTION_DELAY.

@JulienR1 JulienR1 changed the title Feature/buffer inserts Buffered insertions Oct 31, 2023
@JulienR1 JulienR1 marked this pull request as ready for review October 31, 2023 21:53
Copy link
Contributor

@DenisCarriere DenisCarriere left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, but your Promise handling looks a bit wonky

You are trying to recreate your Promise based Queue batching

You should simply just bring back in PQueue for that section alone

Have a look at:
https://github.com/sindresorhus/p-queue#advanced-example

await queue.onEmpty()
// or
await queue.onIdle();

To know when all the promises have been resolved to continue to the next step

const queue = new PQueue({ concurrency: config.queueConcurrency });

let nextUpdateTime: number = 0;
let promise: Promise<unknown> = Promise.resolve();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be replaced with PQueue

prometheus.queue_size.set(queue.size);
if (queue.size > config.queueLimit) await setTimeout(1000);
if (new Date().getTime() > nextUpdateTime) {
await promise;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p-queue

blocks: [],
};

promise = new Promise<void>(async (resolve) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pqueue

}
}

resolve();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pqueue

@JulienR1
Copy link
Contributor Author

JulienR1 commented Nov 1, 2023

Edited the workflow with p-queue. It stores 2 promises at once: the timeout promise and the insertion promise.

The timeout promise keeps track of the next insertion time. It has to be in the pqueue to block the process until it is the correct insertion time later on.

The insertion promise is the call to ClickHouse. It will delay everything if it is slow.

Here is a summary of the process:

image

@DenisCarriere DenisCarriere merged commit 7c5bcec into main Nov 2, 2023
1 check passed
@DenisCarriere
Copy link
Contributor

👍 update looks good

@JulienR1 JulienR1 deleted the feature/buffer-inserts branch November 2, 2023 15:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants