From db15923466d07484190e8426442837ceb4cbb553 Mon Sep 17 00:00:00 2001 From: Amit Sinha Date: Thu, 3 Jun 2021 15:37:55 +0100 Subject: [PATCH] Collex to case pub sub (#253) * trigger GitHub actions * Initial commit * fix test case * changes for security vulnerability * auto patch increment * Adding deployment * updating chart version * updating wrong name on deployment.yaml * Adding missing google key mount * Adding missing variables Co-authored-by: Amit Sinha Co-authored-by: ras-rm-pr-bot --- _infra/helm/collection-exercise/Chart.yaml | 4 +- .../templates/deployment.yaml | 18 ++- _infra/helm/collection-exercise/values.yaml | 6 +- docker-compose.yml | 30 ++--- pom.xml | 22 +++- .../collection/exercise/config/AppConfig.java | 1 + .../collection/exercise/config/GCP.java | 10 ++ .../distribution/SampleUnitDistributor.java | 6 +- .../exercise/domain/ExerciseSampleUnit.java | 8 +- .../lib/common/rest/RestUtilityConfig.java | 6 +- .../collection/exercise/message/PubSub.java | 43 +++++++ .../exercise/message/SampleUnitPublisher.java | 73 +++++++++--- .../exercise/representation/SampleUnit.java | 16 +++ .../representation/SampleUnitChildrenDTO.java | 15 +++ .../representation/SampleUnitParentDTO.java | 11 ++ .../exercise/utility/PubSubEmulator.java | 108 ++++++++++++++++++ src/main/resources/application-test.yml | 4 + src/main/resources/application.yml | 4 + .../SampleUnitDistributorTest.java | 15 +-- .../CollectionExerciseEndpointIT.java | 64 ++++------- .../exercise/message/TestPubSubMessage.java | 58 ++++++++++ 21 files changed, 422 insertions(+), 100 deletions(-) create mode 100644 src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/GCP.java create mode 100644 src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/PubSub.java create mode 100644 src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnit.java create mode 100644 src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitChildrenDTO.java create mode 100644 src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitParentDTO.java create mode 100644 src/main/java/uk/gov/ons/ctp/response/collection/exercise/utility/PubSubEmulator.java create mode 100644 src/test/java/uk/gov/ons/ctp/response/collection/exercise/message/TestPubSubMessage.java diff --git a/_infra/helm/collection-exercise/Chart.yaml b/_infra/helm/collection-exercise/Chart.yaml index f42a5eb84..0ed2b22ec 100644 --- a/_infra/helm/collection-exercise/Chart.yaml +++ b/_infra/helm/collection-exercise/Chart.yaml @@ -14,9 +14,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 1.3.1 +version: 1.4.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 11.1.2 \ No newline at end of file +appVersion: 11.1.3 \ No newline at end of file diff --git a/_infra/helm/collection-exercise/templates/deployment.yaml b/_infra/helm/collection-exercise/templates/deployment.yaml index e39405a69..7593c2e36 100644 --- a/_infra/helm/collection-exercise/templates/deployment.yaml +++ b/_infra/helm/collection-exercise/templates/deployment.yaml @@ -21,8 +21,11 @@ spec: helmVersion: {{ .Chart.Version }} env: {{ .Values.env }} spec: - {{- if .Values.database.sqlProxyEnabled }} volumes: + - name: google-cloud-key + secret: + secretName: google-application-credentials + {{- if .Values.database.sqlProxyEnabled }} - name: cloudsql-instance-credentials secret: secretName: cloudsql-proxy-credentials @@ -30,7 +33,7 @@ spec: items: - key: "credentials.json" path: "credentials.json" - {{- end }} + {{- end }} containers: {{- if .Values.database.sqlProxyEnabled }} - name: cloudsql-proxy @@ -67,6 +70,9 @@ spec: image: "{{ .Values.image.devRepo }}/{{ .Chart.Name }}:{{ .Values.image.tag }}" {{- end}} imagePullPolicy: {{ .Values.image.pullPolicy }} + volumeMounts: + - name: google-cloud-key + mountPath: /var/secrets/google ports: - name: http-server containerPort: {{ .Values.container.port }} @@ -289,5 +295,13 @@ spec: value: "{{ .Values.tomcat.maxIdle }}" - name: SPRING_DATASOURCE_TOMCAT_MIN_IDLE value: "{{ .Values.tomcat.minIdle }}" + - name: GOOGLE_APPLICATION_CREDENTIALS + value: /var/secrets/google/credentials.json + - name: GOOGLE_CLOUD_PROJECT + value: "{{ .Values.gcp.project }}" + - name: GCP_PROJECT + value: "{{ .Values.gcp.project }}" + - name: GCP_CASENOTIFICATIONTOPIC + value: "{{ .Values.gcp.caseNotificationTopic }}" resources: {{- toYaml .Values.resources.application | nindent 12 }} diff --git a/_infra/helm/collection-exercise/values.yaml b/_infra/helm/collection-exercise/values.yaml index 4231cfd28..762dafcae 100644 --- a/_infra/helm/collection-exercise/values.yaml +++ b/_infra/helm/collection-exercise/values.yaml @@ -73,4 +73,8 @@ tomcat: dns: enabled: false - wellKnownPort: 8080 \ No newline at end of file + wellKnownPort: 8080 + +gcp: + project: ras-rm-sandbox + caseNotificationTopic: "case-notification-topic" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 4ffc73d62..eab4471cc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,6 +10,13 @@ services: image: redis:3.2.9 ports: - "17379:6379" + pubsub-emulator: + container_name: pubsub-emulator-it + image: eu.gcr.io/ons-rasrmbs-management/pubsub-emulator + ports: + - "18681:8681" + environment: + - PUBSUB_PROJECT1=test rabbitmq: container_name: rabbitmq-it image: rabbitmq:3.6.10-management @@ -30,25 +37,4 @@ services: environment: - DATABASE_URL=postgres://postgres:postgres@postgres-it:5432/postgres?sslmode=disable - security_user_name=admin - - security_user_password=secret - action: - container_name: action-it - image: eu.gcr.io/ons-rasrmbs-management/action - ports: - - "38151:8151" - external_links: - - postgres-it - - redis-it - - rabbitmq-it - environment: - - spring_datasource_url=jdbc:postgresql://postgres-it:5432/postgres?sslmode=disable - - spring_datasource_username=postgres - - spring_datasource_password=postgres - - spring_liquibase_url=jdbc:postgresql://postgres-it:5432/postgres?sslmode=disable - - spring_liquibase_user=postgres - - spring_liquibase_password=postgres - - security_user_name=admin - - security_user_password=secret - - data_grid_address=redis-it:6379 - - rabbitmq_host=rabbitmq-it - - rabbitmq_port=5672 \ No newline at end of file + - security_user_password=secret \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3206f4c5d..fa8209522 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,13 @@ pom import + + com.google.cloud + libraries-bom + 20.2.0 + pom + import + @@ -214,7 +221,7 @@ com.thoughtworks.xstream xstream - 1.4.15 + 1.4.17 @@ -290,6 +297,10 @@ activation 1.1.1 + + com.google.cloud + google-cloud-pubsub + @@ -304,7 +315,14 @@ commons-io commons-io - 2.6 + 2.7 + test + + + + com.github.stefanbirkner + system-rules + 1.19.0 test diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/AppConfig.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/AppConfig.java index 23d196a36..dfd69dc38 100644 --- a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/AppConfig.java +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/AppConfig.java @@ -26,4 +26,5 @@ public class AppConfig { private SwaggerSettings swaggerSettings; private Rabbitmq rabbitmq; private Logging logging; + private GCP gcp; } diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/GCP.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/GCP.java new file mode 100644 index 000000000..e85eb59f5 --- /dev/null +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/config/GCP.java @@ -0,0 +1,10 @@ +package uk.gov.ons.ctp.response.collection.exercise.config; + +import lombok.Data; + +/** Config POJO for GCP params */ +@Data +public class GCP { + String project; + String caseNotificationTopic; +} diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributor.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributor.java index dadedd835..554981cd9 100644 --- a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributor.java +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributor.java @@ -21,7 +21,6 @@ import uk.gov.ons.ctp.response.collection.exercise.domain.CollectionExercise; import uk.gov.ons.ctp.response.collection.exercise.domain.ExerciseSampleUnit; import uk.gov.ons.ctp.response.collection.exercise.domain.ExerciseSampleUnitGroup; -import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnitParent; import uk.gov.ons.ctp.response.collection.exercise.lib.common.distributed.DistributedListManager; import uk.gov.ons.ctp.response.collection.exercise.lib.common.distributed.LockingException; import uk.gov.ons.ctp.response.collection.exercise.lib.common.error.CTPException; @@ -40,6 +39,7 @@ import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitGroupDTO; import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitGroupDTO.SampleUnitGroupEvent; import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitGroupDTO.SampleUnitGroupState; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitParentDTO; import uk.gov.ons.ctp.response.collection.exercise.service.EventService; /** Class responsible for business logic to distribute SampleUnits. */ @@ -197,7 +197,7 @@ private List retrieveSampleUnitGroups(CollectionExercis private void distributeSampleUnitGroup( CollectionExercise exercise, ExerciseSampleUnitGroup sampleUnitGroup) throws CTPException { ExerciseSampleUnit sampleUnit = sampleUnitRepo.findBySampleUnitGroup(sampleUnitGroup).get(0); - SampleUnitParent sampleUnitParent; + SampleUnitParentDTO sampleUnitParent; boolean activeEnrolment = hasActiveEnrolment(sampleUnit, exercise); sampleUnitParent = sampleUnit.toSampleUnitParent(activeEnrolment, exercise.getId()); @@ -244,7 +244,7 @@ private boolean enrolmentIsEnabledForSurvey(final Enrolment enrolment, String su * @param sampleUnitMessage to publish. */ private void publishSampleUnitToCase( - ExerciseSampleUnitGroup sampleUnitGroup, SampleUnitParent sampleUnitMessage) { + ExerciseSampleUnitGroup sampleUnitGroup, SampleUnitParentDTO sampleUnitMessage) { transactionTemplate.execute( new TransactionCallbackWithoutResult() { diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/domain/ExerciseSampleUnit.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/domain/ExerciseSampleUnit.java index e388ef34e..500260b2b 100644 --- a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/domain/ExerciseSampleUnit.java +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/domain/ExerciseSampleUnit.java @@ -17,9 +17,9 @@ import net.sourceforge.cobertura.CoverageIgnore; import org.hibernate.annotations.GenericGenerator; import org.hibernate.annotations.Parameter; -import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnit; -import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnitParent; import uk.gov.ons.ctp.response.collection.exercise.lib.sample.representation.SampleUnitDTO; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnit; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitParentDTO; /** Domain model object for sample units. */ @CoverageIgnore @@ -61,9 +61,9 @@ public class ExerciseSampleUnit { @Column(name = "sampleunittypefk") private SampleUnitDTO.SampleUnitType sampleUnitType; - public SampleUnitParent toSampleUnitParent( + public SampleUnitParentDTO toSampleUnitParent( final boolean activeEnrolment, final UUID collectionExerciseId) { - final SampleUnitParent parent = new SampleUnitParent(); + final SampleUnitParentDTO parent = new SampleUnitParentDTO(); populateSampleUnit(activeEnrolment, parent); parent.setCollectionExerciseId(collectionExerciseId.toString()); diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/lib/common/rest/RestUtilityConfig.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/lib/common/rest/RestUtilityConfig.java index 72cd42bca..04eb84e1b 100644 --- a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/lib/common/rest/RestUtilityConfig.java +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/lib/common/rest/RestUtilityConfig.java @@ -13,9 +13,9 @@ @AllArgsConstructor @NoArgsConstructor public class RestUtilityConfig { - private String scheme = "http"; - private String host = "localhost"; - private String port = "8080"; + @Builder.Default private String scheme = "http"; + @Builder.Default private String host = "localhost"; + @Builder.Default private String port = "8080"; private String username; private String password; } diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/PubSub.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/PubSub.java new file mode 100644 index 000000000..3bbc3cefc --- /dev/null +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/PubSub.java @@ -0,0 +1,43 @@ +package uk.gov.ons.ctp.response.collection.exercise.message; + +import com.godaddy.logging.Logger; +import com.godaddy.logging.LoggerFactory; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import jodd.util.StringUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import uk.gov.ons.ctp.response.collection.exercise.config.AppConfig; +import uk.gov.ons.ctp.response.collection.exercise.utility.PubSubEmulator; + +@Component +public class PubSub { + private static final Logger log = LoggerFactory.getLogger(PubSub.class); + @Autowired AppConfig appConfig; + + private Publisher publisherSupplier(String project, String topic) throws IOException { + log.info("creating pubsub publish for topic " + topic + " in project " + project); + TopicName topicName = TopicName.of(project, topic); + if (StringUtil.isBlank(System.getenv("PUBSUB_EMULATOR_HOST"))) { + log.info("Returning actual Publisher"); + return Publisher.newBuilder(topicName).build(); + } else { + log.with("PubSub emulator host", System.getenv("PUBSUB_EMULATOR_HOST")) + .info("Returning emulator Publisher"); + log.info("Returning emulator Publisher"); + return new PubSubEmulator().getEmulatorPublisher(topicName); + } + } + + public Publisher sampleUnitPublisher() throws IOException { + return publisherSupplier( + appConfig.getGcp().getProject(), appConfig.getGcp().getCaseNotificationTopic()); + } + + public void shutdown() { + if (StringUtil.isEmpty(System.getenv("PUBSUB_EMULATOR_HOST"))) { + PubSubEmulator.CHANNEL.shutdown(); + } + } +} diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/SampleUnitPublisher.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/SampleUnitPublisher.java index 458f06d64..d66f6c6f1 100644 --- a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/SampleUnitPublisher.java +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/message/SampleUnitPublisher.java @@ -1,34 +1,81 @@ package uk.gov.ons.ctp.response.collection.exercise.message; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.godaddy.logging.Logger; import com.godaddy.logging.LoggerFactory; -import net.sourceforge.cobertura.CoverageIgnore; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.integration.annotation.MessageEndpoint; -import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnitParent; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import java.io.IOException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import uk.gov.ons.ctp.response.collection.exercise.config.AppConfig; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitParentDTO; /** Service implementation responsible for publishing a sampleUnit message to the case service. */ -@CoverageIgnore -@MessageEndpoint +@Service public class SampleUnitPublisher { private static final Logger log = LoggerFactory.getLogger(SampleUnitPublisher.class); - private RabbitTemplate rabbitTemplate; + @Autowired AppConfig appConfig; - public SampleUnitPublisher(@Qualifier("caseRabbitTemplate") RabbitTemplate rabbitTemplate) { - this.rabbitTemplate = rabbitTemplate; - } + @Autowired private PubSub pubSub; + + @Autowired private ObjectMapper objectMapper; /** * To publish a SampleUnitGroup * * @param sampleUnit the SampleUnitGroup message to publish. */ - public void sendSampleUnit(SampleUnitParent sampleUnit) { + public void sendSampleUnit(SampleUnitParentDTO sampleUnit) { log.with("sample_unit_ref", sampleUnit.getSampleUnitRef()) .with("sample_unit_type", sampleUnit.getSampleUnitType()) .debug("Entering sendSampleUnit"); - rabbitTemplate.convertAndSend(sampleUnit); + try { + String message = objectMapper.writeValueAsString(sampleUnit); + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + Publisher publisher = pubSub.sampleUnitPublisher(); + try { + log.with("publisher", publisher).info("Publishing message to PubSub"); + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + ApiFutures.addCallback( + messageIdFuture, + new ApiFutureCallback<>() { + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + log.with("error", apiException.getStatusCode().getCode()) + .error("SampleUnit publish sent failure to PubSub."); + } + log.with("message", message).error("Error Publishing PubSub message"); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + log.with("messageId", messageId).info("SampleUnit publish sent successfully"); + } + }, + MoreExecutors.directExecutor()); + } finally { + publisher.shutdown(); + pubSub.shutdown(); + } + } catch (JsonProcessingException e) { + log.with("sampleUnit", sampleUnit).error("Error while sampleUnit can not be parsed."); + throw new RuntimeException(e); + } catch (IOException e) { + log.error("PubSub Error while processing sample unit distribution", e); + throw new RuntimeException(e); + } } } diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnit.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnit.java new file mode 100644 index 000000000..0f448d978 --- /dev/null +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnit.java @@ -0,0 +1,16 @@ +package uk.gov.ons.ctp.response.collection.exercise.representation; + +import javax.validation.constraints.NotNull; +import lombok.Data; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +public class SampleUnit { + protected String id; + @NotNull protected String sampleUnitRef; + @NotNull protected String sampleUnitType; + protected String partyId; + @NotNull protected String collectionInstrumentId; + protected boolean activeEnrolment; +} diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitChildrenDTO.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitChildrenDTO.java new file mode 100644 index 000000000..7f55928f2 --- /dev/null +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitChildrenDTO.java @@ -0,0 +1,15 @@ +package uk.gov.ons.ctp.response.collection.exercise.representation; + +import java.util.List; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnit; + +@Data +@AllArgsConstructor +@NoArgsConstructor(access = AccessLevel.PUBLIC) +public class SampleUnitChildrenDTO { + protected List sampleUnitChildren; +} diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitParentDTO.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitParentDTO.java new file mode 100644 index 000000000..8ee7dd9a2 --- /dev/null +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/representation/SampleUnitParentDTO.java @@ -0,0 +1,11 @@ +package uk.gov.ons.ctp.response.collection.exercise.representation; + +import lombok.*; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +public class SampleUnitParentDTO extends SampleUnit { + protected String collectionExerciseId; + protected SampleUnitChildrenDTO sampleUnitChildrenDTO; +} diff --git a/src/main/java/uk/gov/ons/ctp/response/collection/exercise/utility/PubSubEmulator.java b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/utility/PubSubEmulator.java new file mode 100644 index 000000000..3cb585b5a --- /dev/null +++ b/src/main/java/uk/gov/ons/ctp/response/collection/exercise/utility/PubSubEmulator.java @@ -0,0 +1,108 @@ +package uk.gov.ons.ctp.response.collection.exercise.utility; + +import com.godaddy.logging.Logger; +import com.godaddy.logging.LoggerFactory; +import com.google.api.core.ApiFuture; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.*; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.*; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +/** + * * This is a PubSub Emulator class. This is a utility class which is used for testing pubsub + * function + */ +public class PubSubEmulator { + private static final Logger log = LoggerFactory.getLogger(PubSubEmulator.class); + private static final String HOST_PORT = "localhost:18681"; + public static final ManagedChannel CHANNEL = + ManagedChannelBuilder.forTarget(HOST_PORT).usePlaintext().build(); + public static final TransportChannelProvider CHANNEL_PROVIDER = + FixedTransportChannelProvider.create(GrpcTransportChannel.create(CHANNEL)); + public static final CredentialsProvider CREDENTIAL_PROVIDER = NoCredentialsProvider.create(); + private static final String PROJECT_ID = "test"; + private static final String TOPIC_ID = "test_topic"; + private static final String SUBSCRIPTION_ID = "test_subscription"; + private final TopicAdminClient topicClient = + TopicAdminClient.create( + TopicAdminSettings.newBuilder() + .setTransportChannelProvider(PubSubEmulator.CHANNEL_PROVIDER) + .setCredentialsProvider(PubSubEmulator.CREDENTIAL_PROVIDER) + .build()); + SubscriptionAdminClient subscriptionAdminClient = + SubscriptionAdminClient.create( + SubscriptionAdminSettings.newBuilder() + .setTransportChannelProvider(PubSubEmulator.CHANNEL_PROVIDER) + .setCredentialsProvider(PubSubEmulator.CREDENTIAL_PROVIDER) + .build()); + TopicName topicName = TopicName.of(PROJECT_ID, TOPIC_ID); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID); + + public PubSubEmulator() throws IOException {} + + public Publisher getEmulatorPublisher(TopicName topicName) throws IOException { + return Publisher.newBuilder(topicName) + .setChannelProvider(CHANNEL_PROVIDER) + .setCredentialsProvider(CREDENTIAL_PROVIDER) + .build(); + } + + public Subscriber getEmulatorSubscriber(MessageReceiver receiver) { + return Subscriber.newBuilder(ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID), receiver) + .setChannelProvider(CHANNEL_PROVIDER) + .setCredentialsProvider(CREDENTIAL_PROVIDER) + .build(); + } + + public GrpcSubscriberStub getEmulatorSubscriberStub() throws IOException { + return GrpcSubscriberStub.create( + SubscriberStubSettings.newBuilder() + .setTransportChannelProvider(CHANNEL_PROVIDER) + .setCredentialsProvider(CREDENTIAL_PROVIDER) + .build()); + } + + public void publishMessage(String message) { + try { + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + TopicName topicName = TopicName.of(PROJECT_ID, TOPIC_ID); + Publisher publisher = getEmulatorPublisher(topicName); + log.with("publisher", publisher).info("Publishing message to pubsub emulator"); + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + String messageId = messageIdFuture.get(); + log.with("messageId", messageId).info("Published message to pubsub emulator"); + } catch (IOException | InterruptedException | ExecutionException e) { + log.error("Failed to publish message", e); + } + } + + public void shutdown() { + CHANNEL.shutdown(); + } + + public void testInit() { + Topic topic = topicClient.createTopic(topicName); + System.out.println("Created topic: " + topic.getName()); + Subscription subscription = + subscriptionAdminClient.createSubscription( + subscriptionName, topicName, PushConfig.getDefaultInstance(), 10); + System.out.println("Created pull subscription: " + subscription.getName()); + } + + public void testTeardown() { + subscriptionAdminClient.deleteSubscription(subscriptionName); + topicClient.deleteTopic(topicName); + } +} diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index 1c0598121..659233270 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -31,3 +31,7 @@ redisson-config: rabbitmq: host: localhost port: 35672 + +gcp: + project: "test" + caseNotificationTopic: "test_topic" \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index c73766460..74388d2ff 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -186,3 +186,7 @@ springdoc: path: /api-doc.html operationsSorter: method enabled: false + +gcp: + project: ras-rm-sandbox + caseNotificationTopic: "test_topic" \ No newline at end of file diff --git a/src/test/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributorTest.java b/src/test/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributorTest.java index 35d204662..9b2e6daec 100644 --- a/src/test/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributorTest.java +++ b/src/test/java/uk/gov/ons/ctp/response/collection/exercise/distribution/SampleUnitDistributorTest.java @@ -36,7 +36,6 @@ import uk.gov.ons.ctp.response.collection.exercise.domain.Event; import uk.gov.ons.ctp.response.collection.exercise.domain.ExerciseSampleUnit; import uk.gov.ons.ctp.response.collection.exercise.domain.ExerciseSampleUnitGroup; -import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnitParent; import uk.gov.ons.ctp.response.collection.exercise.lib.common.distributed.DistributedListManager; import uk.gov.ons.ctp.response.collection.exercise.lib.common.distributed.LockingException; import uk.gov.ons.ctp.response.collection.exercise.lib.common.error.CTPException; @@ -54,10 +53,12 @@ import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitGroupDTO; import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitGroupDTO.SampleUnitGroupEvent; import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitGroupDTO.SampleUnitGroupState; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitParentDTO; import uk.gov.ons.ctp.response.collection.exercise.service.EventService; /** Tests for the SampleUnitDistributor */ @RunWith(MockitoJUnitRunner.class) +@SuppressWarnings("unchecked") public class SampleUnitDistributorTest { private static final Integer DISTRIBUTION_SCHEDULE_RETRIEVAL_MAX = 10; @@ -181,10 +182,10 @@ public void testSampleUnitPublished() { // Then both sample units are published and // sample unit groups and collection exercise are transitioned - ArgumentCaptor sampleUnitParentSave = - ArgumentCaptor.forClass(SampleUnitParent.class); + ArgumentCaptor sampleUnitParentSave = + ArgumentCaptor.forClass(SampleUnitParentDTO.class); verify(publisher, times(2)).sendSampleUnit(sampleUnitParentSave.capture()); - List savedSampleUnitParents = sampleUnitParentSave.getAllValues(); + List savedSampleUnitParents = sampleUnitParentSave.getAllValues(); savedSampleUnitParents.forEach( sampleUnitParent -> { assertEquals(SAMPLE_UNIT_REF, sampleUnitParent.getSampleUnitRef()); @@ -192,8 +193,8 @@ public void testSampleUnitPublished() { assertEquals(PARTY_ID_PARENT, sampleUnitParent.getPartyId()); assertEquals(COLLECTION_INSTRUMENT_ID, sampleUnitParent.getCollectionInstrumentId()); assertEquals(COLLECTION_EXERCISE_ID, sampleUnitParent.getCollectionExerciseId()); - assertTrue(sampleUnitParent.getActiveEnrolment()); - assertNull(sampleUnitParent.getSampleUnitChildren()); + assertTrue(sampleUnitParent.isActiveEnrolment()); + assertNull(sampleUnitParent.getSampleUnitChildrenDTO()); }); ArgumentCaptor sampleUnitGroupSave = @@ -318,7 +319,7 @@ public void testCollectionExerciseStateTransitionException() throws Exception { sampleUnitDistributor.distributeSampleUnits(collectionExercise); // Then sampleunits are published but collection exercise is not transitioned - verify(publisher, times(2)).sendSampleUnit(any(SampleUnitParent.class)); + verify(publisher, times(2)).sendSampleUnit(any(SampleUnitParentDTO.class)); verify(sampleUnitGroupRepo, times(2)).saveAndFlush(any(ExerciseSampleUnitGroup.class)); verify(collectionExerciseRepo, never()).saveAndFlush(any()); } diff --git a/src/test/java/uk/gov/ons/ctp/response/collection/exercise/endpoint/CollectionExerciseEndpointIT.java b/src/test/java/uk/gov/ons/ctp/response/collection/exercise/endpoint/CollectionExerciseEndpointIT.java index 2a29a40ce..a53715ca8 100644 --- a/src/test/java/uk/gov/ons/ctp/response/collection/exercise/endpoint/CollectionExerciseEndpointIT.java +++ b/src/test/java/uk/gov/ons/ctp/response/collection/exercise/endpoint/CollectionExerciseEndpointIT.java @@ -16,7 +16,6 @@ import com.mashape.unirest.http.HttpResponse; import com.mashape.unirest.http.Unirest; import com.thoughtworks.xstream.XStream; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; @@ -30,7 +29,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; @@ -38,11 +36,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.tuple.Pair; -import org.hamcrest.Matchers; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.web.server.LocalServerPort; @@ -53,7 +51,6 @@ import org.springframework.test.context.junit4.rules.SpringMethodRule; import uk.gov.ons.ctp.response.collection.exercise.config.AppConfig; import uk.gov.ons.ctp.response.collection.exercise.domain.CollectionExercise; -import uk.gov.ons.ctp.response.collection.exercise.lib.casesvc.message.sampleunitnotification.SampleUnitParent; import uk.gov.ons.ctp.response.collection.exercise.lib.common.error.CTPException; import uk.gov.ons.ctp.response.collection.exercise.lib.rabbit.Rabbitmq; import uk.gov.ons.ctp.response.collection.exercise.lib.rabbit.SimpleMessageBase; @@ -61,6 +58,7 @@ import uk.gov.ons.ctp.response.collection.exercise.lib.rabbit.SimpleMessageSender; import uk.gov.ons.ctp.response.collection.exercise.lib.sample.representation.SampleSummaryDTO; import uk.gov.ons.ctp.response.collection.exercise.lib.sampleunit.definition.SampleUnit; +import uk.gov.ons.ctp.response.collection.exercise.message.TestPubSubMessage; import uk.gov.ons.ctp.response.collection.exercise.repository.CollectionExerciseRepository; import uk.gov.ons.ctp.response.collection.exercise.repository.EventRepository; import uk.gov.ons.ctp.response.collection.exercise.repository.SampleLinkRepository; @@ -69,8 +67,10 @@ import uk.gov.ons.ctp.response.collection.exercise.representation.CollectionExerciseDTO; import uk.gov.ons.ctp.response.collection.exercise.representation.EventDTO; import uk.gov.ons.ctp.response.collection.exercise.representation.ResponseEventDTO; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitParentDTO; import uk.gov.ons.ctp.response.collection.exercise.service.CollectionTransitionEvent; import uk.gov.ons.ctp.response.collection.exercise.service.EventService; +import uk.gov.ons.ctp.response.collection.exercise.utility.PubSubEmulator; import uk.gov.ons.ctp.response.collection.exercise.validation.ValidateSampleUnits; /** A class to contain integration tests for the collection exercise service */ @@ -108,11 +108,18 @@ public class CollectionExerciseEndpointIT { @Rule public final SpringMethodRule springMethodRule = new SpringMethodRule(); + @ClassRule + public static final EnvironmentVariables environmentVariables = + new EnvironmentVariables().set("PUBSUB_EMULATOR_HOST", "127.0.0.1:18681"); + @ClassRule public static WireMockClassRule wireMockRule = new WireMockClassRule(options().port(18002).bindAddress("localhost")); private CollectionExerciseClient client; + private PubSubEmulator pubSubEmulator = new PubSubEmulator(); + + public CollectionExerciseEndpointIT() throws IOException {} /** Method to set up integration test */ @Before @@ -264,26 +271,30 @@ public void shouldTransitionCollectionExerciseToReadyToReviewOnSampleSummaryActi @Test public void ensureSampleUnitIdIsPropagatedHereBusiness() throws Exception { + pubSubEmulator.testInit(); stubSurveyServiceBusiness(); stubGetPartyNoAssociations(); stubCollectionInstrumentCount(); - SampleUnitParent sampleUnit = ensureSampleUnitIdIsPropagatedHere("B"); + SampleUnitParentDTO sampleUnit = ensureSampleUnitIdIsPropagatedHere("B"); assertNotNull("Party id must be not null", sampleUnit.getPartyId()); + pubSubEmulator.testTeardown(); } @Test public void ensureSampleUnitIdIsPropagatedHereBusinessWithExistingEnrolments() throws Exception { + pubSubEmulator.testInit(); stubSurveyServiceBusiness(); stubGetPartyWithAssociations(); stubCollectionInstrumentCount(); - SampleUnitParent sampleUnit = ensureSampleUnitIdIsPropagatedHere("B"); + SampleUnitParentDTO sampleUnit = ensureSampleUnitIdIsPropagatedHere("B"); assertNotNull("Party id must be not null", sampleUnit.getPartyId()); + pubSubEmulator.testTeardown(); } - private SampleUnitParent ensureSampleUnitIdIsPropagatedHere(String type) throws Exception { + private SampleUnitParentDTO ensureSampleUnitIdIsPropagatedHere(String type) throws Exception { createCollectionInstrumentStub(); SampleUnit sampleUnit = new SampleUnit(); @@ -300,18 +311,10 @@ private SampleUnitParent ensureSampleUnitIdIsPropagatedHere(String type) throws if (type.equalsIgnoreCase("B") || type.equalsIgnoreCase("BI")) { sampleUnit.setFormType(""); } - - // sampleUnit.setSampleAttributes(new SampleUnit.SampleAttributes(new ArrayList<>())); - setSampleSize(collex, 1); setState(collex, CollectionExerciseDTO.CollectionExerciseState.EXECUTION_STARTED); - SimpleMessageListener listener = getMessageListener(); - BlockingQueue queue = - listener.listen( - SimpleMessageBase.ExchangeType.Direct, - "collection-outbound-exchange", - "Case.CaseDelivery.binding"); + TestPubSubMessage message = new TestPubSubMessage(); String xml = sampleUnitToXmlString(sampleUnit); @@ -332,46 +335,25 @@ private SampleUnitParent ensureSampleUnitIdIsPropagatedHere(String type) throws .basicAuth("admin", "secret") .header("accept", "application/json") .asString(); - assertThat(distributeResponse.getStatus(), Matchers.is(200)); - assertThat( - distributeResponse.getBody(), - Matchers.is("Completed sample unit validation")); - HttpResponse response = Unirest.get( "http://localhost:" + threadPort + "/cron/sample-unit-distribution") .basicAuth("admin", "secret") .header("accept", "application/json") .asString(); - assertThat(response.getStatus(), Matchers.is(200)); - assertThat(response.getBody(), Matchers.is("Completed sample unit distribution")); } } catch (Exception e) { log.error("exception in thread", e); } }); thread.start(); - // The service stubs will exit once the call below times out preventing further debugging of - // this test in other - // threads (i.e. you have 2 minutes to debug before the service calls will start to fail) - String message = queue.poll(2, TimeUnit.MINUTES); - // If you need more than 2 minutes to debug this test, then either change the timeout above or - // comment that line - // and uncomment the one below (which gives infinite time). - // String message = queue.take(); + SampleUnitParentDTO sampleUnitMessage = message.getPubSubSampleUnitMessage(); log.info("message = " + message); - assertNotNull("Timeout waiting for message to arrive in Case.CaseDelivery", message); - - JAXBContext jaxbContext = JAXBContext.newInstance(SampleUnitParent.class); - SampleUnitParent sampleUnitParent = - (SampleUnitParent) - jaxbContext - .createUnmarshaller() - .unmarshal(new ByteArrayInputStream(message.getBytes())); + assertNotNull("Timeout waiting for message to arrive in Case.CaseDelivery", sampleUnitMessage); - assertEquals(id, UUID.fromString(sampleUnitParent.getId())); + assertEquals(id, UUID.fromString(sampleUnitMessage.getId())); - return sampleUnitParent; + return sampleUnitMessage; } private EventDTO createEventDTO( diff --git a/src/test/java/uk/gov/ons/ctp/response/collection/exercise/message/TestPubSubMessage.java b/src/test/java/uk/gov/ons/ctp/response/collection/exercise/message/TestPubSubMessage.java new file mode 100644 index 000000000..e213629d4 --- /dev/null +++ b/src/test/java/uk/gov/ons/ctp/response/collection/exercise/message/TestPubSubMessage.java @@ -0,0 +1,58 @@ +package uk.gov.ons.ctp.response.collection.exercise.message; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.godaddy.logging.Logger; +import com.godaddy.logging.LoggerFactory; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.PubsubMessage; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import uk.gov.ons.ctp.response.collection.exercise.representation.SampleUnitParentDTO; +import uk.gov.ons.ctp.response.collection.exercise.utility.PubSubEmulator; + +public class TestPubSubMessage { + private static final Logger log = LoggerFactory.getLogger(TestPubSubMessage.class); + private final PubSubEmulator pubsubEmulator = new PubSubEmulator(); + private String receivedMessage = null; + + public TestPubSubMessage() throws IOException {} + + public SampleUnitParentDTO getPubSubSampleUnitMessage() throws IOException { + + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + log.info("Id: " + message.getMessageId()); + log.info("Data: " + message.getData().toStringUtf8()); + receivedMessage = message.getData().toStringUtf8(); + log.info("Received : " + receivedMessage); + consumer.ack(); + }; + Subscriber subscriber = pubsubEmulator.getEmulatorSubscriber(receiver); + Thread t = + new Thread() { + + @Override + public void run() { + subscriber.startAsync().awaitRunning(); + } + }; + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(t); + while (receivedMessage == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + subscriber.stopAsync().awaitTerminated(); + service.shutdown(); + System.out.println(receivedMessage); + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readValue(receivedMessage, SampleUnitParentDTO.class); + } +}