Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use a single Storage #85

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions src/main/java/io/kestra/storage/gcs/GcsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@
@Introspected
public class GcsStorage implements StorageInterface {
@Inject
GcsClientFactory factory;
Storage storage;

@Inject
GcsConfig config;

private Storage client() {
return factory.of(config);
}

private BlobId blob(String tenantId, URI uri) {
String path = getPath(tenantId, uri);
Expand Down Expand Up @@ -76,7 +73,7 @@ private void parentTraversalGuard(URI uri) {
@Override
public InputStream get(String tenantId, URI uri) throws IOException {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));

if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri + " (File not found)");
Expand All @@ -93,7 +90,7 @@ public InputStream get(String tenantId, URI uri) throws IOException {
public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
String path = getPath(tenantId, uri);
String prefix = (path.endsWith("/")) ? path : path + "/";
Page<Blob> blobs = this.client().list(config.bucket, Storage.BlobListOption.prefix(prefix),
Page<Blob> blobs = this.storage.list(config.bucket, Storage.BlobListOption.prefix(prefix),
Storage.BlobListOption.currentDirectory());
List<FileAttributes> list = blobs.streamAll()
.filter(blob -> {
Expand All @@ -113,7 +110,7 @@ public List<FileAttributes> list(String tenantId, URI uri) throws IOException {
@Override
public boolean exists(String tenantId, URI uri) {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));
return blob != null && blob.exists();
} catch (StorageException e) {
return false;
Expand All @@ -123,7 +120,7 @@ public boolean exists(String tenantId, URI uri) {
@Override
public Long size(String tenantId,URI uri) throws IOException {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));

if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri + " (File not found)");
Expand All @@ -138,7 +135,7 @@ public Long size(String tenantId,URI uri) throws IOException {
@Override
public Long lastModifiedTime(String tenantId,URI uri) throws IOException {
try {
Blob blob = this.client().get(this.blob(tenantId, URI.create(uri.getPath())));
Blob blob = this.storage.get(this.blob(tenantId, URI.create(uri.getPath())));

if (blob == null || !blob.exists()) {
throw new FileNotFoundException(uri + " (File not found)");
Expand All @@ -156,7 +153,7 @@ public FileAttributes getAttributes(String tenantId, URI uri) throws IOException
if (!exists(tenantId, uri)) {
path = path + "/";
}
Blob blob = this.client().get(this.blob(path));
Blob blob = this.storage.get(this.blob(path));
if (blob == null) {
throw new FileNotFoundException("%s not found.".formatted(uri));
}
Expand All @@ -183,7 +180,7 @@ public URI put(String tenantId, URI uri, InputStream data) throws IOException {
.newBuilder(this.blob(tenantId, uri))
.build();

try (WriteChannel writer = this.client().writer(blobInfo)) {
try (WriteChannel writer = this.storage.writer(blobInfo)) {
byte[] buffer = new byte[10_240];

int limit;
Expand All @@ -210,7 +207,7 @@ private void mkdirs(String path) {
BlobInfo blobInfo = BlobInfo
.newBuilder(this.blob(aggregatedPath.toString()))
.build();
this.client().create(blobInfo);
this.storage.create(blobInfo);
}
}

Expand All @@ -231,7 +228,7 @@ public URI createDirectory(String tenantId, URI uri) {
@Override
public URI move(String tenantId, URI from, URI to) throws IOException {
String path = getPath(tenantId, from);
StorageBatch batch = this.client().batch();
StorageBatch batch = this.storage.batch();

if (getAttributes(tenantId, from).getType() == FileAttributes.FileType.File) {
// move just a file
Expand All @@ -242,7 +239,7 @@ public URI move(String tenantId, URI from, URI to) throws IOException {
// move directories
String prefix = (!path.endsWith("/")) ? path + "/" : path;

Page<Blob> list = client().list(config.bucket, Storage.BlobListOption.prefix(prefix));
Page<Blob> list = this.storage.list(config.bucket, Storage.BlobListOption.prefix(prefix));
list.streamAll().forEach(blob -> {
BlobId target = blob(getPath(tenantId, to) + "/" + blob.getName().substring(prefix.length()));
moveFile(blob.getBlobId(), target, batch);
Expand All @@ -253,19 +250,19 @@ public URI move(String tenantId, URI from, URI to) throws IOException {
}

private void moveFile(BlobId source, BlobId target, StorageBatch batch) {
client().copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build());
this.storage.copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build());
batch.delete(source);
}

@Override
public List<URI> deleteByPrefix(String tenantId, URI storagePrefix) throws IOException {
try {
StorageBatch batch = this.client().batch();
StorageBatch batch = this.storage.batch();
Map<URI, StorageBatchResult<Boolean>> results = new HashMap<>();

String prefix = getPath(tenantId, storagePrefix);

Page<Blob> blobs = this.client()
Page<Blob> blobs = this.storage
.list(this.config.getBucket(),
Storage.BlobListOption.prefix(prefix)
);
Expand Down