Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multi file editor #81

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/main/java/io/kestra/storage/gcs/GcsFileAttributes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.kestra.storage.gcs;

import com.google.cloud.storage.BlobInfo;
import io.kestra.core.storages.FileAttributes;
import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class GcsFileAttributes implements FileAttributes {

String fileName;
BlobInfo blobInfo;
boolean isDirectory;

@Override
public long getLastModifiedTime() {
return blobInfo.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli();
}

@Override
public long getCreationTime() {
return blobInfo.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
}

@Override
public FileType getType() {
if (isDirectory || fileName.endsWith("/") || blobInfo.getContentType().equals("application/x-directory")) {
return FileType.Directory;
}
return FileType.File;
}

@Override
public long getSize() {
return blobInfo.getSize();
}
}
131 changes: 114 additions & 17 deletions src/main/java/io/kestra/storage/gcs/GcsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,26 @@
import com.google.api.gax.paging.Page;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.*;
import io.kestra.core.storages.FileAttributes;
import io.micronaut.core.annotation.Introspected;
import io.kestra.core.storages.StorageInterface;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.jetbrains.annotations.NotNull;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Singleton
@GcsStorageEnabled
Expand All @@ -38,13 +39,24 @@ private Storage client() {
}

private BlobId blob(String tenantId, URI uri) {
String path = getPath(tenantId, uri);
return blob(path);
}

@NotNull
private BlobId blob(String path) {
return BlobId.of(this.config.getBucket(), path);
}

@NotNull
private String getPath(String tenantId, URI uri) {
moadibfr marked this conversation as resolved.
Show resolved Hide resolved
String path;
if (tenantId != null) {
path = tenantId + uri.getPath();
} else {
path = uri.getPath().substring(1);
}
return BlobId.of(this.config.getBucket(), path);
return path;
}

@Override
Expand All @@ -63,6 +75,22 @@ public InputStream get(String tenantId, URI uri) throws IOException {
}
}

@Override
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
String prefix = (path.endsWith("/")) ? path : path + "/";
Page<Blob> blobs = this.client().list(config.bucket, Storage.BlobListOption.prefix(prefix),
Storage.BlobListOption.currentDirectory());
return blobs.streamAll()
.filter(blob -> {
String key = blob.getName().substring(prefix.length());
// Remove recursive result and requested dir
return !key.isEmpty() && !Objects.equals(key, prefix) && new File(key).getParent() == null;
})
.map(throwFunction(this::getGcsFileAttributes))
.toList();
}

@Override
public boolean exists(String tenantId, URI uri) {
try {
Expand Down Expand Up @@ -103,9 +131,35 @@ public Long lastModifiedTime(String tenantId,URI uri) throws IOException {
}
}

@Override
public FileAttributes getAttributes(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
if (!exists(tenantId, uri)) {
path = path + "/";
}
Blob blob = this.client().get(this.blob(path));
if (blob == null) {
throw new FileNotFoundException("%s not found.".formatted(uri));
}
return getGcsFileAttributes(blob);
}

private FileAttributes getGcsFileAttributes(Blob blob) {
GcsFileAttributes.GcsFileAttributesBuilder builder = GcsFileAttributes.builder()
.fileName(new File(blob.getName()).getName())
.blobInfo(blob.asBlobInfo());
if (blob.getName().endsWith("/")) {
builder.isDirectory(true);
}
return builder.build();
}

@Override
public URI put(String tenantId, URI uri, InputStream data) throws IOException {
try {
String path = getPath(tenantId, uri);
mkdirs(path);

BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(tenantId, uri))
.build();
Expand All @@ -127,12 +181,57 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {
}
}

private void mkdirs(String path) {
if (!path.endsWith("/") && path.lastIndexOf('/') > 0) {
path = path.substring(0, path.lastIndexOf('/') + 1);
}
BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(path))
.build();
this.client().create(blobInfo);
}

public boolean delete(String tenantId, URI uri) throws IOException {
try {
return this.client().delete(this.blob(tenantId, uri));
} catch (StorageException e) {
throw new IOException(e);
return !deleteByPrefix(tenantId, uri).isEmpty();
}

@Override
public URI createDirectory(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
if (!path.endsWith("/")) {
path = path + "/";
}
mkdirs(path);
return createUri(uri.getPath());
}

@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
String path = getPath(tenantId, from);
StorageBatch batch = this.client().batch();

if (getAttributes(tenantId, from).getType() == FileAttributes.FileType.File) {
// move just a file
BlobId source = blob(path);
BlobId target = blob(tenantId, to);
moveFile(source, target, batch);
} else {
// move directories
String prefix = (!path.endsWith("/")) ? path + "/" : path;

Page<Blob> list = client().list(config.bucket, Storage.BlobListOption.prefix(prefix));
list.streamAll().forEach(blob -> {
BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length()));
moveFile(blob.getBlobId(), target, batch);
});
}
batch.submit();
return createUri(to.getPath());
}

private void moveFile(BlobId source, BlobId target, StorageBatch batch) {
client().copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build());
batch.delete(source);
}

@Override
Expand All @@ -141,12 +240,7 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc
StorageBatch batch = this.client().batch();
Map<URI, StorageBatchResult<Boolean>> results = new HashMap<>();

String prefix;
if (tenantId != null) {
prefix = tenantId + storagePrefix.getPath();
} else {
prefix = storagePrefix.getPath().substring(1);
}
String prefix = getPath(tenantId, storagePrefix);

Page<Blob> blobs = this.client()
.list(this.config.getBucket(),
Expand Down Expand Up @@ -179,4 +273,7 @@ public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOExc
throw new IOException(e);
}
}
private static URI createUri(String key) {
return URI.create("kestra://%s".formatted(key));
}
}
Loading