diff --git a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java index 4a70a07e..5ba6fad9 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java +++ b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java @@ -18,9 +18,7 @@ */ package io.siddhi.extension.io.http.sink; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpHeaders; @@ -610,8 +608,8 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde } else if (!(EMPTY_STRING.equals(userName))) { byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset .defaultCharset()); - this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode - (Unpooled.copiedBuffer(val)); + this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + HttpIoUtil.encodeBase64(new String(val, + StandardCharsets.UTF_8)); } proxyServerConfiguration = createProxyServerConfiguration(optionHolder, streamID, siddhiAppContext.getName()); @@ -755,7 +753,7 @@ protected void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, L } else { consumerKeyValue = bodyConsumerKey + ":" + bodyConsumerSecret; } - String encodedAuth = "Basic " + encodeBase64(consumerKeyValue) + String encodedAuth = "Basic " + HttpIoUtil.encodeBase64(consumerKeyValue) .replaceAll(HttpConstants.NEW_LINE, HttpConstants.EMPTY_STRING); //check the availability of access token in the header setAccessToken(encodedAuth, dynamicOptions, headersList, clientConnector.getPublisherURL()); @@ -1187,12 +1185,6 @@ private String encodeMessage(Object s) { } } - private String encodeBase64(String consumerKeyValue) { - ByteBuf byteBuf = Unpooled.wrappedBuffer(consumerKeyValue.getBytes(StandardCharsets.UTF_8)); - ByteBuf encodedByteBuf = Base64.encode(byteBuf); - return encodedByteBuf.toString(StandardCharsets.UTF_8); - } - private class HTTPResponseListener implements HttpConnectorListener { Object payload; DynamicOptions dynamicOptions; diff --git a/component/src/main/java/io/siddhi/extension/io/http/source/SSESource.java b/component/src/main/java/io/siddhi/extension/io/http/source/SSESource.java index 5bbd7bb2..ac4b0ec1 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/source/SSESource.java +++ b/component/src/main/java/io/siddhi/extension/io/http/source/SSESource.java @@ -17,8 +17,6 @@ package io.siddhi.extension.io.http.source; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; @@ -44,6 +42,7 @@ import io.siddhi.extension.io.http.sink.util.HttpSinkUtil; import io.siddhi.extension.io.http.util.HTTPSourceRegistry; import io.siddhi.extension.io.http.util.HttpConstants; +import io.siddhi.extension.io.http.util.HttpIoUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.wso2.carbon.messaging.Header; @@ -57,6 +56,7 @@ import org.wso2.transport.http.netty.message.HttpCarbonMessage; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Objects; @@ -355,8 +355,8 @@ private String validateAndGetAuthType() { } else if (!(EMPTY_STRING.equals(userName))) { byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset .defaultCharset()); - this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode - (Unpooled.copiedBuffer(val)); + this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + HttpIoUtil.encodeBase64(new String(val, + StandardCharsets.UTF_8)); } if (!HttpConstants.EMPTY_STRING.equals(userName) && !HttpConstants.EMPTY_STRING.equals(userPassword)) { diff --git a/component/src/main/java/io/siddhi/extension/io/http/util/HttpIoUtil.java b/component/src/main/java/io/siddhi/extension/io/http/util/HttpIoUtil.java index 27ba80ad..2845221d 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/util/HttpIoUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/http/util/HttpIoUtil.java @@ -18,7 +18,9 @@ */ package io.siddhi.extension.io.http.util; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpHeaderNames; @@ -48,6 +50,7 @@ import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -334,4 +337,15 @@ public static CompiledCondition createTableDeleteResource(Map tab return table.compileCondition(condition, matchingMetaInfoHolder, null, tableMap, siddhiQueryContext); } + + /** + * Encode the given value using Base64 encoding scheme. + * @param value value to be encoded + * @return encoded value + */ + public static String encodeBase64(String value) { + ByteBuf byteBuf = Unpooled.wrappedBuffer(value.getBytes(StandardCharsets.UTF_8)); + ByteBuf encodedByteBuf = Base64.encode(byteBuf); + return encodedByteBuf.toString(StandardCharsets.UTF_8); + } } diff --git a/component/src/test/java/io/siddhi/extension/io/http/sink/HttpAuthTestCase.java b/component/src/test/java/io/siddhi/extension/io/http/sink/HttpAuthTestCase.java index 02711a30..d8f7a5a8 100644 --- a/component/src/test/java/io/siddhi/extension/io/http/sink/HttpAuthTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/http/sink/HttpAuthTestCase.java @@ -24,6 +24,7 @@ import io.siddhi.core.stream.input.InputHandler; import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException; import io.siddhi.extension.io.http.sink.util.HttpServerListenerHandler; +import io.siddhi.extension.io.http.source.util.Constants; import io.siddhi.extension.map.xml.sinkmapper.XMLSinkMapper; import io.siddhi.query.api.exception.SiddhiAppValidationException; import org.apache.logging.log4j.LogManager; @@ -31,6 +32,8 @@ import org.testng.Assert; import org.testng.annotations.Test; +import java.util.List; + /** * test cases for basic authentication. */ @@ -235,4 +238,55 @@ public void testHTTPWithoutURL() throws Exception { lst.shutdown(); } + @Test(dependsOnMethods = "testHTTPWithoutURL") + public void testHttpCallSinkBasicAuthTrue() throws Exception { + log.info("Creating test for publishing events with basic authentication true."); + SiddhiManager siddhiManager = new SiddhiManager(); + siddhiManager.setExtension("xml-output-mapper", XMLSinkMapper.class); + String inStreamDefinition = "Define stream FooStream (message String,method String,headers String);" + + "@sink(type='http-call',sink.id='foo',publisher.url='http://localhost:8005/abc',method='{{method}}'," + + "headers='{{headers}}', basic.auth.username='admin',basic.auth.password='admin'," + + "@map(type='xml', " + + "@payload('{{message}}'))) " + + "Define stream BarStream (message String,method String,headers String);" + + "@source(type='http-call-response', sink.id='foo', @map(type='text', regex.A='(.*)', " + + "@attributes(message='A')))" + + "define stream ResponseStream(message string);"; + String query = ("@info(name = 'query') " + + "from FooStream " + + "select message,method,headers " + + "insert into BarStream;" + ); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(inStreamDefinition + + query); + InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream"); + siddhiAppRuntime.start(); + HttpServerListenerHandler lst = new HttpServerListenerHandler(8005); + lst.run(); + String payload = "" + + "" + + "WSO2" + + "55.645" + + "100" + + "" + + ""; + fooStream.send(new Object[]{payload, "POST", "'Name:John','Age:23'"}); + while (!lst.getServerListener().isMessageArrive()) { + Thread.sleep(10); + } + List authHeader = lst.getServerListener().getHeaders().get(Constants.BASIC_AUTH_HEADER); + String authHeaderValue = (authHeader != null && authHeader.size() > 0) ? authHeader.get(0) : null; + Assert.assertEquals(authHeaderValue, Constants.BASIC_AUTH_HEADER_VALUE, "Invalid basic auth header present"); + String eventData = lst.getServerListener().getData(); + String expected = "" + + "" + + "WSO2" + + "55.645" + + "100" + + "" + + "\n"; + Assert.assertEquals(eventData, expected); + siddhiAppRuntime.shutdown(); + lst.shutdown(); + } } diff --git a/component/src/test/java/io/siddhi/extension/io/http/source/SSESourceTestCase.java b/component/src/test/java/io/siddhi/extension/io/http/source/SSESourceTestCase.java index 1e000964..8ed131c9 100644 --- a/component/src/test/java/io/siddhi/extension/io/http/source/SSESourceTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/http/source/SSESourceTestCase.java @@ -28,6 +28,8 @@ import io.siddhi.core.util.SiddhiTestHelper; import io.siddhi.core.util.persistence.InMemoryPersistenceStore; import io.siddhi.core.util.persistence.PersistenceStore; +import io.siddhi.extension.io.http.sink.util.HttpServerListenerHandler; +import io.siddhi.extension.io.http.source.util.Constants; import io.siddhi.extension.io.http.util.HttpConstants; import io.siddhi.extension.map.json.sinkmapper.JsonSinkMapper; import io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper; @@ -60,7 +62,7 @@ public class SSESourceTestCase { private HttpServer sseServer; private ThreadPoolExecutor threadPoolExecutor; - @BeforeMethod + @BeforeMethod(groups = "event-server") public void init() { eventCount.set(0); try { @@ -104,7 +106,7 @@ public void handle(HttpExchange httpExchange) throws IOException { } } - @Test + @Test(groups = "event-server") public void testSSESource() throws Exception { List receivedEventList = new ArrayList<>(2); PersistenceStore persistenceStore = new InMemoryPersistenceStore(); @@ -147,7 +149,31 @@ public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { siddhiAppRuntime.shutdown(); } - @AfterMethod + @Test(dependsOnMethods = "testSSESource") + public void testSSESourceWithBasicAuth() throws Exception { + SiddhiManager siddhiManager = new SiddhiManager(); + siddhiManager.setExtension("json-output-mapper", JsonSinkMapper.class); + siddhiManager.setExtension("json-input-mapper", JsonSourceMapper.class); + String sourceStreamDefinition = "@Source(type='sse', receiver.url='http://localhost:8005/abc', " + + "basic.auth.username='admin', basic.auth.password='admin',\n" + + "@map(type='json'))\n" + + "define stream ReceiveProductionStream (param1 string);\n"; + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(sourceStreamDefinition); + siddhiAppRuntime.start(); + HttpServerListenerHandler lst = new HttpServerListenerHandler(8005); + lst.run(); + while (!lst.getServerListener().isMessageArrive()) { + Thread.sleep(10); + } + List authHeader = lst.getServerListener().getHeaders().get(Constants.BASIC_AUTH_HEADER); + String authHeaderValue = (authHeader != null && authHeader.size() > 0) ? authHeader.get(0) : null; + Assert.assertEquals(authHeaderValue, Constants.BASIC_AUTH_HEADER_VALUE, "Invalid basic auth header present"); + siddhiAppRuntime.shutdown(); + lst.shutdown(); + } + + + @AfterMethod(groups = "event-server") public void destroy() { sseServer.stop(1); threadPoolExecutor.shutdownNow(); diff --git a/component/src/test/java/io/siddhi/extension/io/http/source/util/Constants.java b/component/src/test/java/io/siddhi/extension/io/http/source/util/Constants.java new file mode 100644 index 00000000..45854983 --- /dev/null +++ b/component/src/test/java/io/siddhi/extension/io/http/source/util/Constants.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2023 WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package io.siddhi.extension.io.http.source.util; + +/** + * Constants used by the test cases. + */ +public class Constants { + public static final String BASIC_AUTH_HEADER = "Authorization"; + public static final String BASIC_AUTH_HEADER_VALUE = "Basic YWRtaW46YWRtaW4="; + +}