diff --git a/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java b/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java new file mode 100644 index 0000000..7cee11a --- /dev/null +++ b/src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java @@ -0,0 +1,38 @@ +package io.kestra.storage.gcs; + +import com.google.cloud.storage.BlobInfo; +import io.kestra.core.storages.FileAttributes; +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class GcsFileAttributes implements FileAttributes { + + String fileName; + BlobInfo blobInfo; + boolean isDirectory; + + @Override + public long getLastModifiedTime() { + return blobInfo.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(); + } + + @Override + public long getCreationTime() { + return blobInfo.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(); + } + + @Override + public FileType getType() { + if (isDirectory || fileName.endsWith("/") || blobInfo.getContentType().equals("application/x-directory")) { + return FileType.Directory; + } + return FileType.File; + } + + @Override + public long getSize() { + return blobInfo.getSize(); + } +} diff --git a/src/main/java/io/kestra/storage/gcs/GcsStorage.java b/src/main/java/io/kestra/storage/gcs/GcsStorage.java index 2ecced6..a2cab38 100644 --- a/src/main/java/io/kestra/storage/gcs/GcsStorage.java +++ b/src/main/java/io/kestra/storage/gcs/GcsStorage.java @@ -3,9 +3,11 @@ import com.google.api.gax.paging.Page; import com.google.cloud.WriteChannel; import com.google.cloud.storage.*; +import io.kestra.core.storages.FileAttributes; import io.micronaut.core.annotation.Introspected; import io.kestra.core.storages.StorageInterface; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -14,14 +16,17 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.attribute.FileTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; + import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.jetbrains.annotations.NotNull; + +import static io.kestra.core.utils.Rethrow.throwFunction; @Singleton @GcsStorageEnabled @@ -38,13 +43,24 @@ private Storage client() { } private BlobId blob(String tenantId, URI uri) { + String path = getPath(tenantId, uri); + return blob(path); + } + + @NotNull + private BlobId blob(String path) { + return BlobId.of(this.config.getBucket(), path); + } + + @NotNull + private String getPath(String tenantId, URI uri) { String path; if (tenantId != null) { path = tenantId + uri.getPath(); } else { path = uri.getPath().substring(1); } - return BlobId.of(this.config.getBucket(), path); + return path; } @Override @@ -63,6 +79,22 @@ public InputStream get(String tenantId, URI uri) throws IOException { } } + @Override + public List list(String tenantId, URI uri) throws IOException { + String path = getPath(tenantId, uri); + String prefix = (path.endsWith("/")) ? path : path + "/"; + Page blobs = this.client().list(config.bucket, Storage.BlobListOption.prefix(prefix), + Storage.BlobListOption.currentDirectory()); + return blobs.streamAll() + .filter(blob -> { + String key = blob.getName().substring(prefix.length()); + // Remove recursive result and requested dir + return !key.isEmpty() && !Objects.equals(key, prefix) && new File(key).getParent() == null; + }) + .map(throwFunction(this::getGcsFileAttributes)) + .toList(); + } + @Override public boolean exists(String tenantId, URI uri) { try { @@ -103,9 +135,35 @@ public Long lastModifiedTime(String tenantId,URI uri) throws IOException { } } + @Override + public FileAttributes getAttributes(String tenantId, URI uri) throws IOException { + String path = getPath(tenantId, uri); + if (!exists(tenantId, uri)) { + path = path + "/"; + } + Blob blob = this.client().get(this.blob(path)); + if (blob == null) { + throw new FileNotFoundException("%s not found.".formatted(uri)); + } + return getGcsFileAttributes(blob); + } + + private FileAttributes getGcsFileAttributes(Blob blob) { + GcsFileAttributes.GcsFileAttributesBuilder builder = GcsFileAttributes.builder() + .fileName(new File(blob.getName()).getName()) + .blobInfo(blob.asBlobInfo()); + if (blob.getName().endsWith("/")) { + builder.isDirectory(true); + } + return builder.build(); + } + @Override public URI put(String tenantId, URI uri, InputStream data) throws IOException { try { + String path = getPath(tenantId, uri); + mkdirs(path); + BlobInfo blobInfo = BlobInfo .newBuilder(this.blob(tenantId, uri)) .build(); @@ -127,12 +185,54 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException { } } + private void mkdirs(String path) { + if (!path.endsWith("/") && path.lastIndexOf('/') > 0) { + path = path.substring(0, path.lastIndexOf('/') + 1); + } + BlobInfo blobInfo = BlobInfo + .newBuilder(this.blob(path)) + .build(); + this.client().create(blobInfo); + } + public boolean delete(String tenantId, URI uri) throws IOException { - try { - return this.client().delete(this.blob(tenantId, uri)); - } catch (StorageException e) { - throw new IOException(e); + return !deleteByPrefix(tenantId, uri).isEmpty(); + } + + @Override + public URI createDirectory(String tenantId, URI uri) throws IOException { + String path = getPath(tenantId, uri); + if (!path.endsWith("/")) { + path = path + "/"; } + mkdirs(path); + return createUri(uri.getPath()); + } + + @Override + public URI move(String tenantId, URI from, URI to) throws IOException { + String path = getPath(tenantId, from); + if (getAttributes(tenantId, from).getType() == FileAttributes.FileType.File) { + // copy just a file + BlobId source = blob(path); + BlobId target = blob(tenantId, to); + client().copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build()); + client().delete(source); + return createUri(to.getPath()); + } + + // move directories + StorageBatch batch = this.client().batch(); + String prefix = (!path.endsWith("/")) ? path + "/" : path; + + Page list = client().list(config.bucket, Storage.BlobListOption.prefix(prefix), Storage.BlobListOption.currentDirectory()); + list.streamAll().forEach(blob -> { + BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length())); + client().copy(Storage.CopyRequest.newBuilder().setSource(blob.getBlobId()).setTarget(target).build()); + batch.delete(blob.getBlobId()); + }); + batch.submit(); + return createUri(to.getPath()); } @Override @@ -141,12 +241,7 @@ public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc StorageBatch batch = this.client().batch(); Map> results = new HashMap<>(); - String prefix; - if (tenantId != null) { - prefix = tenantId + storagePrefix.getPath(); - } else { - prefix = storagePrefix.getPath().substring(1); - } + String prefix = getPath(tenantId, storagePrefix); Page blobs = this.client() .list(this.config.getBucket(), @@ -179,4 +274,7 @@ public List deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc throw new IOException(e); } } + private static URI createUri(String key) { + return URI.create("kestra://%s".formatted(key)); + } } diff --git a/src/test/java/io/kestra/storage/gcs/GcsStorageTest.java b/src/test/java/io/kestra/storage/gcs/GcsStorageTest.java index bf3d58c..3404bff 100644 --- a/src/test/java/io/kestra/storage/gcs/GcsStorageTest.java +++ b/src/test/java/io/kestra/storage/gcs/GcsStorageTest.java @@ -1,16 +1,15 @@ package io.kestra.storage.gcs; import com.google.common.io.CharStreams; +import io.kestra.core.storages.FileAttributes; import io.kestra.core.utils.IdUtils; import io.micronaut.test.extensions.junit5.annotation.MicronautTest; import org.junit.jupiter.api.Test; import io.kestra.core.storages.StorageInterface; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.InputStream; -import java.io.InputStreamReader; +import java.io.*; import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.util.Arrays; import java.util.List; @@ -140,7 +139,16 @@ private void deleteByPrefix(String prefix, String tenantId) throws Exception { List deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")); - assertThat(deleted, containsInAnyOrder(path.stream().map(s -> URI.create("kestra://" + s)).toArray())); + List res = Arrays.asList( + "/" + prefix + "/storage/", + "/" + prefix + "/storage/root.yml", + "/" + prefix + "/storage/level1/", + "/" + prefix + "/storage/level1/1.yml", + "/" + prefix + "/storage/level1/level2/", + "/" + prefix + "/storage/level1/level2/1.yml" + ); + + assertThat(deleted, containsInAnyOrder(res.stream().map(s -> URI.create("kestra://" + s)).toArray())); assertThrows(FileNotFoundException.class, () -> { storageInterface.get(tenantId, new URI("/" + prefix + "/storage/")); @@ -162,4 +170,112 @@ void deleteByPrefixNoResult() throws Exception { List deleted = storageInterface.deleteByPrefix(tenantId, new URI("/" + prefix + "/storage/")); assertThat(deleted.size(), is(0)); } + + @Test + void list() throws Exception { + String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); + URL resource = GcsFileAttributes.class.getClassLoader().getResource("application.yml"); + + List path = Arrays.asList( + "/" + prefix + "/storage/root.yml", + "/" + prefix + "/storage/level1/1.yml", + "/" + prefix + "/storage/level1/level2/1.yml", + "/" + prefix + "/storage/another/1.yml" + ); + path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s))); + + List list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage")); + + assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("root.yml", "level1", "another")); + } + + @Test + void getAttributes() throws Exception { + String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); + URL resource = GcsStorageTest.class.getClassLoader().getResource("application.yml"); + String content = CharStreams.toString(new InputStreamReader(new FileInputStream(Objects.requireNonNull(resource).getFile()))); + + List path = Arrays.asList( + "/" + prefix + "/storage/root.yml", + "/" + prefix + "/storage/level1/1.yml" + ); + path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s))); + + FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/root.yml")); + assertThat(attr.getFileName(), is("root.yml")); + assertThat(attr.getType(), is(FileAttributes.FileType.File)); + assertThat(attr.getSize(), is((long) content.length())); + assertThat(attr.getLastModifiedTime(), notNullValue()); + + attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1")); + assertThat(attr.getFileName(), is("level1")); + assertThat(attr.getType(), is(FileAttributes.FileType.Directory)); + assertThat(attr.getSize(), is(0L)); + assertThat(attr.getLastModifiedTime(), notNullValue()); + } + + @Test + void delete() throws Exception { + String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); + URL resource = GcsStorageTest.class.getClassLoader().getResource("application.yml"); + + List path = Arrays.asList( + "/" + prefix + "/storage/root.yml", + "/" + prefix + "/storage/level1/1.yml", + "/" + prefix + "/storage/level1/level2/1.yml", + "/" + prefix + "/storage/another/1.yml" + ); + path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s))); + + boolean deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/level1")); + assertThat(deleted, is(true)); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(true)); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/another/1.yml")), is(true)); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1/1.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1/level2/1.yml")), is(false)); + deleted = storageInterface.delete(tenantId, new URI("/" + prefix + "/storage/root.yml")); + assertThat(deleted, is(true)); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false)); + } + + @Test + void createDirectory() throws URISyntaxException, IOException { + String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); + + storageInterface.createDirectory(tenantId, new URI("/" + prefix + "/storage/level1")); + FileAttributes attr = storageInterface.getAttributes(tenantId, new URI("/" + prefix + "/storage/level1")); + assertThat(attr.getFileName(), is("level1")); + assertThat(attr.getType(), is(FileAttributes.FileType.Directory)); + assertThat(attr.getSize(), is(0L)); + assertThat(attr.getLastModifiedTime(), notNullValue()); + } + + @Test + void move() throws Exception { + String prefix = IdUtils.create(); + String tenantId = IdUtils.create(); + URL resource = GcsStorageTest.class.getClassLoader().getResource("application.yml"); + + List path = Arrays.asList( + "/" + prefix + "/storage/root.yml", + "/" + prefix + "/storage/level1/1.yml", + "/" + prefix + "/storage/level1/level2/1.yml", + "/" + prefix + "/storage/another/1.yml" + ); + path.forEach(throwConsumer(s -> this.putFile(tenantId, resource, s))); + + storageInterface.move(tenantId, new URI("/" + prefix + "/storage/level1"), new URI("/" + prefix + "/storage/moved")); + + List list = storageInterface.list(tenantId, new URI("/" + prefix + "/storage/moved")); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/level1")), is(false)); + assertThat(list.stream().map(FileAttributes::getFileName).toList(), containsInAnyOrder("level2", "1.yml")); + + storageInterface.move(tenantId, new URI("/" + prefix + "/storage/root.yml"), new URI("/" + prefix + "/storage/root-moved.yml")); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root.yml")), is(false)); + assertThat(storageInterface.exists(tenantId, new URI("/" + prefix + "/storage/root-moved.yml")), is(true)); + } }