Skip to content

Commit

Permalink
Use copy instead of stage object in lakeFSFS rename (#5028)
Browse files Browse the repository at this point in the history
Co-authored-by: Ariel Shaqed (Scolnicov) <[email protected]>
  • Loading branch information
itaiad200 and arielshaqed authored Jan 17, 2023
1 parent 55143e9 commit 7a8e189
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 17 deletions.
2 changes: 1 addition & 1 deletion clients/hadoopfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ To export to S3:
<dependency>
<groupId>io.lakefs</groupId>
<artifactId>api-client</artifactId>
<version>0.81.0</version>
<version>0.89.1-RC1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
45 changes: 36 additions & 9 deletions clients/hadoopfs/src/main/java/io/lakefs/LakeFSFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,12 @@ private boolean renameFile(LakeFSFileStatus srcStatus, Path dst) throws IOExcept
return renameObject(srcStatus, dst);
}

/**
* fallbackToStage determines whether the old StageObject API should be use,
* turn true when CopyObject API is not supported.
*/
private boolean fallbackToStage = false;

/**
* Non-atomic rename operation.
*
Expand All @@ -440,16 +446,37 @@ private boolean renameObject(LakeFSFileStatus srcStatus, Path dst) throws IOExce

ObjectsApi objects = lfsClient.getObjects();
//TODO (Tals): Can we add metadata? we currently don't have an API to get the metadata of an object.
ObjectStageCreation creationReq = new ObjectStageCreation()
.checksum(srcStatus.getChecksum())
.sizeBytes(srcStatus.getLen())
.physicalAddress(srcStatus.getPhysicalAddress());

try {
objects.stageObject(dstObjectLoc.getRepository(), dstObjectLoc.getRef(), dstObjectLoc.getPath(),
creationReq);
} catch (ApiException e) {
throw translateException("renameObject: src:" + srcStatus.getPath() + ", dst: " + dst + ", failed to stage object", e);
if (!fallbackToStage) {
try {
ObjectCopyCreation creationReq = new ObjectCopyCreation()
.srcRef(srcObjectLoc.getRef())
.srcPath(srcObjectLoc.getPath());
objects.copyObject(dstObjectLoc.getRepository(), dstObjectLoc.getRef(), dstObjectLoc.getPath(),
creationReq);
} catch (ApiException e) {
if (e.getCode() != HttpStatus.SC_INTERNAL_SERVER_ERROR ||
!e.getMessage().contains("invalid API endpoint")) {
throw translateException("renameObject: src:" + srcStatus.getPath() + ", dst: " + dst + ", failed to copy object", e);
}

LOG.warn("Copy API doesn't exist, falling back to stageObject");
fallbackToStage = true;
}
}

if (fallbackToStage) {
ObjectStageCreation stageCreationReq = new ObjectStageCreation()
.checksum(srcStatus.getChecksum())
.sizeBytes(srcStatus.getLen())
.physicalAddress(srcStatus.getPhysicalAddress());
try {
objects.stageObject(dstObjectLoc.getRepository(), dstObjectLoc.getRef(), dstObjectLoc.getPath(),
stageCreationReq);
} catch (ApiException e) {
throw translateException("renameObject: src:" + srcStatus.getPath() + ", dst: " + dst +
", failed to stage object", e);
}
}

// delete src path
Expand Down
45 changes: 38 additions & 7 deletions clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,11 @@ private ObjectStats mockExistingFilePath(ObjectLocation objectLoc) throws ApiExc
return srcStats;
}

private void mockMissingCopyAPI() throws ApiException {
when(objectsApi.copyObject(any(), any(), any(), any())).thenThrow(new ApiException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "{\"message\":\"invalid API endpoint\"}"));
when(objectsApi.stageObject(any(), any(), any(), any())).thenReturn(new ObjectStats());
}

private ObjectStats mockEmptyDirectoryMarker(ObjectLocation objectLoc) throws ApiException {
String key = objectLocToS3ObjKey(objectLoc);

Expand Down Expand Up @@ -821,13 +826,13 @@ private void verifyObjDeletion(ObjectLocation srcObjLoc) throws ApiException {
}

private boolean dstPathLinkedToSrcPhysicalAddress(ObjectLocation srcObjLoc, ObjectLocation dstObjLoc) throws ApiException {
ArgumentCaptor<ObjectStageCreation> creationReqCapture = ArgumentCaptor.forClass(ObjectStageCreation.class);
verify(objectsApi).stageObject(eq(dstObjLoc.getRepository()), eq(dstObjLoc.getRef()), eq(dstObjLoc.getPath()),
ArgumentCaptor<ObjectCopyCreation> creationReqCapture = ArgumentCaptor.forClass(ObjectCopyCreation.class);
verify(objectsApi).copyObject(eq(dstObjLoc.getRepository()), eq(dstObjLoc.getRef()), eq(dstObjLoc.getPath()),
creationReqCapture.capture());
ObjectStageCreation actualCreationReq = creationReqCapture.getValue();
ObjectCopyCreation actualCreationReq = creationReqCapture.getValue();
// Rename is a metadata operation, therefore the dst name is expected to link to the src physical address.
String expectedPhysicalAddress = s3Url(objectLocToS3ObjKey(srcObjLoc));
return expectedPhysicalAddress.equals(actualCreationReq.getPhysicalAddress());
return srcObjLoc.getRef().equals(actualCreationReq.getSrcRef()) &&
srcObjLoc.getPath().equals(actualCreationReq.getSrcPath());
}

/**
Expand Down Expand Up @@ -967,14 +972,40 @@ public void testRename_existingDirToExistingNonEmptyDirName() throws ApiExceptio
Assert.assertFalse(renamed);
}

/**
* Check that a file is renamed when working against a lakeFS version
* where CopyObject API doesn't exist
*/
@Test
public void testRename_fallbackStageAPI() throws ApiException, IOException {
Path src = new Path("lakefs://repo/main/existing-dir1/existing.src");
ObjectLocation srcObjLoc = fs.pathToObjectLocation(src);
mockExistingFilePath(srcObjLoc);

Path fileInDstDir = new Path("lakefs://repo/main/existing-dir2/existing.src");
ObjectLocation fileObjLoc = fs.pathToObjectLocation(fileInDstDir);
Path dst = new Path("lakefs://repo/main/existing-dir2");
ObjectLocation dstObjLoc = fs.pathToObjectLocation(dst);

mockExistingDirPath(dstObjLoc, ImmutableList.of(fileObjLoc));
mockDirectoryMarker(fs.pathToObjectLocation(src.getParent()));
mockMissingCopyAPI();

boolean renamed = fs.rename(src, dst);
Assert.assertTrue(renamed);
Path expectedDstPath = new Path("lakefs://repo/main/existing-dir2/existing.src");
Assert.assertTrue(dstPathLinkedToSrcPhysicalAddress(srcObjLoc, fs.pathToObjectLocation(expectedDstPath)));
verifyObjDeletion(srcObjLoc);
}

@Test
public void testRename_srcAndDstOnDifferentBranch() throws IOException, ApiException {
Path src = new Path("lakefs://repo/branch/existing.src");
Path dst = new Path("lakefs://repo/another-branch/existing.dst");
boolean renamed = fs.rename(src, dst);
Assert.assertFalse(renamed);
Mockito.verify(objectsApi, never()).statObject(any(), any(), any(), any());
Mockito.verify(objectsApi, never()).stageObject(any(), any(), any(), any());
Mockito.verify(objectsApi, never()).copyObject(any(), any(), any(), any());
Mockito.verify(objectsApi, never()).deleteObject(any(), any(), any());
}

Expand All @@ -988,7 +1019,7 @@ public void testRename_srcEqualsDst() throws IOException, ApiException {
boolean renamed = fs.rename(src, dst);
Assert.assertTrue(renamed);
Mockito.verify(objectsApi, never()).statObject(any(), any(), any(), any());
Mockito.verify(objectsApi, never()).stageObject(any(), any(), any(), any());
Mockito.verify(objectsApi, never()).copyObject(any(), any(), any(), any());
Mockito.verify(objectsApi, never()).deleteObject(any(), any(), any());
}

Expand Down

0 comments on commit 7a8e189

Please sign in to comment.