Skip to content

Commit

Permalink
Make sure upgraded configs are commited
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied committed Mar 7, 2024
1 parent 3a1e1ff commit 0a77c17
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -121,9 +122,9 @@ public void securityRolesUgrade() throws Exception {
}
}

private List<String> extractFieldNames(final JsonNode json) {
final var list = new ArrayList<String>();
json.fieldNames().forEachRemaining(list::add);
return list;
private Set<String> extractFieldNames(final JsonNode json) {
final var set = new HashSet<String>();
json.fieldNames().forEachRemaining(set::add);
return set;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentHelper;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
Expand Down Expand Up @@ -336,7 +339,7 @@ final void saveOrUpdateConfiguration(
final SecurityDynamicConfiguration<?> configuration,
final OnSucessActionListener<IndexResponse> onSucessActionListener
) {
saveAndUpdateConfigs(securityApiDependencies.securityIndexName(), client, getConfigType(), configuration, onSucessActionListener);
saveAndUpdateConfigsAsync(securityApiDependencies, client, getConfigType(), configuration, onSucessActionListener);
}

protected final String nameParam(final RestRequest request) {
Expand Down Expand Up @@ -485,30 +488,45 @@ public final void onFailure(Exception e) {

}

public static void saveAndUpdateConfigs(
final String indexName,
public static ActionFuture<IndexResponse> saveAndUpdateConfigs(
final SecurityApiDependencies dependencies,
final Client client,
final CType cType,
final SecurityDynamicConfiguration<?> configuration
) {
final var request = createIndexRequestForConfig(dependencies, cType, configuration);
return client.index(request);
}

public static void saveAndUpdateConfigsAsync(
final SecurityApiDependencies dependencies,
final Client client,
final CType cType,
final SecurityDynamicConfiguration<?> configuration,
final ActionListener<IndexResponse> actionListener
) {
final IndexRequest ir = new IndexRequest(indexName);
final String id = cType.toLCString();
final var ir = createIndexRequestForConfig(dependencies, cType, configuration);
client.index(ir, new ConfigUpdatingActionListener<>(new String[] { cType.toLCString() }, client, actionListener));
}

private static IndexRequest createIndexRequestForConfig(
final SecurityApiDependencies dependencies,
final CType cType,
final SecurityDynamicConfiguration<?> configuration
) {
configuration.removeStatic();

final BytesReference content;
try {
client.index(
ir.id(id)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setIfSeqNo(configuration.getSeqNo())
.setIfPrimaryTerm(configuration.getPrimaryTerm())
.source(id, XContentHelper.toXContent(configuration, XContentType.JSON, false)),
new ConfigUpdatingActionListener<>(new String[] { id }, client, actionListener)
);
} catch (IOException e) {
content = XContentHelper.toXContent(configuration, XContentType.JSON, ToXContent.EMPTY_PARAMS, false);
} catch (final IOException e) {
throw ExceptionsHelper.convertToOpenSearchException(e);
}

return new IndexRequest(dependencies.securityIndexName()).id(cType.toLCString())
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
.setIfSeqNo(configuration.getSeqNo())
.setIfPrimaryTerm(configuration.getPrimaryTerm())
.source(cType.toLCString(), content);
}

protected static class ConfigUpdatingActionListener<Response> implements ActionListener<Response> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@

package org.opensearch.security.dlic.rest.api;

import static org.opensearch.security.dlic.rest.api.Responses.badRequestMessage;
import static org.opensearch.security.dlic.rest.api.Responses.response;
import static org.opensearch.security.dlic.rest.support.Utils.addRoutesPrefix;
import static org.opensearch.security.dlic.rest.support.Utils.withIOException;

import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessController;
Expand All @@ -30,17 +25,21 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexResponse;

import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
Expand All @@ -57,14 +56,13 @@
import org.opensearch.security.support.ConfigHelper;
import org.opensearch.threadpool.ThreadPool;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.flipkart.zjsonpatch.DiffFlags;
import com.flipkart.zjsonpatch.JsonDiff;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import static org.opensearch.security.dlic.rest.api.Responses.badRequestMessage;
import static org.opensearch.security.dlic.rest.api.Responses.response;
import static org.opensearch.security.dlic.rest.support.Utils.addRoutesPrefix;
import static org.opensearch.security.dlic.rest.support.Utils.withIOException;

public class ConfigUpgradeApiAction extends AbstractApiAction {

Expand Down Expand Up @@ -113,7 +111,7 @@ void handleCanUpgrade(final RestChannel channel, final RestRequest request, fina
void handleUpgrade(final RestChannel channel, final RestRequest request, final Client client) throws IOException {
withIOException(() -> getAndValidateConfigurationsToUpgrade(request).map(this::configurationDifferences)).map(
this::verifyHasDifferences
).map(diffs -> applyDifferences(request, diffs)).valid(updatedConfigs -> {
).map(diffs -> applyDifferences(request, client, diffs)).valid(updatedConfigs -> {
final var response = JsonNodeFactory.instance.objectNode();
response.put("status", "OK");

Expand All @@ -134,29 +132,21 @@ ValidationResult<List<ConfigItemChanges>> applyDifferences(
for (final Tuple<CType, JsonNode> difference : differencesToUpdate) {
updatedResources.add(
loadConfiguration(difference.v1(), false, false).map(
configuration -> patchEntities(request, difference.v2(), SecurityConfiguration.of(null, configuration))
.map(patchResults -> {
saveAndUpdateConfigs(securityApiDependencies.securityIndexName(), client, difference.v1(), configuration, new ActionListener<>(){

@Override
public void onResponse(IndexResponse response) {
// TODO: oh my - how did we get here
}

@Override
public void onFailure(Exception e) {
}
});

})
.map(
configuration -> patchEntities(request, difference.v2(), SecurityConfiguration.of(null, configuration)).map(
patchResults -> {


final var itemsGroupedByOperation = new ConfigItemChanges(difference.v1(), difference.v2());
return ValidationResult.success(itemsGroupedByOperation);
final var response = saveAndUpdateConfigs(
securityApiDependencies,
client,
difference.v1(),
patchResults.configuration()
);
return ValidationResult.success(response.actionGet());
}
)
).map(indexResponse -> {

final var itemsGroupedByOperation = new ConfigItemChanges(difference.v1(), difference.v2());
return ValidationResult.success(itemsGroupedByOperation);
})
)
);
}
Expand Down Expand Up @@ -351,7 +341,7 @@ private static Map<String, List<String>> classifyChanges(final JsonNode differen
items.put(item, operation);
}
});

final var itemsGroupedByOperation = items.entrySet()
.stream()
.collect(Collectors.groupingBy(Map.Entry::getValue, Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
Expand Down

0 comments on commit 0a77c17

Please sign in to comment.