Skip to content

Commit

Permalink
Merge pull request cdapio#15379 from cdapio/CDAP-20862-helper
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi authored Oct 31, 2023
2 parents fdff36e + 86c5121 commit 6bb2566
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@
import io.cdap.cdap.internal.capability.CapabilityNotAvailableException;
import io.cdap.cdap.internal.capability.CapabilityReader;
import io.cdap.cdap.internal.profile.AdminEventPublisher;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.messaging.context.MultiThreadMessagingContext;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.PluginInstanceDetail;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.app.AppVersion;
import io.cdap.cdap.proto.app.MarkLatestAppsRequest;
import io.cdap.cdap.proto.app.UpdateSourceControlMetaRequest;
import io.cdap.cdap.proto.app.UpdateMultiSourceControlMetaReqeust;
import io.cdap.cdap.proto.app.UpdateSourceControlMetaRequest;
import io.cdap.cdap.proto.artifact.AppRequest;
import io.cdap.cdap.proto.artifact.ArtifactSortOrder;
import io.cdap.cdap.proto.artifact.ChangeDetail;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.inject.Inject;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.internal.app.services.ApplicationLifecycleService;
import io.cdap.cdap.metadata.LocalApplicationDetailFetcher;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.app.AppVersion;
import io.cdap.cdap.proto.app.MarkLatestAppsRequest;
import io.cdap.cdap.proto.app.UpdateMultiSourceControlMetaReqeust;
import io.cdap.cdap.proto.artifact.AppRequest;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Local implementation of {@link ApplicationManager} to fetch/update data while running in
* app-fabric.
*/
public class LocalApplicationManager implements ApplicationManager {

private final ApplicationLifecycleService appLifeCycleService;
private final LocalApplicationDetailFetcher appDetailsFetcher;

private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationManager.class);


@Inject
LocalApplicationManager(ApplicationLifecycleService appLifeCycleService,
LocalApplicationDetailFetcher fetcher) {
this.appLifeCycleService = appLifeCycleService;
this.appDetailsFetcher = fetcher;
}


@Override
public ApplicationId deployApp(ApplicationReference appRef, PullAppResponse<?> pullDetails)
throws Exception {
String versionId = RunIds.generate().getId();
ApplicationId appId = appRef.app(versionId);

AppRequest<?> appRequest = pullDetails.getAppRequest();
SourceControlMeta sourceControlMeta = new SourceControlMeta(
pullDetails.getApplicationFileHash());

LOG.info("Start to deploy app {} in namespace {} without marking latest",
appId.getApplication(), appId.getParent());

appLifeCycleService.deployApp(appId, appRequest, sourceControlMeta, x -> {
}, true);
return appId;
}

@Override
public void markAppVersionsLatest(NamespaceId namespace, List<AppVersion> apps)
throws SourceControlException, ApplicationNotFoundException, BadRequestException, IOException {
MarkLatestAppsRequest request = new MarkLatestAppsRequest(apps);
LOG.info("Marking latest in namespace {} : {}", namespace, apps);
appLifeCycleService.markAppsAsLatest(namespace, request);
}

@Override
public void updateSourceControlMeta(NamespaceId namespace,
UpdateMultiSourceControlMetaReqeust metas)
throws SourceControlException, BadRequestException, IOException {
appLifeCycleService.updateSourceControlMeta(namespace, metas);
}

