Delta Exporter: Azure Support (#7444)
* Delta Exporter: Azure Support

* CR Fixes

* Update pkg/actions/lua/lakefs/catalogexport/table_extractor.lua

Co-authored-by: isan_rivkin <[email protected]>

* Documentation

---------

Co-authored-by: isan_rivkin <[email protected]>
N-o-Z and Isan-Rivkin authored Feb 13, 2024
1 parent 51153e6 commit ab22684
Showing 24 changed files with 649 additions and 276 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/esti.yaml
@@ -900,8 +900,8 @@ jobs:
DOCKER_REG: ${{ needs.login-to-amazon-ecr.outputs.registry }}
LAKEFS_DATABASE_TYPE: postgres
LAKEFS_BLOCKSTORE_TYPE: azure
- LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti
- LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY2 }}
+ ESTI_AZURE_STORAGE_ACCOUNT: esti
+ ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY }}
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}

@@ -954,11 +954,11 @@ jobs:
LAKEFS_DATABASE_COSMOSDB_CONTAINER: ${{ github.run_number }}-${{ steps.unique.outputs.value }}
LAKEFS_DATABASE_COSMOSDB_KEY: ${{ secrets.LAKEFS_DATABASE_COSMOSDB_READWRITEKEY }}
LAKEFS_BLOCKSTORE_TYPE: azure
- LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCOUNT: esti4hns
- LAKEFS_BLOCKSTORE_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}
ESTI_BLOCKSTORE_TYPE: azure
ESTI_STORAGE_NAMESPACE: https://esti4hns.blob.core.windows.net/esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_ADLS_IMPORT_BASE_URL: https://esti4hns.adls.core.windows.net/esti-system-testing-data/
+ ESTI_AZURE_STORAGE_ACCOUNT: esti4hns
+ ESTI_AZURE_STORAGE_ACCESS_KEY: ${{ secrets.LAKEFS_BLOCKSTORE_AZURE_STORAGE_GEN2_ACCESS_KEY }}

- name: cleanup cosmos db container
if: always()
155 changes: 90 additions & 65 deletions docs/howto/hooks/lua.md
@@ -102,19 +102,30 @@ The Lua runtime embedded in lakeFS is limited for security reasons. The provided

Helper function to mark a table object as an array for the runtime by setting the `_is_array: true` metatable field.
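
For illustration, the behavior can be thought of as the following sketch (the helper's documented name is elided above; `mark_array` is a hypothetical stand-in):

```lua
-- Hypothetical sketch of what the helper does: it attaches a metatable
-- carrying `_is_array = true`, so the runtime treats the table as an array.
local function mark_array(t)
    return setmetatable(t, { _is_array = true })
end

local empty_list = mark_array({}) -- serialized as an array rather than a map
```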

### `aws/s3.get_object(bucket, key)`
### `aws`

### `aws/s3_client`
S3 client library.

```lua
local aws = require("aws")
-- pass valid AWS credentials
local client = aws.s3_client("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "REGION")
```

### `aws/s3_client.get_object(bucket, key)`

Returns the body (as a Lua string) of the requested object, and a boolean value that is true if the requested object exists.

### `aws/s3.put_object(bucket, key, value)`
### `aws/s3_client.put_object(bucket, key, value)`

Stores the supplied string value at the given bucket and key.

### `aws/s3.delete_object(bucket [, key])`
### `aws/s3_client.delete_object(bucket [, key])`

Deletes the object at the given key.
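
Taken together, a minimal hook snippet exercising the three calls above might look like this (bucket, key, and credentials are placeholders):

```lua
local aws = require("aws")

local client = aws.s3_client("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "us-east-1")

-- write an object, read it back, then remove it
client.put_object("my-bucket", "hooks/example.txt", "hello from lakeFS")
local body, exists = client.get_object("my-bucket", "hooks/example.txt")
if exists then
    print("read back " .. #body .. " bytes")
end
client.delete_object("my-bucket", "hooks/example.txt")
```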

### `aws/s3.list_objects(bucket [, prefix, continuation_token, delimiter])`
### `aws/s3_client.list_objects(bucket [, prefix, continuation_token, delimiter])`

Returns a table of results containing the following structure:

@@ -143,7 +154,7 @@ or:
}
```

### `aws/s3.delete_recursive(bucket, prefix)`
### `aws/s3_client.delete_recursive(bucket, prefix)`

Deletes all objects under the given prefix.
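
For example, a cleanup hook might clear a whole export prefix in one call (bucket and prefix are placeholders):

```lua
local aws = require("aws")

local client = aws.s3_client("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "us-east-1")
-- removes every object whose key starts with the given prefix
client.delete_recursive("my-bucket", "exports/2024-02-13/")
```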

@@ -196,6 +207,34 @@ The `table_input` is the same as the argument of the `glue.create_table` function.

Deletes an existing table in the Glue Catalog.

### `azure`

### `azure/blob_client`
Azure Blob Storage client library.

```lua
local azure = require("azure")
-- pass valid Azure credentials
local client = azure.blob_client("AZURE_STORAGE_ACCOUNT", "AZURE_ACCESS_KEY")
```

### `azure/blob_client.get_object(path_uri)`

Returns the body (as a Lua string) of the requested object, and a boolean value that is true if the requested object exists.
`path_uri` - A valid Azure blob storage URI in the form `https://myaccount.blob.core.windows.net/mycontainer/myblob`

### `azure/blob_client.put_object(path_uri, value)`

Stores the supplied string value at the given path URI.
`path_uri` - A valid Azure blob storage URI in the form `https://myaccount.blob.core.windows.net/mycontainer/myblob`

### `azure/blob_client.delete_object(path_uri)`

Deletes the object at the given path URI.
`path_uri` - A valid Azure blob storage URI in the form `https://myaccount.blob.core.windows.net/mycontainer/myblob`
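
A minimal sketch tying the three blob calls together (storage account, key, and URI are placeholders):

```lua
local azure = require("azure")

local client = azure.blob_client("AZURE_STORAGE_ACCOUNT", "AZURE_ACCESS_KEY")
local uri = "https://myaccount.blob.core.windows.net/mycontainer/hooks/example.txt"

-- write a blob, read it back, then remove it
client.put_object(uri, "hello from lakeFS")
local body, exists = client.get_object(uri)
if exists then
    print("read back " .. #body .. " bytes")
end
client.delete_object(uri)
```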

### `crypto`

### `crypto/aes/encryptCBC(key, plaintext)`

Returns the ciphertext of the supplied plaintext, encrypted with AES in CBC mode using the given key.
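
A short sketch, assuming the module is loaded with the same `require` convention as the packages above (this is not confirmed by the snippet shown here):

```lua
local crypto = require("crypto")

-- AES-CBC needs a key of a valid AES length (e.g. 16 bytes for AES-128)
local key = "0123456789abcdef"
local ciphertext = crypto.aes.encryptCBC(key, "attack at dawn")
```
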
@@ -412,20 +451,20 @@ Parameters:

A package used to export Delta Lake tables from lakeFS to external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_names, writer, delta_client, table_descriptors_path)`
### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path)`

The function used to export Delta Lake tables.
The return value is a table mapping table names to their external locations (from which the data can be queried).

Parameters:

- `action`: The global action object
- `table_names`: A list of Delta table names (e.g. `{"table1", "table2"}`)
- `writer`: A writer function with a `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `table_def_names`: A list of Delta table names (e.g. `{"table1", "table2"}`)
- `write_object`: A writer function with a `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3_client.put_object` or `azure/blob_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_names` reside
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_def_names` reside

Example:
Delta export example for AWS S3:

```yaml
---
Expand All @@ -444,7 +483,7 @@ hooks:
local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_defs, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
Expand All @@ -457,7 +496,7 @@ hooks:
lakefs:
access_key_id: <LAKEFS_ACCESS_KEY_ID>
secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
table_names:
table_defs:
- mytable
```

@@ -469,6 +508,45 @@
path: a/path/to/my/delta/table
```

Delta export example for Azure Blob Storage:

```yaml
name: Delta Exporter
on:
post-commit:
branches: ["{{ .Branch }}*"]
hooks:
- id: delta_exporter
type: lua
properties:
script: |
local azure = require("azure")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local table_descriptors_path = "_lakefs_tables"
local sc = azure.blob_client(args.azure.storage_account, args.azure.access_key)
local function write_object(_, key, buf)
return sc.put_object(key, buf)
end
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_defs, write_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
args:
azure:
storage_account: "{{ .AzureStorageAccount }}"
access_key: "{{ .AzureAccessKey }}"
lakefs: # provide credentials of a user that has access to the script and Delta Table
access_key_id: "{{ .LakeFSAccessKeyID }}"
secret_access_key: "{{ .LakeFSSecretAccessKey }}"
table_defs:
- mytable
```
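
Note the adapter in the script above: `export_delta_log` invokes its writer with a `(bucket, key, data)` signature, while the Azure client's `put_object` expects `(path_uri, value)`. The local `write_object` wrapper bridges the two, discarding the unused bucket argument before delegating to `sc.put_object`.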

### `lakefs/catalogexport/table_extractor`

Utility package to parse `_lakefs_tables/` descriptors.
@@ -606,59 +684,6 @@ Parameters:
- `descriptor(Table)`: The table descriptor object (e.g. one parsed from `_lakefs_tables/my_table.yaml`).
- `action_info(Table)`: The global action object.

### `lakefs/catalogexport/delta_exporter`

A package used to export Delta Lake tables from lakeFS to external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_paths, writer, delta_client, table_descriptors_path)`

The function used to export Delta Lake tables.
The return value is a table mapping table names to their external locations (from which the data can be queried).

Parameters:

- `action`: The global action object
- `table_paths`: A list of lakeFS paths to Delta tables (e.g. `{"path/to/table1", "path/to/table2"}`)
- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_paths` reside

Example:

```yaml
---
name: delta_exporter
on:
post-commit: null
hooks:
- id: delta
type: lua
properties:
script: |
local aws = require("aws")
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_paths, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
args:
aws:
access_key_id: <AWS_ACCESS_KEY_ID>
secret_access_key: <AWS_SECRET_ACCESS_KEY>
region: us-east-1
lakefs:
access_key_id: <LAKEFS_ACCESS_KEY_ID>
secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
table_paths:
- my/delta/table/path
```

### `lakefs/catalogexport/unity_exporter`

A package used to register exported Delta Lake tables in Databricks' Unity Catalog.