Skip to content

Commit

Permalink
Merge pull request #150 from mohanvive/master
Browse files Browse the repository at this point in the history
Add support for Blocking IO for durable messaging
  • Loading branch information
pcnfernando authored Aug 23, 2019
2 parents 6edccc4 + bc6f50b commit 551bf8f
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.wso2.carbon.messaging.Header;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.HttpClientConnector;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.contract.config.ChunkConfig;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
Expand Down Expand Up @@ -167,6 +168,13 @@
type = {DataType.STRING},
optional = true,
defaultValue = "-"),
@Parameter(
name = "blocking.io",
description = "Blocks the request thread until a response it received from HTTP " +
"endpoint. This should be enabled for reliable messaging (error handling)",
type = {DataType.BOOL},
optional = true,
defaultValue = "false"),
@Parameter(
name = "headers",
description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\n" +
Expand Down Expand Up @@ -467,6 +475,7 @@ public class HttpSink extends Sink {
private long maxWaitTime;
private String hostnameVerificationEnabled;
private String sslVerificationDisabled;
private boolean isBlockingIO;

private DefaultHttpWsConnectorFactory httpConnectorFactory;

Expand Down Expand Up @@ -586,6 +595,9 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde
authType = HttpConstants.NO_AUTH;
}

isBlockingIO = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(HttpConstants.BLOCKING_IO, HttpConstants.FALSE));

initConnectorFactory();
if (publisherURLOption.isStatic()) {
initClientConnector(null);
Expand Down Expand Up @@ -649,7 +661,7 @@ private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, Lis

private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List<Header> headersList,
String encodedAuth) {
Boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth);
boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth);
if (checkFromCache) {
getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth);
} else {
Expand Down Expand Up @@ -752,7 +764,7 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti
}
}

public void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, String tokenURL) {
void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, String tokenURL) {
this.tokenURL = tokenURL;
HttpsClient httpsClient = new HttpsClient();
if (!HttpConstants.EMPTY_STRING.equals(oauthUsername) &&
Expand All @@ -769,8 +781,8 @@ public void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, St
}
}

public void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions,
List<Header> headersList) {
void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions,
List<Header> headersList) {
//check the availability of the authorization
String accessToken;
boolean authAvailability = false;
Expand Down Expand Up @@ -870,9 +882,35 @@ private int sendRequest(Object payload, DynamicOptions dynamicOptions, List<Head
}
HttpCarbonMessage response = listener.getHttpResponseMessage();
return response.getNettyHttpResponse().status().code();
} else {
} else if (!isBlockingIO) {
clientConnector.send(cMessage);
return HttpConstants.SUCCESS_CODE;
} else {
CountDownLatch latch = new CountDownLatch(1);
HttpResponseFuture responseFuture = clientConnector.send(cMessage);
HTTPResponseListener responseListener = new HTTPResponseListener(latch);
responseFuture.setHttpConnectorListener(responseListener);

try {
boolean latchCount = latch.await(30, TimeUnit.SECONDS);
if (!latchCount) {
log.debug("Time out due to getting getting response from " + publisherURL + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Time out due to getting getting response from "
+ publisherURL + ". Message dropped.");

}
} catch (InterruptedException e) {
log.debug("Failed to get a response from " + publisherURL + "," + e + ". Message dropped.");
throw new HttpSinkAdaptorRuntimeException("Failed to get a response from " +
publisherURL + ", " + e + ". Message dropped.");
}

if (responseListener.throwable == null) {
return HttpConstants.SUCCESS_CODE;
} else {
throw new SiddhiAppRuntimeException("Siddhi App " + siddhiAppContext.getName() + " failed to publish " +
"events to HTTP endpoint", responseListener.throwable);
}
}
}

Expand Down Expand Up @@ -976,7 +1014,7 @@ String getMessageBody(Object payload) {
}
}

