diff --git a/clients/hadoopfs/pom.xml b/clients/hadoopfs/pom.xml index a6efb38020e..ffa85044152 100644 --- a/clients/hadoopfs/pom.xml +++ b/clients/hadoopfs/pom.xml @@ -93,7 +93,7 @@ To export to S3: org.apache.maven.plugins maven-surefire-plugin - 2.12.4 + 3.1.2 presigned @@ -231,7 +231,7 @@ To export to S3: org.apache.maven.plugins maven-surefire-plugin - 2.12.4 + 3.1.2 ${exclude.tests} @@ -368,11 +368,24 @@ To export to S3: - org.mockito - - mockito-inline - 3.10.0 - test + org.mock-server + mockserver-junit-rule-no-dependencies + 5.14.0 + test + + + + org.immutables + value + 2.9.3 + test + + + + com.google.guava + guava + 32.1.2-jre + test diff --git a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java index fe088347f74..f8ce1f5cb96 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/Constants.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/Constants.java @@ -10,6 +10,7 @@ public class Constants { public static final String ENDPOINT_KEY_SUFFIX = "endpoint"; public static final String LIST_AMOUNT_KEY_SUFFIX = "list.amount"; public static final String ACCESS_MODE_KEY_SUFFIX = "access.mode"; + public static final String SESSION_ID = "session_id"; public static enum AccessMode { SIMPLE, diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java index 15579946f5f..9fedf481881 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSClient.java @@ -41,6 +41,11 @@ public LakeFSClient(String scheme, Configuration conf) throws IOException { basicAuth.setUsername(accessKey); basicAuth.setPassword(secretKey); + String sessionId = FSConfiguration.get(conf, scheme, Constants.SESSION_ID); + if (sessionId != null) { + apiClient.addDefaultCookie("sessionId", sessionId); + } + this.objectsApi = new ObjectsApi(apiClient); this.stagingApi = new StagingApi(apiClient); this.repositoriesApi = new RepositoriesApi(apiClient); diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java index 0fc5fd02ac5..5f257e30dc9 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java @@ -421,6 +421,7 @@ private boolean renameFile(LakeFSFileStatus srcStatus, Path dst) throws IOExcept LOG.debug("renameFile: dst {} exists and is a {}", dst, dstFileStatus.isDirectory() ? 
"directory" : "file"); if (dstFileStatus.isDirectory()) { dst = buildObjPathOnExistingDestinationDir(srcStatus.getPath(), dst); + LOG.debug("renameFile: use {} to create dst {}", srcStatus.getPath(), dst); } } catch (FileNotFoundException e) { LOG.debug("renameFile: dst does not exist, renaming src {} to a file called dst {}", diff --git a/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java b/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java new file mode 100644 index 00000000000..7ddc766bfd3 --- /dev/null +++ b/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java @@ -0,0 +1,333 @@ +package io.lakefs; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.model.*; +import com.aventrix.jnanoid.jnanoid.NanoIdUtils; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import io.lakefs.clients.api.ApiException; +import io.lakefs.clients.api.model.*; +import io.lakefs.clients.api.model.ObjectStats.PathTypeEnum; +import io.lakefs.utils.ObjectLocation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.http.HttpStatus; + +import org.immutables.value.Value; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import org.mockserver.client.MockServerClient; +import org.mockserver.junit.MockServerRule; +import org.mockserver.matchers.MatchType; +import org.mockserver.matchers.TimeToLive; +import org.mockserver.matchers.Times; +import org.mockserver.model.Cookie; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.model.Parameter; + +import static org.apache.commons.lang3.StringUtils.removeStart; + +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.model.JsonBody.json; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; + +/** + * Base for all LakeFSFilesystem tests. Helps set common components up but + * contains no tests of its own. + * + * See e.g. 
"Base Test Class Testing Pattern: Why and How to use", + * https://eliasnogueira.com/base-test-class-testing-pattern-why-and-how-to-use/ + */ +abstract class FSTestBase { + static protected final Long UNUSED_FILE_SIZE = 1L; + static protected final Long UNUSED_MTIME = 0L; + static protected final String UNUSED_CHECKSUM = "unused"; + + static protected final Long STATUS_FILE_SIZE = 2L; + static protected final Long STATUS_MTIME = 123456789L; + static protected final String STATUS_CHECKSUM = "status"; + + protected Configuration conf; + protected final LakeFSFileSystem fs = new LakeFSFileSystem(); + + protected String s3Base; + protected String s3Bucket; + + protected static final String S3_ACCESS_KEY_ID = "AKIArootkey"; + protected static final String S3_SECRET_ACCESS_KEY = "secret/minio/key="; + + protected static final ApiException noSuchFile = new ApiException(HttpStatus.SC_NOT_FOUND, "no such file"); + + protected final Gson gson = new GsonBuilder() + .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES) + .create(); + + @Value.Immutable static public interface Pagination { + @Value.Parameter Optional amount(); + @Value.Parameter Optional after(); + @Value.Parameter Optional prefix(); + } + + @Rule + public MockServerRule mockServerRule = new MockServerRule(this); + protected MockServerClient mockServerClient; + + @Rule + public TestName name = new TestName(); + + protected String sessionId() { + return name.getMethodName(); + } + + protected HttpRequest request() { + return HttpRequest.request().withCookie(new Cookie("sessionId", sessionId())); + } + + protected static String makeS3BucketName() { + String slug = NanoIdUtils.randomNanoId(NanoIdUtils.DEFAULT_NUMBER_GENERATOR, + "abcdefghijklmnopqrstuvwxyz-0123456789".toCharArray(), 14); + return String.format("bucket-%s-x", slug); + } + + /** @return "s3://..." URL to use for s3Path (which does not start with a slash) on bucket */ + protected String s3Url(String s3Path) { + return s3Base + s3Path; + } + + protected String getS3Key(StagingLocation stagingLocation) { + return removeStart(stagingLocation.getPhysicalAddress(), s3Base); + } + + /** + * Override to add to Hadoop configuration. + */ + protected void addHadoopConfiguration(Configuration conf) { + } + + @Before + public void hadoopSetup() throws IOException, URISyntaxException { + conf = new Configuration(false); + + addHadoopConfiguration(conf); + + conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem"); + + conf.set("fs.lakefs.access.key", "unused-but-checked"); + conf.set("fs.lakefs.secret.key", "unused-but-checked"); + conf.set("fs.lakefs.endpoint", String.format("http://localhost:%d/", mockServerClient.getPort())); + conf.set("fs.lakefs.session_id", sessionId()); + + System.setProperty("hadoop.home.dir", "/"); + + // lakeFSFS initialization requires a blockstore. + mockServerClient.when(request() + .withMethod("GET") + .withPath("/config/storage"), + Times.once()) + .respond(response() + .withStatusCode(200) + .withBody(gson.toJson(new StorageConfig() + .blockstoreType("s3") + .blockstoreNamespaceValidityRegex(".*") + // TODO(ariels): Change for presigned? + .preSignSupport(false)))); + + // Always allow repo "repo" to be found, it's used in all tests. + mockServerClient.when(request() + .withMethod("GET") + .withPath("/repositories/repo")) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(new Repository().id("repo") + .creationDate(1234L) + .defaultBranch("main") + // Not really needed, just put something that works. 
+            .storageNamespace("s3a://FIX/ME?"))));
+
+        // Don't return 404s for unknown paths - they will be emitted for
+        // many bad requests or mocks, and make our life difficult. Instead
+        // fail using a unique error code. This has very low priority.
+        mockServerClient.when(request(), Times.unlimited(), TimeToLive.unlimited(), -10000)
+            .respond(response().withStatusCode(418));
+        // TODO(ariels): No tests mock "get underlying filesystem", so this
+        // also catches its "get repo" call. Nothing bad happens, but
+        // this response does show up in logs.
+
+        moreHadoopSetup();
+
+        fs.initialize(new URI("lakefs://repo/main/file.txt"), conf);
+    }
+
+    protected void moreHadoopSetup() {}
+
+    // Mock statObject for this path to return 404 (not found).
+    protected void mockStatObjectNotFound(String repo, String ref, String path) {
+        mockServerClient.when(request()
+                .withMethod("GET")
+                .withPath(String.format("/repositories/%s/refs/%s/objects/stat", repo, ref))
+                .withQueryStringParameter("path", path))
+            .respond(response().withStatusCode(404)
+                .withBody(String.format("{\"message\": \"%s/%s/%s not found (%s)\"}",
+                                        repo, ref, path, sessionId())));
+    }
+
+    protected void mockStatObject(String repo, String ref, String path, ObjectStats stats) {
+        mockServerClient.when(request()
+                .withMethod("GET")
+                .withPath(String.format("/repositories/%s/refs/%s/objects/stat", repo, ref))
+                .withQueryStringParameter("path", path))
+            .respond(response().withStatusCode(200)
+                .withBody(gson.toJson(stats)));
+    }
+
+    // Mock this lakeFSFS path not to exist. You may still need to
+    // mockListing for the directory that will not contain this path.
+    protected void mockFileDoesNotExist(String repo, String ref, String path) {
+        mockStatObjectNotFound(repo, ref, path);
+        mockStatObjectNotFound(repo, ref, path + Constants.SEPARATOR);
+    }
+
+    protected void mockFilesInDir(String repo, String main, String dir, String... files) {
+        ObjectStats[] allStats;
+        if (files.length == 0) {
+            // Fake a directory marker
+            Path dirPath = new Path(String.format("lakefs://%s/%s/%s", repo, main, dir));
+            ObjectLocation dirLoc = ObjectLocation.pathToObjectLocation(dirPath);
+            ObjectStats dirStats = mockDirectoryMarker(dirLoc);
+            allStats = new ObjectStats[1];
+            allStats[0] = dirStats;
+        } else {
+            mockStatObjectNotFound(repo, main, dir);
+            mockStatObjectNotFound(repo, main, dir + Constants.SEPARATOR);
+
+            allStats = new ObjectStats[files.length];
+            for (int i = 0; i < files.length; i++) {
+                allStats[i] = new ObjectStats()
+                    .pathType(PathTypeEnum.OBJECT)
+                    .path(dir + Constants.SEPARATOR + files[i]);
+            }
+        }
+
+        // Directory can be listed!
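+        // Listing dir/ below returns either that synthetic marker or the files, which is how exists() and listStatus() discover the directory.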
+ mockListing("repo", "main", + ImmutablePagination.builder().prefix(dir + Constants.SEPARATOR).build(), + allStats); + } + + protected void mockUploadObject(String repo, String branch, String path) { + StagingLocation stagingLocation = new StagingLocation() + .token("token:foo:" + sessionId()) + .physicalAddress(s3Url(String.format("repo-base/dir-marker/%s/%s/%s/%s", + sessionId(), repo, branch, path))); + mockServerClient.when(request() + .withMethod("POST") + .withPath(String.format("/repositories/%s/branches/%s/objects", repo, branch)) + .withQueryStringParameter("path", path)) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(stagingLocation))); + } + + protected void mockGetBranch(String repo, String branch) { + mockServerClient.when(request() + .withMethod("GET") + .withPath(String.format("/repositories/%s/branches/%s", repo, branch))) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(new Ref().id("123").commitId("456")))); + } + + protected void mockDeleteObject(String repo, String branch, String path) { + mockServerClient.when(request() + .withMethod("DELETE") + .withPath(String.format("/repositories/%s/branches/%s/objects", repo, branch)) + .withQueryStringParameter("path", path)) + .respond(response().withStatusCode(204)); + } + + protected void mockDeleteObjectNotFound(String repo, String branch, String path) { + mockServerClient.when(request() + .withMethod("DELETE") + .withPath(String.format("/repositories/%s/branches/%s/objects", repo, branch)) + .withQueryStringParameter("path", path)) + .respond(response().withStatusCode(404)); + } + + // Mocks a single deleteObjects call to succeed, returning list of failures. + protected void mockDeleteObjects(String repo, String branch, String path, ObjectError... errors) { + PathList pathList = new PathList().addPathsItem(path); + mockDeleteObjects(repo, branch, pathList, errors); + } + + // Mocks a single deleteObjects call to succeed, returning list of failures. + protected void mockDeleteObjects(String repo, String branch, PathList pathList, ObjectError... errors) { + mockServerClient.when(request() + .withMethod("POST") + .withPath(String.format("/repositories/%s/branches/%s/objects/delete", repo, branch)) + .withBody(gson.toJson(pathList)), + Times.once()) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(new ObjectErrorList() + .errors(Arrays.asList(errors))))); + } + + protected ObjectStats mockDirectoryMarker(ObjectLocation objectLoc) { + // Mock parent directory to show the directory marker exists. + ObjectStats markerStats = new ObjectStats() + .path(objectLoc.getPath()) + .pathType(PathTypeEnum.OBJECT); + mockServerClient.when(request() + .withMethod("GET") + .withPath(String.format("/repositories/%s/refs/%s/objects/stat", objectLoc.getRepository(), objectLoc.getRef())) + .withQueryStringParameter("path", objectLoc.getPath())) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(markerStats))); + return markerStats; + } + + // Mock this listing and return these stats. + protected void mockListing(String repo, String ref, ImmutablePagination pagination, ObjectStats... stats) { + mockListingWithHasMore(repo, ref, pagination, false, stats); + } + + protected void mockListingWithHasMore(String repo, String ref, ImmutablePagination pagination, boolean hasMore, ObjectStats... stats) { + HttpRequest req = request() + .withMethod("GET") + .withPath(String.format("/repositories/%s/refs/%s/objects/ls", repo, ref)); + // Validate elements of pagination only if present. 
+ if (pagination.after().isPresent()) { + req = req.withQueryStringParameter("after", pagination.after().or("")); + } + if (pagination.amount().isPresent()) { + req = req.withQueryStringParameter("amount", pagination.amount().get().toString()); + } + if (pagination.prefix().isPresent()) { + req = req.withQueryStringParameter("prefix", pagination.prefix().or("")); + } + mockServerClient.when(req) + .respond(response() + .withStatusCode(200) + .withBody(gson.toJson(ImmutableMap.of("results", Arrays.asList(stats), + "pagination", + new io.lakefs.clients.api.model.Pagination().hasMore(hasMore))))); + } +} diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemPresignedModeTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemPresignedModeTest.java deleted file mode 100644 index e0eb9437f4b..00000000000 --- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemPresignedModeTest.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.lakefs; - -import static org.mockito.Mockito.when; -import java.net.URL; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import com.amazonaws.HttpMethod; -import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; -import io.lakefs.clients.api.ApiException; -import io.lakefs.clients.api.model.ObjectStats; -import io.lakefs.clients.api.model.ObjectStats.PathTypeEnum; -import io.lakefs.clients.api.model.StagingLocation; - -public class LakeFSFileSystemPresignedModeTest extends LakeFSFileSystemTest { - - void initConfiguration() { - conf.set("fs.lakefs.access.mode", "presigned"); - } - - StagingLocation mockGetPhysicalAddress(String repo, String branch, String key, - String physicalKey) throws ApiException { - URL url = - s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(s3Bucket, physicalKey) - .withMethod(HttpMethod.PUT).withExpiration( - new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))); - StagingLocation stagingLocation = new StagingLocation().token("foo") - .physicalAddress(s3Url("/" + physicalKey)).presignedUrl(url.toString()); - when(stagingApi.getPhysicalAddress(repo, branch, key, true)).thenReturn(stagingLocation); - return stagingLocation; - } - - @Override - void mockStatObject(String repo, String branch, String key, String physicalKey, Long sizeBytes) - throws ApiException { - URL url = - s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(s3Bucket, physicalKey) - .withMethod(HttpMethod.GET).withExpiration( - new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)))); - when(objectsApi.statObject(repo, branch, key, false, true)) - .thenReturn(new ObjectStats().path("lakefs://" + repo + "/" + branch + "/" + key) - .pathType(PathTypeEnum.OBJECT).physicalAddress(url.toString()) - .checksum(UNUSED_CHECKSUM).mtime(UNUSED_MTIME).sizeBytes((long) sizeBytes)); - } -} diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java new file mode 100644 index 00000000000..665629b4dec --- /dev/null +++ b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java @@ -0,0 +1,312 @@ +package io.lakefs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.lakefs.clients.api.model.*; +import io.lakefs.clients.api.model.ObjectStats.PathTypeEnum; +import io.lakefs.clients.api.ApiException; +import io.lakefs.utils.ObjectLocation; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.Path; + +import com.amazonaws.HttpMethod; +import com.amazonaws.services.s3.model.*; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters;import org.hamcrest.core.StringContains; + +import org.mockserver.matchers.MatchType; + +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.model.JsonBody.json; + +import java.io.*; +import java.net.URL; +import java.util.Date; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +@RunWith(Parameterized.class) +public class LakeFSFileSystemServerS3Test extends S3FSTestBase { + static private final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystemServerS3Test.class); + + public static interface PhysicalAddressCreator { + default void initConfiguration(Configuration conf) {} + String createGetPhysicalAddress(S3FSTestBase o, String key); + StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path); + } + + @Parameters(name="{1}") + public static Iterable data() { + return Arrays.asList(new Object[][]{ + {new SimplePhysicalAddressCreator(), "simple"}, + {new PresignedPhysicalAddressCreator(), "presigned"}}); + } + + @Parameter(1) + public String unusedAddressCreatorType; + + @Parameter(0) + public PhysicalAddressCreator pac; + + static private class SimplePhysicalAddressCreator implements PhysicalAddressCreator { + public String createGetPhysicalAddress(S3FSTestBase o, String key) { + return o.s3Url(key); + } + + public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) { + String fullPath = String.format("%s/%s/%s/%s/%s-object", + o.sessionId(), namespace, repo, branch, path); + return new StagingLocation() + .token("token:simple:" + o.sessionId()) + .physicalAddress(o.s3Url(fullPath)); + } + } + + static private class PresignedPhysicalAddressCreator implements PhysicalAddressCreator { + public void initConfiguration(Configuration conf) { + conf.set("fs.lakefs.access.mode", "presigned"); + } + + protected Date getExpiration() { + return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)); + } + + public String createGetPhysicalAddress(S3FSTestBase o, String key) { + Date expiration = getExpiration(); + URL presignedUrl = + o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key) + .withMethod(HttpMethod.GET) + .withExpiration(expiration)); + return presignedUrl.toString(); + } + + public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) { + String fullPath = String.format("%s/%s/%s/%s/%s-object", + o.sessionId(), namespace, repo, branch, path); + Date expiration = getExpiration(); + URL presignedUrl = + o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath) + .withMethod(HttpMethod.PUT) + .withExpiration(expiration)); + return new StagingLocation() + .token("token:presigned:" + o.sessionId()) + .physicalAddress(o.s3Url(fullPath)) + .presignedUrl(presignedUrl.toString()); + } + } + + @Override + protected void moreHadoopSetup() { + super.moreHadoopSetup(); + pac.initConfiguration(conf); + } + + // Return a location under namespace for this getPhysicalAddress call. 
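+ // The staging location is produced by the parameterized PhysicalAddressCreator, so this one mock serves both the simple and presigned variants.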
+ protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) { + StagingLocation stagingLocation = + pac.createPutStagingLocation(this, namespace, repo, branch, path); + mockServerClient.when(request() + .withMethod("GET") + .withPath(String.format("/repositories/%s/branches/%s/staging/backing", repo, branch)) + .withQueryStringParameter("path", path)) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(stagingLocation))); + return stagingLocation; + } + + @Test + public void testCreate() throws IOException { + String contents = "The quick brown fox jumps over the lazy dog."; + long contentsLength = (long) contents.getBytes().length; + Path path = new Path("lakefs://repo/main/sub1/sub2/create.me"); + + mockDirectoryMarker(ObjectLocation.pathToObjectLocation(null, path)); + + StagingLocation stagingLocation = + mockGetPhysicalAddress("repo", "main", "sub1/sub2/create.me", "repo-base/create"); + + // nothing at path + mockFileDoesNotExist("repo", "main", "sub1/sub2/create.me"); + // sub1/sub2 was an empty directory with no marker. + mockStatObjectNotFound("repo", "main", "sub1/sub2/"); + + ObjectStats newStats = new ObjectStats() + .path("sub1/sub2/create.me") + .pathType(PathTypeEnum.OBJECT) + .physicalAddress(stagingLocation.getPhysicalAddress()). + checksum(UNUSED_CHECKSUM). + mtime(UNUSED_MTIME). + sizeBytes(UNUSED_FILE_SIZE); + + mockServerClient.when(request() + .withMethod("PUT") + .withPath("/repositories/repo/branches/main/staging/backing") + .withBody(json(gson.toJson(new StagingMetadata() + .staging(stagingLocation) + .sizeBytes(contentsLength)), + MatchType.ONLY_MATCHING_FIELDS))) + .respond(response() + .withStatusCode(200) + .withBody(gson.toJson(newStats))); + + // Empty dir marker should be deleted. + mockDeleteObject("repo", "main", "sub1/sub2/"); + + OutputStream out = fs.create(path); + out.write(contents.getBytes()); + out.close(); + + // Write succeeded, verify physical file on S3. 
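+ // assertS3Object (defined in S3FSTestBase, not shown in this diff) is expected to fetch the object at the staging location and compare its contents.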
+ assertS3Object(stagingLocation, contents); + } + + @Test + public void testMkdirs() throws IOException { + // setup empty folder checks + Path path = new Path("dir1/dir2/dir3"); + for (Path p = new Path(path.toString()); p != null && !p.isRoot(); p = p.getParent()) { + mockStatObjectNotFound("repo", "main", p.toString()); + mockStatObjectNotFound("repo", "main", p+"/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix(p+"/").build()); + } + + // physical address to directory marker object + StagingLocation stagingLocation = + mockGetPhysicalAddress("repo", "main", "dir1/dir2/dir3/", "repo-base/emptyDir"); + + ObjectStats newStats = new ObjectStats() + .path("dir1/dir2/dir3/") + .physicalAddress(pac.createGetPhysicalAddress(this, "repo-base/dir12")); + mockStatObject("repo", "main", "dir1/dir2/dir3/", newStats); + + mockServerClient.when(request() + .withMethod("PUT") + .withPath("/repositories/repo/branches/main/staging/backing") + .withQueryStringParameter("path", "dir1/dir2/dir3/") + .withBody(json(gson.toJson(new StagingMetadata() + .staging(stagingLocation) + .sizeBytes(0L)), + MatchType.ONLY_MATCHING_FIELDS))) + .respond(response() + .withStatusCode(200) + .withBody(gson.toJson(newStats))); + + // call mkdirs + Assert.assertTrue(fs.mkdirs(new Path("lakefs://repo/main/", path))); + + // verify file exists on s3 + assertS3Object(stagingLocation, ""); + } + + @Test + public void testCreateExistingDirectory() throws IOException { + Path path = new Path("lakefs://repo/main/sub1/sub2/create.me"); + // path is a directory -- so cannot be created as a file. + + mockStatObjectNotFound("repo", "main", "sub1/sub2/create.me"); + ObjectStats stats = new ObjectStats() + .path("sub1/sub2/create.me/") + .physicalAddress(pac.createGetPhysicalAddress(this, "repo-base/sub1/sub2/create.me")); + mockStatObject("repo", "main", "sub1/sub2/create.me/", stats); + + Exception e = + Assert.assertThrows(FileAlreadyExistsException.class, () -> fs.create(path, false)); + Assert.assertThat(e.getMessage(), new StringContains("is a directory")); + } + + @Test + public void testCreateExistingFile() throws IOException { + Path path = new Path("lakefs://repo/main/sub1/sub2/create.me"); + + ObjectLocation dir = new ObjectLocation("lakefs", "repo", "main", "sub1/sub2"); + mockStatObject("repo", "main", "sub1/sub2/create.me", + new ObjectStats().path("sub1/sub2/create.me")); + Exception e = Assert.assertThrows(FileAlreadyExistsException.class, + () -> fs.create(path, false)); + Assert.assertThat(e.getMessage(), new StringContains("already exists")); + } + + @Test + public void testOpen() throws IOException, ApiException { + String contents = "The quick brown fox jumps over the lazy dog."; + byte[] contentsBytes = contents.getBytes(); + String physicalPath = sessionId() + "/repo-base/open"; + String physicalKey = pac.createGetPhysicalAddress(this, physicalPath); + int readBufferSize = 5; + Path path = new Path("lakefs://repo/main/read.me"); + + mockStatObject("repo", "main", "read.me", + new ObjectStats() + .physicalAddress(physicalKey) + .checksum(UNUSED_CHECKSUM) + .mtime(UNUSED_MTIME) + .sizeBytes((long) contentsBytes.length)); + + // Write physical file to S3. 
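+ // Upload the bytes straight to the bucket; fs.open() should then read them back through the physical address returned by the mocked statObject.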
+ ObjectMetadata s3Metadata = new ObjectMetadata(); + s3Metadata.setContentLength(contentsBytes.length); + s3Client.putObject(s3Bucket, + physicalPath, + new ByteArrayInputStream(contentsBytes), + s3Metadata); + + try (InputStream in = fs.open(path, readBufferSize)) { + String actual = IOUtils.toString(in); + Assert.assertEquals(contents, actual); + } catch (Exception e) { + String actualFiles = String.join(", ", getS3FilesByPrefix("")); + throw new RuntimeException("Files " + actualFiles + "; read " + path.toString() + " from " + physicalKey, e); + } + } + + // TODO(ariels): Rename test to "testOpenWithNonAsciiUriChars". + @Test + public void testOpenWithInvalidUriChars() throws IOException, ApiException { + String contents = "The quick brown fox jumps over the lazy dog."; + byte[] contentsBytes = contents.getBytes(); + int readBufferSize = 5; + + String[] suffixes = { + "with space/open", + "wi:th$cha&rs#/%op;e?n", + "עכשיו/בעברית/open", + "\uD83E\uDD2F/imoji/open", + }; + for (String suffix : suffixes) { + String key = "/repo-base/" + suffix; + + // Write physical file to S3. + ObjectMetadata s3Metadata = new ObjectMetadata(); + s3Metadata.setContentLength(contentsBytes.length); + s3Client.putObject(new PutObjectRequest(s3Bucket, key, new ByteArrayInputStream(contentsBytes), s3Metadata)); + + String path = String.format("lakefs://repo/main/%s-x", suffix); + ObjectStats stats = new ObjectStats() + .physicalAddress(pac.createGetPhysicalAddress(this, key)) + .sizeBytes((long) contentsBytes.length); + mockStatObject("repo", "main", suffix + "-x", stats); + + try (InputStream in = fs.open(new Path(path), readBufferSize)) { + String actual = IOUtils.toString(in); + Assert.assertEquals(contents, actual); + } + } + } + + @Test + public void testOpen_NotExists() throws IOException, ApiException { + Path path = new Path("lakefs://repo/main/doesNotExi.st"); + mockStatObjectNotFound("repo", "main", "doesNotExi.st"); + Assert.assertThrows(FileNotFoundException.class, + () -> fs.open(path)); + } +} diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerTest.java new file mode 100644 index 00000000000..cef6c41d67b --- /dev/null +++ b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerTest.java @@ -0,0 +1,782 @@ +package io.lakefs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.lakefs.clients.api.*; +import io.lakefs.clients.api.model.*; +import io.lakefs.clients.api.model.ObjectStats.PathTypeEnum; +import io.lakefs.utils.ObjectLocation; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.http.HttpStatus; +import org.junit.Assert; +import org.junit.Test; + +import org.hamcrest.core.StringContains; + +import org.mockserver.client.MockServerClient; +import org.mockserver.matchers.TimeToLive; +import org.mockserver.matchers.Times; +import org.mockserver.model.Cookie; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.model.Parameter; + +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.model.JsonBody.json; + +import java.io.*; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; 
+import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LakeFSFileSystemServerTest extends FSTestBase { + static private final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystemServerTest.class); + + protected String objectLocToS3ObjKey(ObjectLocation objectLoc) { + return String.format("/%s/%s/%s",objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath()); + } + + @Test + public void getUri() { + URI u = fs.getUri(); + Assert.assertNotNull(u); + } + + @Test + public void testUnknownProperties() throws IOException { + // Verify that a response with unknown properties is still parsed. + // This allows backwards compatibility: old clients can work with + // new servers. It tests that the OpenAPI codegen gave the correct + // result, but this is still important. + // + // TODO(ariels): This test is unrelated to LakeFSFileSystem. it + // should not be part of LakeFSFileSystemTest. + Path path = new Path("lakefs://repo/main/file"); + mockServerClient.when(request() + .withMethod("GET") + .withPath("/repositories/repo/refs/main/objects/stat") + .withQueryStringParameter("path", "file")) + .respond(response() + .withStatusCode(200) + .withBody("{\"path\": \"file\", \"unknown-key\": \"ignored\"}")); + LakeFSFileStatus fileStatus = fs.getFileStatus(path); + Assert.assertEquals(path, fileStatus.getPath()); + } + + @Test + public void testGetFileStatus_ExistingFile() throws IOException { + Path path = new Path("lakefs://repo/main/mock/exists"); + mockStatObject("repo", "main", "mock/exists", + new ObjectStats().path("mock/exists")); + + LakeFSFileStatus fileStatus = fs.getFileStatus(path); + Assert.assertTrue(fileStatus.isFile()); + Assert.assertEquals(path, fileStatus.getPath()); + } + + @Test + public void testGetFileStatus_NoFile() { + Path noFilePath = new Path("lakefs://repo/main/no.file"); + + mockStatObjectNotFound("repo", "main", "no.file"); + mockStatObjectNotFound("repo", "main", "no.file/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix("no.file/").amount(1).build()); + Assert.assertThrows(FileNotFoundException.class, () -> fs.getFileStatus(noFilePath)); + } + + @Test + public void testGetFileStatus_DirectoryMarker() throws IOException { + Path dirPath = new Path("lakefs://repo/main/dir1/dir2"); + mockStatObjectNotFound("repo", "main", "dir1/dir2"); + + ObjectStats stats = new ObjectStats() + .path("dir1/dir2/") + .physicalAddress(s3Url("repo-base/dir12")); + mockStatObject("repo", "main", "dir1/dir2/", stats); + + LakeFSFileStatus dirStatus = fs.getFileStatus(dirPath); + Assert.assertTrue(dirStatus.isDirectory()); + Assert.assertEquals(dirPath, dirStatus.getPath()); + } + + @Test + public void testExists_ExistsAsObject() throws IOException { + Path path = new Path("lakefs://repo/main/exis.ts"); + ObjectStats stats = new ObjectStats() + .path("exis.ts") + .physicalAddress(s3Url("repo-base/o12")); + mockListing("repo", "main", ImmutablePagination.builder().prefix("exis.ts").build(), stats); + Assert.assertTrue(fs.exists(path)); + } + + @Test + public void testExists_ExistsAsDirectoryMarker() throws IOException { + Path path = new Path("lakefs://repo/main/exis.ts"); + ObjectStats stats = new ObjectStats().path("exis.ts"); + + mockListing("repo", "main", ImmutablePagination.builder().prefix("exis.ts").build(), + stats); + + Assert.assertTrue(fs.exists(path)); + } + + @Test + public void testExists_ExistsAsDirectoryContents() throws IOException { + Path path = new 
Path("lakefs://repo/main/exis.ts"); + ObjectStats stats = new ObjectStats().path("exis.ts/object-inside-the-path"); + + mockListing("repo", "main", ImmutablePagination.builder().prefix("exis.ts").build(), + stats); + Assert.assertTrue(fs.exists(path)); + } + + @Test + public void testExists_ExistsAsDirectoryInSecondList() throws IOException { + Path path = new Path("lakefs://repo/main/exis.ts"); + ObjectStats beforeStats1 = new ObjectStats().path("exis.ts!"); + ObjectStats beforeStats2 = new ObjectStats().path("exis.ts$x"); + ObjectStats indirStats = new ObjectStats().path("exis.ts/object-inside-the-path"); + + // First listing returns irrelevant objects, _before_ "exis.ts/" + mockListingWithHasMore("repo", "main", + ImmutablePagination.builder().prefix("exis.ts").build(), + true, + beforeStats1, beforeStats2); + // Second listing tries to find an object inside "exis.ts/". + mockListing("repo", "main", ImmutablePagination.builder().prefix("exis.ts/").build(), + indirStats); + Assert.assertTrue(fs.exists(path)); + } + + @Test + public void testExists_NotExistsNoPrefix() throws IOException { + Path path = new Path("lakefs://repo/main/doesNotExi.st"); + Object emptyBody = ImmutableMap.of("results", ImmutableList.of(), + "pagination", ImmutablePagination.builder().build()); + mockServerClient.when(request() + .withMethod("GET") + .withPath("/repositories/repo/refs/main/objects/ls")) + .respond(response() + .withStatusCode(200) + .withBody(gson.toJson(emptyBody))); + Assert.assertFalse(fs.exists(path)); + } + + @Test + public void testExists_NotExistsPrefixWithNoSlash() { + // TODO(ariels) + } + + @Test + public void testExists_NotExistsPrefixWithNoSlashTwoLists() { + // TODO(ariels) + } + + @Test + public void testDelete_FileExists() throws IOException { + mockStatObject("repo", "main", "no/place/file.txt", new ObjectStats() + .path("delete/sample/file.txt") + .pathType(PathTypeEnum.OBJECT) + .physicalAddress(s3Url("repo-base/delete"))); + String[] arrDirs = {"no/place", "no"}; + for (String dir: arrDirs) { + mockStatObjectNotFound("repo", "main", dir); + mockStatObjectNotFound("repo", "main", dir + "/"); + mockListing("repo", "main", ImmutablePagination.builder().build()); + } + mockDeleteObject("repo", "main", "no/place/file.txt"); + mockUploadObject("repo", "main", "no/place/"); + + Path path = new Path("lakefs://repo/main/no/place/file.txt"); + + mockDirectoryMarker(ObjectLocation.pathToObjectLocation(null, path.getParent())); + + Assert.assertTrue(fs.delete(path, false)); + } + + @Test + public void testDelete_FileNotExists() throws IOException { + mockDeleteObjectNotFound("repo", "main", "no/place/file.txt"); + mockStatObjectNotFound("repo", "main", "no/place/file.txt"); + mockStatObjectNotFound("repo", "main", "no/place/file.txt/"); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("no/place/file.txt/").build()); + + // Should still create a directory marker! 
+ mockUploadObject("repo", "main", "no/place/"); + + // return false because file not found + Assert.assertFalse(fs.delete(new Path("lakefs://repo/main/no/place/file.txt"), false)); + } + + @Test + public void testDelete_EmptyDirectoryExists() throws IOException { + ObjectLocation dirObjLoc = new ObjectLocation("lakefs", "repo", "main", "delete/me"); + String key = objectLocToS3ObjKey(dirObjLoc); + + mockStatObjectNotFound(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath()); + ObjectStats srcStats = new ObjectStats() + .path(dirObjLoc.getPath() + Constants.SEPARATOR) + .sizeBytes(0L) + .mtime(UNUSED_MTIME) + .pathType(PathTypeEnum.OBJECT) + .physicalAddress(s3Url(key+Constants.SEPARATOR)) + .checksum(UNUSED_CHECKSUM); + mockStatObject(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath() + Constants.SEPARATOR, srcStats); + + // Just a directory marker delete/me/, so nothing to delete. + mockListing("repo", "main", + ImmutablePagination.builder().prefix("delete/me/").build(), + srcStats); + + mockDirectoryMarker(dirObjLoc.getParent()); + mockStatObject(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath(), srcStats); + mockDeleteObject("repo", "main", "delete/me/"); + // Now need to create the parent directory. + mockUploadObject("repo", "main", "delete/"); + + Path path = new Path("lakefs://repo/main/delete/me"); + + Assert.assertTrue(fs.delete(path, false)); + } + + @Test + public void testDelete_DirectoryWithFile() throws IOException { + String directoryPath = "delete/sample"; + String existingPath = "delete/sample/file.txt"; + String directoryToDelete = "lakefs://repo/main/delete/sample"; + mockStatObjectNotFound("repo", "main", directoryPath); + mockStatObjectNotFound("repo", "main", directoryPath + Constants.SEPARATOR); + // Just a single object under delete/sample/, not even a directory + // marker for delete/sample/. + ObjectStats srcStats = new ObjectStats(). + path(existingPath). + pathType(PathTypeEnum.OBJECT). + physicalAddress(s3Url("/repo-base/delete")). + checksum(UNUSED_CHECKSUM). + mtime(UNUSED_MTIME). + sizeBytes(UNUSED_FILE_SIZE); + mockListing("repo", "main", + ImmutablePagination.builder() + .prefix(directoryPath + Constants.SEPARATOR) + .build(), + srcStats); + + // No deletes! + mockServerClient.when(request() + .withMethod("DELETE")) + .respond(response().withStatusCode(400).withBody("Should not delete anything")); + + // Can't delete a directory without recursive, and + // delete/sample/file.txt is not deleted. + Exception e = + Assert.assertThrows(IOException.class, + () -> fs.delete(new Path(directoryToDelete), false)); + String failureMessage = + String.format("Path is a non-empty directory: %s", directoryToDelete); + Assert.assertThat(e.getMessage(), new StringContains(failureMessage)); + } + + @Test + public void testDelete_NotExistsRecursive() throws IOException { + // No objects to stat. + mockServerClient.when(request() + .withMethod("GET") + .withPath("/repositories/repo/refs/main/objects/stat")) + .respond(response().withStatusCode(404)); + // No objects to list, either -- in directory. 
+ mockListing("repo", "main", + ImmutablePagination.builder().prefix("no/place/file.txt/").build()); + Assert.assertFalse(fs.delete(new Path("lakefs://repo/main/no/place/file.txt"), true)); + } + + @Test + public void testDelete_DirectoryWithFileRecursive() throws IOException { + mockStatObjectNotFound("repo", "main", "delete/sample"); + mockStatObjectNotFound("repo", "main", "delete/sample/"); + ObjectStats stats = new ObjectStats(). + path("delete/sample/file.txt"). + pathType(PathTypeEnum.OBJECT). + physicalAddress(s3Url("/repo-base/delete")). + checksum(UNUSED_CHECKSUM). + mtime(UNUSED_MTIME). + sizeBytes(UNUSED_FILE_SIZE); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("delete/sample/").build(), + stats); + + mockDeleteObjects("repo", "main", "delete/sample/file.txt"); + + // recursive will always end successfully + Path path = new Path("lakefs://repo/main/delete/sample"); + + mockDirectoryMarker(ObjectLocation.pathToObjectLocation(null, path.getParent())); + // Must create a parent directory marker: it wasn't deleted, and now + // perhaps is empty. + mockUploadObject("repo", "main", "delete/"); + + boolean delete = fs.delete(path, true); + Assert.assertTrue(delete); + } + + protected void caseDeleteDirectoryRecursive(int bulkSize, int numObjects) throws IOException { + conf.setInt(LakeFSFileSystem.LAKEFS_DELETE_BULK_SIZE, bulkSize); + mockStatObjectNotFound("repo", "main", "delete/sample"); + mockStatObjectNotFound("repo", "main", "delete/sample/"); + + ObjectStats[] objects = new ObjectStats[numObjects]; + for (int i = 0; i < numObjects; i++) { + objects[i] = new ObjectStats(). + path(String.format("delete/sample/file%04d.txt", i)). + pathType(PathTypeEnum.OBJECT). + physicalAddress(s3Url(String.format("/repo-base/delete%04d", i))). + checksum(UNUSED_CHECKSUM). + mtime(UNUSED_MTIME). + sizeBytes(UNUSED_FILE_SIZE); + } + mockListing("repo", "main", + ImmutablePagination.builder().prefix("delete/sample/").build(), + objects); + + // Set up multiple deleteObjects expectations of bulkSize deletes + // each (except for the last, which might be smaller). + for (int start = 0; start < numObjects; start += bulkSize) { + PathList pl = new PathList(); + for (int i = start; i < numObjects && i < start + bulkSize; i++) { + pl.addPathsItem(String.format("delete/sample/file%04d.txt", i)); + } + mockDeleteObjects("repo", "main", pl); + } + // Mock parent directory marker creation at end of fs.delete to show + // the directory marker exists. + mockUploadObject("repo", "main", "delete/"); + // recursive will always end successfully + Assert.assertTrue(fs.delete(new Path("lakefs://repo/main/delete/sample"), true)); + } + + @Test + public void testDeleteDirectoryRecursiveBatch1() throws IOException { + caseDeleteDirectoryRecursive(1, 123); + } + + @Test + public void testDeleteDirectoryRecursiveBatch2() throws IOException { + caseDeleteDirectoryRecursive(2, 123); + } + + @Test + public void testDeleteDirectoryRecursiveBatch3() throws IOException { + caseDeleteDirectoryRecursive(3, 123); + } + @Test + public void testDeleteDirectoryRecursiveBatch5() throws IOException { + caseDeleteDirectoryRecursive(5, 123); + } + @Test + public void testDeleteDirectoryRecursiveBatch120() throws IOException { + caseDeleteDirectoryRecursive(120, 123); + } + @Test + public void testDeleteDirectoryRecursiveBatch123() throws IOException { + caseDeleteDirectoryRecursive(123, 123); + } + + @Test + public void testListStatusFile() throws IOException { + ObjectStats objectStats = new ObjectStats(). 
+ path("status/file"). + pathType(PathTypeEnum.OBJECT). + physicalAddress(s3Url("/repo-base/status")). + checksum(STATUS_CHECKSUM). + mtime(STATUS_MTIME). + sizeBytes(STATUS_FILE_SIZE); + mockStatObject("repo", "main", "status/file", objectStats); + Path path = new Path("lakefs://repo/main/status/file"); + FileStatus[] fileStatuses = fs.listStatus(path); + LakeFSFileStatus expectedFileStatus = new LakeFSFileStatus.Builder(path) + .length(STATUS_FILE_SIZE) + .checksum(STATUS_CHECKSUM) + .mTime(STATUS_MTIME) + .physicalAddress(s3Url("/repo-base/status")) + .blockSize(Constants.DEFAULT_BLOCK_SIZE) + .build(); + LakeFSFileStatus[] expectedFileStatuses = new LakeFSFileStatus[]{expectedFileStatus}; + Assert.assertArrayEquals(expectedFileStatuses, fileStatuses); + } + + @Test + public void testListStatusNotFound() throws ApiException { + mockStatObjectNotFound("repo", "main", "status/file"); + mockStatObjectNotFound("repo", "main", "status/file/"); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("status/file/").build()); + Path path = new Path("lakefs://repo/main/status/file"); + Assert.assertThrows(FileNotFoundException.class, () -> fs.listStatus(path)); + } + + @Test + public void testListStatusDirectory() throws IOException { + int totalObjectsCount = 3; + ObjectStats[] objects = new ObjectStats[3]; + for (int i = 0; i < totalObjectsCount; i++) { + objects[i] = new ObjectStats(). + path("status/file" + i). + pathType(PathTypeEnum.OBJECT). + physicalAddress(s3Url("/repo-base/status" + i)). + checksum(STATUS_CHECKSUM). + mtime(STATUS_MTIME). + sizeBytes(STATUS_FILE_SIZE); + } + mockListing("repo", "main", + ImmutablePagination.builder().prefix("status/").build(), + objects); + mockStatObjectNotFound("repo", "main", "status"); + + Path dir = new Path("lakefs://repo/main/status"); + FileStatus[] fileStatuses = fs.listStatus(dir); + + FileStatus[] expectedFileStatuses = new LocatedFileStatus[totalObjectsCount]; + for (int i = 0; i < totalObjectsCount; i++) { + Path p = new Path(dir + "/file" + i); + LakeFSFileStatus fileStatus = new LakeFSFileStatus.Builder(p) + .length(STATUS_FILE_SIZE) + .checksum(STATUS_CHECKSUM) + .mTime(STATUS_MTIME) + .blockSize(Constants.DEFAULT_BLOCK_SIZE) + .physicalAddress(s3Url("/repo-base/status" + i)) + .build(); + expectedFileStatuses[i] = new LocatedFileStatus(fileStatus, null); + } + Assert.assertArrayEquals(expectedFileStatuses, fileStatuses); + } + + @Test(expected = UnsupportedOperationException.class) + public void testAppend() throws IOException { + fs.append(null, 0, null); + } + + /** + * rename(src.txt, non-existing-dst) -> non-existing/new - unsupported, should fail with false. (Test was buggy, FIX!) 
+ */ + @Test + public void testRename_existingFileToNonExistingDst() throws IOException, ApiException { + Path src = new Path("lakefs://repo/main/existing.src"); + + ObjectStats stats = new ObjectStats() + .path("existing.src") + .sizeBytes(STATUS_FILE_SIZE) + .mtime(STATUS_MTIME) + .pathType(PathTypeEnum.OBJECT) + .physicalAddress(s3Url("existing.src")) + .checksum(STATUS_CHECKSUM); + + mockStatObject("repo", "main", "existing.src", stats); + + Path dst = new Path("lakefs://repo/main/non-existing/new"); + + mockListing("repo", "main", + ImmutablePagination.builder().prefix("non-existing/").build()); + mockStatObjectNotFound("repo", "main", "non-existing/new"); + mockStatObjectNotFound("repo", "main", "non-existing/new/"); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("non-existing/new/").build()); + mockStatObjectNotFound("repo", "main", "non-existing"); + mockStatObjectNotFound("repo", "main", "non-existing/"); + + boolean renamed = fs.rename(src, dst); + Assert.assertFalse(renamed); + } + + @Test + public void testRename_existingFileToExistingFileName() throws IOException { + Path src = new Path("lakefs://repo/main/existing.src"); + ObjectStats srcStats = new ObjectStats() + .path("existing.src") + .pathType(PathTypeEnum.OBJECT) + .physicalAddress("base/existing.src"); + mockStatObject("repo", "main", "existing.src", srcStats); + + Path dst = new Path("lakefs://repo/main/existing.dst"); + ObjectStats dstStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existing.dst") + .physicalAddress(s3Url("existing.dst")); + mockStatObject("repo", "main", "existing.dst", dstStats); + + mockServerClient.when(request() + .withMethod("POST") + .withPath("/repositories/repo/branches/main/objects/copy") + .withQueryStringParameter("dest_path", "existing.dst") + .withBody(json(gson.toJson(new ObjectCopyCreation() + .srcRef("main") + .srcPath("existing.src"))))) + .respond(response() + .withStatusCode(201) + // Actual new dstStats will be different... but lakeFSFS doesn't care. 
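+ // rename is copy-then-delete here: only the copy's success status matters, after which the source object is deleted below.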
+ .withBody(json(gson.toJson(dstStats)))); + + mockDeleteObject("repo", "main", "existing.src"); + + Assert.assertTrue(fs.rename(src, dst)); + } + + @Test + public void testRename_existingDirToExistingFileName() throws IOException { + Path fileInSrcDir = new Path("lakefs://repo/main/existing-dir/existing.src"); + ObjectStats srcStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existing-dir/existing.src"); + Path srcDir = new Path("lakefs://repo/main/existing-dir"); + mockStatObjectNotFound("repo", "main", "existing-dir"); + mockStatObjectNotFound("repo", "main", "existing-dir/"); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("existing-dir/").build(), + srcStats); + + Path dst = new Path("lakefs://repo/main/existingdst.file"); + ObjectStats dstStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existingdst.file"); + mockStatObject("repo", "main", "existingdst.file", dstStats); + + Assert.assertFalse(fs.rename(srcDir, dst)); + } + + /** + * file -> existing-directory-name: rename(src.txt, existing-dstdir) -> existing-dstdir/src.txt + */ + @Test + public void testRename_existingFileToExistingDirName() throws ApiException, IOException { + Path src = new Path("lakefs://repo/main/existing-dir1/existing.src"); + ObjectStats srcStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existing-dir1/existing.src"); + mockStatObject("repo", "main", "existing-dir1/existing.src", srcStats); + + ObjectStats dstStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existing-dir2/existing.src"); + mockFileDoesNotExist("repo", "main", "existing-dir2"); + mockFileDoesNotExist("repo", "main", "existing-dir2/"); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("existing-dir2/").build(), + dstStats); + + Path dst = new Path("lakefs://repo/main/existing-dir2/"); + + mockServerClient.when(request() + .withMethod("POST") + .withPath("/repositories/repo/branches/main/objects/copy") + .withQueryStringParameter("dest_path", "existing-dir2/existing.src") + .withBody(json(gson.toJson(new ObjectCopyCreation() + .srcRef("main") + .srcPath("existing-dir1/existing.src"))))) + .respond(response() + .withStatusCode(201) + // Actual new dstStats will be different... but lakeFSFS doesn't care. + .withBody(json(gson.toJson(dstStats)))); + mockGetBranch("repo", "main"); + mockDeleteObject("repo", "main", "existing-dir1/existing.src"); + + // Need a directory marker at the source because it's now empty! + mockUploadObject("repo", "main", "existing-dir1/"); + + Assert.assertTrue(fs.rename(src, dst)); + } + + /** + * rename(srcDir(containing srcDir/a.txt, srcDir/b.txt), non-existing-dir/new) -> unsupported, rename should fail by returning false + */ + @Test + public void testRename_existingDirToNonExistingDirWithoutParent() throws IOException { + Path fileInSrcDir = new Path("lakefs://repo/main/existing-dir/existing.src"); + Path srcDir = new Path("lakefs://repo/main/existing-dir"); + + mockFilesInDir("repo", "main", "existing-dir", "existing.src"); + + mockFileDoesNotExist("repo", "main", "x/non-existing-dir"); + mockFileDoesNotExist("repo", "main", "x/non-existing-dir/new"); + // Will also check if parent of destination is a directory (it isn't). 
+ mockListing("repo", "main", + ImmutablePagination.builder().prefix("x/non-existing-dir/").build()); + mockListing("repo", "main", + ImmutablePagination.builder().prefix("x/non-existing-dir/new/").build()); + + // Keep a directory marker, or rename will try to create one because + // it emptied the existing directory. + mockStatObject("repo", "main", "x", + new ObjectStats().pathType(PathTypeEnum.OBJECT).path("x")); + + Path dst = new Path("lakefs://repo/main/x/non-existing-dir/new"); + + Assert.assertFalse(fs.rename(srcDir, dst)); + } + + /** + * rename(srcDir(containing srcDir/a.txt, srcDir/b.txt), non-existing-dir/new) -> unsupported, rename should fail by returning false + */ + @Test + public void testRename_existingDirToNonExistingDirWithParent() throws ApiException, IOException { + Path fileInSrcDir = new Path("lakefs://repo/main/existing-dir/existing.src"); + Path srcDir = new Path("lakefs://repo/main/existing-dir"); + Path dst = new Path("lakefs://repo/main/existing-dir2/new"); + + ObjectStats srcStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existing-dir/existing.src"); + + mockStatObjectNotFound("repo", "main", "existing-dir"); + mockStatObjectNotFound("repo", "main", "existing-dir/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix("existing-dir/").build(), + srcStats); + + mockStatObjectNotFound("repo", "main", "existing-dir2"); + mockStatObject("repo", "main", "existing-dir2/", + new ObjectStats().pathType(PathTypeEnum.OBJECT).path("existing-dir2/")); + + mockStatObjectNotFound("repo", "main", "existing-dir2/new"); + mockStatObjectNotFound("repo", "main", "existing-dir2/new/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix("existing-dir2/new/").build()); + + ObjectStats dstStats = new ObjectStats() + .pathType(PathTypeEnum.OBJECT) + .path("existing-dir2/new/existing.src"); + + mockServerClient.when(request() + .withMethod("POST") + .withPath("/repositories/repo/branches/main/objects/copy") + .withQueryStringParameter("dest_path", "existing-dir2/new/existing.src") + .withBody(json(gson.toJson(new ObjectCopyCreation() + .srcRef("main") + .srcPath("existing-dir/existing.src"))))) + .respond(response() + .withStatusCode(201) + .withBody(json(gson.toJson(dstStats)))); + mockDeleteObject("repo", "main", "existing-dir/existing.src"); + // Directory marker no longer required. + mockDeleteObject("repo", "main", "existing-dir2/"); + + boolean renamed = fs.rename(srcDir, dst); + Assert.assertTrue(renamed); + } + + /** + * rename(srcDir(containing srcDir/a.txt), existing-nonempty-dstdir) -> unsupported, rename should fail by returning false. 
+ */ + @Test + public void testRename_existingDirToExistingNonEmptyDirName() throws IOException { + Path firstSrcFile = new Path("lakefs://repo/main/existing-dir1/a.src"); + ObjectStats firstSrcFileStats = new ObjectStats().path("existing-dir1/a.src"); + Path secSrcFile = new Path("lakefs://repo/main/existing-dir1/b.src"); + ObjectStats secSrcFileStats = new ObjectStats().path("existing-dir1/b.src"); + + Path srcDir = new Path("lakefs://repo/main/existing-dir1"); + + mockStatObjectNotFound("repo", "main", "existing-dir1"); + mockStatObjectNotFound("repo", "main", "existing-dir1/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix("existing-dir1/").build(), + firstSrcFileStats, secSrcFileStats); + + Path fileInDstDir = new Path("lakefs://repo/main/existing-dir2/file.dst"); + ObjectStats fileInDstDirStats = new ObjectStats().path("existing-dir2/file.dst"); + Path dstDir = new Path("lakefs://repo/main/existing-dir2"); + mockStatObjectNotFound("repo", "main", "existing-dir2"); + mockStatObjectNotFound("repo", "main", "existing-dir2/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix("existing-dir2/").build(), + fileInDstDirStats); + + boolean renamed = fs.rename(srcDir, dstDir); + Assert.assertFalse(renamed); + } + + // /** + // * Check that a file is renamed when working against a lakeFS version + // * where CopyObject API doesn't exist + // */ + // TODO(johnnyaug): Do we still need this test? + // + // @Test + // public void testRename_fallbackStageAPI() throws ApiException, IOException { + // Path src = new Path("lakefs://repo/main/existing-dir1/existing.src"); + // ObjectLocation srcObjLoc = fs.pathToObjectLocation(src); + // mockExistingFilePath(srcObjLoc); + + // Path fileInDstDir = new Path("lakefs://repo/main/existing-dir2/existing.src"); + // ObjectLocation fileObjLoc = fs.pathToObjectLocation(fileInDstDir); + // Path dst = new Path("lakefs://repo/main/existing-dir2"); + // ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); + + // mockExistingDirPath(dstObjLoc, ImmutableList.of(fileObjLoc)); + // mockDirectoryMarker(fs.pathToObjectLocation(src.getParent())); + // mockMissingCopyAPI(); + + // boolean renamed = fs.rename(src, dst); + // Assert.assertTrue(renamed); + // Path expectedDstPath = new Path("lakefs://repo/main/existing-dir2/existing.src"); + // Assert.assertTrue(dstPathLinkedToSrcPhysicalAddress(srcObjLoc, fs.pathToObjectLocation(expectedDstPath))); + // verifyObjDeletion(srcObjLoc); + // } + + @Test + public void testRename_srcAndDstOnDifferentBranch() throws IOException { + Path src = new Path("lakefs://repo/branch/existing.src"); + Path dst = new Path("lakefs://repo/another-branch/existing.dst"); + // Any lakeFS access will fail, including statObject, copyObject, or + // deleteObject! + boolean renamed = fs.rename(src, dst); + Assert.assertFalse(renamed); + } + + /** + * no-op. rename is expected to succeed. + */ + @Test + public void testRename_srcEqualsDst() throws IOException { + Path src = new Path("lakefs://repo/main/existing.src"); + Path dst = new Path("lakefs://repo/main/existing.src"); + // Any lakeFS access will fail, including statObject, copyObject, or + // deleteObject! 
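+ // Renaming a path onto itself short-circuits before any API call, so no mocks are needed and the call reports success.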
+ boolean renamed = fs.rename(src, dst); + Assert.assertTrue(renamed); + } + + @Test + public void testRename_nonExistingSrcFile() throws IOException { + Path src = new Path("lakefs://repo/main/non-existing.src"); + mockStatObjectNotFound("repo", "main", "non-existing.src"); + mockStatObjectNotFound("repo", "main", "non-existing.src/"); + mockListing("repo", "main", ImmutablePagination.builder().prefix("non-existing.src/").build()); + + Path dst = new Path("lakefs://repo/main/existing.dst"); + mockStatObject("repo", "main", "existing.dst", new ObjectStats().path("existing.dst")); + + boolean success = fs.rename(src, dst); + Assert.assertFalse(success); + } + + /** + * globStatus is used only by the Hadoop CLI where the pattern is always the exact file. + */ + @Test + public void testGlobStatus_SingleFile() throws IOException { + Path path = new Path("lakefs://repo/main/existing"); + mockStatObject("repo", "main", "existing", new ObjectStats().path("existing")); + + FileStatus[] statuses = fs.globStatus(path); + Assert.assertArrayEquals(new FileStatus[]{ + new LakeFSFileStatus.Builder(path).build() + }, statuses); + } +} diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemSimpleModeTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemSimpleModeTest.java deleted file mode 100644 index 715f0fefb4b..00000000000 --- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemSimpleModeTest.java +++ /dev/null @@ -1,31 +0,0 @@ -package io.lakefs; - -import static org.mockito.Mockito.when; -import io.lakefs.clients.api.ApiException; -import io.lakefs.clients.api.model.ObjectStats; -import io.lakefs.clients.api.model.ObjectStats.PathTypeEnum; -import io.lakefs.clients.api.model.StagingLocation; - -public class LakeFSFileSystemSimpleModeTest extends LakeFSFileSystemTest { - - @Override - void initConfiguration() {} - - @Override - StagingLocation mockGetPhysicalAddress(String repo, String branch, String key, - String physicalKey) throws ApiException { - StagingLocation stagingLocation = - new StagingLocation().token("foo").physicalAddress(s3Url("/" + physicalKey)); - when(stagingApi.getPhysicalAddress(repo, branch, key, false)).thenReturn(stagingLocation); - return stagingLocation; - } - - @Override - void mockStatObject(String repo, String branch, String key, String physicalKey, Long sizeBytes) - throws ApiException { - when(objectsApi.statObject(repo, branch, key, false, false)) - .thenReturn(new ObjectStats().path("lakefs://" + repo + "/" + branch + "/" + key).pathType(PathTypeEnum.OBJECT) - .physicalAddress(s3Url(physicalKey)).checksum(UNUSED_CHECKSUM).mtime(UNUSED_MTIME) - .sizeBytes((long) sizeBytes)); - } -} diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java deleted file mode 100644 index 05c2774efbb..00000000000 --- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java +++ /dev/null @@ -1,1079 +0,0 @@ -package io.lakefs; - -import io.lakefs.clients.api.*; -import io.lakefs.clients.api.model.*; -import io.lakefs.clients.api.model.ObjectStats.PathTypeEnum; -import io.lakefs.utils.ObjectLocation; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.S3ClientOptions; -import com.amazonaws.services.s3.model.*; -import 
com.aventrix.jnanoid.jnanoid.NanoIdUtils; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.http.HttpStatus; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.mockito.Answers; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.utility.DockerImageName; - -import java.io.*; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; - -public abstract class LakeFSFileSystemTest { - static final Long UNUSED_FILE_SIZE = 1L; - static final Long UNUSED_MTIME = 0L; - static final String UNUSED_CHECKSUM = "unused"; - - static final Long STATUS_FILE_SIZE = 2L; - static final Long STATUS_MTIME = 0L; - static final String STATUS_CHECKSUM = "status"; - - protected Configuration conf; - protected final LakeFSFileSystem fs = new LakeFSFileSystem(); - - protected LakeFSClient lfsClient; - protected ObjectsApi objectsApi; - protected BranchesApi branchesApi; - protected RepositoriesApi repositoriesApi; - protected StagingApi stagingApi; - protected ConfigApi configApi; - - protected AmazonS3 s3Client; - - protected String s3Base; - protected String s3Bucket; - - private static final DockerImageName MINIO = DockerImageName.parse("minio/minio:RELEASE.2021-06-07T21-40-51Z"); - protected static final String S3_ACCESS_KEY_ID = "AKIArootkey"; - protected static final String S3_SECRET_ACCESS_KEY = "secret/minio/key="; - - protected static final ApiException noSuchFile = new ApiException(HttpStatus.SC_NOT_FOUND, "no such file"); - - @Rule - public final GenericContainer s3 = new GenericContainer(MINIO.toString()). - withCommand("minio", "server", "/data"). - withEnv("MINIO_ROOT_USER", S3_ACCESS_KEY_ID). - withEnv("MINIO_ROOT_PASSWORD", S3_SECRET_ACCESS_KEY). - withEnv("MINIO_DOMAIN", "s3.local.lakefs.io"). - withEnv("MINIO_UPDATE", "off"). - withExposedPorts(9000); - - abstract void initConfiguration(); - - abstract void mockStatObject(String repo, String branch, String key, String physicalKey, Long sizeBytes) throws ApiException; - - abstract StagingLocation mockGetPhysicalAddress(String repo, String branch, String key, String physicalKey) throws ApiException; - - protected static String makeS3BucketName() { - String slug = NanoIdUtils.randomNanoId(NanoIdUtils.DEFAULT_NUMBER_GENERATOR, - "abcdefghijklmnopqrstuvwxyz-0123456789".toCharArray(), 14); - return String.format("bucket-%s-x", slug); - } - - /** @return "s3://..." 
URL to use for s3Path (which does not start with a slash) on bucket */ - protected String s3Url(String s3Path) { - return s3Base + s3Path; - } - - @Before - public void logS3Container() { - Logger s3Logger = LoggerFactory.getLogger("s3 container"); - Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(s3Logger) - .withMdc("container", "s3") - .withSeparateOutputStreams(); - s3.followOutput(logConsumer); - } - - @Before - public void setUp() throws Exception { - AWSCredentials creds = new BasicAWSCredentials(S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY); - - ClientConfiguration clientConfiguration = new ClientConfiguration() - .withSignerOverride("AWSS3V4SignerType"); - String s3Endpoint = String.format("http://s3.local.lakefs.io:%d", s3.getMappedPort(9000)); - - s3Client = new AmazonS3Client(creds, clientConfiguration); - - S3ClientOptions s3ClientOptions = new S3ClientOptions() - .withPathStyleAccess(true); - s3Client.setS3ClientOptions(s3ClientOptions); - s3Client.setEndpoint(s3Endpoint); - - s3Bucket = makeS3BucketName(); - s3Base = String.format("s3://%s", s3Bucket); - CreateBucketRequest cbr = new CreateBucketRequest(s3Bucket); - s3Client.createBucket(cbr); - - conf = new Configuration(false); - initConfiguration(); - conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem"); - - conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); - conf.set(org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY, S3_ACCESS_KEY_ID); - conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, S3_SECRET_ACCESS_KEY); - conf.set(org.apache.hadoop.fs.s3a.Constants.ENDPOINT, s3Endpoint); - conf.set(org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR, "/tmp/s3a"); - - System.setProperty("hadoop.home.dir", "/"); - - // Return the *same* mock for each client. Otherwise it is too hard - // to program _which_ client should do *what*. There is no risk of - // blocking, this client is synchronous. - - lfsClient = mock(LakeFSClient.class, Answers.RETURNS_SMART_NULLS); - objectsApi = mock(ObjectsApi.class, Answers.RETURNS_SMART_NULLS); - when(lfsClient.getObjectsApi()).thenReturn(objectsApi); - branchesApi = mock(BranchesApi.class, Answers.RETURNS_SMART_NULLS); - when(lfsClient.getBranchesApi()).thenReturn(branchesApi); - repositoriesApi = mock(RepositoriesApi.class, Answers.RETURNS_SMART_NULLS); - when(lfsClient.getRepositoriesApi()).thenReturn(repositoriesApi); - stagingApi = mock(StagingApi.class, Answers.RETURNS_SMART_NULLS); - when(lfsClient.getStagingApi()).thenReturn(stagingApi); - configApi = mock(ConfigApi.class, Answers.RETURNS_SMART_NULLS); - when(lfsClient.getConfigApi()).thenReturn(configApi); - when(configApi.getStorageConfig()) - .thenReturn(new StorageConfig().blockstoreType("s3").blockstoreNamespaceValidityRegex("^s3://")); - when(repositoriesApi.getRepository("repo")) - .thenReturn(new Repository().storageNamespace(s3Url("/repo-base"))); - - when(repositoriesApi.getRepository("repo")) - .thenReturn(new Repository().storageNamespace(s3Url("/repo-base"))); - - fs.initializeWithClientFactory(new URI("lakefs://repo/main/file.txt"), conf, () -> lfsClient); - } - - /** - * @return all pathnames under s3Prefix that start with prefix. (Obvious not scalable!) 
- */ - protected List getS3FilesByPrefix(String prefix) { - final int maxKeys = 1500; - - ListObjectsRequest req = new ListObjectsRequest() - .withBucketName(s3Bucket) - .withPrefix(prefix) - .withMaxKeys(maxKeys); - ObjectListing listing = s3Client.listObjects(req); - if (listing.isTruncated()) { - Assert.fail(String.format("[internal] no support for test that creates >%d S3 objects", maxKeys)); - } - - return Lists.transform(listing.getObjectSummaries(), S3ObjectSummary::getKey); - } - - @Test - public void getUri() { - URI u = fs.getUri(); - Assert.assertNotNull(u); - } - - @Test - public void testGetFileStatus_ExistingFile() throws ApiException, IOException { - Path p = new Path("lakefs://repo/main/exists"); - ObjectStats os = new ObjectStats(); - os.setPath("exists"); - os.checksum(UNUSED_CHECKSUM); - os.setPathType(PathTypeEnum.OBJECT); - os.setMtime(UNUSED_MTIME); - os.setSizeBytes(UNUSED_FILE_SIZE); - os.setPhysicalAddress(s3Url("/repo-base/exists")); - when(objectsApi.statObject("repo", "main", "exists", false, false)) - .thenReturn(os); - LakeFSFileStatus fileStatus = fs.getFileStatus(p); - Assert.assertTrue(fileStatus.isFile()); - Assert.assertEquals(p, fileStatus.getPath()); - } - - @Test - public void testGetFileStatus_NoFile() throws ApiException { - Path noFilePath = new Path("lakefs://repo/main/no.file"); - - when(objectsApi.statObject("repo", "main", "no.file", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", "no.file/", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.listObjects("repo", "main", false, false, "", 1, "", "no.file/")) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - Assert.assertThrows(FileNotFoundException.class, () -> fs.getFileStatus(noFilePath)); - } - - @Test - public void testGetFileStatus_DirectoryMarker() throws ApiException, IOException { - Path dirPath = new Path("lakefs://repo/main/dir1/dir2"); - when(objectsApi.statObject("repo", "main", "dir1/dir2", false, false)) - .thenThrow(noSuchFile); - ObjectStats dirObjectStats = new ObjectStats(); - dirObjectStats.setPath("dir1/dir2/"); - dirObjectStats.checksum(UNUSED_CHECKSUM); - dirObjectStats.setPathType(PathTypeEnum.OBJECT); - dirObjectStats.setMtime(UNUSED_MTIME); - dirObjectStats.setSizeBytes(0L); - dirObjectStats.setPhysicalAddress(s3Url("/repo-base/dir12")); - when(objectsApi.statObject("repo", "main", "dir1/dir2/", false, false)) - .thenReturn(dirObjectStats); - LakeFSFileStatus dirStatus = fs.getFileStatus(dirPath); - Assert.assertTrue(dirStatus.isDirectory()); - Assert.assertEquals(dirPath, dirStatus.getPath()); - } - - @Test - public void testExists_ExistsAsObject() throws ApiException, IOException { - Path p = new Path("lakefs://repo/main/exis.ts"); - ObjectStats stats = new ObjectStats().path("exis.ts").pathType(PathTypeEnum.OBJECT); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("exis.ts"))) - .thenReturn(new ObjectStatsList().results(ImmutableList.of(stats))); - Assert.assertTrue(fs.exists(p)); - } - - @Test - public void testExists_ExistsAsDirectoryMarker() throws ApiException, IOException { - Path p = new Path("lakefs://repo/main/exis.ts"); - ObjectStats stats = new ObjectStats().path("exis.ts/").pathType(PathTypeEnum.OBJECT); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("exis.ts"))) - .thenReturn(new ObjectStatsList().results(ImmutableList.of(stats))); - 
Assert.assertTrue(fs.exists(p)); - } - - @Test - public void testExists_ExistsAsDirectoryContents() throws ApiException, IOException { - Path p = new Path("lakefs://repo/main/exis.ts"); - ObjectStats stats = new ObjectStats().path("exis.ts/inside").pathType(PathTypeEnum.OBJECT); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("exis.ts"))) - .thenReturn(new ObjectStatsList().results(ImmutableList.of(stats))); - Assert.assertTrue(fs.exists(p)); - } - - @Test - public void testExists_ExistsAsDirectoryInSecondList() { - // TODO(ariels) - } - - @Test - public void testExists_NotExistsNoPrefix() throws ApiException, IOException { - Path p = new Path("lakefs://repo/main/doesNotExi.st"); - when(objectsApi.listObjects(any(), any(), any(), any(), any(), any(), any(), any())) - .thenReturn(new ObjectStatsList()); - boolean exists = fs.exists(p); - Assert.assertFalse(exists); - } - - @Test - public void testExists_NotExistsPrefixWithNoSlash() { - // TODO(ariels) - } - - @Test - public void testExists_NotExistsPrefixWithNoSlashTwoLists() { - // TODO(ariels) - } - - @Test - public void testDelete_FileExists() throws ApiException, IOException { - when(objectsApi.statObject("repo", "main", "no/place/file.txt", false, false)) - .thenReturn(new ObjectStats(). - path("delete/sample/file.txt"). - pathType(PathTypeEnum.OBJECT). - physicalAddress(s3Url("/repo-base/delete")). - checksum(UNUSED_CHECKSUM). - mtime(UNUSED_MTIME). - sizeBytes(UNUSED_FILE_SIZE)); - String[] arrDirs = {"no/place", "no"}; - for (String dir: arrDirs) { - when(objectsApi.statObject("repo", "main", dir, false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", dir + Constants.SEPARATOR, false, false)) - .thenThrow(noSuchFile); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq(dir + Constants.SEPARATOR))) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - } - StagingLocation stagingLocation = new StagingLocation().token("foo").physicalAddress(s3Url("/repo-base/dir-marker")); - when(stagingApi.getPhysicalAddress("repo", "main", "no/place/", false)) - .thenReturn(stagingLocation); - - Path path = new Path("lakefs://repo/main/no/place/file.txt"); - - mockDirectoryMarker(ObjectLocation.pathToObjectLocation(null, path.getParent())); - - // return true because file found - boolean success = fs.delete(path, false); - Assert.assertTrue(success); - } - - @Test - public void testDelete_FileNotExists() throws ApiException, IOException { - doThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "not found")) - .when(objectsApi).deleteObject("repo", "main", "no/place/file.txt"); - when(objectsApi.statObject("repo", "main", "no/place/file.txt", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", "no/place/file.txt/", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("no/place/file.txt/"))) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - - // return false because file not found - boolean success = fs.delete(new Path("lakefs://repo/main/no/place/file.txt"), false); - Assert.assertFalse(success); - } - - @Test - public void testDelete_EmptyDirectoryExists() throws ApiException, IOException { - ObjectLocation dirObjLoc = new ObjectLocation("lakefs", "repo", "main", 
"delete/me"); - String key = objectLocToS3ObjKey(dirObjLoc); - - when(objectsApi.statObject(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath(), false, false)) - .thenThrow(noSuchFile); - - ObjectStats srcStats = new ObjectStats() - .path(dirObjLoc.getPath() + Constants.SEPARATOR) - .sizeBytes(0L) - .mtime(UNUSED_MTIME) - .pathType(PathTypeEnum.OBJECT) - .physicalAddress(s3Url(key+Constants.SEPARATOR)) - .checksum(UNUSED_CHECKSUM); - when(objectsApi.statObject(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath() + Constants.SEPARATOR, false, false)) - .thenReturn(srcStats) - .thenThrow(noSuchFile); - - Path path = new Path("lakefs://repo/main/delete/me"); - - mockDirectoryMarker(ObjectLocation.pathToObjectLocation(null, path.getParent())); - - boolean success = fs.delete(path, false); - Assert.assertTrue(success); - } - - @Test(expected = IOException.class) - public void testDelete_DirectoryWithFile() throws ApiException, IOException { - when(objectsApi.statObject("repo", "main", "delete/sample", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", "delete/sample/", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("delete/sample/"))) - .thenReturn(new ObjectStatsList().results(Collections.singletonList(new ObjectStats(). - path("delete/sample/file.txt"). - pathType(PathTypeEnum.OBJECT). - physicalAddress(s3Url("/repo-base/delete")). - checksum(UNUSED_CHECKSUM). - mtime(UNUSED_MTIME). - sizeBytes(UNUSED_FILE_SIZE))).pagination(new Pagination().hasMore(false))); - doThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "not found")) - .when(objectsApi).deleteObject("repo", "main", "delete/sample/"); - // return false because we can't delete a directory without recursive - fs.delete(new Path("lakefs://repo/main/delete/sample"), false); - } - - @Test - public void testDelete_NotExistsRecursive() throws ApiException, IOException { - when(objectsApi.statObject(eq("repo"), eq("main"), any(), eq(false), eq(false))) - .thenThrow(noSuchFile); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false),eq(false), eq(""), any(), eq(""), eq("no/place/file.txt/"))) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - boolean delete = fs.delete(new Path("lakefs://repo/main/no/place/file.txt"), true); - Assert.assertFalse(delete); - } - - private PathList newPathList(String... paths) { - return new PathList().paths(Arrays.asList(paths)); - } - - @Test - public void testDelete_DirectoryWithFileRecursive() throws ApiException, IOException { - when(objectsApi.statObject("repo", "main", "delete/sample", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", "delete/sample/", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("delete/sample/"))) - .thenReturn(new ObjectStatsList().results(Collections - .singletonList(new ObjectStats(). - path("delete/sample/file.txt"). - pathType(PathTypeEnum.OBJECT). - physicalAddress(s3Url("/repo-base/delete")). - checksum(UNUSED_CHECKSUM). - mtime(UNUSED_MTIME). 
- sizeBytes(UNUSED_FILE_SIZE))) - .pagination(new Pagination().hasMore(false))); - when(objectsApi.deleteObjects("repo", "main", newPathList("delete/sample/file.txt"))) - .thenReturn(new ObjectErrorList()); - // recursive will always end successfully - Path path = new Path("lakefs://repo/main/delete/sample"); - - mockDirectoryMarker(ObjectLocation.pathToObjectLocation(null, path.getParent())); - - boolean delete = fs.delete(path, true); - Assert.assertTrue(delete); - } - - protected void caseDeleteDirectoryRecursive(int bulkSize, int numObjects) throws ApiException, IOException { - conf.setInt(LakeFSFileSystem.LAKEFS_DELETE_BULK_SIZE, bulkSize); - when(objectsApi.statObject("repo", "main", "delete/sample", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", "delete/sample/", false, false)) - .thenThrow(noSuchFile); - - List objects = new ArrayList(); - for (int i = 0; i < numObjects; i++) { - objects.add(new ObjectStats(). - path(String.format("delete/sample/file%04d.txt", i)). - pathType(PathTypeEnum.OBJECT). - physicalAddress(s3Url(String.format("/repo-base/delete%04d", i))). - checksum(UNUSED_CHECKSUM). - mtime(UNUSED_MTIME). - sizeBytes(UNUSED_FILE_SIZE)); - } - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq("delete/sample/"))) - .thenReturn(new ObjectStatsList() - .results(objects) - .pagination(new Pagination().hasMore(false))); - - // Set up multiple deleteObjects expectations of bulkSize deletes - // each (except for the last, which might be smaller). - for (int start = 0; start < numObjects; start += bulkSize) { - PathList pl = new PathList(); - for (int i = start; i < numObjects && i < start + bulkSize; i++) { - pl.addPathsItem(String.format("delete/sample/file%04d.txt", i)); - } - when(objectsApi.deleteObjects(eq("repo"), eq("main"), eq(pl))) - .thenReturn(new ObjectErrorList()); - } - // Mock parent directory marker creation at end of fs.delete to show - // the directory marker exists. 
- ObjectLocation dir = new ObjectLocation("lakefs", "repo", "main", "delete"); - mockDirectoryMarker(dir); - // recursive will always end successfully - boolean delete = fs.delete(new Path("lakefs://repo/main/delete/sample"), true); - Assert.assertTrue(delete); - } - - @Test - public void testDeleteDirectoryRecursiveBatch1() throws ApiException, IOException { - caseDeleteDirectoryRecursive(1, 123); - } - - @Test - public void testDeleteDirectoryRecursiveBatch2() throws ApiException, IOException { - caseDeleteDirectoryRecursive(2, 123); - } - - @Test - public void testDeleteDirectoryRecursiveBatch3() throws ApiException, IOException { - caseDeleteDirectoryRecursive(3, 123); - } - @Test - public void testDeleteDirectoryRecursiveBatch5() throws ApiException, IOException { - caseDeleteDirectoryRecursive(5, 123); - } - @Test - public void testDeleteDirectoryRecursiveBatch120() throws ApiException, IOException { - caseDeleteDirectoryRecursive(120, 123); - } - @Test - public void testDeleteDirectoryRecursiveBatch123() throws ApiException, IOException { - caseDeleteDirectoryRecursive(123, 123); - } - - @Test - public void testCreate() throws ApiException, IOException { - String contents = "The quick brown fox jumps over the lazy dog."; - Path p = new Path("lakefs://repo/main/sub1/sub2/create.me"); - - mockNonExistingPath(new ObjectLocation("lakefs", "repo", "main", "sub1/sub2/create.me")); - - StagingLocation stagingLocation = mockGetPhysicalAddress("repo", "main", "sub1/sub2/create.me", "repo-base/create"); - - // mock sub1/sub2 was an empty directory - ObjectLocation sub2Loc = new ObjectLocation("lakefs", "repo", "main", "sub1/sub2"); - mockEmptyDirectoryMarker(sub2Loc); - - OutputStream out = fs.create(p); - out.write(contents.getBytes()); - out.close(); - - ArgumentCaptor metadataCapture = ArgumentCaptor.forClass(StagingMetadata.class); - verify(stagingApi).linkPhysicalAddress(eq("repo"), eq("main"), eq("sub1/sub2/create.me"), - metadataCapture.capture()); - StagingMetadata actualMetadata = metadataCapture.getValue(); - Assert.assertEquals(stagingLocation, actualMetadata.getStaging()); - Assert.assertEquals(contents.getBytes().length, actualMetadata.getSizeBytes().longValue()); - - // Write succeeded, verify physical file on S3. 
- S3Object ret = s3Client.getObject(new GetObjectRequest(s3Bucket, "/repo-base/create")); - InputStream in = ret.getObjectContent(); - String actual = IOUtils.toString(in); - - Assert.assertEquals(contents, actual); - - List actualFiles = getS3FilesByPrefix("/"); - Assert.assertEquals(ImmutableList.of("repo-base/create"), actualFiles); - - // expected to delete the empty dir marker - verifyObjDeletion(new ObjectLocation("lakefs", "repo", "main", "sub1/sub2/")); - } - - @Test(expected = FileAlreadyExistsException.class) - public void testCreateExistingDirectory() throws ApiException, IOException { - ObjectLocation dir = new ObjectLocation("lakefs", "repo", "main", "sub1/sub2/create.me"); - mockExistingDirPath(dir, Collections.emptyList()); - fs.create(new Path("lakefs://repo/main/sub1/sub2/create.me"), false); - } - - @Test(expected = FileAlreadyExistsException.class) - public void testCreateExistingFile() throws ApiException, IOException { - ObjectLocation dir = new ObjectLocation("lakefs", "repo", "main", "sub1/sub2"); - mockExistingDirPath(dir, ImmutableList.of(new ObjectLocation("lakefs", "repo", "main", "sub1/sub2/create.me"))); - fs.create(new Path("lakefs://repo/main/sub1/sub2/create.me"), false); - } - - @Test - public void testMkdirs() throws ApiException, IOException { - // setup empty folder checks - Path testPath = new Path("dir1/dir2/dir3"); - do { - when(objectsApi.statObject("repo", "main", testPath.toString(), false, false)) - .thenThrow(noSuchFile); - when(objectsApi.statObject("repo", "main", testPath+"/", false, false)) - .thenThrow(noSuchFile); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), any(), eq(""), eq(testPath+"/"))) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - testPath = testPath.getParent(); - } while(testPath != null && !testPath.isRoot()); - - // physical address to directory marker object - StagingLocation stagingLocation = mockGetPhysicalAddress("repo", "main", "dir1/dir2/dir3/", "repo-base/emptyDir"); - - // call mkdirs - Path p = new Path("lakefs://repo/main/dir1/dir2/dir3"); - boolean mkdirs = fs.mkdirs(p); - Assert.assertTrue("make dirs", mkdirs); - - // verify metadata - ArgumentCaptor metadataCapture = ArgumentCaptor.forClass(StagingMetadata.class); - verify(stagingApi).linkPhysicalAddress(eq("repo"), eq("main"), eq("dir1/dir2/dir3/"), - metadataCapture.capture()); - StagingMetadata actualMetadata = metadataCapture.getValue(); - Assert.assertEquals(stagingLocation, actualMetadata.getStaging()); - Assert.assertEquals(0, (long)actualMetadata.getSizeBytes()); - - // verify file exists on s3 - S3Object ret = s3Client.getObject(new GetObjectRequest(s3Bucket, "/repo-base/emptyDir")); - String actual = IOUtils.toString(ret.getObjectContent()); - Assert.assertEquals("", actual); - } - - @Test - public void testOpen() throws IOException, ApiException { - String contents = "The quick brown fox jumps over the lazy dog."; - byte[] contentsBytes = contents.getBytes(); - String physicalKey = "/repo-base/open"; - int readBufferSize = 5; - - // Write physical file to S3. 
- ObjectMetadata s3Metadata = new ObjectMetadata(); - s3Metadata.setContentLength(contentsBytes.length); - s3Client.putObject(new PutObjectRequest(s3Bucket, physicalKey, new ByteArrayInputStream(contentsBytes), s3Metadata)); - - Path p = new Path("lakefs://repo/main/read.me"); - mockStatObject("repo", "main", "read.me", physicalKey, (long)contentsBytes.length); - try (InputStream in = fs.open(p, readBufferSize)) { - String actual = IOUtils.toString(in); - Assert.assertEquals(contents, actual); - } - } - - @Test - public void testOpenWithInvalidUriChars() throws IOException, ApiException { - String contents = "The quick brown fox jumps over the lazy dog."; - byte[] contentsBytes = contents.getBytes(); - int readBufferSize = 5; - - String[] keys = { - "/repo-base/with space/open", - "/repo-base/wi:th$cha&rs#/%op;e?n", - "/repo-base/עכשיו/בעברית/open", - "/repo-base/\uD83E\uDD2F/imoji/open", - }; - for (String key : keys) { - // Write physical file to S3. - ObjectMetadata s3Metadata = new ObjectMetadata(); - s3Metadata.setContentLength(contentsBytes.length); - s3Client.putObject(new PutObjectRequest(s3Bucket, key, new ByteArrayInputStream(contentsBytes), s3Metadata)); - - Path p = new Path("lakefs://repo/main/read.me"); - mockStatObject("repo", "main", "read.me", key, (long) contentsBytes.length); - try (InputStream in = fs.open(p, readBufferSize)) { - String actual = IOUtils.toString(in); - Assert.assertEquals(contents, actual); - } - } - } - - @Test(expected = FileNotFoundException.class) - public void testOpen_NotExists() throws IOException, ApiException { - Path p = new Path("lakefs://repo/main/doesNotExi.st"); - when(objectsApi.statObject(any(), any(), any(), any(), any())) - .thenThrow(noSuchFile); - fs.open(p); - } - - /* - @Test - public void listFiles() throws IOException, URISyntaxException { - RemoteIterator it = fs.listFiles(new Path("lakefs://example1/master"), true); - List l = new ArrayList<>(); - while (it.hasNext()) { - l.add(it.next()); - } - // expected 'l' to include all the files in branch - no directory will be listed, with or without recursive - - Configuration conf = new Configuration(false); - conf.set(org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY, ""); - conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, ""); - conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); - FileSystem fs2 = FileSystem.get(new URI("s3a://bucket/"), conf); - RemoteIterator it2 = fs2.listFiles(new Path("s3a://bucket"), true); - List l2 = new ArrayList<>(); - while (it2.hasNext()) { - l2.add(it2.next()); - } - // expected 'l2' to include all the files in bucket - no directory will be listed, with or without recursive - } - */ - - @Test - public void testListStatusFile() throws ApiException, IOException { - ObjectStats objectStats = new ObjectStats(). - path("status/file"). - pathType(PathTypeEnum.OBJECT). - physicalAddress(s3Url("/repo-base/status")). - checksum(STATUS_CHECKSUM). - mtime(STATUS_MTIME). 
- sizeBytes(STATUS_FILE_SIZE); - when(objectsApi.statObject("repo", "main", "status/file", false, false)) - .thenReturn(objectStats); - Path p = new Path("lakefs://repo/main/status/file"); - FileStatus[] fileStatuses = fs.listStatus(p); - LakeFSFileStatus expectedFileStatus = new LakeFSFileStatus.Builder(p) - .length(STATUS_FILE_SIZE) - .checksum(STATUS_CHECKSUM) - .mTime(STATUS_MTIME) - .physicalAddress(p.toString()) - .blockSize(Constants.DEFAULT_BLOCK_SIZE) - .build(); - LakeFSFileStatus[] expectedFileStatuses = new LakeFSFileStatus[]{expectedFileStatus}; - Assert.assertArrayEquals(expectedFileStatuses, fileStatuses); - } - - @Test(expected = FileNotFoundException.class) - public void testListStatusNotFound() throws ApiException, IOException { - when(objectsApi.statObject("repo", "main", "status/file", false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - when(objectsApi.statObject("repo", "main", "status/file/", false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), - any(), eq("/"), eq("status/file/"))) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - Path p = new Path("lakefs://repo/main/status/file"); - fs.listStatus(p); - } - - @Test - public void testListStatusDirectory() throws ApiException, IOException { - int totalObjectsCount = 3; - ObjectStatsList objects = new ObjectStatsList(); - for (int i = 0; i < totalObjectsCount; i++) { - ObjectStats objectStats = new ObjectStats(). - path("status/file" + i). - pathType(PathTypeEnum.OBJECT). - physicalAddress(s3Url("/repo-base/status" + i)). - checksum(STATUS_CHECKSUM). - mtime(STATUS_MTIME). - sizeBytes(STATUS_FILE_SIZE); - objects.addResultsItem(objectStats); - } - when(objectsApi.listObjects(eq("repo"), eq("main"), eq(false), eq(false), eq(""), - any(), eq("/"), eq("status/"))) - .thenReturn(objects.pagination(new Pagination().hasMore(false))); - when(objectsApi.statObject("repo", "main", "status", false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - - Path dir = new Path("lakefs://repo/main/status"); - FileStatus[] fileStatuses = fs.listStatus(dir); - FileStatus[] expectedFileStatuses = new LocatedFileStatus[totalObjectsCount]; - for (int i = 0; i < totalObjectsCount; i++) { - Path p = new Path(dir + "/file" + i); - LakeFSFileStatus fileStatus = new LakeFSFileStatus.Builder(p) - .length(STATUS_FILE_SIZE) - .checksum(STATUS_CHECKSUM) - .mTime(STATUS_MTIME) - .blockSize(Constants.DEFAULT_BLOCK_SIZE) - .physicalAddress(s3Url("/repo-base/status" + i)) - .build(); - expectedFileStatuses[i] = new LocatedFileStatus(fileStatus, null); - } - Assert.assertArrayEquals(expectedFileStatuses, fileStatuses); - } - - @Test(expected = UnsupportedOperationException.class) - public void testAppend() throws IOException { - fs.append(null, 0, null); - } - - private void mockDirectoryMarker(ObjectLocation objectLoc) throws ApiException { - // Mock parent directory to show the directory marker exists. - ObjectStats markerStats = new ObjectStats().path(objectLoc.getPath()).pathType(PathTypeEnum.OBJECT); - when(objectsApi.listObjects(eq(objectLoc.getRepository()), eq(objectLoc.getRef()), eq(false), eq(false), eq(""), any(), eq(""), eq(objectLoc.getPath()))). 
- thenReturn(new ObjectStatsList().results(ImmutableList.of(markerStats))); - } - - private void mockNonExistingPath(ObjectLocation objectLoc) throws ApiException { - when(objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath(), false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - - when(objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath() + Constants.SEPARATOR, false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - - when(objectsApi.listObjects(eq(objectLoc.getRepository()), eq(objectLoc.getRef()), eq(false), eq(false), - eq(""), any(), eq(""), eq(objectLoc.getPath() + Constants.SEPARATOR))) - .thenReturn(new ObjectStatsList().pagination(new Pagination().hasMore(false))); - } - - private void mockExistingDirPath(ObjectLocation dirObjLoc, List filesInDir) throws ApiException { - // io.lakefs.LakeFSFileSystem.getFileStatus tries to get object stats, when it can't find an object if will - // fall back to try listing items under this path to discover the objects it contains. if objects are found, - // then the path considered a directory. - - ObjectStatsList stats = new ObjectStatsList(); - - // Mock directory marker - if (filesInDir.isEmpty()) { - ObjectStats markerStat = mockEmptyDirectoryMarker(dirObjLoc); - stats.addResultsItem(markerStat); - } else { - when(objectsApi.statObject(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath(), false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - when(objectsApi.statObject(dirObjLoc.getRepository(), dirObjLoc.getRef(), dirObjLoc.getPath() + Constants.SEPARATOR, false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - } - - // Mock the files under this directory - for (ObjectLocation loc : filesInDir) { - ObjectStats fileStat = mockExistingFilePath(loc); - stats.addResultsItem(fileStat); - } - - - // Mock listing the files under this directory - stats.setPagination(new Pagination().hasMore(false)); - when(objectsApi.listObjects(eq(dirObjLoc.getRepository()), eq(dirObjLoc.getRef()), eq(false), eq(false), - eq(""), any(), eq(""), eq(dirObjLoc.getPath() + Constants.SEPARATOR))) - .thenReturn(stats); - } - - private ObjectStats mockExistingFilePath(ObjectLocation objectLoc) throws ApiException { - String key = objectLocToS3ObjKey(objectLoc); - ObjectStats srcStats = new ObjectStats() - .path(objectLoc.getPath()) - .sizeBytes(UNUSED_FILE_SIZE) - .mtime(UNUSED_MTIME) - .pathType(PathTypeEnum.OBJECT) - .physicalAddress(s3Url(key)) - .checksum(UNUSED_CHECKSUM); - when(objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath(), false, false)).thenReturn(srcStats); - return srcStats; - } - - private void mockMissingCopyAPI() throws ApiException { - when(objectsApi.copyObject(any(), any(), any(), any())).thenThrow(new ApiException(HttpStatus.SC_INTERNAL_SERVER_ERROR, null, "{\"message\":\"invalid API endpoint\"}")); - when(objectsApi.stageObject(any(), any(), any(), any())).thenReturn(new ObjectStats()); - } - - private ObjectStats mockEmptyDirectoryMarker(ObjectLocation objectLoc) throws ApiException { - String key = objectLocToS3ObjKey(objectLoc); - - when(objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath(), false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - - ObjectStats srcStats = new ObjectStats() - .path(objectLoc.getPath() + 
Constants.SEPARATOR) - .sizeBytes(0L) - .mtime(UNUSED_MTIME) - .pathType(PathTypeEnum.OBJECT) - .physicalAddress(s3Url(key+Constants.SEPARATOR)) - .checksum(UNUSED_CHECKSUM); - when(objectsApi.statObject(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath() + Constants.SEPARATOR, false, false)) - .thenReturn(srcStats); - - ObjectLocation parentLoc = objectLoc.getParent(); - while (parentLoc != null && parentLoc.isValidPath()) { - when(objectsApi.statObject(parentLoc.getRepository(), parentLoc.getRef(), parentLoc.getPath(), false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - when(objectsApi.statObject(parentLoc.getRepository(), parentLoc.getRef(), parentLoc.getPath()+ Constants.SEPARATOR, false, false)) - .thenThrow(new ApiException(HttpStatus.SC_NOT_FOUND, "no such file")); - when(objectsApi.listObjects(parentLoc.getRepository(), parentLoc.getRef(), false, false, "", 1, "", parentLoc.getPath() + Constants.SEPARATOR)) - .thenReturn(new ObjectStatsList().results(Collections.emptyList()).pagination(new Pagination().hasMore(false))); - parentLoc = parentLoc.getParent(); - } - return srcStats; - } - - private String objectLocToS3ObjKey(ObjectLocation objectLoc) { - return String.format("/%s/%s/%s",objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath()); - } - - private void verifyObjDeletion(ObjectLocation srcObjLoc) throws ApiException { - verify(objectsApi).deleteObject(srcObjLoc.getRepository(), srcObjLoc.getRef(), srcObjLoc.getPath()); - } - - private boolean dstPathLinkedToSrcPhysicalAddress(ObjectLocation srcObjLoc, ObjectLocation dstObjLoc) throws ApiException { - ArgumentCaptor creationReqCapture = ArgumentCaptor.forClass(ObjectCopyCreation.class); - verify(objectsApi).copyObject(eq(dstObjLoc.getRepository()), eq(dstObjLoc.getRef()), eq(dstObjLoc.getPath()), - creationReqCapture.capture()); - ObjectCopyCreation actualCreationReq = creationReqCapture.getValue(); - // Rename is a metadata operation, therefore the dst name is expected to link to the src physical address. 
- return srcObjLoc.getRef().equals(actualCreationReq.getSrcRef()) && - srcObjLoc.getPath().equals(actualCreationReq.getSrcPath()); - } - - /** - * rename(src.txt, non-existing-dst) -> non-existing/new - unsupported, should fail with false - */ - @Test - public void testRename_existingFileToNonExistingDst() throws IOException, ApiException { - Path src = new Path("lakefs://repo/main/existing.src"); - ObjectLocation srcObjLoc = fs.pathToObjectLocation(src); - mockExistingFilePath(srcObjLoc); - - Path dst = new Path("lakefs://repo/main/non-existing/new"); - ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); - mockNonExistingPath(dstObjLoc); - mockNonExistingPath(fs.pathToObjectLocation(dst.getParent())); - - mockDirectoryMarker(fs.pathToObjectLocation(src.getParent())); - - boolean renamed = fs.rename(src, dst); - Assert.assertFalse(renamed); - } - - @Test - public void testRename_existingFileToExistingFileName() throws ApiException, IOException { - Path src = new Path("lakefs://repo/main/existing.src"); - ObjectLocation srcObjLoc = fs.pathToObjectLocation(src); - mockExistingFilePath(srcObjLoc); - - Path dst = new Path("lakefs://repo/main/existing.dst"); - ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); - mockExistingFilePath(dstObjLoc); - - mockDirectoryMarker(fs.pathToObjectLocation(src.getParent())); - - boolean success = fs.rename(src, dst); - Assert.assertTrue(success); - } - - @Test - public void testRename_existingDirToExistingFileName() throws ApiException, IOException { - Path fileInSrcDir = new Path("lakefs://repo/main/existing-dir/existing.src"); - ObjectLocation fileObjLoc = fs.pathToObjectLocation(fileInSrcDir); - Path srcDir = new Path("lakefs://repo/main/existing-dir"); - ObjectLocation srcDirObjLoc = fs.pathToObjectLocation(srcDir); - mockExistingDirPath(srcDirObjLoc, ImmutableList.of(fileObjLoc)); - - Path dst = new Path("lakefs://repo/main/existingdst.file"); - ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); - mockExistingFilePath(dstObjLoc); - - boolean success = fs.rename(srcDir, dst); - Assert.assertFalse(success); - } - - /** - * file -> existing-directory-name: rename(src.txt, existing-dstdir) -> existing-dstdir/src.txt - */ - @Test - public void testRename_existingFileToExistingDirName() throws ApiException, IOException { - Path src = new Path("lakefs://repo/main/existing-dir1/existing.src"); - ObjectLocation srcObjLoc = fs.pathToObjectLocation(src); - mockExistingFilePath(srcObjLoc); - - Path fileInDstDir = new Path("lakefs://repo/main/existing-dir2/existing.src"); - ObjectLocation fileObjLoc = fs.pathToObjectLocation(fileInDstDir); - Path dst = new Path("lakefs://repo/main/existing-dir2"); - ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); - mockExistingDirPath(dstObjLoc, ImmutableList.of(fileObjLoc)); - - mockDirectoryMarker(fs.pathToObjectLocation(src.getParent())); - - boolean renamed = fs.rename(src, dst); - Assert.assertTrue(renamed); - Path expectedDstPath = new Path("lakefs://repo/main/existing-dir2/existing.src"); - Assert.assertTrue(dstPathLinkedToSrcPhysicalAddress(srcObjLoc, fs.pathToObjectLocation(expectedDstPath))); - verifyObjDeletion(srcObjLoc); - } - - /** - * rename(srcDir(containing srcDir/a.txt, srcDir/b.txt), non-existing-dir/new) -> unsupported, rename should fail by returning false - */ - @Test - public void testRename_existingDirToNonExistingDirWithoutParent() throws ApiException, IOException { - Path fileInSrcDir = new Path("lakefs://repo/main/existing-dir/existing.src"); - ObjectLocation fileObjLoc = 
fs.pathToObjectLocation(fileInSrcDir); - Path srcDir = new Path("lakefs://repo/main/existing-dir"); - ObjectLocation srcDirObjLoc = fs.pathToObjectLocation(srcDir); - mockExistingDirPath(srcDirObjLoc, ImmutableList.of(fileObjLoc)); - mockNonExistingPath(new ObjectLocation("lakefs", "repo", "main", "non-existing-dir")); - mockNonExistingPath(new ObjectLocation("lakefs", "repo", "main", "non-existing-dir/new")); - - Path dst = new Path("lakefs://repo/main/non-existing-dir/new"); - boolean renamed = fs.rename(srcDir, dst); - Assert.assertFalse(renamed); - } - - /** - * rename(srcDir(containing srcDir/a.txt, srcDir/b.txt), non-existing-dir/new) -> unsupported, rename should fail by returning false - */ - @Test - public void testRename_existingDirToNonExistingDirWithParent() throws ApiException, IOException { - Path fileInSrcDir = new Path("lakefs://repo/main/existing-dir/existing.src"); - ObjectLocation fileObjLoc = fs.pathToObjectLocation(fileInSrcDir); - Path srcDir = new Path("lakefs://repo/main/existing-dir"); - ObjectLocation srcDirObjLoc = fs.pathToObjectLocation(srcDir); - mockExistingDirPath(srcDirObjLoc, ImmutableList.of(fileObjLoc)); - mockExistingDirPath(new ObjectLocation("lakefs", "repo", "main", "existing-dir2/new"), Collections.emptyList()); - - Path dst = new Path("lakefs://repo/main/existing-dir2/new"); - mockDirectoryMarker(fs.pathToObjectLocation(srcDir)); - - boolean renamed = fs.rename(srcDir, dst); - Assert.assertTrue(renamed); - } - - /** - * rename(srcDir(containing srcDir/a.txt), existing-nonempty-dstdir) -> unsupported, rename should fail by returning false. - */ - @Test - public void testRename_existingDirToExistingNonEmptyDirName() throws ApiException, IOException { - Path firstSrcFile = new Path("lakefs://repo/main/existing-dir1/a.src"); - ObjectLocation firstObjLoc = fs.pathToObjectLocation(firstSrcFile); - Path secSrcFile = new Path("lakefs://repo/main/existing-dir1/b.src"); - ObjectLocation secObjLoc = fs.pathToObjectLocation(secSrcFile); - - Path srcDir = new Path("lakefs://repo/main/existing-dir1"); - ObjectLocation srcDirObjLoc = fs.pathToObjectLocation(srcDir); - mockExistingDirPath(srcDirObjLoc, ImmutableList.of(firstObjLoc, secObjLoc)); - - Path fileInDstDir = new Path("lakefs://repo/main/existing-dir2/file.dst"); - ObjectLocation dstFileObjLoc = fs.pathToObjectLocation(fileInDstDir); - Path dstDir = new Path("lakefs://repo/main/existing-dir2"); - ObjectLocation dstDirObjLoc = fs.pathToObjectLocation(dstDir); - mockExistingDirPath(dstDirObjLoc, ImmutableList.of(dstFileObjLoc)); - - boolean renamed = fs.rename(srcDir, dstDir); - Assert.assertFalse(renamed); - } - - /** - * Check that a file is renamed when working against a lakeFS version - * where CopyObject API doesn't exist - */ - @Test - public void testRename_fallbackStageAPI() throws ApiException, IOException { - Path src = new Path("lakefs://repo/main/existing-dir1/existing.src"); - ObjectLocation srcObjLoc = fs.pathToObjectLocation(src); - mockExistingFilePath(srcObjLoc); - - Path fileInDstDir = new Path("lakefs://repo/main/existing-dir2/existing.src"); - ObjectLocation fileObjLoc = fs.pathToObjectLocation(fileInDstDir); - Path dst = new Path("lakefs://repo/main/existing-dir2"); - ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); - - mockExistingDirPath(dstObjLoc, ImmutableList.of(fileObjLoc)); - mockDirectoryMarker(fs.pathToObjectLocation(src.getParent())); - mockMissingCopyAPI(); - - boolean renamed = fs.rename(src, dst); - Assert.assertTrue(renamed); - Path expectedDstPath = new 
Path("lakefs://repo/main/existing-dir2/existing.src"); - Assert.assertTrue(dstPathLinkedToSrcPhysicalAddress(srcObjLoc, fs.pathToObjectLocation(expectedDstPath))); - verifyObjDeletion(srcObjLoc); - } - - @Test - public void testRename_srcAndDstOnDifferentBranch() throws IOException, ApiException { - Path src = new Path("lakefs://repo/branch/existing.src"); - Path dst = new Path("lakefs://repo/another-branch/existing.dst"); - boolean renamed = fs.rename(src, dst); - Assert.assertFalse(renamed); - Mockito.verify(objectsApi, never()).statObject(any(), any(), any(), any(), any()); - Mockito.verify(objectsApi, never()).copyObject(any(), any(), any(), any()); - Mockito.verify(objectsApi, never()).deleteObject(any(), any(), any()); - } - - /** - * no-op. rename is expected to succeed. - */ - @Test - public void testRename_srcEqualsDst() throws IOException, ApiException { - Path src = new Path("lakefs://repo/main/existing.src"); - Path dst = new Path("lakefs://repo/main/existing.src"); - boolean renamed = fs.rename(src, dst); - Assert.assertTrue(renamed); - Mockito.verify(objectsApi, never()).statObject(any(), any(), any(), any(), any()); - Mockito.verify(objectsApi, never()).copyObject(any(), any(), any(), any()); - Mockito.verify(objectsApi, never()).deleteObject(any(), any(), any()); - } - - @Test - public void testRename_nonExistingSrcFile() throws ApiException, IOException { - Path src = new Path("lakefs://repo/main/non-existing.src"); - ObjectLocation srcObjLoc = fs.pathToObjectLocation(src); - mockNonExistingPath(srcObjLoc); - - Path dst = new Path("lakefs://repo/main/existing.dst"); - ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst); - mockExistingFilePath(dstObjLoc); - - boolean success = fs.rename(src, dst); - Assert.assertFalse(success); - } - - /** - * globStatus is used only by the Hadoop CLI where the pattern is always the exact file. 
-     */
-    @Test
-    public void testGlobStatus_SingleFile() throws ApiException, IOException {
-        Path path = new Path("lakefs://repo/main/existing.dst");
-        ObjectLocation dstObjLoc = fs.pathToObjectLocation(path);
-        mockExistingFilePath(dstObjLoc);
-
-        FileStatus[] statuses = fs.globStatus(path);
-        Assert.assertArrayEquals(new FileStatus[]{
-                new LakeFSFileStatus.Builder(path).build()
-        }, statuses);
-    }
-}
diff --git a/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java b/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java
new file mode 100644
index 00000000000..d2112b71051
--- /dev/null
+++ b/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java
@@ -0,0 +1,128 @@
+package io.lakefs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.*;
+import com.aventrix.jnanoid.jnanoid.NanoIdUtils;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.apache.commons.io.IOUtils;
+
+import io.lakefs.clients.api.model.*;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.List;
+
+/**
+ * Base for all LakeFSFilesystem tests that need to access S3. It adds a
+ * MinIO container to FSTestBase, and configures the filesystem to use it.
+ */
+abstract class S3FSTestBase extends FSTestBase {
+    static private final Logger LOG = LoggerFactory.getLogger(S3FSTestBase.class);
+
+    protected String s3Endpoint;
+    protected AmazonS3 s3Client;
+
+    private static final DockerImageName MINIO = DockerImageName.parse("minio/minio:RELEASE.2021-06-07T21-40-51Z");
+
+    @Rule
+    public final GenericContainer s3 = new GenericContainer(MINIO.toString()).
+        withCommand("minio", "server", "/data").
+        withEnv("MINIO_ROOT_USER", S3_ACCESS_KEY_ID).
+        withEnv("MINIO_ROOT_PASSWORD", S3_SECRET_ACCESS_KEY).
+        withEnv("MINIO_DOMAIN", "s3.local.lakefs.io").
+        withEnv("MINIO_UPDATE", "off").
+        withExposedPorts(9000);
+
+    @Before
+    public void logS3Container() {
+        Logger s3Logger = LoggerFactory.getLogger("s3 container");
+        Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(s3Logger)
+            .withMdc("container", "s3")
+            .withSeparateOutputStreams();
+        s3.followOutput(logConsumer);
+    }
+
+    public void s3ClientSetup() {
+        AWSCredentials creds = new BasicAWSCredentials(S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY);
+
+        ClientConfiguration clientConfiguration = new ClientConfiguration()
+            .withSignerOverride("AWSS3V4SignerType");
+        s3Endpoint = String.format("http://s3.local.lakefs.io:%d", s3.getMappedPort(9000));
+
+        s3Client = new AmazonS3Client(creds, clientConfiguration);
+
+        S3ClientOptions s3ClientOptions = new S3ClientOptions()
+            .withPathStyleAccess(true);
+        s3Client.setS3ClientOptions(s3ClientOptions);
+        s3Client.setEndpoint(s3Endpoint);
+
+        s3Bucket = makeS3BucketName();
+        s3Base = String.format("s3://%s/", s3Bucket);
+        LOG.info("S3: bucket {} => base URL {}", s3Bucket, s3Base);
+
+        CreateBucketRequest cbr = new CreateBucketRequest(s3Bucket);
+        s3Client.createBucket(cbr);
+    }
+
+    /**
+     * @return all pathnames in the test bucket that start with prefix. (Obviously not scalable!)
+     */
+    protected List getS3FilesByPrefix(String prefix) {
+
+        ListObjectsRequest req = new ListObjectsRequest()
+            .withBucketName(s3Bucket)
+            .withPrefix(prefix)
+            .withDelimiter(null);
+
+        ObjectListing listing = s3Client.listObjects(req);
+        List summaries = listing.getObjectSummaries();
+        if (listing.isTruncated()) {
+            Assert.fail(String.format("[internal] no support for test that creates >%d S3 objects", listing.getMaxKeys()));
+        }
+
+        return Lists.transform(summaries, S3ObjectSummary::getKey);
+    }
+
+    protected void assertS3Object(StagingLocation stagingLocation, String contents) {
+        String s3Key = getS3Key(stagingLocation);
+        List actualFiles = ImmutableList.of("");
+        try (S3Object obj =
+                 s3Client.getObject(new GetObjectRequest(s3Bucket, "/" + s3Key))) {
+            actualFiles = getS3FilesByPrefix("");
+            String actual = IOUtils.toString(obj.getObjectContent());
+            Assert.assertEquals(contents, actual);
+
+            Assert.assertEquals(ImmutableList.of(s3Key), actualFiles);
+        } catch (Exception e) {
+            throw new RuntimeException("Files " + actualFiles +
+                                       "; read key " + s3Key + " failed", e);
+        }
+    }
+
+    protected void moreHadoopSetup() {
+        s3ClientSetup();
+
+        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        conf.set(org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY, S3_ACCESS_KEY_ID);
+        conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, S3_SECRET_ACCESS_KEY);
+        conf.set(org.apache.hadoop.fs.s3a.Constants.ENDPOINT, s3Endpoint);
+        conf.set(org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR, "/tmp/s3a");
+    }
+}
diff --git a/clients/hadoopfs/src/test/resources/log4j.properties b/clients/hadoopfs/src/test/resources/log4j.properties
index f8d5a195017..2f1f6b1e300 100644
--- a/clients/hadoopfs/src/test/resources/log4j.properties
+++ b/clients/hadoopfs/src/test/resources/log4j.properties
@@ -4,5 +4,8 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
 log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-log4j.logger.io.lakefs=DEBUG, A1
+log4j.logger.io.lakefs=DEBUG
+# Comment this out to show mockserver INFO logs. They will help you debug
+# MockServer expectations and results.
+log4j.logger.org.mockserver.log=WARN
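Note on the new test style: the rename and globStatus tests above drive LakeFSFileSystem against MockServer expectations instead of Mockito stubs, using helpers such as mockStatObject, mockStatObjectNotFound, mockListing, and mockDeleteObject, presumably defined in the elided middle of FSTestBase. As a rough sketch only (not the PR's actual helper code), such a helper can follow the same request()/response() pattern used inline for the copy-object expectation; the stat endpoint path and response shape below are assumptions about the lakeFS API, and mockServerClient and gson are assumed to be FSTestBase fields:

    // Hypothetical sketch, not the PR's implementation: register a canned
    // lakeFS statObject response on the shared MockServer instance.
    protected void mockStatObject(String repo, String ref, String path, ObjectStats stats) {
        mockServerClient.when(request()
                .withMethod("GET")
                // Assumed lakeFS API path for statObject.
                .withPath(String.format("/repositories/%s/refs/%s/objects/stat", repo, ref))
                .withQueryStringParameter("path", path))
            .respond(response()
                .withStatusCode(200)
                .withBody(json(gson.toJson(stats))));
    }

    // Hypothetical sketch: the same endpoint answered with 404, so the
    // filesystem falls back to listing the path as a directory.
    protected void mockStatObjectNotFound(String repo, String ref, String path) {
        mockServerClient.when(request()
                .withMethod("GET")
                .withPath(String.format("/repositories/%s/refs/%s/objects/stat", repo, ref))
                .withQueryStringParameter("path", path))
            .respond(response()
                .withStatusCode(404)
                .withBody(json("{\"message\": \"no such file\"}")));
    }

Helpers in this style reduce each test to declaring which paths exist and which do not before calling fs.rename or fs.globStatus, which is why tests such as testRename_nonExistingSrcFile stay short.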