Skip to content

Commit

Permalink
Merge pull request #134 from arenadata/bugfix/ADH-5335-copys3-npe
Browse files Browse the repository at this point in the history
[ADH-5335] Fix NPE in Copy2S3Scheduler
  • Loading branch information
iamlapa authored Nov 22, 2024
2 parents 1b51de0 + 7836fa7 commit 0a3ba7e
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.exception;

import java.io.IOException;

/**
 * Thrown when an action is rejected by an {@code ActionScheduler} at submit
 * time (invalid or missing arguments, source file locked, already copied, etc.).
 *
 * <p>Extends {@link IOException} so existing callers that catch
 * {@code IOException} keep working, while letting new code distinguish a
 * scheduler rejection from a genuine I/O failure.
 */
public class ActionRejectedException extends IOException {
  // Explicit serialVersionUID: exceptions are Serializable, and pinning the
  // version avoids spurious InvalidClassExceptions across recompilations.
  private static final long serialVersionUID = 1L;

  public ActionRejectedException(String message) {
    super(message);
  }

  public ActionRejectedException(String message, Throwable cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.smartdata.cmdlet.parser.CmdletParser;
import org.smartdata.cmdlet.parser.ParsedCmdlet;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.exception.ActionRejectedException;
import org.smartdata.exception.NotFoundException;
import org.smartdata.exception.QueueFullException;
import org.smartdata.exception.SsmParseException;
Expand Down Expand Up @@ -308,7 +309,7 @@ private void checkActionsOnSubmit(CmdletInfo cmdletInfo,
for (ActionInfo actionInfo : actionInfos) {
for (ActionScheduler p : schedulers.get(actionInfo.getActionName())) {
if (!p.onSubmit(cmdletInfo, actionInfo)) {
throw new IOException("Action rejected by scheduler: " + actionInfo);
throw new ActionRejectedException("Action rejected by scheduler: " + actionInfo);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private void copySingleFile(FileSystem fileSystem) throws IOException {
}

private OutputStream getDestOutPutStream() throws IOException {
FileSystem destFileSystem = destPath.getFileSystem(new Configuration());
FileSystem destFileSystem = destPath.getFileSystem(getConf());
return destFileSystem.create(destPath, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public Set<String> getFileLock() {

@Override
public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo) {
if (isLocked(actionInfo)) {
return false;
}
return true;
return !isLocked(actionInfo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,106 +17,71 @@
*/
package org.smartdata.hdfs.scheduler;

import org.apache.hadoop.conf.Configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.exception.ActionRejectedException;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.metastore.MetaStore;
import org.smartdata.metastore.MetaStoreException;
import org.smartdata.model.ActionInfo;
import org.smartdata.model.CmdletInfo;
import org.smartdata.model.FileInfo;
import org.smartdata.model.FileState;
import org.smartdata.model.LaunchAction;
import org.smartdata.model.S3FileState;
import org.smartdata.model.action.ScheduleResult;
import org.smartdata.protocol.message.LaunchCmdlet;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


public class Copy2S3Scheduler extends ActionSchedulerService {
private static final List<String> actions = Arrays.asList("copy2s3");
static final Logger LOG =
private static final Logger LOG =
LoggerFactory.getLogger(Copy2S3Scheduler.class);
private MetaStore metaStore;
private static final List<String> SUPPORTED_ACTIONS =
Collections.singletonList("copy2s3");

private final MetaStore metaStore;
//The file in copy need to be locked
private Set<String> fileLock;
// Global variables
private Configuration conf;
private final Set<String> fileLock;

public Copy2S3Scheduler(SmartContext context, MetaStore metaStore) {
super(context, metaStore);
this.metaStore = metaStore;
this.fileLock = Collections.synchronizedSet(new HashSet<String>());
try {
this.conf = getContext().getConf();
} catch (NullPointerException e) {
// If SmartContext is empty
this.conf = new Configuration();
}
}

// Marks the given path as locked so further copy2s3 submissions for it are rejected.
private void lockTheFile(String filePath) {
fileLock.add(filePath);
}

// Releases the lock on the given path once its copy2s3 action has finished.
private void unLockTheFile(String filePath) {
fileLock.remove(filePath);
}

// Returns true if the given path is currently locked by an in-flight copy2s3 action.
private boolean ifLocked(String filePath) {
return fileLock.contains(filePath);
}

/**
 * Returns the metastore-recorded length of the given file, or 0 when the
 * file is unknown to the metastore or the lookup fails.
 */
private long checkTheLengthOfFile(String fileName) {
  try {
    // getFile may return null for a path absent from the metastore; guard
    // before dereferencing to avoid the NPE this code previously threw.
    if (metaStore.getFile(fileName) == null) {
      return 0;
    }
    return metaStore.getFile(fileName).getLength();
  } catch (MetaStoreException e) {
    // Log through SLF4J instead of printStackTrace so the failure is
    // visible in the SSM logs with the offending path attached.
    LOG.warn("Error fetching info about file: {}", fileName, e);
  }
  return 0;
}

private boolean isOnS3(String fileName) {
try {
return metaStore.getFileState(fileName)
.getFileType().getValue() == FileState.FileType.S3.getValue();
} catch (MetaStoreException e) {
return false;
}
this.fileLock = ConcurrentHashMap.newKeySet();
}

@Override
public List<String> getSupportedActions() {
return actions;
return SUPPORTED_ACTIONS;
}

@Override
public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
throws IOException {
public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo) throws IOException {
// check args
if (actionInfo.getArgs() == null) {
throw new IOException("No arguments for the action");
String path = Optional.ofNullable(actionInfo.getArgs())
.map(args -> args.get(HdfsAction.FILE_PATH))
.orElseThrow(() -> new ActionRejectedException(
"Required argument not found: " + HdfsAction.FILE_PATH));

if (isLocked(path)) {
throw new ActionRejectedException("The source file " + path + " is locked");
}
String path = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
if (ifLocked(path)) {
throw new IOException("The submit file " + path + " is locked");

Optional<Long> fileLength = getFileLength(path);
if (!fileLength.isPresent()) {
throw new ActionRejectedException("The source file " + path + " not found");
}
if (checkTheLengthOfFile(path) == 0) {
throw new IOException("The submit file " + path + " length is 0");
if (fileLength.get() == 0) {
throw new ActionRejectedException("The source file " + path + " length is 0");
}
if (isOnS3(path)) {
throw new IOException("The submit file " + path + " is already copied");
throw new ActionRejectedException("The source file " + path + " is already copied");
}
lockTheFile(path);
LOG.debug("The file {} can be submitted", path);
fileLock.add(path);
return true;
}

Expand All @@ -132,8 +97,8 @@ public void onActionFinished(CmdletInfo cmdletInfo, ActionInfo actionInfo) {
}
}
// unlock filelock
if (ifLocked(path)) {
unLockTheFile(path);
if (isLocked(path)) {
fileLock.remove(path);
LOG.debug("unlocked copy2s3 file {}", path);
}
}
Expand All @@ -144,11 +109,34 @@ public void init() throws IOException {

@Override
public void start() throws IOException {
  // No-op: this scheduler has no background resources to start.
}

@Override
public void stop() throws IOException {
  // No-op: this scheduler holds no background resources to release.
}


// Returns true if the given path is currently locked by an in-flight copy2s3 action.
private boolean isLocked(String filePath) {
return fileLock.contains(filePath);
}

/**
 * Looks up the metastore record for {@code fileName} and returns its length,
 * or an empty {@link Optional} when the file is unknown or the metastore
 * lookup fails (the failure is logged, not propagated).
 */
private Optional<Long> getFileLength(String fileName) {
  try {
    FileInfo fileInfo = metaStore.getFile(fileName);
    return fileInfo == null
        ? Optional.empty()
        : Optional.ofNullable(fileInfo.getLength());
  } catch (MetaStoreException e) {
    LOG.warn("Error fetching info about file: {}", fileName, e);
    return Optional.empty();
  }
}

/**
 * Best-effort check of whether {@code fileName} has already been copied to S3
 * according to its metastore file state. Returns {@code false} when the state
 * is unavailable or the metastore lookup fails.
 */
private boolean isOnS3(String fileName) {
  try {
    FileState fileState = metaStore.getFileState(fileName);
    // Guard the dereference: a null state would otherwise NPE outside the
    // catch below. Treat "no state" as "not yet copied".
    return fileState != null
        && fileState.getFileType().getValue() == FileState.FileType.S3.getValue();
  } catch (MetaStoreException e) {
    // Previously swallowed silently; log it (matching getFileLength's style)
    // so metastore trouble is diagnosable, while still reporting "not copied".
    LOG.warn("Error fetching state of file: {}", fileName, e);
    return false;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.smartdata.action.SyncAction;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.exception.ActionRejectedException;
import org.smartdata.hdfs.action.CopyDirectoryAction;
import org.smartdata.hdfs.action.CopyFileAction;
import org.smartdata.hdfs.action.HdfsAction;
Expand Down Expand Up @@ -345,7 +346,7 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
throws IOException {
// check args
if (actionInfo.getArgs() == null) {
throw new IOException("No arguments for the action");
throw new ActionRejectedException("No arguments for the action");
}
String path = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
LOG.debug("Submit file {} with lock {}", path, fileLocks);
Expand All @@ -355,7 +356,8 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
fileLocks.add(path);
return true;
}
throw new IOException("The submit file " + path + " is in use by another program or user");
throw new ActionRejectedException(
"The submit file " + path + " is in use by another program or user");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.exception.ActionRejectedException;
import org.smartdata.hdfs.CompatibilityHelper;
import org.smartdata.hdfs.CompatibilityHelperLoader;
import org.smartdata.hdfs.HadoopUtil;
Expand Down Expand Up @@ -124,7 +125,8 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
}

if (actionInfo.getArgs().get(HdfsAction.FILE_PATH) == null) {
throw new IOException("File path is required for action " + actionInfo.getActionName() + "!");
throw new ActionRejectedException("File path is required for action "
+ actionInfo.getActionName() + "!");
}
String srcPath = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
// The root dir should be excluded in checking whether file path ends with slash.
Expand All @@ -135,9 +137,7 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
// For ec or unec action, check if the file is locked.
if (actionInfo.getActionName().equals(EC_ACTION_ID) ||
actionInfo.getActionName().equals(UNEC_ACTION_ID)) {
if (fileLock.contains(srcPath)) {
return false;
}
return !fileLock.contains(srcPath);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.smartdata.SmartContext;
import org.smartdata.conf.SmartConf;
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.exception.ActionRejectedException;
import org.smartdata.hdfs.HadoopUtil;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.hdfs.action.MoveFileAction;
Expand Down Expand Up @@ -129,7 +130,7 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
throws IOException {
// check args
if (actionInfo.getArgs() == null) {
throw new IOException("No arguments for the action");
throw new ActionRejectedException("No arguments for the action");
}

if (fileLock.contains(actionInfo.getArgs().get(HdfsAction.FILE_PATH))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.LoggerFactory;
import org.smartdata.SmartContext;
import org.smartdata.SmartFilePermission;
import org.smartdata.exception.ActionRejectedException;
import org.smartdata.hdfs.HadoopUtil;
import org.smartdata.hdfs.action.HdfsAction;
import org.smartdata.hdfs.action.SmallFileCompactAction;
Expand Down Expand Up @@ -160,27 +161,27 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
throws IOException {
// check args
if (actionInfo.getArgs() == null) {
throw new IOException("No arguments for the action");
throw new ActionRejectedException("No arguments for the action");
}
if (COMPACT_ACTION_NAME.equals(actionInfo.getActionName())) {
// Check if container file is null
String containerFilePath = getContainerFile(actionInfo);
if (containerFilePath == null || containerFilePath.isEmpty()) {
throw new IOException("Illegal container file path: " + containerFilePath);
throw new ActionRejectedException("Illegal container file path: " + containerFilePath);
}

// Check if small files is null or empty
String smallFiles = actionInfo.getArgs().get(HdfsAction.FILE_PATH);
if (smallFiles == null || smallFiles.isEmpty()) {
throw new IOException("Illegal small files: " + smallFiles);
throw new ActionRejectedException("Illegal small files: " + smallFiles);
}

// Check if small file list converted from Json is not empty
ArrayList<String> smallFileList = new Gson().fromJson(
smallFiles, new TypeToken<ArrayList<String>>() {
}.getType());
if (smallFileList.isEmpty()) {
throw new IOException("Illegal small files list: " + smallFileList);
throw new ActionRejectedException("Illegal small files list: " + smallFileList);
}

// Check whitelist
Expand All @@ -190,7 +191,7 @@ public boolean onSubmit(CmdletInfo cmdletInfo, ActionInfo actionInfo)
if (checkIfValidSmallFiles(smallFileList)) {
return true;
} else {
throw new IOException("Illegal small files are provided.");
throw new ActionRejectedException("Illegal small files are provided.");
}
} else {
return true;
Expand Down
Loading

0 comments on commit 0a3ba7e

Please sign in to comment.