Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

example hook: dataset metadata validation #7752

Merged
merged 7 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions docs/howto/hooks/lua.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,22 @@ gs.write_fuse_symlink(source, destination, mount_info)
-- Symlink: "/home/user/gcs-mount/exported/path/to/object" -> "/home/user/gcs-mount/lakefs/data/abc/def"
```

### `hook`

A set of utilities to aide in writing user friendly hooks.

### `hook/fail(message)`

Will abort the current hook's execution with the given message. This is similar to using `error()`, but is typically used to separate
generic runtime errors (an API call that returned an unexpected response) and explict failure of the calling hook.

When called, errors will appear without a stacktrace, and the error message will be directly the one given as `message`.

```lua
> hook = require("hook")
> hook.fail("this hook shall not pass because of: " .. reason)
```

### `lakefs`

The Lua Hook library allows calling back to the lakeFS API using the identity of the user that triggered the action.
Expand Down Expand Up @@ -811,8 +827,8 @@ Returns a table for the given path string with the following structure:
Receives a variable number of strings and returns a joined string that represents a path:

```lua
> require("path")
> path.join("path/", "to", "a", "file.data")
> path = require("path")
> path.join("/", "path/", "to", "a", "file.data")
path/o/a/file.data
```

Expand Down
145 changes: 145 additions & 0 deletions examples/hooks/dataset_validator.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
--[[

Validate the existence of mandatory metadata describing a dataset.
A metadata file should exist either in the same directory as the modified dataset, or in any parent directory.
The closest metadata file would take precedence (i.e. same folder > parent > 2nd parent).

# Example hook definition (_lakefs_actions/validate_dataset_fields.yaml):
name: Validate Dataset Fields
description: Validate the existence of mandatory metadata describing a dataset.
on:
pre-merge:
branches:
- main
hooks:
- id: validate_datasets
type: lua
properties:
script_path: scripts/dataset_validator.lua
args:
prefix: 'datasets/'
metadata_file_name: dataset_metadata.yaml
fields:
- name: contains_pii
required: true
type: boolean
- name: approval_link
required: true
type: string
match_pattern: 'https?:\/\/.*'
- name: rank
required: true
type: number
- name: department
type: string
choices: ['hr', 'it', 'other']
]]

path = require("path")
regexp = require("regexp")
yaml = require("encoding/yaml")

lakefs = require("lakefs")
hook = require("hook")

function is_a_valid_choice(choices, value)
for _, c in ipairs(choices) do
if c == value then
return true
end
end
return false
end

function check_field(field_descriptor, value, filename)
-- check required but missing
if value == nil and field_descriptor.required then
hook.fail(filename .. ": field '" .. field_descriptor.name .. "' is required but no value given")
end
-- check type is correct
if field_descriptor.type ~= nil and type(value) ~= field_descriptor.type then
hook.fail(filename .. ": field '" .. field_descriptor.name .. "' should be of type " .. field_descriptor.type)
end
-- check choices
if field_descriptor.choices ~= nil and not is_a_valid_choice(field_descriptor.choices, value) then
hook.fail(filename .. ": field '" .. field_descriptor.name .. "' should be one of '" .. table.concat(field_descriptor.choices, ", ") .. "'")
end
-- check pattern
if field_descriptor.match_pattern ~= nil then
if value ~= nil and type(value) ~= "string" then
hook.fail(filename .. ": field " .. field_descriptor.name .. " should be text (got '" .. type(value) .. "') and match pattern '" .. field_descriptor.match_pattern .. "'")
elseif value ~= nil and not regexp.match(field_descriptor.match_pattern, value) then
hook.fail(filename .. ": field " .. field_descriptor.name .. " should match pattern '" .. field_descriptor.match_pattern .. "'")
end
end
end


-- main flow
after = ""
has_more = true
metadata_files = {}
while has_more do
local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, args.prefix)
if code ~= 200 then
error("could not diff: " .. resp.message)
end
for _, result in pairs(resp.results) do
print("" .. result.type .. " " .. result.path)
if result.type == "added" then
should_check = true
valid = true
has_parent = true
current = result.path
descriptor_for_file = ""

-- find nearest metadata file
while has_parent do
parsed = path.parse(current)
if not parsed.parent or parsed.parent == "" then
has_parent = false
break
end
current_descriptor = path.join("/", parsed.parent, args.metadata_file_name)
-- check if this descriptor has already been cached
if metadata_files[current_descriptor] then
-- cache hit
descriptor_for_file = metadata_files[current_descriptor]
break

elseif metadata_files[current_descriptor] == nil then
-- cache miss
-- attempt to fetch it
code, body = lakefs.get_object(action.repository_id, action.source_ref, current_descriptor)
if code == 200 then
metadata_files[current_descriptor] = yaml.unmarshal(body)
descriptor_for_file = current_descriptor
break
elseif code ~= 404 then
error("failed to look up metadata file: '" .. current_descriptor .. "', HTTP " .. tostring(code))
else
-- indicates this doesn't exist, no need to look it up again
metadata_files[current_descriptor] = false
end
end

current = parsed.parent
end

-- check if we found a descriptor
if descriptor_for_file == "" then
hook.fail("No dataset metadata found for file: " .. result.path)
end
end
end
-- pagination
has_more = resp.pagination.has_more
after = resp.pagination.next_offset
end

-- now let's review all the metadata files for this commit:
for metadata_filename, metadata_file in pairs(metadata_files) do
for _, field_descriptor in ipairs(args.fields) do
check_field(field_descriptor, metadata_file[field_descriptor.name], metadata_filename)
end
end
14 changes: 13 additions & 1 deletion pkg/actions/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

"github.com/Shopify/go-lua"
"github.com/spf13/viper"

lualibs "github.com/treeverse/lakefs/pkg/actions/lua"
"github.com/treeverse/lakefs/pkg/actions/lua/hook"
"github.com/treeverse/lakefs/pkg/actions/lua/lakefs"
luautil "github.com/treeverse/lakefs/pkg/actions/lua/util"
"github.com/treeverse/lakefs/pkg/api/apiutil"
Expand Down Expand Up @@ -151,11 +153,21 @@ func (h *LuaHook) Run(ctx context.Context, record graveler.HookRecord, buf *byte
}

func LuaRun(l *lua.State, code, name string) error {
l.Global("debug")
l.Field(-1, "traceback")
traceback := l.Top()
var mode string
if err := lua.LoadBuffer(l, code, name, mode); err != nil {
v, ok := l.ToString(l.Top())
if ok {
err = fmt.Errorf("%w: %s", err, v)
}
return err
}
return l.ProtectedCall(0, lua.MultipleReturns, 0)
if err := l.ProtectedCall(0, lua.MultipleReturns, traceback); err != nil {
return hook.Unwrap(err)
}
return nil
}

func (h *LuaHook) collectMetrics(l *lua.State) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/actions/lua/hook/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package hook

import (
"strings"

"github.com/Shopify/go-lua"
)

// helpers for writing lua actions

// ErrHookFailure indicates an explicit failure from a hook
// (as opposed to a generic error that occurred during execution)
type ErrHookFailure string

func (e ErrHookFailure) Error() string {
return string(e)
}

func Open(l *lua.State) {
open := func(l *lua.State) int {
lua.NewLibrary(l, library)
return 1
}
lua.Require(l, "hook", open, false)
l.Pop(1)
}

var library = []lua.RegistryFunction{
{Name: "fail", Function: fail},
}

func fail(l *lua.State) int {
p := lua.CheckString(l, 1)
lua.Errorf(l, "<HookFailure>%s</HookFailure>", p)
panic("unreachable")
}

func Unwrap(err error) error {
switch err.(type) {
case lua.RuntimeError, *lua.RuntimeError:
str := err.Error()
_, after, found := strings.Cut(str, "<HookFailure>")
if found {
before, _, _ := strings.Cut(after, "</HookFailure>")
return ErrHookFailure(before)
}
}
return err
}
75 changes: 75 additions & 0 deletions pkg/actions/lua/hook/lib_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package hook_test

import (
"testing"

"github.com/treeverse/lakefs/pkg/actions/lua/hook"

"github.com/Shopify/go-lua"
)

const scriptWithExplicitFailure = `
hook = require("hook")

