Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple implementations of KieServerController through the ServiceLoader #3069

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@
public interface KieServerController {

/**
* Entry point for for KieServer to connect(and register if done for the first time). At the same time, when given KieServerInstance
* Entry point for KieServer to connect(and register if done for the first time). At the same time, when given KieServerInstance
* has been already added a KieServerSetup with data will be returned. Otherwise empty (or default) KieServerSetup will be provided.
* @param serverInfo representation of minimal set of information about KieServer
* @return KieServer configuration
*/
KieServerSetup connect(KieServerInfo serverInfo);

/**
* Entry point for for KieServer to update its status information.
* @param serverInfo representation of minimal set of information about KieServer
* Entry point for KieServer to update its status information.
* @param containerSpec representation of minimal set of information about KieServer
*/

default KieServerSetup update(KieServerStateInfo containerSpec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.kie.server.controller.websocket.client;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -51,10 +53,9 @@ public class WebSocketKieServerControllerImpl implements KieServerController, Ki
private KieServerRegistry context;
private final KieServerMessageHandlerWebSocketClient client;
private final Marshaller marshaller;

private final DefaultRestControllerImpl restController = new DefaultRestControllerImpl();

private KieServerInfo serverInfo;

private DefaultRestControllerImpl restController;

public WebSocketKieServerControllerImpl() {
this.marshaller = MarshallerFactory.getMarshaller(MarshallingFormat.JSON, this.getClass().getClassLoader());
Expand All @@ -69,6 +70,16 @@ public WebSocketKieServerControllerImpl() {
});
}

@Override
public Integer getPriority() {
return 100;
}

@Override
public boolean supports(String url) {
return url != null && url.startsWith("ws");
}

@Override
public KieServerSetup connect(KieServerInfo serverInfo) {
KieServerState currentState = context.getStateRepository().load(KieServerEnvironment.getServerId());
Expand All @@ -93,6 +104,7 @@ public KieServerSetup connect(KieServerInfo serverInfo) {
"kieserver"))
.password(KeyStoreHelperUtil.loadControllerPassword(config))
.token(config.getConfigItemValue(KieServerConstants.CFG_KIE_CONTROLLER_TOKEN))
.userProperties(getUserProperties())
.build());
CountDownLatch waitLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -170,15 +182,15 @@ protected String serialize(Object object) {
@Override
public void setRegistry(KieServerRegistry registry) {
this.context = registry;

this.restController = new DefaultRestControllerImpl(this.context);
this.restController.setRegistry(registry);
}

@Override
public KieServerRegistry getRegistry() {
return this.context;
}



protected Map<String, Object> getUserProperties() {
return new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void init(final WebSocketClientConfiguration clientConfig) {
.encoders(clientConfig.getEncoders())
.decoders(clientConfig.getDecoders())
.build();
this.config.getUserProperties().putAll(clientConfig.getUserProperties());
this.endpoint = URI.create(clientConfig.getControllerUrl());
session = container.connectToServer(this, this.config, this.endpoint);
LOGGER.info("New Web Socket Session with id: {}, started", session.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.websocket.Decoder;
import javax.websocket.Encoder;

Expand All @@ -43,6 +44,8 @@ static Builder builder() {

List<Class<? extends Decoder>> getDecoders();

Map<String, Object> getUserProperties();

class Builder {

private WebSocketClientConfigurationImpl config = new WebSocketClientConfigurationImpl();
Expand Down Expand Up @@ -87,6 +90,11 @@ public Builder decoders(final Class<? extends Decoder>... decoders) {
return this;
}

public Builder userProperties(final Map<String, Object> userProperties) {
config.setUserProperties(userProperties);
return this;
}

public WebSocketClientConfiguration build() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.kie.server.controller.websocket.common.config;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.websocket.Decoder;
import javax.websocket.Encoder;

Expand All @@ -38,6 +40,8 @@ public class WebSocketClientConfigurationImpl implements WebSocketClientConfigur

private List<Class<? extends Decoder>> decoders;

private Map<String, Object> userProperties;

protected WebSocketClientConfigurationImpl() {
}

Expand Down Expand Up @@ -113,6 +117,15 @@ public void setDecoders(List<Class<? extends Decoder>> decoders) {
this.decoders = decoders;
}

@Override
public Map<String, Object> getUserProperties() {
return userProperties;
}

public void setUserProperties(Map<String, Object> userProperties) {
this.userProperties = userProperties;
}

@Override
public String toString() {
return "WebSocketClientConfigurationImpl{" +
Expand All @@ -124,6 +137,7 @@ public String toString() {
", asyncSendTimeout=" + asyncSendTimeout +
", encoders=" + encoders +
", decoders=" + decoders +
", userProperties=" + userProperties +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,18 @@ public interface KieServerRegistryAware {
void setRegistry(KieServerRegistry registry);

KieServerRegistry getRegistry();


/**
* Determine the priority of the loaded KieServerController loaded through the ServiceLoader.
* @return A priority for the KieServerController. 0 being the highest, Integer.MAX_VALUE being the lowest. If null, then the lowest priority is assumed.
*/
Integer getPriority();

/**
* Determine if a KieServerController supports a specific connection point.
* @param url The URL to check.
* @return true if the KieServerController supports the endpoint, false otherwise.
*/
boolean supports(String url);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,7 @@

package org.kie.server.services.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1138,15 +1126,48 @@ protected Map<String, Object> getReleaseUpdateParameters(KieModuleMetaData metaD
return parameters;
}

/**
* Get the first KieServerController which is supported for the all controller URLs. This assumes that all
* controllers are of the same type. For example, it is not possible to have one REST and one WebSocket controller.
*
* @return A KieServerController or null if one is not available.
*/
protected KieServerController getController() {
KieServerController controller = new DefaultRestControllerImpl(context);
KieServerState currentState = context.getStateRepository().load(KieServerEnvironment.getServerId());
Set<String> controllers = currentState.getControllers();

if (controllers.isEmpty()) {
logger.debug("No controllers registered");
return null;
}

KieServerController controller = null;
int controllerPriority = Integer.MAX_VALUE;
try {
Iterator<KieServerController> it = kieControllers.iterator();
if (it != null && it.hasNext()) {
controller = it.next();
for (KieServerController curr: kieControllers) {
// Only want remote capable
if (!(curr instanceof KieServerRegistryAware)) {
continue;
}

if (controller instanceof KieServerRegistryAware) {
((KieServerRegistryAware) controller).setRegistry(context);
KieServerRegistryAware currRemote = (KieServerRegistryAware)curr;
currRemote.setRegistry(context);

boolean supportsAll = controllers.stream().allMatch(currRemote::supports);
if (!supportsAll) {
logger.debug("KieServerController {} does not support all controllers", curr.getClass().getName());
continue;
}

// See if this is a better fit.
Integer currPriority = currRemote.getPriority();
if (null == currPriority) {
currPriority = Integer.MAX_VALUE;
}

if (currPriority >= 0 && currPriority < controllerPriority) {
controllerPriority = currPriority;
controller = curr;
}
}
} catch (Exception e) {
Expand All @@ -1161,7 +1182,9 @@ protected void notifyStatusToControllers() {
}

protected KieServerController getDefaultController() {
return new DefaultRestControllerImpl(context);
DefaultRestControllerImpl controller = new DefaultRestControllerImpl();
controller.setRegistry(context);
return controller;
}

protected ContainerManager getContainerManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,41 @@
import org.kie.server.services.api.KieControllerNotConnectedException;
import org.kie.server.services.api.KieControllerNotDefinedException;
import org.kie.server.services.api.KieServerRegistry;
import org.kie.server.services.api.KieServerRegistryAware;
import org.kie.server.services.impl.storage.KieServerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.kie.server.common.KeyStoreHelperUtil.loadControllerPassword;

public class DefaultRestControllerImpl implements KieServerController {
public class DefaultRestControllerImpl implements KieServerController, KieServerRegistryAware {

private static final Logger logger = LoggerFactory.getLogger(DefaultRestControllerImpl.class);

private final KieServerRegistry context;
private KieServerRegistry context;

public DefaultRestControllerImpl(KieServerRegistry context) {
this.context = context;
public DefaultRestControllerImpl() {
}

@Override
public void setRegistry(KieServerRegistry registry) {
this.context = registry;
}

@Override
public KieServerRegistry getRegistry() {
return context;
}

@Override
public Integer getPriority() {
return 100;
}

@Override
public boolean supports(String url) {
return url != null && url.startsWith("http");
}

protected <T> T makeHttpPutRequestAndCreateCustomResponse(String uri, String body, Class<T> resultType, String user, String password, String token) {
logger.debug("About to send PUT request to '{}' with payload '{}' by thread {}", uri, body, Thread.currentThread().getId());
Expand Down Expand Up @@ -323,6 +342,4 @@ public void stopContainer(String containerId) {
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void stop() {
@Override
public void apply(KieServerRegistry kieServerRegistry, KieServer kieServer) {

DefaultRestControllerImpl controller = new DefaultRestControllerImpl(kieServerRegistry);
DefaultRestControllerImpl controller = new DefaultRestControllerImpl();
controller.setRegistry(kieServerRegistry);

List<String> containerAliases = kieServerRegistry.getContainerAliases();
if (containerAliases.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.kie.server.services.impl.controller.DefaultRestControllerImpl
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@
import org.kie.server.api.model.Severity;
import org.kie.server.controller.api.KieServerController;
import org.kie.server.controller.api.model.KieServerSetup;
import org.kie.server.services.api.KieContainerInstance;
import org.kie.server.services.api.KieControllerNotConnectedException;
import org.kie.server.services.api.KieServerExtension;
import org.kie.server.services.api.KieServerRegistry;
import org.kie.server.services.api.SupportedTransports;
import org.kie.server.services.api.*;
import org.kie.server.services.impl.controller.DefaultRestControllerImpl;
import org.kie.server.services.impl.storage.KieServerState;
import org.kie.server.services.impl.storage.KieServerStateRepository;
Expand Down Expand Up @@ -710,9 +706,10 @@ public void markAsReady() {

@Override
public KieServerController getController() {
return new DefaultRestControllerImpl(getServerRegistry()) {
KieServerController controller = new DefaultRestControllerImpl() {
@Override
public KieServerSetup connect(KieServerInfo serverInfo) {
setRegistry(getServerRegistry());
try {
if (latch.await(10, TimeUnit.MILLISECONDS)) {
return new KieServerSetup();
Expand All @@ -722,8 +719,9 @@ public KieServerSetup connect(KieServerInfo serverInfo) {
throw new KieControllerNotConnectedException("Unable to connect to any controller");
}
}

};
((KieServerRegistryAware)controller).setRegistry(getServerRegistry());
return controller;
}

};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void onContainerSpecUpdated(ContainerSpecUpdated containerSpecUpdated) {}
if (TestConfig.isLocalServer()) {
controllerClient = KieServerControllerClientFactory.newWebSocketClient(TestConfig.getControllerWebSocketManagementUrl(),
(String) null,
(String) null,
(String) null,
eventHandler);
} else {
controllerClient = KieServerControllerClientFactory.newWebSocketClient(TestConfig.getControllerWebSocketManagementUrl(),
Expand Down Expand Up @@ -162,7 +162,8 @@ public KieServerState load(String serverId) {

};
registry.registerStateRepository(dummyKieServerStateRepository);
KieServerController controller = new DefaultRestControllerImpl(registry);
DefaultRestControllerImpl controller = new DefaultRestControllerImpl();
controller.setRegistry(registry);
controller.connect(kieServerInfo);
// Check that kie server is registered.
serverUp.await(5, TimeUnit.SECONDS);
Expand Down
Loading