Skip to content

Commit

Permalink
feat: csi driver
Browse files Browse the repository at this point in the history
  • Loading branch information
morlay committed Aug 16, 2023
1 parent f3a0d44 commit aea43a4
Show file tree
Hide file tree
Showing 46 changed files with 2,026 additions and 161 deletions.
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "daily"

- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "daily"
26 changes: 26 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: ci

on:
push:
branches:
- "*"
tags:
- 'v*'

jobs:
ci:
runs-on: ubuntu-latest
env:
GH_USERNAME: ${{ github.actor }}
GH_PASSWORD: ${{ secrets.GITHUB_TOKEN }}

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- uses: docker/setup-qemu-action@v2

- run: curl -sSLf https://raw.githubusercontent.com/octohelm/wagon/main/install.sh | sudo sh

- run: make ship

48 changes: 37 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,57 @@ ifneq ( ,$(wildcard .secrets/local.mk))
include .secrets/local.mk
endif

WAGON = wagon -p wagon.cue

DEBUG = 0
ifeq ($(DEBUG),1)
WAGON := $(WAGON) --log-level=debug
endif

UNIFS = go run ./cmd/unifs

gen:
go run ./tool/internal/cmd/tool gen ./cmd/kubepkg

ship:
$(WAGON) do go ship pushx

manifests:
$(WAGON) do export manifests --output .tmp/

fmt:
cue fmt -s ./cuepkg/...
cue fmt -s ./cuedevpkg/...
goimports -w ./pkg
goimports -w ./cmd

dep:
go get -u ./pkg/...

test:
go test -v ./pkg/...

install:
go install ./cmd/unifs

test.fuse:
TEST_FUSE=1 \
go test -v -failfast ./pkg/fuse/...

mount.fs:
UNIFS_ENDPOINT=file:///tmp/data \
$(UNIFS) mount /tmp/mnt
mount.fs: install
unifs mount --delegate \
--backend=file:///tmp/data /tmp/mnt

mount.webdav:
UNIFS_ENDPOINT=$(UNIFS_WEBDAV_ENDPOINT) \
$(UNIFS) mount /tmp/mnt
$(UNIFS) mount \
--backend=$(UNIFS_WEBDAV_ENDPOINT) /tmp/mnt

mount.s3:
UNIFS_ENDPOINT=$(UNIFS_S3_ENDPOINT) \
$(UNIFS) mount /tmp/mnt

serve.webdav:
UNIFS_ENDPOINT=file:///tmp/data \
$(UNIFS) webdav
$(UNIFS) mount \
--backend=$(UNIFS_S3_ENDPOINT) /tmp/mnt

serve.webdav: install
unifs webdav --backend=file:///tmp/data

test.remote.s3:
TEST_S3_ENDPOINT=$(UNIFS_S3_ENDPOINT) \
Expand All @@ -38,3 +61,6 @@ test.remote.s3:
test.remote.webdav:
TEST_WEBDAV_ENDPOINT=$(UNIFS_WEBDAV_ENDPOINT) \
go test -v -failfast ./pkg/filesystem/webdav/...

debug.apply:
KUBECONFIG=${HOME}/.kube_config/config--infra-staging.yaml kubectl apply -f .tmp/manifests/unifs.yaml
81 changes: 79 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# UniFS

```mermaid
flowchart TB
flowchart TB
s3_fs[S3 FS]
local_fs[Local FS]
webdav_fs[WebDAV FS]
Expand All @@ -17,4 +17,81 @@ flowchart TB
fsi -->|serve| webdav_server
fsi -->|mount| fuse_fs
fsi -->|direct| go_code
```
```

### Supported Backends

```
webdav://<username>:<password>@<host>[<bath_path>][?insecure=true]
s3://<access_key_id>:<access_key_secret>@<host>/<bucket>[<bath_path>][?insecure=true]
file://<absolute_path>
```

### CSI

### Create StorageClass

```yaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: unifs
provisioner: csi-driver.unifs.octohelm.tech
parameters:
csi.storage.k8s.io/provisioner-secret-name: "${pvc.name}"
csi.storage.k8s.io/provisioner-secret-namespace: "${pvc.namespace}"
csi.storage.k8s.io/node-publish-secret-name: "${pvc.name}"
csi.storage.k8s.io/node-publish-secret-namespace: "${pvc.namespace}"
reclaimPolicy: Delete
```
### Create Secret && PersistentVolumeClaim
```yaml
---
apiVersion: v1
kind: Secret
metadata:
name: fuse-file
namespace: storage-system--unifs
type: Opaque
stringData:
backend: file:///data/unifs
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: fuse-file
namespace: storage-system--unifs
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storageClassName: unifs
volumeMode: Filesystem
---
apiVersion: v1
kind: Pod
metadata:
name: task-pv-pod
namespace: storage-system--unifs
spec:
volumes:
- name: pv-storage
persistentVolumeClaim:
claimName: fuse-file
containers:
- name: web
image: nginx
ports:
- containerPort: 80
name: "http"
volumeMounts:
- mountPath: "/usr/share/nginx/html"
name: pv-storage