hook.fail("this hook shall not pass")
`

const scriptWithExplicitError = `
error("oh no")
`

const scriptWithSyntaxError = `
local a = 15
a += "a"
`

func TestUnwrap(t *testing.T) {
t.Run("explicit fail", func(t *testing.T) {
l := lua.NewState()
lua.OpenLibraries(l)
hook.Open(l)
err := lua.DoString(l, scriptWithExplicitFailure)
if err == nil {
t.Error("expected error but got none!")
}
before := err
after := hook.Unwrap(before)
if after.Error() != "this hook shall not pass" {
t.Errorf("could not unwrap lua hook error, got %s", after.Error())
}
})
t.Run("regular error", func(t *testing.T) {
l := lua.NewState()
lua.OpenLibraries(l)
hook.Open(l)
err := lua.DoString(l, scriptWithExplicitError)
if err == nil {
t.Error("expected error but got none!")
}
before := err
after := hook.Unwrap(err)
if after.Error() != before.Error() {
t.Error("unwrapping things not returned by hook.fail should not change the error")
}
})
t.Run("syntax error", func(t *testing.T) {
l := lua.NewState()
lua.OpenLibraries(l)
hook.Open(l)
err := lua.DoString(l, scriptWithSyntaxError)
if err == nil {
t.Error("expected error but got none!")
}
before := err
after := hook.Unwrap(err)
if after.Error() != before.Error() {
t.Error("unwrapping things not returned by hook.fail should not change the error")
}
})
t.Run("nil error", func(t *testing.T) {
after := hook.Unwrap(nil)
if after != nil {
t.Error("unwrapping nil should return nil")
}
})
}
3 changes: 3 additions & 0 deletions pkg/actions/lua/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/Shopify/go-lua"

"github.com/treeverse/lakefs/pkg/actions/lua/crypto/aes"
"github.com/treeverse/lakefs/pkg/actions/lua/crypto/hmac"
"github.com/treeverse/lakefs/pkg/actions/lua/crypto/sha256"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/treeverse/lakefs/pkg/actions/lua/encoding/parquet"
"github.com/treeverse/lakefs/pkg/actions/lua/encoding/yaml"
"github.com/treeverse/lakefs/pkg/actions/lua/formats"
"github.com/treeverse/lakefs/pkg/actions/lua/hook"
"github.com/treeverse/lakefs/pkg/actions/lua/net/http"
"github.com/treeverse/lakefs/pkg/actions/lua/net/url"
"github.com/treeverse/lakefs/pkg/actions/lua/path"
Expand Down Expand Up @@ -45,6 +47,7 @@ func Open(l *lua.State, ctx context.Context, cfg OpenSafeConfig) {
aes.Open(l)
parquet.Open(l)
path.Open(l)
hook.Open(l)
aws.Open(l, ctx)
gcloud.Open(l, ctx)
azure.Open(l, ctx)
Expand Down
5 changes: 5 additions & 0 deletions pkg/actions/lua/path/path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func TestParse(t *testing.T) {
ExpectedBasename: "bar",
ExpectedParent: "",
},
{
Input: "bar/",
ExpectedBasename: "bar",
ExpectedParent: "",
},
{
Input: "/bar",
ExpectedBasename: "bar",
Expand Down
10 changes: 8 additions & 2 deletions pkg/actions/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/treeverse/lakefs/pkg/actions/lua/hook"

"github.com/antonmedv/expr"
"github.com/hashicorp/go-multierror"
"github.com/treeverse/lakefs/pkg/auth"
Expand Down Expand Up @@ -374,8 +376,12 @@ func (s *StoreService) runTasks(ctx context.Context, record graveler.HookRecord,
if task.Err != nil {
_, _ = fmt.Fprintf(&buf, "Error: %s\n", task.Err)
// wrap error with more information
task.Err = fmt.Errorf("hook run id '%s' failed on action '%s' hook '%s': %w",
task.HookRunID, task.Action.Name, task.HookID, task.Err)
if _, ok := task.Err.(hook.ErrHookFailure); ok {
task.Err = fmt.Errorf("%s: %w", task.HookID, task.Err)
} else {
task.Err = fmt.Errorf("hook run id '%s' failed on action '%s' hook '%s': %w",
task.HookRunID, task.Action.Name, task.HookID, task.Err)
}
}

err := hookOutputWriter.OutputWrite(ctx, &buf, int64(buf.Len()))
Expand Down
Loading
Loading