Skip to content

Commit

Permalink
Add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 28, 2024
1 parent 9f1b538 commit 5e3563d
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
import java.util.function.BiConsumer;
import java.util.function.Predicate;

/**
* LiveServiceSyncer is responsible for synchronizing live service policies from a multilive environment.
*/
@Injectable
@Extension("LiveServiceSyncer")
@ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_TYPE, value = "multilive")
Expand Down Expand Up @@ -149,6 +152,11 @@ protected CompletableFuture<Void> doStop() {
return CompletableFuture.completedFuture(null);
}

/**
* Synchronizes and updates the service based on the given subscriber.
*
* @param subscriber the policy subscriber to synchronize and update.
*/
private void syncAndUpdate(PolicySubscriber subscriber) {
ServiceSyncMeta meta = versions.computeIfAbsent(subscriber.getName(), ServiceSyncMeta::new);
if (!meta.status.compareAndSet(false, true)) {
Expand Down Expand Up @@ -179,6 +187,13 @@ private void syncAndUpdate(PolicySubscriber subscriber) {
timer.delay(getName() + "-" + subscriber.getName(), delay, () -> addTask(subscriber));
}

/**
* Handles successful synchronization with status OK.
*
* @param subscriber the policy subscriber.
* @param service the service data.
* @param meta the service synchronization metadata.
*/
private void onOk(PolicySubscriber subscriber, Service service, ServiceSyncMeta meta) {
if (update(subscriber.getName(), service)) {
meta.version = service.getVersion();
Expand All @@ -187,13 +202,25 @@ private void onOk(PolicySubscriber subscriber, Service service, ServiceSyncMeta
}
}

/**
* Handles service is NOT_MODIFIED.
*
* @param subscriber the policy subscriber.
* @param meta the service synchronization metadata.
*/
private void onNotModified(PolicySubscriber subscriber, ServiceSyncMeta meta) {
subscriber.complete();
if (meta.shouldPrint()) {
logger.info(meta.getSuccessMessage(HttpStatus.NOT_MODIFIED));
}
}

/**
* Handles service is NOT_FOUND.
*
* @param subscriber the policy subscriber.
* @param meta the service synchronization metadata.
*/
private void onNotFound(PolicySubscriber subscriber, ServiceSyncMeta meta) {
if (meta.version > 0) {
if (update(subscriber.getName(), null)) {
Expand All @@ -209,6 +236,13 @@ private void onNotFound(PolicySubscriber subscriber, ServiceSyncMeta meta) {
}
}

/**
* Attempts to update the service with retries.
*
* @param name the name of the service.
* @param service the service data.
* @return true if the update was successful, false otherwise.
*/
private boolean update(String name, Service service) {
for (int i = 0; i < UPDATE_MAX_RETRY; i++) {
if (updateOnce(name, service)) {
Expand All @@ -218,10 +252,25 @@ private boolean update(String name, Service service) {
return false;
}

/**
* Attempts to update the service once.
*
* @param name the name of the service.
* @param service the service data.
* @return true if the update was successful, false otherwise.
*/
private boolean updateOnce(String name, Service service) {
return policySupervisor.update(expect -> newPolicy(name, service, expect));
}

/**
* Creates a new policy based on the given service.
*
* @param name the name of the service.
* @param service the service data.
* @param oldPolicy the old policy.
* @return the new policy.
*/
private GovernancePolicy newPolicy(String name, Service service, GovernancePolicy oldPolicy) {
GovernancePolicy result = oldPolicy == null ? new GovernancePolicy() : oldPolicy.copy();
BiConsumer<ServicePolicy, ServicePolicy> consumer = (o, n) -> o.setLivePolicy(n == null ? null : n.getLivePolicy());
Expand All @@ -230,6 +279,15 @@ private GovernancePolicy newPolicy(String name, Service service, GovernancePolic
return result;
}

/**
* Retrieves the service data from the remote server.
*
* @param name the name of the service.
* @param meta the service synchronization metadata.
* @param config the synchronization configuration.
* @return the response containing the service data.
* @throws IOException if an I/O error occurs.
*/
private Response<Service> getService(String name, ServiceSyncMeta meta, SyncConfig config) throws IOException {
Map<String, Object> context = new HashMap<>(2);
context.put(SERVICE_NAME, name);
Expand All @@ -254,26 +312,53 @@ private Response<Service> getService(String name, ServiceSyncMeta meta, SyncConf
throw new SyncFailedException(meta.getErrorMessage(httpResponse));
}

/**
* Configures the HTTP connection with the specified synchronization configuration.
*
* @param config the synchronization configuration.
* @param conn the HTTP connection to be configured.
*/
private void configure(SyncConfig config, HttpURLConnection conn) {
config.header(conn::setRequestProperty);
conn.setRequestProperty("Accept", "application/json");
conn.setConnectTimeout((int) config.getTimeout());
}

/**
* Handles events for policy subscribers.
*
* @param events the list of events containing policy subscribers.
*/
private void onEvent(List<Event<PolicySubscriber>> events) {
events.forEach(e -> addTask(e.getData(), t -> !versions.containsKey(t.getName())));
}

/**
* Adds a list of policy subscriber tasks to the queue.
*
* @param tasks the list of policy subscriber tasks to be added.
*/
private void addTasks(List<PolicySubscriber> tasks) {
if (tasks != null) {
tasks.forEach(task -> addTask(task, t -> !versions.containsKey(t.getName())));
}
}

/**
* Adds a single policy subscriber task to the queue.
*
* @param task the policy subscriber task to be added.
*/
private void addTask(PolicySubscriber task) {
addTask(task, null);
}

/**
* Adds a single policy subscriber task to the queue with an optional predicate.
*
* @param task the policy subscriber task to be added.
* @param predicate an optional predicate to test the task before adding it to the queue.
*/
private void addTask(PolicySubscriber task, Predicate<PolicySubscriber> predicate) {
if (task != null
&& task.getType() == PolicyType.SERVICE_POLICY
Expand All @@ -284,6 +369,9 @@ && isStarted()
}
}

/**
* Metadata for synchronizing services.
*/
private static class ServiceSyncMeta {

protected final String name;
Expand All @@ -298,16 +386,33 @@ private static class ServiceSyncMeta {
this.name = name;
}

/**
* Determines whether a log message should be printed based on the counter.
*
* @return true if a log message should be printed, false otherwise.
*/
public boolean shouldPrint() {
return counter.get() % INTERVALS == 1;
}

/**
* Generates a success message for the synchronization.
*
* @param status the HTTP status of the synchronization.
* @return the success message.
*/
public String getSuccessMessage(HttpStatus status) {
return "Success synchronizing service policy from multilive. service=" + name
+ ", code=" + status.value()
+ ", counter=" + counter.get();
}

/**
* Generates an error message for the synchronization.
*
* @param reply the HTTP state of the synchronization.
* @return the error message.
*/
public String getErrorMessage(HttpState reply) {
return "Failed to synchronize service policy from multilive. service=" + name
+ ", code=" + reply.getCode()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import java.util.List;
import java.util.Map;

/**
* LiveSpaceSyncer is responsible for synchronizing live spaces from a multilive environment.
*/
@Injectable
@Extension("LiveSpaceSyncer")
@ConditionalOnProperty(name = SyncConfig.SYNC_LIVE_SPACE_TYPE, value = "multilive")
Expand Down Expand Up @@ -126,6 +129,16 @@ protected SyncResult<List<LiveSpace>, Map<String, Long>> doSynchronize(SyncConfi
}
}

/**
* Synchronizes a specific live space.
*
* @param liveSpaceId the ID of the live space.
* @param version the version of the live space.
* @param config the synchronization configuration.
* @param versions the map of versions.
* @return the synchronized live space.
* @throws IOException if an I/O error occurs.
*/
private LiveSpace syncSpace(String liveSpaceId, long version, SyncConfig config, Map<String, Long> versions) throws IOException {
Response<LiveSpace> response = getSpace(liveSpaceId, version, (LiveSyncConfig) config);
HttpStatus status = response.getStatus();
Expand All @@ -143,6 +156,15 @@ private LiveSpace syncSpace(String liveSpaceId, long version, SyncConfig config,
}
}

/**
* Synchronizes a specific live space and returns the result.
*
* @param liveSpaceId the ID of the live space.
* @param config the synchronization configuration.
* @param last the last synchronization metadata.
* @return the result of the synchronization.
* @throws IOException if an I/O error occurs.
*/
private SyncResult<List<LiveSpace>, Map<String, Long>> syncSpace(String liveSpaceId, SyncConfig config, Map<String, Long> last) throws IOException {
List<LiveSpace> liveSpaces = new ArrayList<>();
Long version = last == null ? null : last.get(liveSpaceId);
Expand All @@ -159,6 +181,14 @@ private SyncResult<List<LiveSpace>, Map<String, Long>> syncSpace(String liveSpac
}
}

/**
* Synchronizes all live spaces and returns the result.
*
* @param config the synchronization configuration.
* @param last the last synchronization metadata.
* @return the result of the synchronization.
* @throws IOException if an I/O error occurs.
*/
private SyncResult<List<LiveSpace>, Map<String, Long>> syncSpaces(SyncConfig config, Map<String, Long> last) throws IOException {
List<LiveSpace> liveSpaces = new ArrayList<>();
Map<String, Long> spaces = getSpaces((LiveSyncConfig) config);
Expand All @@ -176,6 +206,14 @@ private SyncResult<List<LiveSpace>, Map<String, Long>> syncSpaces(SyncConfig con
return spaces.equals(last) ? null : new SyncResult<>(liveSpaces, spaces);
}

/**
* Creates a new policy based on the given live spaces and metadata.
*
* @param liveSpaces the list of live spaces.
* @param meta the metadata map.
* @param old the old policy.
* @return the new policy.
*/
private GovernancePolicy newPolicy(List<LiveSpace> liveSpaces, Map<String, Long> meta, GovernancePolicy old) {
List<LiveSpace> oldSpaces = old == null ? null : old.getLiveSpaces();
if (oldSpaces != null) {
Expand All @@ -192,6 +230,13 @@ private GovernancePolicy newPolicy(List<LiveSpace> liveSpaces, Map<String, Long>
return result;
}

/**
* Retrieves the map of live spaces and their versions from the remote server.
*
* @param config the synchronization configuration.
* @return the map of live spaces and their versions.
* @throws IOException if an I/O error occurs.
*/
private Map<String, Long> getSpaces(LiveSyncConfig config) throws IOException {
String uri = config.getSpacesUrl();
HttpResponse<Response<List<Workspace>>> httpResponse = HttpUtils.get(uri,
Expand All @@ -216,6 +261,15 @@ private Map<String, Long> getSpaces(LiveSyncConfig config) throws IOException {
throw new SyncFailedException(getErrorMessage(httpResponse));
}

/**
* Retrieves the live space information from the remote server.
*
* @param workspaceId the ID of the workspace to retrieve.
* @param version the version of the workspace to retrieve.
* @param config the synchronization configuration.
* @return the response containing the live space information.
* @throws IOException if an I/O error occurs during the HTTP request.
*/
private Response<LiveSpace> getSpace(String workspaceId, Long version, LiveSyncConfig config) throws IOException {
Map<String, Object> context = new HashMap<>(2);
context.put(SPACE_ID, workspaceId);
Expand All @@ -240,29 +294,59 @@ private Response<LiveSpace> getSpace(String workspaceId, Long version, LiveSyncC
throw new SyncFailedException(getErrorMessage(httpResponse, workspaceId));
}

/**
* Configures the HTTP connection with the necessary headers and timeout settings.
*
* @param config the synchronization configuration.
* @param conn the HTTP connection to configure.
*/
private void configure(SyncConfig config, HttpURLConnection conn) {
config.header(conn::setRequestProperty);
conn.setRequestProperty("Accept", "application/json");
conn.setConnectTimeout((int) config.getTimeout());
}

/**
* Constructs a success message for logging purposes.
*
* @return the success message.
*/
private String getSuccessMessage() {
return "Success synchronizing live space policy from multilive. counter=" + counter.get();
}

/**
* Constructs an error message for logging purposes based on the HTTP state.
*
* @param reply the HTTP state containing the error information.
* @return the error message.
*/
private String getErrorMessage(HttpState reply) {
return "Failed to synchronize live space from multilive. code=" + reply.getCode()
+ ", message=" + reply.getMessage()
+ ", counter=" + counter.get();
}

/**
* Constructs an error message for logging purposes based on the HTTP state and workspace ID.
*
* @param reply the HTTP state containing the error information.
* @param workspaceId the ID of the workspace that failed to synchronize.
* @return the error message.
*/
private String getErrorMessage(HttpState reply, String workspaceId) {
return "Failed to synchronize live space from multilive. space=" + workspaceId
+ ", code=" + reply.getCode()
+ ", message=" + reply.getMessage()
+ ", counter=" + counter.get();
}

/**
* Constructs an error message for logging purposes based on an exception.
*
* @param throwable the exception that caused the synchronization failure.
* @return the error message.
*/
private String getErrorMessage(Throwable throwable) {
return "Failed to synchronize live space from multilive. counter=" + counter.get() + ", caused by " + throwable.getMessage();
}
Expand Down

0 comments on commit 5e3563d

Please sign in to comment.