```

20 changes: 20 additions & 0 deletions cmd/unifs/csidriver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"github.com/innoai-tech/infra/pkg/cli"
"github.com/innoai-tech/infra/pkg/otel"

"github.com/octohelm/unifs/pkg/csidriver"
)

func init() {
cli.AddTo(App, &CSIDriver{})
}

// Serve CSIDriver
type CSIDriver struct {
cli.C
Otel otel.Otel

csidriver.Driver
}
107 changes: 41 additions & 66 deletions cmd/unifs/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,25 @@ package main
import (
"context"
"fmt"
"github.com/octohelm/unifs/pkg/csidriver/mounter"
"os"

"github.com/go-courier/logr"
"github.com/hanwen/go-fuse/v2/fs"
fusefuse "github.com/hanwen/go-fuse/v2/fuse"
"github.com/innoai-tech/infra/pkg/cli"
"github.com/innoai-tech/infra/pkg/configuration"
"github.com/innoai-tech/infra/pkg/otel"
"github.com/octohelm/unifs/pkg/filesystem"
"github.com/octohelm/unifs/pkg/filesystem/local"
"github.com/octohelm/unifs/pkg/filesystem/s3"
"github.com/octohelm/unifs/pkg/filesystem/webdav"
"github.com/octohelm/unifs/pkg/filesystem/api"
"github.com/octohelm/unifs/pkg/fuse"
"github.com/octohelm/unifs/pkg/strfmt"
"github.com/pkg/errors"
"os"
"time"
daemon "github.com/sevlyar/go-daemon"
)

func init() {
cli.AddTo(App, &Mount{})
}

var _ configuration.Server = &Mounter{}

// Mount as fuse fs
type Mount struct {
cli.C
Expand All @@ -34,93 +30,72 @@ type Mount struct {
Mounter
}

type Mounter struct {
MountPoint string `arg:""`
// Source Endpoint
Endpoint strfmt.Endpoint `flag:"endpoint"`
var _ configuration.Runner = &Mounter{}

fsi filesystem.FileSystem `flag:"-"`
state *fusefuse.Server `flag:"-"`
type Mounter struct {
MountPoint string `arg:""`
Backend strfmt.Endpoint `flag:"backend"`
Foreground bool `flag:"foreground,omitempty"`
Delegate bool `flag:"delegate,omitempty"`
}

func (m *Mounter) Init(ctx context.Context) error {
switch m.Endpoint.Scheme {
case "s3":
conf := &s3.Config{Endpoint: m.Endpoint}
c, err := conf.Client(ctx)
func (m *Mounter) Run(ctx context.Context) error {
if m.Delegate {
m2, err := mounter.NewMounter(ctx, m.Backend.String())
if err != nil {
return err
}
m.fsi = s3.NewS3FS(c, conf.Bucket(), conf.Prefix())
case "webdav":
conf := &webdav.Config{Endpoint: m.Endpoint}
c, err := conf.Client(ctx)
return m2.Mount(m.MountPoint)
}

if !m.Foreground {
dctx := &daemon.Context{}
p, err := dctx.Reborn()
if err != nil {
return err
}
m.fsi = webdav.NewWebdavFS(c)
case "file":
m.fsi = local.NewLocalFS(m.Endpoint.Path)
default:
return errors.Errorf("unsupported endpoint %s", m.Endpoint)
}

return nil
}
if p != nil {
return nil
}

defer dctx.Release()
}

func (m *Mounter) Serve(ctx context.Context) error {
if err := os.MkdirAll(m.MountPoint, os.ModePerm); err != nil {
return err
}

b := &api.FileSystemBackend{}
b.Backend = m.Backend

if err := b.Init(ctx); err != nil {
return err
}

options := &fs.Options{}
options.Name = fmt.Sprintf("%s.fs", m.Endpoint.Scheme)
options.Name = fmt.Sprintf("%s.fs", b.Backend.Scheme)
//options.Debug = true

rawFS := fs.NewNodeFS(fuse.FS(m.fsi), options)
rawFS := fs.NewNodeFS(fuse.FS(b.FileSystem()), options)

state, err := fusefuse.NewServer(rawFS, m.MountPoint, &options.MountOptions)
if err != nil {
return err
}
m.state = state

logr.FromContext(ctx).
WithValues(
"fsi", m.Endpoint.Scheme,
"fsi", m.Backend.Scheme,
"on", m.MountPoint,
).
Info("mounted")

state.Serve()
return nil
}

func (m *Mounter) Shutdown(ctx context.Context) error {
if m.state == nil {
return nil
if !m.Foreground {
go state.Serve()
return daemon.ServeSignals()
}

errCh := make(chan error)

go func() {
for i := 0; i < 5; i++ {
err := m.state.Unmount()
if err == nil {
errCh <- err
return
}
logr.FromContext(ctx).Warn(errors.Wrap(err, "unmount failed"))
time.Sleep(time.Second)
logr.FromContext(ctx).Info("retrying...")
}
errCh <- m.state.Unmount()
}()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-errCh:
return err
}
state.Serve()
return nil
}
Loading

0 comments on commit aea43a4

Please sign in to comment.