add uploadS3Key activation parameter to specify upload file name (#82)
* add `s3key` parameter to specify upload file name

* rename `s3key` to `upload.s3.key` for readability and avoid conflicts

* rename `s3key` to `uploadS3Key` for readability and avoid conflicts

* upgrade to gradle 6 in order to apply com.jfrog.artifactory

* typo
vqminh authored Mar 28, 2024
1 parent 2b4b5f1 commit a764088
Showing 25 changed files with 166 additions and 165 deletions.
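The core of the change lets a job name the object written to S3 through an `uploadS3Key` activation parameter, falling back to the payload file's own name when the parameter is absent. Below is a minimal sketch of that lookup (Gson only; the prefix, file name, and parameter values are illustrative, and how activation parameters reach the work unit is outside this diff):

```java
import com.google.gson.JsonObject;

public class UploadKeyResolverSketch {
  private static final String UPLOAD_S3_KEY = "uploadS3Key";

  /** Mirror of the lookup added in S3Connection: activation parameter wins, else use the payload file name. */
  static String resolveObjectKey(String finalPrefix, String payloadFileName, JsonObject activationParameters) {
    String name = activationParameters.has(UPLOAD_S3_KEY)
        ? activationParameters.get(UPLOAD_S3_KEY).getAsString()
        : payloadFileName;
    return finalPrefix + "/" + name;
  }

  public static void main(String[] args) {
    JsonObject activation = new JsonObject();
    activation.addProperty(UPLOAD_S3_KEY, "custom-name.csv");   // hypothetical value
    // with the parameter set, the key is <prefix>/custom-name.csv
    System.out.println(resolveObjectKey("exports/2024-03", "part-00000.csv", activation));
    // without it, the key falls back to <prefix>/part-00000.csv
    System.out.println(resolveObjectKey("exports/2024-03", "part-00000.csv", new JsonObject()));
  }
}
```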
1 change: 1 addition & 0 deletions build.gradle
@@ -67,6 +67,7 @@ subprojects { sp ->
maxWarnings = 0
ignoreFailures = false
}
checkstyleMain.source = ["src/main/java", "src/test/java"]
}

afterEvaluate {
12 changes: 1 addition & 11 deletions cdi-core/build.gradle
@@ -26,17 +26,7 @@ sourceSets {
}

jacoco {
toolVersion = '0.8.0'
}

// Exclude classes, packages from Jacoco report, which we don't need to add unit tests for.
jacocoTestReport {
afterEvaluate {
classDirectories = files(classDirectories.files.collect {
fileTree(dir: it,
exclude: ['com/linkedin/cdi/factory'])
})
}
toolVersion = "0.8.6"
}

apply from: "$rootDir/gradle/java-publishing.gradle"
@@ -99,7 +99,7 @@ public Map<String, JsonArray> readAuthenticationToken(State state) {
* Read authentication and activation secondary input records and payload definitions (not records)
*
* @return a set of JsonArrays of data read from locations specified in SECONDARY_INPUT
* property organized by category, in a Map<String, JsonArray> structure
* property organized by category, in a Map&lt;String, JsonArray&gt; structure
*/
public Map<String, JsonArray> readAllContext(State state) {
Map<String, JsonArray> secondaryInputs = new HashMap<>();
133 changes: 70 additions & 63 deletions cdi-core/src/main/java/com/linkedin/cdi/connection/S3Connection.java
@@ -14,42 +14,35 @@
import com.linkedin.cdi.util.JsonUtils;
import com.linkedin.cdi.util.SecretManager;
import com.linkedin.cdi.util.WorkUnitStatus;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.credentials.*;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.utils.AttributeMap;

import static com.linkedin.cdi.configuration.PropertyCollection.*;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.*;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

import static com.linkedin.cdi.configuration.PropertyCollection.MSTAGE_CONNECTION_CLIENT_FACTORY;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS;

/**
* S3Connection creates transmission channel with AWS S3 data provider or AWS S3 data receiver,
@@ -59,6 +52,7 @@
*/
public class S3Connection extends MultistageConnection {
private static final Logger LOG = LoggerFactory.getLogger(S3Connection.class);
private static final String UPLOAD_S3_KEY = "uploadS3Key";
final private S3Keys s3SourceV2Keys;
private S3Client s3Client = null;

@@ -84,43 +78,47 @@ public S3Connection(State state, JobKeys jobKeys, ExtractorKeys extractorKeys) {
public WorkUnitStatus execute(WorkUnitStatus status) {
s3Client = getS3HttpClient(getState());

String finalPrefix = getWorkUnitSpecificString(s3SourceV2Keys.getPrefix(), getExtractorKeys().getDynamicParameters());
JsonObject dynamicParameters = getExtractorKeys().getDynamicParameters();
String finalPrefix = getWorkUnitSpecificString(s3SourceV2Keys.getPrefix(), dynamicParameters);
LOG.debug("Final Prefix to get files list: {}", finalPrefix);
boolean shouldUpload = StringUtils.isNotEmpty(getExtractorKeys().getPayloadsBinaryPath());
// upload to S3 if payload is empty, otherwise download from S3
String pathStr = getExtractorKeys().getPayloadsBinaryPath();
boolean shouldUpload = StringUtils.isNotEmpty(pathStr);
// upload to S3 if payload is empty, otherwise download from S3
if (shouldUpload) {
ByteArrayInputStream byteArrayInputStream = handleUpload(finalPrefix);
Path path = new Path(pathStr);
String fileName = finalPrefix + "/" + getS3Key(path);
ByteArrayInputStream byteArrayInputStream = handleUpload(path, fileName);
if (byteArrayInputStream != null) {
status.setBuffer(byteArrayInputStream);
}
return status;
}
try {
List<String> files = getFilesList(finalPrefix).stream()
.filter(objectKey -> objectKey.matches(s3SourceV2Keys.getFilesPattern()))
.collect(Collectors.toList());
.filter(objectKey -> objectKey.matches(s3SourceV2Keys.getFilesPattern()))
.collect(Collectors.toList());

LOG.debug("Number of files identified: {}", files.size());

if (StringUtils.isBlank(s3SourceV2Keys.getTargetFilePattern())) {
status.setBuffer(wrap(files));
} else {
// Multiple files are returned, then only process the exact match
String fileToDownload = files.size() == 0
? StringUtils.EMPTY : files.size() == 1
? files.get(0) : finalPrefix;
String fileToDownload = files.isEmpty()
? StringUtils.EMPTY : files.size() == 1
? files.get(0) : finalPrefix;

if (StringUtils.isNotBlank(fileToDownload)) {
LOG.debug("Downloading file: {}", fileToDownload);
GetObjectRequest getObjectRequest =
GetObjectRequest.builder().bucket(s3SourceV2Keys.getBucket()).key(fileToDownload).build();
GetObjectRequest.builder().bucket(s3SourceV2Keys.getBucket()).key(fileToDownload).build();
ResponseInputStream<GetObjectResponse> response =
s3Client.getObject(getObjectRequest, ResponseTransformer.toInputStream());
s3Client.getObject(getObjectRequest, ResponseTransformer.toInputStream());
status.setBuffer(response);
} else {
LOG.warn("Invalid set of parameters. "
+ "To list down files from a bucket, pattern parameter is needed,"
+ ", and to get object from s3 source target file name is needed.");
+ "To list down files from a bucket, pattern parameter is needed,"
+ ", and to get object from s3 source target file name is needed.");
}
}
} catch (Exception e) {
@@ -130,32 +128,41 @@ public WorkUnitStatus execute(WorkUnitStatus status) {
return status;
}

private ByteArrayInputStream handleUpload(String finalPrefix) {
/**
* Get s3 key either from activation parameters named `UPLOAD_S3_KEY` or from the source path itself
*/
@NotNull
private String getS3Key(Path path) {
JsonObject activationParameters = getExtractorKeys().getActivationParameters();
if(activationParameters.has(UPLOAD_S3_KEY)){
return activationParameters.get(UPLOAD_S3_KEY).getAsString();
}
return path.getName();
}

private ByteArrayInputStream handleUpload(Path path, String fileName) {
// the path here should be a file path instead of a directory path. Planning should be done upfront at the Source
// level and here each connection would just read a single file
String pathStr = getExtractorKeys().getPayloadsBinaryPath();
Path path = new Path(pathStr);
LOG.info("reading from path: {}", getExtractorKeys().getPayloadsBinaryPath());
LOG.info("reading from path: {}", path);
Configuration conf = new Configuration();
try (
FSDataInputStream fsDataInputStream = path.getFileSystem(conf).open(path);
FSDataInputStream fsDataInputStreamForMD5 = path.getFileSystem(conf).open(path);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fsDataInputStream)
) {
FSDataInputStream fsDataInputStream = path.getFileSystem(conf).open(path);
FSDataInputStream fsDataInputStreamForMD5 = path.getFileSystem(conf).open(path);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fsDataInputStream)
) {
long fileSize = path.getFileSystem(conf).getFileStatus(path).getLen();
// HDFS uses MD5MD5CRC for checksum, and thus MD5 needs to be computed separately
// to compare with the MD5 returned from S3
// A more detailed explanation can be found here
// https://cloud.google.com/architecture/hadoop/validating-data-transfers
String md5Hex = DigestUtils.md5Hex(fsDataInputStreamForMD5);
String fileName = finalPrefix + "/" + path.getName();
String bucket = s3SourceV2Keys.getBucket();
LOG.info("writing to bucket {} and key {}", bucket, fileName);
PutObjectRequest putObjectRequest = PutObjectRequest
.builder()
.bucket(bucket)
.key(fileName)
.build();
.builder()
.bucket(bucket)
.key(fileName)
.build();
RequestBody requestBody = RequestBody.fromInputStream(bufferedInputStream, fileSize);
PutObjectResponse putObjectResponse = s3Client.putObject(putObjectRequest, requestBody);
LOG.info("retrieved put object response");
Expand All @@ -165,21 +172,20 @@ private ByteArrayInputStream handleUpload(String finalPrefix) {
boolean md5Valid = true;
if (!eTagTruncated.equals(md5Hex)) {
LOG.error("md5 validation failed for bucket {} and key {}:"
+ " {} from S3 is different from {} of the original file",
bucket, fileName, eTag, md5Hex);
+ " {} from S3 is different from {} of the original file",
bucket, fileName, eTag, md5Hex);
md5Valid = false;
}
JsonObject jsonObject =
JsonUtils.GSON_WITH_SUPERCLASS_EXCLUSION.toJsonTree(putObjectResponse).getAsJsonObject();
JsonUtils.GSON_WITH_SUPERCLASS_EXCLUSION.toJsonTree(putObjectResponse).getAsJsonObject();
jsonObject.addProperty("md5Valid", md5Valid);
jsonObject.addProperty("bucket", bucket);
jsonObject.addProperty("key", fileName);
return new ByteArrayInputStream(
jsonObject.toString().getBytes(StandardCharsets.UTF_8)
jsonObject.toString().getBytes(StandardCharsets.UTF_8)
);
} catch (IOException e) {
LOG.error("Encountered IO Exception when reading from path: {}", pathStr);
e.printStackTrace();
LOG.error("Encountered IO Exception when reading from path: " + path, e);
return null;
}
}
@@ -226,16 +232,16 @@ synchronized S3Client getS3HttpClient(State state) {

Integer connectionTimeout = s3SourceV2Keys.getConnectionTimeout();
AttributeMap config = connectionTimeout == null ? GLOBAL_HTTP_DEFAULTS
: GLOBAL_HTTP_DEFAULTS.toBuilder()
: GLOBAL_HTTP_DEFAULTS.toBuilder()
.put(CONNECTION_TIMEOUT, Duration.ofSeconds(connectionTimeout))
.build();

s3Client = S3Client.builder()
.region(this.s3SourceV2Keys.getRegion())
.endpointOverride(URI.create(s3SourceV2Keys.getEndpoint()))
.httpClient(factory.getS3Client(state, config))
.credentialsProvider(getCredentialsProvider(state))
.build();
.region(this.s3SourceV2Keys.getRegion())
.endpointOverride(URI.create(s3SourceV2Keys.getEndpoint()))
.httpClient(factory.getS3Client(state, config))
.credentialsProvider(getCredentialsProvider(state))
.build();
} catch (Exception e) {
LOG.error("Error creating S3 Client: {}", e.getMessage());
}
@@ -245,12 +251,13 @@ synchronized S3Client getS3HttpClient(State state) {

/**
* retrieve a list of objects given a bucket name and a prefix
*
* @return list of object keys
*/
private List<String> getFilesList(String finalPrefix) {
List<String> files = Lists.newArrayList();
ListObjectsV2Request.Builder builder =
ListObjectsV2Request.builder().bucket(s3SourceV2Keys.getBucket()).maxKeys(s3SourceV2Keys.getMaxKeys());
ListObjectsV2Request.builder().bucket(s3SourceV2Keys.getBucket()).maxKeys(s3SourceV2Keys.getMaxKeys());

if (!finalPrefix.isEmpty()) {
builder.prefix(finalPrefix);
Expand All @@ -275,8 +282,8 @@ public AwsCredentialsProvider getCredentialsProvider(State state) {
AwsCredentialsProvider credentialsProvider = AnonymousCredentialsProvider.create();
if (StringUtils.isNotBlank(s3SourceV2Keys.getAccessKey()) || StringUtils.isNotEmpty(s3SourceV2Keys.getSecretId())) {
AwsCredentials credentials =
AwsBasicCredentials.create(SecretManager.getInstance(state).decrypt(s3SourceV2Keys.getAccessKey()),
SecretManager.getInstance(state).decrypt(s3SourceV2Keys.getSecretId()));
AwsBasicCredentials.create(SecretManager.getInstance(state).decrypt(s3SourceV2Keys.getAccessKey()),
SecretManager.getInstance(state).decrypt(s3SourceV2Keys.getSecretId()));
credentialsProvider = StaticCredentialsProvider.create(credentials);
}
return credentialsProvider;
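The comments in `handleUpload` above note that HDFS's native checksum is an MD5-of-MD5-of-CRC, so a plain MD5 is computed separately and compared with the ETag S3 returns. A stripped-down sketch of that comparison, valid only for single-part uploads where the ETag is the plain MD5 of the object (multipart ETags are not a content MD5); the class name and fake ETag are illustrative:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.commons.codec.digest.DigestUtils;

public class S3Md5CheckSketch {
  /** Compare a locally computed MD5 with the ETag returned by PutObject (quotes stripped). */
  static boolean md5MatchesETag(InputStream localContent, String eTag) throws IOException {
    String md5Hex = DigestUtils.md5Hex(localContent);          // hex MD5 of the uploaded bytes
    String eTagTruncated = eTag.replaceAll("\"", "");          // S3 wraps the ETag in double quotes
    return eTagTruncated.equalsIgnoreCase(md5Hex);
  }

  public static void main(String[] args) throws IOException {
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    String fakeETag = "\"" + DigestUtils.md5Hex(payload) + "\""; // stand-in for PutObjectResponse.eTag()
    System.out.println(md5MatchesETag(new ByteArrayInputStream(payload), fakeETag)); // true
  }
}
```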
@@ -40,12 +40,12 @@
* Currently following rules are defined:
*
* fail (upper bound rule): the source should be failed records
* Job succeeds when the row count in validation set / row count in base set < threshold
* Job fails when the row count in validation set / row count in base set >= threshold
* Job succeeds when the row count in validation set / row count in base set &lt; threshold
* Job fails when the row count in validation set / row count in base set &gt;= threshold
*
* success (lower bound rule): the source should be succeeded records
* Job succeeds when the row count in validation set / row count in base set >= threshold
* Job fails when the row count in validation set / row count in base set < threshold
* Job succeeds when the row count in validation set / row count in base set &gt;= threshold
* Job fails when the row count in validation set / row count in base set &lt; threshold
*/
public class InFlowValidationConverter extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
private static final Logger LOG = LoggerFactory.getLogger(InFlowValidationConverter.class);
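The Javadoc above states the converter's two validation rules as a ratio of validation-set rows to base-set rows compared against a threshold. A compact sketch of just that arithmetic (rule selection and threshold normally come from job configuration; the names here are illustrative):

```java
public class ValidationRuleSketch {
  /** "fail" rule: validation set holds failed records; the job fails when ratio >= threshold. */
  static boolean passesFailRule(long validationRows, long baseRows, double threshold) {
    return (double) validationRows / baseRows < threshold;
  }

  /** "success" rule: validation set holds succeeded records; the job fails when ratio < threshold. */
  static boolean passesSuccessRule(long validationRows, long baseRows, double threshold) {
    return (double) validationRows / baseRows >= threshold;
  }

  public static void main(String[] args) {
    System.out.println(passesFailRule(5, 100, 0.1));     // true: 5% failed, under the 10% ceiling
    System.out.println(passesSuccessRule(85, 100, 0.9)); // false: 85% succeeded, below the 90% floor
  }
}
```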
@@ -266,7 +266,7 @@ protected String appendParameters(String uri, JsonObject parameters) {
* {"param1": "value1", "param2": "value2"}
*
* URL Encoded Example:
* param1=value1&param2=value2
* param1=value1%26param2=value2
*
* @param parameters Json structured parameters
* @return URL encoded entity
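The Javadoc above describes flattening a JSON object of parameters into a URL-encoded entity. A minimal sketch with `java.net.URLEncoder`, handling only top-level string values; it is not the library's actual implementation:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.stream.Collectors;
import com.google.gson.JsonObject;

public class UrlEncodeSketch {
  /** {"param1":"value1","param2":"value2"} -> "param1=value1&param2=value2" (values URL-encoded). */
  static String toUrlEncodedEntity(JsonObject parameters) {
    return parameters.entrySet().stream()
        .map(e -> encode(e.getKey()) + "=" + encode(e.getValue().getAsString()))
        .collect(Collectors.joining("&"));
  }

  private static String encode(String s) {
    try {
      return URLEncoder.encode(s, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    JsonObject params = new JsonObject();
    params.addProperty("param1", "value1");
    params.addProperty("param2", "value 2");
    System.out.println(toUrlEncodedEntity(params)); // param1=value1&param2=value+2
  }
}
```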
@@ -132,7 +132,7 @@ public List<String> ls(String path) {
/**
* Execute an FTP ls command with retries
* @param path the target path to list content
* @param retries the number of times to try the ls command, must be > 0
* @param retries the number of times to try the ls command, must be &gt; 0
* @return the list of files and directories
*/
@Override
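The `ls` Javadoc above calls for a retry count greater than zero. A generic sketch of that retry-until-success pattern around an arbitrary call (the real connection retries the FTP `ls` specifically; the helper below is illustrative):

```java
import java.util.concurrent.Callable;

public class RetrySketch {
  /** Run the call up to {@code retries} times, rethrowing the last failure. Requires retries > 0. */
  static <T> T withRetries(Callable<T> call, int retries) throws Exception {
    if (retries <= 0) {
      throw new IllegalArgumentException("retries must be > 0");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= retries; attempt++) {
      try {
        return call.call();
      } catch (Exception e) {
        last = e;   // remember the failure and try again
      }
    }
    throw last;
  }

  public static void main(String[] args) throws Exception {
    // succeeds on the first attempt; a flaky call would be retried up to 3 times
    System.out.println(withRetries(() -> "listing", 3));
  }
}
```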
@@ -75,7 +75,7 @@ public List<String> getFieldsAsList(JsonElement field) {

/**
* Reads secondary input paths one by one and return the JsonArrays by category
* @return a Map<String, JsonArray> structure for records by category
* @return a Map&lt;String, JsonArray&gt; structure for records by category
*/
public Map<String, JsonArray> readAll() {
if (transientInputPayload == null || transientInputPayload.size() == 0) {
@@ -19,7 +19,7 @@
/**
* Recursively defined a Json Intermediate schema
*
* JsonIntermediateSchema := Map<columnName, JisColumn>
* JsonIntermediateSchema := Map&lt;columnName, JisColumn&gt;
*
* JisColumn := (columnName, nullability, JisDataType)
*
Expand All @@ -31,7 +31,7 @@
*
* EnumType := (JsonElementType, symbolsArray)
*
* UnionType := (JsonElementType, List<JisDataType>)
* UnionType := (JsonElementType, List&lt;JisDataType&gt;)
*
*/

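The grammar above models the intermediate schema as a map from column name to column, each column carrying nullability and a data type that may itself be a record, array, enum, or union. A bare-bones sketch of that shape (field and class names are illustrative, not the library's own):

```java
import java.util.List;
import java.util.Map;

public class JisSketch {
  enum Kind { PRIMITIVE, RECORD, ARRAY, ENUM, UNION }

  /** JisDataType := primitive | record | array | enum | union, as in the grammar above. */
  static class JisDataType {
    Kind kind;
    String primitiveType;                 // PRIMITIVE: e.g. "string", "int"
    Map<String, JisColumn> fields;        // RECORD: nested columns by name
    JisDataType itemType;                 // ARRAY: element type
    List<String> symbols;                 // ENUM: allowed symbols
    List<JisDataType> unionTypes;         // UNION: alternatives
  }

  /** JisColumn := (columnName, nullability, JisDataType) */
  static class JisColumn {
    String name;
    boolean nullable;
    JisDataType dataType;
  }

  /** JsonIntermediateSchema := Map<columnName, JisColumn> */
  static class JsonIntermediateSchemaSketch {
    Map<String, JisColumn> columns;
  }
}
```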
6 changes: 3 additions & 3 deletions cdi-core/src/main/java/com/linkedin/cdi/util/SchemaUtils.java
@@ -35,9 +35,9 @@ private SchemaUtils() {
* @return true if first N columns are all existing in source and false other wise
*
*
* Example 1: definedColumns: [A, c], sourceColumns: [a, B, C] ==> true, B in source will be ignored in projection
* Example 2: definedColumns: [A, e], sourceColumns: [a, B, C] ==> false
* Example 3: definedColumns: [A, B, C], sourceColumns: [A, B] ==> true, C is assumed to be a derived field
* Example 1: definedColumns: [A, c], sourceColumns: [a, B, C] ==&gt; true, B in source will be ignored in projection
* Example 2: definedColumns: [A, e], sourceColumns: [a, B, C] ==&gt; false
* Example 3: definedColumns: [A, B, C], sourceColumns: [A, B] ==&gt; true, C is assumed to be a derived field
*
*/
public static boolean isValidSchemaDefinition(
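The three examples above imply the rule: a defined column must appear among the source columns ignoring case, except that trailing defined columns past the end of the source list are tolerated as derived fields. A sketch consistent with those examples, not necessarily the method's exact logic:

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class SchemaCheckSketch {
  /** True when every defined column is in source (ignoring case), or sits past the end of the source list. */
  static boolean isValidSchemaDefinition(List<String> definedColumns, List<String> sourceColumns) {
    Set<String> source = sourceColumns.stream()
        .map(c -> c.toLowerCase(Locale.ROOT))
        .collect(Collectors.toSet());
    for (int i = 0; i < definedColumns.size(); i++) {
      boolean inSource = source.contains(definedColumns.get(i).toLowerCase(Locale.ROOT));
      boolean assumedDerived = i >= sourceColumns.size();   // e.g. column C in Example 3
      if (!inSource && !assumedDerived) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isValidSchemaDefinition(List.of("A", "c"), List.of("a", "B", "C")));   // true
    System.out.println(isValidSchemaDefinition(List.of("A", "e"), List.of("a", "B", "C")));   // false
    System.out.println(isValidSchemaDefinition(List.of("A", "B", "C"), List.of("A", "B")));   // true
  }
}
```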
@@ -16,6 +16,7 @@
import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.Assert;
Expand All @@ -27,6 +28,7 @@

@Test
@PrepareForTest({HadoopFsHelper.class, TimestampAwareFileBasedHelper.class})
@PowerMockIgnore("jdk.internal.reflect.*")
public class HdfsReadConnectionTest extends PowerMockTestCase {
@Test
public void testGetFileList() throws Exception {