diff --git a/docker-compose.yaml b/docker-compose.yaml index f3e11ead..d0339dfb 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,7 +8,7 @@ services: networks: - storage volumes: - - ./temp/esdata:/usr/share/elasticsearch/data + - esdata:/usr/share/elasticsearch/data postgres: ports: @@ -21,7 +21,11 @@ services: networks: - storage volumes: - - ./temp/pgdata:/var/lib/postgresql/data + - pgdata:/var/lib/postgresql/data + +volumes: + pgdata: + esdata: networks: storage: \ No newline at end of file diff --git a/internal/store/postgres/asset_repository.go b/internal/store/postgres/asset_repository.go index c2489c9d..f0c36c9d 100644 --- a/internal/store/postgres/asset_repository.go +++ b/internal/store/postgres/asset_repository.go @@ -279,35 +279,45 @@ func (r *AssetRepository) getByVersion( // It updates if asset does exist. // Checking existence is done using "urn", "type", and "service" fields. func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (string, error) { - fetchedAsset, err := r.GetByURN(ctx, ast.URN) - if errors.As(err, new(asset.NotFoundError)) { - err = nil - } - if err != nil { - return "", fmt.Errorf("error getting asset by URN: %w", err) - } + var id string + err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { + fetchedAsset, err := r.GetByURN(ctx, ast.URN) + if errors.As(err, new(asset.NotFoundError)) { + err = nil + } + if err != nil { + return fmt.Errorf("error getting asset by URN: %w", err) + } + + if fetchedAsset.ID == "" { + // insert flow + id, err = r.insert(ctx, ast) + if err != nil { + return fmt.Errorf("error inserting asset to DB: %w", err) + } + return nil + } - if fetchedAsset.ID == "" { - // insert flow - id, err := r.insert(ctx, ast) + // update flow + changelog, err := fetchedAsset.Diff(ast) if err != nil { - return "", fmt.Errorf("error inserting asset to DB: %w", err) + return fmt.Errorf("error diffing two assets: %w", err) } - return id, nil - } - // update flow - changelog, err := fetchedAsset.Diff(ast) - if err != nil { - return "", fmt.Errorf("error diffing two assets: %w", err) - } + err = r.update(ctx, fetchedAsset.ID, ast, &fetchedAsset, changelog) + if err != nil { + return fmt.Errorf("error updating asset to DB: %w", err) + } + id = fetchedAsset.ID + + return nil + }) - err = r.update(ctx, fetchedAsset.ID, ast, &fetchedAsset, changelog) if err != nil { - return "", fmt.Errorf("error updating asset to DB: %w", err) + return "", err } - return fetchedAsset.ID, nil + return id, nil } // DeleteByID removes asset using its ID