Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADH-4064] Preserve file attributes in sync and distcp actions #11

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
@ActionSignature(
actionId = "sync",
displayName = "sync",
usage = SyncAction.SRC + " $src" + SyncAction.DEST + " $dest"
usage = SyncAction.SRC + " $src "
+ SyncAction.DEST + " $dest "
+ SyncAction.PRESERVE + " $attributes"
)
public class SyncAction extends SmartAction {
// related to fileDiff.src
public static final String SRC = "-src";
// related to remote cluster and fileDiff.src
public static final String DEST = "-dest";
public static final String PRESERVE = "-preserve";

@Override
protected void execute() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.Objects;

public class FileInfoUpdate {
public class FileInfoDiff {
private String path;
private Long length;
private Boolean isdir;
iamlapa marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -37,7 +37,7 @@ public String getPath() {
return path;
}

public FileInfoUpdate setPath(String path) {
public FileInfoDiff setPath(String path) {
this.path = path;
return this;
}
Expand All @@ -46,7 +46,7 @@ public Long getLength() {
return length;
}

public FileInfoUpdate setLength(long length) {
public FileInfoDiff setLength(Long length) {
this.length = length;
return this;
}
Expand All @@ -55,7 +55,7 @@ public Boolean getIsdir() {
return isdir;
}

public FileInfoUpdate setIsdir(boolean isdir) {
public FileInfoDiff setIsdir(boolean isdir) {
this.isdir = isdir;
return this;
}
Expand All @@ -64,7 +64,7 @@ public Short getBlockReplication() {
return blockReplication;
}

public FileInfoUpdate setBlockReplication(short blockReplication) {
public FileInfoDiff setBlockReplication(Short blockReplication) {
this.blockReplication = blockReplication;
return this;
}
Expand All @@ -73,7 +73,7 @@ public Long getBlocksize() {
return blocksize;
}

public FileInfoUpdate setBlocksize(long blocksize) {
public FileInfoDiff setBlocksize(Long blocksize) {
this.blocksize = blocksize;
return this;
}
Expand All @@ -82,7 +82,7 @@ public Long getModificationTime() {
return modificationTime;
}

public FileInfoUpdate setModificationTime(long modificationTime) {
public FileInfoDiff setModificationTime(Long modificationTime) {
this.modificationTime = modificationTime;
return this;
}
Expand All @@ -91,7 +91,7 @@ public Long getAccessTime() {
return accessTime;
}

public FileInfoUpdate setAccessTime(long accessTime) {
public FileInfoDiff setAccessTime(Long accessTime) {
this.accessTime = accessTime;
return this;
}
Expand All @@ -100,7 +100,7 @@ public Short getPermission() {
return permission;
}

public FileInfoUpdate setPermission(short permission) {
public FileInfoDiff setPermission(Short permission) {
this.permission = permission;
return this;
}
Expand All @@ -109,7 +109,7 @@ public String getOwner() {
return owner;
}

public FileInfoUpdate setOwner(String owner) {
public FileInfoDiff setOwner(String owner) {
this.owner = owner;
return this;
}
Expand All @@ -118,7 +118,7 @@ public String getGroup() {
return group;
}

public FileInfoUpdate setGroup(String group) {
public FileInfoDiff setGroup(String group) {
this.group = group;
return this;
}
Expand All @@ -127,7 +127,7 @@ public Byte getErasureCodingPolicy() {
return erasureCodingPolicy;
}

public FileInfoUpdate setErasureCodingPolicy(byte erasureCodingPolicy) {
public FileInfoDiff setErasureCodingPolicy(Byte erasureCodingPolicy) {
this.erasureCodingPolicy = erasureCodingPolicy;
return this;
}
Expand All @@ -140,7 +140,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
FileInfoUpdate that = (FileInfoUpdate) o;
FileInfoDiff that = (FileInfoDiff) o;
return Objects.equals(path, that.path)
&& Objects.equals(length, that.length)
&& Objects.equals(isdir, that.isdir)
Expand All @@ -162,7 +162,7 @@ public int hashCode() {

@Override
public String toString() {
return "FileInfoUpdate{"
return "FileInfoDiff{"
+ "path='" + path + '\''
+ ", length=" + length
+ ", isdir=" + isdir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.smartdata.server.engine.rule;


import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.action.SyncAction;
import org.smartdata.hdfs.action.CopyFileAction;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.model.BackUpInfo;
Expand Down Expand Up @@ -63,6 +64,9 @@ public void onNewRuleExecutor(final RuleInfo ruleInfo, TranslateResult tResult)
CmdletDescriptor des = tResult.getCmdDescriptor();
iamlapa marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < des.getActionSize(); i++) {
if (des.getActionName(i).equals("sync")) {
String rawPreserveArg = des.getActionArgs(i).get(SyncAction.PRESERVE);
// fail fast if preserve arg is not valid
validatePreserveArg(rawPreserveArg);

List<String> statements = tResult.getSqlStatements();
String before = statements.get(statements.size() - 1);
Expand Down Expand Up @@ -169,4 +173,14 @@ public void onRuleExecutorExit(final RuleInfo ruleInfo) {
}
}
}

/**
 * Checks the raw value of the sync action's preserve argument.
 *
 * <p>A blank value is accepted as is. Otherwise the value is split on commas
 * and every piece is handed to {@code CopyFileAction.PreserveAttribute.fromOption},
 * which throws {@link IllegalArgumentException} for an option it does not know.
 *
 * @param rawPreserveArg the unparsed argument value; may be null or blank
 */
private void validatePreserveArg(String rawPreserveArg) {
if (StringUtils.isBlank(rawPreserveArg)) {
// nothing was requested, so there is nothing to validate
return;
}

for (String attribute: rawPreserveArg.split(",")) {
// the returned constant is discarded; the call is made only for its exception on unknown options
CopyFileAction.PreserveAttribute.fromOption(attribute);
iamlapa marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
*/
package org.smartdata.hdfs.action;

import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
Expand All @@ -34,6 +42,13 @@
import java.io.OutputStream;
import java.net.URI;
import java.util.Map;
import org.smartdata.model.FileInfoDiff;

import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.MODIFICATION_TIME;
import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.GROUP;
import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.OWNER;
import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.PERMISSIONS;
import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.REPLICATION_NUMBER;

/**
* An action to copy a single file from src to destination.
Expand All @@ -44,10 +59,12 @@
@ActionSignature(
actionId = "copy",
displayName = "copy",
usage = HdfsAction.FILE_PATH + " $src " + CopyFileAction.DEST_PATH +
" $dest " + CopyFileAction.OFFSET_INDEX + " $offset" +
CopyFileAction.LENGTH +
" $length" + CopyFileAction.BUF_SIZE + " $size"
usage = HdfsAction.FILE_PATH + " $src "
+ CopyFileAction.DEST_PATH + " $dest "
+ CopyFileAction.OFFSET_INDEX + " $offset "
+ CopyFileAction.LENGTH + " $length "
+ CopyFileAction.BUF_SIZE + " $size "
+ CopyFileAction.PRESERVE + " $attributes"
)
public class CopyFileAction extends HdfsAction {
private static final Logger LOG =
Expand All @@ -56,11 +73,14 @@ public class CopyFileAction extends HdfsAction {
public static final String DEST_PATH = "-dest";
public static final String OFFSET_INDEX = "-offset";
public static final String LENGTH = "-length";
public static final String PRESERVE = "-preserve";

private String srcPath;
private String destPath;
private long offset = 0;
private long length = 0;
private int bufferSize = 64 * 1024;
private List<String> rawPreserveAttributes = Collections.emptyList();
private Configuration conf;

@Override
Expand All @@ -80,13 +100,16 @@ public void init(Map<String, String> args) {
this.destPath = args.get(DEST_PATH);
}
if (args.containsKey(BUF_SIZE)) {
bufferSize = Integer.valueOf(args.get(BUF_SIZE));
bufferSize = Integer.parseInt(args.get(BUF_SIZE));
}
if (args.containsKey(OFFSET_INDEX)) {
offset = Long.valueOf(args.get(OFFSET_INDEX));
offset = Long.parseLong(args.get(OFFSET_INDEX));
}
if (args.containsKey(LENGTH)) {
length = Long.valueOf(args.get(LENGTH));
length = Long.parseLong(args.get(LENGTH));
}
if (StringUtils.isNotBlank(args.get(PRESERVE))) {
rawPreserveAttributes = Arrays.asList(args.get(PRESERVE).split(","));
iamlapa marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -98,6 +121,7 @@ protected void execute() throws Exception {
if (destPath == null) {
throw new IllegalArgumentException("Dest File parameter is missing.");
}
Set<PreserveAttribute> preserveAttributes = parsePreserveAttributes();
appendLog(
String.format("Action starts at %s : Read %s",
Utils.getFormatedCurrentTime(), srcPath));
Expand All @@ -112,12 +136,13 @@ protected void execute() throws Exception {
if (length != 0) {
copyWithOffset(srcPath, destPath, bufferSize, offset, length);
}
copyAttributes(preserveAttributes);
appendLog("Copy Successfully!!");
}

private boolean copySingleFile(String src, String dest) throws IOException {
// Get the size of the source file
long fileSize = getFileSize(src);
long fileSize = getFileStatus(src).getLen();
appendLog(
String.format("Copy the whole file with length %s", fileSize));
return copyWithOffset(src, dest, bufferSize, 0, fileSize);
Expand Down Expand Up @@ -160,13 +185,13 @@ private boolean copyWithOffset(String src, String dest, int bufferSize,
}
}

private long getFileSize(String fileName) throws IOException {
private FileStatus getFileStatus(String fileName) throws IOException {
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
if (fileName.startsWith("hdfs")) {
// Paths starting with "hdfs" are resolved through the FileSystem API
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
return fs.getFileStatus(new Path(fileName)).getLen();
return fs.getFileStatus(new Path(fileName));
} else {
return dfsClient.getFileInfo(fileName).getLen();
return (FileStatus) dfsClient.getFileInfo(fileName);
}
}

Expand Down Expand Up @@ -211,4 +236,71 @@ private OutputStream getDestOutPutStream(String dest, long offset) throws IOExce
.getDFSClientAppend(dfsClient, dest, bufferSize, offset);
}
}

/**
 * Converts the raw preserve options collected in {@code init} into attributes.
 *
 * @return the attributes named by the options, or owner, group and permissions
 *     when no option was supplied
 * @throws IllegalArgumentException if an option does not name a known attribute
 */
private Set<PreserveAttribute> parsePreserveAttributes() {
  Set<PreserveAttribute> requested = rawPreserveAttributes.stream()
      .map(option -> PreserveAttribute.fromOption(option))
      .collect(Collectors.toSet());

  if (!requested.isEmpty()) {
    return requested;
  }

  // nothing was requested explicitly: fall back to owner, group and permissions
  return Sets.newHashSet(OWNER, GROUP, PERMISSIONS);
}

/**
 * Applies the selected attributes of the source file to the destination file.
 *
 * <p>The status of {@code srcPath} is read once, the requested values are
 * collected into a {@link FileInfoDiff}, and that diff is passed to
 * {@code MetaDataAction.changeFileMetadata} for {@code destPath}.
 *
 * @param preserveAttributes attributes to carry over from source to destination
 * @throws IOException if reading the source status or changing the metadata fails
 */
private void copyAttributes(Set<PreserveAttribute> preserveAttributes) throws IOException {
  final FileStatus source = getFileStatus(srcPath);
  final FileInfoDiff changes = new FileInfoDiff();

  if (preserveAttributes.contains(PERMISSIONS)) {
    short mode = source.getPermission().toShort();
    changes.setPermission(mode);
  }

  if (preserveAttributes.contains(OWNER)) {
    String owner = source.getOwner();
    changes.setOwner(owner);
  }

  if (preserveAttributes.contains(GROUP)) {
    String group = source.getGroup();
    changes.setGroup(group);
  }

  if (preserveAttributes.contains(REPLICATION_NUMBER)) {
    short replication = source.getReplication();
    changes.setBlockReplication(replication);
  }

  if (preserveAttributes.contains(MODIFICATION_TIME)) {
    long modifiedAt = source.getModificationTime();
    changes.setModificationTime(modifiedAt);
  }

  MetaDataAction.changeFileMetadata(destPath, changes, dfsClient, conf);
  appendLog("Successfully transferred file attributes: " + preserveAttributes);
}

/**
 * File attributes that can be carried over from the source file to its copy.
 *
 * <p>Each constant is identified by the option name used in the
 * comma-separated {@code -preserve} argument.
 */
public enum PreserveAttribute {
  OWNER("owner"),
  GROUP("group"),
  PERMISSIONS("permissions"),
  REPLICATION_NUMBER("replication"),
  MODIFICATION_TIME("modification-time");

  /** Option name of this attribute as written in the preserve argument. */
  private final String name;

  PreserveAttribute(String name) {
    this.name = name;
  }

  /**
   * Resolves an option name to its attribute.
   *
   * <p>Surrounding whitespace is ignored, so the pieces of a list written as
   * {@code "owner, group"} resolve after a plain split on commas.
   *
   * @param option option name, e.g. {@code "owner"}
   * @return the attribute with that option name
   * @throws IllegalArgumentException if {@code option} is null or matches no attribute
   */
  public static PreserveAttribute fromOption(String option) {
    // null stays null and falls through to the "no match" branch below
    String normalized = option == null ? null : option.trim();
    return Arrays.stream(PreserveAttribute.values())
        .filter(attr -> attr.name.equals(normalized))
        .findFirst()
        .orElseThrow(() ->
            // report the option exactly as it was supplied
            new IllegalArgumentException("Wrong preserve attribute: " + option));
  }

  /** Returns the option name rather than the constant name. */
  @Override
  public String toString() {
    return name;
  }
}
}
Loading
Loading