Skip to content

Commit

Permalink
Add read stats for split cars & speed up loading (#91)
Browse files Browse the repository at this point in the history
* Add read stats for split cars

* Add same speed improvements to split car fetching as for remote car (single file).

* Improve errors
  • Loading branch information
gagliardetto authored Feb 22, 2024
1 parent e183b93 commit 97bc2fd
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 277 deletions.
43 changes: 35 additions & 8 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,23 @@ func NewEpochFromConfig(
minerIP := fmt.Sprintf("%s:%s", ip, port)
klog.V(3).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
formattedURL,
)

{
rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
c.Context,
formattedURL,
)
if err != nil {
return nil, fmt.Errorf("failed to create remote file split car reader from %q: %w", formattedURL, err)
}

return &readCloserWrapper{
rac: rfspc,
name: formattedURL,
size: rfspc.Size(),
isSplitCar: true,
}, nil
}
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
Expand All @@ -350,10 +363,24 @@ func NewEpochFromConfig(
if !ok {
return nil, fmt.Errorf("failed to find URL for piece CID %s", piece.CommP)
}
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
pieceURL.URI.String(),
)

{
formattedURL := pieceURL.URI.String()
rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
c.Context,
formattedURL,
)
if err != nil {
return nil, fmt.Errorf("failed to create remote file split car reader from %q: %w", formattedURL, err)
}

return &readCloserWrapper{
rac: rfspc,
name: formattedURL,
size: rfspc.Size(),
isSplitCar: true,
}, nil
}
})
if err != nil {
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
Expand Down
43 changes: 0 additions & 43 deletions http-client.go
Original file line number Diff line number Diff line change
@@ -1,44 +1 @@
package main

import (
"net"
"net/http"
"time"

"github.com/klauspost/compress/gzhttp"
)

var (
defaultMaxIdleConnsPerHost = 100
defaultTimeout = 1000 * time.Second
defaultKeepAlive = 180 * time.Second
)

func newHTTPTransport() *http.Transport {
return &http.Transport{
IdleConnTimeout: time.Minute,
MaxConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConns: defaultMaxIdleConnsPerHost,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: defaultTimeout,
KeepAlive: defaultKeepAlive,
}).DialContext,
ForceAttemptHTTP2: true,
// MaxIdleConns: 100,
TLSHandshakeTimeout: 10 * time.Second,
// ExpectContinueTimeout: 1 * time.Second,
}
}

// newHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func newHTTPClient() *http.Client {
tr := newHTTPTransport()

return &http.Client{
Timeout: defaultTimeout,
Transport: gzhttp.Transport(tr),
}
}
179 changes: 14 additions & 165 deletions http-range.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/goware/urlx"
"k8s.io/klog/v2"
)

Expand All @@ -18,167 +14,16 @@ type ReaderAtCloser interface {
io.Closer
}

// getContentSizeWithHeadOrZeroRange determines the size in bytes of the
// remote file at url. It first issues a HEAD request; if the server does
// not answer 200 OK, it falls back to a GET with a zero-length range
// ("bytes=0-0") and parses the total out of the Content-Range header.
func getContentSizeWithHeadOrZeroRange(url string) (int64, error) {
	// try sending a HEAD request to the server to get the file size:
	resp, err := http.Head(url)
	if err != nil {
		return 0, err
	}
	// HEAD bodies are empty, but the body must still be closed so the
	// underlying connection can be reused (was leaked before).
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		// try sending a GET request with a zero range to the server to get the file size.
		// Reuse resp.Request.URL (the post-redirect URL) as before.
		req := &http.Request{
			Method: "GET",
			URL:    resp.Request.URL,
			Header: make(http.Header),
		}
		req.Header.Set("Range", "bytes=0-0")
		rangeResp, err := http.DefaultClient.Do(req)
		if err != nil {
			return 0, err
		}
		// Close this body too (was leaked before).
		defer rangeResp.Body.Close()
		if rangeResp.StatusCode != http.StatusPartialContent {
			return 0, fmt.Errorf("unexpected status code: %d", rangeResp.StatusCode)
		}
		// The total size is the denominator of "Content-Range: bytes 0-0/<total>".
		contentRange := rangeResp.Header.Get("Content-Range")
		if contentRange == "" {
			return 0, fmt.Errorf("missing Content-Range header")
		}
		var contentLength int64
		if _, err := fmt.Sscanf(contentRange, "bytes 0-0/%d", &contentLength); err != nil {
			return 0, err
		}
		return contentLength, nil
	}
	return resp.ContentLength, nil
}

// remoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file.
// The returned ReaderAtCloser is backed by a http.Client.
func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser, error) {
	// Determine the remote file size up front; range reads need it.
	contentLength, err := getContentSizeWithHeadOrZeroRange(url)
	if err != nil {
		return nil, err
	}
	if contentLength == 0 {
		return nil, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty")
	}

	parsedURL, err := urlx.Parse(url)
	if err != nil {
		return nil, err
	}

	reader := &HTTPSingleFileRemoteReaderAt{
		url:           url,
		contentLength: contentLength,
		client:        newHTTPClient(),
	}

	// All reads go through a range cache keyed by the file's base name;
	// expired entries are garbage-collected once a minute for the life
	// of ctx.
	cache := NewRangeCache(
		contentLength,
		filepath.Base(parsedURL.Path),
		func(p []byte, off int64) (n int, err error) {
			return remoteReadAt(reader.client, reader.url, p, off)
		})
	cache.StartCacheGC(ctx, 1*time.Minute)
	reader.ca = cache

	return reader, nil
}

// HTTPSingleFileRemoteReaderAt adapts a single remote HTTP file to the
// io.ReaderAt + io.Closer interfaces, serving reads via ranged GETs
// through a RangeCache.
type HTTPSingleFileRemoteReaderAt struct {
	url           string       // remote file location
	contentLength int64        // total size of the remote file, in bytes
	client        *http.Client // client used for all range requests
	ca            *RangeCache  // caches previously fetched byte ranges
}

// Close implements io.Closer.
// It drops the client's idle connections and releases the range cache.
func (r *HTTPSingleFileRemoteReaderAt) Close() error {
	r.client.CloseIdleConnections()
	return r.ca.Close()
}

func retryExpotentialBackoff(
ctx context.Context,
startDuration time.Duration,
maxRetries int,
fn func() error,
) error {
var err error
for i := 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(startDuration):
startDuration *= 2
}
}
return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err)
}

// ReadAt implements io.ReaderAt, serving p from the range cache (which
// fetches misses from the remote file).
//
// NOTE(review): when the requested window extends past contentLength, the
// cache is still asked for the full len(p) bytes and a short result is
// reported as (n, io.ErrUnexpectedEOF) rather than the usual
// (n < len(p), io.EOF) ReaderAt convention — confirm callers expect this.
// Reads starting at or beyond EOF return (0, io.EOF).
func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
	if off >= r.contentLength {
		return 0, io.EOF
	}
	// No context is plumbed through ReadAt, so cache fetches run under a
	// background context.
	v, err := r.ca.GetRange(context.Background(), off, int64(len(p)))
	if err != nil {
		return 0, err
	}
	n = copy(p, v)
	if n < len(p) {
		return n, io.ErrUnexpectedEOF
	}
	return n, nil
}

func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return 0, err
}
{
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Keep-Alive", "timeout=600")
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))))

var resp *http.Response
err = retryExpotentialBackoff(
context.Background(),
100*time.Millisecond,
3,
func() error {
resp, err = client.Do(req)
return err
})
if err != nil {
return 0, err
}
defer resp.Body.Close()
{
n, err := io.ReadFull(resp.Body, p)
if err != nil {
return 0, err
}
return n, nil
}
// readCloserWrapper wraps a ReaderAtCloser with metadata used for
// read-statistics logging (name, size, and whether the source is remote
// and/or a split CAR).
type readCloserWrapper struct {
	rac        ReaderAtCloser // underlying reader
	isRemote   bool           // true when rac reads over the network
	isSplitCar bool           // true when rac reads from a split CAR piece
	name       string         // path or URL, used in log messages
	size       int64          // total size in bytes, as reported by the source
}

