diff --git a/smart-action/src/main/java/org/smartdata/action/SyncAction.java b/smart-action/src/main/java/org/smartdata/action/SyncAction.java index 70be85d87bd..e4ff30fd282 100644 --- a/smart-action/src/main/java/org/smartdata/action/SyncAction.java +++ b/smart-action/src/main/java/org/smartdata/action/SyncAction.java @@ -27,13 +27,16 @@ @ActionSignature( actionId = "sync", displayName = "sync", - usage = SyncAction.SRC + " $src" + SyncAction.DEST + " $dest" + usage = SyncAction.SRC + " $src " + + SyncAction.DEST + " $dest " + + SyncAction.PRESERVE + " $attributes" ) public class SyncAction extends SmartAction { // related to fileDiff.src public static final String SRC = "-src"; // related to remote cluster and fileDiff.src public static final String DEST = "-dest"; + public static final String PRESERVE = "-preserve"; @Override protected void execute() throws Exception { diff --git a/smart-common/src/main/java/org/smartdata/model/FileInfoUpdate.java b/smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java similarity index 71% rename from smart-common/src/main/java/org/smartdata/model/FileInfoUpdate.java rename to smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java index 57f00e2602d..788d5b20808 100644 --- a/smart-common/src/main/java/org/smartdata/model/FileInfoUpdate.java +++ b/smart-common/src/main/java/org/smartdata/model/FileInfoDiff.java @@ -20,12 +20,10 @@ import java.util.Objects; -public class FileInfoUpdate { +public class FileInfoDiff { private String path; private Long length; - private Boolean isdir; private Short blockReplication; - private Long blocksize; private Long modificationTime; private Long accessTime; private Short permission; @@ -37,7 +35,7 @@ public String getPath() { return path; } - public FileInfoUpdate setPath(String path) { + public FileInfoDiff setPath(String path) { this.path = path; return this; } @@ -46,43 +44,25 @@ public Long getLength() { return length; } - public FileInfoUpdate setLength(long length) { + public FileInfoDiff setLength(Long length) { this.length = length; return this; } - public Boolean getIsdir() { - return isdir; - } - - public FileInfoUpdate setIsdir(boolean isdir) { - this.isdir = isdir; - return this; - } - public Short getBlockReplication() { return blockReplication; } - public FileInfoUpdate setBlockReplication(short blockReplication) { + public FileInfoDiff setBlockReplication(Short blockReplication) { this.blockReplication = blockReplication; return this; } - public Long getBlocksize() { - return blocksize; - } - - public FileInfoUpdate setBlocksize(long blocksize) { - this.blocksize = blocksize; - return this; - } - public Long getModificationTime() { return modificationTime; } - public FileInfoUpdate setModificationTime(long modificationTime) { + public FileInfoDiff setModificationTime(Long modificationTime) { this.modificationTime = modificationTime; return this; } @@ -91,7 +71,7 @@ public Long getAccessTime() { return accessTime; } - public FileInfoUpdate setAccessTime(long accessTime) { + public FileInfoDiff setAccessTime(Long accessTime) { this.accessTime = accessTime; return this; } @@ -100,7 +80,7 @@ public Short getPermission() { return permission; } - public FileInfoUpdate setPermission(short permission) { + public FileInfoDiff setPermission(Short permission) { this.permission = permission; return this; } @@ -109,7 +89,7 @@ public String getOwner() { return owner; } - public FileInfoUpdate setOwner(String owner) { + public FileInfoDiff setOwner(String owner) { this.owner = owner; return this; } 
@@ -118,7 +98,7 @@ public String getGroup() { return group; } - public FileInfoUpdate setGroup(String group) { + public FileInfoDiff setGroup(String group) { this.group = group; return this; } @@ -127,7 +107,7 @@ public Byte getErasureCodingPolicy() { return erasureCodingPolicy; } - public FileInfoUpdate setErasureCodingPolicy(byte erasureCodingPolicy) { + public FileInfoDiff setErasureCodingPolicy(Byte erasureCodingPolicy) { this.erasureCodingPolicy = erasureCodingPolicy; return this; } @@ -140,12 +120,10 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - FileInfoUpdate that = (FileInfoUpdate) o; + FileInfoDiff that = (FileInfoDiff) o; return Objects.equals(path, that.path) && Objects.equals(length, that.length) - && Objects.equals(isdir, that.isdir) && Objects.equals(blockReplication, that.blockReplication) - && Objects.equals(blocksize, that.blocksize) && Objects.equals(modificationTime, that.modificationTime) && Objects.equals(accessTime, that.accessTime) && Objects.equals(permission, that.permission) @@ -156,18 +134,16 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(path, length, isdir, blockReplication, - blocksize, modificationTime, accessTime, permission, owner, group, erasureCodingPolicy); + return Objects.hash(path, length, blockReplication, + modificationTime, accessTime, permission, owner, group, erasureCodingPolicy); } @Override public String toString() { - return "FileInfoUpdate{" + return "FileInfoDiff{" + "path='" + path + '\'' + ", length=" + length - + ", isdir=" + isdir + ", blockReplication=" + blockReplication - + ", blocksize=" + blocksize + ", modificationTime=" + modificationTime + ", accessTime=" + accessTime + ", permission=" + permission diff --git a/smart-common/src/main/java/org/smartdata/utils/ConfigUtil.java b/smart-common/src/main/java/org/smartdata/utils/ConfigUtil.java new file mode 100644 index 00000000000..d8560c9972e --- /dev/null +++ b/smart-common/src/main/java/org/smartdata/utils/ConfigUtil.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.smartdata.utils; + +import org.apache.hadoop.conf.Configuration; + +import static org.smartdata.SmartConstants.DISTRIBUTED_FILE_SYSTEM; +import static org.smartdata.SmartConstants.FS_HDFS_IMPL; +import static org.smartdata.SmartConstants.SMART_FILE_SYSTEM; + +public class ConfigUtil { + public static Configuration toRemoteClusterConfig(Configuration configuration) { + Configuration remoteConfig = new Configuration(configuration); + if (SMART_FILE_SYSTEM.equals(remoteConfig.get(FS_HDFS_IMPL))) { + remoteConfig.set(FS_HDFS_IMPL, DISTRIBUTED_FILE_SYSTEM); + } + + return remoteConfig; + } +} diff --git a/smart-engine/src/main/java/org/smartdata/server/engine/rule/FileCopyDrPlugin.java b/smart-engine/src/main/java/org/smartdata/server/engine/rule/FileCopyDrPlugin.java index a1d473c38d3..89125ad19b7 100644 --- a/smart-engine/src/main/java/org/smartdata/server/engine/rule/FileCopyDrPlugin.java +++ b/smart-engine/src/main/java/org/smartdata/server/engine/rule/FileCopyDrPlugin.java @@ -17,10 +17,11 @@ */ package org.smartdata.server.engine.rule; - +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartdata.action.SyncAction; +import org.smartdata.hdfs.action.CopyFileAction; import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.MetaStoreException; import org.smartdata.model.BackUpInfo; @@ -43,8 +44,8 @@ import static org.smartdata.utils.StringUtil.ssmPatternsToRegex; public class FileCopyDrPlugin implements RuleExecutorPlugin { - private MetaStore metaStore; - private Map> backups = new HashMap<>(); + private final MetaStore metaStore; + private final Map> backups = new HashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(FileCopyDrPlugin.class.getName()); @@ -63,6 +64,9 @@ public void onNewRuleExecutor(final RuleInfo ruleInfo, TranslateResult tResult) CmdletDescriptor des = tResult.getCmdDescriptor(); for (int i = 0; i < des.getActionSize(); i++) { if (des.getActionName(i).equals("sync")) { + String rawPreserveArg = des.getActionArgs(i).get(SyncAction.PRESERVE); + // fail fast if preserve arg is not valid + validatePreserveArg(rawPreserveArg); List statements = tResult.getSqlStatements(); String before = statements.get(statements.size() - 1); @@ -169,4 +173,14 @@ public void onRuleExecutorExit(final RuleInfo ruleInfo) { } } } + + private void validatePreserveArg(String rawPreserveArg) { + if (StringUtils.isBlank(rawPreserveArg)) { + return; + } + + for (String attribute: rawPreserveArg.split(",")) { + CopyFileAction.PreserveAttribute.validate(attribute); + } + } } diff --git a/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MultiClusterHarness.java b/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MultiClusterHarness.java new file mode 100644 index 00000000000..e8ba5a0e62c --- /dev/null +++ b/smart-hadoop-support/smart-hadoop-common/src/test/java/org/smartdata/hdfs/MultiClusterHarness.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.smartdata.hdfs; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.smartdata.hdfs.MultiClusterHarness.TestType.INTER_CLUSTER; +import static org.smartdata.hdfs.MultiClusterHarness.TestType.INTRA_CLUSTER; + +/** + * A MiniCluster for action test. + */ +@RunWith(Parameterized.class) +public abstract class MultiClusterHarness extends MiniClusterHarness { + + public enum TestType { + INTRA_CLUSTER, + INTER_CLUSTER + } + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Parameterized.Parameter() + public TestType testType; + + protected MiniDFSCluster anotherCluster; + protected DistributedFileSystem anotherDfs; + protected DFSClient anotherDfsClient; + + @Parameterized.Parameters(name = "Test type - {0}") + public static Object[] parameters() { + return new Object[] {INTRA_CLUSTER, INTER_CLUSTER}; + } + + @Before + public void setUp() throws Exception { + if (testType == INTRA_CLUSTER) { + anotherDfs = dfs; + anotherDfsClient = dfsClient; + return; + } + Configuration clusterConfig = new Configuration(smartContext.getConf()); + clusterConfig.set("hdfs.minidfs.basedir", tmpFolder.newFolder().getAbsolutePath()); + anotherCluster = createCluster(clusterConfig); + anotherDfs = anotherCluster.getFileSystem(); + anotherDfsClient = anotherDfs.getClient(); + } + + @After + public void shutdown() throws IOException { + if (anotherCluster != null) { + anotherCluster.shutdown(true); + } + } + + protected Path anotherClusterPath(String parent, String child) { + return anotherDfs.makeQualified(new Path(parent, child)); + } + + protected String pathToActionArg(Path path) { + return testType == TestType.INTER_CLUSTER ? 
path.toString() : path.toUri().getPath(); + } +} diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/CopyFileAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/CopyFileAction.java index ad5fc032c40..0d5c7b40dd5 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/CopyFileAction.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/CopyFileAction.java @@ -17,7 +17,15 @@ */ package org.smartdata.hdfs.action; +import com.google.common.collect.Sets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -34,6 +42,13 @@ import java.io.OutputStream; import java.net.URI; import java.util.Map; +import org.smartdata.model.FileInfoDiff; + +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.MODIFICATION_TIME; +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.GROUP; +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.OWNER; +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.PERMISSIONS; +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.REPLICATION_NUMBER; /** * An action to copy a single file from src to destination. @@ -44,10 +59,12 @@ @ActionSignature( actionId = "copy", displayName = "copy", - usage = HdfsAction.FILE_PATH + " $src " + CopyFileAction.DEST_PATH + - " $dest " + CopyFileAction.OFFSET_INDEX + " $offset" + - CopyFileAction.LENGTH + - " $length" + CopyFileAction.BUF_SIZE + " $size" + usage = HdfsAction.FILE_PATH + " $src " + + CopyFileAction.DEST_PATH + " $dest " + + CopyFileAction.OFFSET_INDEX + " $offset " + + CopyFileAction.LENGTH + " $length " + + CopyFileAction.BUF_SIZE + " $size " + + CopyFileAction.PRESERVE + " $attributes" ) public class CopyFileAction extends HdfsAction { private static final Logger LOG = @@ -56,11 +73,14 @@ public class CopyFileAction extends HdfsAction { public static final String DEST_PATH = "-dest"; public static final String OFFSET_INDEX = "-offset"; public static final String LENGTH = "-length"; + public static final String PRESERVE = "-preserve"; + private String srcPath; private String destPath; private long offset = 0; private long length = 0; private int bufferSize = 64 * 1024; + private List rawPreserveAttributes = Collections.emptyList(); private Configuration conf; @Override @@ -80,13 +100,16 @@ public void init(Map args) { this.destPath = args.get(DEST_PATH); } if (args.containsKey(BUF_SIZE)) { - bufferSize = Integer.valueOf(args.get(BUF_SIZE)); + bufferSize = Integer.parseInt(args.get(BUF_SIZE)); } if (args.containsKey(OFFSET_INDEX)) { - offset = Long.valueOf(args.get(OFFSET_INDEX)); + offset = Long.parseLong(args.get(OFFSET_INDEX)); } if (args.containsKey(LENGTH)) { - length = Long.valueOf(args.get(LENGTH)); + length = Long.parseLong(args.get(LENGTH)); + } + if (StringUtils.isNotBlank(args.get(PRESERVE))) { + rawPreserveAttributes = Arrays.asList(args.get(PRESERVE).split(",")); } } @@ -98,6 +121,7 @@ protected void execute() throws Exception { if (destPath == null) { throw new IllegalArgumentException("Dest File parameter is missing."); } + Set preserveAttributes = 
parsePreserveAttributes(); appendLog( String.format("Action starts at %s : Read %s", Utils.getFormatedCurrentTime(), srcPath)); @@ -112,12 +136,13 @@ protected void execute() throws Exception { if (length != 0) { copyWithOffset(srcPath, destPath, bufferSize, offset, length); } + copyAttributes(preserveAttributes); appendLog("Copy Successfully!!"); } private boolean copySingleFile(String src, String dest) throws IOException { //get The file size of source file - long fileSize = getFileSize(src); + long fileSize = getFileStatus(src).getLen(); appendLog( String.format("Copy the whole file with length %s", fileSize)); return copyWithOffset(src, dest, bufferSize, 0, fileSize); @@ -160,13 +185,13 @@ private boolean copyWithOffset(String src, String dest, int bufferSize, } } - private long getFileSize(String fileName) throws IOException { + private FileStatus getFileStatus(String fileName) throws IOException { + FileSystem fs = FileSystem.get(URI.create(fileName), conf); if (fileName.startsWith("hdfs")) { // Get InputStream from URL - FileSystem fs = FileSystem.get(URI.create(fileName), conf); - return fs.getFileStatus(new Path(fileName)).getLen(); + return fs.getFileStatus(new Path(fileName)); } else { - return dfsClient.getFileInfo(fileName).getLen(); + return (FileStatus) dfsClient.getFileInfo(fileName); } } @@ -211,4 +236,75 @@ private OutputStream getDestOutPutStream(String dest, long offset) throws IOExce .getDFSClientAppend(dfsClient, dest, bufferSize, offset); } } + + private Set parsePreserveAttributes() { + Set attributesFromOptions = rawPreserveAttributes + .stream() + .map(PreserveAttribute::fromOption) + .collect(Collectors.toSet()); + + return attributesFromOptions.isEmpty() + // preserve file owner, group and permissions by default + ? Sets.newHashSet(OWNER, GROUP, PERMISSIONS) + : attributesFromOptions; + } + + private void copyAttributes(Set preserveAttributes) throws IOException { + FileStatus srcFileStatus = getFileStatus(srcPath); + FileInfoDiff fileInfoDiff = new FileInfoDiff(); + + if (preserveAttributes.contains(PERMISSIONS)) { + fileInfoDiff.setPermission(srcFileStatus.getPermission().toShort()); + } + + if (preserveAttributes.contains(OWNER)) { + fileInfoDiff.setOwner(srcFileStatus.getOwner()); + } + + if (preserveAttributes.contains(GROUP)) { + fileInfoDiff.setGroup(srcFileStatus.getGroup()); + } + + if (preserveAttributes.contains(REPLICATION_NUMBER)) { + fileInfoDiff.setBlockReplication(srcFileStatus.getReplication()); + } + + if (preserveAttributes.contains(MODIFICATION_TIME)) { + fileInfoDiff.setModificationTime(srcFileStatus.getModificationTime()); + } + + MetaDataAction.changeFileMetadata(destPath, fileInfoDiff, conf); + appendLog("Successfully transferred file attributes: " + preserveAttributes); + } + + public enum PreserveAttribute { + OWNER("owner"), + GROUP("group"), + PERMISSIONS("permissions"), + REPLICATION_NUMBER("replication"), + MODIFICATION_TIME("modification-time"); + + private final String name; + + PreserveAttribute(String name) { + this.name = name; + } + + public static PreserveAttribute fromOption(String option) { + return Arrays.stream(PreserveAttribute.values()) + .filter(attr -> attr.name.equals(option)) + .findFirst() + .orElseThrow(() -> + new IllegalArgumentException("Wrong preserve attribute: " + option)); + } + + public static void validate(String option) { + fromOption(option); + } + + @Override + public String toString() { + return name; + } + } } diff --git 
a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DistCpAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DistCpAction.java index d3aa2e941f6..2da2e70a679 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DistCpAction.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/DistCpAction.java @@ -46,7 +46,10 @@ public class DistCpAction extends HdfsAction { public static final String TARGET_ARG = "-target"; public static final String SOURCE_PATH_LIST_FILE = "-f"; + // preserve file owner, group and permissions by default + private static final String PRESERVE_DISTCP_OPTION_DEFAULT = "-pugp"; private static final String SOURCE_PATHS_DELIMITER = ","; + private static final String PRESERVE_DISTCP_OPTION_PREFIX = "-p"; private String sourcePaths; @@ -64,6 +67,10 @@ public void init(Map args) { distCpArgs = new HashMap<>(args); distCpArgs.remove(FILE_PATH); distCpArgs.remove(TARGET_ARG); + + if (!containsPreserveOption(args)) { + distCpArgs.put(PRESERVE_DISTCP_OPTION_DEFAULT, ""); + } } @Override @@ -126,6 +133,12 @@ private Stream mapOptionToString(String key, String value) { return Stream.of(key, value); } + private boolean containsPreserveOption(Map args) { + return args.keySet() + .stream() + .anyMatch(option -> option.startsWith(PRESERVE_DISTCP_OPTION_PREFIX)); + } + /** Used to gracefully close MapReduce job (MR Job is not AutoCloseable inheritor in Hadoop 2.7) */ private static class JobCloseableWrapper implements AutoCloseable { diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java index 169acf66614..6ac5995623c 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/MetaDataAction.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,20 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.smartdata.hdfs.action; +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.lang.math.NumberUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartdata.action.annotation.ActionSignature; -import org.smartdata.model.FileInfo; +import org.smartdata.hdfs.HadoopUtil; +import org.smartdata.model.FileInfoDiff; -import java.io.IOException; -import java.net.URI; -import java.util.Map; +import static org.smartdata.utils.ConfigUtil.toRemoteClusterConfig; /** * action to set MetaData of file @@ -52,43 +58,26 @@ public class MetaDataAction extends HdfsAction { public static final String ATIME = "-atime"; private String srcPath; - private String ownerName; - private String groupName; - private short replication; - private short permission; - private long aTime; - private long mTime; + + private FileInfoDiff fileInfoDiff; @Override public void init(Map args) { super.init(args); srcPath = args.get(FILE_PATH); - ownerName = null; - groupName = null; - replication = -1; - permission = -1; - aTime = -1; - mTime = -1; + fileInfoDiff = new FileInfoDiff() + .setOwner(args.get(OWNER_NAME)) + .setGroup(args.get(GROUP_NAME)) + .setModificationTime(NumberUtils.createLong(args.get(MTIME))) + .setAccessTime(NumberUtils.createLong(args.get(ATIME))); - if (args.containsKey(OWNER_NAME)) { - this.ownerName = args.get(OWNER_NAME); - } - if (args.containsKey(GROUP_NAME)) { - this.groupName = args.get(GROUP_NAME); - } if (args.containsKey(BLOCK_REPLICATION)) { - this.replication = Short.parseShort(args.get(BLOCK_REPLICATION)); + fileInfoDiff.setBlockReplication(Short.parseShort(args.get(BLOCK_REPLICATION))); } + if (args.containsKey(PERMISSION)) { - FsPermission fsPermission = new FsPermission(args.get(PERMISSION)); - this.permission = fsPermission.toShort(); - } - if (args.containsKey(MTIME)) { - this.mTime = Long.parseLong(args.get(MTIME)); - } - if (args.containsKey(ATIME)) { - this.aTime = Long.parseLong(args.get(ATIME)); + fileInfoDiff.setPermission(Short.parseShort(args.get(PERMISSION))); } } @@ -98,97 +87,68 @@ protected void execute() throws Exception { throw new IllegalArgumentException("File src is missing."); } - FileInfo fileInfo = - new FileInfo( - srcPath, - 0, - 0, - false, - replication, - 0, - mTime, - aTime, - permission, - ownerName, - groupName, - (byte) 1, - (byte) 0); - - changeFileMetaData(srcPath, fileInfo); + changeFileMetadata(srcPath, fileInfoDiff, getContext().getConf()); } - private boolean changeFileMetaData(String srcFile, FileInfo fileInfo) throws IOException { + static void changeFileMetadata(String srcFile, FileInfoDiff fileInfoDiff, Configuration configuration) throws IOException { try { if (srcFile.startsWith("hdfs")) { - // change file metadata in remote cluster - // TODO read conf from files - Configuration conf = new Configuration(); - FileSystem fs = FileSystem.get(URI.create(srcFile), conf); - - if (fileInfo.getOwner() != null) { - fs.setOwner( - new Path(srcFile), - fileInfo.getOwner(), - fs.getFileStatus(new Path(srcFile)).getGroup()); - } - if (fileInfo.getGroup() != null) { - fs.setOwner( - new Path(srcFile), - fs.getFileStatus(new Path(srcFile)).getOwner(), - fileInfo.getGroup()); - } - if (fileInfo.getBlockReplication() != -1) { - fs.setReplication(new 
Path(srcFile), fileInfo.getBlockReplication()); - } - if (fileInfo.getPermission() != -1) { - fs.setPermission(new Path(srcFile), new FsPermission(fileInfo.getPermission())); - } - if (fileInfo.getAccessTime() != -1) { - fs.setTimes( - new Path(srcFile), - fs.getFileStatus(new Path(srcFile)).getModificationTime(), - fileInfo.getAccessTime()); - } - if (fileInfo.getModificationTime() != -1) { - fs.setTimes( - new Path(srcFile), - fileInfo.getModificationTime(), - fs.getFileStatus(new Path(srcFile)).getAccessTime()); - } - return true; - } else { - // change file metadata in local cluster - if (fileInfo.getOwner() != null) { - dfsClient.setOwner( - srcFile, fileInfo.getOwner(), dfsClient.getFileInfo(srcFile).getGroup()); - } - if (fileInfo.getGroup() != null) { - dfsClient.setOwner( - srcFile, dfsClient.getFileInfo(srcFile).getOwner(), fileInfo.getGroup()); - } - if (fileInfo.getBlockReplication() != -1) { - dfsClient.setReplication(srcFile, fileInfo.getBlockReplication()); - } - if (fileInfo.getPermission() != -1) { - dfsClient.setPermission(srcFile, new FsPermission(fileInfo.getPermission())); - } - if (fileInfo.getAccessTime() != -1) { - dfsClient.setTimes( - srcFile, - dfsClient.getFileInfo(srcFile).getModificationTime(), - fileInfo.getAccessTime()); - } - if (fileInfo.getModificationTime() != -1) { - dfsClient.setTimes( - srcFile, - fileInfo.getModificationTime(), - dfsClient.getFileInfo(srcFile).getAccessTime()); - } - return true; + changeRemoteFileMetadata(srcFile, fileInfoDiff, configuration); + return; } - } catch (Exception e) { - LOG.debug("Metadata cannot be applied", e); + changeLocalFileMetadata(srcFile, fileInfoDiff, configuration); + } catch ( + Exception exception) { + LOG.error("Metadata cannot be applied", exception); + throw exception; + } + } + + private static void changeRemoteFileMetadata(String srcFile, + FileInfoDiff fileInfoDiff, Configuration configuration) throws IOException { + FileSystem remoteFileSystem = FileSystem.get(URI.create(srcFile), + toRemoteClusterConfig(configuration)); + changeFileMetadata(srcFile, fileInfoDiff, remoteFileSystem); + } + + private static void changeLocalFileMetadata(String srcFile, + FileInfoDiff fileInfoDiff, Configuration configuration) throws IOException { + FileSystem localFileSystem = FileSystem.get( + HadoopUtil.getNameNodeUri(configuration), configuration); + changeFileMetadata(srcFile, fileInfoDiff, localFileSystem); + } + + private static void changeFileMetadata(String srcFile, + FileInfoDiff fileInfoDiff, FileSystem fileSystem) throws IOException { + Path srcPath = new Path(srcFile); + FileStatus srcFileStatus = fileSystem.getFileStatus(srcPath); + + String owner = Optional.ofNullable(fileInfoDiff.getOwner()) + .orElseGet(srcFileStatus::getOwner); + String group = Optional.ofNullable(fileInfoDiff.getGroup()) + .orElseGet(srcFileStatus::getGroup); + + if (fileInfoDiff.getOwner() != null + || fileInfoDiff.getGroup() != null) { + fileSystem.setOwner(srcPath, owner, group); + } + + if (fileInfoDiff.getBlockReplication() != null) { + fileSystem.setReplication(srcPath, fileInfoDiff.getBlockReplication()); + } + + if (fileInfoDiff.getPermission() != null) { + fileSystem.setPermission(srcPath, new FsPermission(fileInfoDiff.getPermission())); + } + + long modificationTime = Optional.ofNullable(fileInfoDiff.getModificationTime()) + .orElseGet(srcFileStatus::getModificationTime); + long accessTime = Optional.ofNullable(fileInfoDiff.getAccessTime()) + .orElseGet(srcFileStatus::getAccessTime); + + if (fileInfoDiff.getAccessTime() != 
null + || fileInfoDiff.getModificationTime() != null) { + fileSystem.setTimes(srcPath, modificationTime, accessTime); } - return false; } } diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java index 1a33b191266..254f71cc239 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/metric/fetcher/InotifyEventApplier.java @@ -33,7 +33,7 @@ import org.smartdata.model.FileDiff; import org.smartdata.model.FileDiffType; import org.smartdata.model.FileInfo; -import org.smartdata.model.FileInfoUpdate; +import org.smartdata.model.FileInfoDiff; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -218,10 +218,10 @@ private void applyClose(Event.CloseEvent closeEvent) throws MetaStoreException { metaStore.insertFileDiff(fileDiff); } } - FileInfoUpdate fileInfoUpdate = new FileInfoUpdate() + FileInfoDiff fileInfoDiff = new FileInfoDiff() .setLength(closeEvent.getFileSize()) .setModificationTime(closeEvent.getTimestamp()); - metaStore.updateFileByPath(closeEvent.getPath(), fileInfoUpdate); + metaStore.updateFileByPath(closeEvent.getPath(), fileInfoDiff); } //Todo: should update mtime? atime? @@ -333,7 +333,7 @@ private void applyMetadataUpdate(Event.MetadataUpdateEvent metadataUpdateEvent) fileDiff = new FileDiff(FileDiffType.METADATA); fileDiff.setSrc(metadataUpdateEvent.getPath()); } - FileInfoUpdate fileInfoUpdate = new FileInfoUpdate(); + FileInfoDiff fileInfoUpdate = new FileInfoDiff(); switch (metadataUpdateEvent.getMetadataType()) { case TIMES: if (metadataUpdateEvent.getMtime() > 0) { diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java index b30264981db..85e353bfe25 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/scheduler/CopyScheduler.java @@ -29,6 +29,7 @@ import org.smartdata.SmartContext; import org.smartdata.action.SyncAction; import org.smartdata.conf.SmartConfKeys; +import org.smartdata.hdfs.action.CopyFileAction; import org.smartdata.hdfs.action.HdfsAction; import org.smartdata.metastore.MetaStore; import org.smartdata.metastore.MetaStoreException; @@ -53,9 +54,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.smartdata.SmartConstants.DISTRIBUTED_FILE_SYSTEM; -import static org.smartdata.SmartConstants.FS_HDFS_IMPL; -import static org.smartdata.SmartConstants.SMART_FILE_SYSTEM; +import static org.smartdata.utils.ConfigUtil.toRemoteClusterConfig; public class CopyScheduler extends ActionSchedulerService { static final Logger LOG = @@ -149,6 +148,7 @@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo, String srcDir = action.getArgs().get(SyncAction.SRC); String path = action.getArgs().get(HdfsAction.FILE_PATH); String destDir = action.getArgs().get(SyncAction.DEST); + String preserveAttributes = action.getArgs().get(SyncAction.PRESERVE); String destPath = path.replaceFirst(srcDir, destDir); // Check again to avoid corner cases long did = fileDiffChainMap.get(path).getHead(); @@ -192,6 +192,9 
@@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo, case APPEND: action.setActionType("copy"); action.getArgs().put("-dest", destPath); + if (preserveAttributes != null) { + action.getArgs().put(CopyFileAction.PRESERVE, preserveAttributes); + } if (rateLimiter != null) { String strLen = fileDiff.getParameters().get("-length"); if (strLen != null) { @@ -413,11 +416,7 @@ private FileStatus[] listFileStatusesOfDirs(String dirName) { // We simply use local HDFS conf for getting remote file system. // The smart file system configured for local HDFS should not be // introduced to remote file system. - Configuration remoteConf = new Configuration(conf); - if (remoteConf.get(FS_HDFS_IMPL, "").equals( - SMART_FILE_SYSTEM)) { - remoteConf.set(FS_HDFS_IMPL, DISTRIBUTED_FILE_SYSTEM); - } + Configuration remoteConf = toRemoteClusterConfig(conf); fs = FileSystem.get(URI.create(dirName), remoteConf); tmpFileStatus = fs.listStatus(new Path(dirName)); for (FileStatus fileStatus : tmpFileStatus) { diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestCopyFileAction.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestCopyFileAction.java index f3ef1f17c46..f9d15dbfedd 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestCopyFileAction.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestCopyFileAction.java @@ -17,154 +17,160 @@ */ package org.smartdata.hdfs.action; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSTestUtil; import org.junit.Assert; import org.junit.Test; -import org.smartdata.hdfs.MiniClusterHarness; import java.util.HashMap; import java.util.Map; +import org.smartdata.hdfs.MultiClusterHarness; + +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.MODIFICATION_TIME; +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.OWNER; +import static org.smartdata.hdfs.action.CopyFileAction.PreserveAttribute.REPLICATION_NUMBER; /** * Test for CopyFileAction. 
*/ -public class TestCopyFileAction extends MiniClusterHarness { +public class TestCopyFileAction extends MultiClusterHarness { + + private static final String FILE_TO_COPY_CONTENT = "testContent 112"; - private void copyFile(String src, String dest, long length, + private void copyFile(Path src, Path dest, long length, long offset) throws Exception { + copyFile(src, dest, length, offset, Collections.emptySet()); + } + + private void copyFile(Path src, Path dest, long length, + long offset, Set preserveAttributes) throws Exception { CopyFileAction copyFileAction = new CopyFileAction(); copyFileAction.setDfsClient(dfsClient); copyFileAction.setContext(smartContext); Map args = new HashMap<>(); - args.put(CopyFileAction.FILE_PATH, src); - args.put(CopyFileAction.DEST_PATH, dest); + args.put(CopyFileAction.FILE_PATH, src.toUri().getPath()); + args.put(CopyFileAction.DEST_PATH, pathToActionArg(dest)); args.put(CopyFileAction.LENGTH, "" + length); args.put(CopyFileAction.OFFSET_INDEX, "" + offset); + + if (!preserveAttributes.isEmpty()) { + String attributesOption = preserveAttributes.stream() + .map(Object::toString) + .collect(Collectors.joining(",")); + args.put(CopyFileAction.PRESERVE, attributesOption); + } + copyFileAction.init(args); copyFileAction.run(); Assert.assertTrue(copyFileAction.getExpectedAfterRun()); } - /*@Test - public void testLocalFileCopy() throws Exception { - final String srcPath = "/testCopy"; - final String file1 = "file1"; - final String destPath = "/backup"; - Path srcDir = new Path(srcPath); - dfs.mkdirs(srcDir); - dfs.mkdirs(new Path(destPath)); - // write to DISK - final FSDataOutputStream out1 = dfs.create(new Path(srcPath + "/" + file1)); - out1.writeChars("testCopy1"); - out1.close(); - copyFile(srcPath + "/" + file1, destPath + "/" + file1, 0, 0); - // Check if file exists - Assert.assertTrue(dfsClient.exists(destPath + "/" + file1)); - final FSDataInputStream in1 = dfs.open(new Path(destPath + "/" + file1)); - StringBuilder readString = new StringBuilder(); - for (int i = 0; i < 9; i++) { - readString.append(in1.readChar()); - } - Assert.assertTrue(readString.toString().equals("testCopy1")); - }*/ + @Test + public void testFileCopy() throws Exception { + Path srcPath = new Path("/testCopy/file1"); + Path destPath = anotherClusterPath("/backup", srcPath.getName()); + + DFSTestUtil.writeFile(dfs, srcPath, "testCopy1"); + + copyFile(srcPath, destPath, 0, 0); + + assertFileContent(destPath, "testCopy1"); + } @Test - public void testRemoteFileCopy() throws Exception { - final String srcPath = "/testCopy"; - final String file1 = "file1"; - // Destination with "hdfs" prefix - final String destPath = dfs.getUri() + "/backup"; - dfs.mkdirs(new Path(srcPath)); - dfs.mkdirs(new Path(destPath)); - // write to DISK - final FSDataOutputStream out1 = dfs.create(new Path(srcPath + "/" + file1)); - out1.writeChars("testCopy1"); - out1.close(); - - copyFile(srcPath + "/" + file1, destPath + "/" + file1, 0, 0); - // Check if file exists - Assert.assertTrue(dfsClient.exists("/backup/" + file1)); - final FSDataInputStream in1 = dfs.open(new Path(destPath + "/" + file1)); - StringBuilder readString = new StringBuilder(); - for (int i = 0; i < 9; i++) { - readString.append(in1.readChar()); - } - Assert.assertTrue(readString.toString().equals("testCopy1")); + public void testCopyWithOffset() throws Exception { + Path srcPath = new Path("/testCopy/testCopyWithOffset"); + Path destPath = anotherClusterPath("/backup", srcPath.getName()); + + byte[] srcFileContent = {0, 1, 2, 3, 4, 5, 
6, 7}; + DFSTestUtil.writeFile(dfs, srcPath, srcFileContent); + + copyFile(srcPath, destPath, 4, 4); + + Assert.assertTrue(anotherDfs.exists(destPath)); + + byte[] actualDestContent = DFSTestUtil.readFileAsBytes(anotherDfs, destPath); + byte[] expectedDestContent = Arrays.copyOfRange(srcFileContent, 4, srcFileContent.length); + Assert.assertArrayEquals(expectedDestContent, actualDestContent); } @Test - public void testRmoteCopyWithOffset() throws Exception { - final String srcPath = "/testCopy"; - final String file1 = "file1"; - // Destination with "hdfs" prefix - final String destPath = dfs.getUri() + "/backup"; - dfs.mkdirs(new Path(srcPath)); - dfs.mkdirs(new Path(destPath)); - // write to DISK - final FSDataOutputStream out1 = dfs.create(new Path(srcPath + "/" + file1)); - for (int i = 0; i < 50; i++) { - out1.writeByte(1); - } - for (int i = 0; i < 50; i++) { - out1.writeByte(2); - } - out1.close(); - copyFile(srcPath + "/" + file1, destPath + "/" + file1, 50, 50); - // Check if file exists - Assert.assertTrue(dfsClient.exists("/backup/" + file1)); - final FSDataInputStream in1 = dfs.open(new Path(destPath + "/" + file1)); - for (int i = 0; i < 50; i++) { - Assert.assertTrue(in1.readByte() == 2); - } + public void testAppend() throws Exception { + Path srcPath = new Path("/testCopy/testAppend"); + Path destPath = anotherClusterPath("/backup", srcPath.getName()); + + DFSTestUtil.createFile(dfs, srcPath, 100, (short) 3, 0xFEED); + DFSTestUtil.createFile(anotherDfs, destPath, 50, (short) 3, 0xFEED); + + copyFile(srcPath, destPath, 50, 50); + + Assert.assertTrue(anotherDfs.exists(destPath)); + Assert.assertEquals(100, anotherDfs.getFileStatus(destPath).getLen()); } - /*@Test - public void testLocalCopyWithOffset() throws Exception { - final String srcPath = "/testCopy"; - final String file1 = "file1"; - // Destination with "hdfs" prefix - final String destPath = "/backup"; - dfs.mkdirs(new Path(srcPath)); - dfs.mkdirs(new Path(destPath)); - // write to DISK - final FSDataOutputStream out1 = dfs.create(new Path(srcPath + "/" + file1)); - for (int i = 0; i < 50; i++) { - out1.writeByte(1); - } - for (int i = 0; i < 50; i++) { - out1.writeByte(2); - } - out1.close(); - copyFile(srcPath + "/" + file1, destPath + "/" + file1, 50, 50); - // Check if file exists - Assert.assertTrue(dfsClient.exists("/backup/" + file1)); - final FSDataInputStream in1 = dfs.open(new Path(destPath + "/" + file1)); - for (int i = 0; i < 50; i++) { - Assert.assertTrue(in1.readByte() == 2); - } - }*/ + @Test + public void testPreserveAllAttributes() throws Exception { + Path srcPath = createFileWithAttributes("/test/src/fileToCopy"); + Path destPath = anotherClusterPath("/dest", srcPath.getName()); + + copyFileWithAttributes(srcPath, destPath, + Sets.newHashSet(CopyFileAction.PreserveAttribute.values())); + + FileStatus destFileStatus = anotherDfs.getFileStatus(destPath); + Assert.assertEquals(new FsPermission("777"), destFileStatus.getPermission()); + Assert.assertEquals("newUser", destFileStatus.getOwner()); + Assert.assertEquals("newGroup", destFileStatus.getGroup()); + Assert.assertEquals(2, destFileStatus.getReplication()); + Assert.assertEquals(0L, destFileStatus.getModificationTime()); + } @Test - public void testAppendRemote() throws Exception { - final String srcPath = "/testCopy"; - final String file1 = "file1"; - // Destination with "hdfs" prefix - final String destPath = dfs.getUri() + "/backup"; - dfs.mkdirs(new Path(srcPath)); - dfs.mkdirs(new Path(destPath)); - // write to DISK - 
DFSTestUtil.createFile(dfs, new Path(srcPath + "/" + file1), 100, (short) 3, - 0xFEED); - DFSTestUtil.createFile(dfs, new Path(destPath + "/" + file1), 50, (short) 3, - 0xFEED); - copyFile(srcPath + "/" + file1, destPath + "/" + file1, 50, 50); - // Check if file exists - Assert.assertTrue(dfsClient.exists("/backup/" + file1)); - FileStatus fileStatus = dfs.getFileStatus(new Path(destPath + "/" + file1)); - Assert.assertEquals(100, fileStatus.getLen()); + public void testPreserveSomeAttributes() throws Exception { + Path srcPath = createFileWithAttributes("/test/src/anotherFileToCopy"); + Path destPath = anotherClusterPath("/dest", srcPath.getName()); + + copyFileWithAttributes(srcPath, destPath, + Sets.newHashSet(OWNER, MODIFICATION_TIME, REPLICATION_NUMBER)); + + FileStatus destFileStatus = anotherDfs.getFileStatus(destPath); + Assert.assertEquals("newUser", destFileStatus.getOwner()); + Assert.assertEquals(0L, destFileStatus.getModificationTime()); + Assert.assertEquals(2, destFileStatus.getReplication()); + Assert.assertNotEquals(new FsPermission("777"), destFileStatus.getPermission()); + Assert.assertNotEquals("newGroup", destFileStatus.getGroup()); + } + + private Path createFileWithAttributes(String path) throws IOException { + Path srcFilePath = new Path(path); + + DFSTestUtil.writeFile(dfs, srcFilePath, FILE_TO_COPY_CONTENT); + + FsPermission newPermission = new FsPermission("777"); + dfs.setOwner(srcFilePath, "newUser", "newGroup"); + dfs.setPermission(srcFilePath, newPermission); + dfs.setReplication(srcFilePath, (short) 2); + dfs.setTimes(srcFilePath, 0L, 1L); + + return srcFilePath; + } + + private void copyFileWithAttributes(Path srcFilePath, Path destPath, + Set preserveAttributes) throws Exception { + copyFile(srcFilePath, destPath, 0, 0, preserveAttributes); + assertFileContent(destPath, FILE_TO_COPY_CONTENT); + } + + private void assertFileContent(Path filePath, String expectedContent) throws Exception { + Assert.assertTrue(anotherDfs.exists(filePath)); + String actualContent = DFSTestUtil.readFile(anotherDfs, filePath); + Assert.assertEquals(expectedContent, actualContent); } } diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestDistCpAction.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestDistCpAction.java index 466661c3fa4..d7d2cffa799 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestDistCpAction.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestDistCpAction.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +39,13 @@ import org.junit.rules.TemporaryFolder; import org.smartdata.hdfs.MiniClusterHarness; +import static org.apache.hadoop.tools.DistCpOptions.FileAttribute.ACL; +import static org.apache.hadoop.tools.DistCpOptions.FileAttribute.CHECKSUMTYPE; +import static org.apache.hadoop.tools.DistCpOptions.FileAttribute.GROUP; +import static org.apache.hadoop.tools.DistCpOptions.FileAttribute.PERMISSION; +import static org.apache.hadoop.tools.DistCpOptions.FileAttribute.TIMES; +import static org.apache.hadoop.tools.DistCpOptions.FileAttribute.USER; + /** * Test for DistCpAction. 
*/ @@ -67,6 +75,8 @@ public void testParseSingleSource() { distCpOptions.getSourcePaths()); Assert.assertEquals(new Path("hdfs://nn2/test/target/dir1"), distCpOptions.getTargetPath()); + Assert.assertEquals(EnumSet.of(USER, GROUP, PERMISSION), + distCpOptions.getPreserveAttributes()); } @Test @@ -181,6 +191,8 @@ public void testParseDistCpOptionalArgs() { Assert.assertEquals(16, distCpOptions.getMaxMaps()); Assert.assertEquals("dynamic", distCpOptions.getCopyStrategy()); Assert.assertTrue(distCpOptions.shouldSyncFolder()); + Assert.assertEquals(EnumSet.of(CHECKSUMTYPE, ACL, TIMES), + distCpOptions.getPreserveAttributes()); } @Test @@ -219,6 +231,7 @@ public void testCopyFromAnotherCluster() throws Exception { } } + // todo inherit from MultiClusterHarness private void testCopyToCluster(FileSystem sourceFs, FileSystem targetFs) throws Exception { Map args = new HashMap<>(); String sourcePath = sourceFs.getUri() + "/test/source/dir1"; diff --git a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestMetaDataAction.java b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestMetaDataAction.java index 9951739b8d5..457349cd204 100644 --- a/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestMetaDataAction.java +++ b/smart-hadoop-support/smart-hadoop/src/test/java/org/smartdata/hdfs/action/TestMetaDataAction.java @@ -17,10 +17,14 @@ */ package org.smartdata.hdfs.action; -import org.apache.hadoop.fs.FSDataOutputStream; +import java.io.UnsupportedEncodingException; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.smartdata.hdfs.MiniClusterHarness; import java.io.IOException; @@ -30,58 +34,69 @@ /** * Test for MetaDataAction */ +@RunWith(Parameterized.class) public class TestMetaDataAction extends MiniClusterHarness { - @Test - public void testLocalMetadataChange() throws IOException { - final String srcPath = "/test"; - final String file = "file"; - - dfs.mkdirs(new Path(srcPath)); - FSDataOutputStream out = dfs.create(new Path(srcPath + "/" + file)); - out.close(); + @Parameterized.Parameter() + public boolean isRemoteCopy; - MetaDataAction metaFileAction = new MetaDataAction(); - metaFileAction.setDfsClient(dfsClient); - metaFileAction.setContext(smartContext); + @Parameterized.Parameters(name = "Remote copy - {0}") + public static Object[] parameters() { + return new Object[] {true, false}; + } + @Test + public void testMetadataChange() throws IOException { Map args = new HashMap<>(); - args.put(MetaDataAction.FILE_PATH, srcPath + "/" + file); - args.put(MetaDataAction.OWNER_NAME, "test"); - args.put(MetaDataAction.PERMISSION, "777"); - - metaFileAction.init(args); - metaFileAction.run(); - - Assert.assertTrue(metaFileAction.getExpectedAfterRun()); - Assert.assertTrue(dfs.getFileStatus(new Path(srcPath + "/" + file)).getOwner().equals("test")); - Assert.assertTrue(dfs.getFileStatus(new Path(srcPath + "/" + file)).getPermission().toString().equals("rwxrwxrwx")); + args.put(MetaDataAction.OWNER_NAME, "user"); + args.put(MetaDataAction.GROUP_NAME, "group"); + args.put(MetaDataAction.BLOCK_REPLICATION, "7"); + args.put(MetaDataAction.PERMISSION, "511"); + args.put(MetaDataAction.MTIME, "10"); + + FileStatus fileStatus = updateMetadata(args); + Assert.assertEquals("user", fileStatus.getOwner()); + Assert.assertEquals("group", 
fileStatus.getGroup()); + Assert.assertEquals(7, fileStatus.getReplication()); + Assert.assertEquals("rwxrwxrwx", fileStatus.getPermission().toString()); + Assert.assertEquals(10L, fileStatus.getModificationTime()); } @Test - public void testRemoteMetadataChange() throws IOException { - final String srcPath = "/test"; - final String file = "file"; + public void testPartialMetadataChange() throws IOException { + Map args = new HashMap<>(); + args.put(MetaDataAction.GROUP_NAME, "group"); + args.put(MetaDataAction.BLOCK_REPLICATION, "7"); + args.put(MetaDataAction.MTIME, "10"); + + FileStatus fileStatus = updateMetadata(args); + Assert.assertEquals("group", fileStatus.getGroup()); + Assert.assertEquals(7, fileStatus.getReplication()); + Assert.assertEquals(10L, fileStatus.getModificationTime()); + } + + private FileStatus updateMetadata(Map args) throws IOException { + Path srcPath = new Path("/test/file"); + DFSTestUtil.writeFile(dfs, srcPath, "data"); - dfs.mkdirs(new Path(srcPath)); - FSDataOutputStream out = dfs.create(new Path(srcPath + "/" + file)); - out.close(); + args.put(MetaDataAction.FILE_PATH, pathToActionArg(srcPath)); + runAction(args); + return dfs.getFileStatus(srcPath); + } + + private void runAction(Map args) throws UnsupportedEncodingException { MetaDataAction metaFileAction = new MetaDataAction(); metaFileAction.setDfsClient(dfsClient); metaFileAction.setContext(smartContext); - Map args = new HashMap<>(); - args.put(MetaDataAction.FILE_PATH, dfs.getUri() + srcPath + "/" + file); - args.put(MetaDataAction.OWNER_NAME, "test"); - args.put(MetaDataAction.PERMISSION, "777"); - metaFileAction.init(args); metaFileAction.run(); Assert.assertTrue(metaFileAction.getExpectedAfterRun()); - Assert.assertTrue(dfs.getFileStatus(new Path(srcPath + "/" + file)).getOwner().equals("test")); - Assert.assertTrue(dfs.getFileStatus(new Path(srcPath + "/" + file)).getPermission().toString().equals("rwxrwxrwx")); + } + protected String pathToActionArg(Path path) { + return isRemoteCopy ? 
path.toString() : path.toUri().getPath(); } } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java index d828c4fe689..d9c5ab22776 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/MetaStore.java @@ -71,7 +71,7 @@ import org.smartdata.model.FileDiff; import org.smartdata.model.FileDiffState; import org.smartdata.model.FileInfo; -import org.smartdata.model.FileInfoUpdate; +import org.smartdata.model.FileInfoDiff; import org.smartdata.model.FileState; import org.smartdata.model.GlobalConfig; import org.smartdata.model.NormalFileState; @@ -205,7 +205,7 @@ public void insertFiles(FileInfo[] files) fileInfoDao.insert(files); } - public void updateFileByPath(String path, FileInfoUpdate fileUpdate) { + public void updateFileByPath(String path, FileInfoDiff fileUpdate) { fileInfoDao.updateByPath(path, fileUpdate); } diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java index e8cd8bbff83..9bda9008c92 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/FileInfoDao.java @@ -18,7 +18,7 @@ package org.smartdata.metastore.dao; import org.smartdata.model.FileInfo; -import org.smartdata.model.FileInfoUpdate; +import org.smartdata.model.FileInfoDiff; import java.sql.SQLException; import java.util.Collection; @@ -51,7 +51,7 @@ Map getFidPaths(Collection ids) int update(String path, int storagePolicy); - int updateByPath(String path, FileInfoUpdate fileUpdate); + int updateByPath(String path, FileInfoDiff fileUpdate); void deleteById(long fid); diff --git a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java index fed0b6d537c..f52c9ec0d6e 100644 --- a/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java +++ b/smart-metastore/src/main/java/org/smartdata/metastore/dao/impl/DefaultFileInfoDao.java @@ -20,7 +20,7 @@ import org.smartdata.metastore.dao.AbstractDao; import org.smartdata.metastore.dao.FileInfoDao; import org.smartdata.model.FileInfo; -import org.smartdata.model.FileInfoUpdate; +import org.smartdata.model.FileInfoDiff; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -134,7 +134,7 @@ public int update(String path, int storagePolicy) { } @Override - public int updateByPath(String path, FileInfoUpdate fileUpdate) { + public int updateByPath(String path, FileInfoDiff fileUpdate) { return update(updateToMap(fileUpdate), "path = ?", path); } @@ -175,12 +175,11 @@ protected void renameDirectoryFiles(String oldPath, String newPath) { jdbcTemplate.update(sql, newPath, oldPath.length() + 1, oldPath + "/%"); } - private Map updateToMap(FileInfoUpdate fileInfo) { + private Map updateToMap(FileInfoDiff fileInfo) { Map parameters = new HashMap<>(); parameters.put("path", fileInfo.getPath()); parameters.put("length", fileInfo.getLength()); parameters.put("block_replication", fileInfo.getBlockReplication()); - parameters.put("block_size", fileInfo.getBlocksize()); parameters.put("modification_time", 
fileInfo.getModificationTime()); parameters.put("access_time", fileInfo.getAccessTime()); parameters diff --git a/smart-zeppelin/zeppelin-web/src/app/dashboard/views/actions/submit/help.html b/smart-zeppelin/zeppelin-web/src/app/dashboard/views/actions/submit/help.html index d31314c04dc..36add7b43bf 100755 --- a/smart-zeppelin/zeppelin-web/src/app/dashboard/views/actions/submit/help.html +++ b/smart-zeppelin/zeppelin-web/src/app/dashboard/views/actions/submit/help.html @@ -234,8 +234,8 @@

Action Usage

sync - -dest $dest - Sync file to $dest. + -dest $dest -preserve $attributes + Sync file to $dest, with $attributes being a comma-separated list of the file attributes to preserve: owner, group, permissions, replication, modification-time. distcp
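
For reviewers, a minimal usage sketch of the new -preserve option added above. The rule pattern, paths, and remote NameNode URL are hypothetical; the syntax follows the usage strings declared in SyncAction and CopyFileAction (HdfsAction.FILE_PATH is assumed to resolve to "-file"):

    # Sync rule: back up /backup-src/ to a remote cluster, keeping owner,
    # group, and modification time of every copied file
    file : path matches "/backup-src/*" | sync -dest hdfs://remote-nn:9000/backup-dest/ -preserve owner,group,modification-time

    # Standalone copy action; if -preserve is omitted, CopyFileAction
    # defaults to preserving owner, group, and permissions
    copy -file /backup-src/f1 -dest hdfs://remote-nn:9000/backup-dest/f1 -preserve replication,permissions

Any other attribute name is rejected early: FileCopyDrPlugin.validatePreserveArg fails the rule at submission time, and PreserveAttribute.fromOption throws IllegalArgumentException("Wrong preserve attribute: ...") when the action itself parses its arguments.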