Skip to content

Commit

Permalink
UI: Allow writing to lakeFS from duckdb-wasm (#6044)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozkatz authored Jun 11, 2023
1 parent 176de75 commit b8a7c49
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 58 deletions.
13 changes: 13 additions & 0 deletions cmd/lakefs/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,18 @@ var runCmd = &cobra.Command{
}
}

// setup authenticator for s3 gateway to also support swagger auth
apiAuthenticator, err := api.GenericAuthMiddleware(
logger.WithField("service", "s3_gateway"),
middlewareAuthenticator,
authService,
&cfg.Auth.OIDC,
&cfg.Auth.CookieAuthVerification,
)
if err != nil {
logger.WithError(err).Fatal("could not initialize authenticator for S3 gateway")
}

s3gatewayHandler := gateway.NewHandler(
cfg.Gateways.S3.Region,
c,
Expand All @@ -289,6 +301,7 @@ var runCmd = &cobra.Command{
cfg.Logging.AuditLogLevel,
cfg.Logging.TraceRequestHeaders,
)
s3gatewayHandler = apiAuthenticator(s3gatewayHandler)

bufferedCollector.Start(ctx)
defer bufferedCollector.Close()
Expand Down
22 changes: 22 additions & 0 deletions pkg/api/auth_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ func extractSecurityRequirements(router routers.Router, r *http.Request) (openap
return *route.Operation.Security, nil
}

func GenericAuthMiddleware(logger logging.Logger, authenticator auth.Authenticator, authService auth.Service, oidcConfig *config.OIDC, cookieAuthconfig *config.CookieAuthVerification) (func(next http.Handler) http.Handler, error) {
swagger, err := GetSwagger()
if err != nil {
return nil, err
}
sessionStore := sessions.NewCookieStore(authService.SecretStore().SharedSecret())
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
user, err := checkSecurityRequirements(r, swagger.Security, logger, authenticator, authService, sessionStore, oidcConfig, cookieAuthconfig)
if err != nil {
writeError(w, r, http.StatusUnauthorized, err)
return
}
if user != nil {
ctx := logging.AddFields(r.Context(), logging.Fields{logging.UserFieldKey: user.Username})
r = r.WithContext(auth.WithUser(ctx, user))
}
next.ServeHTTP(w, r)
})
}, nil
}

