From 43960694a24d1830ecb9d2823915e49922014ed2 Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Wed, 21 Feb 2024 15:48:16 +0100 Subject: [PATCH] Add same speed improvements to split car fetching as for remote car (single file). --- epoch.go | 8 +- http-client.go | 43 ----- http-range.go | 161 ------------------ range-cache.go => range-cache/range-cache.go | 2 +- .../range-cache_test.go | 2 +- split-car-fetcher/fetcher.go | 56 +----- split-car-fetcher/http.go | 44 +++++ split-car-fetcher/remote-file.go | 139 +++++++++++++++ storage.go | 5 +- 9 files changed, 193 insertions(+), 267 deletions(-) rename range-cache.go => range-cache/range-cache.go (99%) rename range-cache_test.go => range-cache/range-cache_test.go (97%) create mode 100644 split-car-fetcher/http.go create mode 100644 split-car-fetcher/remote-file.go diff --git a/epoch.go b/epoch.go index 5bbb314f..8d207a5c 100644 --- a/epoch.go +++ b/epoch.go @@ -334,8 +334,8 @@ func NewEpochFromConfig( formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String()) { - rfspc, err := splitcarfetcher.NewRemoteFileSplitCarReader( - piece.CommP.String(), + rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt( + c.Context, formattedURL, ) if err != nil { @@ -366,8 +366,8 @@ func NewEpochFromConfig( { formattedURL := pieceURL.URI.String() - rfspc, err := splitcarfetcher.NewRemoteFileSplitCarReader( - piece.CommP.String(), + rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt( + c.Context, formattedURL, ) if err != nil { diff --git a/http-client.go b/http-client.go index ddbe9651..06ab7d0f 100644 --- a/http-client.go +++ b/http-client.go @@ -1,44 +1 @@ package main - -import ( - "net" - "net/http" - "time" - - "github.com/klauspost/compress/gzhttp" -) - -var ( - defaultMaxIdleConnsPerHost = 100 - defaultTimeout = 1000 * time.Second - defaultKeepAlive = 180 * time.Second -) - -func newHTTPTransport() *http.Transport { - return &http.Transport{ - IdleConnTimeout: time.Minute, - MaxConnsPerHost: defaultMaxIdleConnsPerHost, - MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost, - MaxIdleConns: defaultMaxIdleConnsPerHost, - Proxy: http.ProxyFromEnvironment, - DialContext: (&net.Dialer{ - Timeout: defaultTimeout, - KeepAlive: defaultKeepAlive, - }).DialContext, - ForceAttemptHTTP2: true, - // MaxIdleConns: 100, - TLSHandshakeTimeout: 10 * time.Second, - // ExpectContinueTimeout: 1 * time.Second, - } -} - -// newHTTPClient returns a new Client from the provided config. -// Client is safe for concurrent use by multiple goroutines. -func newHTTPClient() *http.Client { - tr := newHTTPTransport() - - return &http.Client{ - Timeout: defaultTimeout, - Transport: gzhttp.Transport(tr), - } -} diff --git a/http-range.go b/http-range.go index 451397ab..04fa4083 100644 --- a/http-range.go +++ b/http-range.go @@ -1,15 +1,11 @@ package main import ( - "context" - "fmt" "io" - "net/http" "path/filepath" "strings" "time" - "github.com/goware/urlx" "k8s.io/klog/v2" ) @@ -18,163 +14,6 @@ type ReaderAtCloser interface { io.Closer } -func getContentSizeWithHeadOrZeroRange(url string) (int64, error) { - // try sending a HEAD request to the server to get the file size: - resp, err := http.Head(url) - if err != nil { - return 0, err - } - if resp.StatusCode != http.StatusOK { - // try sending a GET request with a zero range to the server to get the file size: - req := &http.Request{ - Method: "GET", - URL: resp.Request.URL, - Header: make(http.Header), - } - req.Header.Set("Range", "bytes=0-0") - resp, err = http.DefaultClient.Do(req) - if err != nil { - return 0, err - } - if resp.StatusCode != http.StatusPartialContent { - return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - // now find the content length: - contentRange := resp.Header.Get("Content-Range") - if contentRange == "" { - return 0, fmt.Errorf("missing Content-Range header") - } - var contentLength int64 - _, err := fmt.Sscanf(contentRange, "bytes 0-0/%d", &contentLength) - if err != nil { - return 0, err - } - return contentLength, nil - } - return resp.ContentLength, nil -} - -// remoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file. -// The returned ReaderAtCloser is backed by a http.Client. -func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser, int64, error) { - // send a request to the server to get the file size: - contentLength, err := getContentSizeWithHeadOrZeroRange(url) - if err != nil { - return nil, 0, err - } - if contentLength == 0 { - return nil, 0, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty") - } - - // Create a cache with a default expiration time of 5 minutes, and which - // purges expired items every 10 minutes - rr := &HTTPSingleFileRemoteReaderAt{ - url: url, - contentLength: contentLength, - client: newHTTPClient(), - } - parsedURL, err := urlx.Parse(url) - if err != nil { - return nil, 0, err - } - name := filepath.Base(parsedURL.Path) - - rc := NewRangeCache( - contentLength, - name, - func(p []byte, off int64) (n int, err error) { - return remoteReadAt(rr.client, rr.url, p, off) - }) - rc.StartCacheGC(ctx, 1*time.Minute) - rr.ca = rc - - return rr, contentLength, nil -} - -type HTTPSingleFileRemoteReaderAt struct { - url string - contentLength int64 - client *http.Client - ca *RangeCache -} - -// Close implements io.Closer. -func (r *HTTPSingleFileRemoteReaderAt) Close() error { - r.client.CloseIdleConnections() - return r.ca.Close() -} - -func retryExpotentialBackoff( - ctx context.Context, - startDuration time.Duration, - maxRetries int, - fn func() error, -) error { - var err error - for i := 0; i < maxRetries; i++ { - err = fn() - if err == nil { - return nil - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(startDuration): - startDuration *= 2 - } - } - return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err) -} - -func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) { - if off >= r.contentLength { - return 0, io.EOF - } - v, err := r.ca.GetRange(context.Background(), off, int64(len(p))) - if err != nil { - return 0, err - } - n = copy(p, v) - if n < len(p) { - return n, io.ErrUnexpectedEOF - } - return n, nil -} - -func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) { - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return 0, err - } - { - req.Header.Set("Connection", "keep-alive") - req.Header.Set("Keep-Alive", "timeout=600") - } - - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p)))) - - var resp *http.Response - err = retryExpotentialBackoff( - context.Background(), - 100*time.Millisecond, - 3, - func() error { - resp, err = client.Do(req) - return err - }) - if err != nil { - return 0, err - } - defer resp.Body.Close() - { - n, err := io.ReadFull(resp.Body, p) - if err != nil { - return 0, err - } - return n, nil - } -} - type readCloserWrapper struct { rac ReaderAtCloser isRemote bool diff --git a/range-cache.go b/range-cache/range-cache.go similarity index 99% rename from range-cache.go rename to range-cache/range-cache.go index e8c84cb4..46d86e40 100644 --- a/range-cache.go +++ b/range-cache/range-cache.go @@ -1,4 +1,4 @@ -package main +package rangecache import ( "context" diff --git a/range-cache_test.go b/range-cache/range-cache_test.go similarity index 97% rename from range-cache_test.go rename to range-cache/range-cache_test.go index 794e0a9e..4d4e9733 100644 --- a/range-cache_test.go +++ b/range-cache/range-cache_test.go @@ -1,4 +1,4 @@ -package main +package rangecache import ( "bytes" diff --git a/split-car-fetcher/fetcher.go b/split-car-fetcher/fetcher.go index c43e692a..6db14931 100644 --- a/split-car-fetcher/fetcher.go +++ b/split-car-fetcher/fetcher.go @@ -100,60 +100,6 @@ func GetContentSizeWithHeadOrZeroRange(url string) (int64, error) { return resp.ContentLength, nil } -type RemoteFileSplitCarReader struct { - commP string - url string - size int64 - httpClient *http.Client -} - -func NewRemoteFileSplitCarReader(commP string, url string) (*RemoteFileSplitCarReader, error) { - size, err := GetContentSizeWithHeadOrZeroRange(url) - if err != nil { - return nil, fmt.Errorf("failed to get content size from %q: %s", url, err) - } - return &RemoteFileSplitCarReader{ - commP: commP, - url: url, - size: size, - httpClient: http.DefaultClient, - }, nil -} - -func (fscr *RemoteFileSplitCarReader) ReadAt(p []byte, off int64) (n int, err error) { - req, err := http.NewRequest("GET", fscr.url, nil) - if err != nil { - return 0, err - } - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)) - { - req.Header.Set("Connection", "keep-alive") - req.Header.Set("Keep-Alive", "timeout=600") - } - resp, err := fscr.httpClient.Do(req) - if err != nil { - return 0, err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusPartialContent { - return 0, fmt.Errorf("GET %q: unexpected status code: %d", fscr.url, resp.StatusCode) - } - n, err = io.ReadFull(resp.Body, p) - if err != nil { - return 0, err - } - return n, nil -} - -func (fscr *RemoteFileSplitCarReader) Close() error { - fscr.httpClient.CloseIdleConnections() - return nil -} - -func (fscr *RemoteFileSplitCarReader) Size() int64 { - return fscr.size -} - func NewSplitCarReader( files *carlet.CarPiecesAndMetadata, readerCreator SplitCarFileReaderCreator, @@ -216,7 +162,7 @@ func NewSplitCarReader( } // if remote, then the file must be at least as header size + content size: - if _, ok := fi.(*RemoteFileSplitCarReader); ok { + if _, ok := fi.(*HTTPSingleFileRemoteReaderAt); ok { expectedMinSize := int(cf.HeaderSize) + int(cf.ContentSize) if size < expectedMinSize { return nil, fmt.Errorf( diff --git a/split-car-fetcher/http.go b/split-car-fetcher/http.go new file mode 100644 index 00000000..cc9b1456 --- /dev/null +++ b/split-car-fetcher/http.go @@ -0,0 +1,44 @@ +package splitcarfetcher + +import ( + "net" + "net/http" + "time" + + "github.com/klauspost/compress/gzhttp" +) + +var ( + DefaultMaxIdleConnsPerHost = 100 + DefaultTimeout = 1000 * time.Second + DefaultKeepAlive = 180 * time.Second +) + +func NewHTTPTransport() *http.Transport { + return &http.Transport{ + IdleConnTimeout: time.Minute, + MaxConnsPerHost: DefaultMaxIdleConnsPerHost, + MaxIdleConnsPerHost: DefaultMaxIdleConnsPerHost, + MaxIdleConns: DefaultMaxIdleConnsPerHost, + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: DefaultTimeout, + KeepAlive: DefaultKeepAlive, + }).DialContext, + ForceAttemptHTTP2: true, + // MaxIdleConns: 100, + TLSHandshakeTimeout: 10 * time.Second, + // ExpectContinueTimeout: 1 * time.Second, + } +} + +// NewHTTPClient returns a new Client from the provided config. +// Client is safe for concurrent use by multiple goroutines. +func NewHTTPClient() *http.Client { + tr := NewHTTPTransport() + + return &http.Client{ + Timeout: DefaultTimeout, + Transport: gzhttp.Transport(tr), + } +} diff --git a/split-car-fetcher/remote-file.go b/split-car-fetcher/remote-file.go new file mode 100644 index 00000000..c5c179de --- /dev/null +++ b/split-car-fetcher/remote-file.go @@ -0,0 +1,139 @@ +package splitcarfetcher + +import ( + "context" + "fmt" + "io" + "net/http" + "path/filepath" + "time" + + "github.com/goware/urlx" + rangecache "github.com/rpcpool/yellowstone-faithful/range-cache" +) + +// NewRemoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file. +// The returned ReaderAtCloser is backed by a http.Client. +func NewRemoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloserSize, int64, error) { + // send a request to the server to get the file size: + contentLength, err := GetContentSizeWithHeadOrZeroRange(url) + if err != nil { + return nil, 0, err + } + if contentLength == 0 { + return nil, 0, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty") + } + + // Create a cache with a default expiration time of 5 minutes, and which + // purges expired items every 10 minutes + rr := &HTTPSingleFileRemoteReaderAt{ + url: url, + contentLength: contentLength, + client: NewHTTPClient(), + } + parsedURL, err := urlx.Parse(url) + if err != nil { + return nil, 0, err + } + name := filepath.Base(parsedURL.Path) + + rc := rangecache.NewRangeCache( + contentLength, + name, + func(p []byte, off int64) (n int, err error) { + return remoteReadAt(rr.client, rr.url, p, off) + }) + rc.StartCacheGC(ctx, 1*time.Minute) + rr.ca = rc + + return rr, contentLength, nil +} + +type HTTPSingleFileRemoteReaderAt struct { + url string + contentLength int64 + client *http.Client + ca *rangecache.RangeCache +} + +// Close implements io.Closer. +func (r *HTTPSingleFileRemoteReaderAt) Close() error { + r.client.CloseIdleConnections() + return r.ca.Close() +} + +// Size returns the size of the file. +func (r *HTTPSingleFileRemoteReaderAt) Size() int64 { + return r.contentLength +} + +func retryExpotentialBackoff( + ctx context.Context, + startDuration time.Duration, + maxRetries int, + fn func() error, +) error { + var err error + for i := 0; i < maxRetries; i++ { + err = fn() + if err == nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(startDuration): + startDuration *= 2 + } + } + return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err) +} + +func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) { + if off >= r.contentLength { + return 0, io.EOF + } + v, err := r.ca.GetRange(context.Background(), off, int64(len(p))) + if err != nil { + return 0, err + } + n = copy(p, v) + if n < len(p) { + return n, io.ErrUnexpectedEOF + } + return n, nil +} + +func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + { + req.Header.Set("Connection", "keep-alive") + req.Header.Set("Keep-Alive", "timeout=600") + } + + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p)))) + + var resp *http.Response + err = retryExpotentialBackoff( + context.Background(), + 100*time.Millisecond, + 3, + func() error { + resp, err = client.Do(req) + return err + }) + if err != nil { + return 0, err + } + defer resp.Body.Close() + { + n, err := io.ReadFull(resp.Body, p) + if err != nil { + return 0, err + } + return n, nil + } +} diff --git a/storage.go b/storage.go index 85789ab9..e553ac51 100644 --- a/storage.go +++ b/storage.go @@ -13,6 +13,7 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers" + splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher" "golang.org/x/exp/mmap" "k8s.io/klog/v2" ) @@ -28,7 +29,7 @@ func openIndexStorage( where = strings.TrimSpace(where) if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") { klog.Infof("opening index file from %q as HTTP remote file", where) - rac, size, err := remoteHTTPFileAsIoReaderAt(ctx, where) + rac, size, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(ctx, where) if err != nil { return nil, fmt.Errorf("failed to open remote index file: %w", err) } @@ -62,7 +63,7 @@ func openCarStorage(ctx context.Context, where string) (*carv2.Reader, ReaderAtC where = strings.TrimSpace(where) if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") { klog.Infof("opening CAR file from %q as HTTP remote file", where) - rem, size, err := remoteHTTPFileAsIoReaderAt(ctx, where) + rem, size, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(ctx, where) if err != nil { return nil, nil, fmt.Errorf("failed to open remote CAR file: %w", err) }