Add same speed improvements to split car fetching as for remote car (single file).
gagliardetto committed Feb 21, 2024
1 parent cf3aa2c commit 4396069
Showing 9 changed files with 193 additions and 267 deletions.
8 changes: 4 additions & 4 deletions epoch.go
@@ -334,8 +334,8 @@ func NewEpochFromConfig(
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())

{
rfspc, err := splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
c.Context,
formattedURL,
)
if err != nil {
@@ -366,8 +366,8 @@ func NewEpochFromConfig(

{
formattedURL := pieceURL.URI.String()
rfspc, err := splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
rfspc, _, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(
c.Context,
formattedURL,
)
if err != nil {
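For reference, a minimal sketch of this call-site migration, with the import path assumed from the repository layout (it is not shown in this diff) and the signatures inferred from the call sites above:

package sketch

import (
	"context"
	"fmt"

	// Assumed import path; this diff only shows the package name.
	splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
)

// openRemotePiece sketches the new call shape: NewRemoteHTTPFileAsIoReaderAt
// takes a context instead of the piece CID, and also returns the remote
// content length alongside the reader.
func openRemotePiece(ctx context.Context, formattedURL string) error {
	rfspc, size, err := splitcarfetcher.NewRemoteHTTPFileAsIoReaderAt(ctx, formattedURL)
	if err != nil {
		return fmt.Errorf("failed to open %q: %w", formattedURL, err)
	}
	defer rfspc.Close()
	fmt.Printf("remote split CAR piece: %d bytes\n", size)
	return nil
}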
43 changes: 0 additions & 43 deletions http-client.go
@@ -1,44 +1 @@
package main

import (
"net"
"net/http"
"time"

"github.com/klauspost/compress/gzhttp"
)

var (
defaultMaxIdleConnsPerHost = 100
defaultTimeout = 1000 * time.Second
defaultKeepAlive = 180 * time.Second
)

func newHTTPTransport() *http.Transport {
return &http.Transport{
IdleConnTimeout: time.Minute,
MaxConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
MaxIdleConns: defaultMaxIdleConnsPerHost,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: defaultTimeout,
KeepAlive: defaultKeepAlive,
}).DialContext,
ForceAttemptHTTP2: true,
// MaxIdleConns: 100,
TLSHandshakeTimeout: 10 * time.Second,
// ExpectContinueTimeout: 1 * time.Second,
}
}

// newHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func newHTTPClient() *http.Client {
tr := newHTTPTransport()

return &http.Client{
Timeout: defaultTimeout,
Transport: gzhttp.Transport(tr),
}
}
161 changes: 0 additions & 161 deletions http-range.go
@@ -1,15 +1,11 @@
package main

import (
"context"
"fmt"
"io"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/goware/urlx"
"k8s.io/klog/v2"
)

@@ -18,163 +14,6 @@ type ReaderAtCloser interface {
io.Closer
}

func getContentSizeWithHeadOrZeroRange(url string) (int64, error) {
// try sending a HEAD request to the server to get the file size:
resp, err := http.Head(url)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusOK {
// try sending a GET request with a zero range to the server to get the file size:
req := &http.Request{
Method: "GET",
URL: resp.Request.URL,
Header: make(http.Header),
}
req.Header.Set("Range", "bytes=0-0")
resp, err = http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusPartialContent {
return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
// now find the content length:
contentRange := resp.Header.Get("Content-Range")
if contentRange == "" {
return 0, fmt.Errorf("missing Content-Range header")
}
var contentLength int64
_, err := fmt.Sscanf(contentRange, "bytes 0-0/%d", &contentLength)
if err != nil {
return 0, err
}
return contentLength, nil
}
return resp.ContentLength, nil
}

// remoteHTTPFileAsIoReaderAt returns a ReaderAtCloser for a remote file.
// The returned ReaderAtCloser is backed by a http.Client.
func remoteHTTPFileAsIoReaderAt(ctx context.Context, url string) (ReaderAtCloser, int64, error) {
// send a request to the server to get the file size:
contentLength, err := getContentSizeWithHeadOrZeroRange(url)
if err != nil {
return nil, 0, err
}
if contentLength == 0 {
return nil, 0, fmt.Errorf("missing Content-Length/Content-Range header, or file is empty")
}

// Create a cache with a default expiration time of 5 minutes, and which
// purges expired items every 10 minutes
rr := &HTTPSingleFileRemoteReaderAt{
url: url,
contentLength: contentLength,
client: newHTTPClient(),
}
parsedURL, err := urlx.Parse(url)
if err != nil {
return nil, 0, err
}
name := filepath.Base(parsedURL.Path)

rc := NewRangeCache(
contentLength,
name,
func(p []byte, off int64) (n int, err error) {
return remoteReadAt(rr.client, rr.url, p, off)
})
rc.StartCacheGC(ctx, 1*time.Minute)
rr.ca = rc

return rr, contentLength, nil
}

type HTTPSingleFileRemoteReaderAt struct {
url string
contentLength int64
client *http.Client
ca *RangeCache
}

// Close implements io.Closer.
func (r *HTTPSingleFileRemoteReaderAt) Close() error {
r.client.CloseIdleConnections()
return r.ca.Close()
}

func retryExpotentialBackoff(
ctx context.Context,
startDuration time.Duration,
maxRetries int,
fn func() error,
) error {
var err error
for i := 0; i < maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(startDuration):
startDuration *= 2
}
}
return fmt.Errorf("failed after %d retries; last error: %w", maxRetries, err)
}

func (r *HTTPSingleFileRemoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
if off >= r.contentLength {
return 0, io.EOF
}
v, err := r.ca.GetRange(context.Background(), off, int64(len(p)))
if err != nil {
return 0, err
}
n = copy(p, v)
if n < len(p) {
return n, io.ErrUnexpectedEOF
}
return n, nil
}

func remoteReadAt(client *http.Client, url string, p []byte, off int64) (n int, err error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return 0, err
}
{
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Keep-Alive", "timeout=600")
}

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))))

var resp *http.Response
err = retryExpotentialBackoff(
context.Background(),
100*time.Millisecond,
3,
func() error {
resp, err = client.Do(req)
return err
})
if err != nil {
return 0, err
}
defer resp.Body.Close()
{
n, err := io.ReadFull(resp.Body, p)
if err != nil {
return 0, err
}
return n, nil
}
}

type readCloserWrapper struct {
rac ReaderAtCloser
isRemote bool
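The size probe deleted above lives on as GetContentSizeWithHeadOrZeroRange in split-car-fetcher/fetcher.go. As a standalone illustration of its fallback path, here is a minimal sketch of sizing a remote file via a zero-length Range request, assuming the server supports byte ranges:

package sketch

import (
	"fmt"
	"net/http"
)

// probeSizeViaZeroRange asks for a single byte ("bytes=0-0"); a compliant
// server answers 206 with "Content-Range: bytes 0-0/<total>", from which
// the total file size can be parsed even when HEAD is not honored.
func probeSizeViaZeroRange(url string) (int64, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	req.Header.Set("Range", "bytes=0-0")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusPartialContent {
		return 0, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	var total int64
	if _, err := fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes 0-0/%d", &total); err != nil {
		return 0, err
	}
	return total, nil
}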
2 changes: 1 addition & 1 deletion range-cache.go → range-cache/range-cache.go
@@ -1,4 +1,4 @@
package main
package rangecache

import (
"context"
2 changes: 1 addition & 1 deletion range-cache_test.go → range-cache/range-cache_test.go
@@ -1,4 +1,4 @@
package main
package rangecache

import (
"bytes"
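Callers of the moved cache only need a package qualifier. Here is a sketch wiring the constructor and GC exactly as the deleted http-range.go did, with the module path assumed rather than shown in this diff:

package sketch

import (
	"context"
	"time"

	// Assumed import path for the new range-cache directory.
	rangecache "github.com/rpcpool/yellowstone-faithful/range-cache"
)

// newRemoteRangeCache wires a fetch callback into the relocated RangeCache
// and starts its GC loop, mirroring the setup formerly in http-range.go.
func newRemoteRangeCache(
	ctx context.Context,
	contentLength int64,
	name string,
	fetch func(p []byte, off int64) (n int, err error),
) *rangecache.RangeCache {
	rc := rangecache.NewRangeCache(contentLength, name, fetch)
	rc.StartCacheGC(ctx, 1*time.Minute)
	return rc
}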
56 changes: 1 addition & 55 deletions split-car-fetcher/fetcher.go
@@ -100,60 +100,6 @@ func GetContentSizeWithHeadOrZeroRange(url string) (int64, error) {
return resp.ContentLength, nil
}

type RemoteFileSplitCarReader struct {
commP string
url string
size int64
httpClient *http.Client
}

func NewRemoteFileSplitCarReader(commP string, url string) (*RemoteFileSplitCarReader, error) {
size, err := GetContentSizeWithHeadOrZeroRange(url)
if err != nil {
return nil, fmt.Errorf("failed to get content size from %q: %s", url, err)
}
return &RemoteFileSplitCarReader{
commP: commP,
url: url,
size: size,
httpClient: http.DefaultClient,
}, nil
}

func (fscr *RemoteFileSplitCarReader) ReadAt(p []byte, off int64) (n int, err error) {
req, err := http.NewRequest("GET", fscr.url, nil)
if err != nil {
return 0, err
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1))
{
req.Header.Set("Connection", "keep-alive")
req.Header.Set("Keep-Alive", "timeout=600")
}
resp, err := fscr.httpClient.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusPartialContent {
return 0, fmt.Errorf("GET %q: unexpected status code: %d", fscr.url, resp.StatusCode)
}
n, err = io.ReadFull(resp.Body, p)
if err != nil {
return 0, err
}
return n, nil
}

func (fscr *RemoteFileSplitCarReader) Close() error {
fscr.httpClient.CloseIdleConnections()
return nil
}

func (fscr *RemoteFileSplitCarReader) Size() int64 {
return fscr.size
}

func NewSplitCarReader(
files *carlet.CarPiecesAndMetadata,
readerCreator SplitCarFileReaderCreator,
@@ -216,7 +162,7 @@ func NewSplitCarReader(
}

// if remote, then the file must be at least as header size + content size:
if _, ok := fi.(*RemoteFileSplitCarReader); ok {
if _, ok := fi.(*HTTPSingleFileRemoteReaderAt); ok {
expectedMinSize := int(cf.HeaderSize) + int(cf.ContentSize)
if size < expectedMinSize {
return nil, fmt.Errorf(
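The only behavioral change in NewSplitCarReader is which concrete type marks a reader as remote. A compile-time sketch (import path assumed) confirming that the replacement type still satisfies the read-at and close shape the old RemoteFileSplitCarReader provided:

package sketch

import (
	"io"

	// Assumed import path; HTTPSingleFileRemoteReaderAt is the moved reader.
	splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
)

// The remote reader must be usable wherever the old one was: readable at
// arbitrary offsets and closable.
var _ interface {
	io.ReaderAt
	io.Closer
} = (*splitcarfetcher.HTTPSingleFileRemoteReaderAt)(nil)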
44 changes: 44 additions & 0 deletions split-car-fetcher/http.go
@@ -0,0 +1,44 @@
package splitcarfetcher

import (
"net"
"net/http"
"time"

"github.com/klauspost/compress/gzhttp"
)

var (
DefaultMaxIdleConnsPerHost = 100
DefaultTimeout = 1000 * time.Second
DefaultKeepAlive = 180 * time.Second
)

func NewHTTPTransport() *http.Transport {
return &http.Transport{
IdleConnTimeout: time.Minute,
MaxConnsPerHost: DefaultMaxIdleConnsPerHost,
MaxIdleConnsPerHost: DefaultMaxIdleConnsPerHost,
MaxIdleConns: DefaultMaxIdleConnsPerHost,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: DefaultTimeout,
KeepAlive: DefaultKeepAlive,
}).DialContext,
ForceAttemptHTTP2: true,
// MaxIdleConns: 100,
TLSHandshakeTimeout: 10 * time.Second,
// ExpectContinueTimeout: 1 * time.Second,
}
}

// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient() *http.Client {
tr := NewHTTPTransport()

return &http.Client{
Timeout: DefaultTimeout,
Transport: gzhttp.Transport(tr),
}
}
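A short usage sketch of the promoted client performing the kind of ranged read the fetcher depends on; the URL is hypothetical and the import path is assumed:

package main

import (
	"fmt"
	"io"
	"net/http"

	// Assumed import path for the new split-car-fetcher/http.go.
	splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
)

func main() {
	client := splitcarfetcher.NewHTTPClient()

	// Hypothetical URL standing in for a miner piece endpoint.
	req, err := http.NewRequest(http.MethodGet, "http://example.com/piece.car", nil)
	if err != nil {
		panic(err)
	}
	// Ask for the first KiB only; the tuned transport keeps the
	// connection alive for subsequent ranged reads.
	req.Header.Set("Range", "bytes=0-1023")

	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("status %d, read %d bytes\n", resp.StatusCode, len(body))
}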
(2 more changed files not shown.)