Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set object-store last-modified time as mtime in lakeFSFS direct access #8388

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/hadoopfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ To export to S3:
<dependency>
<groupId>io.lakefs</groupId>
<artifactId>sdk</artifactId>
<version>1.18.0</version>
<version>1.43.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
13 changes: 10 additions & 3 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSLinker.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.lakefs;

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.fs.Path;
import io.lakefs.clients.sdk.ApiException;
import io.lakefs.clients.sdk.StagingApi;
Expand All @@ -24,10 +25,16 @@ public LakeFSLinker(LakeFSFileSystem lfs, LakeFSClient lfsClient,
this.overwrite = overwrite;
}

public void link(String eTag, long byteSize) throws IOException {
public void link(String eTag, long byteSize, Date time) throws IOException {
StagingApi staging = lakeFSClient.getStagingApi();
StagingMetadata stagingMetadata =
new StagingMetadata().checksum(eTag).sizeBytes(byteSize).staging(stagingLocation);
StagingMetadata stagingMetadata = new StagingMetadata()
.checksum(eTag)
.sizeBytes(byteSize)
.staging(stagingLocation);
if (time != null) {
long secs = (time.getTime() + 500) / 1000;
stagingMetadata.setMtime(secs);
}
try {
StagingApi.APIlinkPhysicalAddressRequest request =
staging.linkPhysicalAddress(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath(), stagingMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void close() throws IOException {
if (eTag == null) {
throw new IOException("Failed to finish writing to presigned link. No ETag found.");
}
linker.link(eTag, buffer.size());
linker.link(eTag, buffer.size(), null);
if (connection.getResponseCode() > 299) {
throw new IOException("Failed to finish writing to presigned link. Response code: "
+ connection.getResponseCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void close() throws IOException {
// the underlying Hadoop FileSystem) so we can link it on lakeFS.
if (!this.isLinked.getAndSet(true)) {
ObjectMetadata objectMetadata = metadataClient.getObjectMetadata(physicalUri);
linker.link(objectMetadata.getETag(), objectMetadata.getContentLength());
linker.link(objectMetadata.getETag(), objectMetadata.getContentLength(), objectMetadata.getLastModified());
}
}
}
8 changes: 0 additions & 8 deletions clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,12 @@ protected String getS3Key(StagingLocation stagingLocation) {
return removeStart(stagingLocation.getPhysicalAddress(), s3Base);
}

/**
* Override to add to Hadoop configuration.
*/
protected void addHadoopConfiguration(Configuration conf) {
}

@Before
public void hadoopSetup() throws IOException, URISyntaxException {
s3Base = "s3a://UNUSED/"; // Overridden if S3 will be used!

conf = new Configuration(false);

addHadoopConfiguration(conf);

conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem");

conf.set("fs.lakefs.access.key", "unused-but-checked");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
import io.lakefs.utils.ObjectLocation;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Path;

import com.amazonaws.HttpMethod;
import com.amazonaws.services.s3.model.*;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -29,95 +28,29 @@
import static org.mockserver.model.JsonBody.json;

import java.io.*;
import java.net.URL;
import java.util.Date;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@RunWith(Parameterized.class)
public class LakeFSFileSystemServerS3Test extends S3FSTestBase {
static private final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystemServerS3Test.class);

public static interface PhysicalAddressCreator {
default void initConfiguration(Configuration conf) {}
String createGetPhysicalAddress(S3FSTestBase o, String key);
StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path);
}

@Parameters(name="{1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][]{
{new SimplePhysicalAddressCreator(), "simple"},
{new PresignedPhysicalAddressCreator(), "presigned"}});
}

@Parameter(1)
public String unusedAddressCreatorType;

@Parameter(0)
public PhysicalAddressCreator pac;

static private class SimplePhysicalAddressCreator implements PhysicalAddressCreator {
public String createGetPhysicalAddress(S3FSTestBase o, String key) {
return o.s3Url(key);
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
return new StagingLocation().physicalAddress(o.s3Url(fullPath));
}
}

static private class PresignedPhysicalAddressCreator implements PhysicalAddressCreator {
public void initConfiguration(Configuration conf) {
conf.set("fs.lakefs.access.mode", "presigned");
}

protected Date getExpiration() {
return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1));
}

public String createGetPhysicalAddress(S3FSTestBase o, String key) {
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key)
.withMethod(HttpMethod.GET)
.withExpiration(expiration));
return presignedUrl.toString();
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath)
.withMethod(HttpMethod.PUT)
.withExpiration(expiration));
return new StagingLocation()
.physicalAddress(o.s3Url(fullPath))
.presignedUrl(presignedUrl.toString());
}
}
@Parameter(1)
public String unusedAddressCreatorType;