void initConnectorFactory() {
private void initConnectorFactory() {
//if bootstrap configurations are given then pass it if not let take default value of transport
if (!EMPTY_STRING.equals(bootstrapBoss) && !EMPTY_STRING.equals(bootstrapWorker)) {
if (!EMPTY_STRING.equals(bootstrapClient)) {
Expand Down Expand Up @@ -1029,8 +1067,7 @@ public void initClientConnector(DynamicOptions dynamicOptions) {
throw new SiddhiAppCreationException("Please provide user name and password in " +
HttpConstants.HTTP_SINK_ID + " with the stream " + streamID + " in Siddhi app " +
siddhiAppContext.getName());
} else if (!(EMPTY_STRING.equals(userName) || EMPTY_STRING.equals
(userPassword))) {
} else if (!(EMPTY_STRING.equals(userName))) {
byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset
.defaultCharset());
this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode
Expand Down Expand Up @@ -1115,4 +1152,24 @@ private String encodeBase64(String consumerKeyValue) {
ByteBuf encodedByteBuf = Base64.encode(byteBuf);
return encodedByteBuf.toString(StandardCharsets.UTF_8);
}

static class HTTPResponseListener implements HttpConnectorListener {
Throwable throwable;
CountDownLatch countDownLatch;

HTTPResponseListener(CountDownLatch latch) {
this.countDownLatch = latch;
}

@Override
public void onMessage(HttpCarbonMessage httpCarbonMessage) {
countDownLatch.countDown();
}

@Override
public void onError(Throwable throwable) {
this.throwable = throwable;
countDownLatch.countDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.sun.net.httpserver.Headers;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.stream.StreamJunction;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.extension.io.http.sink.util.HttpServerListenerHandler;
import io.siddhi.extension.io.http.sink.util.UnitTestAppender;
import io.siddhi.extension.map.xml.sinkmapper.XMLSinkMapper;
import org.apache.log4j.Logger;
import org.testng.Assert;
Expand Down Expand Up @@ -154,4 +156,46 @@ public void testHTTPContentTypeAtHeaders() throws Exception {
siddhiAppRuntime.shutdown();
lst.shutdown();
}

/**
* Creating test to check the connection validity when publishing events.
*
* @throws Exception Interrupted exception
*/
@Test
public void testHTTPConnectionFailure() throws InterruptedException {
log.info("Creating test for publishing events to invalid HTTP endpoint.");
SiddhiManager siddhiManager = new SiddhiManager();
siddhiManager.setExtension("xml-output-mapper", XMLSinkMapper.class);
String inStreamDefinition = "Define stream FooStream (message String,method String,headers String);"
+ "@sink(type='http',blocking.io='true',publisher.url='http://localhost:8010/abcd',method='{{method}}',"
+ "headers='{{headers}}',"
+ "@map(type='xml', @payload('{{message}}'))) "
+ "Define stream BarStream (message String,method String,headers String);";
String query = (
"@info(name = 'query') "
+ "from FooStream "
+ "select message,method,headers "
+ "insert into BarStream;"
);

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(inStreamDefinition + query);
InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream");
siddhiAppRuntime.start();
Logger logger = Logger.getLogger(StreamJunction.class);
UnitTestAppender appender = new UnitTestAppender();
logger.addAppender(appender);

try {
fooStream.send(1566562744069L, new Object[]{payload, "POST", "'Name:John','Age:23'"});
String expectedMessage = "failed to publish events to HTTP endpoint. Hence, dropping event " +
"'StreamEvent{ timestamp=1566562744069, beforeWindowData=null, onAfterWindowData=null, " +
"outputData=[<events><event><symbol>WSO2</symbol><price>55.645</price><volume>100</volume>" +
"</event></events>, POST, 'Name:John','Age:23'], type=CURRENT, next=null}";
Assert.assertTrue(appender.getMessages().contains(expectedMessage));
} finally {
logger.removeAppender(appender);
siddhiAppRuntime.shutdown();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.http.sink.util;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;

public class UnitTestAppender extends AppenderSkeleton {
private String messages;

public String getMessages() {
return messages;
}

@Override
protected void append(LoggingEvent loggingEvent) {
messages = loggingEvent.getRenderedMessage();
}

@Override
public void close() {

}

@Override
public boolean requiresLayout() {
return false;
}
}

0 comments on commit 551bf8f

Please sign in to comment.