Skip to content

Commit

Permalink
Refactor S3 hadoopfs tests
Browse files Browse the repository at this point in the history
Allow using many utilities in other tests that run a MinIO instance.
  • Loading branch information
arielshaqed committed Nov 21, 2024
1 parent e12411f commit f9caad9
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
import io.lakefs.utils.ObjectLocation;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;

import com.amazonaws.HttpMethod;
import com.amazonaws.services.s3.model.*;

import org.junit.Assert;
Expand All @@ -29,21 +27,13 @@
import static org.mockserver.model.JsonBody.json;

import java.io.*;
import java.net.URL;
import java.util.Date;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@RunWith(Parameterized.class)
public class LakeFSFileSystemServerS3Test extends S3FSTestBase {
static private final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystemServerS3Test.class);

public static interface PhysicalAddressCreator {
default void initConfiguration(Configuration conf) {}
String createGetPhysicalAddress(S3FSTestBase o, String key);
StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path);
}

@Parameters(name="{1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]{
Expand All @@ -57,69 +47,12 @@ public static Iterable<Object[]> data() {
@Parameter(0)
public PhysicalAddressCreator pac;

static private class SimplePhysicalAddressCreator implements PhysicalAddressCreator {
public String createGetPhysicalAddress(S3FSTestBase o, String key) {
return o.s3Url(key);
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
return new StagingLocation().physicalAddress(o.s3Url(fullPath));
}
}

static private class PresignedPhysicalAddressCreator implements PhysicalAddressCreator {
public void initConfiguration(Configuration conf) {
conf.set("fs.lakefs.access.mode", "presigned");
}

protected Date getExpiration() {
return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1));
}

public String createGetPhysicalAddress(S3FSTestBase o, String key) {
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key)
.withMethod(HttpMethod.GET)
.withExpiration(expiration));
return presignedUrl.toString();
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath)
.withMethod(HttpMethod.PUT)
.withExpiration(expiration));
return new StagingLocation()
.physicalAddress(o.s3Url(fullPath))
.presignedUrl(presignedUrl.toString());
}
}

@Override
protected void moreHadoopSetup() {
super.moreHadoopSetup();
pac.initConfiguration(conf);
}

// Return a location under namespace for this getPhysicalAddress call.
protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) {
StagingLocation stagingLocation =
pac.createPutStagingLocation(this, namespace, repo, branch, path);
mockServerClient.when(request()
.withMethod("GET")
.withPath(String.format("/repositories/%s/branches/%s/staging/backing", repo, branch))
.withQueryStringParameter("path", path))
.respond(response().withStatusCode(200)
.withBody(gson.toJson(stagingLocation)));
return stagingLocation;
}

@Test
public void testCreate() throws IOException {
String contents = "The quick brown fox jumps over the lazy dog.";
Expand Down
72 changes: 72 additions & 0 deletions clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
Expand All @@ -14,9 +15,12 @@
import com.google.common.collect.Lists;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;

import io.lakefs.clients.sdk.model.*;

import static org.mockserver.model.HttpResponse.response;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -27,6 +31,9 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.net.URL;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.List;

/**
Expand All @@ -37,6 +44,8 @@
public abstract class S3FSTestBase extends FSTestBase {
static private final Logger LOG = LoggerFactory.getLogger(S3FSTestBase.class);

protected PhysicalAddressCreator pac = new SimplePhysicalAddressCreator();

protected String s3Endpoint;
protected AmazonS3 s3Client;

Expand Down Expand Up @@ -126,4 +135,67 @@ protected void moreHadoopSetup() {
conf.set(org.apache.hadoop.fs.s3a.Constants.ENDPOINT, s3Endpoint);
conf.set(org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR, "/tmp/s3a");
}

public static interface PhysicalAddressCreator {
default void initConfiguration(Configuration conf) {}
String createGetPhysicalAddress(S3FSTestBase o, String key);
StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path);
}

protected static class SimplePhysicalAddressCreator implements PhysicalAddressCreator {
public String createGetPhysicalAddress(S3FSTestBase o, String key) {
return o.s3Url(key);
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
return new StagingLocation().physicalAddress(o.s3Url(fullPath));
}
}

protected static class PresignedPhysicalAddressCreator implements PhysicalAddressCreator {
public void initConfiguration(Configuration conf) {
conf.set("fs.lakefs.access.mode", "presigned");
}

protected Date getExpiration() {
return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1));
}

public String createGetPhysicalAddress(S3FSTestBase o, String key) {
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key)
.withMethod(HttpMethod.GET)
.withExpiration(expiration));
return presignedUrl.toString();
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath)
.withMethod(HttpMethod.PUT)
.withExpiration(expiration));
return new StagingLocation()
.physicalAddress(o.s3Url(fullPath))
.presignedUrl(presignedUrl.toString());
}
}

// Return a location under namespace for this getPhysicalAddress call.
protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) {
StagingLocation stagingLocation =
pac.createPutStagingLocation(this, namespace, repo, branch, path);
mockServerClient.when(request()
.withMethod("GET")
.withPath(String.format("/repositories/%s/branches/%s/staging/backing", repo, branch))
.withQueryStringParameter("path", path))
.respond(response().withStatusCode(200)
.withBody(gson.toJson(stagingLocation)));
return stagingLocation;
}
}

0 comments on commit f9caad9

Please sign in to comment.