Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
df-wg committed Nov 28, 2024
1 parent 2e8954d commit e18d81f
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 53 deletions.
65 changes: 29 additions & 36 deletions router/core/engine_loader_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"net/http"
"slices"
"time"
)
Expand Down Expand Up @@ -44,7 +43,7 @@ type engineLoaderHooksRequestContext struct {

type JointHooks interface {
resolve.LoaderHooks
resolve.HttpLoaderHooks
//resolve.HttpLoaderHooks
}

func NewEngineRequestHooks(metricStore metric.Store, logger *requestlogger.SubgraphAccessLogger) JointHooks {
Expand Down Expand Up @@ -83,7 +82,7 @@ func (f *engineLoaderHooks) OnLoad(ctx context.Context, ds resolve.DataSourceInf
})
}

func (f *engineLoaderHooks) OnFinished(ctx context.Context, statusCode int, ds resolve.DataSourceInfo, err error) {
func (f *engineLoaderHooks) OnFinished(ctx context.Context, ds resolve.DataSourceInfo, responseInfo *resolve.ResponseInfo) {

if resolve.IsIntrospectionDataSource(ds.ID) {
return
Expand All @@ -106,7 +105,7 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, statusCode int, ds r
defer span.End()

commonAttrs := []attribute.KeyValue{
semconv.HTTPStatusCode(statusCode),
semconv.HTTPStatusCode(responseInfo.StatusCode),
rotel.WgSubgraphID.String(ds.ID),
rotel.WgSubgraphName.String(ds.Name),
}
Expand All @@ -124,15 +123,37 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, statusCode int, ds r

metricAddOpt := otelmetric.WithAttributeSet(attribute.NewSet(metricAttrs...))

if err != nil {
if f.accessLogger != nil {
fields := []zap.Field{
zap.String("subgraph_name", ds.Name),
zap.String("subgraph_id", ds.ID),
zap.Int("status", responseInfo.StatusCode),
}
if responseInfo.Err != nil {
fields = append(fields, zap.Any("error", responseInfo.Err))
}
if ctx != nil {
hookCtx, ok := ctx.Value(engineLoaderHooksContextKey).(*engineLoaderHooksRequestContext)
if !ok {
return
}

latency := time.Since(hookCtx.startTime)
fields = append(fields, zap.Duration("latency", latency))
}

f.accessLogger.WriteRequestLog(responseInfo, fields)
}

if responseInfo.Err != nil {
// Set error status. This is the fetch error from the engine
// Downstream errors are extracted from the subgraph response
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
span.SetStatus(codes.Error, responseInfo.Err.Error())
span.RecordError(responseInfo.Err)

var errorCodesAttr []string

if unwrapped, ok := err.(multiError); ok {
if unwrapped, ok := responseInfo.Err.(multiError); ok {
errs := unwrapped.Unwrap()
for _, e := range errs {
var subgraphError *resolve.SubgraphError
Expand Down Expand Up @@ -188,31 +209,3 @@ func (f *engineLoaderHooks) OnFinished(ctx context.Context, statusCode int, ds r

span.SetAttributes(traceAttrs...)
}

func (f *engineLoaderHooks) OnHttpFinished(ctx context.Context, ds resolve.DataSourceInfo, err error, request *http.Request, response *http.Response) {
if resolve.IsIntrospectionDataSource(ds.ID) {
return
}

if f.accessLogger != nil {
fields := []zap.Field{
zap.String("subgraph_name", ds.Name),
zap.String("subgraph_id", ds.ID),
zap.Int("status", response.StatusCode),
}
if err != nil {
fields = append(fields, zap.Any("error", err))
}
if ctx != nil {
hookCtx, ok := ctx.Value(engineLoaderHooksContextKey).(*engineLoaderHooksRequestContext)
if !ok {
return
}

latency := time.Since(hookCtx.startTime)
fields = append(fields, zap.Duration("latency", latency))
}

f.accessLogger.WriteRequestLog(request, response, fields)
}
}
7 changes: 3 additions & 4 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,6 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
Authorizer: NewCosmoAuthorizer(authorizerOptions),
SubgraphErrorPropagation: s.subgraphErrorPropagation,
EngineLoaderHooks: hooks,
HttpLoaderHooks: hooks,
}

if s.redisClient != nil {
Expand Down Expand Up @@ -875,7 +874,7 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
return gm, nil
}

func (s *graphServer) accessLogsFieldHandler(attributes []config.CustomAttribute, panicError any, request *http.Request, response *http.Response) []zapcore.Field {
func (s *graphServer) accessLogsFieldHandler(attributes []config.CustomAttribute, panicError any, request *http.Request, responseHeader *http.Header) []zapcore.Field {
reqContext := getRequestContext(request.Context())
if reqContext == nil {
return nil
Expand All @@ -884,8 +883,8 @@ func (s *graphServer) accessLogsFieldHandler(attributes []config.CustomAttribute
resFields = append(resFields, logging.WithRequestID(middleware.GetReqID(request.Context())))

for _, field := range attributes {
if field.ValueFrom != nil && field.ValueFrom.ResponseHeader != "" && response != nil {
resFields = append(resFields, NewStringLogField(response.Header.Get(field.ValueFrom.ResponseHeader), field))
if field.ValueFrom != nil && field.ValueFrom.ResponseHeader != "" && responseHeader != nil {
resFields = append(resFields, NewStringLogField(responseHeader.Get(field.ValueFrom.ResponseHeader), field))
} else if field.ValueFrom != nil && field.ValueFrom.RequestHeader != "" {
resFields = append(resFields, NewStringLogField(request.Header.Get(field.ValueFrom.RequestHeader), field))
} else if field.ValueFrom != nil && field.ValueFrom.ContextField != "" && reqContext.operation != nil {
Expand Down
7 changes: 0 additions & 7 deletions router/core/graphql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ type HandlerOptions struct {
RateLimitConfig *config.RateLimitConfiguration
SubgraphErrorPropagation config.SubgraphErrorPropagationConfiguration
EngineLoaderHooks resolve.LoaderHooks
HttpLoaderHooks resolve.HttpLoaderHooks
}

func NewGraphQLHandler(opts HandlerOptions) *GraphQLHandler {
Expand All @@ -97,7 +96,6 @@ func NewGraphQLHandler(opts HandlerOptions) *GraphQLHandler {
rateLimitConfig: opts.RateLimitConfig,
subgraphErrorPropagation: opts.SubgraphErrorPropagation,
engineLoaderHooks: opts.EngineLoaderHooks,
httpLoaderHooks: opts.HttpLoaderHooks,
}
return graphQLHandler
}
Expand All @@ -123,7 +121,6 @@ type GraphQLHandler struct {
rateLimitConfig *config.RateLimitConfiguration
subgraphErrorPropagation config.SubgraphErrorPropagationConfiguration
engineLoaderHooks resolve.LoaderHooks
httpLoaderHooks resolve.HttpLoaderHooks

enableExecutionPlanCacheResponseHeader bool
enablePersistedOperationCacheResponseHeader bool
Expand Down Expand Up @@ -162,10 +159,6 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx.SetEngineLoaderHooks(h.engineLoaderHooks)
}

if h.httpLoaderHooks != nil {
ctx.SetEngineHttpLoaderHooks(h.httpLoaderHooks)
}

ctx = h.configureRateLimiting(ctx)

switch p := requestContext.operation.preparedPlan.preparedPlan.(type) {
Expand Down
6 changes: 5 additions & 1 deletion router/internal/requestlogger/requestlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"go.uber.org/zap/zapcore"
)

type ContextFunc func(fields []config.CustomAttribute, panic any, r *http.Request, re *http.Response) []zapcore.Field
type ContextFunc func(fields []config.CustomAttribute, panic any, r *http.Request, rh *http.Header) []zapcore.Field

// Option provides a functional approach to define
// configuration for a handler; such as setting the logging
Expand Down Expand Up @@ -169,6 +169,10 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

func (al accessLogger) getRequestFields(r *http.Request) []zapcore.Field {
if r == nil {
return al.baseFields
}

start := time.Now()
url := r.URL
path := url.Path
Expand Down
10 changes: 5 additions & 5 deletions router/internal/requestlogger/subgraphlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package requestlogger

import (
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/http"
)

type accessLogger struct {
Expand Down Expand Up @@ -43,11 +43,11 @@ func NewSubgraphAccessLogger(logger *zap.Logger, opts SubgraphOptions) *Subgraph
}
}

func (h *SubgraphAccessLogger) WriteRequestLog(r *http.Request, rs *http.Response, subgraphFields []zap.Field) {
path := r.URL.Path
fields := h.accessLogger.getRequestFields(r)
func (h *SubgraphAccessLogger) WriteRequestLog(respInfo *resolve.ResponseInfo, subgraphFields []zap.Field) {
path := respInfo.Request.URL.Path
fields := h.accessLogger.getRequestFields(respInfo.Request)
if h.accessLogger.fieldsHandler != nil {
fields = append(fields, h.accessLogger.fieldsHandler(h.accessLogger.attributes, nil, r, rs)...)
fields = append(fields, h.accessLogger.fieldsHandler(h.accessLogger.attributes, nil, respInfo.Request, &respInfo.ResponseHeaders)...)
}

fields = append(subgraphFields, fields...)
Expand Down

0 comments on commit e18d81f

Please sign in to comment.