From 54d01ba7900c4b81ce81bf973c22409ab4d2c3bf Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 31 Aug 2024 18:20:18 +0800 Subject: [PATCH] Refine errors for HTTP streaming. --- proxy/http.go | 116 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 101 insertions(+), 15 deletions(-) diff --git a/proxy/http.go b/proxy/http.go index 6ba223e8cf1..453face23cc 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -6,16 +6,18 @@ package main import ( "context" "fmt" + "io" "net" "net/http" + "net/url" "os" "path" - "srs-proxy/errors" "strconv" "strings" "sync" "time" + "srs-proxy/errors" "srs-proxy/logger" ) @@ -168,7 +170,7 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } - if err := v.serve(ctx, w, r); err != nil { + handleErr := func(err error) { if perr, ok := err.(*RTMPProxyError); ok { if perr.isBackend { handleBackendErr(perr.err) @@ -178,10 +180,24 @@ func (v *HTTPStreaming) ServeHTTP(w http.ResponseWriter, r *http.Request) { } else { handleClientErr(err) } + } + + if err := v.serve(ctx, w, r); err != nil { + if merr, ok := err.(*RTMPMultipleError); ok { + // If multiple errors, handle all of them. + for _, err := range merr.errs { + handleErr(err) + } + } else { + // If single error, directly handle it. + handleErr(err) + } if !v.written { apiError(ctx, w, r, err) } + } else { + logger.Df(ctx, "HTTP client done") } } @@ -218,11 +234,18 @@ func (v *HTTPStreaming) serve(ctx context.Context, w http.ResponseWriter, r *htt } if err = v.serveByBackend(ctx, w, r, backend, streamURL); err != nil { - wrappedErr := errors.Wrapf(err, "serve %v by backend %+v", originalURL, backend) + extraMsg := fmt.Sprintf("serve %v by backend %+v", originalURL, backend) if perr, ok := err.(*RTMPProxyError); ok { - return &RTMPProxyError{perr.isBackend, wrappedErr} + return &RTMPProxyError{perr.isBackend, errors.Wrapf(perr.err, extraMsg)} + } else if merr, ok := err.(*RTMPMultipleError); ok { + var errs []error + for _, e := range merr.errs { + errs = append(errs, errors.Wrapf(e, extraMsg)) + } + return NewRTMPMultipleError(errs...) + } else { + return errors.Wrapf(err, extraMsg) } - return wrappedErr } return nil @@ -241,6 +264,19 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite httpPort = int(iv) } + // If any goroutine quit, cancel another one. + parentCtx := ctx + ctx, cancel := context.WithCancel(ctx) + + go func() { + select { + case <-ctx.Done(): + case <-r.Context().Done(): + // If client request cancelled, cancel the proxy goroutine. + cancel() + } + }() + // Connect to backend SRS server via HTTP client. backendURL := fmt.Sprintf("http://%v:%v%s", backend.IP, httpPort, r.URL.Path) req, err := http.NewRequestWithContext(ctx, "GET", backendURL, nil) @@ -250,6 +286,14 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite resp, err := http.DefaultClient.Do(req) if err != nil { + if urlErr, ok := err.(*url.Error); ok { + if urlErr.Err == io.EOF { + return &RTMPProxyError{true, errors.Errorf("do request to %v EOF", backendURL)} + } + if urlErr.Err == context.Canceled && r.Context().Err() != nil { + return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")} + } + } return &RTMPProxyError{true, errors.Wrapf(err, "do request to %v", backendURL)} } defer resp.Body.Close() @@ -267,19 +311,61 @@ func (v *HTTPStreaming) serveByBackend(ctx context.Context, w http.ResponseWrite } v.written = true + logger.Df(ctx, "HTTP start streaming") + + // For all proxy goroutines. + var wg sync.WaitGroup + defer wg.Wait() + + // Detect the client closed. + wg.Add(1) + var r0 error + go func() { + defer wg.Done() + defer cancel() + + r0 = func() error { + select { + case <-ctx.Done(): + return nil + case <-r.Context().Done(): + return &RTMPProxyError{false, errors.Wrapf(io.EOF, "client closed")} + } + }() + }() // Copy all data from backend to client. - buf := make([]byte, 4096) - for { - n, err := resp.Body.Read(buf) - if err != nil { - return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)} - } + wg.Add(1) + var r1 error + go func() { + defer wg.Done() + defer cancel() - if _, err := w.Write(buf[:n]); err != nil { - return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")} - } + r1 = func() error { + buf := make([]byte, 4096) + for { + n, err := resp.Body.Read(buf) + if err != nil { + return &RTMPProxyError{true, errors.Wrapf(err, "read stream from %v", backendURL)} + } + + if _, err := w.Write(buf[:n]); err != nil { + return &RTMPProxyError{false, errors.Wrapf(err, "write stream client")} + } + } + }() + }() + + // Wait until all goroutine quit. + wg.Wait() + + // Reset the error if caused by another goroutine. + if errors.Cause(r0) == context.Canceled && parentCtx.Err() == nil { + r0 = nil + } + if errors.Cause(r1) == context.Canceled && parentCtx.Err() == nil { + r1 = nil } - return nil + return NewRTMPMultipleError(r0, r1, parentCtx.Err()) }