Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (spike): Package opentelemetry forwarding to xray #115

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/@eventual/aws-cdk/src/service-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { HttpLambdaIntegration } from "@aws-cdk/aws-apigatewayv2-integrations-al
import { ServiceType } from "@eventual/core";
import { HttpMethod } from "aws-cdk-lib/aws-events";
import { Effect, IGrantable, PolicyStatement } from "aws-cdk-lib/aws-iam";
import { Code, Function } from "aws-cdk-lib/aws-lambda";
import { Code, Function, Tracing } from "aws-cdk-lib/aws-lambda";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import { Arn, Stack } from "aws-cdk-lib";
import { Construct } from "constructs";
Expand Down Expand Up @@ -149,6 +149,7 @@ export class Api extends Construct {
"../../esm/handlers/api",
entry
),
tracing: Tracing.ACTIVE,
...baseNodeFnProps,
});
}
Expand All @@ -157,6 +158,7 @@ export class Api extends Construct {
return new Function(this, id, {
code: Code.fromAsset(outDir(this, entry)),
...baseNodeFnProps,
tracing: Tracing.ACTIVE,
handler: "index.handler",
});
}
Expand Down
2 changes: 2 additions & 0 deletions packages/@eventual/aws-cdk/src/service-function.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ServiceType, SERVICE_TYPE_FLAG } from "@eventual/core";
import { Duration } from "aws-cdk-lib";
import {
Architecture,
Code,
Expand All @@ -22,6 +23,7 @@ export class ServiceFunction extends Function {
runtime: Runtime.NODEJS_16_X,
architecture: Architecture.ARM_64,
memorySize: 512,
timeout: Duration.seconds(10),
...props,
code: Code.fromAsset(outDir(scope, props.serviceType)),
handler: props.handler ?? "index.default",
Expand Down
15 changes: 15 additions & 0 deletions packages/@eventual/aws-cdk/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Api } from "./service-api";
import { outDir } from "./utils";
import { IWorkflows, Workflows } from "./workflows";
import { Events } from "./events";
import { Telemetry } from "./telemetry";

export interface ServiceProps {
entry: string;
Expand Down Expand Up @@ -72,6 +73,8 @@ export class Service extends Construct implements IGrantable {

readonly grantPrincipal: IPrincipal;

readonly telemetry: Telemetry;

constructor(scope: Construct, id: string, props: ServiceProps) {
super(scope, id);

Expand Down Expand Up @@ -99,6 +102,10 @@ export class Service extends Construct implements IGrantable {
removalPolicy: RemovalPolicy.DESTROY,
});

this.telemetry = new Telemetry(this, "Telemetry", {
serviceName: this.serviceName,
});

const proxyScheduler = lazyInterface<IScheduler>();
const proxyWorkflows = lazyInterface<IWorkflows>();
const proxyActivities = lazyInterface<IActivities>();
Expand Down Expand Up @@ -140,6 +147,14 @@ export class Service extends Construct implements IGrantable {
events: this.events,
});

this.telemetry.attachToFunction(this.events.handler, "handler");
this.telemetry.attachToFunction(this.activities.worker, "worker");
this.telemetry.attachToFunction(
this.workflows.orchestrator,
"orchestrator"
);
this.telemetry.attachToFunction(this.scheduler.forwarder, "forwarder");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the scheduler forwarder needs this? What is it logging? If it does need it, then the scheduler.handler also needs it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure can take it off.


this.grantPrincipal = new CompositePrincipal(
// when granting permissions to the service,
// propagate them to the following principals
Expand Down
63 changes: 63 additions & 0 deletions packages/@eventual/aws-cdk/src/telemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { ENV_NAMES } from "@eventual/aws-runtime";
import lambda, {
Architecture,
Code,
ILayerVersion,
LayerVersion,
} from "aws-cdk-lib/aws-lambda";
import {
ILogGroup,
ILogStream,
LogGroup,
LogStream,
} from "aws-cdk-lib/aws-logs";
import { Construct } from "constructs";

/**
 * Construction properties for the {@link Telemetry} construct.
 */
interface TelemetryProps {
/** Service name; the telemetry log group is named `<serviceName>-telemetry`. */
serviceName: string;
}

/**
 * Shape of the telemetry resources: one log group, the log streams created in
 * it, and the collector layer.
 *
 * NOTE(review): the `Telemetry` class in this file exposes members with these
 * names but does not declare `implements ITelemetry` — confirm whether that is
 * intended.
 */
export interface ITelemetry {
/** Log group that holds the telemetry log streams. */
logGroup: ILogGroup;
/** Log streams created so far (one is pushed per `attachToFunction` call). */
logStreams: ILogStream[];
/** Lambda layer containing the telemetry collector. */
collectorLayer: ILayerVersion;
}

export class Telemetry extends Construct {
logGroup: LogGroup;
logStreams: ILogStream[] = [];
collectorLayer: ILayerVersion;

/**
 * Creates the telemetry log group and the collector layer that
 * `attachToFunction` later wires onto individual functions.
 *
 * @param scope parent construct
 * @param id construct id within `scope`
 * @param props supplies the service name the log group name is derived from
 */
constructor(scope: Construct, id: string, props: TelemetryProps) {
  super(scope, id);

  // The log group is named "<serviceName>-telemetry".
  const telemetryGroupName = `${props.serviceName}-telemetry`;
  this.logGroup = new LogGroup(this, "LogGroup", {
    logGroupName: telemetryGroupName,
  });

  // Locate the collector archive exposed by the
  // "@eventual/aws-runtime/mini-collector-cloudwatch" package export.
  const collectorAssetPath = require.resolve(
    "@eventual/aws-runtime/mini-collector-cloudwatch"
  );
  const collectorCode = Code.fromAsset(collectorAssetPath);
  this.collectorLayer = new LayerVersion(this, "telemetry-collector", {
    code: collectorCode,
    compatibleArchitectures: [Architecture.ARM_64],
  });
}

attachToFunction(fn: lambda.Function, componentName: string) {
const logStream = new LogStream(this, `LogStream${componentName}`, {
logGroup: this.logGroup,
logStreamName: componentName,
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what we want? A log stream per function for all time? Or is this just an experiment?

There is a limit to the number of writes to a log stream.

5 requests per second per log stream. Additional requests are throttled. This quota can't be changed.

The orchestrator, for all workflow executions, would be limited to 5TPS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, ok, I didn't realise there was a throttle. I originally had it creating a new log stream every execution, but realised that without static streams it would be difficult to attach event listeners to the streams to forward logs to the real collector. With static streams we can just set that up in CDK.

Copy link
Contributor Author

@cfraz89 cfraz89 Dec 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An option I can think of instead, actually, is to just skip the logging-to-CloudWatch part. Instead, the extension just sends the data over HTTP to the OTel collector running in a different Lambda.

fn.addEnvironment(
ENV_NAMES.TELEMETRY_LOG_GROUP_NAME,
this.logGroup.logGroupName
);
fn.addEnvironment(
ENV_NAMES.TELEMETRY_LOG_STREAM_NAME,
logStream.logStreamName
);
fn.addEnvironment(ENV_NAMES.TELEMETRY_COMPONENT_NAME, componentName);
fn.addLayers(this.collectorLayer);
this.logStreams.push(logStream);
}
}
Binary file not shown.
12 changes: 11 additions & 1 deletion packages/@eventual/aws-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
".": {
"import": "./lib/esm/index.js",
"require": "./lib/cjs/index.js"
}
},
"./mini-collector-cloudwatch": "./mini-collector-cloudwatch.zip"
},
"main": "./lib/cjs/index.js",
"module": "./lib/esm/index.js",
Expand All @@ -14,9 +15,18 @@
},
"dependencies": {
"@aws-lambda-powertools/logger": "^1.4.1",
"@aws-sdk/client-cloudwatch-logs": "^3.226.0",
"@eventual/core": "workspace:^",
"@middy/core": "^3.6.2",
"@middy/error-logger": "^3.6.2",
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/context-async-hooks": "^1.8.0",
"@opentelemetry/core": "^1.8.0",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.34.0",
"@opentelemetry/resources": "^1.8.0",
"@opentelemetry/sdk-trace-base": "^1.8.0",
"@opentelemetry/sdk-trace-node": "^1.8.0",
"@opentelemetry/semantic-conventions": "^1.8.0",
"aws-embedded-metrics": "^4.0.0",
"aws-lambda": "^1.0.7",
"node-fetch": "^2.6.7",
Expand Down
123 changes: 69 additions & 54 deletions packages/@eventual/aws-runtime/src/clients/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
} from "@eventual/core";
import { ulid } from "ulidx";
import { AWSActivityRuntimeClient } from "./activity-runtime-client.js";
import { SpanKind, trace } from "@opentelemetry/api";

export interface AWSWorkflowClientProps {
readonly dynamo: DynamoDBClient;
Expand Down Expand Up @@ -50,68 +51,82 @@ export class AWSWorkflowClient extends WorkflowClient {
timeoutSeconds,
}: StartWorkflowRequest<W>) {
const executionId = formatExecutionId(workflowName, executionName);
console.log("execution input:", input);

await this.props.dynamo.send(
new PutItemCommand({
TableName: this.props.tableName,
Item: {
pk: { S: ExecutionRecord.PRIMARY_KEY },
sk: { S: ExecutionRecord.sortKey(executionId) },
id: { S: executionId },
name: { S: executionName },
workflowName: { S: workflowName },
status: { S: ExecutionStatus.IN_PROGRESS },
startTime: { S: new Date().toISOString() },
...(parentExecutionId
? {
parentExecutionId: { S: parentExecutionId },
seq: { N: seq!.toString(10) },
}
: {}),
},
})
);

const workflowStartedEvent = createEvent<WorkflowStarted>({
type: WorkflowEventType.WorkflowStarted,
input,
workflowName,
// generate the time for the workflow to timeout based on when it was started.
// the timer will be started by the orchestrator so the client does not need to have access to the timer client.
timeoutTime: timeoutSeconds
? new Date(new Date().getTime() + timeoutSeconds * 1000).toISOString()
: undefined,
context: {
name: executionName,
parentId: parentExecutionId,
const tracer = trace.getTracer(executionId, "0.0.0");
await tracer.startActiveSpan(
"startWorkflow",
{
attributes: { workflowName, input },
kind: SpanKind.PRODUCER,
Comment on lines +54 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to trace in the client, or should we trace in the orchestrator (i.e. those who call the client)? Not all of the callers of the client will have tracing on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah orchestrator probably makes more sense.

},
});

await this.submitWorkflowTask(executionId, workflowStartedEvent);

async () => {
console.log("execution input:", input);

await this.props.dynamo.send(
new PutItemCommand({
TableName: this.props.tableName,
Item: {
pk: { S: ExecutionRecord.PRIMARY_KEY },
sk: { S: ExecutionRecord.sortKey(executionId) },
id: { S: executionId },
name: { S: executionName },
workflowName: { S: workflowName },
status: { S: ExecutionStatus.IN_PROGRESS },
startTime: { S: new Date().toISOString() },
...(parentExecutionId
? {
parentExecutionId: { S: parentExecutionId },
seq: { N: seq!.toString(10) },
}
: {}),
},
})
);

const workflowStartedEvent = createEvent<WorkflowStarted>({
type: WorkflowEventType.WorkflowStarted,
input,
workflowName,
// generate the time for the workflow to timeout based on when it was started.
// the timer will be started by the orchestrator so the client does not need to have access to the timer client.
timeoutTime: timeoutSeconds
? new Date(
new Date().getTime() + timeoutSeconds * 1000
).toISOString()
: undefined,
context: {
name: executionName,
parentId: parentExecutionId,
},
});

await this.submitWorkflowTask(executionId, workflowStartedEvent);
}
);
return executionId;
}

/**
 * Submits one workflow task, carrying the given history events, to the
 * workflow queue for the given execution.
 *
 * The send runs inside a "submitWorkflowTask" span obtained from a tracer
 * named after the execution id.
 *
 * @param executionId execution the task belongs to; also used as the SQS
 *   `MessageGroupId`
 * @param events history events to deliver in the task
 * @returns a promise that settles once the message has been sent; a failure
 *   of the SQS send rejects the returned promise
 */
public async submitWorkflowTask(
  executionId: string,
  ...events: HistoryStateEvent[]
) {
  const tracer = trace.getTracer(executionId, "0.0.0");
  await tracer.startActiveSpan("submitWorkflowTask", async (span) => {
    try {
      // send workflow task to workflow queue
      const workflowTask: SQSWorkflowTaskMessage = {
        task: {
          executionId,
          events,
        },
      };

      await this.props.sqs.send(
        new SendMessageCommand({
          MessageBody: JSON.stringify(workflowTask),
          QueueUrl: this.props.workflowQueueUrl,
          MessageGroupId: executionId,
        })
      );
    } finally {
      // startActiveSpan does not end the span itself; without this call the
      // span is never finished, whether the send succeeded or threw.
      span.end();
    }
  });
}

async getExecutions(): Promise<Execution[]> {
Expand Down
9 changes: 9 additions & 0 deletions packages/@eventual/aws-runtime/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ export namespace ENV_NAMES {
export const TIMER_QUEUE_URL = "EVENTUAL_TIMER_QUEUE_URL";
export const TIMER_QUEUE_ARN = "EVENTUAL_TIMER_QUEUE_ARN";
export const SCHEDULE_FORWARDER_ARN = "EVENTUAL_SCHEDULE_FORWARDER_ARN";
// Names of the environment variables that carry the telemetry settings
// (log group name, log stream name, component name) for a function.
export const TELEMETRY_LOG_GROUP_NAME = "EVENTUAL_TELEMETRY_LOG_GROUP_NAME";
export const TELEMETRY_LOG_STREAM_NAME = "EVENTUAL_TELEMETRY_LOG_STREAM_NAME";
export const TELEMETRY_COMPONENT_NAME = "EVENTUAL_TELEMETRY_COMPONENT_NAME";
}

export function tryGetEnv(name: string) {
Expand Down Expand Up @@ -46,3 +49,9 @@ export const timerQueueArn = () => tryGetEnv(ENV_NAMES.TIMER_QUEUE_ARN);
export const timerQueueUrl = () => tryGetEnv(ENV_NAMES.TIMER_QUEUE_URL);
export const schedulerForwarderArn = () =>
tryGetEnv(ENV_NAMES.SCHEDULE_FORWARDER_ARN);
/** Looks up `ENV_NAMES.TELEMETRY_LOG_GROUP_NAME` through `tryGetEnv`. */
export const telemetryLogGroupName = () =>
tryGetEnv(ENV_NAMES.TELEMETRY_LOG_GROUP_NAME);
/** Looks up `ENV_NAMES.TELEMETRY_LOG_STREAM_NAME` through `tryGetEnv`. */
export const telemetryLogStreamName = () =>
tryGetEnv(ENV_NAMES.TELEMETRY_LOG_STREAM_NAME);
/** Looks up `ENV_NAMES.TELEMETRY_COMPONENT_NAME` through `tryGetEnv`. */
export const telemetryComponentName = () =>
tryGetEnv(ENV_NAMES.TELEMETRY_COMPONENT_NAME);
2 changes: 1 addition & 1 deletion packages/@eventual/aws-runtime/src/handlers/api-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import itty from "itty-router";
import { createEventClient, createWorkflowClient } from "../clients/create.js";

// TODO: remove once we can upgrade to Node 18 in AWS Lambda
import "./fetch-polyfill.js";
import "../fetch-polyfill.js";

const processRequest = createApiHandler({
workflowClient: createWorkflowClient(),
Expand Down
Loading