Skip to content

Commit

Permalink
Add xDS control plane (#904)
Browse files Browse the repository at this point in the history
Motivation:
Enhance Central Dogma's capabilities by introducing support for the xDS
protocol.
This enables Envoy or xDS clients to utilize Central Dogma as a control
plane.
https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol

Modifications:
- Establish internal repositories for xDS resources within an internal
project.
- Add gRPC services for xDS protocol from the
`io.envoyproxy.controlplane` library.
- Customized resource versioning to include all resources in the
request.

Result:
- You can now use Central Dogma as a control plane for xDS service
discovery.
  • Loading branch information
minwoox authored Feb 2, 2024
1 parent 8edcf91 commit c7450c5
Show file tree
Hide file tree
Showing 27 changed files with 1,812 additions and 27 deletions.
20 changes: 19 additions & 1 deletion dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
# If its classes are exposed in Javadoc, update offline links as well.
#
[versions]
armeria = "1.26.4"
armeria = "1.27.0"
assertj = "3.24.2"
awaitility = "4.2.0"
bouncycastle = "1.76"
caffeine = "2.9.3"
checkstyle = "10.3.3"
controlplane = "1.0.39"
curator = "5.5.0"
# Do not upgrade cron-utils until there's another CVE or Armeria's SLF4J and Logback are upgraded.
cron-utils = "9.2.0"
Expand Down Expand Up @@ -60,6 +61,7 @@ sphinx = "2.10.1"
spring-boot2 = "2.7.16"
spring-boot3 = "3.1.4"
spring-test-junit5 = "1.5.0"
testcontainers = "1.19.0"
thrift09 = { strictly = "0.9.3-1" }
zookeeper = "3.7.1"

Expand All @@ -71,6 +73,8 @@ junit5 = { module = "org.junit:junit-bom", version.ref = "junit5" }
[libraries.armeria]
module = "com.linecorp.armeria:armeria"
javadocs = "https://www.javadoc.io/doc/com.linecorp.armeria/armeria-javadoc/1.24.3/"
[libraries.armeria-grpc]
module = "com.linecorp.armeria:armeria-grpc"
[libraries.armeria-junit5]
module = "com.linecorp.armeria:armeria-junit5"
[libraries.armeria-logback]
Expand All @@ -79,6 +83,9 @@ module = "com.linecorp.armeria:armeria-logback"
module = "com.linecorp.armeria:armeria-saml"
[libraries.armeria-thrift09]
module = "com.linecorp.armeria:armeria-thrift0.9"
[libraries.armeria-xds]
module = "com.linecorp.armeria:armeria-xds"


[libraries.assertj]
module = "org.assertj:assertj-core"
Expand Down Expand Up @@ -275,6 +282,13 @@ module = "ch.qos.logback:logback-classic"
version.ref = "logback"
javadocs = "https://www.javadoc.io/doc/ch.qos.logback/logback-classic/1.2.12/"

[libraries.controlplane-cache]
module = "io.envoyproxy.controlplane:cache"
version.ref = "controlplane"
[libraries.controlplane-server]
module = "io.envoyproxy.controlplane:server"
version.ref = "controlplane"

[libraries.micrometer-core]
module = "io.micrometer:micrometer-core"
version.ref = "micrometer"
Expand Down Expand Up @@ -366,6 +380,10 @@ version.ref = "spring-boot3"
module = "com.github.sbrannen:spring-test-junit5"
version.ref = "spring-test-junit5"

[libraries.testcontainers-junit-jupiter]
module = "org.testcontainers:junit-jupiter"
version.ref = "testcontainers"

[libraries.thrift09]
module = "org.apache.thrift:libthrift"
version.ref = "thrift09"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ public Map<InetSocketAddress, ServerPort> activePorts() {
}
}

/**
* Returns the {@link ProjectManager} of the server if the server is started.
* {@code null} is returned, otherwise.
*/
@Nullable
public ProjectManager projectManager() {
return pm;
}

