Skip to content

Commit

Permalink
[concurrency] ensure helm and placeholder extraction commands can be …
Browse files Browse the repository at this point in the history
…ran in concurrent mode by forcing an id in the whole chain
  • Loading branch information
rmannibucau committed Jan 8, 2024
1 parent e16fe42 commit 2b7f11b
Show file tree
Hide file tree
Showing 26 changed files with 372 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public CompletionStage<?> internalApply(final String from, final String manifest
final boolean injectTimestamp, final boolean injectBundleBeeMetadata,
final ArchiveReader.Cache cache) {
return visitor
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, null)
.thenApply(alveoli -> alveoli.stream().map(it -> it.exclude(excludedLocations, excludedDescriptors)).collect(toList()))
.thenCompose(alveoli -> useChainInsteadOfAll ?
chain(alveoli.stream()
Expand All @@ -148,6 +148,6 @@ public CompletionStage<?> doApply(final boolean injectTimestamp, final boolean i
(ctx, desc) -> kube.apply(desc.getContent(), desc.getExtension(), labels),
cache,
desc -> conditionAwaiter.await(name(), desc, scheduledExecutorService, awaitTimeout),
"deployed");
"deployed", null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public CompletionStage<?> internalDelete(final String from, final String manifes
}
final int awaitTimeout = awaitValue;
return visitor
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, null)
.thenApply(alveoli -> alveoli.stream().map(it -> it.exclude(excludedLocations, excludedDescriptors)).collect(toList()))
.thenCompose(alveoli -> all(
alveoli.stream()
Expand All @@ -177,7 +177,7 @@ public CompletionStage<?> doDelete(final ArchiveReader.Cache cache, final Manife
return completedFuture(null);
}
return conditionAwaiter.await(name(), desc, scheduledExecutorService, awaitTimeout);
}, "deleted")
}, "deleted", null)
.thenApply(done -> { // owner first
Collections.reverse(toDelete);
return toDelete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private void doWrite(final Supplier<String> contentProvider) {

protected CompletionStage<Map<Item, ActualState>> doExecute() {
return super
.doExecute(from, manifest, alveolus, descriptor)
.doExecute(from, manifest, alveolus, descriptor, null)
.thenCompose(collected -> {
// 1. load all descriptor to get the resource type
final var resources = collected.getDescriptors().entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.yupiik.bundlebee.core.service.ArchiveReader;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.eclipse.microprofile.config.inject.ConfigProperty;

Expand All @@ -43,8 +42,10 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -153,10 +154,10 @@ public String description() {
"Note that this conversion has some limitation in placeholder syntax for example (ensure no conflict under a key).";
}

private void doExport(final List<Manifest.Alveolus> foundAlveoli) {
private void doExport(final List<Manifest.Alveolus> foundAlveoli, final String id) {
final var root = Path.of(output);

final var placeholders = new ArrayList<>(placeholderObserver.getEvents());
final var placeholders = new ArrayList<>(placeholderObserver.getSpies().get(id).getEvents());
final var placeholderLoadedDescriptions = new Properties();

try {
Expand All @@ -179,9 +180,9 @@ private void doExport(final List<Manifest.Alveolus> foundAlveoli) {
// todo: support charts creating nested charts == alveoli?
// -> technically it would be as easy as doing this logic per alveolus in the visitor instead of globally
final var charts = Files.createDirectories(root.resolve("templates"));
final var descriptors = placeholderObserver.getDescriptors();
final var descriptors = placeholderObserver.getSpies().get(id).getDescriptors();
for (final var entry : descriptors) {
copyDescriptor(entry.getContent(), charts.resolve(entry.getAlveolus() + "." + stripExtension(entry.getDescriptor()) + ".yaml"));
copyDescriptor(entry.getContent(), charts.resolve(entry.getAlveolus() + "." + stripExtension(entry.getDescriptor()) + ".yaml"), id);
}
} catch (final IOException ioe) {
throw new IllegalStateException(ioe);
Expand All @@ -201,9 +202,9 @@ private String stripExtension(final String descriptor) {
return descriptor;
}

private void copyDescriptor(final String value, final Path target) throws IOException {
private void copyDescriptor(final String value, final Path target, final String id) throws IOException {
final var helmContent = new Substitutor(k -> "@{@{@ .Values." + k + " @}@}@")
.replace(value)
.replace(value, id)
.replace("@{@{@", "{{")
.replace("@}@}@", "}}");
Files.writeString(target, helmContent);
Expand Down Expand Up @@ -309,77 +310,90 @@ private String helmIgnore() {

@Override
public CompletionStage<?> execute() {
final var observerActive = placeholderObserver.isActive();
placeholderObserver.setActive(true);
return doExecute(from, manifest, alveolus, archives.newCache())
.whenComplete((ok, ko) -> placeholderObserver.setActive(observerActive));
final var id = UUID.randomUUID().toString();
placeholderObserver.getSpies().put(id, new SpyCollector());
return doExecute(from, manifest, alveolus, archives.newCache(), id)
.whenComplete((ok, ko) -> placeholderObserver.getSpies().remove(id));
}

private String dropSnapshotIfNeeded(final String version) {
return version.endsWith("-SNAPSHOT") ? version.substring(0, version.length() - "-SNAPSHOT".length()) : version;
}

private CompletionStage<?> doExecute(final String from, final String manifest, final String alveolus,
final ArchiveReader.Cache cache) {
final ArchiveReader.Cache cache, final String id) {
final var foundAlveoli = new ArrayList<Manifest.Alveolus>();
return visitor
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, id)
.thenApply(alveoli -> alveoli.stream().map(it -> it.exclude(excludedLocations, excludedDescriptors)).collect(toList()))
.thenCompose(alveoli -> {
foundAlveoli.addAll(alveoli.stream().map(AlveolusHandler.ManifestAndAlveolus::getAlveolus).collect(toList()));
return chain(alveoli.stream()
.map(it -> (Supplier<CompletionStage<?>>) () -> doExecute(cache, it))
.map(it -> (Supplier<CompletionStage<?>>) () -> doExecute(cache, it, id))
.iterator(), true);

})
.thenRun(() -> doExport(foundAlveoli));
.thenRun(() -> doExport(foundAlveoli, id));
}

private CompletionStage<?> doExecute(final ArchiveReader.Cache cache,
final AlveolusHandler.ManifestAndAlveolus it) {
final AlveolusHandler.ManifestAndAlveolus it,
final String id) {
final var spyCollector = placeholderObserver.getSpies().get(id);
it.getAlveolus().setChainDependencies(true); // forced to ensure we process everything - including placeholders - synchronously
return visitor.executeOnceOnAlveolus(
"Visiting", it.getManifest(), it.getAlveolus(), null,
(ctx, desc) -> kube.forDescriptor("Visiting", desc.getContent(), desc.getExtension(), CompletableFuture::completedFuture),
cache, desc -> {
placeholderObserver.getIgnoredKeys().clear();
spyCollector.getIgnoredKeys().clear();
return completedFuture(null);
}, "visited");
}, "visited", id);
}

@Getter
@ApplicationScoped
public static class HelmChartSpy {
@Setter
private boolean active;

private final Collection<String> ignoredKeys = new HashSet<>();
private final Collection<OnPlaceholder> events = new ArrayList<>();
private final Collection<OnPrepareDescriptor> descriptors = new ArrayList<>();
@Getter
private Map<String, SpyCollector> spies = new ConcurrentHashMap<>();

public void onDescriptor(@Observes final OnPrepareDescriptor onPrepareDescriptor) {
if (!active) {
if (onPrepareDescriptor.getId() == null) {
return;
}
synchronized (descriptors) {
descriptors.add(onPrepareDescriptor);
ignoredKeys.addAll(onPrepareDescriptor.getPlaceholders().keySet());
final var spy = spies.get(onPrepareDescriptor.getId());
if (spy == null) {
return;
}
synchronized (spy.descriptors) {
spy.descriptors.add(onPrepareDescriptor);
spy.ignoredKeys.addAll(onPrepareDescriptor.getPlaceholders().keySet());
}
}

public void onPlaceholder(@Observes final OnPlaceholder onPlaceholder) {
if (!active) {
if (onPlaceholder.getId() == null) {
return;
}
final var spy = spies.get(onPlaceholder.getId());
if (spy == null) {
return;
}
if (ignoredKeys.contains(onPlaceholder.getName())) {
if (spy.ignoredKeys.contains(onPlaceholder.getName())) {
return;
}
synchronized (events) {
events.add(onPlaceholder);
synchronized (spy.events) {
spy.events.add(onPlaceholder);
}
}
}

@Data
private static class SpyCollector {
private final Collection<String> ignoredKeys = new HashSet<>();
private final Collection<OnPlaceholder> events = new ArrayList<>();
private final Collection<OnPrepareDescriptor> descriptors = new ArrayList<>();
}

@Data
private static class Placeholder {
private final String[] name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public String description() {

@Override
public CompletionStage<?> execute() {
return super.doExecute(from, manifest, alveolus, descriptor)
return super.doExecute(from, manifest, alveolus, descriptor, null)
.thenAccept(collected -> {
log.info("Inspection Report for alveolus=" + alveolus);
log.info("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private CompletionStage<List<DecoratedLintError>> visit(final LintErrors result,
.collect(joining(", ", "", " missing.")));
}
return visitor
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, null)
.thenCompose(alveoli -> all(
alveoli.stream()
.map(it -> {
Expand All @@ -365,7 +365,7 @@ private CompletionStage<List<DecoratedLintError>> visit(final LintErrors result,
return promise;
});
},
cache, null, "inspected");
cache, null, "inspected", null);
return all(allLints, mergeLists(), true);
})
.collect(toList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.yupiik.bundlebee.core.qualifier.BundleBee;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.eclipse.microprofile.config.inject.ConfigProperty;

Expand All @@ -39,7 +38,9 @@
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -161,12 +162,13 @@ public CompletionStage<?> execute() {
}
});

final var id = UUID.randomUUID().toString();
final var lock = new ReentrantLock();
final var collector = new HashSet<OnPlaceholder>();
final var oldListener = placeholderSpy.getListener();
placeholderSpy.setListener(p -> {
if (ignoredPlaceholders.stream().anyMatch(it -> Objects.equals(it, p.getName()) ||
(it.endsWith(".*") && p.getName().startsWith(it.substring(0, it.length() - 2))))) {
final Consumer<OnPlaceholder> consumer = p -> {
if (Objects.equals(id, p.getId()) &&
ignoredPlaceholders.stream().anyMatch(it -> Objects.equals(it, p.getName()) ||
(it.endsWith(".*") && p.getName().startsWith(it.substring(0, it.length() - 2))))) {
return;
}
lock.lock();
Expand All @@ -175,8 +177,9 @@ public CompletionStage<?> execute() {
} finally {
lock.unlock();
}
});
return doExecute(from, manifest, alveolus, descriptor)
};
placeholderSpy.getListener().add(consumer);
return doExecute(from, manifest, alveolus, descriptor, id)
.thenAccept(data -> {
final var placeholders = collector.stream()
.collect(groupingBy(OnPlaceholder::getName)).entrySet().stream()
Expand Down Expand Up @@ -244,7 +247,7 @@ public CompletionStage<?> execute() {
}
}
})
.whenComplete((ok, ko) -> placeholderSpy.setListener(oldListener));
.whenComplete((ok, ko) -> placeholderSpy.getListener().remove(consumer));
}

protected String formatDoc(final List<Placeholder> placeholders, final Properties descriptions) {
Expand Down Expand Up @@ -338,13 +341,14 @@ protected static class Placeholder {

@ApplicationScoped
public static class PlaceholderSpy {
@Setter
@Getter
private Consumer<OnPlaceholder> listener;
private List<Consumer<OnPlaceholder>> listener = new CopyOnWriteArrayList<>();

public void onPlaceholder(@Observes final OnPlaceholder placeholder) {
if (listener != null) {
listener.accept(placeholder);
for (final var it : listener) {
it.accept(placeholder);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public CompletionStage<?> doExecute(final String from, final String manifest, fi
}
};
return visitor
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, null)
.thenApply(alveoli -> alveoli.stream().map(it -> it.exclude(excludedLocations, excludedDescriptors)).collect(toList()))
.thenCompose(alveoli -> useChainInsteadOfAll ?
chain(alveoli.stream()
Expand Down Expand Up @@ -214,7 +214,7 @@ public CompletionStage<?> doExecute(final boolean injectTimestamp, final boolean
return completedFuture(processed);
}),
cache,
desc -> completedFuture(null), "processed");
desc -> completedFuture(null), "processed", null);
}

private String format(final JsonObject content,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public String description() {
public CompletionStage<?> execute() {
final var cache = archives.newCache();
return alveolusHandler
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, null)
.thenCompose(alveolus -> all(
alveolus.stream()
.map(it -> findPreviousVersion(it, cache)
Expand Down Expand Up @@ -201,7 +201,7 @@ private CompletionStage<AlveolusHandler.ManifestAndAlveolus> findPreviousVersion
}
return alveolusHandler
// todo: can be found from alveolus.location too
.findRootAlveoli(previousFrom, previousManifest, previousAlveolus)
.findRootAlveoli(previousFrom, previousManifest, previousAlveolus, null)
.thenApply(list -> {
if (list.size() != 1) {
throw new IllegalArgumentException("Ambiguous previous version, found: " + list.stream()
Expand All @@ -226,7 +226,7 @@ private CompletionStage<AlveolusHandler.ManifestAndAlveolus> guessPreviousVersio
.thenCompose(versions -> {
final String previousVersion = matchPreviousVersion(alveolus, versions);
segments[2] = previousVersion;
return alveolusHandler.findRootAlveoli(findPreviousFrom(segments), manifest, String.join(":", segments));
return alveolusHandler.findRootAlveoli(findPreviousFrom(segments), manifest, String.join(":", segments), null);
})
.thenApply(alveoli -> {
if (alveoli.size() != 1) {
Expand All @@ -252,7 +252,7 @@ private CompletionStage<AlveolusHandler.ManifestAndAlveolus> tryToFindPreviousVe
final var version = potentialVersionsIt.next();
log.finest(() -> "Testing version='" + version + "' for '" + alveolus.getName() + "'");
alveolusHandler
.findRootAlveoli(previousFrom, previousManifest, alveolus.getName())
.findRootAlveoli(previousFrom, previousManifest, alveolus.getName(), null)
.whenComplete((list, error) -> {
if (error == null || list.isEmpty()) {
tryToFindPreviousVersion(alveolus, potentialVersionsIt).whenComplete((r, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public CompletionStage<?> doExecute(final String from, final String manifest, fi
final ArchiveReader.Cache cache) {
final var collector = new ArrayList<JsonObject>();
return visitor
.findRootAlveoli(from, manifest, alveolus)
.findRootAlveoli(from, manifest, alveolus, null)
.thenApply(alveoli -> alveoli.stream().map(it -> it.exclude(excludedLocations, excludedDescriptors)).collect(toList()))
.thenCompose(alveoli -> useChainInsteadOfAll ?
chain(alveoli.stream()
Expand All @@ -172,6 +172,6 @@ public CompletionStage<?> doExecute(final ArchiveReader.Cache cache, final Alveo
return completedFuture(json);
}),
cache,
desc -> completedFuture(null), "ran");
desc -> completedFuture(null), "ran", null);
}
}
Loading

0 comments on commit 2b7f11b

Please sign in to comment.