Skip to content

Commit

Permalink
Different dbpool for branch-locker (#1447)
Browse files Browse the repository at this point in the history
  • Loading branch information
itaiad200 authored Feb 10, 2021
1 parent 30c26bf commit a981183
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 19 deletions.
114 changes: 114 additions & 0 deletions .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
name: Benchmark
on:
push:
branches:
- master

jobs:
deploy-executor-image:
name: Build and push benchmark executor Docker image
runs-on: ubuntu-20.04
steps:

- name: Checkout
uses: actions/checkout@v2

- name: Set up Go 1.14
uses: actions/setup-go@v1
with:
go-version: 1.14
id: go

- name: Generate code
run: |
make gen
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1

- name: Extract version
shell: bash
run: echo "::set-output name=tag::sha-$(git rev-parse --short HEAD | sed s/^v//g)"
id: version

- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- name: Build and push to Amazon ECR
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY_BENCHMARK_EXECUTOR: benchmark-executor
run: |
set +e
describe_image="$( aws ecr describe-images --repository-name $ECR_REGISTRY --image-ids imageTag=${{ steps.version.outputs.tag }})"
if [ $? -eq 0 ]; then
echo "Image exists"
else
echo "Image doesn't exist"
docker build --target benchmark-executor -t $ECR_REGISTRY/$ECR_REPOSITORY_BENCHMARK_EXECUTOR:${{ steps.version.outputs.tag }} --build-arg VERSION=${{ steps.version.outputs.tag }} .
docker push $ECR_REGISTRY/$ECR_REPOSITORY_BENCHMARK_EXECUTOR:${{ steps.version.outputs.tag }}
fi
benchmark-system:
name: Measure system benchmark
needs: deploy-executor-image
runs-on: ubuntu-20.04
steps:
- name: Check-out code
uses: actions/checkout@v2
- name: Setup go
uses: actions/setup-go@v2
with:
go-version: 1.14
id: go
- name: Extract version
shell: bash
run: echo "::set-output name=tag::sha-$(git rev-parse --short HEAD | sed s/^v//g)"
id: version
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.BENCHMARK_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.BENCHMARK_AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- name: Setup Terraform
uses: hashicorp/setup-terraform@v1
with:
terraform_version: 0.13.4
- name: Terraform init
run: |
cd benchmarks/terraform/
terraform init
- name: Terraform apply
run: |
cd benchmarks/terraform/
terraform apply --auto-approve -var="dockerReg=${{ secrets.AWS_ACCOUNT_ID }}.dkr.ecr.us-east-1.amazonaws.com" -var="tag=${{ steps.version.outputs.tag }}" \
-var="build=${{ github.run_number }}" -var="grafana-username=${{ secrets.GRAFANA_USERNAME }}" -var="grafana-password=${{ secrets.GRAFANA_PASSWORD }}"
- name: 'Terraform Output'
id: terraform
uses: hashicorp/terraform-github-actions@master
with:
tf_actions_version: 0.13.4
tf_actions_subcommand: 'output'
tf_actions_working_dir: 'benchmarks/terraform'
- name: Wait until executor finishes
run: |
CLUSTER_ARN=$(echo '${{ steps.terraform.outputs.tf_actions_output }}' | jq -r '.cluster_arn.value')
EXECUTOR_FAMILY=$(echo '${{ steps.terraform.outputs.tf_actions_output }}' | jq -r '.executor_family.value')
until [ $(aws ecs list-tasks --cluster=$CLUSTER_ARN --desired-status=STOPPED --family=$EXECUTOR_FAMILY | jq '.taskArns | length') -ge 1 ] ; do
echo benchmark-executor still running, sleeping...
sleep 20
done
- name: Terraform destory
if: ${{ always() }}
run: |
cd benchmarks/terraform/
# keep the logs
terraform state rm aws_cloudwatch_log_group.benchmark
terraform destroy --auto-approve
5 changes: 4 additions & 1 deletion api/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func setupHandler(t testing.TB, blockstoreType string, opts ...testutil.GetDBOpt
cfg.Override(func(configurator config.Configurator) {
configurator.SetDefault(config.BlockstoreTypeKey, mem.BlockstoreType)
})
cataloger, err := catalog.NewCataloger(conn, cfg)
cataloger, err := catalog.NewCataloger(catalog.Config{
Config: cfg,
DB: conn,
})
testutil.MustDo(t, "build cataloger", err)

authService := auth.NewDBAuthService(conn, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{
Expand Down
22 changes: 16 additions & 6 deletions catalog/entry_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,18 @@ const (
MetaRangeFSName = "meta-range"
)

func NewEntryCatalog(cfg *config.Config, db db.Database) (*EntryCatalog, error) {
tierFSParams, err := cfg.GetCommittedTierFSParams()
type Config struct {
Config *config.Config
DB db.Database
LockDB db.Database
}

func NewEntryCatalog(cfg Config) (*EntryCatalog, error) {
if cfg.LockDB == nil {
cfg.LockDB = cfg.DB
}

tierFSParams, err := cfg.Config.GetCommittedTierFSParams()
if err != nil {
return nil, fmt.Errorf("configure tiered FS for committed: %w", err)
}
Expand Down Expand Up @@ -120,7 +130,7 @@ func NewEntryCatalog(cfg *config.Config, db db.Database) (*EntryCatalog, error)
sstableManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, rangeFS, hashAlg)
sstableMetaManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, metaRangeFS, hashAlg)
sstableMetaRangeManager, err := committed.NewMetaRangeManager(
*cfg.GetCommittedParams(),
*cfg.Config.GetCommittedParams(),
// TODO(ariels): Use separate range managers for metaranges and ranges
sstableMetaManager,
sstableManager,
Expand All @@ -130,9 +140,9 @@ func NewEntryCatalog(cfg *config.Config, db db.Database) (*EntryCatalog, error)
}
committedManager := committed.NewCommittedManager(sstableMetaRangeManager)

stagingManager := staging.NewManager(db)
refManager := ref.NewPGRefManager(db, ident.NewHexAddressProvider())
branchLocker := ref.NewBranchLocker(db)
stagingManager := staging.NewManager(cfg.DB)
refManager := ref.NewPGRefManager(cfg.DB, ident.NewHexAddressProvider())
branchLocker := ref.NewBranchLocker(cfg.LockDB)
store := graveler.NewGraveler(branchLocker, committedManager, stagingManager, refManager)
entryCatalog := &EntryCatalog{
BlockAdapter: tierFSParams.Adapter,
Expand Down
6 changes: 2 additions & 4 deletions catalog/rocks_cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"strings"

"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/graveler"
"github.com/treeverse/lakefs/logging"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -28,8 +26,8 @@ const (

var ErrUnknownDiffType = errors.New("unknown graveler difference type")

func NewCataloger(db db.Database, cfg *config.Config) (Cataloger, error) {
entryCatalog, err := NewEntryCatalog(cfg, db)
func NewCataloger(cfg Config) (Cataloger, error) {
entryCatalog, err := NewEntryCatalog(cfg)
if err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion cmd/lakefs-loadtest/cmd/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@ var entryCmd = &cobra.Command{
ctx := context.Background()
database := connectToDB(connectionString)
defer database.Close()
lockDB := connectToDB(connectionString)
defer lockDB.Close()

conf := config.NewConfig()
c, err := catalog.NewCataloger(database, conf)
c, err := catalog.NewCataloger(catalog.Config{
Config: conf,
DB: database,
LockDB: lockDB,
})
if err != nil {
fmt.Printf("Cannot create cataloger: %s\n", err)
os.Exit(1)
Expand Down
5 changes: 4 additions & 1 deletion cmd/lakefs/cmd/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ var diagnoseCmd = &cobra.Command{
if err != nil {
logger.WithError(err).Fatal("Failed to create block adapter")
}
cataloger, err := catalog.NewCataloger(dbPool, cfg)
cataloger, err := catalog.NewCataloger(catalog.Config{
Config: cfg,
DB: dbPool,
})
if err != nil {
logger.WithError(err).Fatal("Failed to create cataloger")
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/lakefs/cmd/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ var diagnosticsCmd = &cobra.Command{
if err != nil {
log.Printf("Failed to create block adapter: %s", err)
}
cataloger, err := catalog.NewCataloger(dbPool, cfg)
cataloger, err := catalog.NewCataloger(catalog.Config{
Config: cfg,
DB: dbPool,
})
if err != nil {
log.Printf("Failed to create cataloger: %s", err)
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/lakefs/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ func runImport(cmd *cobra.Command, args []string) (statusCode int) {
dbPool := db.BuildDatabaseConnection(cfg.GetDatabaseParams())
defer dbPool.Close()

cataloger, err := catalog.NewCataloger(dbPool, cfg)
catalogCfg := catalog.Config{
Config: cfg,
DB: dbPool,
}
cataloger, err := catalog.NewCataloger(catalogCfg)
if err != nil {
fmt.Printf("Failed to create cataloger: %s\n", err)
return 1
}
defer func() { _ = cataloger.Close() }()

entryCataloger, err := catalog.NewEntryCatalog(cfg, dbPool)
entryCataloger, err := catalog.NewEntryCatalog(catalogCfg)
if err != nil {
fmt.Printf("Failed to build entry catalog: %s\n", err)
return 1
Expand Down
9 changes: 8 additions & 1 deletion cmd/lakefs/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,17 @@ var runCmd = &cobra.Command{
dbPool := db.BuildDatabaseConnection(dbParams)
defer dbPool.Close()

lockdbPool := db.BuildDatabaseConnection(dbParams)
defer lockdbPool.Close()

registerPrometheusCollector(dbPool)
migrator := db.NewDatabaseMigrator(dbParams)

cataloger, err := catalog.NewCataloger(dbPool, cfg)
cataloger, err := catalog.NewCataloger(catalog.Config{
Config: cfg,
DB: dbPool,
LockDB: lockdbPool,
})
if err != nil {
logger.WithError(err).Fatal("failed to create cataloger")
}
Expand Down
5 changes: 4 additions & 1 deletion gateway/playback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ func getBasicHandler(t *testing.T, authService *simulator.PlayBackMockConf) (htt
}

conn, _ := testutil.GetDB(t, databaseURI)
cataloger, err := catalog.NewCataloger(conn, config.NewConfig())
cataloger, err := catalog.NewCataloger(catalog.Config{
Config: config.NewConfig(),
DB: conn,
})
testutil.MustDo(t, "build cataloger", err)
multipartsTracker := multiparts.NewTracker(conn)

Expand Down
5 changes: 4 additions & 1 deletion loadtest/local_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func TestLocalLoad(t *testing.T) {
blockstoreType = "mem"
}
blockAdapter := testutil.NewBlockAdapterByType(t, &block.NoOpTranslator{}, blockstoreType)
cataloger, err := catalog.NewCataloger(conn, config.NewConfig())
cataloger, err := catalog.NewCataloger(catalog.Config{
Config: config.NewConfig(),
DB: conn,
})
testutil.MustDo(t, "build cataloger", err)
authService := auth.NewDBAuthService(conn, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{})
meta := auth.NewDBMetadataManager("dev", conn)
Expand Down

0 comments on commit a981183

Please sign in to comment.