Skip to content

Commit

Permalink
Propagate errors to AppSync (#2162)
Browse files Browse the repository at this point in the history
* Propagate errors to AppSync

* this works

* more tests

* validation error

* more tests
  • Loading branch information
sobolk authored Oct 31, 2024
1 parent ce44b1a commit 37dd87c
Show file tree
Hide file tree
Showing 11 changed files with 519 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-candles-leave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@aws-amplify/ai-constructs': minor
---

Propagate errors to AppSync
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
import { ConversationTurnEventToolsProvider } from './event-tools-provider';
import { ConversationMessageHistoryRetriever } from './conversation_message_history_retriever';
import * as bedrock from '@aws-sdk/client-bedrock-runtime';
import { ValidationError } from './errors';

/**
* This class is responsible for interacting with Bedrock Converse API
Expand Down Expand Up @@ -87,7 +88,7 @@ export class BedrockConverseAdapter {
this.clientToolByName.set(t.name, t);
});
if (duplicateTools.size > 0) {
throw new Error(
throw new ValidationError(
`Tools must have unique names. Duplicate tools: ${[
...duplicateTools,
].join(', ')}.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConversationTurnEvent, StreamingResponseChunk } from './types';
import { BedrockConverseAdapter } from './bedrock_converse_adapter';
import { ContentBlock } from '@aws-sdk/client-bedrock-runtime';
import { ConversationTurnResponseSender } from './conversation_turn_response_sender';
import { Lazy } from './lazy';

void describe('Conversation turn executor', () => {
const event: ConversationTurnEvent = {
Expand Down Expand Up @@ -62,8 +63,8 @@ void describe('Conversation turn executor', () => {
await new ConversationTurnExecutor(
event,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute();

Expand Down Expand Up @@ -156,8 +157,8 @@ void describe('Conversation turn executor', () => {
await new ConversationTurnExecutor(
streamingEvent,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute();

Expand Down Expand Up @@ -214,22 +215,30 @@ void describe('Conversation turn executor', () => {
() => Promise.resolve()
);

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
Expand Down Expand Up @@ -263,6 +272,16 @@ void describe('Conversation turn executor', () => {
consoleErrorMock.mock.calls[0].arguments[1],
bedrockError
);
assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'Bedrock failed',
},
]
);
});

void it('logs and propagates error if response sender throws', async () => {
Expand Down Expand Up @@ -290,22 +309,30 @@ void describe('Conversation turn executor', () => {
() => Promise.resolve()
);

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
bedrockConverseAdapter,
responseSender,
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
Expand Down Expand Up @@ -339,5 +366,180 @@ void describe('Conversation turn executor', () => {
consoleErrorMock.mock.calls[0].arguments[1],
responseSenderError
);
assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'Failed to send response',
},
]
);
});

void it('throws original exception if error sender fails', async () => {
const bedrockConverseAdapter = new BedrockConverseAdapter(event, []);
const originalError = new Error('original error');
mock.method(bedrockConverseAdapter, 'askBedrock', () =>
Promise.reject(originalError)
);
const responseSender = new ConversationTurnResponseSender(event);
mock.method(responseSender, 'sendResponse', () => Promise.resolve());

mock.method(responseSender, 'sendResponseChunk', () => Promise.resolve());

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.reject(new Error('sender error'))
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
assert.strictEqual(error, originalError);
return true;
}
);

assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'original error',
},
]
);
});

void it('serializes unknown errors', async () => {
const bedrockConverseAdapter = new BedrockConverseAdapter(event, []);
const unknownError = { some: 'shape' };
mock.method(bedrockConverseAdapter, 'askBedrock', () =>
Promise.reject(unknownError)
);
const responseSender = new ConversationTurnResponseSender(event);
mock.method(responseSender, 'sendResponse', () => Promise.resolve());

mock.method(responseSender, 'sendResponseChunk', () => Promise.resolve());

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
new Lazy(() => responseSender),
new Lazy(() => bedrockConverseAdapter),
consoleMock
).execute(),
(error: Error) => {
assert.strictEqual(error, unknownError);
return true;
}
);

assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'UnknownError',
message: '{"some":"shape"}',
},
]
);
});

void it('reports initialization errors', async () => {
const bedrockConverseAdapter = new BedrockConverseAdapter(event, []);
mock.method(bedrockConverseAdapter, 'askBedrock', () => Promise.resolve());
const responseSender = new ConversationTurnResponseSender(event);
mock.method(responseSender, 'sendResponse', () => Promise.resolve());

mock.method(responseSender, 'sendResponseChunk', () => Promise.resolve());

const responseSenderSendErrorsMock = mock.method(
responseSender,
'sendErrors',
() => Promise.resolve()
);

const consoleErrorMock = mock.fn();
const consoleLogMock = mock.fn();
const consoleDebugMock = mock.fn();
const consoleWarnMock = mock.fn();
const consoleMock = {
error: consoleErrorMock,
log: consoleLogMock,
debug: consoleDebugMock,
warn: consoleWarnMock,
} as unknown as Console;

const initializationError = new Error('initialization error');
await assert.rejects(
() =>
new ConversationTurnExecutor(
event,
[],
new Lazy(() => responseSender),
new Lazy(() => {
throw initializationError;
}),
consoleMock
).execute(),
(error: Error) => {
assert.strictEqual(error, initializationError);
return true;
}
);

assert.strictEqual(responseSenderSendErrorsMock.mock.calls.length, 1);
assert.deepStrictEqual(
responseSenderSendErrorsMock.mock.calls[0].arguments[0],
[
{
errorType: 'Error',
message: 'initialization error',
},
]
);
});
});
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ConversationTurnResponseSender } from './conversation_turn_response_sender.js';
import { ConversationTurnEvent, ExecutableTool, JSONSchema } from './types.js';
import { BedrockConverseAdapter } from './bedrock_converse_adapter.js';
import { Lazy } from './lazy';

/**
* This class is responsible for orchestrating conversation turn execution.
Expand All @@ -16,11 +17,13 @@ export class ConversationTurnExecutor {
constructor(
private readonly event: ConversationTurnEvent,
additionalTools: Array<ExecutableTool>,
private readonly bedrockConverseAdapter = new BedrockConverseAdapter(
event,
additionalTools
// We're deferring dependency initialization here so that we can capture all validation errors.
private readonly responseSender = new Lazy(
() => new ConversationTurnResponseSender(event)
),
private readonly bedrockConverseAdapter = new Lazy(
() => new BedrockConverseAdapter(event, additionalTools)
),
private readonly responseSender = new ConversationTurnResponseSender(event),
private readonly logger = console
) {}

Expand All @@ -32,14 +35,14 @@ export class ConversationTurnExecutor {
this.logger.debug('Event received:', this.event);

if (this.event.streamResponse) {
const chunks = this.bedrockConverseAdapter.askBedrockStreaming();
const chunks = this.bedrockConverseAdapter.value.askBedrockStreaming();
for await (const chunk of chunks) {
await this.responseSender.sendResponseChunk(chunk);
await this.responseSender.value.sendResponseChunk(chunk);
}
} else {
const assistantResponse =
await this.bedrockConverseAdapter.askBedrock();
await this.responseSender.sendResponse(assistantResponse);
await this.bedrockConverseAdapter.value.askBedrock();
await this.responseSender.value.sendResponse(assistantResponse);
}

this.logger.log(
Expand All @@ -50,10 +53,28 @@ export class ConversationTurnExecutor {
`Failed to handle conversation turn event, currentMessageId=${this.event.currentMessageId}, conversationId=${this.event.conversationId}`,
e
);
await this.tryForwardError(e);
// Propagate error to mark lambda execution as failed in metrics.
throw e;
}
};

private tryForwardError = async (e: unknown) => {
try {
let errorType = 'UnknownError';
let message: string;
if (e instanceof Error) {
errorType = e.name;
message = e.message;
} else {
message = JSON.stringify(e);
}
await this.responseSender.value.sendErrors([{ errorType, message }]);
} catch (e) {
// Best effort, only log the fact that we tried to send error back to AppSync.
this.logger.warn('Failed to send error mutation', e);
}
};
}

/**
Expand Down
Loading

0 comments on commit 37dd87c

Please sign in to comment.