@Override
public ApplicationDetail get(ApplicationReference appRef)
throws IOException, NotFoundException, UnauthorizedException {
return appDetailsFetcher.get(appRef);
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.internal.operation.LongRunningOperation;
import io.cdap.cdap.internal.operation.LongRunningOperationContext;
Expand All @@ -30,9 +31,11 @@
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationResourceScopedError;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner;
import io.cdap.cdap.sourcecontrol.operationrunner.MultiPullAppOperationRequest;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -88,13 +91,17 @@ public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext
scmOpRunner.pull(pullReq, response -> {
appTobeDeployed.set(new ApplicationReference(context.getRunId().getNamespace(),
response.getApplicationName()));
ApplicationId deployedVersion = applicationManager.deployApp(
appTobeDeployed.get(), response
);
deployed.add(deployedVersion);
context.updateOperationResources(getResources());
try {
ApplicationId deployedVersion = applicationManager.deployApp(
appTobeDeployed.get(), response
);
deployed.add(deployedVersion);
context.updateOperationResources(getResources());
} catch (Exception e) {
throw new SourceControlException(e);
}
});
} catch (NotFoundException | SourceControlException e) {
} catch (Exception e) {
throw new OperationException(
"Failed to deploy applications",
appTobeDeployed.get() != null ? ImmutableList.of(
Expand All @@ -105,8 +112,11 @@ public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext

try {
// all deployed versions are marked latest atomically
applicationManager.markAppVersionsLatest(deployed);
} catch (SourceControlException e) {
applicationManager.markAppVersionsLatest(
context.getRunId().getNamespaceId(),
deployed.stream().map(ApplicationId::getAppVersion).collect(Collectors.toList())
);
} catch (BadRequestException | NotFoundException | IOException e) {
throw new OperationException(
"Failed to mark applications latest",
Collections.emptySet()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.http.DefaultHttpRequestConfig;
import io.cdap.cdap.common.internal.remote.RemoteClient;
import io.cdap.cdap.common.internal.remote.RemoteClientFactory;
import io.cdap.cdap.metadata.RemoteApplicationDetailFetcher;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.app.AppVersion;
import io.cdap.cdap.proto.app.MarkLatestAppsRequest;
import io.cdap.cdap.proto.app.UpdateMultiSourceControlMetaReqeust;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.security.spi.authorization.UnauthorizedException;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse;
import io.cdap.common.http.HttpMethod;
import io.cdap.common.http.HttpRequest;
import io.cdap.common.http.HttpResponse;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.List;

/**
* Remote implementation of {@link ApplicationManager} which calls app-fabric apis.
*/
public class RemoteApplicationManager implements ApplicationManager {

private final RemoteClient remoteClient;
private final RemoteApplicationDetailFetcher appDetailsFetcher;

private static final Gson GSON = new Gson();

@Inject
RemoteApplicationManager(RemoteClientFactory remoteClientFactory,
RemoteApplicationDetailFetcher appDetailsFetcher) {
this.remoteClient = remoteClientFactory.createRemoteClient(
Constants.Service.APP_FABRIC_HTTP,
new DefaultHttpRequestConfig(false),
Constants.Gateway.INTERNAL_API_VERSION_3);
this.appDetailsFetcher = appDetailsFetcher;
}

@Override
public ApplicationId deployApp(ApplicationReference appRef, PullAppResponse<?> pullDetails)
throws SourceControlException, NotFoundException, IOException {
String url = String.format("namespaces/%s/apps/%s?skipMarkingLatest=true",
appRef.getNamespace(), appRef.getApplication());
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.PUT, url)
.withBody(GSON.toJson(pullDetails.getAppRequest()));
HttpResponse httpResponse;
httpResponse = execute(requestBuilder.build());
ApplicationRecord response = GSON.fromJson(httpResponse.getResponseBodyAsString(),
ApplicationRecord.class);
return appRef.app(response.getAppVersion());
}

@Override
public void markAppVersionsLatest(NamespaceId namespace, List<AppVersion> apps)
throws SourceControlException, NotFoundException, IOException {
MarkLatestAppsRequest markLatestRequest = new MarkLatestAppsRequest(apps);
String url = String.format("namespaces/%s/apps/markLatest", namespace.getEntityName());
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.POST, url)
.withBody(GSON.toJson(markLatestRequest));
execute(requestBuilder.build());
}

@Override
public void updateSourceControlMeta(NamespaceId namespace,
UpdateMultiSourceControlMetaReqeust metas)
throws SourceControlException, NotFoundException, IOException {
String url = String.format("namespaces/%s/apps/updateSourceControlMeta",
namespace.getEntityName());
HttpRequest.Builder requestBuilder = remoteClient.requestBuilder(HttpMethod.POST, url)
.withBody(GSON.toJson(metas));
execute(requestBuilder.build());
}

@Override
public ApplicationDetail get(ApplicationReference appRef)
throws IOException, NotFoundException, UnauthorizedException {
return appDetailsFetcher.get(appRef);
}

private HttpResponse execute(HttpRequest request)
throws SourceControlException, NotFoundException, IOException {
HttpResponse httpResponse = remoteClient.execute(request);
if (httpResponse.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new NotFoundException(httpResponse.getResponseBodyAsString());
}
if (httpResponse.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new SourceControlException(httpResponse.getResponseBodyAsString());
}
return httpResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public LocalApplicationDetailFetcher(ApplicationLifecycleService applicationLife
*
* @param appRef the versionless id of the application
* @return {@link ApplicationDetail} for the given application
* @throws IOException if failed to get {@link ApplicationDetail} for the given {@link
* ApplicationId}
* @throws NotFoundException if the given the given application doesn't exist
* @throws IOException if failed to get {@link ApplicationDetail} for the given
* {@link ApplicationId}
* @throws NotFoundException if the given application doesn't exist
*/
@Override
public ApplicationDetail get(ApplicationReference appRef) throws IOException, NotFoundException {
Expand Down
Loading

0 comments on commit 6bb2566

Please sign in to comment.