diff --git a/epoch.go b/epoch.go
index f7f15ac8..3e85ecdd 100644
--- a/epoch.go
+++ b/epoch.go
@@ -332,10 +332,23 @@ func NewEpochFromConfig(
 				minerIP := fmt.Sprintf("%s:%s", ip, port)
 				klog.V(3).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
 				formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
-				return splitcarfetcher.NewRemoteFileSplitCarReader(
-					piece.CommP.String(),
-					formattedURL,
-				)
+
+				{
+					rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
+						c.Context,
+						formattedURL,
+					)
+					if err != nil {
+						return nil, fmt.Errorf("failed to create remote file split car reader from %q: %w", formattedURL, err)
+					}
+
+					return &readCloserWrapper{
+						rac:        rfspc,
+						name:       formattedURL,
+						size:       rfspc.Size(),
+						isSplitCar: true,
+					}, nil
+				}
 			})
 		if err != nil {
 			return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
@@ -350,10 +363,24 @@ func NewEpochFromConfig(
 				if !ok {
 					return nil, fmt.Errorf("failed to find URL for piece CID %s", piece.CommP)
 				}
-				return splitcarfetcher.NewRemoteFileSplitCarReader(
-					piece.CommP.String(),
-					pieceURL.URI.String(),
-				)
+
+				{
+					formattedURL := pieceURL.URI.String()
+					rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
+						c.Context,
+						formattedURL,
+					)
+					if err != nil {
+						return nil, fmt.Errorf("failed to create remote file split car reader from %q: %w", formattedURL, err)
+					}
+
+					return &readCloserWrapper{
+						rac:        rfspc,
+						name:       formattedURL,
+						size:       rfspc.Size(),
+						isSplitCar: true,
+					}, nil
+				}
 			})
 		if err != nil {
 			return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
diff --git a/http-client.go b/http-client.go
index ddbe9651..06ab7d0f 100644
--- a/http-client.go
+++ b/http-client.go
@@ -1,44 +1 @@
 package main
-
-import (
-	"net"
-	"net/http"
-	"time"
-
-	"github.com/klauspost/compress/gzhttp"
-)
-
-var (
-	defaultMaxIdleConnsPerHost = 100
-	defaultTimeout             = 1000 * time.Second
-	defaultKeepAlive           = 180 * time.Second
-)
-
-func newHTTPTransport() *http.Transport {
-	return &http.Transport{
-		IdleConnTimeout:     time.Minute,
-		MaxConnsPerHost:     defaultMaxIdleConnsPerHost,
-		MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
-		MaxIdleConns:        defaultMaxIdleConnsPerHost,
-		Proxy:               http.ProxyFromEnvironment,
-		DialContext: (&net.Dialer{
-			Timeout:   defaultTimeout,
-			KeepAlive: defaultKeepAlive,
-		}).DialContext,
-		ForceAttemptHTTP2: true,
-		// MaxIdleConns: 100,
-		TLSHandshakeTimeout: 10 * time.Second,
-		// ExpectContinueTimeout: 1 * time.Second,
-	}
-}
-
-// newHTTPClient returns a new Client from the provided config.
-// Client is safe for concurrent use by multiple goroutines.
-func newHTTPClient() *http.Client {
-	tr := newHTTPTransport()
-
-	return &http.Client{
-		Timeout:   defaultTimeout,
-		Transport: gzhttp.Transport(tr),
-	}
-}
diff --git a/http-range.go b/http-range.go
index b9220353..04fa4083 100644
--- a/http-range.go
+++ b/http-range.go
@@ -1,15 +1,11 @@
 package main
 
 import (
-	"context"
-	"fmt"
 	"io"
-	"net/http"
 	"path/filepath"
 	"strings"
 	"time"
 
-	"github.com/goware/urlx"
 	"k8s.io/klog/v2"
 )
 
@@ -18,167 +14,16 @@ type ReaderAtCloser interface {
 	io.Closer
 }
 
-func getContentSizeWithHeadOrZeroRange(url string) (int64, error) {
-	// try sending a HEAD request to the server to get the file size:
-	resp, err := http.Head(url)
-	if err != nil {
-		return 0, err
-	}
-	if resp.StatusCode != http.StatusOK {
-		// try sending a GET request with a zero range to the server to get the file size:
-		req := &http.Request{
-			Method: "GET",
-			URL:    resp.Request.URL,
-			Header: make(http.Header),
-		}
-		req.Header.Set("Range", "bytes=0-0")
-		resp, err = http.DefaultClient.Do(req)
-		if err != nil {
-			return 0, err
-		}
-		if resp.StatusCode != http.StatusPartialContent {
-			return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
-		}
-		// now find the content length:
-		contentRange := resp.Header.Get("Content-Range")
-		if contentRange == "" {
-			return 0, fmt.Errorf("missing Content-Range header")
-		}
-		var contentLength int64
-		_, err := fmt.Sscanf(contentRange, "bytes 0-0/%d", &contentLength)
-		if err != nil {
-			return 0, err
-		}
-		return contentLength, nil
-	}
-	return resp.ContentLength, nil
-}
-
-// remoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file.
-// The returned ReaderAtCloser is backed by a http.Client.
-func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser, error) {
-	// send a request to the server to get the file size:
-	contentLength, err := getContentSizeWithHeadOrZeroRange(url)
-	if err != nil {
-		return nil, err
-	}
-	if contentLength == 0 {
-		return nil, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty")
-	}
-
-	// Create a cache with a default expiration time of 5 minutes, and which
-	// purges expired items every 10 minutes
-	rr := &HTTPSingleFileRemoteReaderAt{
-		url:           url,
-		contentLength: contentLength,
-		client:        newHTTPClient(),
-	}
-	parsedURL, err := urlx.Parse(url)
-	if err != nil {
-		return nil, err
-	}
-	name := filepath.Base(parsedURL.Path)
-
-	rc := NewRangeCache(
-		contentLength,
-		name,
-		func(p []byte, off int64) (n int, err error) {
-			return remoteReadAt(rr.client, rr.url, p, off)
-		})
-	rc.StartCacheGC(ctx, 1*time.Minute)
-	rr.ca = rc
-
-	return rr, nil
-}
-
-type HTTPSingleFileRemoteReaderAt struct {
-	url           string
-	contentLength int64
-	client        *http.Client
-	ca            *RangeCache
-}
-
-// Close implements io.Closer.
-func (r *HTTPSingleFileRemoteReaderAt) Close() error {
-	r.client.CloseIdleConnections()
-	return r.ca.Close()
-}
-
-func retryExpotentialBackoff(
-	ctx context.Context,
-	startDuration time.Duration,
-	maxRetries int,
-	fn func() error,
-) error {
-	var err error
-	for i := 0; i < maxRetries; i++ {
-		err = fn()
-		if err == nil {
-			return nil
-		}
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-time.After(startDuration):
-			startDuration *= 2
-		}
-	}
-	return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err)
-}
-
-func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
-	if off >= r.contentLength {
-		return 0, io.EOF
-	}
-	v, err := r.ca.GetRange(context.Background(), off, int64(len(p)))
-	if err != nil {
-		return 0, err
-	}
-	n = copy(p, v)
-	if n < len(p) {
-		return n, io.ErrUnexpectedEOF
-	}
-	return n, nil
-}
-
-func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) {
-	req, err := http.NewRequest("GET", url, nil)
-	if err != nil {
-		return 0, err
-	}
-	{
-		req.Header.Set("Connection", "keep-alive")
-		req.Header.Set("Keep-Alive", "timeout=600")
-	}
-
-	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))))
-
-	var resp *http.Response
-	err = retryExpotentialBackoff(
-		context.Background(),
-		100*time.Millisecond,
-		3,
-		func() error {
-			resp, err = client.Do(req)
-			return err
-		})
-	if err != nil {
-		return 0, err
-	}
-	defer resp.Body.Close()
-	{
-		n, err := io.ReadFull(resp.Body, p)
-		if err != nil {
-			return 0, err
-		}
-		return n, nil
-	}
+type readCloserWrapper struct {
+	rac        ReaderAtCloser
+	isRemote   bool
+	isSplitCar bool
+	name       string
+	size       int64
 }
 
-type readCloserWrapper struct {
-	rac      ReaderAtCloser
-	isRemote bool
-	name     string
+func (r *readCloserWrapper) Size() int64 {
+	return r.size
 }
 
 // when reading print a dot
@@ -201,8 +46,12 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
 		prefix = icon + azureBG("[READ-INDEX]")
 	}
 	// if has suffix .car, then it's a car file
-	if strings.HasSuffix(r.name, ".car") {
-		prefix = icon + purpleBG("[READ-CAR]")
+	if strings.HasSuffix(r.name, ".car") || r.isSplitCar {
+		if r.isSplitCar {
+			prefix = icon + azureBG("[READ-SPLIT-CAR]")
+		} else {
+			prefix = icon + purpleBG("[READ-CAR]")
+		}
 	}
 	klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), took)
 }
diff --git a/range-cache.go b/range-cache/range-cache.go
similarity index 99%
rename from range-cache.go
rename to range-cache/range-cache.go
index e8c84cb4..46d86e40 100644
--- a/range-cache.go
+++ b/range-cache/range-cache.go
@@ -1,4 +1,4 @@
-package main
+package rangecache
 
 import (
 	"context"
diff --git a/range-cache_test.go b/range-cache/range-cache_test.go
similarity index 97%
rename from range-cache_test.go
rename to range-cache/range-cache_test.go
index 794e0a9e..4d4e9733 100644
--- a/range-cache_test.go
+++ b/range-cache/range-cache_test.go
@@ -1,4 +1,4 @@
-package main
+package rangecache
 
 import (
 	"bytes"
diff --git a/split-car-fetcher/fetcher.go b/split-car-fetcher/fetcher.go
index c43e692a..6db14931 100644
--- a/split-car-fetcher/fetcher.go
+++ b/split-car-fetcher/fetcher.go
@@ -100,60 +100,6 @@ func GetContentSizeWithHeadOrZeroRange(url string) (int64, error) {
 	return resp.ContentLength, nil
 }
 
-type RemoteFileSplitCarReader struct {
-	commP      string
-	url        string
-	size       int64
-	httpClient *http.Client
-}
-
-func NewRemoteFileSplitCarReader(commP string, url string) (*RemoteFileSplitCarReader, error) {
-	size, err := GetContentSizeWithHeadOrZeroRange(url)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get content size from %q: %s", url, err)
-	}
-	return &RemoteFileSplitCarReader{
-		commP:      commP,
-		url:        url,
-		size:       size,
-		httpClient: http.DefaultClient,
-	}, nil
-}
-
-func (fscr *RemoteFileSplitCarReader) ReadAt(p []byte, off int64) (n int, err error) {
-	req, err := http.NewRequest("GET", fscr.url, nil)
-	if err != nil {
-		return 0, err
-	}
-	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1))
-	{
-		req.Header.Set("Connection", "keep-alive")
-		req.Header.Set("Keep-Alive", "timeout=600")
-	}
-	resp, err := fscr.httpClient.Do(req)
-	if err != nil {
-		return 0, err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusPartialContent {
-		return 0, fmt.Errorf("GET %q: unexpected status code: %d", fscr.url, resp.StatusCode)
-	}
-	n, err = io.ReadFull(resp.Body, p)
-	if err != nil {
-		return 0, err
-	}
-	return n, nil
-}
-
-func (fscr *RemoteFileSplitCarReader) Close() error {
-	fscr.httpClient.CloseIdleConnections()
-	return nil
-}
-
-func (fscr *RemoteFileSplitCarReader) Size() int64 {
-	return fscr.size
-}
-
 func NewSplitCarReader(
 	files *carlet.CarPiecesAndMetadata,
 	readerCreator SplitCarFileReaderCreator,
@@ -216,7 +162,7 @@ func NewSplitCarReader(
 	}
 
 	// if remote, then the file must be at least as header size + content size:
-	if _, ok := fi.(*RemoteFileSplitCarReader); ok {
+	if _, ok := fi.(*HTTPSingleFileRemoteReaderAt); ok {
 		expectedMinSize := int(cf.HeaderSize) + int(cf.ContentSize)
 		if size < expectedMinSize {
 			return nil, fmt.Errorf(
diff --git a/split-car-fetcher/http.go b/split-car-fetcher/http.go
new file mode 100644
index 00000000..cc9b1456
--- /dev/null
+++ b/split-car-fetcher/http.go
@@ -0,0 +1,44 @@
+package splitcarfetcher
+
+import (
+	"net"
+	"net/http"
+	"time"
+
+	"github.com/klauspost/compress/gzhttp"
+)
+
+var (
+	DefaultMaxIdleConnsPerHost = 100
+	DefaultTimeout             = 1000 * time.Second
+	DefaultKeepAlive           = 180 * time.Second
+)
+
+func NewHTTPTransport() *http.Transport {
+	return &http.Transport{
+		IdleConnTimeout:     time.Minute,
+		MaxConnsPerHost:     DefaultMaxIdleConnsPerHost,
+		MaxIdleConnsPerHost: DefaultMaxIdleConnsPerHost,
+		MaxIdleConns:        DefaultMaxIdleConnsPerHost,
+		Proxy:               http.ProxyFromEnvironment,
+		DialContext: (&net.Dialer{
+			Timeout:   DefaultTimeout,
+			KeepAlive: DefaultKeepAlive,
+		}).DialContext,
+		ForceAttemptHTTP2: true,
+		// MaxIdleConns: 100,
+		TLSHandshakeTimeout: 10 * time.Second,
+		// ExpectContinueTimeout: 1 * time.Second,
+	}
+}
+
+// NewHTTPClient returns a new Client from the provided config.
+// Client is safe for concurrent use by multiple goroutines.
+func NewHTTPClient() *http.Client {
+	tr := NewHTTPTransport()
+
+	return &http.Client{
+		Timeout:   DefaultTimeout,
+		Transport: gzhttp.Transport(tr),
+	}
+}
diff --git a/split-car-fetcher/remote-file.go b/split-car-fetcher/remote-file.go
new file mode 100644
index 00000000..c5c179de
--- /dev/null
+++ b/split-car-fetcher/remote-file.go
@@ -0,0 +1,139 @@
+package splitcarfetcher
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"path/filepath"
+	"time"
+
+	"github.com/goware/urlx"
+	rangecache "github.com/rpcpool/yellowstone-faithful/range-cache"
+)
+
+// NewRemoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file.
+// The returned ReaderAtCloser is backed by a http.Client.
+func NewRemoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloserSize, int64, error) {
+	// send a request to the server to get the file size:
+	contentLength, err := GetContentSizeWithHeadOrZeroRange(url)
+	if err != nil {
+		return nil, 0, err
+	}
+	if contentLength == 0 {
+		return nil, 0, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty")
+	}
+
+	// Create a cache with a default expiration time of 5 minutes, and which
+	// purges expired items every 10 minutes
+	rr := &HTTPSingleFileRemoteReaderAt{
+		url:           url,
+		contentLength: contentLength,
+		client:        NewHTTPClient(),
+	}
+	parsedURL, err := urlx.Parse(url)
+	if err != nil {
+		return nil, 0, err
+	}
+	name := filepath.Base(parsedURL.Path)
+
+	rc := rangecache.NewRangeCache(
+		contentLength,
+		name,
+		func(p []byte, off int64) (n int, err error) {
+			return remoteReadAt(rr.client, rr.url, p, off)
+		})
+	rc.StartCacheGC(ctx, 1*time.Minute)
+	rr.ca = rc
+
+	return rr, contentLength, nil
+}
+
+type HTTPSingleFileRemoteReaderAt struct {
+	url           string
+	contentLength int64
+	client        *http.Client
+	ca            *rangecache.RangeCache
+}
+
+// Close implements io.Closer.
+func (r *HTTPSingleFileRemoteReaderAt) Close() error {
+	r.client.CloseIdleConnections()
+	return r.ca.Close()
+}
+
+// Size returns the size of the file.
+func (r *HTTPSingleFileRemoteReaderAt) Size() int64 {
+	return r.contentLength
+}
+
+func retryExpotentialBackoff(
+	ctx context.Context,
+	startDuration time.Duration,
+	maxRetries int,
+	fn func() error,
+) error {
+	var err error
+	for i := 0; i < maxRetries; i++ {
+		err = fn()
+		if err == nil {
+			return nil
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(startDuration):
+			startDuration *= 2
+		}
+	}
+	return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err)
+}
+
+func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
+	if off >= r.contentLength {
+		return 0, io.EOF
+	}
+	v, err := r.ca.GetRange(context.Background(), off, int64(len(p)))
+	if err != nil {
+		return 0, err
+	}
+	n = copy(p, v)
+	if n < len(p) {
+		return n, io.ErrUnexpectedEOF
+	}
+	return n, nil
+}
+
+func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) {
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return 0, err
+	}
+	{
+		req.Header.Set("Connection", "keep-alive")
+		req.Header.Set("Keep-Alive", "timeout=600")
+	}
+
+	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))))
+
+	var resp *http.Response
+	err = retryExpotentialBackoff(
+		context.Background(),
+		100*time.Millisecond,
+		3,
+		func() error {
+			resp, err = client.Do(req)
+			return err
+		})
+	if err != nil {
+		return 0, err
+	}
+	defer resp.Body.Close()
+	{
+		n, err := io.ReadFull(resp.Body, p)
+		if err != nil {
+			return 0, err
+		}
+		return n, nil
+	}
+}
diff --git a/storage.go b/storage.go
index fd6d37ba..ddc4995a 100644
--- a/storage.go
+++ b/storage.go
@@ -13,6 +13,7 @@ import (
 	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
 	"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
 	solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
+	splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
 	"golang.org/x/exp/mmap"
 	"k8s.io/klog/v2"
 )
@@ -28,9 +29,9 @@ func openIndexStorage(
 	where = strings.TrimSpace(where)
 	if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") {
 		klog.Infof("opening index file from %q as HTTP remote file", where)
-		rac, err := remoteHTTPFileAsIoReaderAt(ctx, where)
+		rac, size, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(ctx, where)
 		if err != nil {
-			return nil, fmt.Errorf("failed to open remote index file: %w", err)
+			return nil, fmt.Errorf("failed to open remote index file %q: %w", where, err)
 		}
 		if !klog.V(5).Enabled() {
 			return rac, nil
@@ -39,6 +40,7 @@ func openIndexStorage(
 			rac:      rac,
 			name:     where,
 			isRemote: true,
+			size:     size,
 		}, nil
 	}
 	// TODO: add support for IPFS gateways.
@@ -61,13 +63,14 @@ func openCarStorage(ctx context.Context, where string) (*carv2.Reader, ReaderAtC
 	where = strings.TrimSpace(where)
 	if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") {
 		klog.Infof("opening CAR file from %q as HTTP remote file", where)
-		rem, err := remoteHTTPFileAsIoReaderAt(ctx, where)
+		rem, size, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(ctx, where)
 		if err != nil {
-			return nil, nil, fmt.Errorf("failed to open remote CAR file: %w", err)
+			return nil, nil, fmt.Errorf("failed to open remote CAR file %q: %w", where, err)
 		}
 		return nil, &readCloserWrapper{
 			rac:  rem,
 			name: where,
+			size: size,
 		}, nil
 	}
 	// TODO: add support for IPFS gateways.
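Not part of the patch: the sketch below illustrates how a caller would use the relocated helper, mirroring what openIndexStorage and openCarStorage in storage.go now do. It assumes ReaderAtCloserSize exposes ReadAt, Close, and Size as its name and the diff's usage imply; the openRemoteExample function is hypothetical.

// Illustrative sketch only; openRemoteExample is a hypothetical caller.
package main

import (
	"context"
	"fmt"

	splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
)

func openRemoteExample(ctx context.Context, url string) error {
	// Probes the remote size (HEAD or a zero-length Range request) and returns
	// a range-cached io.ReaderAt plus the content length.
	rac, size, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(ctx, url)
	if err != nil {
		return fmt.Errorf("failed to open remote file %q: %w", url, err)
	}
	defer rac.Close()

	// Read the first few bytes through the range cache.
	buf := make([]byte, 16)
	if _, err := rac.ReadAt(buf, 0); err != nil {
		return err
	}
	fmt.Printf("read %d of %d bytes from %s\n", len(buf), size, url)
	return nil
}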