Skip to content

Commit

Permalink
refactor(tracing): update axiom.go to use TracerProvider instead of T…
Browse files Browse the repository at this point in the history
…racer

refactor(tracing): update trace.go to use TracerProvider instead of Tracer
feat(ratelimit): add sync.Mutex and map for RatelimitServiceClient in service.go
feat(ratelimit): implement peer client caching in sync_with_origin.go
refactor(keys): update service.ts to import Context from correct path
feat(metrics): add isolateId to LogdrainMetrics constructor
feat(middleware): generate isolateId in init.ts and pass to LogdrainMetrics
feat(middleware): remove isolateId and coldstartAt from metrics.ts
feat(util): add default isolateId in instrumentedFetch
refactor(routes): update v1_keys_updateKey.ts to use nullish coalescing operator
refactor(metrics): remove isolateId and isolateLifetime from metricSchema
  • Loading branch information
chronark committed Aug 6, 2024
1 parent b3f2edc commit 52283b6
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 44 deletions.
18 changes: 5 additions & 13 deletions apps/agent/pkg/tracing/axiom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

axiom "github.com/axiomhq/axiom-go/axiom/otel"
"go.opentelemetry.io/otel"
)

type Config struct {
Expand All @@ -19,20 +18,13 @@ type Config struct {
type Closer func() error

func Init(ctx context.Context, config Config) (Closer, error) {

close, err := axiom.InitTracing(
ctx,
config.Dataset,
config.Application,
config.Version,
axiom.SetNoEnv(),
axiom.SetToken(config.AxiomToken),
)

tp, err := axiom.TracerProvider(ctx, config.Dataset, config.Application, config.Version, axiom.SetNoEnv(), axiom.SetToken(config.AxiomToken))
if err != nil {
return nil, fmt.Errorf("unable to init tracing: %w", err)
}
globalTracer = tp

globalTracer = otel.Tracer("main")
return close, nil
return func() error {
return tp.Shutdown(context.Background())
}, nil
}
10 changes: 7 additions & 3 deletions apps/agent/pkg/tracing/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import (
"go.opentelemetry.io/otel/trace/noop"
)

var globalTracer trace.Tracer
var globalTracer trace.TracerProvider

func init() {
globalTracer = noop.NewTracerProvider().Tracer("noop")
globalTracer = noop.NewTracerProvider()
}

func Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return globalTracer.Start(ctx, name, opts...)
return globalTracer.Tracer("main").Start(ctx, name, opts...)
}

func GetGlobalTraceProvider() trace.TracerProvider {
return globalTracer
}
7 changes: 7 additions & 0 deletions apps/agent/services/ratelimit/service.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package ratelimit

import (
"sync"
"time"

ratelimitv1 "github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1"
"github.com/unkeyed/unkey/apps/agent/gen/proto/ratelimit/v1/ratelimitv1connect"
"github.com/unkeyed/unkey/apps/agent/pkg/batch"
"github.com/unkeyed/unkey/apps/agent/pkg/cluster"
"github.com/unkeyed/unkey/apps/agent/pkg/logging"
Expand All @@ -22,6 +24,9 @@ type service struct {
syncBuffer chan syncWithOriginRequest
metrics metrics.Metrics
consistencyChecker *consistencyChecker

peersMu sync.RWMutex
peers map[string]ratelimitv1connect.RatelimitServiceClient
}

type Config struct {
Expand All @@ -40,6 +45,8 @@ func New(cfg Config) (Service, error) {
metrics: cfg.Metrics,
consistencyChecker: newConsistencyChecker(cfg.Logger),
syncBuffer: make(chan syncWithOriginRequest, 1000),
peersMu: sync.RWMutex{},
peers: map[string]ratelimitv1connect.RatelimitServiceClient{},
}

if cfg.Cluster != nil {
Expand Down
24 changes: 19 additions & 5 deletions apps/agent/services/ratelimit/sync_with_origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,22 @@ func (s *service) syncWithOrigin(req syncWithOriginRequest) {
if !strings.Contains(url, "://") {
url = "http://" + url
}
interceptor, err := otelconnect.NewInterceptor()
if err != nil {
s.logger.Error().Err(err).Msg("failed to create otel interceptor")
return

s.peersMu.RLock()
c, ok := s.peers[peer.Id]
s.peersMu.RUnlock()
if !ok {
interceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracing.GetGlobalTraceProvider()))
if err != nil {
tracing.RecordError(span, err)
s.logger.Err(err).Msg("failed to create interceptor")
return
}
c = ratelimitv1connect.NewRatelimitServiceClient(http.DefaultClient, url, connect.WithInterceptors(interceptor))
s.peersMu.Lock()
s.peers[peer.Id] = c
s.peersMu.Unlock()
}
c := ratelimitv1connect.NewRatelimitServiceClient(http.DefaultClient, url, connect.WithInterceptors(interceptor))

connectReq := connect.NewRequest(&ratelimitv1.PushPullRequest{
Events: req.events,
Expand All @@ -64,6 +74,10 @@ func (s *service) syncWithOrigin(req syncWithOriginRequest) {

res, err := c.PushPull(ctx, connectReq)
if err != nil {
s.peersMu.Lock()
s.logger.Warn().Str("peerId", peer.Id).Msg("resetting peer client due to error")
delete(s.peers, peer.Id)
s.peersMu.Unlock()
tracing.RecordError(span, err)
s.logger.Warn().Err(err).Str("peerId", peer.Id).Str("url", url).Msg("failed to push pull")
return
Expand Down
7 changes: 4 additions & 3 deletions apps/api/src/pkg/keys/service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { Cache } from "@/pkg/cache";
import type { Api, Database, Key, Ratelimit } from "@/pkg/db";
import type { Context } from "@/pkg/hono/app";
import type { Metrics } from "@/pkg/metrics";
import type { RateLimiter } from "@/pkg/ratelimit";
import type { UsageLimiter } from "@/pkg/usagelimit";
import { BaseError, Err, FetchError, Ok, type Result, SchemaError } from "@unkey/error";
import { sha256 } from "@unkey/hash";
import type { PermissionQuery, RBAC } from "@unkey/rbac";
import type { Logger } from "@unkey/worker-logging";
import type { Context } from "hono";
import type { Analytics } from "../analytics";

export class DisabledWorkspaceError extends BaseError<{ workspaceId: string }> {
Expand Down Expand Up @@ -167,10 +167,11 @@ export class KeyService {
ipAddress: c.req.header("True-Client-IP") ?? c.req.header("CF-Connecting-IP"),
userAgent: c.req.header("User-Agent"),
requestedResource: "",
edgeRegion: "",
// @ts-expect-error - the cf object will be there on cloudflare
region: c.req.raw?.cf?.country ?? "",
ownerId: res.val.key.ownerId ?? undefined,
// @ts-expect-error - the cf object will be there on cloudflare
region: c.req.raw?.cf?.colo ?? "",
edgeRegion: c.req.raw?.cf?.colo ?? "",
keySpaceId: res.val.key.keyAuthId,
}),
);
Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/pkg/metrics/logdrain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@ import type { Metrics } from "./interface";

export class LogdrainMetrics implements Metrics {
private readonly requestId: string;
private readonly isolateId: string;
private readonly environment: LogSchema["environment"];

constructor(opts: {
requestId: string;
isolateId: string;
environment: LogSchema["environment"];
}) {
this.requestId = opts.requestId;
this.isolateId = opts.isolateId;
this.environment = opts.environment;
}

public emit(metric: Metric): void {
const log = new Log({
requestId: this.requestId,
isolateId: this.isolateId,
environment: this.environment,
application: "api",
type: "metric",
Expand Down
12 changes: 12 additions & 0 deletions apps/api/src/pkg/middleware/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,24 @@ import { Vault } from "../vault";
*/
const rlMap = new Map();

/**
* workerId and coldStartAt are used to track the lifetime of the worker
* and are set once when the worker is first initialized.
*
* subsequent requests will use the same workerId and coldStartAt
*/
let isolateId: string | null = null;

/**
* Initialize all services.
*
* Call this once before any hono handlers run.
*/
export function init(): MiddlewareHandler<HonoEnv> {
return async (c, next) => {
if (!isolateId) {
isolateId = crypto.randomUUID();
}
const requestId = newId("request");
c.set("requestId", requestId);
c.res.headers.set("Unkey-Request-Id", requestId);
Expand Down Expand Up @@ -63,6 +74,7 @@ export function init(): MiddlewareHandler<HonoEnv> {
? new LogdrainMetrics({
requestId,
environment: c.env.ENVIRONMENT,
isolateId: isolateId!,
})
: new NoopMetrics();

Expand Down
15 changes: 0 additions & 15 deletions apps/api/src/pkg/middleware/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,8 @@ import type { HonoEnv } from "../hono/env";

type DiscriminateMetric<T, M = Metric> = M extends { metric: T } ? M : never;

/**
* workerId and coldStartAt are used to track the lifetime of the worker
* and are set once when the worker is first initialized.
*
* subsequent requests will use the same workerId and coldStartAt
*/
let isolateId: string | null = null;
let coldstartAt: number | null = null;

export function metrics(): MiddlewareHandler<HonoEnv> {
return async (c, next) => {
if (!isolateId) {
isolateId = crypto.randomUUID();
}
const { metrics, analytics, logger } = c.get("services");
// logger.info("request", {
// method: c.req.method,
Expand All @@ -26,8 +14,6 @@ export function metrics(): MiddlewareHandler<HonoEnv> {
//
const start = performance.now();
const m = {
isolateId,
isolateLifetime: coldstartAt ? Date.now() - coldstartAt : 0,
metric: "metric.http.request",
path: c.req.path,
host: new URL(c.req.url).host,
Expand All @@ -44,7 +30,6 @@ export function metrics(): MiddlewareHandler<HonoEnv> {
fromAgent: c.req.header("Unkey-Redirect"),
context: {},
} as DiscriminateMetric<"metric.http.request">;
coldstartAt = Date.now();

try {
const telemetry = {
Expand Down
1 change: 1 addition & 0 deletions apps/api/src/pkg/util/instrument-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export function instrumentedFetch(c?: Context) {
const metrics = c
? c.get("services").metrics
: new LogdrainMetrics({
isolateId: "unknown",
requestId: "unknown",
environment: "unknown",
});
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/routes/v1_keys_updateKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ export const registerV1KeysUpdate = (app: App) =>
? req.ratelimit.async
: req.ratelimit?.type === "fast",
ratelimitLimit:
req.ratelimit === null ? null : req.ratelimit?.limit ? req.ratelimit?.refillRate : null,
req.ratelimit === null ? null : req.ratelimit?.limit ?? req.ratelimit?.refillRate ?? null,
ratelimitDuration:
req.ratelimit === null
? null
Expand Down
4 changes: 0 additions & 4 deletions internal/metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ export const metricSchema = z.discriminatedUnion("metric", [
status: z.number(),
error: z.string().optional(),
serviceLatency: z.number(),
// ms since worker initilized for the first time
// a non zero value means the worker is reused
isolateLifetime: z.number(),
isolateId: z.string(),
// Regional data might be different on non-cloudflare deployments
colo: z.string().optional(),
continent: z.string().optional(),
Expand Down

0 comments on commit 52283b6

Please sign in to comment.