Skip to content

Commit

Permalink
Refactor metacat-connector-druid http client to support custom extens…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
jtuglu-netflix committed Dec 5, 2024
1 parent 52066ec commit fb89b1f
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
/**
* Druid Config Constants.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public final class DruidConfigConstants {
/**
* DRUID_COORDINATOR_URI.
* DRUID_ROUTER_URI .
*/
public static final String DRUID_COORDINATOR_URI = "druid.uri";
public static final String DRUID_ROUTER_URI = "druid.uri";

//Http client
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,30 @@
import com.netflix.metacat.common.server.connectors.ConnectorTableService;
import com.netflix.metacat.common.server.connectors.SpringConnectorFactory;
import com.netflix.metacat.connector.druid.configs.DruidConnectorConfig;
import com.netflix.metacat.connector.druid.configs.DruidHttpClientConfig;
import com.netflix.metacat.connector.druid.configs.BaseDruidHttpClientConfig;
import com.netflix.metacat.connector.druid.converter.DruidConnectorInfoConverter;

/**
* Druid Connector Factory.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public class DruidConnectorFactory extends SpringConnectorFactory {
/**
* Constructor.
*
* @param connectorContext connector config
* @param druidConnectorInfoConverter connector info converter
* @param connectorContext connector config
* @param clientConfigClass config class to register
*/
DruidConnectorFactory(
public DruidConnectorFactory(
final DruidConnectorInfoConverter druidConnectorInfoConverter,
final ConnectorContext connectorContext
final ConnectorContext connectorContext,
final Class<? extends BaseDruidHttpClientConfig> clientConfigClass
) {
super(druidConnectorInfoConverter, connectorContext);
super.registerClazz(DruidConnectorConfig.class, DruidHttpClientConfig.class);
super.registerClazz(DruidConnectorConfig.class, clientConfigClass);
super.refresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
import com.netflix.metacat.common.server.connectors.ConnectorFactory;
import com.netflix.metacat.common.server.connectors.ConnectorPlugin;
import com.netflix.metacat.common.server.connectors.ConnectorTypeConverter;
import com.netflix.metacat.connector.druid.configs.DruidHttpClientConfig;
import com.netflix.metacat.connector.druid.converter.DruidConnectorInfoConverter;

/**
* Druid Connector Plugin.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public class DruidConnectorPlugin implements ConnectorPlugin {
Expand All @@ -45,7 +46,10 @@ public String getType() {
@Override
public ConnectorFactory create(final ConnectorContext connectorContext) {
return new DruidConnectorFactory(
new DruidConnectorInfoConverter(connectorContext.getCatalogName()), connectorContext);
new DruidConnectorInfoConverter(connectorContext.getCatalogName()),
connectorContext,
DruidHttpClientConfig.class
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,37 @@
/**
* DruidHttpClientImpl.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
@Slf4j
public class DruidHttpClientImpl implements MetacatDruidClient {
private String druidURI;
private final RestTemplate restTemplate;
private final MetacatJsonLocator jsonLocator = new MetacatJsonLocator();
public class BaseDruidClientImpl implements MetacatDruidClient {
protected String druidURI;
protected final RestTemplate restTemplate;
protected final MetacatJsonLocator jsonLocator = new MetacatJsonLocator();

/**
* Constructor.
*
* @param connectorContext connector context
* @param restTemplate rest template
*/
public DruidHttpClientImpl(final ConnectorContext connectorContext,
final RestTemplate restTemplate) {
public BaseDruidClientImpl(
final ConnectorContext connectorContext,
final RestTemplate restTemplate
) {
this.restTemplate = restTemplate;
final Map<String, String> config = connectorContext.getConfiguration();
final String coordinatorUri = config.get(DruidConfigConstants.DRUID_COORDINATOR_URI);
if (coordinatorUri == null) {
final String routerUri = config.get(DruidConfigConstants.DRUID_ROUTER_URI);
if (routerUri == null) {
throw new MetacatException("Druid cluster ending point not provided.");
}
try {
new URI(coordinatorUri);
new URI(routerUri);
} catch (URISyntaxException exception) {
throw new MetacatException("Druid ending point invalid");
}
this.druidURI = coordinatorUri;
this.druidURI = routerUri;
log.info("druid server uri={}", this.druidURI);
}

Expand Down Expand Up @@ -100,7 +102,7 @@ public ObjectNode getLatestDataByName(final String dataSourceName) {
if (result == null) {
throw new MetacatException(String.format("Druid cluster: %s result not found.", dataSourceName));
}
final String latestSegment = DruidHttpClientUtil.getLatestSegment(result);
final String latestSegment = DruidClientUtil.getLatestSegment(result);
log.debug("Get the latest segment {}", latestSegment);
url = String.format(druidURI + "/%s/segments/%s", dataSourceName, latestSegment);
result = restTemplate.getForObject(url, String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
/**
* DruidHttpClientUtil.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
public final class DruidHttpClientUtil {
private DruidHttpClientUtil() {
public final class DruidClientUtil {
private DruidClientUtil() {
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.netflix.metacat.connector.druid.configs;

import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.util.TimeUtil;
import com.netflix.metacat.connector.druid.DruidConfigConstants;
import com.netflix.metacat.connector.druid.MetacatDruidClient;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.TimeUnit;

/**
* DruidHttpClientConfig.
*
* @author zhenl jtuglu
* @since 1.2.0
*/
public abstract class BaseDruidHttpClientConfig {
/**
* Druid client instance.
*
* @param connectorContext connector context
* @param restTemplate rest template
* @return MetacatDruidClient
*/
@Bean
public abstract MetacatDruidClient createMetacatDruidClient(
final ConnectorContext connectorContext,
final RestTemplate restTemplate
);

/**
* Rest template.
*
* @param connectorContext connector context
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(final ConnectorContext connectorContext) {
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient(connectorContext)));
}

/**
* Http client.
*
* @param connectorContext connector context
* @return HttpClient
*/
@Bean
public HttpClient httpClient(final ConnectorContext connectorContext) {
final int timeout = (int) TimeUtil.toTime(
connectorContext.getConfiguration().getOrDefault(DruidConfigConstants.HTTP_TIMEOUT, "5s"),
TimeUnit.SECONDS,
TimeUnit.MILLISECONDS
);
final int poolSize = Integer.parseInt(connectorContext.getConfiguration()
.getOrDefault(DruidConfigConstants.POOL_SIZE, "10"));
final RequestConfig config = RequestConfig.custom()
.setConnectTimeout(timeout)
.setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout)
.setMaxRedirects(3)
.build();
final PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(poolSize);
return HttpClientBuilder
.create()
.setDefaultRequestConfig(config)
.setConnectionManager(connectionManager)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,83 +17,30 @@
package com.netflix.metacat.connector.druid.configs;

import com.netflix.metacat.common.server.connectors.ConnectorContext;
import com.netflix.metacat.common.server.connectors.util.TimeUtil;
import com.netflix.metacat.connector.druid.DruidConfigConstants;
import com.netflix.metacat.connector.druid.MetacatDruidClient;
import com.netflix.metacat.connector.druid.client.DruidHttpClientImpl;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import com.netflix.metacat.connector.druid.client.BaseDruidClientImpl;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

/**
* DruidHttpClientConfig.
*
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
@Configuration
public class DruidHttpClientConfig {
public class DruidHttpClientConfig extends BaseDruidHttpClientConfig {
/**
* Druid client instance.
*
* @param connectorContext connector context
* @param restTemplate rest template
* @return MetacatDruidClient
* @throws UnknownHostException exception for unknownhost
*/
@Bean
@Override
public MetacatDruidClient createMetacatDruidClient(
final ConnectorContext connectorContext,
final RestTemplate restTemplate) throws UnknownHostException {
return new DruidHttpClientImpl(connectorContext, restTemplate);
}

/**
* Rest template.
*
* @param connectorContext connector context
* @return RestTemplate
*/
@Bean
public RestTemplate restTemplate(final ConnectorContext connectorContext) {
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(httpClient(connectorContext)));
}

/**
* Http client.
*
* @param connectorContext connector context
* @return HttpClient
*/
@Bean
public HttpClient httpClient(final ConnectorContext connectorContext) {
final int timeout = (int) TimeUtil.toTime(
connectorContext.getConfiguration().getOrDefault(DruidConfigConstants.HTTP_TIMEOUT, "5s"),
TimeUnit.SECONDS,
TimeUnit.MILLISECONDS
);
final int poolsize = Integer.parseInt(connectorContext.getConfiguration()
.getOrDefault(DruidConfigConstants.POOL_SIZE, "10"));
final RequestConfig config = RequestConfig.custom()
.setConnectTimeout(timeout)
.setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout)
.setMaxRedirects(3)
.build();
final PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(poolsize);
return HttpClientBuilder
.create()
.setDefaultRequestConfig(config)
.setConnectionManager(connectionManager)
.build();
final RestTemplate restTemplate) {
return new BaseDruidClientImpl(connectorContext, restTemplate);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package com.netflix.metacat.connector.druid

import com.netflix.metacat.connector.druid.client.DruidHttpClientUtil
import com.netflix.metacat.connector.druid.client.DruidClientUtil
import spock.lang.Specification

/**
* DruidHttpClientUtilSpec.
* @author zhenl
* @author zhenl jtuglu
* @since 1.2.0
*/
class DruidHttpClientUtilSpec extends Specification{
class DruidClientUtilSpec extends Specification {

def "Test for getLatestDataByName"() {

when:
def ret = DruidHttpClientUtil.getLatestSegment(getInput())
def ret = DruidClientUtil.getLatestSegment(getInput())

then:
ret.equals("algodash_map_row_report_agg_2016-09-03T00:00:00.000Z_2016-09-04T00:00:00.000Z_2016-10-10T21:19:50.893Z");
Expand Down

0 comments on commit fb89b1f

Please sign in to comment.