diff --git a/.github/workflows/cd.yml b/.github/workflows/cd.yml deleted file mode 100644 index bc94506..0000000 --- a/.github/workflows/cd.yml +++ /dev/null @@ -1,94 +0,0 @@ -name: CD - -on: - push: - branches: - - master - -jobs: - publish: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - name: Set envs - # use latest tag as release version - run: echo ::set-env name=RELEASE_VERSION::${GITHUB_SHA} - - - name: Build docker image, push - uses: docker/build-push-action@v1 - with: - # login to repo - repository: lukaswire/polls - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - # pass release_version - build_args: release_version=${{ env.RELEASE_VERSION }} - # tag the image with latest git tag - tag_with_ref: true - # add labels based on the build - see https://github.com/opencontainers/image-spec/blob/master/annotations.md - add_git_labels: true - # push only if this is master - push: ${{ startsWith(github.ref, 'refs/heads/master') }} - - # Send webhook to Wire using Slack Bot - - name: Webhook to Wire - uses: 8398a7/action-slack@v2 - with: - status: ${{ job.status }} - author_name: Poll Bot CD pipeline - env: - SLACK_WEBHOOK_URL: ${{ secrets.WEBHOOK_CI }} - # Send message only if previous step failed - if: failure() - - - deploy: - name: Deploy to K8S - needs: publish - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - repository: zinfra/rubicon - ref: develop - token: ${{ secrets.RUBICON_GIT_TOKEN }} - - # Setup gcloud CLI - - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master - with: - version: '286.0.0' - service_account_email: ${{ secrets.GKE_SA_EMAIL }} - service_account_key: ${{ secrets.GKE_SA_KEY }} - project_id: ${{ secrets.GKE_PROJECT }} - - # Configure Docker to use the gcloud command-line tool as a credential - # helper for authentication - - name: Configure Docker Google cloud - run: |- - gcloud --quiet auth configure-docker - - # Get the GKE credentials so we can deploy to the cluster - - name: Obtain k8s credentials - env: - GKE_CLUSTER: ${{ secrets.GKE_CLUSTER }} - GKE_ZONE: ${{ secrets.GKE_ZONE }} - run: |- - gcloud container clusters get-credentials "$GKE_CLUSTER" --zone "$GKE_ZONE" - - # K8s is set up, deploy the app - - name: Deploy - run: |- - kubectl delete pod -l name=poll -n staging - kubectl describe pod -l name=poll -n staging - - # Send webhook to Wire using Slack Bot - - name: Webhook to Wire - uses: 8398a7/action-slack@v2 - with: - status: ${{ job.status }} - author_name: Poll Bot Deployment to Kubernetes - env: - SLACK_WEBHOOK_URL: ${{ secrets.WEBHOOK_CI }} - # Send message only if previous step failed - if: failure() diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 133843e..bb7c99c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,12 @@ name: CI -on: [pull_request] +on: + push: + branches-ignore: + - master + - staging + + pull_request: jobs: check: @@ -36,12 +42,8 @@ jobs: uses: docker/build-push-action@v1 with: # login to repo - repository: lukaswire/polls - # tag the image with latest git tag - tag_with_ref: true - # add labels based on the build - see https://github.com/opencontainers/image-spec/blob/master/annotations.md - add_git_labels: true - # push only if this is master + repository: wire/ci-test-image + # do not push image push: false # Send webhook to Wire using Slack Bot @@ -49,7 +51,7 @@ jobs: uses: 8398a7/action-slack@v2 with: status: ${{ job.status }} - author_name: Poll Bot Docker CI pipeline + author_name: Docker CI pipeline env: SLACK_WEBHOOK_URL: ${{ secrets.WEBHOOK_CI }} # Send message only if previous step failed diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml new file mode 100644 index 0000000..f1c83bd --- /dev/null +++ b/.github/workflows/master.yml @@ -0,0 +1,50 @@ +name: Docker Latest build + +on: + push: + branches: + - master + +env: + DOCKER_IMAGE: wire-bot/poll + +jobs: + publish: + name: Build and publish docker image + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set Release Version + # use latest tag as release version + run: echo ::set-env name=RELEASE_VERSION::${GITHUB_SHA} + + - name: Build docker image, push + uses: docker/build-push-action@v1 + with: + # set docker image + repository: ${{ env.DOCKER_IMAGE }} + # use GCR repository + registry: eu.gcr.io + # see https://github.com/marketplace/actions/docker-build-push#google-container-registry-gcr + username: _json_key + password: ${{ secrets.GCR_ACCESS_JSON }} + # pass release_version + build_args: release_version=${{ env.RELEASE_VERSION }} + # tag the image with name of the branch - latest as this is master + tag_with_ref: true + # add labels based on the build - see https://github.com/opencontainers/image-spec/blob/master/annotations.md + add_git_labels: true + # push + push: true + + # Send webhook to Wire using Slack Bot + - name: Webhook to Wire + uses: 8398a7/action-slack@v2 + with: + status: ${{ job.status }} + author_name: ${{ env.DOCKER_IMAGE }} master branch build + env: + SLACK_WEBHOOK_URL: ${{ secrets.WEBHOOK_CI }} + # Send message only if previous step failed + if: failure() diff --git a/.github/workflows/release.yml b/.github/workflows/prod.yml similarity index 51% rename from .github/workflows/release.yml rename to .github/workflows/prod.yml index b4d998f..fb443af 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/prod.yml @@ -4,22 +4,31 @@ on: release: types: published +env: + DOCKER_IMAGE: wire-bot/poll + SERVICE_NAME: poll + jobs: - build: + deploy: + name: Build and deploy service runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 - - name: Set envs + - uses: actions/checkout@v2 + + - name: Set Release Version # use latest tag as release version run: echo ::set-env name=RELEASE_VERSION::${GITHUB_REF:10} - name: Build and publish docker image uses: docker/build-push-action@v1 with: - # login to repo - repository: lukaswire/polls - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} + # set docker image + repository: ${{ env.DOCKER_IMAGE }} + # use GCR repository + registry: eu.gcr.io + # see https://github.com/marketplace/actions/docker-build-push#google-container-registry-gcr + username: _json_key + password: ${{ secrets.GCR_ACCESS_JSON }} # pass release_version build_args: release_version=${{ env.RELEASE_VERSION }} # tag the image with latest git tag @@ -41,59 +50,69 @@ jobs: token: ${{ secrets.RUBICON_GIT_TOKEN }} # Update version to the one that was just built - - name: Change version + - name: Change Version in Rubicon env: - RELEASE_VERSION: ${{ env.RELEASE_VERSION }} - run: |- - cd rubicon/prod/services/poll - sed -i".bak" "s/image: lukaswire\/polls.*/image: lukaswire\/polls:$RELEASE_VERSION/g" poll.yaml - rm poll.yaml.bak + IMAGE: ${{ env.DOCKER_IMAGE }} + SERVICE: ${{ env.SERVICE_NAME }} + VERSION: ${{ env.RELEASE_VERSION }} + run: | + # go to directory with configuration + cd "rubicon/prod/services/$SERVICE" + # escape literals for the sed and set output with GCR + export SED_PREPARED=$(echo $IMAGE | awk '{ gsub("/", "\\/", $1); print "eu.gcr.io\\/"$1 }') + # update final yaml + sed -i".bak" "s/image: $SED_PREPARED.*/image: $SED_PREPARED:$VERSION/g" "$SERVICE.yaml" + # delete bakup file + rm "$SERVICE.yaml.bak" # Setup gcloud CLI - - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master + - name: Setup Google Cloud CLI + uses: GoogleCloudPlatform/github-actions/setup-gcloud@master with: version: '286.0.0' - service_account_email: ${{ secrets.GKE_SA_EMAIL }} + service_account_email: kubernetes-deployment-agent@wire-bot.iam.gserviceaccount.com service_account_key: ${{ secrets.GKE_SA_KEY }} - project_id: ${{ secrets.GKE_PROJECT }} + project_id: wire-bot # Configure Docker to use the gcloud command-line tool - name: Configure Docker Google cloud - run: |- + run: | gcloud --quiet auth configure-docker # Get the GKE credentials so we can deploy to the cluster - name: Obtain k8s credentials env: - GKE_CLUSTER: ${{ secrets.GKE_CLUSTER }} - GKE_ZONE: ${{ secrets.GKE_ZONE }} - run: |- + GKE_CLUSTER: anayotto + GKE_ZONE: europe-west1-c + run: | gcloud container clusters get-credentials "$GKE_CLUSTER" --zone "$GKE_ZONE" # K8s is set up, deploy the app - - name: Deploy - run: |- - kubectl apply -f rubicon/prod/services/poll/poll.yaml + - name: Deploy the Service + env: + SERVICE: ${{ env.SERVICE_NAME }} + run: | + kubectl apply -f "rubicon/prod/services/$SERVICE/$SERVICE.yaml" # Commit all data to Rubicon and open PR - - name: Create Pull Request + - name: Create Rubicon Pull Request uses: peter-evans/create-pull-request@v2 - env: - RELEASE_VERSION: ${{ env.RELEASE_VERSION }} with: path: rubicon - branch: poll-bot-release + branch: ${{ env.SERVICE_NAME }}-release token: ${{ secrets.RUBICON_GIT_TOKEN }} - title: "Poll Bot Release" - commit-message: "Poll Bot version bump" - body: "Poll Bot Release" + labels: version-bump, automerge + title: ${{ env.SERVICE_NAME }} release ${{ env.RELEASE_VERSION }} + commit-message: ${{ env.SERVICE_NAME }} version bump to ${{ env.RELEASE_VERSION }} + body: | + This is automatic version bump from the pipeline. # Send webhook to Wire using Slack Bot - name: Webhook to Wire uses: 8398a7/action-slack@v2 with: status: ${{ job.status }} - author_name: Poll Bot Release Pipeline + author_name: ${{ env.SERVICE_NAME }} release pipeline env: SLACK_WEBHOOK_URL: ${{ secrets.WEBHOOK_RELEASE }} # Notify every release diff --git a/.github/workflows/staging.yml b/.github/workflows/staging.yml new file mode 100644 index 0000000..61f000b --- /dev/null +++ b/.github/workflows/staging.yml @@ -0,0 +1,82 @@ +name: Staging Deployment + +on: + push: + branches: + - staging + +env: + DOCKER_IMAGE: wire-bot/poll + SERVICE_NAME: poll + +jobs: + publish: + name: Deploy to staging + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set Release Version + # use latest tag as release version + run: echo ::set-env name=RELEASE_VERSION::${GITHUB_SHA} + + - name: Build docker image, push + uses: docker/build-push-action@v1 + with: + # set docker image + repository: ${{ env.DOCKER_IMAGE }} + # use GCR repository + registry: eu.gcr.io + # see https://github.com/marketplace/actions/docker-build-push#google-container-registry-gcr + username: _json_key + password: ${{ secrets.GCR_ACCESS_JSON }} + # pass release_version + build_args: release_version=${{ env.RELEASE_VERSION }} + # tag the image with name of the branch - staging + tag_with_ref: true + # add labels based on the build - see https://github.com/opencontainers/image-spec/blob/master/annotations.md + add_git_labels: true + # push + push: true + + # Setup gcloud CLI + - name: Setup Google Cloud CLI + uses: GoogleCloudPlatform/github-actions/setup-gcloud@master + with: + version: '286.0.0' + service_account_email: kubernetes-deployment-agent@wire-bot.iam.gserviceaccount.com + service_account_key: ${{ secrets.GKE_SA_KEY }} + project_id: wire-bot + + # Configure Docker to use the gcloud command-line tool + - name: Configure Docker Google cloud + run: | + gcloud --quiet auth configure-docker + + # Get the GKE credentials so we can deploy to the cluster + - name: Obtain k8s credentials + env: + GKE_CLUSTER: anayotto + GKE_ZONE: europe-west1-c + run: | + gcloud container clusters get-credentials "$GKE_CLUSTER" --zone "$GKE_ZONE" + + # K8s is set up, deploy the app + - name: Deploy the Service + env: + SERVICE: ${{ env.SERVICE_NAME }} + run: | + kubectl delete pod -l name=$SERVICE -n staging + kubectl describe pod -l name=$SERVICE -n staging + + # Send webhook to Wire using Slack Bot + - name: Webhook to Wire + uses: 8398a7/action-slack@v2 + with: + status: ${{ job.status }} + author_name: ${{ env.SERVICE_NAME }} staging pipeline + env: + SLACK_WEBHOOK_URL: ${{ secrets.WEBHOOK_CI }} + # Send message only if previous step failed + if: always() + diff --git a/Dockerfile b/Dockerfile index e2a5b47..5417e6d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,11 +27,11 @@ ENV APP_ROOT /app WORKDIR $APP_ROOT # Obtain built from the base -COPY --from=build /src/build/distributions/polls*.tar $APP_ROOT/ +COPY --from=build /src/build/distributions/app.tar $APP_ROOT/ # Extract executables RUN mkdir $APP_ROOT/run -RUN tar -xvf polls*.tar --strip-components=1 -C $APP_ROOT/run +RUN tar -xvf app.tar --strip-components=1 -C $APP_ROOT/run # create version file ARG release_version=development diff --git a/README.md b/README.md index eb31987..4e313d0 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Wire Poll Bot [![GitHub version](https://badge.fury.io/gh/wireapp%2Fpoll-bot.svg)](https://badge.fury.io/gh/wireapp%2Fpoll-bot) -![CI/CD](https://github.com/wireapp/poll-bot/workflows/CI/CD/badge.svg) +![CI](https://github.com/wireapp/poll-bot/workflows/CI/badge.svg) +![CD](https://github.com/wireapp/poll-bot/workflows/CD/badge.svg) ![Release Pipeline](https://github.com/wireapp/poll-bot/workflows/Release%20Pipeline/badge.svg) [Wire](https://wire.com/) bot for the polls. @@ -18,7 +19,7 @@ Basic usage ## Dev Stack * HTTP Server - [Ktor](https://ktor.io/) -* HTTP Client - [CIO](https://ktor.io/clients/http-client/engines.html) under [Ktor](https://ktor.io/) +* HTTP Client - [Apache](https://ktor.io/clients/http-client/engines.html) under [Ktor](https://ktor.io/) * Dependency Injection - [Kodein](https://github.com/Kodein-Framework/Kodein-DI) * Build system - [Gradle](https://gradle.org/) * Communication with [Wire](https://wire.com/) - [Roman](https://github.com/dkovacevic/roman) @@ -71,18 +72,6 @@ Configuration is currently being loaded from the environment variables. * Key for connecting to the web socket of the proxy. */ const val APP_KEY = "APP_KEY" - /** - * Determines whether to use web sockets for connection to proxy or not eg. true - */ - const val USE_WEB_SOCKETS = "USE_WEB_SOCKETS" - /** - * Host name for the connection to web socket eg."proxy.services.zinfra.io" - */ - const val PROXY_WS_HOST = "PROXY_WS_HOST" - /** - * Path to web socket at proxy eg. "/await" - */ - const val PROXY_WS_PATH = "PROXY_WS_PATH" /** * Domain used for sending the messages from the bot to proxy eg. "https://proxy.services.zinfra.io" */ @@ -106,8 +95,5 @@ DB_PASSWORD= DB_URL= SERVICE_TOKEN= APP_KEY= -USE_WEB_SOCKETS= -PROXY_WS_HOST= -PROXY_WS_PATH= PROXY_DOMAIN= ``` diff --git a/build.gradle.kts b/build.gradle.kts index 4c801b9..7779edc 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -34,10 +34,13 @@ dependencies { // Ktor client dependencies implementation("io.ktor", "ktor-client-json", ktorVersion) implementation("io.ktor", "ktor-client-jackson", ktorVersion) - implementation("io.ktor", "ktor-client-websockets", ktorVersion) - implementation("io.ktor", "ktor-client-cio", ktorVersion) + implementation("io.ktor", "ktor-client-apache", ktorVersion) implementation("io.ktor", "ktor-client-logging-jvm", ktorVersion) + // Prometheus metrics + implementation("io.ktor", "ktor-metrics-micrometer", ktorVersion) + implementation("io.micrometer", "micrometer-registry-prometheus", "1.4.1") + // logging implementation("io.github.microutils", "kotlin-logging", "1.7.9") implementation("ch.qos.logback", "logback-classic", "1.2.3") @@ -69,6 +72,14 @@ tasks { kotlinOptions.jvmTarget = "1.8" } + distTar { + archiveFileName.set("app.tar") + } + + withType { + useJUnitPlatform() + } + register("fatJar") { manifest { attributes["Main-Class"] = mainClass diff --git a/src/main/kotlin/com/wire/bots/polls/PollBot.kt b/src/main/kotlin/com/wire/bots/polls/PollBot.kt index f0ac374..c982309 100644 --- a/src/main/kotlin/com/wire/bots/polls/PollBot.kt +++ b/src/main/kotlin/com/wire/bots/polls/PollBot.kt @@ -4,9 +4,7 @@ import com.wire.bots.polls.setup.init import io.ktor.application.Application import io.ktor.server.engine.embeddedServer import io.ktor.server.netty.Netty -import io.ktor.util.KtorExperimentalAPI -@KtorExperimentalAPI fun main() { embeddedServer(Netty, 8080, module = Application::init).start() } diff --git a/src/main/kotlin/com/wire/bots/polls/routing/HealthRoute.kt b/src/main/kotlin/com/wire/bots/polls/routing/HealthRoute.kt deleted file mode 100644 index b96da0a..0000000 --- a/src/main/kotlin/com/wire/bots/polls/routing/HealthRoute.kt +++ /dev/null @@ -1,31 +0,0 @@ -package com.wire.bots.polls.routing - -import com.wire.bots.polls.dao.DatabaseSetup -import io.ktor.application.call -import io.ktor.http.HttpStatusCode -import io.ktor.response.respond -import io.ktor.routing.Routing -import io.ktor.routing.get - -/** - * Health indication endpoints. - */ -fun Routing.healthStatus() { - /** - * Responds only 200 for ingres. - */ - get("/status") { - call.respond(HttpStatusCode.OK) - } - - /** - * More complex API for indication of all resources. - */ - get("/status/health") { - if (DatabaseSetup.isConnected()) { - call.respond("healthy") - } else { - call.respond(HttpStatusCode.ServiceUnavailable, "DB connection is not working") - } - } -} diff --git a/src/main/kotlin/com/wire/bots/polls/routing/MessagesRoute.kt b/src/main/kotlin/com/wire/bots/polls/routing/MessagesRoute.kt index 258a83b..3d4404f 100644 --- a/src/main/kotlin/com/wire/bots/polls/routing/MessagesRoute.kt +++ b/src/main/kotlin/com/wire/bots/polls/routing/MessagesRoute.kt @@ -9,39 +9,39 @@ import io.ktor.request.receive import io.ktor.response.respond import io.ktor.routing.Routing import io.ktor.routing.post -import mu.KLogger +import org.kodein.di.LazyKodein import org.kodein.di.generic.instance -import org.kodein.di.ktor.kodein /** * Messages API. */ -fun Routing.messages() { - val k by kodein() - - val logger by k.instance("routing-logger") +fun Routing.messages(k: LazyKodein) { val handler by k.instance() val authService by k.instance() + /** + * API for receiving messages from Roman. + */ post("/messages") { - logger.debug { "POST /messages" } + routingLogger.debug { "POST /messages" } // verify whether request contain correct auth header if (authService.isTokenValid { call.request.headers }) { - logger.info { "Token is valid." } + routingLogger.debug { "Token is valid." } // bot responds either with 200 or with 400 runCatching { - logger.info { "Parsing an message." } - val message = call.receive() - logger.info { "Message parsed." } - handler.handle(message) - logger.info { "Responding OK" } - call.respond(HttpStatusCode.OK) + routingLogger.debug { "Parsing an message." } + call.receive() }.onFailure { - logger.error(it) { "Exception occurred during the request handling!" } + routingLogger.error(it) { "Exception occurred during the request handling!" } call.respond(HttpStatusCode.BadRequest, "Bot did not understand the message.") + }.onSuccess { + routingLogger.debug { "Message parsed." } + handler.handle(it) + routingLogger.debug { "Responding OK" } + call.respond(HttpStatusCode.OK) } } else { - logger.info { "Token is invalid." } + routingLogger.warn { "Token is invalid." } call.respond(HttpStatusCode.Unauthorized, "Please provide Authorization header.") } } diff --git a/src/main/kotlin/com/wire/bots/polls/routing/Routing.kt b/src/main/kotlin/com/wire/bots/polls/routing/Routing.kt index 74fe6f9..56e7ed7 100644 --- a/src/main/kotlin/com/wire/bots/polls/routing/Routing.kt +++ b/src/main/kotlin/com/wire/bots/polls/routing/Routing.kt @@ -1,31 +1,17 @@ package com.wire.bots.polls.routing -import ai.blindspot.ktoolz.extensions.newLine -import io.ktor.application.call -import io.ktor.content.TextContent -import io.ktor.http.ContentType -import io.ktor.response.respond +import com.wire.bots.polls.utils.createLogger import io.ktor.routing.Routing -import io.ktor.routing.get -import org.kodein.di.generic.instance import org.kodein.di.ktor.kodein +internal val routingLogger by lazy { createLogger("RoutingLogger") } + /** * Register routes to the KTor. */ fun Routing.registerRoutes() { - val k by kodein() - val version by k.instance("version") - - get("/") { - call.respond("This is the Wire Poll Bot running version \"$version\".") - } - - get("/version") { - call.respond(TextContent("{\"version\": \"$version\"}$newLine", ContentType.Application.Json)) - } - healthStatus() - messages() + serviceRoutes(k) + messages(k) } diff --git a/src/main/kotlin/com/wire/bots/polls/routing/ServiceRoutes.kt b/src/main/kotlin/com/wire/bots/polls/routing/ServiceRoutes.kt new file mode 100644 index 0000000..63c1ff2 --- /dev/null +++ b/src/main/kotlin/com/wire/bots/polls/routing/ServiceRoutes.kt @@ -0,0 +1,62 @@ +package com.wire.bots.polls.routing + +import com.wire.bots.polls.dao.DatabaseSetup +import io.ktor.application.call +import io.ktor.http.HttpStatusCode +import io.ktor.response.respond +import io.ktor.response.respondTextWriter +import io.ktor.routing.Routing +import io.ktor.routing.get +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.kodein.di.LazyKodein +import org.kodein.di.generic.instance + +/** + * Registers prometheus data. + */ +fun Routing.serviceRoutes(k: LazyKodein) { + val version by k.instance("version") + val registry by k.instance() + + /** + * Information about service. + */ + get("/") { + call.respond("Server running version: \"$version\".") + } + + /** + * Send data about version. + */ + get("/version") { + call.respond(mapOf("version" to version)) + } + + /** + * Responds only 200 for ingres. + */ + get("/status") { + call.respond(HttpStatusCode.OK) + } + + /** + * More complex API for indication of all resources. + */ + get("/status/health") { + if (DatabaseSetup.isConnected()) { + call.respond(mapOf("health" to "healthy")) + } else { + call.respond(HttpStatusCode.ServiceUnavailable, "DB connection is not working") + } + } + + /** + * Prometheus endpoint. + */ + get("/metrics") { + call.respondTextWriter(status = HttpStatusCode.OK) { + @Suppress("BlockingMethodInNonBlockingContext") // sadly this is synchronous API + registry.scrape(this) + } + } +} diff --git a/src/main/kotlin/com/wire/bots/polls/services/ConversationService.kt b/src/main/kotlin/com/wire/bots/polls/services/ConversationService.kt index a1e0add..43076ff 100644 --- a/src/main/kotlin/com/wire/bots/polls/services/ConversationService.kt +++ b/src/main/kotlin/com/wire/bots/polls/services/ConversationService.kt @@ -1,6 +1,7 @@ package com.wire.bots.polls.services import com.wire.bots.polls.dto.roman.ConversationInformation +import com.wire.bots.polls.utils.appendPath import io.ktor.client.HttpClient import io.ktor.client.request.get import io.ktor.client.request.header @@ -16,17 +17,23 @@ class ConversationService(private val client: HttpClient, config: ProxyConfigura const val conversationPath = "/conversation" } - private val endpoint = config.baseUrl + conversationPath + private val endpoint = config.baseUrl appendPath conversationPath /** * Returns the number of members of conversation. */ - suspend fun getNumberOfConversationMembers(token: String): Int? { - val conversationInformation = client.get { - url(endpoint) - header("Authorization", "Bearer $token") - } - - return conversationInformation.members?.filter { it.service == null }?.size - } + suspend fun getNumberOfConversationMembers(token: String): Int? = + runCatching { + client.get { + url(endpoint) + header("Authorization", "Bearer $token") + } + }.onFailure { + logger.error(it) { "It was not possible to fetch conversation information!" } + }.onSuccess { + logger.debug { "Successfully got conversation information." } + }.getOrNull() + ?.members + ?.filter { it.service == null } + ?.size } diff --git a/src/main/kotlin/com/wire/bots/polls/services/MessagesHandlingService.kt b/src/main/kotlin/com/wire/bots/polls/services/MessagesHandlingService.kt index dff4c67..3a393cb 100644 --- a/src/main/kotlin/com/wire/bots/polls/services/MessagesHandlingService.kt +++ b/src/main/kotlin/com/wire/bots/polls/services/MessagesHandlingService.kt @@ -3,6 +3,7 @@ package com.wire.bots.polls.services import com.wire.bots.polls.dto.PollAction import com.wire.bots.polls.dto.UsersInput import com.wire.bots.polls.dto.roman.Message +import io.ktor.features.BadRequestException import mu.KLogging class MessagesHandlingService( @@ -14,10 +15,11 @@ class MessagesHandlingService( suspend fun handle(message: Message) { logger.debug { "Handling message." } + logger.trace { "Message: $message" } val handled = when (message.type) { - "conversation.bot_request" -> false.also { logger.info { "Bot was added to conversation." } } - "conversation.bot_removed" -> false.also { logger.info { "Bot was removed from the conversation." } } + "conversation.bot_request" -> false.also { logger.debug { "Bot was added to conversation." } } + "conversation.bot_removed" -> false.also { logger.debug { "Bot was removed from the conversation." } } else -> { logger.debug { "Handling type: ${message.type}" } when { @@ -69,20 +71,20 @@ class MessagesHandlingService( } }.onFailure { logger.error(it) { "Exception during handling the message: $message with token $token." } - }.getOrNull() ?: false + }.getOrThrow() } private suspend fun handleText(token: String, message: Message): Boolean { var handled = true fun ignore(reason: () -> String) { - logger.info(reason) + logger.debug(reason) handled = false } with(message) { when { - userId == null -> throw IllegalArgumentException("UserId must be set for text messages.") + userId == null -> throw BadRequestException("UserId must be set for text messages.") // it is a reply on something refMessageId != null && text != null -> when { // request for stats diff --git a/src/main/kotlin/com/wire/bots/polls/services/PollService.kt b/src/main/kotlin/com/wire/bots/polls/services/PollService.kt index f89298c..395fa6e 100644 --- a/src/main/kotlin/com/wire/bots/polls/services/PollService.kt +++ b/src/main/kotlin/com/wire/bots/polls/services/PollService.kt @@ -114,10 +114,14 @@ class PollService( * Sends statistics about the poll to the proxy. */ suspend fun sendStats(token: String, pollId: String, conversationMembers: Int? = null) { + logger.debug { "Sending stats for poll $pollId" } val conversationMembersCount = conversationMembers ?: conversationService.getNumberOfConversationMembers(token) .whenNull { logger.warn { "It was not possible to determine number of conversation members!" } } - val stats = statsFormattingService.formatStats(pollId, conversationMembersCount) ?: return + logger.debug { "Conversation members: $conversationMembersCount" } + val stats = statsFormattingService.formatStats(pollId, conversationMembersCount) + .whenNull { logger.warn { "It was not possible to format stats for poll $pollId" } } ?: return + GlobalScope.launch { proxySenderService.send(token, stats) } } @@ -125,6 +129,8 @@ class PollService( * Sends stats for latest poll. */ suspend fun sendStatsForLatest(token: String, botId: String) { + logger.debug { "Sending latest stats for bot $botId" } + val latest = repository.getLatestForBot(botId).whenNull { logger.info { "No polls found for bot $botId" } } ?: return diff --git a/src/main/kotlin/com/wire/bots/polls/services/ProxySenderService.kt b/src/main/kotlin/com/wire/bots/polls/services/ProxySenderService.kt index c90d222..746f0b5 100644 --- a/src/main/kotlin/com/wire/bots/polls/services/ProxySenderService.kt +++ b/src/main/kotlin/com/wire/bots/polls/services/ProxySenderService.kt @@ -1,8 +1,9 @@ package com.wire.bots.polls.services +import ai.blindspot.ktoolz.extensions.createJson import com.wire.bots.polls.dto.bot.BotMessage import com.wire.bots.polls.dto.roman.Response -import com.wire.bots.polls.utils.createJson +import com.wire.bots.polls.utils.appendPath import io.ktor.client.HttpClient import io.ktor.client.call.receive import io.ktor.client.request.header @@ -25,7 +26,7 @@ class ProxySenderService(private val client: HttpClient, config: ProxyConfigurat const val conversationPath = "/conversation" } - private val conversationEndpoint = config.baseUrl + conversationPath + private val conversationEndpoint = config.baseUrl appendPath conversationPath /** * Send given message with provided token. diff --git a/src/main/kotlin/com/wire/bots/polls/setup/ConfigurationDependencyInjection.kt b/src/main/kotlin/com/wire/bots/polls/setup/ConfigurationDependencyInjection.kt index 8de4bd0..a564441 100644 --- a/src/main/kotlin/com/wire/bots/polls/setup/ConfigurationDependencyInjection.kt +++ b/src/main/kotlin/com/wire/bots/polls/setup/ConfigurationDependencyInjection.kt @@ -4,24 +4,18 @@ import ai.blindspot.ktoolz.extensions.getEnv import ai.blindspot.ktoolz.extensions.whenNull import com.wire.bots.polls.dto.conf.DatabaseConfiguration import com.wire.bots.polls.services.ProxyConfiguration -import com.wire.bots.polls.setup.EnvConfigVariables.APP_KEY import com.wire.bots.polls.setup.EnvConfigVariables.DB_PASSWORD import com.wire.bots.polls.setup.EnvConfigVariables.DB_URL import com.wire.bots.polls.setup.EnvConfigVariables.DB_USER import com.wire.bots.polls.setup.EnvConfigVariables.PROXY_DOMAIN -import com.wire.bots.polls.setup.EnvConfigVariables.PROXY_WS_HOST -import com.wire.bots.polls.setup.EnvConfigVariables.PROXY_WS_PATH import com.wire.bots.polls.setup.EnvConfigVariables.SERVICE_TOKEN -import com.wire.bots.polls.setup.EnvConfigVariables.USE_WEB_SOCKETS -import com.wire.bots.polls.websockets.WebSocketConfig -import mu.KLogging +import com.wire.bots.polls.utils.createLogger import org.kodein.di.Kodein.MainBuilder import org.kodein.di.generic.bind -import org.kodein.di.generic.instance import org.kodein.di.generic.singleton import java.io.File -private val logger = KLogging().logger("EnvironmentLoaderLogger") +private val logger = createLogger("EnvironmentLoaderLogger") private fun getEnvOrLogDefault(env: String, defaultValue: String) = getEnv(env).whenNull { logger.warn { "Env variable $env not set! Using default value - $defaultValue" } @@ -55,27 +49,10 @@ fun MainBuilder.bindConfiguration() { getEnvOrLogDefault(SERVICE_TOKEN, "local-token") } - bind("app-key-websocket") with singleton { - getEnvOrLogDefault(APP_KEY, "") - } - bind("version") with singleton { loadVersion("development") } - bind("use-websocket") with singleton { - getEnvOrLogDefault(USE_WEB_SOCKETS, "false").toBoolean() - } - - bind() with singleton { - val appKey = instance("app-key-websocket") - - val host = getEnvOrLogDefault(PROXY_WS_HOST, "proxy.services.zinfra.io") - val path = getEnvOrLogDefault(PROXY_WS_PATH, "/await") - - WebSocketConfig(host = host, path = "$path/$appKey") - } - bind() with singleton { ProxyConfiguration(getEnvOrLogDefault(PROXY_DOMAIN, "http://proxy.services.zinfra.io")) } diff --git a/src/main/kotlin/com/wire/bots/polls/setup/DependencyInjection.kt b/src/main/kotlin/com/wire/bots/polls/setup/DependencyInjection.kt index b240c31..de1467b 100644 --- a/src/main/kotlin/com/wire/bots/polls/setup/DependencyInjection.kt +++ b/src/main/kotlin/com/wire/bots/polls/setup/DependencyInjection.kt @@ -11,51 +11,21 @@ import com.wire.bots.polls.services.PollService import com.wire.bots.polls.services.ProxySenderService import com.wire.bots.polls.services.StatsFormattingService import com.wire.bots.polls.services.UserCommunicationService -import com.wire.bots.polls.websockets.PollWebSocket +import com.wire.bots.polls.utils.createLogger import io.ktor.client.HttpClient -import io.ktor.client.engine.cio.CIO -import io.ktor.client.features.json.JacksonSerializer -import io.ktor.client.features.json.JsonFeature -import io.ktor.client.features.logging.LogLevel -import io.ktor.client.features.logging.Logger -import io.ktor.client.features.logging.Logging -import io.ktor.client.features.websocket.WebSockets -import io.ktor.util.KtorExperimentalAPI +import io.micrometer.prometheus.PrometheusConfig +import io.micrometer.prometheus.PrometheusMeterRegistry import mu.KLogger -import mu.KLogging import org.kodein.di.Kodein.MainBuilder import org.kodein.di.generic.bind import org.kodein.di.generic.instance import org.kodein.di.generic.singleton -@KtorExperimentalAPI fun MainBuilder.configureContainer() { bind() with singleton { PollValidation() } - bind() with singleton { - HttpClient(CIO) { - install(WebSockets) - - install(JsonFeature) { - serializer = JacksonSerializer() - } - - install(Logging) { - this.level - logger = Logger.DEBUG - level = LogLevel.ALL - } - } - } - - bind() with singleton { - PollWebSocket( - client = instance(), - config = instance(), - handler = instance() - ) - } + bind() with singleton { createHttpClient(instance()) } bind() with singleton { ProxySenderService( @@ -64,6 +34,14 @@ fun MainBuilder.configureContainer() { ) } + bind() with singleton { + PrometheusMeterRegistry(PrometheusConfig.DEFAULT).apply { + with(this.config()) { + commonTags("application", "poll-bot") + } + } + } + bind() with singleton { InputParser() } bind() with singleton { PollFactory(instance(), instance()) } @@ -84,6 +62,6 @@ fun MainBuilder.configureContainer() { bind() with singleton { StatsFormattingService(instance()) } - bind("routing-logger") with singleton { KLogging().logger("Routing") } - bind("install-logger") with singleton { KLogging().logger("KtorStartup") } + bind("routing-logger") with singleton { createLogger("Routing") } + bind("install-logger") with singleton { createLogger("KtorStartup") } } diff --git a/src/main/kotlin/com/wire/bots/polls/setup/EnvConfigVariables.kt b/src/main/kotlin/com/wire/bots/polls/setup/EnvConfigVariables.kt index 5a8ee7e..f9bee7d 100644 --- a/src/main/kotlin/com/wire/bots/polls/setup/EnvConfigVariables.kt +++ b/src/main/kotlin/com/wire/bots/polls/setup/EnvConfigVariables.kt @@ -27,26 +27,6 @@ object EnvConfigVariables { */ const val SERVICE_TOKEN = "SERVICE_TOKEN" - /** - * Key for connecting to the web socket of the proxy. - */ - const val APP_KEY = "APP_KEY" - - /** - * Determines whether to use web sockets for connection to proxy or not eg. true - */ - const val USE_WEB_SOCKETS = "USE_WEB_SOCKETS" - - /** - * Host name for the connection to web socket eg."proxy.services.zinfra.io" - */ - const val PROXY_WS_HOST = "PROXY_WS_HOST" - - /** - * Path to web socket at proxy eg. "/await" - */ - const val PROXY_WS_PATH = "PROXY_WS_PATH" - /** * Domain used for sending the messages from the bot to proxy eg. "https://proxy.services.zinfra.io" */ diff --git a/src/main/kotlin/com/wire/bots/polls/setup/HttpClient.kt b/src/main/kotlin/com/wire/bots/polls/setup/HttpClient.kt new file mode 100644 index 0000000..aa459b4 --- /dev/null +++ b/src/main/kotlin/com/wire/bots/polls/setup/HttpClient.kt @@ -0,0 +1,60 @@ +package com.wire.bots.polls.setup + +import com.wire.bots.polls.utils.createLogger +import com.wire.bots.polls.utils.httpCall +import io.ktor.client.HttpClient +import io.ktor.client.engine.apache.Apache +import io.ktor.client.features.json.JacksonSerializer +import io.ktor.client.features.json.JsonFeature +import io.ktor.client.features.logging.LogLevel +import io.ktor.client.features.logging.Logger +import io.ktor.client.features.logging.Logging +import io.ktor.client.features.observer.ResponseObserver +import io.micrometer.core.instrument.MeterRegistry + + +/** + * Prepares HTTP Client. + */ +fun createHttpClient(meterRegistry: MeterRegistry) = + HttpClient(Apache) { + install(JsonFeature) { + serializer = JacksonSerializer() + } + + // TODO check https://github.com/ktorio/ktor/issues/1813 + @Suppress("ConstantConditionIf") // temporary disabled until https://github.com/ktorio/ktor/issues/1813 is resolved + if (false) { + install(ResponseObserver) { + onResponse { + meterRegistry.httpCall(it) + } + } + } + + install(Logging) { + logger = Logger.TRACE + level = LogLevel.ALL + } + } + +/** + * Debug logger for HTTP Requests. + */ +private val Logger.Companion.DEBUG: Logger + get() = object : Logger, org.slf4j.Logger by createLogger("DebugHttpClient") { + override fun log(message: String) { + debug(message) + } + } + + +/** + * Trace logger for HTTP Requests. + */ +private val Logger.Companion.TRACE: Logger + get() = object : Logger, org.slf4j.Logger by createLogger("TraceHttpClient") { + override fun log(message: String) { + trace(message) + } + } diff --git a/src/main/kotlin/com/wire/bots/polls/setup/HttpDebug.kt b/src/main/kotlin/com/wire/bots/polls/setup/HttpDebug.kt deleted file mode 100644 index ad99ca5..0000000 --- a/src/main/kotlin/com/wire/bots/polls/setup/HttpDebug.kt +++ /dev/null @@ -1,17 +0,0 @@ -package com.wire.bots.polls.setup - -import io.ktor.client.HttpClient -import io.ktor.client.features.logging.Logger -import org.slf4j.LoggerFactory - -/** - * Debug logger for HTTP requests. - */ -val Logger.Companion.DEBUG: Logger - get() = object : Logger { - private val delegate = LoggerFactory.getLogger(HttpClient::class.java)!! - override fun log(message: String) { - delegate.debug(message) - } - } - diff --git a/src/main/kotlin/com/wire/bots/polls/setup/KodeinSetup.kt b/src/main/kotlin/com/wire/bots/polls/setup/KodeinSetup.kt index a98b7d8..4cad3fa 100644 --- a/src/main/kotlin/com/wire/bots/polls/setup/KodeinSetup.kt +++ b/src/main/kotlin/com/wire/bots/polls/setup/KodeinSetup.kt @@ -1,13 +1,11 @@ package com.wire.bots.polls.setup import io.ktor.application.Application -import io.ktor.util.KtorExperimentalAPI import org.kodein.di.ktor.kodein /** * Inits and sets up DI container. */ -@KtorExperimentalAPI fun Application.setupKodein() { kodein { bindConfiguration() diff --git a/src/main/kotlin/com/wire/bots/polls/setup/KtorInstallation.kt b/src/main/kotlin/com/wire/bots/polls/setup/KtorInstallation.kt index 95470d5..8593383 100644 --- a/src/main/kotlin/com/wire/bots/polls/setup/KtorInstallation.kt +++ b/src/main/kotlin/com/wire/bots/polls/setup/KtorInstallation.kt @@ -4,91 +4,85 @@ import com.fasterxml.jackson.databind.SerializationFeature import com.wire.bots.polls.dao.DatabaseSetup import com.wire.bots.polls.dto.conf.DatabaseConfiguration import com.wire.bots.polls.routing.registerRoutes -import com.wire.bots.polls.websockets.subscribeToWebSockets +import com.wire.bots.polls.setup.errors.registerExceptionHandlers +import com.wire.bots.polls.utils.createLogger import io.ktor.application.Application import io.ktor.application.install import io.ktor.features.CallLogging import io.ktor.features.ContentNegotiation import io.ktor.features.DefaultHeaders -import io.ktor.http.cio.websocket.pingPeriod -import io.ktor.http.cio.websocket.timeout import io.ktor.jackson.jackson +import io.ktor.metrics.micrometer.MicrometerMetrics +import io.ktor.request.path import io.ktor.routing.routing -import io.ktor.util.KtorExperimentalAPI -import io.ktor.websocket.WebSockets -import mu.KLogger +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig +import io.micrometer.prometheus.PrometheusMeterRegistry import org.flywaydb.core.Flyway import org.kodein.di.LazyKodein import org.kodein.di.generic.instance import org.kodein.di.ktor.kodein +import org.slf4j.event.Level import java.text.DateFormat -import java.time.Duration +private val installationLogger = createLogger("ApplicationSetup") + /** * Loads the application. */ -@KtorExperimentalAPI fun Application.init() { setupKodein() // now kodein is running and can be used val k by kodein() - val logger by k.instance("install-logger") - logger.debug { "DI container started." } + installationLogger.debug { "DI container started." } // connect to the database - connectDatabase(logger, k) + connectDatabase(k) // configure Ktor - installFrameworks() + installFrameworks(k) // register routing routing { registerRoutes() } - - // determine whether should bot connect to the proxy web socket - val useWebSockets by k.instance("use-websocket") - if (useWebSockets) { - subscribeToWebSockets() - } } /** * Connect bot to the database. */ -fun connectDatabase(logger: KLogger, k: LazyKodein) { - logger.info { "Connecting to the DB" } +fun connectDatabase(k: LazyKodein) { + installationLogger.info { "Connecting to the DB" } val dbConfig by k.instance() DatabaseSetup.connect(dbConfig) if (DatabaseSetup.isConnected()) { - logger.info { "DB connected." } - migrateDatabase(logger, dbConfig) + installationLogger.info { "DB connected." } + migrateDatabase(dbConfig) } else { // TODO verify handling, maybe exit the App? - logger.error { "It was not possible to connect to db database! The application will start but it won't work." } + installationLogger.error { "It was not possible to connect to db database! The application will start but it won't work." } } } /** * Migrate database using flyway. */ -fun migrateDatabase(logger: KLogger, dbConfig: DatabaseConfiguration) { - logger.info { "Migrating database." } +fun migrateDatabase(dbConfig: DatabaseConfiguration) { + installationLogger.info { "Migrating database." } val migrationsCount = Flyway .configure() .dataSource(dbConfig.url, dbConfig.userName, dbConfig.password) .load() .migrate() - logger.info { if (migrationsCount == 0) "No migrations necessary." else "Applied $migrationsCount migrations." } + installationLogger.info { if (migrationsCount == 0) "No migrations necessary." else "Applied $migrationsCount migrations." } } /** * Configure Ktor and install necessary extensions. */ -fun Application.installFrameworks() { +fun Application.installFrameworks(k: LazyKodein) { install(ContentNegotiation) { jackson { // enable pretty print for JSONs @@ -98,15 +92,26 @@ fun Application.installFrameworks() { } install(DefaultHeaders) - install(CallLogging) - - install(WebSockets) { - // enable ping - to keep the connection alive - pingPeriod = Duration.ofSeconds(30) - timeout = Duration.ofSeconds(15) - // disabled (max value) - the connection will be closed if surpassed this length. - maxFrameSize = Long.MAX_VALUE - masking = false + install(CallLogging) { + level = Level.TRACE + logger = createLogger("EndpointLogger") + + filter { call -> call.request.path().startsWith("/messages") } } + configurePrometheus(k) + registerExceptionHandlers(k) +} + +/** + * Install prometheus. + */ +fun Application.configurePrometheus(k: LazyKodein) { + val prometheusRegistry by k.instance() + install(MicrometerMetrics) { + registry = prometheusRegistry + distributionStatisticConfig = DistributionStatisticConfig.Builder() + .percentilesHistogram(true) + .build() + } } diff --git a/src/main/kotlin/com/wire/bots/polls/setup/errors/ExceptionHandling.kt b/src/main/kotlin/com/wire/bots/polls/setup/errors/ExceptionHandling.kt new file mode 100644 index 0000000..5e66868 --- /dev/null +++ b/src/main/kotlin/com/wire/bots/polls/setup/errors/ExceptionHandling.kt @@ -0,0 +1,41 @@ +package com.wire.bots.polls.setup.errors + +import com.wire.bots.polls.utils.countException +import com.wire.bots.polls.utils.createLogger +import io.ktor.application.Application +import io.ktor.application.ApplicationCall +import io.ktor.application.call +import io.ktor.application.install +import io.ktor.features.StatusPages +import io.ktor.http.HttpStatusCode +import io.ktor.response.respond +import io.micrometer.prometheus.PrometheusMeterRegistry +import org.kodein.di.LazyKodein +import org.kodein.di.generic.instance + +private val logger = createLogger("ExceptionHandler") + +/** + * Registers exception handling. + */ +fun Application.registerExceptionHandlers(k: LazyKodein) { + val registry by k.instance() + + install(StatusPages) { + exception { cause -> + logger.error(cause) { "Exception occurred in the application: ${cause.message}" } + call.errorResponse(HttpStatusCode.InternalServerError, cause.message) + registry.countException(cause) + } + + exception { cause -> + logger.error(cause) { "Error in communication with Roman. Status: ${cause.status}, body: ${cause.body}." } + call.errorResponse(HttpStatusCode.ServiceUnavailable, cause.message) + registry.countException(cause) + } + } +} + +suspend inline fun ApplicationCall.errorResponse(statusCode: HttpStatusCode, message: String?) { + respond(status = statusCode, message = mapOf("message" to (message ?: "No details specified"))) +} diff --git a/src/main/kotlin/com/wire/bots/polls/setup/errors/Exceptions.kt b/src/main/kotlin/com/wire/bots/polls/setup/errors/Exceptions.kt new file mode 100644 index 0000000..ec3d261 --- /dev/null +++ b/src/main/kotlin/com/wire/bots/polls/setup/errors/Exceptions.kt @@ -0,0 +1,9 @@ +package com.wire.bots.polls.setup.errors + +import io.ktor.http.HttpStatusCode + +data class RomanUnavailableException( + val status: HttpStatusCode, + val body: String, + override val message: String = "Error in communication with Roman." +) : Exception(message) diff --git a/src/main/kotlin/com/wire/bots/polls/utils/Extensions.kt b/src/main/kotlin/com/wire/bots/polls/utils/Extensions.kt new file mode 100644 index 0000000..cd9f880 --- /dev/null +++ b/src/main/kotlin/com/wire/bots/polls/utils/Extensions.kt @@ -0,0 +1,13 @@ +package com.wire.bots.polls.utils + +import mu.KLogging + +/** + * Creates URL from [this] as base and [path] as path + */ +infix fun String.appendPath(path: String) = "${dropLastWhile { it == '/' }}/${path.dropWhile { it == '/' }}" + +/** + * Creates logger with given name. + */ +fun createLogger(name: String) = KLogging().logger("com.wire.$name") diff --git a/src/main/kotlin/com/wire/bots/polls/utils/JsonExtensions.kt b/src/main/kotlin/com/wire/bots/polls/utils/JsonExtensions.kt deleted file mode 100644 index 5480284..0000000 --- a/src/main/kotlin/com/wire/bots/polls/utils/JsonExtensions.kt +++ /dev/null @@ -1,57 +0,0 @@ -@file:Suppress("unused") // might be handy in the future - -package com.wire.bots.polls.utils - -import ai.blindspot.ktoolz.extensions.newLine -import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import com.fasterxml.jackson.module.kotlin.readValue -import mu.KLogging - -/** - * Logger for this file. - */ -@PublishedApi -internal val jsonLogger = KLogging().logger("JsonExtensions") - -/** - * Standard [ObjectMapper] configured in a way the platform operates. - */ -fun jacksonMapper(): ObjectMapper = jacksonObjectMapper().apply { - configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false) - configure(DeserializationFeature.ACCEPT_FLOAT_AS_INT, false) -} - -/** - * Tries to create instance of T from provided [json], null is returned when it is not possible to parse it. - */ -inline fun parseJson(json: String): T? = - runCatching { jacksonMapper().readValue(json) } - .onFailure { jsonLogger.warn(it) { "Exception raised during JSON parsing:$newLine$json" } } - .getOrNull() - -/** - * Tries to create instance of T from provided [json], null is returned when it is not possible to parse it. - */ -inline fun parseJson(json: ByteArray): T? = - runCatching { jacksonMapper().readValue(json) } - .onFailure { jsonLogger.warn(it) { "Exception raised during JSON parsing:$newLine$json" } } - .getOrNull() - -/** - * Serializes given object to string. - */ -fun createJson(value: T): String = jacksonMapper().writeValueAsString(value) - -/** - * Serializes given object to byte array. - */ -fun createJsonBytes(value: T): ByteArray = jacksonMapper().writeValueAsBytes(value) - -/** - * Pretty print a json. - */ -fun prettyPrintJson(json: String): String = with(jacksonMapper()) { - writerWithDefaultPrettyPrinter().writeValueAsString(readValue(json)) -} diff --git a/src/main/kotlin/com/wire/bots/polls/utils/PrometheusExtensions.kt b/src/main/kotlin/com/wire/bots/polls/utils/PrometheusExtensions.kt new file mode 100644 index 0000000..6cbe45a --- /dev/null +++ b/src/main/kotlin/com/wire/bots/polls/utils/PrometheusExtensions.kt @@ -0,0 +1,50 @@ +package com.wire.bots.polls.utils + +import io.ktor.client.statement.HttpResponse +import io.ktor.client.statement.request +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Tag +import java.util.concurrent.TimeUnit + +/** + * Registers exception in the prometheus metrics. + */ +fun MeterRegistry.countException(exception: Throwable, additionalTags: Map = emptyMap()) { + val baseTags = mapOf( + "type" to exception.javaClass.name, + "message" to (exception.message ?: "No message.") + ) + val tags = (baseTags + additionalTags).toTags() + counter("exceptions", tags).increment() +} + +/** + * Register http call. + */ +fun MeterRegistry.httpCall(response: HttpResponse) { + val startTime = response.requestTime.timestamp + val endTime = response.responseTime.timestamp + + val duration = endTime - startTime + val tags = mapOf( + "method" to response.request.method.value, + "url" to response.request.url.toString(), + "response_code" to response.status.value.toString() + ).toTags() + + timer("http_calls", tags).record(duration, TimeUnit.MILLISECONDS) +} + +/** + * Convert map to the logging tags. + */ +private fun Map.toTags() = + map { (key, value) -> Tag(key, value) } + +/** + * Because original implementation is not handy. + */ +private data class Tag(private val k: String, private val v: String) : Tag { + override fun getKey(): String = k + override fun getValue(): String = v +} diff --git a/src/main/kotlin/com/wire/bots/polls/websockets/PollWebSocket.kt b/src/main/kotlin/com/wire/bots/polls/websockets/PollWebSocket.kt deleted file mode 100644 index 0dac846..0000000 --- a/src/main/kotlin/com/wire/bots/polls/websockets/PollWebSocket.kt +++ /dev/null @@ -1,21 +0,0 @@ -package com.wire.bots.polls.websockets - -import com.wire.bots.polls.dto.roman.Message -import com.wire.bots.polls.services.MessagesHandlingService -import io.ktor.client.HttpClient -import mu.KLogging - -/** - * Class which contains logic for receiving JSON formatted request from the given [WebSocketConfig]. - */ -class PollWebSocket(private val client: HttpClient, private val config: WebSocketConfig, private val handler: MessagesHandlingService) { - - private companion object : KLogging() - - /** - * Starts listening on the web socket. - */ - suspend fun run() = client.createJsonWebSocketReceiver(config) { _, message -> - handler.handle(message) - }.subscribe() -} diff --git a/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketBase.kt b/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketBase.kt deleted file mode 100644 index 6b493ec..0000000 --- a/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketBase.kt +++ /dev/null @@ -1,48 +0,0 @@ -package com.wire.bots.polls.websockets - -import io.ktor.client.HttpClient -import io.ktor.client.features.websocket.DefaultClientWebSocketSession -import io.ktor.client.features.websocket.ws -import io.ktor.http.DEFAULT_PORT -import io.ktor.http.cio.websocket.Frame -import mu.KLogging - -/** - * Base class for web socket connections. - */ -abstract class WebSocketBase(private val client: HttpClient, private val config: WebSocketConfig) { - - private companion object : KLogging() - - /** - * Opens web socket and starts receiving connections. [keepAlive] determines whether the app should try to reconnect when the - * connection is closed. - */ - tailrec suspend fun subscribe(keepAlive: Boolean = true) { - // TODO websocket reconnect - use better solution than dummy while true - runCatching { - client.ws( - host = config.host, - port = config.port ?: DEFAULT_PORT, - path = config.path - ) { - for (frame in incoming) { - logger.debug { "WS frame received." } - runCatching { onFrameReceived(frame) } - .onFailure { logger.error(it) { "Exception occurred during handling onFrameReceived." } } - - } - logger.info { "Closing the socket" } - } - }.onFailure { - logger.error(it) { "Exception occurred while receiving web sockets. Keep Alive - $keepAlive" } - } - // use tail recursion if the bot should keep the connection opened - if (keepAlive) subscribe(keepAlive) - } - - /** - * Method called when frame is received. - */ - abstract suspend fun DefaultClientWebSocketSession.onFrameReceived(frame: Frame) -} diff --git a/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketConfig.kt b/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketConfig.kt deleted file mode 100644 index 255eb49..0000000 --- a/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketConfig.kt +++ /dev/null @@ -1,19 +0,0 @@ -package com.wire.bots.polls.websockets - -/** - * Configuration for [PollWebSocket] class. - */ -data class WebSocketConfig( - /** - * Host address - ie. proxy.services.zinfra.io - */ - val host: String, - /** - * Path to socket - ie. /await/ws - */ - val path: String, - /** - * Port, if null, default is used. - */ - val port: Int? = null -) diff --git a/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketJsonReceiver.kt b/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketJsonReceiver.kt deleted file mode 100644 index f841040..0000000 --- a/src/main/kotlin/com/wire/bots/polls/websockets/WebSocketJsonReceiver.kt +++ /dev/null @@ -1,59 +0,0 @@ -package com.wire.bots.polls.websockets - -import ai.blindspot.ktoolz.extensions.whenNull -import com.wire.bots.polls.utils.jacksonMapper -import io.ktor.client.HttpClient -import io.ktor.client.features.websocket.DefaultClientWebSocketSession -import io.ktor.http.cio.websocket.Frame -import io.ktor.http.cio.websocket.readText -import mu.KLogging -import kotlin.reflect.KClass - -/** - * Class which uses JSON parser for [Frame.Text]. - */ -open class WebSocketJsonReceiver( - client: HttpClient, - config: WebSocketConfig, - private val clazz: KClass, - private val onJsonReceived: (suspend (DefaultClientWebSocketSession, T) -> Unit)? -) : WebSocketBase(client, config) { - - private companion object : KLogging() - - /** - * Default implementation of on received which parses the JSON from [frame]. - * - * Note that there is no error handling and parse exceptions will be propagated. - */ - override suspend fun DefaultClientWebSocketSession.onFrameReceived(frame: Frame) { - when (frame) { - is Frame.Text -> { - logger.debug { "Text frame received." } - val text = frame.readText() - // TODO remove this when going to prod as it prints users data to the log - logger.info { "Received text:\n$text" } - - @Suppress("BlockingMethodInNonBlockingContext") // because sadly jackson does not have async read - jacksonMapper().readValue(text, clazz.java) - .whenNull { logger.error { "It was not possible to parse incoming message!" } } - ?.let { onJsonReceived(it) } - } - else -> logger.debug { "Received non-text frame, not processing it." } - } - } - - /** - * Default implementation for the handling of received JSONs. [onJsonReceived] is used when provided. - */ - open suspend fun DefaultClientWebSocketSession.onJsonReceived(payload: T) = - onJsonReceived?.invoke(this, payload) ?: logger.warn { "No action specified, skipping." } -} - -/** - * Create instance of [WebSocketJsonReceiver] with [onJsonReceived] for handling the JSONs. - */ -inline fun HttpClient.createJsonWebSocketReceiver( - config: WebSocketConfig, - noinline onJsonReceived: suspend (DefaultClientWebSocketSession, T) -> Unit -) = WebSocketJsonReceiver(this, config, T::class, onJsonReceived) diff --git a/src/main/kotlin/com/wire/bots/polls/websockets/WebsocketRegistrations.kt b/src/main/kotlin/com/wire/bots/polls/websockets/WebsocketRegistrations.kt deleted file mode 100644 index 7a9042d..0000000 --- a/src/main/kotlin/com/wire/bots/polls/websockets/WebsocketRegistrations.kt +++ /dev/null @@ -1,20 +0,0 @@ -package com.wire.bots.polls.websockets - -import io.ktor.application.Application -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.launch -import org.kodein.di.generic.instance -import org.kodein.di.ktor.kodein - -/** - * Start listening the preconfigured web sockets. - */ -fun Application.subscribeToWebSockets() { - val k by kodein() - - GlobalScope.launch(Dispatchers.IO) { - val pollWebSocket by k.instance() - pollWebSocket.run() - } -}