Skip to content

Commit

Permalink
Immutable tiered storage (#962)
Browse files Browse the repository at this point in the history
First version of the immutable tiered storage
  • Loading branch information
itaiad200 authored Dec 6, 2020
1 parent 99e89ec commit e88f3c7
Show file tree
Hide file tree
Showing 11 changed files with 885 additions and 1 deletion.
18 changes: 18 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/treeverse/lakefs/pyramid"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
Expand All @@ -28,6 +30,10 @@ const (
DefaultBlockStoreS3StreamingChunkSize = 2 << 19 // 1MiB by default per chunk
DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first

DefaultDiskAllocatedBytes = 1 * 1024 * 1024 * 1024
DefaultDiskBaseDir = "~/lakefs/local_tier"
DefaultDiskBlockStoragePrefix = "_lakefs"

DefaultBlockStoreGSS3Endpoint = "https://storage.googleapis.com"

DefaultAuthCacheEnabled = true
Expand Down Expand Up @@ -88,6 +94,10 @@ func setDefaults() {
viper.SetDefault("blockstore.s3.streaming_chunk_timeout", DefaultBlockStoreS3StreamingChunkTimeout)
viper.SetDefault("blockstore.s3.max_retries", DefaultS3MaxRetries)

viper.SetDefault("disk.allocated_bytes", DefaultDiskAllocatedBytes)
viper.SetDefault("disk.base_dir", DefaultDiskBaseDir)
viper.SetDefault("disk.block_storage_prefix", DefaultDiskBlockStoragePrefix)

viper.SetDefault("gateways.s3.domain_name", DefaultS3GatewayDomainName)
viper.SetDefault("gateways.s3.region", DefaultS3GatewayRegion)

Expand All @@ -107,6 +117,14 @@ func (c *Config) GetDatabaseParams() dbparams.Database {
}
}

func (c *Config) GetLocalDiskParams() pyramid.Params {
return pyramid.Params{
AllocatedBytes: viper.GetInt64("disk.allocated_bytes"),
BaseDir: viper.GetString("disk.base_dir"),
BlockStoragePrefix: viper.GetString("disk.block_storage_prefix"),
}
}

func (c *Config) GetCatalogerType() string {
return viper.GetString("cataloger.type")
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/gofrs/uuid v3.3.0+incompatible // indirect
github.com/golang-migrate/migrate/v4 v4.12.2
github.com/golang/protobuf v1.4.2
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect
github.com/golangci/golangci-lint v1.30.0
github.com/google/uuid v1.1.1
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
Expand Down Expand Up @@ -72,6 +73,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/thanhpk/randstr v1.0.4
github.com/tidwall/pretty v1.0.1 // indirect
github.com/treeverse/golang-lru v0.6.1
github.com/tsenart/vegeta/v12 v12.8.3
github.com/vbauerster/mpb/v5 v5.3.0
github.com/xitongsys/parquet-go v1.5.2
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+q
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
Expand Down Expand Up @@ -449,6 +450,8 @@ github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf h1:gFVkHXmVAhEbxZVDln5V9GKrLaluNoFHDbrZwAWZgws=
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2 h1:23T5iq8rbUYlhpt5DB4XJkc6BU31uODLD1o1gKvZmD0=
github.com/golangci/check v0.0.0-20180506172741-cfe4005ccda2/go.mod h1:k9Qvh+8juN+UKMCS/3jFtGICgW8O96FVaZsaxdzDkR4=
github.com/golangci/dupl v0.0.0-20180902072040-3e9179ac440a h1:w8hkcTqaFpzKqonE9uMCefW1WDie15eSP/4MssdenaM=
Expand Down Expand Up @@ -600,7 +603,6 @@ github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr
github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8=
github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78=
github.com/jackc/pgconn v1.7.2 h1:195tt17jkjy+FrFlY0pgyrul5kRLb7BGXY3JTrNxeXU=
github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6 h1:geJ1mgTGd0WQo67wEd+H4OjFG5uA2e3cEBz9D5+pftU=
github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
Expand Down Expand Up @@ -1025,6 +1027,7 @@ github.com/sonatard/noctx v0.0.1/go.mod h1:9D2D/EoULe8Yy2joDHJj7bv3sZoq9AaSb8B4l
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sourcegraph/go-diff v0.5.3 h1:lhIKJ2nXLZZ+AfbHpYxTn0pXpNTTui0DX7DO3xeb1Zs=
github.com/sourcegraph/go-diff v0.5.3/go.mod h1:v9JDtjCE4HHHCZGId75rg8gkKKa98RVjBcBGsVmMmak=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.1/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
Expand Down Expand Up @@ -1096,6 +1099,8 @@ github.com/tommy-muehle/go-mnd v1.3.1-0.20200224220436-e6f9a994e8fa h1:RC4maTWLK
github.com/tommy-muehle/go-mnd v1.3.1-0.20200224220436-e6f9a994e8fa/go.mod h1:dSUh0FtTP8VhvkL1S+gUR1OKd9ZnSaozuI6r3m6wOig=
github.com/toqueteos/webbrowser v1.2.0 h1:tVP/gpK69Fx+qMJKsLE7TD8LuGWPnEV71wBN9rrstGQ=
github.com/toqueteos/webbrowser v1.2.0/go.mod h1:XWoZq4cyp9WeUeak7w7LXRUQf1F1ATJMir8RTqb4ayM=
github.com/treeverse/golang-lru v0.6.1 h1:G7nAh8gqSChNEwRpC4MopsIhKvE5cJVQLgKsZhr6cmg=
github.com/treeverse/golang-lru v0.6.1/go.mod h1:q66C/bXLqkppmXQPBfsdbuzj1ELfQeOc4/SMS3SmDrc=
github.com/tsenart/go-tsz v0.0.0-20180814232043-cdeb9e1e981e/go.mod h1:SWZznP1z5Ki7hDT2ioqiFKEse8K9tU2OUvaRI0NeGQo=
github.com/tsenart/vegeta/v12 v12.8.3 h1:UEsDkSrEJojMKW/xr7KUv4H/bYykX+V48KCsPZPqEfk=
github.com/tsenart/vegeta/v12 v12.8.3/go.mod h1:ZiJtwLn/9M4fTPdMY7bdbIeyNeFVE8/AHbWFqCsUuho=
Expand Down
39 changes: 39 additions & 0 deletions pyramid/eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package pyramid

import (
"fmt"

lru "github.com/treeverse/golang-lru"
"github.com/treeverse/golang-lru/simplelru"
)

// eviction is an abstraction of the eviction control for easy testing
type eviction interface {
touch(rPath relativePath)
store(rPath relativePath, filesize int64) int
}

type lruSizeEviction struct {
cache simplelru.LRUCache
}

func newLRUSizeEviction(capacity int64, evict func(rPath relativePath)) (eviction, error) {
cache, err := lru.NewWithEvict(capacity, func(key interface{}, _ interface{}, _ int64) {
evict(key.(relativePath))
})
if err != nil {
return nil, fmt.Errorf("creating cache: %w", err)
}
return &lruSizeEviction{
cache: cache,
}, nil
}

func (am *lruSizeEviction) touch(rPath relativePath) {
// update last access time, value is meaningless
am.cache.Get(rPath)
}

func (am *lruSizeEviction) store(rPath relativePath, filesize int64) int {
return am.cache.Add(rPath, nil, filesize)
}
65 changes: 65 additions & 0 deletions pyramid/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package pyramid

import (
"fmt"
"os"
)

// File is pyramid wrapper for os.file that triggers pyramid hooks for file actions.
type File struct {
fh *os.File

closed bool
persisted bool
store func(string) error
}

func (f *File) Read(p []byte) (n int, err error) {
return f.fh.Read(p)
}

func (f *File) ReadAt(p []byte, off int64) (n int, err error) {
return f.fh.ReadAt(p, off)
}

func (f *File) Write(p []byte) (n int, err error) {
return f.fh.Write(p)
}

func (f *File) Stat() (os.FileInfo, error) {
return f.fh.Stat()
}

func (f *File) Sync() error {
return f.fh.Sync()
}

func (f *File) Close() error {
f.closed = true
return f.fh.Close()
}

var (
errAlreadyPersisted = fmt.Errorf("file is already persisted")
errFileNotClosed = fmt.Errorf("file isn't closed")
)

// Store copies the closed file to all tiers of the pyramid.
func (f *File) Store(filename string) error {
if err := validateFilename(filename); err != nil {
return err
}

if f.persisted {
return errAlreadyPersisted
}
if !f.closed {
return errFileNotClosed
}

err := f.store(filename)
if err == nil {
f.persisted = true
}
return err
}
134 changes: 134 additions & 0 deletions pyramid/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package pyramid

import (
"io/ioutil"
"os"
"path"
"testing"

"github.com/stretchr/testify/require"

"github.com/google/uuid"
)

func TestPyramidWriteFile(t *testing.T) {
filename := uuid.New().String()

fh, err := ioutil.TempFile("", filename)
if err != nil {
panic(err)
}

filepath := fh.Name()
defer os.Remove(filepath)

storeCalled := false
sut := File{
fh: fh,
store: func(string) error {
storeCalled = true
return nil
},
}

content := "some content to write to file"
n, err := sut.Write([]byte(content))
require.Equal(t, len(content), n)
require.NoError(t, err)
require.NoError(t, sut.Sync())

_, err = sut.Stat()
require.NoError(t, err)

require.NoError(t, sut.Close())
require.NoError(t, sut.Store(filename))

require.True(t, storeCalled)
}

func TestWriteValidate(t *testing.T) {
filename := uuid.New().String()
fh, err := ioutil.TempFile("", filename)
if err != nil {
panic(err)
}

filepath := fh.Name()
defer os.Remove(filepath)

storeCalled := false

sut := File{
fh: fh,
store: func(string) error {
storeCalled = true
return nil
},
}

content := "some content to write to file"
n, err := sut.Write([]byte(content))
require.Equal(t, len(content), n)
require.NoError(t, err)

require.NoError(t, sut.Close())
require.Error(t, sut.Store("workspace"+string(os.PathSeparator)))
require.False(t, storeCalled)

require.Error(t, sut.Close())
require.NoError(t, sut.Store("validfilename"))
require.Error(t, sut.Store("validfilename"))
}

func TestPyramidReadFile(t *testing.T) {
filename := uuid.New().String()
filepath := path.Join("/tmp", filename)
content := "some content to write to file"
if err := ioutil.WriteFile(filepath, []byte(content), os.ModePerm); err != nil {
panic(err)
}
defer os.Remove(filepath)

mockEv := newMockEviction()

fh, err := os.Open(filepath)
if err != nil {
panic(err)
}

sut := ROFile{
fh: fh,
eviction: mockEv,
rPath: relativePath(filename),
}

_, err = sut.Stat()
require.NoError(t, err)

bytes := make([]byte, len(content))
n, err := sut.Read(bytes)
require.NoError(t, err)
require.Equal(t, len(content), n)
require.Equal(t, content, string(bytes))
require.NoError(t, sut.Close())

require.Equal(t, 2, mockEv.touchedTimes[relativePath(filename)])
}

type mockEviction struct {
touchedTimes map[relativePath]int
}

func newMockEviction() *mockEviction {
return &mockEviction{
touchedTimes: map[relativePath]int{},
}
}

func (me *mockEviction) touch(rPath relativePath) {
me.touchedTimes[rPath]++
}

func (me *mockEviction) store(_ relativePath, _ int64) int {
return 0
}
15 changes: 15 additions & 0 deletions pyramid/params.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pyramid

// Params is pyramid.FS params that are identical for all file-systems
// in a single lakeFS instance.
type Params struct {
// AllocatedBytes is the disk size in bytes that lakeFS is allowed to use.
AllocatedBytes int64

// BaseDir is the local directory where lakeFS app is storing the files.
BaseDir string

// BlockStoragePrefix is the prefix prepended to lakeFS metadata files in
// the blockstore.
BlockStoragePrefix string
}
14 changes: 14 additions & 0 deletions pyramid/pyramid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pyramid

// FS is pyramid abstraction of filesystem where the persistent storage-layer is the block storage.
// Files on the local disk are transient and might be cleaned up by the eviction policy.
// File structure under a namespace and namespace itself are flat (no directories).
type FS interface {
// Create creates a new file in the FS.
// It will only be persistent after the returned file is stored.
Create(namespace string) (*File, error)

// Open finds the referenced file and returns its read-only File.
// If file isn't in the local disk, it is fetched from the block storage.
Open(namespace, filename string) (*ROFile, error)
}
Loading

0 comments on commit e88f3c7

Please sign in to comment.