diff --git a/README.md b/README.md index 82b4bee..e5e2db4 100644 --- a/README.md +++ b/README.md @@ -58,10 +58,7 @@ the [`local-config.yaml`][config] can be used to set the variables. | `GIT_COMMITTER_NAME` | | Name of the user used to create the Git commits. | | `GIT_COMMITTER_EMAIL` | | E-Mail of the user used to create the Git commits. | | | | | -| `KAFKA_USERNAME` | | Leave ALL of the following `KAFKA_` fields empty to skip the Kafka integration. | -| `KAFKA_PASSWORD` | | Leave ALL of the following `KAFKA_` fields empty to skip the Kafka integration. | -| `KAFKA_TOPIC` | | | -| `KAFKA_SEED_BROKERS` | | A comma separated list of Kafka brokers, e.g. first-kafka-broker.domain.com:9092,second-kafka-broker.domain.com:9092 | +| `KAFKA_TOPICS_CONFIG` | | A Json configuration for a Kafka Topic to publish updates to. Leave empty to skip the Kafka integration. See below for details and an example. | | `KAFKA_GROUP_ID_OVERRIDE` | | Override the kafka group id for local development to avoid creating lots of consumer groups. If unset, derived from local IP address so each k8s pod gets their own group. | | | | | | `AUTH_OIDC_KEY_SET_URL` | | URL to the [OpenID Connect Keyset][openid] for validating JWTs. See [authentication](#authentication) for more details. | @@ -90,6 +87,9 @@ the [`local-config.yaml`][config] can be used to set the variables. | `REPOSITORY_KEY_SEPARATOR` | `.` | Single character used to separate repository name from repository type. repository name and repository type must not contain separator. | | | | | | `ALLOWED_FILE_CATEGORIES` | | List of allowed keys for the filecategory field in repositories. Parsed as a json array, example value: `["key1","key2"]`. All keys not in this list are rejected on writes, and silently dropped when reading. | +| | | | +| `REDIS_URL` | | Url to an optional Redis instance to use as a shared cache. Will use in-memory cache if left blank | +| `REDIS_PASSWORD` | | Password for the Redis instance. Can be read from Vault via `VAULT_SECRETS_CONFIG` | ## Datastore @@ -187,6 +187,41 @@ _If you are a client subscribing to our Kafka update notifications, and you want state following an update notification, you must compare the commit hash and timestamp to see if you got the correct version. If not, wait a bit and try again, you landed on an instance that isn't consistent yet._ +### Kafka configuration + +If you wish to use a Kafka topic, set the environment variable `KAFKA_TOPICS_CONFIG` to a JSON document +as follows (displayed in prettyprinted form for readability): + +``` +{ + "metadata-change-events": { + "topic": "metadata-change-events", + "brokers": [ + "kafka-seed-broker1.example.com:9092", + "kafka-seed-broker2.example.com:9092" + ], + "username": "", + "passwordEnvVar": "METADATA_CHANGE_EVENTS_PASSWORD", + "authType": "PLAIN" + } +} +``` + +This assumes of course that the password is provided in the specified environment variable. On Localhost, you +can simply set "password" in the JSON. + +AuthType sets the SASL authentication method, possible values are `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. + +Note: You can use the Vault integration configuration to read the password from Vault by including it in +`VAULT_SECRETS_CONFIG`, similar to: + +``` + [...] + "some/vault/path": [ + {"vaultKey": "METADATA_CHANGE_EVENTS_PASSWORD"} + ], +``` + ## architecture ![software architecture](docs/architecture-export.png) @@ -241,8 +276,7 @@ Clear the test cache: ### Goland terminal configuration Goland has the annoying habit of limiting line width on the output terminal to 80 characters no matter how wide the -window is. -You can fix this. Menu: Help -> Find Action... -> search for "Registry" +window is. You can fix this. Menu: Help -> Find Action... -> search for "Registry" Uncheck `go.run.processes.with.pty`. diff --git a/go.mod b/go.mod index 84a7b15..c085cf2 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/Interhyp/metadata-service -go 1.21 +go 1.21.4 // exclude actually unused dependencies (mostly of pact-go, which is testing only anyway) // because our scanner fails to understand they are not in use @@ -52,6 +52,9 @@ exclude ( ) require ( + github.com/IBM/sarama v1.42.1 + github.com/Roshick/go-autumn-kafka v0.4.0 + github.com/Roshick/go-autumn-synchronisation v0.2.0 github.com/StephanHCB/go-autumn-config-api v0.2.1 github.com/StephanHCB/go-autumn-config-env v0.2.2 github.com/StephanHCB/go-autumn-logging v0.3.0 @@ -69,10 +72,10 @@ require ( github.com/lestrrat-go/jwx/v2 v2.0.17 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/robfig/cron/v3 v3.0.1 github.com/rs/zerolog v1.31.0 github.com/stretchr/testify v1.8.4 - github.com/twmb/franz-go v1.15.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -90,6 +93,10 @@ require ( github.com/cyphar/filepath-securejoin v0.2.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/eapache/go-resiliency v1.4.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/elastic/go-sysinfo v1.7.1 // indirect github.com/elastic/go-windows v1.0.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect @@ -98,7 +105,16 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/klauspost/compress v1.16.7 // indirect @@ -116,6 +132,7 @@ require ( github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect + github.com/redis/go-redis/v9 v9.3.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sergi/go-diff v1.1.0 // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect @@ -123,16 +140,19 @@ require ( github.com/skeema/knownhosts v1.2.0 // indirect github.com/sony/gobreaker v0.5.0 // indirect github.com/tidwall/tinylru v1.2.1 // indirect - github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect go.elastic.co/apm/module/apmchiv5/v2 v2.4.5 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.4.5 // indirect go.elastic.co/apm/v2 v2.4.5 // indirect go.elastic.co/fastjson v1.1.0 // indirect golang.org/x/crypto v0.15.0 // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.17.0 // indirect + golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.13.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/go.sum b/go.sum index c58cb2e..eefb812 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,16 @@ dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= +github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371 h1:kkhsdkhsCvIsutKu5zLMgWtgh9YxGCNAw8Ad8hjwfYg= github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371/go.mod h1:EjAoLdwvbIOoOQr3ihjnSoLZRtE8azugULFRteWMNc0= +github.com/Roshick/go-autumn-kafka v0.4.0 h1:nF7rr5EyThsQwgUVwnGqUC+fWKvx1Gv4kJDg3+HwXCQ= +github.com/Roshick/go-autumn-kafka v0.4.0/go.mod h1:xPnFTzE4ANIwJnu9+l+8fiy2ORhXXGB6JwQUbu2yUoo= +github.com/Roshick/go-autumn-synchronisation v0.2.0 h1:K+uKM33SWWrBaJcCbVQ3kySmcHacXOPdTbJsaXDkoZ4= +github.com/Roshick/go-autumn-synchronisation v0.2.0/go.mod h1:Escd6Ri5m/ppyFilE2C9DcVB3A6EzqO7JOFz8p8WThQ= github.com/StephanHCB/go-autumn-acorn-registry v0.3.1 h1:rAJlEsrSTJArQZHOt4Q6Gkc4NgL2ObSQGvxW0chiRiM= github.com/StephanHCB/go-autumn-acorn-registry v0.3.1/go.mod h1:KB7wPWOEy2n8VGNw75H4w7wBSWSrgwNNJNmet/F+9RI= github.com/StephanHCB/go-autumn-config-api v0.2.1 h1:t2EeTsdFpLM2xH2T7QFQtbFYI8hG5I9S+Q2o3KT6mlk= @@ -39,6 +45,10 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPd github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -53,6 +63,14 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= +github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= @@ -62,6 +80,8 @@ github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcej github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gliderlabs/ssh v0.3.5 h1:OcaySEmAQJgyYcArR+gGGTHCyE7nvhEMTlYY+Dp8CpY= github.com/gliderlabs/ssh v0.3.5/go.mod h1:8XB4KraRrX39qHhT6yxPsHedjA08I/uBVwj4xC+/+z4= github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= @@ -87,11 +107,34 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= @@ -151,6 +194,10 @@ github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO github.com/prometheus/procfs v0.0.0-20190425082905-87a4384529e0/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= +github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= @@ -187,12 +234,14 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tidwall/tinylru v1.2.1 h1:VgBr72c2IEr+V+pCdkPZUwiQ0KJknnWIYbhxAVkYfQk= github.com/tidwall/tinylru v1.2.1/go.mod h1:9bQnEduwB6inr2Y7AkBP7JPgCkyrhTV/ZpX0oOOpBI4= -github.com/twmb/franz-go v1.15.2 h1:mt3i7bTAp4GH/kMJiGAikJQUlG+UsCwxCmEy1CcAKYo= -github.com/twmb/franz-go v1.15.2/go.mod h1:aos+d/UBuigWkOs+6WoqEPto47EvC2jipLAO5qrAu48= -github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E= -github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.elastic.co/apm/module/apmchiv5/v2 v2.4.5 h1:1HCImhPw5E0Zvv1Bl2jArWcTe5750fxw3TjoJ7l+qe0= @@ -207,6 +256,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= @@ -218,17 +268,18 @@ golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191025021431-6c3a3bfe00ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -262,6 +313,7 @@ golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= diff --git a/internal/acorn/config/customconfigint.go b/internal/acorn/config/customconfigint.go index b5c6378..1007b50 100644 --- a/internal/acorn/config/customconfigint.go +++ b/internal/acorn/config/customconfigint.go @@ -2,6 +2,7 @@ package config import ( "github.com/Interhyp/metadata-service/internal/types" + "github.com/Roshick/go-autumn-kafka/pkg/aukafka" "regexp" librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" @@ -26,12 +27,6 @@ type CustomConfiguration interface { GitCommitterName() string GitCommitterEmail() string - KafkaUsername() string - KafkaPassword() string - KafkaTopic() string - KafkaSeedBrokers() string - KafkaGroupIdOverride() string - AuthOidcKeySetUrl() string AuthOidcTokenAudience() string AuthGroupWrite() string @@ -65,6 +60,12 @@ type CustomConfiguration interface { NotificationConsumerConfigs() map[string]NotificationConsumerConfig AllowedFileCategories() []string + + Kafka() *aukafka.Config + KafkaGroupIdOverride() string + + RedisUrl() string + RedisPassword() string } type NotificationConsumerConfig struct { @@ -120,4 +121,6 @@ const ( KeyRepositoryTypes = "REPOSITORY_TYPES" KeyNotificationConsumerConfigs = "NOTIFICATION_CONSUMER_CONFIGS" KeyAllowedFileCategories = "ALLOWED_FILE_CATEGORIES" + KeyRedisUrl = "REDIS_URL" + KeyRedisPassword = "REDIS_PASSWORD" ) diff --git a/internal/acorn/service/cacheint.go b/internal/acorn/repository/cacheint.go similarity index 81% rename from internal/acorn/service/cacheint.go rename to internal/acorn/repository/cacheint.go index 9d06220..1757109 100644 --- a/internal/acorn/service/cacheint.go +++ b/internal/acorn/repository/cacheint.go @@ -1,4 +1,4 @@ -package service +package repository import ( "context" @@ -14,15 +14,15 @@ type Cache interface { // --- owner cache --- // SetOwnerListTimestamp lets you set or update the timestamp for the last full scan of the list of aliases. - SetOwnerListTimestamp(ctx context.Context, timestamp string) + SetOwnerListTimestamp(ctx context.Context, timestamp string) error // GetOwnerListTimestamp gives you the timestamp of the last full scan of the list of aliases. - GetOwnerListTimestamp(ctx context.Context) string + GetOwnerListTimestamp(ctx context.Context) (string, error) // GetSortedOwnerAliases gives you a time snapshot copy of the slice of sorted owner names. // // This means you won't mess up the cache if you work with it in any way. - GetSortedOwnerAliases(ctx context.Context) []string + GetSortedOwnerAliases(ctx context.Context) ([]string, error) // GetOwner gives you a time snapshot deep copy of the owner information. // @@ -34,25 +34,25 @@ type Cache interface { // PutOwner creates or replaces the owner cache entry. // // This is an atomic operation. - PutOwner(ctx context.Context, alias string, entry openapi.OwnerDto) + PutOwner(ctx context.Context, alias string, entry openapi.OwnerDto) error // DeleteOwner deletes the owner cache entry. // // This is an atomic operation. - DeleteOwner(ctx context.Context, alias string) + DeleteOwner(ctx context.Context, alias string) error // --- service cache --- // SetServiceListTimestamp lets you set or update the timestamp for the last full scan of the list of names. - SetServiceListTimestamp(ctx context.Context, timestamp string) + SetServiceListTimestamp(ctx context.Context, timestamp string) error // GetServiceListTimestamp gives you the timestamp of the last full scan of the list of names. - GetServiceListTimestamp(ctx context.Context) string + GetServiceListTimestamp(ctx context.Context) (string, error) // GetSortedServiceNames gives you a time snapshot copy of the slice of sorted service names. // // This means you won't mess up the cache if you work with it in any way. - GetSortedServiceNames(ctx context.Context) []string + GetSortedServiceNames(ctx context.Context) ([]string, error) // GetService gives you a time snapshot deep copy of the service information. // @@ -64,25 +64,25 @@ type Cache interface { // PutService creates or replaces the service cache entry. // // This is an atomic operation. - PutService(ctx context.Context, name string, entry openapi.ServiceDto) + PutService(ctx context.Context, name string, entry openapi.ServiceDto) error // DeleteService deletes the service cache entry. // // This is an atomic operation. - DeleteService(ctx context.Context, name string) + DeleteService(ctx context.Context, name string) error // --- repository cache --- // SetRepositoryListTimestamp lets you set or update the timestamp for the last full scan of the list of keys. - SetRepositoryListTimestamp(ctx context.Context, timestamp string) + SetRepositoryListTimestamp(ctx context.Context, timestamp string) error // GetRepositoryListTimestamp gives you the timestamp of the last full scan of the list of keys. - GetRepositoryListTimestamp(ctx context.Context) string + GetRepositoryListTimestamp(ctx context.Context) (string, error) // GetSortedRepositoryKeys gives you a time snapshot copy of the slice of sorted repository names. // // This means you won't mess up the cache if you work with it in any way. - GetSortedRepositoryKeys(ctx context.Context) []string + GetSortedRepositoryKeys(ctx context.Context) ([]string, error) // GetRepository gives you a time snapshot deep copy of the repository information. // @@ -94,10 +94,10 @@ type Cache interface { // PutRepository creates or replaces the repository cache entry. // // This is an atomic operation. - PutRepository(ctx context.Context, key string, entry openapi.RepositoryDto) + PutRepository(ctx context.Context, key string, entry openapi.RepositoryDto) error // DeleteRepository deletes the repository cache entry. // // This is an atomic operation. - DeleteRepository(ctx context.Context, key string) + DeleteRepository(ctx context.Context, key string) error } diff --git a/internal/acorn/repository/kafkaint.go b/internal/acorn/repository/kafkaint.go index e4fca53..7f2b4d5 100644 --- a/internal/acorn/repository/kafkaint.go +++ b/internal/acorn/repository/kafkaint.go @@ -6,7 +6,9 @@ import "context" type Kafka interface { IsKafka() bool + // Setup only connects the producer, the consumer is connected with StartReceiveLoop. Setup() error + // Teardown will close both producer and consumer if they have been connected. Teardown() // SubscribeIncoming allows you to register a callback that is called whenever a message is received from the Kafka bus. diff --git a/internal/acorn/service/updaterint.go b/internal/acorn/service/updaterint.go index 50be7cd..bece532 100644 --- a/internal/acorn/service/updaterint.go +++ b/internal/acorn/service/updaterint.go @@ -87,5 +87,5 @@ type Updater interface { // CanMoveOrDeleteRepository checks that no service still references the repository key. // // Expects a current cache and you must be holding the lock. - CanMoveOrDeleteRepository(ctx context.Context, key string) bool + CanMoveOrDeleteRepository(ctx context.Context, key string) (bool, error) } diff --git a/internal/repository/cache/cache.go b/internal/repository/cache/cache.go new file mode 100644 index 0000000..789d8d8 --- /dev/null +++ b/internal/repository/cache/cache.go @@ -0,0 +1,98 @@ +package cache + +import ( + "context" + openapi "github.com/Interhyp/metadata-service/api" + "github.com/Interhyp/metadata-service/internal/acorn/config" + "github.com/Interhyp/metadata-service/internal/acorn/repository" + libcache "github.com/Roshick/go-autumn-synchronisation/pkg/aucache" + auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" + librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" + "time" +) + +var cacheRetention = 30 * 24 * time.Hour + +type Impl struct { + Configuration librepo.Configuration + CustomConfiguration config.CustomConfiguration + Logging librepo.Logging + Timestamp librepo.Timestamp + + OwnerCache libcache.Cache[openapi.OwnerDto] + ServiceCache libcache.Cache[openapi.ServiceDto] + RepositoryCache libcache.Cache[openapi.RepositoryDto] + TimestampCache libcache.Cache[string] +} + +func New( + configuration librepo.Configuration, + customConfig config.CustomConfiguration, + logging librepo.Logging, + timestamp librepo.Timestamp, +) repository.Cache { + return &Impl{ + Configuration: configuration, + CustomConfiguration: customConfig, + Logging: logging, + Timestamp: timestamp, + } +} + +func (s *Impl) IsCache() bool { + return true +} + +func (s *Impl) Setup() error { + ctx := auzerolog.AddLoggerToCtx(context.Background()) + + if err := s.SetupCache(ctx); err != nil { + s.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("failed to set up business layer cache. BAILING OUT") + return err + } + + s.Logging.Logger().Ctx(ctx).Info().Print("successfully set up cache") + return nil +} + +const ( + ownerKeyPrefix = "v1-owner" + serviceKeyPrefix = "v1-service" + repositoryKeyPrefix = "v1-repository" + timestampKeyPrefix = "v1-timestamp" +) + +func (s *Impl) SetupCache(ctx context.Context) error { + redisUrl := s.CustomConfiguration.RedisUrl() + if redisUrl == "" { + s.Logging.Logger().Ctx(ctx).Info().Print("using in-memory cache") + if s.OwnerCache == nil { + s.OwnerCache = libcache.NewMemoryCache[openapi.OwnerDto]() + } + if s.ServiceCache == nil { + s.ServiceCache = libcache.NewMemoryCache[openapi.ServiceDto]() + } + if s.RepositoryCache == nil { + s.RepositoryCache = libcache.NewMemoryCache[openapi.RepositoryDto]() + } + if s.TimestampCache == nil { + s.TimestampCache = libcache.NewMemoryCache[string]() + } + } else { + s.Logging.Logger().Ctx(ctx).Info().Printf("using redis at %s", redisUrl) + redisPassword := s.CustomConfiguration.RedisUrl() + if s.OwnerCache == nil { + s.OwnerCache = libcache.NewRedisCache[openapi.OwnerDto](redisUrl, redisPassword, ownerKeyPrefix) + } + if s.ServiceCache == nil { + s.ServiceCache = libcache.NewRedisCache[openapi.ServiceDto](redisUrl, redisPassword, serviceKeyPrefix) + } + if s.RepositoryCache == nil { + s.RepositoryCache = libcache.NewRedisCache[openapi.RepositoryDto](redisUrl, redisPassword, repositoryKeyPrefix) + } + if s.TimestampCache == nil { + s.TimestampCache = libcache.NewRedisCache[string](redisUrl, redisPassword, timestampKeyPrefix) + } + } + return nil +} diff --git a/internal/repository/cache/lowlevel.go b/internal/repository/cache/lowlevel.go new file mode 100644 index 0000000..abdca74 --- /dev/null +++ b/internal/repository/cache/lowlevel.go @@ -0,0 +1,96 @@ +package cache + +import ( + "context" + "fmt" + libcache "github.com/Roshick/go-autumn-synchronisation/pkg/aucache" + "github.com/StephanHCB/go-backend-service-common/api/apierrors" + "sort" +) + +const ( + ownerTimestampKey = "ownertimestamp" + serviceTimestampKey = "servicetimestamp" + repositoryTimestampKey = "repositorytimestamp" +) + +var notFoundTimestamp = "1970-01-01T00:00:00Z" + +func (s *Impl) setTimestamp(ctx context.Context, what string, key string, timestamp string) error { + err := s.TimestampCache.Set(ctx, key, timestamp, cacheRetention) + if err != nil { + messageKey := fmt.Sprintf("cache.%s.error", what) + details := fmt.Sprintf("error writing %s timestamp to cache", what) + s.Logging.Logger().Ctx(ctx).Info().WithErr(err).Printf("%s: %s", details, err.Error()) + return apierrors.NewBadGatewayError(messageKey, details, err, s.Timestamp.Now()) + } + return nil +} + +func (s *Impl) getTimestamp(ctx context.Context, what string, key string) (string, error) { + valPtr, err := s.TimestampCache.Get(ctx, key) + if err != nil { + messageKey := fmt.Sprintf("cache.%s.error", what) + details := fmt.Sprintf("error reading %s timestamp from cache", what) + s.Logging.Logger().Ctx(ctx).Info().Printf("%s: %s", details, err.Error()) + return notFoundTimestamp, apierrors.NewBadGatewayError(messageKey, details, err, s.Timestamp.Now()) + } + if valPtr == nil { + return notFoundTimestamp, err + } + return *valPtr, nil +} + +func getSortedKeys[E any](ctx context.Context, what string, s *Impl, cache libcache.Cache[E]) ([]string, error) { + keys, err := cache.Keys(ctx) + if err != nil { + messageKey := fmt.Sprintf("cache.%s.error", what) + details := fmt.Sprintf("error reading %s keys from cache", what) + s.Logging.Logger().Ctx(ctx).Warn().WithErr(err).Printf("%s: %s", details, err.Error()) + return []string{}, apierrors.NewBadGatewayError(messageKey, details, err, s.Timestamp.Now()) + } + sort.Strings(keys) + return keys, nil +} + +func getEntry[E any](ctx context.Context, what string, s *Impl, cache libcache.Cache[E], key string) (E, error) { + copiedEntryPtr, err := cache.Get(ctx, key) + if err != nil { + var empty E + messageKey := fmt.Sprintf("cache.%s.error", what) + details := fmt.Sprintf("error reading %s %s from cache", what, key) + s.Logging.Logger().Ctx(ctx).Warn().WithErr(err).Printf("%s: %s", details, err.Error()) + return empty, apierrors.NewBadGatewayError(messageKey, details, err, s.Timestamp.Now()) + } + if copiedEntryPtr == nil { + var empty E + messageKey := fmt.Sprintf("%s.notfound", what) + details := fmt.Sprintf("%s %s not found", what, key) + s.Logging.Logger().Ctx(ctx).Info().Print(details) + return empty, apierrors.NewNotFoundError(messageKey, details, nil, s.Timestamp.Now()) + } else { + return *copiedEntryPtr, nil + } +} + +func putEntry[E any](ctx context.Context, what string, s *Impl, cache libcache.Cache[E], key string, entry E) error { + err := cache.Set(ctx, key, entry, cacheRetention) + if err != nil { + messageKey := fmt.Sprintf("cache.%s.error", what) + details := fmt.Sprintf("error writing %s %s to cache", what, key) + s.Logging.Logger().Ctx(ctx).Warn().WithErr(err).Printf("%s: %s", details, err.Error()) + return apierrors.NewBadGatewayError(messageKey, details, err, s.Timestamp.Now()) + } + return nil +} + +func removeEntry[E any](ctx context.Context, what string, s *Impl, cache libcache.Cache[E], key string) error { + err := cache.Remove(ctx, key) + if err != nil { + messageKey := fmt.Sprintf("cache.%s.error", what) + details := fmt.Sprintf("error removing %s %s from cache", what, key) + s.Logging.Logger().Ctx(ctx).Warn().WithErr(err).Printf("%s: %s", details, err.Error()) + return apierrors.NewBadGatewayError(messageKey, details, err, s.Timestamp.Now()) + } + return nil +} diff --git a/internal/repository/cache/ownercache.go b/internal/repository/cache/ownercache.go new file mode 100644 index 0000000..58d6c14 --- /dev/null +++ b/internal/repository/cache/ownercache.go @@ -0,0 +1,32 @@ +package cache + +import ( + "context" + "github.com/Interhyp/metadata-service/api" +) + +const ownerWhat = "owner" + +func (s *Impl) SetOwnerListTimestamp(ctx context.Context, timestamp string) error { + return s.setTimestamp(ctx, ownerWhat, ownerTimestampKey, timestamp) +} + +func (s *Impl) GetOwnerListTimestamp(ctx context.Context) (string, error) { + return s.getTimestamp(ctx, ownerWhat, ownerTimestampKey) +} + +func (s *Impl) GetSortedOwnerAliases(ctx context.Context) ([]string, error) { + return getSortedKeys(ctx, ownerWhat, s, s.OwnerCache) +} + +func (s *Impl) GetOwner(ctx context.Context, alias string) (openapi.OwnerDto, error) { + return getEntry(ctx, ownerWhat, s, s.OwnerCache, alias) +} + +func (s *Impl) PutOwner(ctx context.Context, alias string, entry openapi.OwnerDto) error { + return putEntry(ctx, ownerWhat, s, s.OwnerCache, alias, entry) +} + +func (s *Impl) DeleteOwner(ctx context.Context, alias string) error { + return removeEntry(ctx, ownerWhat, s, s.OwnerCache, alias) +} diff --git a/internal/repository/cache/repositorycache.go b/internal/repository/cache/repositorycache.go new file mode 100644 index 0000000..1819745 --- /dev/null +++ b/internal/repository/cache/repositorycache.go @@ -0,0 +1,32 @@ +package cache + +import ( + "context" + "github.com/Interhyp/metadata-service/api" +) + +const repositoryWhat = "repository" + +func (s *Impl) SetRepositoryListTimestamp(ctx context.Context, timestamp string) error { + return s.setTimestamp(ctx, repositoryWhat, repositoryTimestampKey, timestamp) +} + +func (s *Impl) GetRepositoryListTimestamp(ctx context.Context) (string, error) { + return s.getTimestamp(ctx, repositoryWhat, repositoryTimestampKey) +} + +func (s *Impl) GetSortedRepositoryKeys(ctx context.Context) ([]string, error) { + return getSortedKeys(ctx, repositoryWhat, s, s.RepositoryCache) +} + +func (s *Impl) GetRepository(ctx context.Context, key string) (openapi.RepositoryDto, error) { + return getEntry(ctx, repositoryWhat, s, s.RepositoryCache, key) +} + +func (s *Impl) PutRepository(ctx context.Context, key string, entry openapi.RepositoryDto) error { + return putEntry(ctx, repositoryWhat, s, s.RepositoryCache, key, entry) +} + +func (s *Impl) DeleteRepository(ctx context.Context, key string) error { + return removeEntry(ctx, repositoryWhat, s, s.RepositoryCache, key) +} diff --git a/internal/repository/cache/servicecache.go b/internal/repository/cache/servicecache.go new file mode 100644 index 0000000..0ba35fa --- /dev/null +++ b/internal/repository/cache/servicecache.go @@ -0,0 +1,32 @@ +package cache + +import ( + "context" + "github.com/Interhyp/metadata-service/api" +) + +const serviceWhat = "service" + +func (s *Impl) SetServiceListTimestamp(ctx context.Context, timestamp string) error { + return s.setTimestamp(ctx, serviceWhat, serviceTimestampKey, timestamp) +} + +func (s *Impl) GetServiceListTimestamp(ctx context.Context) (string, error) { + return s.getTimestamp(ctx, serviceWhat, serviceTimestampKey) +} + +func (s *Impl) GetSortedServiceNames(ctx context.Context) ([]string, error) { + return getSortedKeys(ctx, serviceWhat, s, s.ServiceCache) +} + +func (s *Impl) GetService(ctx context.Context, name string) (openapi.ServiceDto, error) { + return getEntry(ctx, serviceWhat, s, s.ServiceCache, name) +} + +func (s *Impl) PutService(ctx context.Context, name string, entry openapi.ServiceDto) error { + return putEntry(ctx, serviceWhat, s, s.ServiceCache, name, entry) +} + +func (s *Impl) DeleteService(ctx context.Context, name string) error { + return removeEntry(ctx, serviceWhat, s, s.ServiceCache, name) +} diff --git a/internal/repository/config/accessors.go b/internal/repository/config/accessors.go index 374c06a..e545efb 100644 --- a/internal/repository/config/accessors.go +++ b/internal/repository/config/accessors.go @@ -2,6 +2,7 @@ package config import ( "github.com/Interhyp/metadata-service/internal/acorn/config" + "github.com/Roshick/go-autumn-kafka/pkg/aukafka" "os" "regexp" "strings" @@ -59,22 +60,6 @@ func (c *CustomConfigImpl) GitCommitterEmail() string { return c.VGitCommitterEmail } -func (c *CustomConfigImpl) KafkaUsername() string { - return c.VKafkaUsername -} - -func (c *CustomConfigImpl) KafkaPassword() string { - return c.VKafkaPassword -} - -func (c *CustomConfigImpl) KafkaTopic() string { - return c.VKafkaTopic -} - -func (c *CustomConfigImpl) KafkaSeedBrokers() string { - return c.VKafkaSeedBrokers -} - func (c *CustomConfigImpl) AuthOidcKeySetUrl() string { return c.VAuthOidcKeySetUrl } @@ -177,3 +162,15 @@ func (c *CustomConfigImpl) NotificationConsumerConfigs() map[string]config.Notif func (c *CustomConfigImpl) AllowedFileCategories() []string { return c.VAllowedFileCategories } + +func (c *CustomConfigImpl) Kafka() *aukafka.Config { + return c.VKafkaConfig +} + +func (c *CustomConfigImpl) RedisUrl() string { + return c.VRedisUrl +} + +func (c *CustomConfigImpl) RedisPassword() string { + return c.VRedisPassword +} diff --git a/internal/repository/config/config.go b/internal/repository/config/config.go index 4ccc805..82a263e 100644 --- a/internal/repository/config/config.go +++ b/internal/repository/config/config.go @@ -99,34 +99,6 @@ var CustomConfigItems = []auconfigapi.ConfigItem{ Description: "email address to use for git commits", Validate: auconfigenv.ObtainNotEmptyValidator(), }, - { - Key: config.KeyKafkaUsername, - EnvName: config.KeyKafkaUsername, - Default: "", - Description: "optional: kafka username (needed to send kafka notifications), leaving this or any of the other *KAFKA* fields empty will switch off all Kafka functionality", - Validate: auconfigapi.ConfigNeedsNoValidation, - }, - { - Key: config.KeyKafkaPassword, - EnvName: config.KeyKafkaPassword, - Default: "", - Description: "optional: kafka password (needed to send kafka notifications), leaving this or any of the other *KAFKA* fields empty will switch off all Kafka functionality", - Validate: auconfigapi.ConfigNeedsNoValidation, - }, - { - Key: config.KeyKafkaTopic, - EnvName: config.KeyKafkaTopic, - Default: "", - Description: "optional: kafka topic (needed to send kafka notifications), leaving this or any of the other *KAFKA* fields empty will switch off all Kafka functionality", - Validate: auconfigenv.ObtainPatternValidator("^(|[a-z0-9-]+)$"), - }, - { - Key: config.KeyKafkaSeedBrokers, - EnvName: config.KeyKafkaSeedBrokers, - Default: "", - Description: "optional: comma separated list of kafka seed broker URLs (needed to send kafka notifications), leaving this or any of the other *KAFKA* fields empty will switch off all Kafka functionality", - Validate: auconfigenv.ObtainPatternValidator("^(|([a-z0-9-]+.[a-z0-9-]+.[a-z]{2,3}:9092)(,[a-z0-9-]+.[a-z0-9-]+.[a-z]{2,3}:9092)*)$"), - }, { Key: config.KeyKafkaGroupIdOverride, EnvName: config.KeyKafkaGroupIdOverride, @@ -308,4 +280,18 @@ var CustomConfigItems = []auconfigapi.ConfigItem{ return err }, }, + { + Key: config.KeyRedisUrl, + EnvName: config.KeyRedisUrl, + Default: "", + Description: "base url to the redis, including protocol. Uses in-memory caching if blank.", + Validate: auconfigapi.ConfigNeedsNoValidation, + }, + { + Key: config.KeyRedisPassword, + EnvName: config.KeyRedisPassword, + Default: "", + Description: "password used to access the redis", + Validate: auconfigapi.ConfigNeedsNoValidation, + }, } diff --git a/internal/repository/config/plumbing.go b/internal/repository/config/plumbing.go index 87ec1f3..2557d17 100644 --- a/internal/repository/config/plumbing.go +++ b/internal/repository/config/plumbing.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/Interhyp/metadata-service/internal/acorn/config" openapi "github.com/Interhyp/metadata-service/internal/types" + "github.com/Roshick/go-autumn-kafka/pkg/aukafka" auconfigapi "github.com/StephanHCB/go-autumn-config-api" auconfigenv "github.com/StephanHCB/go-autumn-config-env" librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" @@ -35,10 +36,6 @@ type CustomConfigImpl struct { VBitbucketReviewerFallback string VGitCommitterName string VGitCommitterEmail string - VKafkaUsername string - VKafkaPassword string - VKafkaTopic string - VKafkaSeedBrokers string VAuthOidcKeySetUrl string VAuthOidcTokenAudience string VAuthGroupWrite string @@ -64,13 +61,20 @@ type CustomConfigImpl struct { VRepositoryKeySeparator string VNotificationConsumerConfigs map[string]config.NotificationConsumerConfig VAllowedFileCategories []string + VRedisUrl string + VRedisPassword string + + VKafkaConfig *aukafka.Config } func New() (librepo.Configuration, config.CustomConfiguration) { - instance := &CustomConfigImpl{} + instance := &CustomConfigImpl{ + VKafkaConfig: aukafka.NewConfig(), + } configItems := make([]auconfigapi.ConfigItem, 0) configItems = append(configItems, CustomConfigItems...) configItems = append(configItems, vault.ConfigItems...) + configItems = append(configItems, instance.VKafkaConfig.ConfigItems()...) libInstance := libconfig.NewNoAcorn(instance, configItems) @@ -92,10 +96,6 @@ func (c *CustomConfigImpl) Obtain(getter func(key string) string) { c.VBitbucketReviewerFallback = getter(config.KeyBitbucketReviewerFallback) c.VGitCommitterName = getter(config.KeyGitCommitterName) c.VGitCommitterEmail = getter(config.KeyGitCommitterEmail) - c.VKafkaUsername = getter(config.KeyKafkaUsername) - c.VKafkaPassword = getter(config.KeyKafkaPassword) - c.VKafkaTopic = getter(config.KeyKafkaTopic) - c.VKafkaSeedBrokers = getter(config.KeyKafkaSeedBrokers) c.VKafkaGroupIdOverride = getter(config.KeyKafkaGroupIdOverride) c.VAuthOidcKeySetUrl = getter(config.KeyAuthOidcKeySetUrl) c.VAuthOidcTokenAudience = getter(config.KeyAuthOidcTokenAudience) @@ -121,6 +121,8 @@ func (c *CustomConfigImpl) Obtain(getter func(key string) string) { c.VRepositoryKeySeparator = getter(config.KeyRepositoryKeySeparator) c.VNotificationConsumerConfigs, _ = parseNotificationConsumerConfigs(getter(config.KeyNotificationConsumerConfigs)) c.VAllowedFileCategories, _ = parseAllowedFileCategories(getter(config.KeyAllowedFileCategories)) + + c.VKafkaConfig.Obtain(getter) } // used after validation, so known safe diff --git a/internal/repository/config/validation_test.go b/internal/repository/config/validation_test.go index 6cfeb3d..29b2688 100644 --- a/internal/repository/config/validation_test.go +++ b/internal/repository/config/validation_test.go @@ -78,7 +78,7 @@ func TestValidate_LotsOfErrors(t *testing.T) { _, err := tstSetupCutAndLogRecorder(t, "invalid-config-values.yaml") require.NotNil(t, err) - require.Contains(t, err.Error(), "some configuration values failed to validate or parse. There were 29 error(s). See details above") + require.Contains(t, err.Error(), "some configuration values failed to validate or parse. There were 27 error(s). See details above") actualLog := goauzerolog.RecordedLogForTesting.String() @@ -91,21 +91,18 @@ func TestValidate_LotsOfErrors(t *testing.T) { expectedPart3 := "METRICS_PORT: value -12387192873invalid is not a valid integer" require.Contains(t, actualLog, expectedPart3) - expectedPart4 := "failed to validate configuration field KAFKA_SEED_BROKERS: must match ^(|([a-z0-9-]+.[a-z0-9-]+.[a-z]{2,3}" + expectedPart4 := "failed to validate configuration field ALERT_TARGET_PREFIX: must match ^((http|https)://|)[a-z0-9-.]+.[a-z]{2,3}/$" require.Contains(t, actualLog, expectedPart4) - expectedPart5 := "failed to validate configuration field ALERT_TARGET_PREFIX: must match ^((http|https)://|)[a-z0-9-.]+.[a-z]{2,3}/$" + expectedPart5 := "failed to validate configuration field ALERT_TARGET_SUFFIX: must match ^@[a-z0-9-]+.[a-z]{2,3}$" require.Contains(t, actualLog, expectedPart5) - expectedPart6 := "failed to validate configuration field ALERT_TARGET_SUFFIX: must match ^@[a-z0-9-]+.[a-z]{2,3}$" + expectedPart6 := "failed to validate configuration field VAULT_ENABLED: value what is not a valid boolean value" require.Contains(t, actualLog, expectedPart6) - expectedPart7 := "failed to validate configuration field VAULT_ENABLED: value what is not a valid boolean value" + expectedPart7 := "failed to validate configuration field VAULT_SECRETS_CONFIG: invalid character '}' after top-level value" require.Contains(t, actualLog, expectedPart7) - expectedPart8 := "failed to validate configuration field VAULT_SECRETS_CONFIG: invalid character '}' after top-level value" - require.Contains(t, actualLog, expectedPart8) - require.Contains(t, actualLog, "failed to validate configuration field NOTIFICATION_CONSUMER_CONFIGS:") require.Contains(t, actualLog, "Notification consumer config 'caseInvalidTypes' contains invalid type 'invalid'.") require.Contains(t, actualLog, "Notification consumer config 'caseInvalidTypes' contains invalid type 'alsoInvalid'.") @@ -135,10 +132,6 @@ func TestAccessors(t *testing.T) { require.Equal(t, "username", config.Custom(cut).BitbucketReviewerFallback()) require.Equal(t, "Body, Some", config.Custom(cut).GitCommitterName()) require.Equal(t, "somebody@somewhere.com", config.Custom(cut).GitCommitterEmail()) - require.Equal(t, "some-kafka-username", config.Custom(cut).KafkaUsername()) - require.Equal(t, "some-kafka-password", config.Custom(cut).KafkaPassword()) - require.Equal(t, "some-kafka-topic", config.Custom(cut).KafkaTopic()) - require.Equal(t, "first-kafka-broker.domain.com:9092,second-kafka-broker.domain.com:9092", config.Custom(cut).KafkaSeedBrokers()) require.Equal(t, "http://keyset", config.Custom(cut).AuthOidcKeySetUrl()) require.Equal(t, "some-audience", config.Custom(cut).AuthOidcTokenAudience()) require.Equal(t, "admin", config.Custom(cut).AuthGroupWrite()) diff --git a/internal/repository/kafka/kafka.go b/internal/repository/kafka/kafka.go index ab797cf..d7c5638 100644 --- a/internal/repository/kafka/kafka.go +++ b/internal/repository/kafka/kafka.go @@ -2,22 +2,23 @@ package kafka import ( "context" - "crypto/tls" - "encoding/json" "errors" "fmt" + "github.com/IBM/sarama" "github.com/Interhyp/metadata-service/internal/acorn/config" "github.com/Interhyp/metadata-service/internal/acorn/repository" + "github.com/Roshick/go-autumn-kafka/pkg/aukafka" + aulogging "github.com/StephanHCB/go-autumn-logging" auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" - "github.com/twmb/franz-go/pkg/kgo" - "github.com/twmb/franz-go/pkg/sasl/scram" - "net" + "github.com/rcrowley/go-metrics" "strings" "time" ) import _ "github.com/go-git/go-git/v5" +const MetadataChangeEventsTopicKey = "metadata-change-events" + type Impl struct { Configuration librepo.Configuration CustomConfiguration config.CustomConfiguration @@ -25,9 +26,8 @@ type Impl struct { HostIP repository.HostIP Callback repository.ReceiverCallback - KafkaProducer *kgo.Client - KafkaConsumer *kgo.Client - KafkaTopic string + KafkaProducer *aukafka.SyncProducer[repository.UpdateEvent] + KafkaConsumer *aukafka.Consumer[repository.UpdateEvent] } func New( @@ -53,12 +53,12 @@ func (r *Impl) IsKafka() bool { func (r *Impl) Setup() error { ctx := auzerolog.AddLoggerToCtx(context.Background()) - if err := r.Connect(ctx); err != nil { - r.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("failed to set up kafka connection. BAILING OUT") + if err := r.ConnectProducer(ctx); err != nil { + r.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("failed to set up kafka producer connection. BAILING OUT") return err } - r.Logging.Logger().Ctx(ctx).Info().Print("successfully set up kafka") + r.Logging.Logger().Ctx(ctx).Info().Print("successfully set up kafka producer") return nil } @@ -66,7 +66,7 @@ func (r *Impl) Teardown() { ctx := auzerolog.AddLoggerToCtx(context.Background()) if err := r.Disconnect(ctx); err != nil { - r.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("failed to tear down kafka connection. Continuing anyway.") + r.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("failed to tear down kafka connection(s). Continuing anyway.") } } @@ -81,220 +81,110 @@ func (r *Impl) Send(ctx context.Context, event repository.UpdateEvent) error { return nil } - value, err := json.Marshal(&event) - if err != nil { - return err - } - - record := &kgo.Record{Topic: r.KafkaTopic, Value: value} - - // this blocks completely?!? - - //if err := r.KafkaProducer.ProduceSync(ctx, record).FirstErr(); err != nil { - // return err - //} - - // so use async produce (with no guarantee of delivery) - - r.KafkaProducer.Produce(ctx, record, nil) - - return nil + return r.KafkaProducer.Produce(ctx, nil, &event) } -func (r *Impl) StartReceiveLoop(ctx context.Context) error { - r.Logging.Logger().Ctx(ctx).Info().Print("starting receive loop in background") - go func() { - myCtx := auzerolog.AddLoggerToCtx(context.Background()) - _ = r.receiveLoop(myCtx) - }() - return nil +func (r *Impl) topicConfig(ctx context.Context) (*aukafka.TopicConfig, error) { + if r.CustomConfiguration.Kafka() != nil { + if topicConfig, ok := r.CustomConfiguration.Kafka().TopicConfigs()[MetadataChangeEventsTopicKey]; ok { + if topicConfig.Password == "" { + r.Logging.Logger().Ctx(ctx).Warn().Print("kafka configuration present but password is missing") + return nil, errors.New("kafka configuration present but got empty password from vault") + } + return &topicConfig, nil + } + } + r.Logging.Logger().Ctx(ctx).Info().Print("NOT connecting to kafka due to missing configuration (ok, feature toggle)") + return nil, nil } -// receiveLoop should terminate when its context is cancelled -// -// it will also terminate on fetch errors -// -// TODO handle fetch errors more than just logging -func (r *Impl) receiveLoop(ctx context.Context) error { - if r.KafkaConsumer == nil { - r.Logging.Logger().Ctx(ctx).Info().Print("receive loop cannot start, no kafka client") +func (r *Impl) StartReceiveLoop(ctx context.Context) error { + topicConfig, err := r.topicConfig(ctx) + if err != nil { + return err + } + if topicConfig == nil { return nil } - for { - fetches := r.KafkaConsumer.PollFetches(context.Background()) - if fetches.IsClientClosed() { - r.Logging.Logger().Ctx(ctx).Info().Print("receive loop ending, kafka client was closed") - return nil - } - r.Logging.Logger().Ctx(ctx).Debug().Printf("receive loop found %d fetches", len(fetches)) - var firstError error = nil - fetches.EachError(func(t string, p int32, err error) { - if firstError == nil { - firstError = fmt.Errorf("receive loop fetch error topic %s partition %d: %v", t, p, err) - } - }) - if firstError != nil { - r.Logging.Logger().Ctx(ctx).Error().WithErr(firstError).Print("receive loop terminated abnormally: %v", firstError) - return firstError + if r.Callback == nil { + return errors.New("cannot start kafka receive loop - no callback configured. This is an implementation error") + } + callback := func(ctx context.Context, key *string, event *repository.UpdateEvent, stamp time.Time) error { + if event == nil { + aulogging.Logger.Ctx(ctx).Warn().Print("kafka receiver callback received nil event - skipping") + return nil } - - fetches.EachRecord(func(record *kgo.Record) { - event := repository.UpdateEvent{} - err := json.Unmarshal(record.Value, &event) - if err != nil { - r.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("receive loop json error - ignoring malformed message: %v", err) - } else { - r.Logging.Logger().Ctx(ctx).Info().Printf("received kafka message: %v", event) - r.Callback(event) - } - }) + r.Callback(*event) + return nil } -} -func (r *Impl) createConsumer(ctx context.Context, seedBrokers string, user string, pass string, topic string) (*kgo.Client, error) { + // group id groupId := r.CustomConfiguration.KafkaGroupIdOverride() if groupId == "" { ip, err := r.HostIP.ObtainLocalIp() if err != nil { - return nil, err + return err } ipComponents := strings.Split(ip.String(), ".") if len(ipComponents) != 4 { - return nil, errors.New("failed to obtain local non-localhost ip address to use for consumer group, did not get an ipv4 address") + return errors.New("failed to obtain local non-localhost ip address to use for consumer group, did not get an ipv4 address") } - workerNodeId := ipComponents[2] - - groupId = "metadata-worker" + workerNodeId + groupId = fmt.Sprintf("metadata-worker-%s-%s", ipComponents[2], ipComponents[3]) } - + topicConfig.ConsumerGroup = &groupId r.Logging.Logger().Ctx(ctx).Info().Printf("using kafka group id %s for consumer", groupId) - tlsDialer := &tls.Dialer{ - NetDialer: &net.Dialer{Timeout: 10 * time.Second}, - Config: &tls.Config{InsecureSkipVerify: true}, - } - opts := []kgo.Opt{ - kgo.SeedBrokers(strings.Split(seedBrokers, ",")...), - - kgo.SASL(scram.Auth{ - User: user, - Pass: pass, - }.AsSha256Mechanism()), - - kgo.Dialer(tlsDialer.DialContext), - - kgo.ConsumerGroup(groupId), - kgo.ConsumeTopics(topic), - kgo.SessionTimeout(30 * time.Second), - kgo.WithLogger(r), - } + configPreset := sarama.NewConfig() + configPreset.Net.TLS.Enable = true + configPreset.Producer.Compression = sarama.CompressionNone + configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.consumer.") + configPreset.Consumer.Offsets.Initial = sarama.OffsetNewest - consumer, err := kgo.NewClient(opts...) + consumer, err := aukafka.CreateConsumer[repository.UpdateEvent](ctx, *topicConfig, callback, configPreset) if err != nil { - return nil, err + return err } + r.Logging.Logger().Ctx(ctx).Info().Print("successfully connected to kafka as consumer (also started receive loop in background)") - return consumer, nil + r.KafkaConsumer = consumer + return nil } -func (r *Impl) createProducer(seedBrokers string, user string, pass string) (*kgo.Client, error) { - tlsDialer := &tls.Dialer{ - NetDialer: &net.Dialer{Timeout: 10 * time.Second}, - Config: &tls.Config{InsecureSkipVerify: true}, - } - opts := []kgo.Opt{ - kgo.SeedBrokers(strings.Split(seedBrokers, ",")...), - - kgo.SASL(scram.Auth{ - User: user, - Pass: pass, - }.AsSha256Mechanism()), - - kgo.Dialer(tlsDialer.DialContext), - - kgo.RequestRetries(2), - kgo.RetryTimeout(5 * time.Second), - kgo.WithLogger(r), - } - - producer, err := kgo.NewClient(opts...) +func (r *Impl) ConnectProducer(ctx context.Context) error { + topicConfig, err := r.topicConfig(ctx) if err != nil { - return nil, err + return err } - - return producer, nil -} - -func (r *Impl) Connect(ctx context.Context) error { - seedBrokers := r.CustomConfiguration.KafkaSeedBrokers() - user := r.CustomConfiguration.KafkaUsername() - pass := r.CustomConfiguration.KafkaPassword() - topic := r.CustomConfiguration.KafkaTopic() - - if seedBrokers == "" || user == "" || topic == "" { - r.Logging.Logger().Ctx(ctx).Info().Print("NOT connecting to kafka due to missing configuration (ok, feature toggle)") + if topicConfig == nil { return nil } - if pass == "" { - r.Logging.Logger().Ctx(ctx).Warn().Print("kafka configuration present but password is missing") - return errors.New("kafka configuration present but got empty password from vault") - } - consumer, err := r.createConsumer(ctx, seedBrokers, user, pass, topic) - if err != nil { - return err - } - r.Logging.Logger().Ctx(ctx).Info().Print("successfully connected to kafka as consumer") + configPreset := sarama.NewConfig() + configPreset.Net.TLS.Enable = true + configPreset.Producer.Compression = sarama.CompressionNone + configPreset.MetricRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, "sarama.producer.") - producer, err := r.createProducer(seedBrokers, user, pass) + producer, err := aukafka.CreateSyncProducer[repository.UpdateEvent](ctx, *topicConfig, configPreset) if err != nil { - consumer.Close() return err } r.Logging.Logger().Ctx(ctx).Info().Print("successfully connected to kafka as producer") - r.KafkaConsumer = consumer r.KafkaProducer = producer - r.KafkaTopic = topic return nil } func (r *Impl) Disconnect(ctx context.Context) error { if r.KafkaConsumer != nil { - r.KafkaConsumer.Close() + r.KafkaConsumer.Close(ctx) r.KafkaConsumer = nil } if r.KafkaProducer != nil { - r.KafkaProducer.Close() + r.KafkaProducer.Close(ctx) r.KafkaProducer = nil } return nil } - -// --- implementing kgo.Logger --- - -func (r *Impl) Level() kgo.LogLevel { - return kgo.LogLevelInfo -} - -func (r *Impl) Log(level kgo.LogLevel, msg string, keyvals ...interface{}) { - switch level { - case kgo.LogLevelError: - r.Logging.Logger().NoCtx().Warn().Print("kgo error: " + msg) - return - case kgo.LogLevelWarn: - r.Logging.Logger().NoCtx().Warn().Print("kgo warning: " + msg) - return - case kgo.LogLevelInfo: - r.Logging.Logger().NoCtx().Debug().Print("kgo info: " + msg) - return - case kgo.LogLevelDebug: - r.Logging.Logger().NoCtx().Debug().Print("kgo debug: " + msg) - return - default: - return - } -} diff --git a/internal/service/cache/cache.go b/internal/service/cache/cache.go deleted file mode 100644 index f0bea90..0000000 --- a/internal/service/cache/cache.go +++ /dev/null @@ -1,61 +0,0 @@ -package cache - -import ( - "context" - "github.com/Interhyp/metadata-service/internal/acorn/service" - "github.com/Interhyp/metadata-service/internal/service/cache/cacheable" - auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" - librepo "github.com/StephanHCB/go-backend-service-common/acorns/repository" -) - -type Impl struct { - Configuration librepo.Configuration - Logging librepo.Logging - Timestamp librepo.Timestamp - - OwnerCache cacheable.Cacheable - ServiceCache cacheable.Cacheable - RepositoryCache cacheable.Cacheable -} - -func New( - configuration librepo.Configuration, - logging librepo.Logging, - timestamp librepo.Timestamp, -) service.Cache { - return &Impl{ - Configuration: configuration, - Logging: logging, - Timestamp: timestamp, - } -} - -func (s *Impl) IsCache() bool { - return true -} - -func (s *Impl) Setup() error { - ctx := auzerolog.AddLoggerToCtx(context.Background()) - - if err := s.SetupCache(ctx); err != nil { - s.Logging.Logger().Ctx(ctx).Error().WithErr(err).Print("failed to set up business layer cache. BAILING OUT") - return err - } - - s.Logging.Logger().Ctx(ctx).Info().Print("successfully set up cache") - return nil -} - -func (s *Impl) SetupCache(_ context.Context) error { - // idempotent to allow mocking - if s.OwnerCache == nil { - s.OwnerCache = cacheable.New() - } - if s.ServiceCache == nil { - s.ServiceCache = cacheable.New() - } - if s.RepositoryCache == nil { - s.RepositoryCache = cacheable.New() - } - return nil -} diff --git a/internal/service/cache/cacheable/cacheable.go b/internal/service/cache/cacheable/cacheable.go deleted file mode 100644 index aa4f975..0000000 --- a/internal/service/cache/cacheable/cacheable.go +++ /dev/null @@ -1,82 +0,0 @@ -package cacheable - -import ( - "sort" - "sync" -) - -type Impl struct { - mu sync.Mutex - - timestamp string - sortedKeysCache *[]string - values map[string]*interface{} -} - -func New() Cacheable { - c := &Impl{ - values: make(map[string]*interface{}), - } - // during initial creation we don't need to lock - c.buildKeysCacheMustHaveLock() - return c -} - -func (c *Impl) buildKeysCacheMustHaveLock() { - keys := make([]string, 0, len(c.values)) - for k := range c.values { - keys = append(keys, k) - } - sort.Strings(keys) - - c.sortedKeysCache = &keys -} - -func (c *Impl) GetTimestamp() string { - return c.timestamp -} - -func (c *Impl) SetTimestamp(timestamp string) { - c.timestamp = timestamp -} - -func (c *Impl) GetSortedKeys() *[]string { - c.mu.Lock() - defer c.mu.Unlock() - return c.sortedKeysCache -} - -func (c *Impl) GetEntryRef(key string) *interface{} { - c.mu.Lock() - defer c.mu.Unlock() - ref, ok := c.values[key] - if ok { - return ref - } else { - // this is actually the same as the ok branch, because the zero value of a pointer is nil, - // but writing it down for clarity's sake - return nil - } -} - -func (c *Impl) UpdateEntryRef(key string, newRef *interface{}) { - c.mu.Lock() - defer c.mu.Unlock() - _, ok := c.values[key] - if ok { - if newRef != nil { - // update existing - c.values[key] = newRef - } else { - // delete - delete(c.values, key) - c.buildKeysCacheMustHaveLock() - } - } else { - if newRef != nil { - // insert new - c.values[key] = newRef - c.buildKeysCacheMustHaveLock() - } - } -} diff --git a/internal/service/cache/cacheable/cacheableint.go b/internal/service/cache/cacheable/cacheableint.go deleted file mode 100644 index 7cbea1c..0000000 --- a/internal/service/cache/cacheable/cacheableint.go +++ /dev/null @@ -1,20 +0,0 @@ -package cacheable - -// Cacheable implements a synchronized in-memory cache for references to arbitrary data structures -// -// Update the timestamp whenever you do a full rescan. -type Cacheable interface { - GetTimestamp() string - SetTimestamp(timestamp string) - - GetSortedKeys() *[]string - - // GetEntryRef obtains the reference to the current entry, or nil if the key isn't present. - GetEntryRef(key string) *interface{} - - // UpdateEntryRef will create the key if it doesn't exist - // if newRef is nil, will remove the key if present - // - // key changes lead to re-creating the sorted keys cache - UpdateEntryRef(key string, newRef *interface{}) -} diff --git a/internal/service/cache/deepcopy.go b/internal/service/cache/deepcopy.go deleted file mode 100644 index 66e60e9..0000000 --- a/internal/service/cache/deepcopy.go +++ /dev/null @@ -1,27 +0,0 @@ -package cache - -import ( - "encoding/json" -) - -// deepCopyStruct creates a copy of a struct by serializing and deserializing to JSON. -// Beware: Unexported fields are not copied. -func deepCopyStruct[T any](source T, targetPointer *T) error { - jsonBytes, err := json.Marshal(source) - if err != nil { - return err - } - err = json.Unmarshal(jsonBytes, targetPointer) - return err -} - -func deepCopyStringSlice(original []string) []string { - // the slice pointers returned by Cacheable already are a snapshot in time because - // the Cacheable switches the pointer and doesn't ever change existing values - - // now prevent users from inadvertently updating the slice from the outside by making a copy of its values - result := make([]string, len(original)) - // for strings, which are immutable in go, this _is_ a deep copy - copy(result, original) - return result -} diff --git a/internal/service/cache/deepcopy_test.go b/internal/service/cache/deepcopy_test.go deleted file mode 100644 index 957f569..0000000 --- a/internal/service/cache/deepcopy_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package cache - -import ( - "github.com/stretchr/testify/require" - "testing" -) - -func TestDeepCopyStringSlice(t *testing.T) { - input := []string{"first", "WORLD", "problems"} - copyResult := deepCopyStringSlice(input) - require.Equal(t, input, copyResult, "Deep copied slices do not contain same elements.") - // Compare addresses of first slice elements to determine reference equality. - // We may not use require.NotEqual as it compares pointer values and not addresses. - // See https://stackoverflow.com/a/53010178 - require.False(t, &input[0] == ©Result[0], "Deep copied slices are identical.") -} - -func TestDeepCopyStruct(t *testing.T) { - type deepCopyTestStruct struct { - SomeString string - SomeInt int - SomeBool bool - SomeSlice []string - SomeMap map[string]string - SomePointer *string - SomeNilPointer *string - somePrivateField string - SomeInBetweenStruct struct { - SomeInBetweenString string - SomeInnerStruct struct { - SomeInnerString string - } - } - } - - mapField := make(map[string]string) - mapField["key"] = "value" - pointerValue := "pointer" - input := deepCopyTestStruct{ - SomeString: "SomeString", - SomeInt: 51324, - SomeBool: true, - SomeSlice: []string{"first", "second"}, - SomeMap: mapField, - SomePointer: &pointerValue, - SomeNilPointer: nil, - somePrivateField: "private", - SomeInBetweenStruct: struct { - SomeInBetweenString string - SomeInnerStruct struct { - SomeInnerString string - } - }{ - SomeInBetweenString: "inBetweenString", - SomeInnerStruct: struct { - SomeInnerString string - }{ - SomeInnerString: "innerString", - }, - }, - } - - result := deepCopyTestStruct{} - err := deepCopyStruct(input, &result) - - require.Nil(t, err) - require.Empty(t, result.somePrivateField) - require.EqualExportedValues(t, input, result) - require.False(t, &input == &result, "Structs are equal by reference.") -} - -func TestDeepCopyStruct_MarshalErrorIsReturned(t *testing.T) { - type errorStruct struct{ SomeFunction func(int) int } - input := errorStruct{ - SomeFunction: func(i int) int { - return i - }, - } - result := errorStruct{} - err := deepCopyStruct(input, &result) - require.ErrorContains(t, err, "unsupported type") -} - -func TestDeepCopyStruct_UnmarshalErrorIsReturned(t *testing.T) { - type someStruct struct{ SomeString string } - input := someStruct{ - SomeString: "someString", - } - err := deepCopyStruct(input, nil) - require.ErrorContains(t, err, "json: Unmarshal(nil") -} diff --git a/internal/service/cache/ownercache.go b/internal/service/cache/ownercache.go deleted file mode 100644 index d5992e1..0000000 --- a/internal/service/cache/ownercache.go +++ /dev/null @@ -1,49 +0,0 @@ -package cache - -import ( - "context" - "fmt" - "github.com/Interhyp/metadata-service/api" - "github.com/StephanHCB/go-backend-service-common/api/apierrors" -) - -func (s *Impl) SetOwnerListTimestamp(_ context.Context, timestamp string) { - s.OwnerCache.SetTimestamp(timestamp) -} - -func (s *Impl) GetOwnerListTimestamp(_ context.Context) string { - return s.OwnerCache.GetTimestamp() -} - -func (s *Impl) GetSortedOwnerAliases(_ context.Context) []string { - keysPtr := s.OwnerCache.GetSortedKeys() - return deepCopyStringSlice(*keysPtr) -} - -func (s *Impl) GetOwner(ctx context.Context, alias string) (openapi.OwnerDto, error) { - immutableOwnerPtr := s.OwnerCache.GetEntryRef(alias) - if immutableOwnerPtr == nil { - s.Logging.Logger().Ctx(ctx).Info().Printf("owner %v not found", alias) - return openapi.OwnerDto{}, apierrors.NewNotFoundError("owner.notfound", fmt.Sprintf("owner %s not found", alias), nil, s.Timestamp.Now()) - } else { - ownerCopy := openapi.OwnerDto{} - owner := (*immutableOwnerPtr).(openapi.OwnerDto) - err := deepCopyStruct(owner, &ownerCopy) - return ownerCopy, err - } -} - -func (s *Impl) PutOwner(_ context.Context, alias string, entry openapi.OwnerDto) { - var e interface{} - e = entry - - s.OwnerCache.UpdateEntryRef(alias, &e) -} - -func (s *Impl) DeleteOwner(_ context.Context, alias string) { - s.OwnerCache.UpdateEntryRef(alias, nil) - // TODO since this may come in from reading a manually made git commit, in this lowlevel cache we cascade - // - // s.scDeleteOwner(alias) - // s.rcDeleteOwner(alias) -} diff --git a/internal/service/cache/repositorycache.go b/internal/service/cache/repositorycache.go deleted file mode 100644 index bcc6373..0000000 --- a/internal/service/cache/repositorycache.go +++ /dev/null @@ -1,45 +0,0 @@ -package cache - -import ( - "context" - "fmt" - "github.com/Interhyp/metadata-service/api" - "github.com/StephanHCB/go-backend-service-common/api/apierrors" -) - -func (s *Impl) SetRepositoryListTimestamp(_ context.Context, timestamp string) { - s.RepositoryCache.SetTimestamp(timestamp) -} - -func (s *Impl) GetRepositoryListTimestamp(_ context.Context) string { - return s.RepositoryCache.GetTimestamp() -} - -func (s *Impl) GetSortedRepositoryKeys(_ context.Context) []string { - keysPtr := s.RepositoryCache.GetSortedKeys() - return deepCopyStringSlice(*keysPtr) -} - -func (s *Impl) GetRepository(ctx context.Context, key string) (openapi.RepositoryDto, error) { - immutableRepositoryPtr := s.RepositoryCache.GetEntryRef(key) - if immutableRepositoryPtr == nil { - s.Logging.Logger().Ctx(ctx).Info().Printf("repository %v not found", key) - return openapi.RepositoryDto{}, apierrors.NewNotFoundError("repository.notfound", fmt.Sprintf("repository %s not found", key), nil, s.Timestamp.Now()) - } else { - repoCopy := openapi.RepositoryDto{} - repo := (*immutableRepositoryPtr).(openapi.RepositoryDto) - err := deepCopyStruct(repo, &repoCopy) - return repoCopy, err - } -} - -func (s *Impl) PutRepository(_ context.Context, key string, entry openapi.RepositoryDto) { - var e interface{} - e = entry - - s.RepositoryCache.UpdateEntryRef(key, &e) -} - -func (s *Impl) DeleteRepository(_ context.Context, key string) { - s.RepositoryCache.UpdateEntryRef(key, nil) -} diff --git a/internal/service/cache/servicecache.go b/internal/service/cache/servicecache.go deleted file mode 100644 index 23ead4d..0000000 --- a/internal/service/cache/servicecache.go +++ /dev/null @@ -1,44 +0,0 @@ -package cache - -import ( - "context" - "fmt" - "github.com/Interhyp/metadata-service/api" - "github.com/StephanHCB/go-backend-service-common/api/apierrors" -) - -func (s *Impl) SetServiceListTimestamp(_ context.Context, timestamp string) { - s.ServiceCache.SetTimestamp(timestamp) -} - -func (s *Impl) GetServiceListTimestamp(_ context.Context) string { - return s.ServiceCache.GetTimestamp() -} - -func (s *Impl) GetSortedServiceNames(_ context.Context) []string { - keysPtr := s.ServiceCache.GetSortedKeys() - return deepCopyStringSlice(*keysPtr) -} - -func (s *Impl) GetService(ctx context.Context, name string) (openapi.ServiceDto, error) { - immutableServicePtr := s.ServiceCache.GetEntryRef(name) - if immutableServicePtr == nil { - return openapi.ServiceDto{}, apierrors.NewNotFoundError("service.notfound", fmt.Sprintf("service %s not found", name), nil, s.Timestamp.Now()) - } else { - serviceCopy := openapi.ServiceDto{} - service := (*immutableServicePtr).(openapi.ServiceDto) - err := deepCopyStruct(service, &serviceCopy) - return serviceCopy, err - } -} - -func (s *Impl) PutService(_ context.Context, name string, entry openapi.ServiceDto) { - var e interface{} - e = entry - - s.ServiceCache.UpdateEntryRef(name, &e) -} - -func (s *Impl) DeleteService(_ context.Context, name string) { - s.ServiceCache.UpdateEntryRef(name, nil) -} diff --git a/internal/service/owners/owners.go b/internal/service/owners/owners.go index 4531365..61bd2a8 100644 --- a/internal/service/owners/owners.go +++ b/internal/service/owners/owners.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/Interhyp/metadata-service/api" + "github.com/Interhyp/metadata-service/internal/acorn/repository" "github.com/Interhyp/metadata-service/internal/acorn/service" auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" "strings" @@ -16,7 +17,7 @@ type Impl struct { Configuration librepo.Configuration Logging librepo.Logging Timestamp librepo.Timestamp - Cache service.Cache + Cache repository.Cache Updater service.Updater } @@ -24,7 +25,7 @@ func New( configuration librepo.Configuration, logging librepo.Logging, timestamp librepo.Timestamp, - cache service.Cache, + cache repository.Cache, updater service.Updater, ) service.Owners { return &Impl{ @@ -51,10 +52,20 @@ func (s *Impl) Setup() error { func (s *Impl) GetOwners(ctx context.Context) (openapi.OwnerListDto, error) { result := openapi.OwnerListDto{ - Owners: make(map[string]openapi.OwnerDto), - TimeStamp: s.Cache.GetOwnerListTimestamp(ctx), + Owners: make(map[string]openapi.OwnerDto), } - for _, name := range s.Cache.GetSortedOwnerAliases(ctx) { + + stamp, err := s.Cache.GetOwnerListTimestamp(ctx) + if err != nil { + return result, err + } + result.TimeStamp = stamp + + names, err := s.Cache.GetSortedOwnerAliases(ctx) + if err != nil { + return result, err + } + for _, name := range names { owner, err := s.GetOwner(ctx, name) if err != nil { // owner not found errors are ok, the cache may have been changed concurrently, just drop the entry diff --git a/internal/service/repositories/repositories.go b/internal/service/repositories/repositories.go index b9cca73..1b3b1fe 100644 --- a/internal/service/repositories/repositories.go +++ b/internal/service/repositories/repositories.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/Interhyp/metadata-service/api" "github.com/Interhyp/metadata-service/internal/acorn/config" + "github.com/Interhyp/metadata-service/internal/acorn/repository" "github.com/Interhyp/metadata-service/internal/acorn/service" "github.com/Interhyp/metadata-service/internal/service/util" auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" @@ -19,7 +20,7 @@ type Impl struct { CustomConfiguration config.CustomConfiguration Logging librepo.Logging Timestamp librepo.Timestamp - Cache service.Cache + Cache repository.Cache Updater service.Updater Owners service.Owners } @@ -29,7 +30,7 @@ func New( customConfig config.CustomConfiguration, logging librepo.Logging, timestamp librepo.Timestamp, - cache service.Cache, + cache repository.Cache, updater service.Updater, owners service.Owners, ) service.Repositories { @@ -94,9 +95,14 @@ func (s *Impl) GetRepositories(ctx context.Context, ) (openapi.RepositoryListDto, error) { result := openapi.RepositoryListDto{ Repositories: make(map[string]openapi.RepositoryDto), - TimeStamp: s.Cache.GetRepositoryListTimestamp(ctx), } + stamp, err := s.Cache.GetRepositoryListTimestamp(ctx) + if err != nil { + return result, err + } + result.TimeStamp = stamp + useReferencedRepositoriesMap := false referencedRepositoriesMap := make(map[string]bool, 0) if serviceNameFilter != "" { @@ -110,7 +116,11 @@ func (s *Impl) GetRepositories(ctx context.Context, } } - for _, key := range s.Cache.GetSortedRepositoryKeys(ctx) { + keys, err := s.Cache.GetSortedRepositoryKeys(ctx) + if err != nil { + return openapi.RepositoryListDto{}, err + } + for _, key := range keys { if !useReferencedRepositoriesMap || referencedRepositoriesMap[key] { repository, err := s.GetRepository(ctx, key) if err != nil { @@ -583,7 +593,10 @@ func (s *Impl) DeleteRepository(ctx context.Context, key string, deletionInfo op return err } - allowed := s.Updater.CanMoveOrDeleteRepository(subCtx, key) + allowed, err := s.Updater.CanMoveOrDeleteRepository(subCtx, key) + if err != nil { + return err + } if !allowed { s.Logging.Logger().Ctx(ctx).Info().Printf("tried to delete repository %v, which is still referenced by its service", key) return apierrors.NewConflictError("repository.conflict.referenced", "this repository is still being referenced by a service and cannot be deleted", nil, s.Timestamp.Now()) diff --git a/internal/service/services/services.go b/internal/service/services/services.go index bb85e3d..56150cb 100644 --- a/internal/service/services/services.go +++ b/internal/service/services/services.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/Interhyp/metadata-service/internal/acorn/repository" auzerolog "github.com/StephanHCB/go-autumn-logging-zerolog" "strings" @@ -20,7 +21,7 @@ type Impl struct { CustomConfiguration config.CustomConfiguration Logging librepo.Logging Timestamp librepo.Timestamp - Cache service.Cache + Cache repository.Cache Updater service.Updater Repositories service.Repositories } @@ -30,7 +31,7 @@ func New( customConfig config.CustomConfiguration, logging librepo.Logging, timestamp librepo.Timestamp, - cache service.Cache, + cache repository.Cache, updater service.Updater, repositories service.Repositories, ) service.Services { @@ -61,11 +62,19 @@ func (s *Impl) Setup() error { var initialServiceLifecycle = "experimental" func (s *Impl) GetServices(ctx context.Context, ownerAliasFilter string) (openapi.ServiceListDto, error) { + stamp, err := s.Cache.GetServiceListTimestamp(ctx) + if err != nil { + return openapi.ServiceListDto{}, err + } result := openapi.ServiceListDto{ Services: make(map[string]openapi.ServiceDto), - TimeStamp: s.Cache.GetServiceListTimestamp(ctx), + TimeStamp: stamp, + } + names, err := s.Cache.GetSortedServiceNames(ctx) + if err != nil { + return openapi.ServiceListDto{}, err } - for _, name := range s.Cache.GetSortedServiceNames(ctx) { + for _, name := range names { theService, err := s.GetService(ctx, name) if err != nil { // service not found errors are ok, the cache may have been changed concurrently, just drop the entry @@ -477,7 +486,11 @@ func (s *Impl) validateDeletionDto(ctx context.Context, deletionInfo openapi.Del } func (s *Impl) addAllProductOwners(ctx context.Context, resultSet map[string]bool) error { - for _, alias := range s.Cache.GetSortedOwnerAliases(ctx) { + names, err := s.Cache.GetSortedOwnerAliases(ctx) + if err != nil { + return err + } + for _, alias := range names { owner, err := s.Cache.GetOwner(ctx, alias) if err != nil { // owner not found errors are ok, the cache may have been changed concurrently, just drop the entry diff --git a/internal/service/updater/owners.go b/internal/service/updater/owners.go index 98ab2ce..db9dbaa 100644 --- a/internal/service/updater/owners.go +++ b/internal/service/updater/owners.go @@ -106,7 +106,10 @@ func (s *Impl) updateOwners(ctx context.Context) error { } func (s *Impl) decideOwnersToAddUpdateOrRemove(ctx context.Context) (map[string]int8, error) { - cachedOwnerAliases := s.Cache.GetSortedOwnerAliases(ctx) + cachedOwnerAliases, err := s.Cache.GetSortedOwnerAliases(ctx) + if err != nil { + return nil, err + } currentOwnerAliases, err := s.Mapper.GetSortedOwnerAliases(ctx) if err != nil { diff --git a/internal/service/updater/repositories.go b/internal/service/updater/repositories.go index c153c5b..3df1c64 100644 --- a/internal/service/updater/repositories.go +++ b/internal/service/updater/repositories.go @@ -19,7 +19,10 @@ func (s *Impl) WriteRepository(ctx context.Context, key string, repository opena current, err := s.Cache.GetRepository(ctx, key) if err == nil && current.Owner != repository.Owner { - allowed := s.CanMoveOrDeleteRepository(subCtx, key) + allowed, err := s.CanMoveOrDeleteRepository(subCtx, key) + if err != nil { + return err + } if !allowed { s.Logging.Logger().Ctx(ctx).Info().Printf("tried to move repository %v, which is still referenced by its service", key) return apierrors.NewConflictError("repository.conflict.referenced", "this repository is being referenced in a service, you cannot change its owner directly - you can change the owner of the service and this will move it along", nil, s.Timestamp.Now()) @@ -128,7 +131,10 @@ func (s *Impl) updateRepositories(ctx context.Context) error { } func (s *Impl) decideRepositoriesToAddUpdateOrRemove(ctx context.Context) (map[string]int8, error) { - cachedRepositoryKeys := s.Cache.GetSortedRepositoryKeys(ctx) + cachedRepositoryKeys, err := s.Cache.GetSortedRepositoryKeys(ctx) + if err != nil { + return nil, err + } currentRepositoryKeys, err := s.Mapper.GetSortedRepositoryKeys(ctx) if err != nil { @@ -219,14 +225,18 @@ func (s *Impl) updateIndividualRepository(ctx context.Context, key string, isNew return nil } -func (s *Impl) CanMoveOrDeleteRepository(ctx context.Context, key string) bool { - for _, name := range s.Cache.GetSortedServiceNames(ctx) { +func (s *Impl) CanMoveOrDeleteRepository(ctx context.Context, key string) (bool, error) { + names, err := s.Cache.GetSortedServiceNames(ctx) + if err != nil { + return false, err + } + for _, name := range names { svc, _ := s.Cache.GetService(ctx, name) for _, candidateKey := range svc.Repositories { if key == candidateKey { - return false + return false, nil } } } - return true + return true, nil } diff --git a/internal/service/updater/services.go b/internal/service/updater/services.go index c57d573..4c82732 100644 --- a/internal/service/updater/services.go +++ b/internal/service/updater/services.go @@ -143,7 +143,10 @@ func (s *Impl) updateServices(ctx context.Context) error { } func (s *Impl) decideServicesToAddUpdateOrRemove(ctx context.Context) (map[string]int8, error) { - cachedServiceNames := s.Cache.GetSortedServiceNames(ctx) + cachedServiceNames, err := s.Cache.GetSortedServiceNames(ctx) + if err != nil { + return nil, err + } currentServiceNames, err := s.Mapper.GetSortedServiceNames(ctx) if err != nil { diff --git a/internal/service/updater/updater.go b/internal/service/updater/updater.go index a4fed2a..bf92c4c 100644 --- a/internal/service/updater/updater.go +++ b/internal/service/updater/updater.go @@ -23,7 +23,7 @@ type Impl struct { Kafka repository.Kafka Notifier repository.Notifier Mapper service.Mapper - Cache service.Cache + Cache repository.Cache mu sync.Mutex @@ -42,7 +42,7 @@ func New( kafka repository.Kafka, notifier repository.Notifier, mapper service.Mapper, - cache service.Cache, + cache repository.Cache, ) service.Updater { return &Impl{ Configuration: configuration, diff --git a/internal/web/app/app.go b/internal/web/app/app.go index 8d83a43..15f80b4 100644 --- a/internal/web/app/app.go +++ b/internal/web/app/app.go @@ -7,6 +7,7 @@ import ( "github.com/Interhyp/metadata-service/internal/acorn/repository" "github.com/Interhyp/metadata-service/internal/acorn/service" "github.com/Interhyp/metadata-service/internal/repository/bitbucket" + "github.com/Interhyp/metadata-service/internal/repository/cache" "github.com/Interhyp/metadata-service/internal/repository/config" "github.com/Interhyp/metadata-service/internal/repository/hostip" "github.com/Interhyp/metadata-service/internal/repository/idp" @@ -14,7 +15,6 @@ import ( "github.com/Interhyp/metadata-service/internal/repository/metadata" "github.com/Interhyp/metadata-service/internal/repository/notifier" "github.com/Interhyp/metadata-service/internal/repository/sshAuthProvider" - "github.com/Interhyp/metadata-service/internal/service/cache" "github.com/Interhyp/metadata-service/internal/service/mapper" "github.com/Interhyp/metadata-service/internal/service/owners" "github.com/Interhyp/metadata-service/internal/service/repositories" @@ -50,12 +50,12 @@ type ApplicationImpl struct { Timestamp librepo.Timestamp SshAuthProvider repository.SshAuthProvider Notifier repository.Notifier + Cache repository.Cache // services (business logic) Mapper service.Mapper Trigger service.Trigger Updater service.Updater - Cache service.Cache Owners service.Owners Services service.Services Repositories service.Repositories @@ -187,6 +187,11 @@ func (a *ApplicationImpl) ConstructRepositories() error { return err } + a.Cache = cache.New(a.Config, a.CustomConfig, a.Logging, a.Timestamp) + if err := a.Cache.Setup(); err != nil { + return err + } + return nil } @@ -198,11 +203,6 @@ func (a *ApplicationImpl) ConstructServices() error { return err } - a.Cache = cache.New(a.Config, a.Logging, a.Timestamp) - if err := a.Cache.Setup(); err != nil { - return err - } - a.Updater = updater.New(a.Config, a.CustomConfig, a.Logging, a.Timestamp, a.Kafka, a.Notifier, a.Mapper, a.Cache) if err := a.Updater.Setup(); err != nil { return err diff --git a/local-config.template.yaml b/local-config.template.yaml index 60911ba..a6e7460 100644 --- a/local-config.template.yaml +++ b/local-config.template.yaml @@ -26,6 +26,7 @@ VAULT_SECRETS_CONFIG: >- {"vaultKey": "BASIC_AUTH_PASSWORD"}, {"vaultKey": "BITBUCKET_PASSWORD"}, {"vaultKey": "KAFKA_PASSWORD"}, + {"vaultKey": "METADATA_CHANGE_EVENTS_CONNECTION_STRING"}, {"vaultKey": "SSH_PRIVATE_KEY"}, {"vaultKey": "SSH_PRIVATE_KEY_PASSWORD"} ] @@ -66,3 +67,18 @@ ALLOWED_FILE_CATEGORIES: '["template"]' # "url": "https://another.url.com/for/another/webhook" # } # } + +# Enable KAFKA communication (Azure event hub example) + +#KAFKA_TOPICS_CONFIG: >- +# { +# "metadata-change-events": { +# "topic": "metadata-change-events", +# "brokers": [ +# "example.com:9093" +# ], +# "username": "$ConnectionString", +# "passwordEnvVar": "METADATA_CHANGE_EVENTS_CONNECTION_STRING", +# "authType": "PLAIN" +# } +# } diff --git a/test/mock/cachemock/cachemock.go b/test/mock/cachemock/cachemock.go index 928d45c..4334549 100644 --- a/test/mock/cachemock/cachemock.go +++ b/test/mock/cachemock/cachemock.go @@ -16,16 +16,16 @@ func (s *Mock) Setup() error { return nil } -func (s *Mock) SetOwnerListTimestamp(ctx context.Context, timestamp string) { - +func (s *Mock) SetOwnerListTimestamp(ctx context.Context, timestamp string) error { + return nil } -func (s *Mock) GetOwnerListTimestamp(ctx context.Context) string { - return "" +func (s *Mock) GetOwnerListTimestamp(ctx context.Context) (string, error) { + return "", nil } -func (s *Mock) GetSortedOwnerAliases(ctx context.Context) []string { - return nil +func (s *Mock) GetSortedOwnerAliases(ctx context.Context) ([]string, error) { + return nil, nil } func (s *Mock) GetOwner(ctx context.Context, alias string) (openapi.OwnerDto, error) { @@ -37,52 +37,58 @@ func (s *Mock) GetOwner(ctx context.Context, alias string) (openapi.OwnerDto, er return openapi.OwnerDto{}, nil } -func (s *Mock) PutOwner(ctx context.Context, alias string, entry openapi.OwnerDto) { - +func (s *Mock) PutOwner(ctx context.Context, alias string, entry openapi.OwnerDto) error { + return nil } -func (s *Mock) DeleteOwner(ctx context.Context, alias string) {} +func (s *Mock) DeleteOwner(ctx context.Context, alias string) error { + return nil +} -func (s *Mock) SetServiceListTimestamp(ctx context.Context, timestamp string) {} +func (s *Mock) SetServiceListTimestamp(ctx context.Context, timestamp string) error { + return nil +} -func (s *Mock) GetServiceListTimestamp(ctx context.Context) string { - return "" +func (s *Mock) GetServiceListTimestamp(ctx context.Context) (string, error) { + return "", nil } -func (s *Mock) GetSortedServiceNames(ctx context.Context) []string { - return nil +func (s *Mock) GetSortedServiceNames(ctx context.Context) ([]string, error) { + return nil, nil } func (s *Mock) GetService(ctx context.Context, name string) (openapi.ServiceDto, error) { return openapi.ServiceDto{}, nil } -func (s *Mock) PutService(ctx context.Context, name string, entry openapi.ServiceDto) {} - -func (s *Mock) DeleteService(ctx context.Context, name string) { - +func (s *Mock) PutService(ctx context.Context, name string, entry openapi.ServiceDto) error { + return nil } -func (s *Mock) SetRepositoryListTimestamp(ctx context.Context, timestamp string) { +func (s *Mock) DeleteService(ctx context.Context, name string) error { + return nil +} +func (s *Mock) SetRepositoryListTimestamp(ctx context.Context, timestamp string) error { + return nil } -func (s *Mock) GetRepositoryListTimestamp(ctx context.Context) string { - return "" +func (s *Mock) GetRepositoryListTimestamp(ctx context.Context) (string, error) { + return "", nil } -func (s *Mock) GetSortedRepositoryKeys(ctx context.Context) []string { - return nil +func (s *Mock) GetSortedRepositoryKeys(ctx context.Context) ([]string, error) { + return nil, nil } func (s *Mock) GetRepository(ctx context.Context, key string) (openapi.RepositoryDto, error) { return openapi.RepositoryDto{}, nil } -func (s *Mock) PutRepository(ctx context.Context, key string, entry openapi.RepositoryDto) { - +func (s *Mock) PutRepository(ctx context.Context, key string, entry openapi.RepositoryDto) error { + return nil } -func (s *Mock) DeleteRepository(ctx context.Context, key string) { - +func (s *Mock) DeleteRepository(ctx context.Context, key string) error { + return nil } diff --git a/test/mock/configmock/configmock.go b/test/mock/configmock/configmock.go index 39c2901..5e8f5f0 100644 --- a/test/mock/configmock/configmock.go +++ b/test/mock/configmock/configmock.go @@ -2,6 +2,7 @@ package configmock import ( "github.com/Interhyp/metadata-service/internal/acorn/config" + "github.com/Roshick/go-autumn-kafka/pkg/aukafka" "regexp" ) @@ -126,26 +127,6 @@ func (c *MockConfig) GitCommitterEmail() string { panic("implement me") } -func (c *MockConfig) KafkaUsername() string { - //TODO implement me - panic("implement me") -} - -func (c *MockConfig) KafkaPassword() string { - //TODO implement me - panic("implement me") -} - -func (c *MockConfig) KafkaTopic() string { - //TODO implement me - panic("implement me") -} - -func (c *MockConfig) KafkaSeedBrokers() string { - //TODO implement me - panic("implement me") -} - func (c *MockConfig) KafkaGroupIdOverride() string { //TODO implement me panic("implement me") @@ -262,3 +243,16 @@ func (c *MockConfig) AllowedFileCategories() []string { //TODO implement me panic("implement me") } + +func (c *MockConfig) Kafka() *aukafka.Config { + //TODO implement me + panic("implement me") +} + +func (c *MockConfig) RedisUrl() string { + return "" +} + +func (c *MockConfig) RedisPassword() string { + return "" +} diff --git a/test/resources/invalid-config-values.yaml b/test/resources/invalid-config-values.yaml index 4c85867..4718c57 100644 --- a/test/resources/invalid-config-values.yaml +++ b/test/resources/invalid-config-values.yaml @@ -17,9 +17,6 @@ VAULT_SECRETS_CONFIG: '{}}' UPDATE_JOB_INTERVAL_MINUTES: 26 UPDATE_JOB_TIMEOUT_SECONDS: true -KAFKA_USERNAME: 'WeirdÄÄÄÄ' -KAFKA_TOPIC: 'CRAZY/äöü' -KAFKA_SEED_BROKERS: 'external.url:443' KAFKA_GROUP_ID_OVERRIDE: 'no banana, no spaces' NOTIFICATION_CONSUMER_CONFIGS: >- diff --git a/test/resources/valid-config-unique.yaml b/test/resources/valid-config-unique.yaml index 72feb05..73cf071 100644 --- a/test/resources/valid-config-unique.yaml +++ b/test/resources/valid-config-unique.yaml @@ -13,11 +13,6 @@ BITBUCKET_REVIEWER_FALLBACK: username GIT_COMMITTER_NAME: 'Body, Some' GIT_COMMITTER_EMAIL: 'somebody@somewhere.com' -KAFKA_USERNAME: 'some-kafka-username' -KAFKA_PASSWORD: 'some-kafka-password' -KAFKA_TOPIC: 'some-kafka-topic' -KAFKA_SEED_BROKERS: 'first-kafka-broker.domain.com:9092,second-kafka-broker.domain.com:9092' - AUTH_OIDC_KEY_SET_URL: http://keyset AUTH_OIDC_TOKEN_AUDIENCE: some-audience AUTH_GROUP_WRITE: admin