/**
* Returns the {@link MirroringService} of the server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static com.linecorp.centraldogma.server.command.Command.createRepository;
import static com.linecorp.centraldogma.server.command.Command.push;

import java.util.List;

import com.google.common.collect.ImmutableList;

import com.linecorp.armeria.common.util.Exceptions;
Expand All @@ -45,30 +47,11 @@ public final class ProjectInitializer {
*/
public static void initializeInternalProject(CommandExecutor executor) {
final long creationTimeMillis = System.currentTimeMillis();
try {
executor.execute(createProject(creationTimeMillis, Author.SYSTEM, INTERNAL_PROJECT_DOGMA))
.get();
} catch (Throwable cause) {
cause = Exceptions.peel(cause);
if (!(cause instanceof ProjectExistsException)) {
throw new Error("failed to initialize an internal project", cause);
}
}
initializeInternalProject(executor, creationTimeMillis, INTERNAL_PROJECT_DOGMA);

// These repositories might be created when creating an internal project, but we try to create them
// again here in order to make sure them exist because sometimes their names are changed.
for (final String repo : Project.internalRepos()) {
try {
executor.execute(createRepository(creationTimeMillis, Author.SYSTEM,
INTERNAL_PROJECT_DOGMA, repo))
.get();
} catch (Throwable cause) {
cause = Exceptions.peel(cause);
if (!(cause instanceof RepositoryExistsException)) {
throw new Error(cause);
}
}
}
initializeInternalRepos(executor, creationTimeMillis, Project.internalRepos());

try {
final Change<?> change = Change.ofJsonPatch(MetadataService.TOKEN_JSON,
Expand All @@ -79,9 +62,39 @@ public static void initializeInternalProject(CommandExecutor executor) {
commitSummary, "", Markup.PLAINTEXT, ImmutableList.of(change)))
.get();
} catch (Throwable cause) {
cause = Exceptions.peel(cause);
if (!(cause instanceof ChangeConflictException)) {
throw new Error("failed to initialize the token list file", cause);
final Throwable peeled = Exceptions.peel(cause);
if (!(peeled instanceof ChangeConflictException)) {
throw new Error("failed to initialize the token list file", peeled);
}
}
}

public static void initializeInternalProject(
CommandExecutor executor, long creationTimeMillis, String projectName) {
try {
executor.execute(createProject(creationTimeMillis, Author.SYSTEM, projectName))
.get();
} catch (Throwable cause) {
final Throwable peeled = Exceptions.peel(cause);
if (!(peeled instanceof ProjectExistsException)) {
throw new Error("failed to initialize an internal project: " + projectName, peeled);
}
}
}

public static void initializeInternalRepos(
CommandExecutor executor, long creationTimeMillis, List<String> internalRepos) {
for (final String repo : internalRepos) {
try {
executor.execute(createRepository(creationTimeMillis, Author.SYSTEM,
INTERNAL_PROJECT_DOGMA, repo))
.get();
} catch (Throwable cause) {
final Throwable peeled = Exceptions.peel(cause);
if (!(peeled instanceof RepositoryExistsException)) {
throw new Error("failed to initialize an internal repository: " + INTERNAL_PROJECT_DOGMA +
'/' + repo, peeled);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public PluginInitContext(CentralDogmaConfig config,
ProjectManager projectManager,
CommandExecutor commandExecutor,
MeterRegistry meterRegistry,
ScheduledExecutorService purgeWorker,
ServerBuilder serverBuilder) {
ScheduledExecutorService purgeWorker, ServerBuilder serverBuilder) {
super(config, projectManager, commandExecutor, meterRegistry, purgeWorker);
this.serverBuilder = requireNonNull(serverBuilder, "serverBuilder");
}
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ includeWithFlags ':server-mirror-git', 'java11', 'publish',
includeWithFlags ':testing:testing-common', 'java', 'publish', 'relocate'
includeWithFlags ':testing:junit', 'java', 'publish', 'relocate'
includeWithFlags ':testing:junit4', 'java', 'publish', 'relocate'
includeWithFlags ':xds', 'java', 'publish', 'relocate'
// Set correct directory names
project(':testing:testing-common').projectDir = file('testing/common')

Expand Down
1 change: 1 addition & 0 deletions testing-internal/src/main/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<logger name="com.linecorp" level="DEBUG" />
<logger name="io.netty" level="INFO" />
<logger name="org.testcontainers" level="DEBUG" />
<!-- Disable the 'Invalid config event received' log messages. -->
<logger name="org.apache.curator.framework.imps.EnsembleTracker" level="OFF" />

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.linecorp.centraldogma.server.GracefulShutdownTimeout;
import com.linecorp.centraldogma.server.MirroringService;
import com.linecorp.centraldogma.server.TlsConfig;
import com.linecorp.centraldogma.server.storage.project.ProjectManager;

import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.netty.util.NetUtil;
Expand Down Expand Up @@ -176,6 +177,15 @@ public final com.linecorp.centraldogma.server.CentralDogma dogma() {
return dogma;
}

/**
* Returns the {@link ProjectManager} of the server.
*
* @throws IllegalStateException if Central Dogma did not start yet
*/
public ProjectManager projectManager() {
return dogma().projectManager();
}

/**
* Returns the {@link MirroringService} of the server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.centraldogma.client.armeria.legacy.LegacyCentralDogmaBuilder;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.server.MirroringService;
import com.linecorp.centraldogma.server.storage.project.ProjectManager;
import com.linecorp.centraldogma.testing.internal.CentralDogmaRuleDelegate;
import com.linecorp.centraldogma.testing.internal.TemporaryFolder;

Expand Down Expand Up @@ -184,6 +185,15 @@ public final com.linecorp.centraldogma.server.CentralDogma dogma() {
return delegate.dogma();
}

/**
* Returns the {@link ProjectManager} of the server.
*
* @throws IllegalStateException if Central Dogma did not start yet
*/
public ProjectManager projectManager() {
return delegate.projectManager();
}

/**
* Returns the {@link MirroringService} of the server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.linecorp.centraldogma.client.armeria.legacy.LegacyCentralDogmaBuilder;
import com.linecorp.centraldogma.server.CentralDogmaBuilder;
import com.linecorp.centraldogma.server.MirroringService;
import com.linecorp.centraldogma.server.storage.project.ProjectManager;
import com.linecorp.centraldogma.testing.internal.CentralDogmaRuleDelegate;

/**
Expand Down Expand Up @@ -171,6 +172,15 @@ public final com.linecorp.centraldogma.server.CentralDogma dogma() {
return delegate.dogma();
}

/**
* Returns the {@link ProjectManager} of the server.
*
* @throws IllegalStateException if Central Dogma did not start yet
*/
public ProjectManager projectManager() {
return delegate.projectManager();
}

/**
* Returns the {@link MirroringService} of the server.
*
Expand Down
11 changes: 11 additions & 0 deletions xds/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
dependencies {
implementation project(':server')

implementation libs.armeria.grpc
implementation libs.controlplane.cache
implementation libs.controlplane.server

testImplementation libs.armeria.junit5
testImplementation libs.armeria.xds
testImplementation libs.testcontainers.junit.jupiter
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.xds.internal;

import java.util.Map;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

import io.envoyproxy.controlplane.cache.Resources.ResourceType;
import io.envoyproxy.controlplane.cache.SnapshotResources;
import io.envoyproxy.controlplane.cache.VersionedResource;
import io.envoyproxy.controlplane.cache.v3.Snapshot;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;

final class CentralDogmaSnapshot extends Snapshot {

private final SnapshotResources<Cluster> clusters;
private final SnapshotResources<ClusterLoadAssignment> endpoints;
private final SnapshotResources<Listener> listeners;
private final SnapshotResources<RouteConfiguration> routes;
private final SnapshotResources<Secret> secrets;

CentralDogmaSnapshot(SnapshotResources<Cluster> clusters,
SnapshotResources<ClusterLoadAssignment> endpoints,
SnapshotResources<Listener> listeners, SnapshotResources<RouteConfiguration> routes,
SnapshotResources<Secret> secrets) {
this.clusters = clusters;
this.endpoints = endpoints;
this.listeners = listeners;
this.routes = routes;
this.secrets = secrets;
}

@Override
public SnapshotResources<Cluster> clusters() {
return clusters;
}

@Override
public SnapshotResources<ClusterLoadAssignment> endpoints() {
return endpoints;
}

@Override
public SnapshotResources<Listener> listeners() {
return listeners;
}

@Override
public SnapshotResources<RouteConfiguration> routes() {
return routes;
}

@Override
public SnapshotResources<Secret> secrets() {
return secrets;
}

@Override
public Map<String, VersionedResource<?>> versionedResources(ResourceType resourceType) {
// Have to override this method because of the type inference.
return super.versionedResources(resourceType);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CentralDogmaSnapshot)) {
return false;
}
final CentralDogmaSnapshot that = (CentralDogmaSnapshot) o;
return Objects.equal(clusters, that.clusters) &&
Objects.equal(endpoints, that.endpoints) &&
Objects.equal(listeners, that.listeners) &&
Objects.equal(routes, that.routes) &&
Objects.equal(secrets, that.secrets);
}

@Override
public int hashCode() {
return Objects.hashCode(clusters, endpoints, listeners, routes, secrets);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("clusters", clusters)
.add("endpoints", endpoints)
.add("listeners", listeners)
.add("routes", routes)
.add("secrets", secrets)
.toString();
}
}
Loading

0 comments on commit c7450c5

Please sign in to comment.