It's common to see a high volume of data ingestion coupled with the need to run aggregates every minute (or any other defined time span) for real-time insights. To achieve this you probably need a database, a streaming tool, and some query tool. And even then, there are often challenges with the scalability of key-value inserts, and with the speed at which data can be aggregated while it keeps coming in every second at large volume. Coupled with a tight SLA, it becomes a headache for many an architect and DBA.
This demo shows you a Couchbase point of view. There's actually more than one way to do it; we just need a good balance of developer simplicity, service level, and scalability. In this demo we'll look at 3 approaches, which I call the CONVENIENT, the QUICK, and the QUICKEST.
👉🏻 Financial services is not the only industry that can benefit from this demo. Essentially any use case that requires lightning-fast aggregation applies.
Set up a Couchbase cluster with the Data, Index, Query, and Eventing services deployed. If you are not familiar with Couchbase cluster setup, follow this documentation. I'm using 4 t2.2xlarge machines, each with 8 vCPUs and 32GiB of memory: 2 machines with the Data, Index, and Query services deployed, and 2 others with Eventing.
Yes, you can deploy services independently on Couchbase; that's one of the benefits you get, making your infrastructure more resilient. A sudden surge in queries, for example, won't consume all the computing power and cause massive timeouts on your data insertions. To dig deeper, read our documents on Multi-Dimensional Scaling.
Clone this repo
git clone https://github.com/sillyjason/auto_data_aggregation
At the project root directory, create a .env file with the following env variables
# EE Environment Variables
EE_HOSTNAME= // hostname of any Couchbase node with Data service deployed
EVENTING_HOSTNAME= // hostname of your Couchbase node with Eventing service deployed
#CB User Credential
CB_USERNAME= // username for admin credentials to Couchbase cluster
CB_PASSWORD= // password for admin credentials to Couchbase cluster
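For reference, here's a minimal sketch of how the scripts can pick up these variables, assuming the python-dotenv package (check requirements.txt for what the repo actually uses):

import os
from dotenv import load_dotenv

load_dotenv()  # reads the .env file at the project root

EE_HOSTNAME = os.getenv("EE_HOSTNAME")
EVENTING_HOSTNAME = os.getenv("EVENTING_HOSTNAME")
CB_USERNAME = os.getenv("CB_USERNAME")
CB_PASSWORD = os.getenv("CB_PASSWORD")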
Install the Python dependencies (and create a Python virtual environment first if it helps)
pip install -r requirements.txt
Run the script for setting up buckets, scopes and collections
python3 setupcollections.py
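This isn't the actual content of setupcollections.py, but a sketch of the kind of work it does with the Couchbase Python SDK. The bucket, scope, and collection names follow what's used later in the demo; the RAM quota is illustrative:

import os
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.collections import CollectionSpec

cluster = Cluster(
    f"couchbase://{os.getenv('EE_HOSTNAME')}",
    ClusterOptions(PasswordAuthenticator(os.getenv("CB_USERNAME"), os.getenv("CB_PASSWORD"))),
)

# create the "main" bucket (flush enabled, since we'll flush it later in the demo)
cluster.buckets().create_bucket(
    CreateBucketSettings(name="main", ram_quota_mb=1024, flush_enabled=True)
)

# create the scopes and collections referenced throughout the demo
mgr = cluster.bucket("main").collections()
mgr.create_scope("data")
mgr.create_collection(CollectionSpec("data", scope_name="data"))
mgr.create_scope("aggregation")
for name in ("m_rt_all", "m_api_all", "m_e_kv_all", "m_e_kv_users"):
    mgr.create_collection(CollectionSpec(name, scope_name="aggregation"))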
Run the script for setting up Eventing functions
python3 setupeventing.py
👉🏻 Eventing is Couchbase's answer to event-driven architecture, at scale. Rather than the common oplog-based implementations, we implemented a pub/sub streaming mechanism to make sure scalability is taken care of from day 1.
We'll call the Couchbase REST API endpoints to set up 3 Eventing functions. on_data_input, together with on_data_input_junior, is Couchbase's answer to real-time data processing at speed and scale.
recur_aggregation_trigger is for creating the recurring job for aggregation every minute.
If you're interested in understanding Couchbase Eventing in further depth, this page has a few short examples.
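Under the hood, importing a function through the Eventing REST API looks roughly like the sketch below: POST the exported function definition to the Eventing service on port 8096. The JSON filename here is illustrative, not necessarily what the repo ships:

import json
import os
import requests

auth = (os.getenv("CB_USERNAME"), os.getenv("CB_PASSWORD"))

# load the function definition (same JSON format as an Eventing export)
with open("functions/on_data_input.json") as f:
    func_def = json.load(f)

resp = requests.post(
    f"http://{os.getenv('EVENTING_HOSTNAME')}:8096/api/v1/functions/on_data_input",
    json=func_def,
    auth=auth,
)
resp.raise_for_status()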
Run the script to build the relevant indexes and create the initial documents needed as triggers.
python3 setupdata.py
We're ready to start ingesting data. Run the dataingest script, which will start writing into the Couchbase bucket at a speed of 1,000 inserts per second.
python3 dataingest.py
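In case you're curious, the gist of such an ingestion script is a paced upsert loop. A minimal sketch, where the payload fields other than time_str are illustrative:

import os
import time
import uuid
from datetime import datetime, timezone
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions

cluster = Cluster(
    f"couchbase://{os.getenv('EE_HOSTNAME')}",
    ClusterOptions(PasswordAuthenticator(os.getenv("CB_USERNAME"), os.getenv("CB_PASSWORD"))),
)
collection = cluster.bucket("main").scope("data").collection("data")

while True:
    start = time.time()
    for _ in range(1000):
        now = datetime.now(timezone.utc)
        collection.upsert(str(uuid.uuid4()), {
            "time_str": now.strftime("%Y-%m-%d %H:%M:%S"),  # matches the LIKE filter used later
            "amount": 42.0,
            "cust_type": "retail",
        })
    # sleep off whatever is left of the second to hold ~1,000 writes/sec
    time.sleep(max(0.0, 1.0 - (time.time() - start)))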
Go to Couchbase and verify this throughput. The ops/sec metric should reflect this number, give or take: "give" as in additional writes from the Eventing functions, "take" as in when the local machine running the script (such as my laptop) has limited computing power.
Go to the Eventing tab and deploy the function recur_aggregation_trigger.
👉🏻 Leave the other 2 functions alone for the moment. They'll serve their purpose later.
Now when we go to the Documents tab and select the main.aggregation.m_rt_all namespace, the minutely aggregation results should gradually show up.
🎉 Voilà! Now we see Couchbase being more than a mere NoSQL JSON store. If you're wondering how the minutely aggregation is done, the secret sauce is a mix of the following:
- Couchbase Eventing, an in-memory pub/sub implementation of database-level event-driven architecture
- The ability to embed SQL queries in the script of an Eventing function
- Recurring Timers created directly in Couchbase Eventing to automate the job
Quite convenient, eh? But let's do some checking and evaluation.
How do we make sure the query is not missing any transactions? Say, for the document 2024-11-16T10:48:00Z, which represents the minute of 10:48, the output shows a total of 56,000 documents retrieved by the query. Is that the ACTUAL number of documents inserted into Couchbase during that minute?
Easy. Couchbase lets you query your data with SQL syntax. Let's run the following in the Query tab:
SELECT SUM(1) AS total_transactions
FROM `main`.`data`.`data`
WHERE time_str LIKE "2024-11-16 10:48%"
Bingo. The deviation from 60,000 (1,000 writes per second over 60 seconds) is, again, a sign that I need to upgrade my laptop.
Let's run some queries to see when exactly our Eventing timers are fired and, hence, get some insight into when the output becomes available.
Stay on the Query tab and fire the following:
select start_time_fmt, trigger_time_fmt
from `main`.`aggregation`.`m_rt_all`
order by trigger_time
Switch to Table view. Notice anything? The trigger_time_fmt field indicates when the function was triggered. We can see that for every minute (10:35, for example), the aggregation has to wait around 5 seconds past the next minute to trigger (10:36:06, in the same example). It's safe to assume the process of querying and updating the aggregate doc adds further latency on top.
Let's stop the ingestion script for now and think about this option. For some industries (such as F&B or retail), a delay of a few seconds shouldn't be a big deal. For others (such as financial services), it's unacceptable. So, how can we make it faster?
Couchbase's recurring timer, despite being convenient to set up and manage, does not guarantee wall-clock accuracy. That is to say, if the requirement is that the aggregation function be fired at exactly the first nanosecond of the minute, Couchbase Eventing might not be the best option. Let's look at option 2.
Let's first flush the main bucket.
And we don't need the function recur_aggregation_trigger anymore, so let's deactivate it.
Go back to your IDE, open another terminal window, and run the timer script, which will trigger the aggregation at exactly the first milliseconds of the minute:
python3 timer.py
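The core idea of such a timer (again, a sketch rather than the repo's exact timer.py) is to sleep until the wall-clock minute boundary, then immediately query the minute that just ended and upsert the result into main.aggregation.m_api_all. The amount field follows the ingestion sketch above:

import os
import time
from datetime import datetime, timedelta, timezone
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, QueryOptions

cluster = Cluster(
    f"couchbase://{os.getenv('EE_HOSTNAME')}",
    ClusterOptions(PasswordAuthenticator(os.getenv("CB_USERNAME"), os.getenv("CB_PASSWORD"))),
)
agg = cluster.bucket("main").scope("aggregation").collection("m_api_all")

while True:
    time.sleep(60 - (time.time() % 60))  # wake in the first milliseconds of the minute
    task_start = datetime.now(timezone.utc)
    minute = task_start.replace(second=0, microsecond=0) - timedelta(minutes=1)
    row = next(iter(cluster.query(
        "SELECT COUNT(1) AS count, SUM(t.amount) AS total_amt "
        "FROM `main`.`data`.`data` AS t WHERE t.time_str LIKE $prefix",
        QueryOptions(named_parameters={"prefix": minute.strftime("%Y-%m-%d %H:%M") + "%"}),
    )))
    agg.upsert(minute.strftime("%Y-%m-%dT%H:%M:%SZ"), {
        "count": row["count"],
        "total_amt": row["total_amt"],
        "sender_task_start_time": task_start.isoformat(),
    })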
Go grab a cup of your favourite espresso; come back and there should already be some minutely aggregations.
After the caffeine break, let's use a more convenient way to look at the aggregated data this time. Go to the Query tab.
select meta().id,
count,
sender_task_start_time,
MILLIS_TO_UTC(META().cas/1000000) as doc_available_time_fmt,
trunc(META().cas/1000000 - str_to_millis(meta().id) - 60000) as readiness_time_delta
from `main`.`aggregation`.`m_api_all`
order by meta().id
We're now querying the m_api_all collection, which holds the results of the queries fired by timer.py. Switch to Table view so the results are more readable.
Let's break it down:
- "count": how many transactions are captured in this aggregation
- "doc_available_time_fmt": when this aggregation document became available on server. We're leveraging a metadata attribute here
- "id": on which minute was this aggregation done
- "readiness_time_delta": take the first row as an example. It was for 12:27:00, so once 12:28:00 is passed, this doc should be ready ASAP. The time delta since 12:28 is therefore calculated
So, each minute's aggregation is ready roughly 500 milliseconds past the minute. Let's stop the data ingestion script and do some reflection.
Is it the perfect approach? Depends.
For one, 500ms might not be a satisfactory latency in some cases.
More importantly, the Couchbase Query service plus the Index service is used here for aggregation, and although data mutations are streamed to the indexers through the Couchbase Database Change Protocol (DCP) to update indexes in near-real-time, they adhere to an eventual-consistency pattern. So in situations with a huge volume of data mutations and limited resources, it can take a while to update the index.
While Couchbase Query provides SCAN CONSISTENCY options for queries, we don't want index building to become the bottleneck when speed is paramount.
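For completeness, here's what requesting that consistency level looks like with the Python SDK, reusing the cluster object from the earlier sketches. REQUEST_PLUS makes the query wait for the index to catch up with all mutations made before the request:

from couchbase.n1ql import QueryScanConsistency
from couchbase.options import QueryOptions

result = cluster.query(
    'SELECT SUM(1) AS total_transactions '
    'FROM `main`.`data`.`data` WHERE time_str LIKE "2024-11-16 10:48%"',
    QueryOptions(scan_consistency=QueryScanConsistency.REQUEST_PLUS),
)
print(next(iter(result))["total_transactions"])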
👉🏻 There's also a key distinction between client time and server time. We're implementing based on the former, but there are also scenarios where the latter makes more sense.
Is there a way to make sure the aggregation doc is there at the exact millisecond past the minute?
Yes.
Couchbase is built for excellent scalability with key-value operations. Together with Eventing, we can capture mutations, perform the aggregation logic, and concurrently update the result extremely fast and reliably.
That means we don't have to wait until the minute has passed to start working on the aggregation; rather, every new transaction is captured and aggregated instantly. Let's dig in (remember to shut down the timer.py script if you haven't yet, since we no longer need it).
Let's flush the bucket again. Then, go to the Eventing tab and deploy the function on_data_input_junior. Leave on_data_input alone for now.
Once deployment is done, restart the data ingestion script. Give it a couple of minutes, then run the query again to see how early our aggregation can be ready.
select meta().id,
count,
sender_task_start_time,
MILLIS_TO_UTC(META().cas/1000000) as doc_available_time_fmt,
trunc(META().cas/1000000 - str_to_millis(meta().id) - 60000) as readiness_time_delta
from `main`.`aggregation`.`m_e_kv_all`
order by start_time
Pay attention to the readiness_time_delta output.
Why are there negative time deltas? Well, that means the aggregation is done right after the data mutations.
What's happening under the hood is that the 1,000 writes per second are being instantaneously streamed from the Couchbase Data service to the Eventing service, which writes back to the same aggregation doc. This means that if your last transaction happens in the last 10 milliseconds of the minute, and we give Couchbase 5 ms to process it, the aggregation document is ready BEFORE the next minute!
If you're a database guru, a big red light is probably looming over your head: race conditions. After all, we are doing 1,000 aggregation updates per second on a single document.
Worry not; it can be taken care of by Couchbase's CAS mechanism and the versatility of the JavaScript used in Couchbase Eventing functions. If you are interested in how exactly this is done, head over to the Eventing function, where we can see how a retry mechanism is implemented to circumvent the race condition problem.
Here's a quick code sample of how this is done:
// use a while loop to keep retrying the operation until it succeeds;
// this handles potential CAS mismatches when concurrent mutations
// hit the same aggregation document
let success = false;
let retry_counter = 0;

while (!success) {
    // update the document with the new transaction totals. m_e_kv_all is
    // the collection binding, while aggregation_meta carries the CAS value
    // that will be compared and validated before this update takes effect;
    // on a CAS mismatch the call fails and we retry.
    let aggregate_result = couchbase.mutateIn(m_e_kv_all, aggregation_meta, [
        couchbase.MutateInSpec.replace("count", new_count),
        couchbase.MutateInSpec.replace("total_amt", new_total_amt),
        couchbase.MutateInSpec.replace("average_amt", new_average_amt),
        couchbase.MutateInSpec.upsert("cust_type", new_cust_type),
    ]);

    if (aggregate_result.success) {
        // print the retry counter only if we actually had to retry
        if (retry_counter > 0) {
            log("insert success at retry: ", retry_counter);
        }
        // set the success flag to exit the loop
        success = true;
    } else {
        // CAS mismatch: re-read the document to get its latest content and
        // CAS, then retry with fresh values
        retry_counter++;
        var result = couchbase.get(m_e_kv_all, { "id": doc_key });
        if (result.success) {
            current_doc = result.doc;
            aggregation_meta = result.meta;
            // (new_count / new_total_amt / new_average_amt are recalculated
            // from current_doc before the next iteration)
        }
    }
}
If you're interested in digging further, there's also the other function, on_data_input, which does exactly the same thing as its little brother, except it aggregates at the individual user level. Feel free to deploy it and see what happens to the main.aggregation.m_e_kv_users collection!
And in this case, since we'll be looking at multiple per-user aggregation documents, the way to validate that Eventing is indeed capturing all mutations is through another quick SQL query, something like the one below:
SELECT SUM(count) AS count_sum
FROM `main`.`aggregation`.`m_e_kv_users`
WHERE META().id LIKE "2024-11-18T12:21%"