Skip to content

Commit

Permalink
feat: multi file editor
Browse files Browse the repository at this point in the history
Implement new interface for multifile editor
  • Loading branch information
Martin GUIBERT committed Oct 17, 2023
1 parent 2be9c15 commit 9581cf7
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 20 deletions.
38 changes: 38 additions & 0 deletions src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.kestra.storage.gcs;

import com.google.cloud.storage.BlobInfo;
import io.kestra.core.storages.FileAttributes;
import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class GcsFileAttributes implements FileAttributes {

String fileName;
BlobInfo blobInfo;
boolean isDirectory;

@Override
public long getLastModifiedTime() {
return blobInfo.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli();
}

@Override
public long getCreationTime() {
return blobInfo.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
}

@Override
public FileType getType() {
if (isDirectory || fileName.endsWith("/") || blobInfo.getContentType().equals("application/x-directory")) {
return FileType.Directory;
}
return FileType.File;
}

@Override
public long getSize() {
return blobInfo.getSize();
}
}
128 changes: 113 additions & 15 deletions src/main/java/io/kestra/storage/gcs/GcsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import com.google.api.gax.paging.Page;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.*;
import io.kestra.core.storages.FileAttributes;
import io.micronaut.core.annotation.Introspected;
import io.kestra.core.storages.StorageInterface;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -14,14 +16,17 @@
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jetbrains.annotations.NotNull;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Singleton
@GcsStorageEnabled
Expand All @@ -38,13 +43,24 @@ private Storage client() {
}

private BlobId blob(String tenantId, URI uri) {
String path = getPath(tenantId, uri);
return blob(path);
}

@NotNull
private BlobId blob(String path) {
return BlobId.of(this.config.getBucket(), path);
}

@NotNull
private String getPath(String tenantId, URI uri) {
String path;
if (tenantId != null) {
path = tenantId + uri.getPath();
} else {
path = uri.getPath().substring(1);
}
return BlobId.of(this.config.getBucket(), path);
return path;
}

@Override
Expand All @@ -63,6 +79,22 @@ public InputStream get(String tenantId, URI uri) throws IOException {
}
}

@Override
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
String prefix = (path.endsWith("/")) ? path : path + "/";
Page<Blob> blobs = this.client().list(config.bucket, Storage.BlobListOption.prefix(prefix),
Storage.BlobListOption.currentDirectory());
return blobs.streamAll()
.filter(blob -> {
String key = blob.getName().substring(prefix.length());
// Remove recursive result and requested dir
return !key.isEmpty() && !Objects.equals(key, prefix) && new File(key).getParent() == null;
})
.map(throwFunction(this::getGcsFileAttributes))
.toList();
}

@Override
public boolean exists(String tenantId, URI uri) {
try {
Expand Down Expand Up @@ -103,9 +135,35 @@ public Long lastModifiedTime(String tenantId,URI uri) throws IOException {
}
}

@Override
public FileAttributes getAttributes(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
if (!exists(tenantId, uri)) {
path = path + "/";
}
Blob blob = this.client().get(this.blob(path));
if (blob == null) {
throw new FileNotFoundException("%s not found.".formatted(uri));
}
return getGcsFileAttributes(blob);
}

private FileAttributes getGcsFileAttributes(Blob blob) {
GcsFileAttributes.GcsFileAttributesBuilder builder = GcsFileAttributes.builder()
.fileName(new File(blob.getName()).getName())
.blobInfo(blob.asBlobInfo());
if (blob.getName().endsWith("/")) {
builder.isDirectory(true);
}
return builder.build();
}

@Override
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
try {
String path = getPath(tenantId, uri);
mkdirs(path);

BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(tenantId, uri))
.build();
Expand All @@ -127,12 +185,54 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {
}
}

private void mkdirs(String path) {
if (!path.endsWith("/") && path.lastIndexOf('/') > 0) {
path = path.substring(0, path.lastIndexOf('/') + 1);
}
BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(path))
.build();
this.client().create(blobInfo);
}

public boolean delete(String tenantId, URI uri) throws IOException {
try {
return this.client().delete(this.blob(tenantId, uri));
} catch (StorageException e) {
throw new IOException(e);
return !deleteByPrefix(tenantId, uri).isEmpty();
}

@Override
public URI createDirectory(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
if (!path.endsWith("/")) {
path = path + "/";
}
mkdirs(path);
return createUri(uri.getPath());
}

@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
String path = getPath(tenantId, from);
if (getAttributes(tenantId, from).getType() == FileAttributes.FileType.File) {
// copy just a file
BlobId source = blob(path);
BlobId target = blob(tenantId, to);
client().copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build());
client().delete(source);
return createUri(to.getPath());
}

// move directories
StorageBatch batch = this.client().batch();
String prefix = (!path.endsWith("/")) ? path + "/" : path;

Page<Blob> list = client().list(config.bucket, Storage.BlobListOption.prefix(prefix), Storage.BlobListOption.currentDirectory());
list.streamAll().forEach(blob -> {
BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length()));
client().copy(Storage.CopyRequest.newBuilder().setSource(blob.getBlobId()).setTarget(target).build());
batch.delete(blob.getBlobId());
});
batch.submit();
return createUri(to.getPath());
}

