Skip to content

Commit

Permalink
feat(shardSplitting): improve error handling (#873)
Browse files Browse the repository at this point in the history
* feat(shardSplitting): improve error handling

* chore: spelling

* fix(shardQuerySplitting): stop on parse errors
  • Loading branch information
matyax authored Nov 5, 2024
1 parent e9515e2 commit 13f6782
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
1 change: 1 addition & 0 deletions project-words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,4 @@ pangea
primarylabels
Shortlink
uninterpolated
Retriable
14 changes: 13 additions & 1 deletion src/services/shardQuerySplitting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const originalWarn = console.warn;
// Replace console.log, console.warn and console.error with no-op mocks before the suite runs.
beforeAll(() => {
jest.spyOn(console, 'log').mockImplementation(() => {});
jest.spyOn(console, 'warn').mockImplementation(() => {});
jest.spyOn(console, 'error').mockImplementation(() => {});
});
afterAll(() => {
console.log = originalLog;
Expand Down Expand Up @@ -156,7 +157,7 @@ describe('runShardSplitQuery()', () => {
jest
// @ts-expect-error
.spyOn(datasource, 'runQuery')
.mockReturnValueOnce(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
.mockReturnValueOnce(of({ state: LoadingState.Error, error: { refId: 'A', message: 'timeout' }, data: [] }));
// @ts-expect-error
jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
callback();
Expand All @@ -169,6 +170,17 @@ describe('runShardSplitQuery()', () => {
});
});

// A request whose sub-query fails with a "parse error" message must surface
// LoadingState.Error in the first emitted response.
test('Failed requests have loading state Error', async () => {
// Label values lookup is mocked to resolve with a single value, '1'.
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
jest
// @ts-expect-error
.spyOn(datasource, 'runQuery')
// Every runQuery call yields an Error-state response carrying a "parse error" message.
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'parse error' }, data: [] }));
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Error);
});
});

test('Adjusts the group size based on errors and execution time', async () => {
const request = createRequest([{ expr: 'count_over_time($SELECTOR[1m])', refId: 'A' }], {
range: {
Expand Down
38 changes: 26 additions & 12 deletions src/services/shardQuerySplitting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,17 @@ function splitQueriesByStreamShard(
subquerySubscription = null;
}

if (shouldStop) {
subscriber.complete();
return;
}

const done = () => {
mergedResponse.state = LoadingState.Done;
mergedResponse.state = shouldStop ? LoadingState.Error : LoadingState.Done;
subscriber.next(mergedResponse);
subscriber.complete();
};

if (shouldStop) {
done();
return;
}

const nextRequest = () => {
const nextCycle = Math.min(cycle + groupSize, shards.length);
if (cycle < shards.length && nextCycle <= shards.length) {
Expand All @@ -102,11 +102,12 @@ function splitQueriesByStreamShard(
};

const retry = (errorResponse?: DataQueryResponse) => {
if (errorResponse?.errors && errorResponse.errors[0].message?.includes('maximum of series')) {
logger.info(`Maximum series reached, skipping retry`);
return false;
} else if (errorResponse?.errors && errorResponse.errors[0].message?.includes('parse error')) {
logger.info(`Parse error, skipping retry`);
try {
if (errorResponse && !isRetriableError(errorResponse)) {
return false;
}
} catch (e) {
logger.error(e);
shouldStop = true;
return false;
}
Expand Down Expand Up @@ -204,7 +205,7 @@ function splitQueriesByStreamShard(
const selector = getSelectorForShardValues(splittingTargets[0].expr);

if (!isValidQuery(selector)) {
console.log(`Skipping invalid selector: ${selector}`);
debug(`Skipping invalid selector: ${selector}`);
subscriber.complete();
return;
}
Expand Down Expand Up @@ -296,6 +297,19 @@ function getInitialGroupSize(shards: number[]) {
return Math.floor(Math.sqrt(shards.length));
}

/**
 * Decides whether a failed sub-query should be retried, based on its error message.
 *
 * The message is taken from the first entry of `errors` when one exists, otherwise
 * from the deprecated single `error` field, and is matched case-insensitively.
 *
 * @param errorResponse - The response that carried the error.
 * @returns `true` when the message mentions a timeout; `false` for any other error,
 *   including a response without a usable message.
 * @throws Error when the message mentions a parse error, to signal that querying
 *   should stop altogether instead of being retried.
 */
function isRetriableError(errorResponse: DataQueryResponse): boolean {
  // `error` is deprecated in favor of `errors` (flagged by the linter), but it is still
  // read as a fallback. Optional chaining keeps an empty `errors` array from raising a
  // TypeError, which would otherwise be indistinguishable from the intentional
  // parse-error throw below.
  const firstError = errorResponse.errors?.[0] ?? errorResponse.error;
  // Lowercase regardless of which field supplied the message so both paths match alike.
  const message = (firstError?.message ?? '').toLowerCase();

  if (message.includes('timeout')) {
    return true;
  }
  if (message.includes('parse error')) {
    // If the error is a parse error, we want to signal to stop querying.
    throw new Error(message);
  }
  return false;
}

// Enable to output debugging logs
const DEBUG_ENABLED = Boolean(localStorage.getItem(`${pluginJson.id}.sharding_debug_enabled`));
function debug(message: string) {
Expand Down

0 comments on commit 13f6782

Please sign in to comment.