func AuthMiddleware(logger logging.Logger, swagger *openapi3.Swagger, authenticator auth.Authenticator, authService auth.Service, sessionStore sessions.Store, oidcConfig *config.OIDC, cookieAuthconfig *config.CookieAuthVerification) func(next http.Handler) http.Handler {
router, err := legacy.NewRouter(swagger)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/api/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func Serve(
if err != nil {
panic(err)
}

sessionStore := sessions.NewCookieStore(authService.SecretStore().SharedSecret())
r := chi.NewRouter()
apiRouter := r.With(
Expand Down
21 changes: 16 additions & 5 deletions pkg/gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"

"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/auth/model"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/catalog"
gatewayerrors "github.com/treeverse/lakefs/pkg/gateway/errors"
Expand All @@ -27,7 +26,6 @@ import (
type contextKey string

const (
ContextKeyUser contextKey = "user"
ContextKeyRepositoryID contextKey = "repository_id"
ContextKeyRepository contextKey = "repository"
ContextKeyAuthContext contextKey = "auth_context"
Expand Down Expand Up @@ -241,8 +239,17 @@ func PathOperationHandler(sc *ServerContext, handler operations.PathOperationHan
func authorize(w http.ResponseWriter, req *http.Request, authService auth.GatewayService, perms permissions.Node) *operations.AuthorizedOperation {
ctx := req.Context()
o := ctx.Value(ContextKeyOperation).(*operations.Operation)
username := ctx.Value(ContextKeyUser).(*model.User).Username
authContext := ctx.Value(ContextKeyAuthContext).(sig.SigContext)
user, err := auth.GetUser(ctx)
if err != nil {
o.Log(req).WithError(err).Error("failed to authorize, get user")
_ = o.EncodeError(w, req, gatewayerrors.ErrInternalError.ToAPIErr())
return nil
}
username := user.Username
var accessKeyID string
if authContext, ok := ctx.Value(ContextKeyAuthContext).(sig.SigContext); ok {
accessKeyID = authContext.GetAccessKeyID()
}

if len(perms.Nodes) == 0 && len(perms.Permission.Action) == 0 {
// has not provided required permissions
Expand All @@ -262,7 +269,11 @@ func authorize(w http.ResponseWriter, req *http.Request, authService auth.Gatewa
return nil
}
if authResp.Error != nil || !authResp.Allowed {
o.Log(req).WithError(authResp.Error).WithField("key", authContext.GetAccessKeyID()).Warn("no permission")
l := o.Log(req).WithError(authResp.Error)
if accessKeyID != "" {
l = l.WithField("key", accessKeyID)
}
l.Warn("no permission")
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
return nil
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/treeverse/lakefs/pkg/auth"
"github.com/treeverse/lakefs/pkg/auth/model"
"github.com/treeverse/lakefs/pkg/catalog"
gatewayerrors "github.com/treeverse/lakefs/pkg/gateway/errors"
"github.com/treeverse/lakefs/pkg/gateway/operations"
Expand All @@ -26,6 +25,13 @@ import (
func AuthenticationHandler(authService auth.GatewayService, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
user, err := auth.GetUser(ctx)
if err == nil {
ctx = logging.AddFields(ctx, logging.Fields{logging.UserFieldKey: user.Username})
req = req.WithContext(auth.WithUser(ctx, user))
next.ServeHTTP(w, req)
return
}
o := ctx.Value(ContextKeyOperation).(*operations.Operation)
authenticator := sig.ChainedAuthenticator(
sig.NewV4Authenticator(req),
Expand Down Expand Up @@ -57,14 +63,14 @@ func AuthenticationHandler(authService auth.GatewayService, next http.Handler) h
return
}

user, err := authService.GetUser(ctx, creds.Username)
user, err = authService.GetUser(ctx, creds.Username)
if err != nil {
logger.WithError(err).Warn("could not get user for credentials key")
_ = o.EncodeError(w, req, gatewayerrors.ErrAccessDenied.ToAPIErr())
return
}
ctx = logging.AddFields(ctx, logging.Fields{logging.UserFieldKey: user.Username})
ctx = context.WithValue(ctx, ContextKeyUser, user)
ctx = auth.WithUser(ctx, user)
ctx = context.WithValue(ctx, ContextKeyAuthContext, authContext)
req = req.WithContext(ctx)
next.ServeHTTP(w, req)
Expand Down Expand Up @@ -152,7 +158,8 @@ func EnrichWithRepositoryOrFallback(c catalog.Interface, authService auth.Gatewa
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
repoID := ctx.Value(ContextKeyRepositoryID).(string)
username := ctx.Value(ContextKeyUser).(*model.User).Username
user, _ := auth.GetUser(ctx)
username := user.Username
o := ctx.Value(ContextKeyOperation).(*operations.Operation)
if repoID == "" {
// action without repo
Expand Down
14 changes: 7 additions & 7 deletions webui/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion webui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"typesync": "npx typesync"
},
"dependencies": {
"@duckdb/duckdb-wasm": "^1.24.0",
"@duckdb/duckdb-wasm": "^1.26.0",
"@emotion/react": "^11.10.5",
"@emotion/styled": "^11.10.5",
"@mui/material": "^5.11.1",
Expand Down
20 changes: 5 additions & 15 deletions webui/src/pages/repositories/repository/fileRenderers/data.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import React, {FC, FormEvent, useCallback, useEffect, useState} from "react";
import {getDuckDBConnection, closeDuckDBConnection} from "./duckdb";
import * as duckdb from '@duckdb/duckdb-wasm';
import {runDuckDBQuery} from "./duckdb";
import * as arrow from 'apache-arrow';
import Form from "react-bootstrap/Form";
import Button from "react-bootstrap/Button";
Expand All @@ -20,17 +19,11 @@ export const DataLoader: FC = () => {
}

export const DuckDBRenderer: FC<RendererComponent> = ({repoId, refId, path, fileExtension }) => {
let initialQuery = `SELECT *
FROM read_parquet(lakefs_object('${repoId}', '${refId}', '${path}'))
LIMIT 20`;
let initialQuery = `SELECT * FROM read_parquet('lakefs://${repoId}/${refId}/${path}') LIMIT 20`;
if (fileExtension === 'csv') {
initialQuery = `SELECT *
FROM read_csv(lakefs_object('${repoId}', '${refId}', '${path}'), AUTO_DETECT = TRUE)
LIMIT 20`
initialQuery = `SELECT * FROM read_csv('lakefs://${repoId}/${refId}/${path}', AUTO_DETECT = TRUE) LIMIT 20`
} else if (fileExtension === 'tsv') {
initialQuery = `SELECT *
FROM read_csv(lakefs_object('${repoId}', '${refId}', '${path}'), DELIM='\t', AUTO_DETECT=TRUE)
LIMIT 20`
initialQuery = `SELECT * FROM read_csv('lakefs://${repoId}/${refId}/${path}', DELIM='\t', AUTO_DETECT=TRUE) LIMIT 20`
}
const [shouldSubmit, setShouldSubmit] = useState<boolean>(true)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -60,17 +53,14 @@ LIMIT 20`
const runQuery = async (sql: string) => {
setLoading(true)
setError(null)
let conn: duckdb.AsyncDuckDBConnection | null = null
try {
conn = await getDuckDBConnection()
const results = await conn.query(sql)
const results = await runDuckDBQuery(sql)
setData(results)
} catch (e) {
setError(e.toString())
setData(null)
} finally {
setLoading(false)
await closeDuckDBConnection(conn)
}
}
runQuery(sql).catch(console.error);
Expand Down
85 changes: 60 additions & 25 deletions webui/src/pages/repositories/repository/fileRenderers/duckdb.tsx
Original file line number Diff line number Diff line change
@@ -1,31 +1,12 @@
import * as duckdb from '@duckdb/duckdb-wasm';
import * as arrow from 'apache-arrow';
import {AsyncDuckDB, AsyncDuckDBConnection, DuckDBDataProtocol} from '@duckdb/duckdb-wasm';
import duckdb_wasm from '@duckdb/duckdb-wasm/dist/duckdb-mvp.wasm?url';
import mvp_worker from '@duckdb/duckdb-wasm/dist/duckdb-browser-mvp.worker.js?url';
import duckdb_wasm_eh from '@duckdb/duckdb-wasm/dist/duckdb-eh.wasm?url';
import eh_worker from '@duckdb/duckdb-wasm/dist/duckdb-browser-eh.worker.js?url';
import {AsyncDuckDB, AsyncDuckDBConnection} from "@duckdb/duckdb-wasm";


// based on the replacement rules on the percent-encoding MDN page:
// https://developer.mozilla.org/en-US/docs/Glossary/Percent-encoding
// also, I tried doing something nicer with list comprehensions and printf('%x') to convert
// from unicode code point to hex - DuckDB didn't seem to evaluate lambdas and list comprehensions
// Issue: https://github.com/duckdb/duckdb/issues/5821
// when padding a macro to a table function such as read_parquet() or read_csv().
// so - string replacements it is.
const DUCKDB_SEED_SQL = `
CREATE MACRO p_encode(s) AS
list_aggregate([
case when x in (':', '/', '?', '#', '[', ']', '@', '!', '$', '&', '''', '(', ')', '*', '+', ',', ';', '=', '%', ' ')
then printf('%%%X', unicode(x)) else x end
for x
in string_split(s, '')
], 'string_agg', '');
CREATE MACRO lakefs_object(repoId, refId, path) AS
'${document.location.protocol}//${document.location.host}/api/v1/repositories/' ||
p_encode(repoId) || '/refs/' || p_encode(refId) || '/objects?path=' || p_encode(path);
`

const MANUAL_BUNDLES: duckdb.DuckDBBundles = {
mvp: {
Expand Down Expand Up @@ -53,18 +34,72 @@ async function getDuckDB(): Promise<duckdb.AsyncDuckDB> {
const db = new duckdb.AsyncDuckDB(logger, worker)
await db.instantiate(bundle.mainModule, bundle.pthreadWorker)
const conn = await db.connect()
await conn.query(DUCKDB_SEED_SQL)
await conn.close()
_db = db
return _db
}

export async function getDuckDBConnection(): Promise<duckdb.AsyncDuckDBConnection> {

// taken from @duckdb/duckdb-wasm/dist/types/src/bindings/tokens.d.ts
// which, unfortunately, we cannot import.
const DUCKDB_STRING_CONSTANT = 2;
const LAKEFS_URI_PATTERN = /^(['"]?)(lakefs:\/\/(.*))(['"])$/;

// returns a mapping of `lakefs://..` URIs to their `s3://...` equivalent
async function extractFiles(conn: AsyncDuckDBConnection, sql: string): Promise<{ [name: string]: string }> {
const tokenized = await conn.bindings.tokenize(sql)
let prev = 0;
const fileMap: { [name: string]: string } = {};
tokenized.offsets.forEach((offset, i) => {
let currentToken = sql.length;
if (i < tokenized.offsets.length - 1) {
currentToken = tokenized.offsets[i+1];
}
const part = sql.substring(prev, currentToken);
prev = currentToken;
if (tokenized.types[i] === DUCKDB_STRING_CONSTANT) {
const matches = part.match(LAKEFS_URI_PATTERN)
if (matches !== null) {
fileMap[matches[2]] = `s3://${matches[3]}`;
}
}
})
return fileMap
}

/* eslint-disable @typescript-eslint/no-explicit-any */
export async function runDuckDBQuery(sql: string): Promise<arrow.Table<any>> {
const db = await getDuckDB()
return db.connect()
/* eslint-disable @typescript-eslint/no-explicit-any */
let result: arrow.Table<any>
const conn = await db.connect()
try {
// TODO (ozk): read this from the server's configuration?
await conn.query(`SET s3_region='us-east-1';`)
// set the example values (used to make sure the S3 gateway picks up the request)
// real authentication is done using the existing swagger cookie or token
await conn.query(`SET s3_access_key_id='use_swagger_credentials';`)
await conn.query(`SET s3_secret_access_key='these_are_meaningless_but_must_be_set';`)
await conn.query(`SET s3_endpoint='${document.location.protocol}//${document.location.host}'`)

// register lakefs uri-ed files as s3 files
const fileMap = await extractFiles(conn, sql)
const fileNames = Object.getOwnPropertyNames(fileMap)
await Promise.all(fileNames.map(
fileName => db.registerFileURL(fileName, fileMap[fileName], DuckDBDataProtocol.S3, true)
))
// execute the query
result = await conn.query(sql)

// remove registrations
await Promise.all(fileNames.map(fileName => db.dropFile(fileName)))
} finally {
await closeDuckDBConnection(conn)
}
return result
}

export async function closeDuckDBConnection(conn: AsyncDuckDBConnection | null) {
async function closeDuckDBConnection(conn: AsyncDuckDBConnection | null) {
if (conn !== null) {
await conn.close()
}
Expand Down

0 comments on commit b8a7c49

Please sign in to comment.