Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding AWS SigV4 authentication #240

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.gson:gson:2.10.1"
implementation "org.opensearch.client:opensearch-rest-high-level-client:$openSearchVersion"
implementation "io.github.acm19:aws-request-signing-apache-interceptor:2.3.1"

testImplementation "org.junit.jupiter:junit-jupiter:5.10.0"
testImplementation "org.mockito:mockito-core:5.5.0"
Expand Down
24 changes: 24 additions & 0 deletions docs/opensearch-sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,28 @@ Authentication
* Default: null
* Importance: medium

AWS Authentication SigV4
^^^^^^^^^^^^^^^^^^^^^^^^

``aws.access_key_id``
AWS Access key id, this field is required to enable AWS SigV4 request signing

* Type: string
* Default: null
* Importance: medium

``aws.region``
AWS Region, eg us-east-1. This field is required to enable AWS SigV4 request signing

* Type: string
* Default: null
* Importance: medium

``aws.secret_access_key``
AWS secret access key, this field is required to enable AWS SigV4 request signing

* Type: password
* Default: null
* Importance: medium


Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2019 Aiven Oy
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.opensearch;

import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.types.Password;

import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor;
import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator;

import io.github.acm19.aws.interceptor.http.AwsRequestSigningApacheInterceptor;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.regions.Region;


/**
* Adds AWS SigV4 authentication to the {@index HttpAsyncClientBuilder} for Opensearch client
* if configured.
*/
public class OpensearchSigV4Configurator
implements OpensearchClientConfigurator, ConfigDefContributor {

public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access_key_id";
public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret_access_key";
public static final String AWS_REGION_CONFIG = "aws.region";
private static final String AWS_ACCESS_KEY_ID_DOC =
"AWS Access key id, this field is required "
+ "to enable AWS SigV4 request signing";
private static final String AWS_SECRET_ACCESS_KEY_DOC =
"AWS secret access key, this field is required "
+ "to enable AWS SigV4 request signing";
private static final String AWS_REGION_DOC =
"AWS Region, eg us-east-1. This field is required "
+ "to enable AWS SigV4 request signing";

private static final String AWS_GROUP_NAME = "AWS Authentication SigV4";

private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) {
return Objects.nonNull(awsAccessKeyId(config))
&& Objects.nonNull(awsSecretAccessKey(config))
&& Objects.nonNull(awsRegion(config));
}

private static String awsRegion(final OpensearchSinkConnectorConfig config) {
return config.getString(AWS_REGION_CONFIG);
}

private static String awsAccessKeyId(final OpensearchSinkConnectorConfig config) {
return config.getString(AWS_ACCESS_KEY_ID_CONFIG);
}

private static Password awsSecretAccessKey(final OpensearchSinkConnectorConfig config) {
return config.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG);
}

@Override
public boolean apply(final OpensearchSinkConnectorConfig config,
final HttpAsyncClientBuilder builder) {
if (!isAuthenticatedConnection(config)) {
return false;
}

final AwsCredentials credentials = AwsBasicCredentials.create(
awsAccessKeyId(config),
awsSecretAccessKey(config).value());
final StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials);

final HttpRequestInterceptor awsSignerInterceptor = new AwsRequestSigningApacheInterceptor(
"es",
Aws4Signer.create(),
credentialsProvider,
Region.of(config.getString(AWS_REGION_CONFIG))
);

builder.addInterceptorLast(awsSignerInterceptor);

return true;
}

@Override
public void addConfig(final ConfigDef config) {
config
.define(
AWS_ACCESS_KEY_ID_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
AWS_ACCESS_KEY_ID_DOC,
AWS_GROUP_NAME,
0,
Width.SHORT,
"Access Key Id"
).define(
AWS_SECRET_ACCESS_KEY_CONFIG,
Type.PASSWORD,
null,
Importance.MEDIUM,
AWS_SECRET_ACCESS_KEY_DOC,
AWS_GROUP_NAME,
1,
Width.SHORT,
"Secret Access Key"
).define(
AWS_REGION_CONFIG,
Type.STRING,
null,
Importance.MEDIUM,
AWS_REGION_DOC,
AWS_GROUP_NAME,
1,
Width.SHORT,
"Region");
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator
io.aiven.kafka.connect.opensearch.OpensearchSigV4Configurator
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator
io.aiven.kafka.connect.opensearch.OpensearchSigV4Configurator
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2019 Aiven Oy
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.opensearch;

import java.util.Map;

import org.apache.http.HttpRequestInterceptor;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
public class OpensearchSigV4ConfiguratorTest {

@Test
void testApplyInterceptor(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_ACCESS_KEY_ID_CONFIG, "id",
OpensearchSigV4Configurator.AWS_SECRET_ACCESS_KEY_CONFIG, "secret",
OpensearchSigV4Configurator.AWS_REGION_CONFIG, "us-east-1"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testConfigMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost")
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testAccessKeyIdMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_SECRET_ACCESS_KEY_CONFIG, "secret",
OpensearchSigV4Configurator.AWS_REGION_CONFIG, "us-east-1"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testSecretAccessKeyMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_ACCESS_KEY_ID_CONFIG, "id",
OpensearchSigV4Configurator.AWS_REGION_CONFIG, "us-east-1"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}

@Test
void testRegionMissing(final @Mock HttpAsyncClientBuilder httpBuilder) {
final var config = new OpensearchSinkConnectorConfig(
Map.of(
OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "http://localhost",
OpensearchSigV4Configurator.AWS_ACCESS_KEY_ID_CONFIG, "id",
OpensearchSigV4Configurator.AWS_SECRET_ACCESS_KEY_CONFIG, "secret"
)
);
final OpensearchSigV4Configurator configurator = new OpensearchSigV4Configurator();
configurator.apply(config, httpBuilder);

verify(httpBuilder, never()).addInterceptorLast(any(HttpRequestInterceptor.class));
}
}
Loading