@Override
protected void moreHadoopSetup() {
super.moreHadoopSetup();
pac.initConfiguration(conf);
}

// Return a location under namespace for this getPhysicalAddress call.
protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) {
StagingLocation stagingLocation =
pac.createPutStagingLocation(this, namespace, repo, branch, path);
mockServerClient.when(request()
.withMethod("GET")
.withPath(String.format("/repositories/%s/branches/%s/staging/backing", repo, branch))
.withQueryStringParameter("path", path))
.respond(response().withStatusCode(200)
.withBody(gson.toJson(stagingLocation)));
return stagingLocation;
/** Supplies the parameterized physical-address creator to the S3 test base class hooks. */
protected PhysicalAddressCreator getPac() {
return pac;
}

@Test
Expand Down Expand Up @@ -180,7 +113,7 @@ public void testMkdirs() throws IOException {
mockGetPhysicalAddress("repo", "main", "dir1/dir2/dir3/", "repo-base/emptyDir");

ObjectStats newStats = makeObjectStats("dir1/dir2/dir3/")
.physicalAddress(pac.createGetPhysicalAddress(this, "repo-base/dir12"));
.physicalAddress(getPac().createGetPhysicalAddress(this, "repo-base/dir12"));
mockStatObject("repo", "main", "dir1/dir2/dir3/", newStats);

mockServerClient.when(request()
Expand Down Expand Up @@ -209,7 +142,7 @@ public void testCreateExistingDirectory() throws IOException {

mockStatObjectNotFound("repo", "main", "sub1/sub2/create.me");
ObjectStats stats = makeObjectStats("sub1/sub2/create.me/")
.physicalAddress(pac.createGetPhysicalAddress(this, "repo-base/sub1/sub2/create.me"));
.physicalAddress(getPac().createGetPhysicalAddress(this, "repo-base/sub1/sub2/create.me"));
mockStatObject("repo", "main", "sub1/sub2/create.me/", stats);

Exception e =
Expand All @@ -234,7 +167,7 @@ public void testOpen() throws IOException, ApiException {
String contents = "The quick brown fox jumps over the lazy dog.";
byte[] contentsBytes = contents.getBytes();
String physicalPath = sessionId() + "/repo-base/open";
String physicalKey = pac.createGetPhysicalAddress(this, physicalPath);
String physicalKey = getPac().createGetPhysicalAddress(this, physicalPath);
int readBufferSize = 5;
Path path = new Path("lakefs://repo/main/read.me");

Expand Down Expand Up @@ -285,7 +218,7 @@ public void testOpenWithInvalidUriChars() throws IOException, ApiException {

String path = String.format("lakefs://repo/main/%s-x", suffix);
ObjectStats stats = makeObjectStats(suffix + "-x")
.physicalAddress(pac.createGetPhysicalAddress(this, key))
.physicalAddress(getPac().createGetPhysicalAddress(this, key))
.sizeBytes((long) contentsBytes.length);
mockStatObject("repo", "main", suffix + "-x", stats);

Expand Down
84 changes: 80 additions & 4 deletions clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.HttpMethod;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
Expand All @@ -14,9 +15,12 @@
import com.google.common.collect.Lists;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;

import io.lakefs.clients.sdk.model.*;

import static org.mockserver.model.HttpResponse.response;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -27,6 +31,9 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.net.URL;
import java.util.concurrent.TimeUnit;
import java.util.Date;
import java.util.List;

/**
Expand All @@ -40,10 +47,10 @@ public abstract class S3FSTestBase extends FSTestBase {
protected String s3Endpoint;
protected AmazonS3 s3Client;

private static final DockerImageName MINIO = DockerImageName.parse("minio/minio:RELEASE.2021-06-07T21-40-51Z");
private static final DockerImageName MINIO = DockerImageName.parse("minio/minio:RELEASE.2024-11-07T00-52-20Z");

@Rule
public final GenericContainer s3 = new GenericContainer(MINIO.toString()).
public final GenericContainer s3 = new GenericContainer(MINIO).
withCommand("minio", "server", "/data").
withEnv("MINIO_ROOT_USER", S3_ACCESS_KEY_ID).
withEnv("MINIO_ROOT_PASSWORD", S3_SECRET_ACCESS_KEY).
Expand All @@ -65,7 +72,7 @@ public void s3ClientSetup() {

ClientConfiguration clientConfiguration = new ClientConfiguration()
.withSignerOverride("AWSS3V4SignerType");
s3Endpoint = String.format("http://s3.local.lakefs.io:%d", s3.getMappedPort(9000));
s3Endpoint = String.format("http://s3.local.lakefs.io:%d/", s3.getMappedPort(9000));

s3Client = new AmazonS3Client(creds, clientConfiguration);

Expand All @@ -76,7 +83,8 @@ public void s3ClientSetup() {

s3Bucket = makeS3BucketName();
s3Base = String.format("s3://%s/", s3Bucket);
LOG.info("S3: bucket {} => base URL {}", s3Bucket, s3Base);
LOG.info("S3 [endpoint {}]: bucket {} => base URL {}",
s3Endpoint, s3Bucket, s3Base);

CreateBucketRequest cbr = new CreateBucketRequest(s3Bucket);
s3Client.createBucket(cbr);
Expand Down Expand Up @@ -117,6 +125,8 @@ protected void assertS3Object(StagingLocation stagingLocation, String contents)
}
}

protected abstract PhysicalAddressCreator getPac();

protected void moreHadoopSetup() {
s3ClientSetup();

Expand All @@ -125,5 +135,71 @@ protected void moreHadoopSetup() {
conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, S3_SECRET_ACCESS_KEY);
conf.set(org.apache.hadoop.fs.s3a.Constants.ENDPOINT, s3Endpoint);
conf.set(org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR, "/tmp/s3a");
getPac().initConfiguration(conf);

LOG.info("Setup done!");
}

/**
 * Strategy for generating object-store physical addresses in tests: either
 * direct s3:// URLs or presigned HTTP URLs, selected per test parameterization.
 *
 * Note: the {@code static} modifier is omitted — member interfaces are
 * implicitly static (JLS 9.1.1), so declaring it is redundant.
 */
public interface PhysicalAddressCreator {
    /** Override to adjust Hadoop configuration for this access mode; no-op by default. */
    default void initConfiguration(Configuration conf) {}

    /** Returns a physical address from which the object stored at {@code key} can be read. */
    String createGetPhysicalAddress(S3FSTestBase o, String key);

    /** Returns a staging location under {@code namespace} to which a new object for {@code path} can be written. */
    StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path);
}

protected static class SimplePhysicalAddressCreator implements PhysicalAddressCreator {
public String createGetPhysicalAddress(S3FSTestBase o, String key) {
return o.s3Url(key);
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
return new StagingLocation().physicalAddress(o.s3Url(fullPath));
}
}

protected static class PresignedPhysicalAddressCreator implements PhysicalAddressCreator {
public void initConfiguration(Configuration conf) {
conf.set("fs.lakefs.access.mode", "presigned");
}

protected Date getExpiration() {
return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1));
}

public String createGetPhysicalAddress(S3FSTestBase o, String key) {
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key)
.withMethod(HttpMethod.GET)
.withExpiration(expiration));
return presignedUrl.toString();
}

public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) {
String fullPath = String.format("%s/%s/%s/%s/%s-object",
o.sessionId(), namespace, repo, branch, path);
Date expiration = getExpiration();
URL presignedUrl =
o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath)
.withMethod(HttpMethod.PUT)
.withExpiration(expiration));
return new StagingLocation()
.physicalAddress(o.s3Url(fullPath))
.presignedUrl(presignedUrl.toString());
}
}

/**
 * Mocks the lakeFS staging/backing endpoint for {@code repo}/{@code branch}/{@code path}
 * and returns the staging location (under {@code namespace}) it will hand out.
 */
protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) {
    StagingLocation location = getPac().createPutStagingLocation(this, namespace, repo, branch, path);
    // The lakeFS client asks this endpoint where to stage a new object.
    String backingPath = String.format("/repositories/%s/branches/%s/staging/backing", repo, branch);
    mockServerClient
        .when(request()
              .withMethod("GET")
              .withPath(backingPath)
              .withQueryStringParameter("path", path))
        .respond(response()
                 .withStatusCode(200)
                 .withBody(gson.toJson(location)));
    return location;
}
}
Loading