type readCloserWrapper struct {
rac ReaderAtCloser
isRemote bool
name string
// Size returns the total size in bytes of the wrapped source.
func (r *readCloserWrapper) Size() int64 {
	return r.size
}

// when reading print a dot
Expand All @@ -201,8 +46,12 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
prefix = icon + azureBG("[READ-INDEX]")
}
// if has suffix .car, then it's a car file
if strings.HasSuffix(r.name, ".car") {
prefix = icon + purpleBG("[READ-CAR]")
if strings.HasSuffix(r.name, ".car") || r.isSplitCar {
if r.isSplitCar {
prefix = icon + azureBG("[READ-SPLIT-CAR]")
} else {
prefix = icon + purpleBG("[READ-CAR]")
}
}
klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), took)
}
Expand Down
2 changes: 1 addition & 1 deletion range-cache.go → range-cache/range-cache.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package rangecache

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion range-cache_test.go → range-cache/range-cache_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package rangecache

import (
"bytes"
Expand Down
56 changes: 1 addition & 55 deletions split-car-fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,60 +100,6 @@ func GetContentSizeWithHeadOrZeroRange(url string) (int64, error) {
return resp.ContentLength, nil
}

// RemoteFileSplitCarReader reads a single split-CAR piece over HTTP range
// requests. It provides ReadAt, Close, and Size.
type RemoteFileSplitCarReader struct {
	commP      string // piece commitment (CommP) CID string, used as an identifier
	url        string // remote location of the piece
	size       int64  // total piece size in bytes, determined at construction
	httpClient *http.Client
}

// NewRemoteFileSplitCarReader builds a RemoteFileSplitCarReader for the
// piece with the given CommP at url, determining its size up front via a
// HEAD (or zero-range GET) request.
func NewRemoteFileSplitCarReader(commP string, url string) (*RemoteFileSplitCarReader, error) {
	size, err := GetContentSizeWithHeadOrZeroRange(url)
	if err != nil {
		// Wrap with %w (not %s) so callers can unwrap via errors.Is/As.
		return nil, fmt.Errorf("failed to get content size from %q: %w", url, err)
	}
	return &RemoteFileSplitCarReader{
		commP:      commP,
		url:        url,
		size:       size,
		httpClient: http.DefaultClient,
	}, nil
}

// ReadAt fetches len(p) bytes at offset off from the remote piece using
// an HTTP range request. It implements io.ReaderAt.
func (fscr *RemoteFileSplitCarReader) ReadAt(p []byte, off int64) (n int, err error) {
	req, err := http.NewRequest("GET", fscr.url, nil)
	if err != nil {
		return 0, err
	}
	// Range ends are inclusive; request exactly the window
	// [off, off+len(p)-1] and keep the connection alive for later reads.
	end := off + int64(len(p)) - 1
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, end))
	req.Header.Set("Connection", "keep-alive")
	req.Header.Set("Keep-Alive", "timeout=600")

	resp, err := fscr.httpClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	// Anything other than 206 means the server did not honor the range.
	if resp.StatusCode != http.StatusPartialContent {
		return 0, fmt.Errorf("GET %q: unexpected status code: %d", fscr.url, resp.StatusCode)
	}
	if n, err = io.ReadFull(resp.Body, p); err != nil {
		return 0, err
	}
	return n, nil
}

// Close implements io.Closer; it only drops idle HTTP connections, since
// there is no per-reader resource to tear down.
func (fscr *RemoteFileSplitCarReader) Close() error {
	fscr.httpClient.CloseIdleConnections()
	return nil
}

// Size returns the total size in bytes of the remote piece, as determined
// when the reader was constructed.
func (fscr *RemoteFileSplitCarReader) Size() int64 {
	return fscr.size
}

func NewSplitCarReader(
files *carlet.CarPiecesAndMetadata,
readerCreator SplitCarFileReaderCreator,
Expand Down Expand Up @@ -216,7 +162,7 @@ func NewSplitCarReader(
}

// if remote, then the file must be at least as header size + content size:
if _, ok := fi.(*RemoteFileSplitCarReader); ok {
if _, ok := fi.(*HTTPSingleFileRemoteReaderAt); ok {
expectedMinSize := int(cf.HeaderSize) + int(cf.ContentSize)
if size < expectedMinSize {
return nil, fmt.Errorf(
Expand Down
Loading

0 comments on commit 97bc2fd

Please sign in to comment.