Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate errors to AppSync #2162

Merged
merged 7 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fresh-candles-leave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@aws-amplify/ai-constructs': minor
---

Propagate errors to AppSync
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
import { ConversationTurnEventToolsProvider } from './event-tools-provider';
import { ConversationMessageHistoryRetriever } from './conversation_message_history_retriever';
import * as bedrock from '@aws-sdk/client-bedrock-runtime';
import { ValidationError } from './errors';

/**
* This class is responsible for interacting with Bedrock Converse API
Expand Down Expand Up @@ -87,7 +88,7 @@ export class BedrockConverseAdapter {
this.clientToolByName.set(t.name, t);
});
if (duplicateTools.size > 0) {
throw new Error(
throw new ValidationError(
`Tools must have unique names. Duplicate tools: ${[
...duplicateTools,
].join(', ')}.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConversationTurnEvent, StreamingResponseChunk } from './types';
import { BedrockConverseAdapter } from './bedrock_converse_adapter';
import { ContentBlock } from '@aws-sdk/client-bedrock-runtime';
import { ConversationTurnResponseSender } from './conversation_turn_response_sender';
import { Lazy } from './lazy';

void describe('Conversation turn executor', () => {
const event: ConversationTurnEvent = {
Expand Down Expand Up @@ -62,8 +63,8 @@ void describe('Conversation turn executor', () => {
await new ConversationTurnExecutor(
event,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute();

Expand Down Expand Up @@ -156,8 +157,8 @@ void describe('Conversation turn executor', () => {
await new ConversationTurnExecutor(
streamingEvent,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute();

Expand Down Expand Up @@ -214,22 +215,30 @@ void describe('Conversation turn executor', () => {
() => Promise.resolve()
);

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
Expand Down Expand Up @@ -263,6 +272,16 @@ void describe('Conversation turn executor', () => {
consoleErrorMock.mock.calls[0].arguments[1],
bedrockError
);
assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'Bedrock failed',
},
]
);
});

void it('logs and propagates error if response sender throws', async () => {
Expand Down Expand Up @@ -290,22 +309,30 @@ void describe('Conversation turn executor', () => {
() => Promise.resolve()
);

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
Expand Down Expand Up @@ -339,5 +366,180 @@ void describe('Conversation turn executor', () => {
consoleErrorMock.mock.calls[0].arguments[1],
responseSenderError
);
assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'Failed to send response',
},
]
);
});

void it('throws original exception if error sender fails', async () => {
const bedrockConverseAdapter = new BedrockConverseAdapter(event, []);
const originalError = new Error('original error');
mock.method(bedrockConverseAdapter, 'askBedrock', () =>
Promise.reject(originalError)
);
const responseSender = new ConversationTurnResponseSender(event);
mock.method(responseSender, 'sendResponse', () => Promise.resolve());

mock.method(responseSender, 'sendResponseChunk', () => Promise.resolve());

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.reject(new Error('sender error'))
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
assert.strictEqual(error, originalError);
return true;
}
);

assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'original error',
},
]
);
});

void it('serializes unknown errors', async () => {
const bedrockConverseAdapter = new BedrockConverseAdapter(event, []);
const unknownError = { some: 'shape' };
mock.method(bedrockConverseAdapter, 'askBedrock', () =>
Promise.reject(unknownError)
);
const responseSender = new ConversationTurnResponseSender(event);
mock.method(responseSender, 'sendResponse', () => Promise.resolve());

mock.method(responseSender, 'sendResponseChunk', () => Promise.resolve());

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
assert.strictEqual(error, unknownError);
return true;
}
);

assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'UnknownError',
message: '{"some":"shape"}',
},
]
);
});

void it('reports initialization errors', async () => {
const bedrockConverseAdapter = new BedrockConverseAdapter(event, []);
mock.method(bedrockConverseAdapter, 'askBedrock', () => Promise.resolve());
const responseSender = new ConversationTurnResponseSender(event);
mock.method(responseSender, 'sendResponse', () => Promise.resolve());

mock.method(responseSender, 'sendResponseChunk', () => Promise.resolve());

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

const initializationError = new Error('initialization error');
await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
new Lazy(() => responseSender),
new Lazy(() => {
throw initializationError;
}),
consoleMock
).execute(),
(error: Error) => {
assert.strictEqual(error, initializationError);
return true;
}
);

assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'initialization error',
},
]
);
});
});
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ConversationTurnResponseSender } from './conversation_turn_response_sender.js';
import { ConversationTurnEvent, ExecutableTool, JSONSchema } from './types.js';
import { BedrockConverseAdapter } from './bedrock_converse_adapter.js';
import { Lazy } from './lazy';

/**
* This class is responsible for orchestrating conversation turn execution.
Expand All @@ -16,11 +17,13 @@ export class ConversationTurnExecutor {
constructor(
private readonly event: ConversationTurnEvent,
additionalTools: Array<ExecutableTool>,
private readonly bedrockConverseAdapter = new BedrockConverseAdapter(
event,
additionalTools
// We're deferring dependency initialization here so that we can capture all validation errors.
private readonly responseSender = new Lazy(
() => new ConversationTurnResponseSender(event)
),
private readonly bedrockConverseAdapter = new Lazy(
() => new BedrockConverseAdapter(event, additionalTools)
),
private readonly responseSender = new ConversationTurnResponseSender(event),
private readonly logger = console
) {}

Expand All @@ -32,14 +35,14 @@ export class ConversationTurnExecutor {
this.logger.debug('Event received:', this.event);

if (this.event.streamResponse) {
const chunks = this.bedrockConverseAdapter.askBedrockStreaming();
const chunks = this.bedrockConverseAdapter.value.askBedrockStreaming();
for await (const chunk of chunks) {
await this.responseSender.sendResponseChunk(chunk);
await this.responseSender.value.sendResponseChunk(chunk);
}
} else {
const assistantResponse =
await this.bedrockConverseAdapter.askBedrock();
await this.responseSender.sendResponse(assistantResponse);
await this.bedrockConverseAdapter.value.askBedrock();
await this.responseSender.value.sendResponse(assistantResponse);
}

this.logger.log(
Expand All @@ -50,10 +53,28 @@ export class ConversationTurnExecutor {
`Failed to handle conversation turn event, currentMessageId=${this.event.currentMessageId}, conversationId=${this.event.conversationId}`,
e
);
await this.tryForwardError(e);
// Propagate error to mark lambda execution as failed in metrics.
throw e;
}
};

private tryForwardError = async (e: unknown) => {
try {
let errorType = 'UnknownError';
let message: string;
if (e instanceof Error) {
errorType = e.name;
message = e.message;
} else {
message = JSON.stringify(e);
}
await this.responseSender.value.sendErrors([{ errorType, message }]);
} catch (e) {
// Best effort, only log the fact that we tried to send error back to AppSync.
this.logger.warn('Failed to send error mutation', e);
}
};
}

/**
Expand Down
Loading
Loading