
[BUG] Abnormal nodes stats in indexing/search metrics during on-going index/shard closure. #12552

Open
vikasvb90 opened this issue Mar 7, 2024 · 9 comments


vikasvb90 commented Mar 7, 2024

Describe the bug

Node-level stats are supposed to be monotonically increasing until the node is restarted, but this principle doesn't hold during a race condition when index or shard closures are going on in parallel.

If one or more shard closures or index closures happen in parallel with a nodes stats execution, then fetching the stats of any shard whose state has changed to closed throws either an IllegalIndexShardStateException or an AlreadyClosedException, which is ignored here, and the stats of the respective shard are skipped in the response. This inconsistency of lower node stats values being returned in the response only lasts until beforeIndexShardClosed of oldShardsStats is invoked.

How is this still possible when beforeIndexShardClosed updates old shard stats before the shard is closed?
Delta shard stats are computed here by iterating over the in-memory map of indices in IndicesService and then, for each index, over the in-memory map of shard ids in IndexService. Before beforeIndexShardClosed gets invoked, the index gets removed from the indices map here, and similarly the shard gets removed from the shard id map here. If any stats request comes in between the removal of an index or shard from the respective in-memory map and the invocation of beforeIndexShardClosed, it can only return the not-yet-updated value of the old shard stats.
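
To make the window concrete, below is a minimal self-contained sketch of the race rather than actual OpenSearch code: shards stands in for the in-memory shard map and oldShardsStats for the accumulator updated by beforeIndexShardClosed. If stats() runs after closeShard() has removed the shard from the map but before it has folded the shard into oldShardsStats, that shard's contribution is counted nowhere and the returned total drops.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    class NodeStatsRaceSketch {
        final Map<Integer, AtomicLong> shards = new ConcurrentHashMap<>(); // live shard id -> its counter
        final AtomicLong oldShardsStats = new AtomicLong();                // totals of already-closed shards

        // Roughly what a nodes stats call does: copy the old totals, then add every live shard.
        long stats() {
            long total = oldShardsStats.get();               // O: copy old stats
            for (AtomicLong shardStat : shards.values()) {   // D: delta stats of current shards
                total += shardStat.get();                    // closed shards throw and get skipped in the real code
            }
            return total;
        }

        // Roughly what closing a shard does: remove it from the map first, fold it into old stats later.
        void closeShard(int shardId) {
            AtomicLong stat = shards.remove(shardId);        // R: shard leaves the map...
            if (stat == null) {
                return;
            }
            // ...window in which a concurrent stats() sees neither the live shard nor its old-stats contribution...
            oldShardsStats.addAndGet(stat.get());            // ...until this beforeIndexShardClosed-style update runs.
        }
    }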

Related component

Indexing

To Reproduce

This isn't easy to reproduce as stats need to be invoked at a specific intermediate state. An integ test can reproduce this.

Expected behavior

Let's explore what options we have:

  1. Swap the order of removing the index or shard id from the in-memory maps and closing the index/shard. We probably don't want to delay removing a shard or index from the node on a close request, and this also doesn't completely resolve the issue, as there are still multiple race conditions, e.g. a shard gets closed but its old shard stats are not yet updated.
  2. Change the sequence of the shard stats fetch and the old node stats fetch. This also doesn't work because of similar race conditions.
  3. Unless we develop coordination between index/shard closures and stats updates, there isn't any solution which can fully resolve this, and building that seems like overkill. The simplest approach would be to gracefully handle this scenario on the client side by maintaining a shard_count field in node stats, so that whenever the client sees a lower node stats value and a lower shard_count it can simply ignore that event (or decide whatever it wants to do with it).

The sequence in which stats are fetched today: the old stats, which store the stats of shards previously assigned to the node, are copied to a local variable; the stats of the current shards are fetched; and then these are merged into the old stats local variable and returned.

Assume R -> remove index/shard from map, O -> copy old stats into a local variable, C -> shard or index closure, D -> delta stats of the current shard, with R happening before C and O happening before D.
The following orderings are possible, and each results either in a lower shard_count with a lower node stats value, or in correct node stats.

  1. ORCD -> Less
  2. ROCD -> Less
  3. RCOD -> Correct
  4. ORDC -> Less
  5. ODRC -> Correct
  6. RODC -> Less

On receiving a lower shard_count together with lower node stats, the client can either choose to re-invoke stats or ignore the event, for example as sketched below.
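
A minimal client-side sketch of that handling, assuming the response carries the proposed shard_count field next to the cumulative counters; the class and field names below are hypothetical and not part of any existing client API.

    class NodeStatsSample {
        final long shardCount;     // proposed shard_count field
        final long indexingTotal;  // any cumulative node-level counter, e.g. indexing totals

        NodeStatsSample(long shardCount, long indexingTotal) {
            this.shardCount = shardCount;
            this.indexingTotal = indexingTotal;
        }
    }

    class NodeStatsConsumer {
        private NodeStatsSample previous;

        // Returns the sample to record, or null to drop this data point (or re-invoke stats).
        NodeStatsSample accept(NodeStatsSample current) {
            if (previous != null
                && current.indexingTotal < previous.indexingTotal   // counter went backwards...
                && current.shardCount < previous.shardCount) {      // ...while shards were closing
                return null;
            }
            previous = current;
            return current;
        }
    }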

Edit 1:
Even with just shard_count we won't be able to distinguish between the node-restart case and the shard-closing case. We need to keep another in-memory counter which tracks the count of removed shards and increments whenever an index or a shard is removed from the indices or shard map. During stats execution, we can match the removed count against the oldShardStats shard_count, and if the match fails we can return a field partial_results set to true in the response. We can still send shard_count as additional info. We also need to distinguish between an index closure and a shard closure and prevent counting shards again when the index is closed in the parent path.
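
A minimal sketch of that counter-based check, under the same simplified model as the earlier sketch; removedShardsCount, oldStatsShardCount and partialResults are illustrative names, not existing OpenSearch fields.

    import java.util.concurrent.atomic.AtomicLong;

    class RemovedShardTracker {
        final AtomicLong removedShardsCount = new AtomicLong(); // ++ whenever an index/shard leaves the in-memory maps
        final AtomicLong oldStatsShardCount = new AtomicLong(); // ++ whenever beforeIndexShardClosed folds a shard into old stats

        void onShardRemovedFromMap() { removedShardsCount.incrementAndGet(); }
        void onShardFoldedIntoOldStats() { oldStatsShardCount.incrementAndGet(); }

        // During stats execution: some shards left the maps but were not yet folded into old stats.
        boolean partialResults() {
            return oldStatsShardCount.get() < removedShardsCount.get();
        }
    }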


@vikasvb90 vikasvb90 added bug Something isn't working Indexing & Search labels Mar 7, 2024
@github-actions github-actions bot added Indexing Indexing, Bulk Indexing and anything related to indexing untriaged labels Mar 7, 2024
@khushbr khushbr self-assigned this Mar 7, 2024
@khushbr khushbr removed the untriaged label Mar 7, 2024
khushbr commented Mar 8, 2024

@vikasvb90 Nice find!

I have a couple of thoughts regarding the proposed solution:

  1. Instead of maintaining shard_count in stats, we can maintain an inflight_shards_closure variable whose value is incremented after shardId removal from the shards Hashmap here and decremented after beforeIndexShardClosed in IndexService; inflight_shards_closure is reset to its default value 0L on process restart (see the sketch after this list). Additionally, I propose that close/remove state management should operate at the shard level, as a shard can be closed for reasons other than index removal (stale shard copy, routing update failure) - the index removal precedes the shard closure, so in the case where the index is removed from the hashmap (A) but its shards have not been closed (B) and a stats request comes in, the response is computed ignoring the removed index.
  2. Instead of partial_results, we should respond with a field stale_results to hint that 'the cumulative statistics also account for shards that are no longer on this node, which is tracked by oldShardsStats'. This information is present as a code comment; we can look into adding it to the public documentation in the context of the newly added stale_results field.
  3. On the client side, stale_results should be treated as missing data and retried with back-off.
  4. I am wondering if we need the additional shard_count field in the stats response; should it map to shard_count in oldShardStats or to f(oldShardStats, statsByShard, inflight_shards_closure)? Alternatively, the partial_results field alone would suffice to communicate to the client that the returned result is stale due to currently inflight shard actions.
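
A minimal sketch of the inflight_shards_closure idea from point 1; the names are illustrative rather than actual IndicesService members. The counter goes up when a shard id leaves the shards map and down once beforeIndexShardClosed has folded that shard into oldShardsStats.

    import java.util.concurrent.atomic.AtomicLong;

    class InflightShardClosures {
        private final AtomicLong inflightShardsClosure = new AtomicLong(); // falls back to 0L on process restart

        void onShardRemovedFromMap() { inflightShardsClosure.incrementAndGet(); }
        void onBeforeIndexShardClosed() { inflightShardsClosure.decrementAndGet(); }

        // A stats request arriving while this is non-zero may be missing the contribution of shards mid-closure.
        boolean closuresInFlight() {
            return inflightShardsClosure.get() > 0;
        }
    }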


vikasvb90 commented Mar 8, 2024

@khushbr

  1. As we discussed, there are race conditions in incrementing and decrementing inflight_shards_closure. Also, in case of failures during shard closure, we can't guarantee the variable gets decremented. If we return partial_results as true and the client has built retries on it, the client will repeatedly invoke stats and may never get a proper response, resulting in missing metric points indefinitely.
  2. Stale denotes something is outdated, but in our case something is missing (the stats of some shards), hence the name partial. Open to other thoughts.
  3. We cannot afford to count only at the shard level, as there may be a race condition between an index close and a shard close.
  4. I proposed shard_count just to return additional info in stats.

khushbr commented Mar 14, 2024

Reviving this thread.

The index closure operation involves multiple steps: IndexID removal from Hashmap -> Index Close call -> Per Shard Remove call -> ShardID removal from Hashmap -> Shard Close call -> finally, the asynchronous listener beforeIndexShardClosed performing the oldShardsStats update.

A fix could be to update oldShardsStats right after the 'IndexID removal from Hashmap', but that means running expensive stats computation while blocking more critical actions. Also, since the operation is not atomic, it would only reduce the window of the race condition, not completely eliminate it.

Computing the partial_results status runs into the same reasoning: we cannot deterministically deduce the value due to race conditions. Additionally, from the user's point of view, a partial result of per-node stats makes no sense. We should either send no stats (if we can deterministically identify this scenario) or send the complete value, as the user has no way to determine what is missing and how to address it.


Instead, we can update the documentation of the _stats API to convey that the API is best effort and eventually consistent; in the rare scenario of shard closure, the stats value is going to be lower than actual.

The client on their end can choose to treat such scenarios (present value lower than the previous one) as missing data (recommended), since the stat value is expected to be monotonically increasing.


vikasvb90 commented Mar 14, 2024

"Additionally, from user point of view - partial result of stats per node makes no sense." - I disagree with this. It definitely makes sense if we give a proper indication via the partial_results field. This can also be used to extend existing index-level stats to shard-level stats, where if some shards are missing, stats against those shards wouldn't be returned. This approach also aligns with OpenSearch's bulk behavior, where the overall bulk status is 2xx but there can be partial failures in a subset of docs, and an indication via the errors field is sent to flag that there were partial failures.
I don't agree that just updating the public doc to say that this API may not be reliable is a good idea. This can be a very common case when there are thousands of shards on the cluster and frequent rollovers happen. On the client side, this would require building a lot of logic to figure out whether there is an inconsistency in the stats themselves vs. the node having been restarted.

Regarding the approach I suggested above, it doesn't require any heavy computation. From a consistency perspective, since we are updating the in-memory shardClosed count right after index/shard closure, the chances of node stats being inconsistent are negligible. Also, we can completely eliminate even this rare condition by swapping the removal from the map and the update of the in-memory closed shard count: we first update the in-memory closed shard count and then remove from the map, and in node stats we only mark the response as partial when we see that the shard count in old stats is less than the shard count in the in-memory variable. This makes the API completely deterministic, and we don't need to put a question mark on its reliability.

Edit: There can be a false positive in the rarest of scenarios, but even then the client doesn't need to handle it explicitly. The client can stay generic and just retry/ignore whenever it sees partial_results set to true.


khushbr commented Mar 14, 2024

"Also, we can completely eliminate even this rare condition by swapping removal from map and updating in-memory closed shard count."

What happens if the stats API request lands between the removal from the index map and the update of the in-memory closed shard count? Within an index we can have 'n' shards, and closing each shard (and the corresponding oldShardsStats update) is a sequential operation. Maintaining two counters - index closed and shard closed - will still not guarantee strong consistency.


vikasvb90 commented Mar 14, 2024

We don't need two counters, just one counter for the overall closed shard count. Also, as I said, we first update the in-memory shard closed count and then update the index or shard map.


shwetathareja commented Mar 18, 2024

Thanks @vikasvb90 and @khushbr for discussion on the possible solutions. This gives good context.

The stats APIs are not strongly consistent APIs, but considering these are cumulative metrics at the node level, it doesn't provide a good user experience if the value goes down. @vikasvb90 you can return partial, but it doesn't give any idea which metric is impacted - would this partial status be at the node level or for the overall stats API? To figure out what partial means here, the user would need to compare against the previous value of each metric. The Bulk API is different: you get doc-level status, so you know exactly which doc passed/failed. Similarly, searches can be partial, but the user decides for which query to get a partial response, and the application has handling for that as well. In either case, you don't need previous state to understand the impact of partial.

Let's explore the closed shard count counter approach further (as long as it doesn't impact applying the cluster state).


khushbr commented Mar 19, 2024

I did a dirty POC for a fix based on keeping track of shard stats between the index removal from the Hashmap and the update of OldShardStats. pendingShardClosure is a ConcurrentLinkedQueue storing the shards pending closure; new IndexShard elements are inserted at the tail of the queue.

Add IndexShard to pendingShardClosure queue in removeIndex()

                // For this Index, add all IndexShard refs to the pendingShardClosure queue
                IndexService indService = newIndices.get(index.getUUID());
                // Iterate once; calling indService.iterator() inside the loop condition would create a fresh iterator each time
                for (IndexShard shard : indService) {
                    pendingShardClosure.add(shard);
                }
                indexService = newIndices.remove(index.getUUID());
                assert indexService != null : "IndexService is null";

Remove IndexShard from pendingShardClosure before adding to OldShardStats in beforeIndexShardClosed()

    public synchronized void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
        if (indexShard != null) {
            // First remove IndexShard from the pendingShardClosure queue before updating OldShardsStats
            final boolean removedShard = pendingShardClosure.remove(indexShard);
            if (removedShard) {
                getStats.addTotals(indexShard.getStats());
                indexingStats.addTotals(indexShard.indexingStats());
                // if this index was closed or deleted, we should eliminate the effect of the current scroll for this shard
                searchStats.addTotalsForClosingShard(indexShard.searchStats());
                mergeStats.addTotals(indexShard.mergeStats());
                refreshStats.addTotals(indexShard.refreshStats());
                flushStats.addTotals(indexShard.flushStats());
                recoveryStats.addTotals(indexShard.recoveryStats());
            }
        }
    }

Make a local copy of pendingShardClosure; for each IndexShard, add its stats to commonStats in stats(). This is done after the OldShardStats computation.

        final IndexShard[] pendingShards = pendingShardClosure.toArray(new IndexShard[0]);
        for (IndexShard shard : pendingShards) {
            for (Flag flag : flags.getFlags()) {
                switch (flag) {
                    case Get:
                        commonStats.get.add(shard.getStats());
                        break;
                    case Indexing:
                        commonStats.indexing.add(shard.indexingStats());
                        break;
                    case Search:
                        commonStats.search.add(shard.searchStats());
                        break;
                    case Merge:
                        commonStats.merge.add(shard.mergeStats());
                        break;
                    case Refresh:
                        commonStats.refresh.add(shard.refreshStats());
                        break;
                    case Recovery:
                        commonStats.recoveryStats.add(shard.recoveryStats());
                        break;
                    case Flush:
                        commonStats.flush.add(shard.flushStats());
                        break;
                }
            }
        }

In summary, without taking a lock we cannot guarantee consistency. In this solution, depending on the order of execution of events, the response can end up having double-counted or missing data.

A - Remove IndexShard from pendingShardClosure queue in beforeIndexShardClosed()
B - Add stats of IndexShard to OldShardStats in beforeIndexShardClosed()
C - Make Local copy of OldShardStats in stats()
D - Make Local copy of pendingShardClosure in stats()

To reduce the chances of double-counting a value, we enforce that (A) must happen before (B) and (C) must happen before (D).
In the following orderings we will see missing data: ACDB, ACBD, CABD, CADB.


Alternatively, we can build a solution that tracks the inconsistent race-condition state with greater accuracy. On detecting such a scenario, the engine would throw an exception, which for node-local stats would show up as a 5XX, and for cluster stats the coordinator would handle the exception and return an empty json for this node in the response.

@shwetathareja @vikasvb90 Let me know what you think.

@vikasvb90

I agree. The higher the number of distinct in-memory state updates, and the more execution there is between those updates and the updates to the in-memory maps, the greater the possibility of race conditions.
That's why the initial solution of keeping a single monotonically increasing counter, updated right before the map updates, drastically reduces the inconsistency in determining the state of stats, based on which an indication in the form of a 4xx and an empty response can be provided that would be close to 100% accurate.

@getsaurabh02 getsaurabh02 moved this from 🆕 New to Later (6 months plus) in Search Project Board Aug 15, 2024