@Override
Expand All @@ -141,12 +241,7 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc
StorageBatch batch = this.client().batch();
Map<URI, StorageBatchResult<Boolean>> results = new HashMap<>();

String prefix;
if (tenantId != null) {
prefix = tenantId + storagePrefix.getPath();
} else {
prefix = storagePrefix.getPath().substring(1);
}
String prefix = getPath(tenantId, storagePrefix);

Page<Blob> blobs = this.client()
.list(this.config.getBucket(),
Expand Down Expand Up @@ -179,4 +274,7 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc
throw new IOException(e);
}
}
private static URI createUri(String key) {
return URI.create("kestra://%s".formatted(key));
}
}
126 changes: 121 additions & 5 deletions src/test/java/io/kestra/storage/gcs/GcsStorageTest.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package io.kestra.storage.gcs;

import com.google.common.io.CharStreams;
import io.kestra.core.storages.FileAttributes;
import io.kestra.core.utils.IdUtils;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import io.kestra.core.storages.StorageInterface;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -140,7 +139,16 @@ private void deleteByPrefix(String prefix, String tenantId) throws Exception {

List<URI> deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/"));

assertThat(deleted, containsInAnyOrder(path.stream().map(s -> URI.create("kestra://" + s)).toArray()));
List<String> res = Arrays.asList(
"/" + prefix + "/storage/",
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/",
"/" + prefix + "/storage/level1/1.yml",
"/" + prefix + "/storage/level1/level2/",
"/" + prefix + "/storage/level1/level2/1.yml"
);

assertThat(deleted, containsInAnyOrder(res.stream().map(s -> URI.create("kestra://" + s)).toArray()));

assertThrows(FileNotFoundException.class, () -> {
storageInterface.get(tenantId, new URI("/" + prefix + "/storage/"));
Expand All @@ -162,4 +170,112 @@ void deleteByPrefixNoResult() throws Exception {
List<URI> deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/"));
assertThat(deleted.size(), is(0));
}

@Test
void list() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();
URL resource = GcsFileAttributes.class.getClassLoader().getResource("application.yml");

List<String> path = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml",
"/" + prefix + "/storage/level1/level2/1.yml",
"/" + prefix + "/storage/another/1.yml"
);
path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s)));

List<FileAttributes> list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage"));

assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("root.yml", "level1", "another"));
}

@Test
void getAttributes() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();
URL resource = GcsStorageTest.class.getClassLoader().getResource("application.yml");
String content = CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile())));

List<String> path = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml"
);
path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s)));

FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/root.yml"));
assertThat(attr.getFileName(), is("root.yml"));
assertThat(attr.getType(), is(FileAttributes.FileType.File));
assertThat(attr.getSize(), is((long) content.length()));
assertThat(attr.getLastModifiedTime(), notNullValue());

attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1"));
assertThat(attr.getFileName(), is("level1"));
assertThat(attr.getType(), is(FileAttributes.FileType.Directory));
assertThat(attr.getSize(), is(0L));
assertThat(attr.getLastModifiedTime(), notNullValue());
}

@Test
void delete() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();
URL resource = GcsStorageTest.class.getClassLoader().getResource("application.yml");

List<String> path = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml",
"/" + prefix + "/storage/level1/level2/1.yml",
"/" + prefix + "/storage/another/1.yml"
);
path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s)));

boolean deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/level1"));
assertThat(deleted, is(true));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(true));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/another/1.yml")), is(true));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1/1.yml")), is(false));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1/level2/1.yml")), is(false));
deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/root.yml"));
assertThat(deleted, is(true));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false));
}

@Test
void createDirectory() throws URISyntaxException, IOException {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();

storageInterface.createDirectory(tenantId, new URI("/" + prefix + "/storage/level1"));
FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1"));
assertThat(attr.getFileName(), is("level1"));
assertThat(attr.getType(), is(FileAttributes.FileType.Directory));
assertThat(attr.getSize(), is(0L));
assertThat(attr.getLastModifiedTime(), notNullValue());
}

@Test
void move() throws Exception {
String prefix = IdUtils.create();
String tenantId = IdUtils.create();
URL resource = GcsStorageTest.class.getClassLoader().getResource("application.yml");

List<String> path = Arrays.asList(
"/" + prefix + "/storage/root.yml",
"/" + prefix + "/storage/level1/1.yml",
"/" + prefix + "/storage/level1/level2/1.yml",
"/" + prefix + "/storage/another/1.yml"
);
path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s)));

storageInterface.move(tenantId, new URI("/" + prefix + "/storage/level1"), new URI("/" + prefix + "/storage/moved"));

List<FileAttributes> list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage/moved"));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1")), is(false));
assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("level2", "1.yml"));

storageInterface.move(tenantId, new URI("/" + prefix + "/storage/root.yml"), new URI("/" + prefix + "/storage/root-moved.yml"));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false));
assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root-moved.yml")), is(true));
}
}

0 comments on commit 9581cf7

Please sign in to comment.