From 6f253d5ffa8c869898ab7347bd594ea6148addb0 Mon Sep 17 00:00:00 2001 From: "STeve (Xin) Huang" Date: Mon, 3 Jun 2024 14:59:42 -0400 Subject: [PATCH] GCP SQL PostgreSQL backend (#41392) * POC gcp backend * refactor pgcommon and add UT * fix for work identity and refactor * fix ttl and tab * fix lint * review comment round 01 * use connector lib * pass in gcp config instead of relying on connection string * fix typo and lint * Convert pgbk and pgevents to slog * tries -> attempts * make grpc --------- Co-authored-by: Edoardo Spadolini --- go.mod | 9 +- go.sum | 13 +- integrations/event-handler/go.mod | 4 +- integrations/event-handler/go.sum | 8 +- integrations/terraform/go.mod | 9 +- integrations/terraform/go.sum | 19 ++- lib/backend/pgbk/atomicwrite.go | 17 ++- lib/backend/pgbk/background.go | 40 ++--- lib/backend/pgbk/common/auth.go | 122 +++++++++++++++ lib/backend/pgbk/common/auth_test.go | 217 +++++++++++++++++++++++++++ lib/backend/pgbk/common/azure.go | 6 +- lib/backend/pgbk/common/gcp.go | 180 ++++++++++++++++++++++ lib/backend/pgbk/common/gcp_test.go | 103 +++++++++++++ lib/backend/pgbk/common/utils.go | 51 ++++--- lib/backend/pgbk/pgbk.go | 51 ++----- lib/events/pgevents/pgevents.go | 63 +++----- lib/events/pgevents/pgevents_test.go | 14 +- lib/utils/gcp/gcp.go | 68 ++++++++- lib/utils/gcp/gcp_test.go | 70 +++++++++ 19 files changed, 899 insertions(+), 165 deletions(-) create mode 100644 lib/backend/pgbk/common/auth.go create mode 100644 lib/backend/pgbk/common/auth_test.go create mode 100644 lib/backend/pgbk/common/gcp.go create mode 100644 lib/backend/pgbk/common/gcp_test.go diff --git a/go.mod b/go.mod index fe8c41c5b75d6..3c181fb75cc2c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.0 toolchain go1.22.3 require ( + cloud.google.com/go/cloudsqlconn v1.10.0 cloud.google.com/go/compute v1.26.0 cloud.google.com/go/compute/metadata v0.3.0 cloud.google.com/go/container v1.35.1 @@ -146,7 +147,7 @@ require ( github.com/mailgun/ttlmap v0.0.0-20170619185759-c1c17f74874f github.com/mattn/go-sqlite3 v1.14.22 github.com/mdlayher/netlink v1.7.2 - github.com/microsoft/go-mssqldb v0.0.0-00010101000000-000000000000 // replaced + github.com/microsoft/go-mssqldb v1.7.1 // replaced github.com/microsoft/kiota-authentication-azure-go v1.0.2 github.com/microsoftgraph/msgraph-sdk-go v1.40.0 github.com/microsoftgraph/msgraph-sdk-go-core v1.1.0 @@ -207,7 +208,7 @@ require ( golang.org/x/text v0.15.0 golang.org/x/time v0.5.0 golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 - google.golang.org/api v0.177.0 + google.golang.org/api v0.180.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 google.golang.org/grpc v1.64.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 @@ -241,7 +242,7 @@ require github.com/mailgun/minheap v0.0.0-20170619185613-3dbe6c6bf55f // indirec require ( cloud.google.com/go v0.112.2 // indirect - cloud.google.com/go/auth v0.3.0 // indirect + cloud.google.com/go/auth v0.4.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/longrunning v0.5.6 // indirect cloud.google.com/go/pubsub v1.37.0 // indirect @@ -355,7 +356,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect - github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/go.sum b/go.sum index df47ce6e6e99b..79239e9ec393a 100644 --- a/go.sum +++ b/go.sum @@ -99,8 +99,8 @@ cloud.google.com/go/assuredworkloads v1.7.0/go.mod h1:z/736/oNmtGAyU47reJgGN+KVo cloud.google.com/go/assuredworkloads v1.8.0/go.mod h1:AsX2cqyNCOvEQC8RMPnoc0yEarXQk6WEKkxYfL6kGIo= cloud.google.com/go/assuredworkloads v1.9.0/go.mod h1:kFuI1P78bplYtT77Tb1hi0FMxM0vVpRC7VVoJC3ZoT0= cloud.google.com/go/assuredworkloads v1.10.0/go.mod h1:kwdUQuXcedVdsIaKgKTp9t0UJkE5+PAVNhdQm4ZVq2E= -cloud.google.com/go/auth v0.3.0 h1:PRyzEpGfx/Z9e8+lHsbkoUVXD0gnu4MNmm7Gp8TQNIs= -cloud.google.com/go/auth v0.3.0/go.mod h1:lBv6NKTWp8E3LPzmO1TbiiRKc4drLOfHsgmlH9ogv5w= +cloud.google.com/go/auth v0.4.1 h1:Z7YNIhlWRtrnKlZke7z3GMqzvuYzdc2z98F9D1NV5Hg= +cloud.google.com/go/auth v0.4.1/go.mod h1:QVBuVEKpCn4Zp58hzRGvL0tjRGU0YqdRTdCHM1IHnro= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= cloud.google.com/go/automl v1.5.0/go.mod h1:34EjfoFGMZ5sgJ9EoLsRtdPSNZLcfflJR39VbVNS2M0= @@ -157,6 +157,8 @@ cloud.google.com/go/cloudbuild v1.9.0/go.mod h1:qK1d7s4QlO0VwfYn5YuClDGg2hfmLZEb cloud.google.com/go/clouddms v1.3.0/go.mod h1:oK6XsCDdW4Ib3jCCBugx+gVjevp2TMXFtgxvPSee3OM= cloud.google.com/go/clouddms v1.4.0/go.mod h1:Eh7sUGCC+aKry14O1NRljhjyrr0NFC0G2cjwX0cByRk= cloud.google.com/go/clouddms v1.5.0/go.mod h1:QSxQnhikCLUw13iAbffF2CZxAER3xDGNHjsTAkQJcQA= +cloud.google.com/go/cloudsqlconn v1.10.0 h1:8ixabtaDQKPjkYYY+cm+Zq7zvIonYzKfgOqfA/1s0PI= +cloud.google.com/go/cloudsqlconn v1.10.0/go.mod h1:FLQhC5rt+1c0tXEujrppQffRMO1EDdj5qappSZj7xMI= cloud.google.com/go/cloudtasks v1.5.0/go.mod h1:fD92REy1x5woxkKEkLdvavGnPJGEn8Uic9nWuLzqCpY= cloud.google.com/go/cloudtasks v1.6.0/go.mod h1:C6Io+sxuke9/KNRkbQpihnW93SWDU3uXt92nu85HkYI= cloud.google.com/go/cloudtasks v1.7.0/go.mod h1:ImsfdYWwlWNJbdgPIIGJWC+gemEGTBK/SunNQQNCAb4= @@ -1324,8 +1326,9 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= @@ -2944,8 +2947,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI= google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= -google.golang.org/api v0.177.0 h1:8a0p/BbPa65GlqGWtUKxot4p0TV8OGOfyTjtmkXNXmk= -google.golang.org/api v0.177.0/go.mod h1:srbhue4MLjkjbkux5p3dw/ocYOSZTaIEvf7bCOnFQDw= +google.golang.org/api v0.180.0 h1:M2D87Yo0rGBPWpo1orwfCLehUUL6E7/TYe5gvMQWDh4= +google.golang.org/api v0.180.0/go.mod h1:51AiyoEg1MJPSZ9zvklA8VnRILPXxn1iVen9v25XHAE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/integrations/event-handler/go.mod b/integrations/event-handler/go.mod index 0a7332d18bb6e..1a91d1c6026c9 100644 --- a/integrations/event-handler/go.mod +++ b/integrations/event-handler/go.mod @@ -24,7 +24,7 @@ require ( require ( cloud.google.com/go v0.112.2 // indirect - cloud.google.com/go/auth v0.3.0 // indirect + cloud.google.com/go/auth v0.4.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect cloud.google.com/go/compute v1.26.0 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect @@ -296,7 +296,7 @@ require ( golang.org/x/term v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/api v0.177.0 // indirect + google.golang.org/api v0.180.0 // indirect google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 // indirect diff --git a/integrations/event-handler/go.sum b/integrations/event-handler/go.sum index 0984f141544ec..375a34a1bcb38 100644 --- a/integrations/event-handler/go.sum +++ b/integrations/event-handler/go.sum @@ -99,8 +99,8 @@ cloud.google.com/go/assuredworkloads v1.7.0/go.mod h1:z/736/oNmtGAyU47reJgGN+KVo cloud.google.com/go/assuredworkloads v1.8.0/go.mod h1:AsX2cqyNCOvEQC8RMPnoc0yEarXQk6WEKkxYfL6kGIo= cloud.google.com/go/assuredworkloads v1.9.0/go.mod h1:kFuI1P78bplYtT77Tb1hi0FMxM0vVpRC7VVoJC3ZoT0= cloud.google.com/go/assuredworkloads v1.10.0/go.mod h1:kwdUQuXcedVdsIaKgKTp9t0UJkE5+PAVNhdQm4ZVq2E= -cloud.google.com/go/auth v0.3.0 h1:PRyzEpGfx/Z9e8+lHsbkoUVXD0gnu4MNmm7Gp8TQNIs= -cloud.google.com/go/auth v0.3.0/go.mod h1:lBv6NKTWp8E3LPzmO1TbiiRKc4drLOfHsgmlH9ogv5w= +cloud.google.com/go/auth v0.4.1 h1:Z7YNIhlWRtrnKlZke7z3GMqzvuYzdc2z98F9D1NV5Hg= +cloud.google.com/go/auth v0.4.1/go.mod h1:QVBuVEKpCn4Zp58hzRGvL0tjRGU0YqdRTdCHM1IHnro= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= cloud.google.com/go/automl v1.5.0/go.mod h1:34EjfoFGMZ5sgJ9EoLsRtdPSNZLcfflJR39VbVNS2M0= @@ -2166,8 +2166,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI= google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= -google.golang.org/api v0.177.0 h1:8a0p/BbPa65GlqGWtUKxot4p0TV8OGOfyTjtmkXNXmk= -google.golang.org/api v0.177.0/go.mod h1:srbhue4MLjkjbkux5p3dw/ocYOSZTaIEvf7bCOnFQDw= +google.golang.org/api v0.180.0 h1:M2D87Yo0rGBPWpo1orwfCLehUUL6E7/TYe5gvMQWDh4= +google.golang.org/api v0.180.0/go.mod h1:51AiyoEg1MJPSZ9zvklA8VnRILPXxn1iVen9v25XHAE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/integrations/terraform/go.mod b/integrations/terraform/go.mod index 583b58b7b8353..d757009a9b944 100644 --- a/integrations/terraform/go.mod +++ b/integrations/terraform/go.mod @@ -24,8 +24,9 @@ require ( require ( cloud.google.com/go v0.112.2 // indirect - cloud.google.com/go/auth v0.3.0 // indirect + cloud.google.com/go/auth v0.4.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect + cloud.google.com/go/cloudsqlconn v1.10.0 // indirect cloud.google.com/go/compute v1.26.0 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/container v1.35.1 // indirect @@ -188,7 +189,7 @@ require ( github.com/gofrs/flock v0.8.1 // indirect github.com/golang-jwt/jwt/v4 v4.5.0 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect - github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect @@ -289,7 +290,7 @@ require ( github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.5.1 // indirect - github.com/microsoft/go-mssqldb v0.0.0-00010101000000-000000000000 // indirect + github.com/microsoft/go-mssqldb v1.7.1 // indirect github.com/microsoft/kiota-abstractions-go v1.6.0 // indirect github.com/microsoft/kiota-authentication-azure-go v1.0.2 // indirect github.com/microsoft/kiota-http-go v1.3.1 // indirect @@ -414,7 +415,7 @@ require ( golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.19.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect - google.golang.org/api v0.177.0 // indirect + google.golang.org/api v0.180.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240520151616-dc85e6b867a5 // indirect diff --git a/integrations/terraform/go.sum b/integrations/terraform/go.sum index ce88e49a0b0db..c994ebc95ae91 100644 --- a/integrations/terraform/go.sum +++ b/integrations/terraform/go.sum @@ -100,8 +100,8 @@ cloud.google.com/go/assuredworkloads v1.7.0/go.mod h1:z/736/oNmtGAyU47reJgGN+KVo cloud.google.com/go/assuredworkloads v1.8.0/go.mod h1:AsX2cqyNCOvEQC8RMPnoc0yEarXQk6WEKkxYfL6kGIo= cloud.google.com/go/assuredworkloads v1.9.0/go.mod h1:kFuI1P78bplYtT77Tb1hi0FMxM0vVpRC7VVoJC3ZoT0= cloud.google.com/go/assuredworkloads v1.10.0/go.mod h1:kwdUQuXcedVdsIaKgKTp9t0UJkE5+PAVNhdQm4ZVq2E= -cloud.google.com/go/auth v0.3.0 h1:PRyzEpGfx/Z9e8+lHsbkoUVXD0gnu4MNmm7Gp8TQNIs= -cloud.google.com/go/auth v0.3.0/go.mod h1:lBv6NKTWp8E3LPzmO1TbiiRKc4drLOfHsgmlH9ogv5w= +cloud.google.com/go/auth v0.4.1 h1:Z7YNIhlWRtrnKlZke7z3GMqzvuYzdc2z98F9D1NV5Hg= +cloud.google.com/go/auth v0.4.1/go.mod h1:QVBuVEKpCn4Zp58hzRGvL0tjRGU0YqdRTdCHM1IHnro= cloud.google.com/go/auth/oauth2adapt v0.2.2 h1:+TTV8aXpjeChS9M+aTtN/TjdQnzJvmzKFt//oWu7HX4= cloud.google.com/go/auth/oauth2adapt v0.2.2/go.mod h1:wcYjgpZI9+Yu7LyYBg4pqSiaRkfEK3GQcpb7C/uyF1Q= cloud.google.com/go/automl v1.5.0/go.mod h1:34EjfoFGMZ5sgJ9EoLsRtdPSNZLcfflJR39VbVNS2M0= @@ -158,6 +158,8 @@ cloud.google.com/go/cloudbuild v1.9.0/go.mod h1:qK1d7s4QlO0VwfYn5YuClDGg2hfmLZEb cloud.google.com/go/clouddms v1.3.0/go.mod h1:oK6XsCDdW4Ib3jCCBugx+gVjevp2TMXFtgxvPSee3OM= cloud.google.com/go/clouddms v1.4.0/go.mod h1:Eh7sUGCC+aKry14O1NRljhjyrr0NFC0G2cjwX0cByRk= cloud.google.com/go/clouddms v1.5.0/go.mod h1:QSxQnhikCLUw13iAbffF2CZxAER3xDGNHjsTAkQJcQA= +cloud.google.com/go/cloudsqlconn v1.10.0 h1:8ixabtaDQKPjkYYY+cm+Zq7zvIonYzKfgOqfA/1s0PI= +cloud.google.com/go/cloudsqlconn v1.10.0/go.mod h1:FLQhC5rt+1c0tXEujrppQffRMO1EDdj5qappSZj7xMI= cloud.google.com/go/cloudtasks v1.5.0/go.mod h1:fD92REy1x5woxkKEkLdvavGnPJGEn8Uic9nWuLzqCpY= cloud.google.com/go/cloudtasks v1.6.0/go.mod h1:C6Io+sxuke9/KNRkbQpihnW93SWDU3uXt92nu85HkYI= cloud.google.com/go/cloudtasks v1.7.0/go.mod h1:ImsfdYWwlWNJbdgPIIGJWC+gemEGTBK/SunNQQNCAb4= @@ -624,6 +626,8 @@ cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcP connectrpc.com/connect v1.16.1 h1:rOdrK/RTI/7TVnn3JsVxt3n028MlTRwmK5Q4heSpjis= connectrpc.com/connect v1.16.1/go.mod h1:XpZAduBQUySsb4/KO5JffORVkDI4B6/EYPi7N8xpNZw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8= git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= @@ -1148,8 +1152,8 @@ github.com/go-resty/resty/v2 v2.12.0/go.mod h1:o0yGPrkS3lOe1+eFajk6kBW8ScXzwU3hD github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= -github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= @@ -1191,8 +1195,9 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= -github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= @@ -2712,8 +2717,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/ google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI= google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0= google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg= -google.golang.org/api v0.177.0 h1:8a0p/BbPa65GlqGWtUKxot4p0TV8OGOfyTjtmkXNXmk= -google.golang.org/api v0.177.0/go.mod h1:srbhue4MLjkjbkux5p3dw/ocYOSZTaIEvf7bCOnFQDw= +google.golang.org/api v0.180.0 h1:M2D87Yo0rGBPWpo1orwfCLehUUL6E7/TYe5gvMQWDh4= +google.golang.org/api v0.180.0/go.mod h1:51AiyoEg1MJPSZ9zvklA8VnRILPXxn1iVen9v25XHAE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= diff --git a/lib/backend/pgbk/atomicwrite.go b/lib/backend/pgbk/atomicwrite.go index 4e263c70ab07c..78d08e0b648b8 100644 --- a/lib/backend/pgbk/atomicwrite.go +++ b/lib/backend/pgbk/atomicwrite.go @@ -101,9 +101,9 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona return trace.Wrap(row.Scan(&success)) } - var tries int + var attempts int err = pgcommon.RetryTx(ctx, b.log, b.pool, pgx.TxOptions{}, false, func(tx pgx.Tx) error { - tries++ + attempts++ var condBatch, actBatch pgx.Batch for _, bi := range condBatchItems { @@ -130,14 +130,15 @@ func (b *Backend) AtomicWrite(ctx context.Context, condacts []backend.Conditiona return nil }) - if tries > 1 { - backend.AtomicWriteContention.WithLabelValues(b.GetName()).Add(float64(tries - 1)) + if attempts > 1 { + backend.AtomicWriteContention.WithLabelValues(b.GetName()).Add(float64(attempts - 1)) } - if tries > 2 { - // if we retried more than once, txn experienced non-trivial conflict and we should warn about it. Infrequent warnings of this kind - // are nothing to be concerned about, but high volumes may indicate that an automatic process is creating excessive conflicts. - b.log.Warnf("AtomicWrite retried %d times due to postgres transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", tries) + if attempts > 2 { + b.log.WarnContext(ctx, + "AtomicWrite was retried several times due to transaction contention. Some conflict is expected, but persistent conflict warnings may indicate an unhealthy state.", + "attempts", attempts, + ) } if err != nil { diff --git a/lib/backend/pgbk/background.go b/lib/backend/pgbk/background.go index 15fcf06d17657..5a0daebe564d9 100644 --- a/lib/backend/pgbk/background.go +++ b/lib/backend/pgbk/background.go @@ -22,13 +22,13 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "time" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/lib/backend" pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common" @@ -36,7 +36,7 @@ import ( ) func (b *Backend) backgroundExpiry(ctx context.Context) { - defer b.log.Info("Exited expiry loop.") + defer b.log.InfoContext(ctx, "Exited expiry loop.") for ctx.Err() == nil { // "DELETE FROM kv WHERE expires <= now()" but more complicated: logical @@ -71,15 +71,15 @@ func (b *Backend) backgroundExpiry(ctx context.Context) { return tag.RowsAffected(), nil }) if err != nil { - b.log.WithError(err).Error("Failed to delete expired items.") + b.log.ErrorContext(ctx, "Failed to delete expired items.", "error", err) break } if deleted > 0 { - b.log.WithFields(logrus.Fields{ - "deleted": deleted, - "elapsed": time.Since(t0).String(), - }).Debug("Deleted expired items.") + b.log.DebugContext(ctx, "Deleted expired items.", + "deleted", deleted, + "elapsed", time.Since(t0), + ) } if deleted < int64(b.cfg.ExpiryBatchSize) { @@ -96,16 +96,16 @@ func (b *Backend) backgroundExpiry(ctx context.Context) { } func (b *Backend) backgroundChangeFeed(ctx context.Context) { - defer b.log.Info("Exited change feed loop.") + defer b.log.InfoContext(ctx, "Exited change feed loop.") defer b.buf.Close() for ctx.Err() == nil { - b.log.Info("Starting change feed stream.") + b.log.InfoContext(ctx, "Starting change feed stream.") err := b.runChangeFeed(ctx) if ctx.Err() != nil { break } - b.log.WithError(err).Error("Change feed stream lost.") + b.log.ErrorContext(ctx, "Change feed stream lost.", "error", err) select { case <-ctx.Done(): @@ -135,7 +135,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil { - b.log.WithError(err).Warn("Error closing change feed connection.") + b.log.WarnContext(ctx, "Error closing change feed connection.", "error", err) } }() if ac := b.feedConfig.AfterConnect; ac != nil { @@ -164,7 +164,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { // permission issues, which would delete the temporary slot (it's deleted on // any error), so we have to do it before that if _, err := conn.Exec(ctx, "SET log_min_messages TO fatal", pgx.QueryExecModeExec); err != nil { - b.log.WithError(err).Debug("Failed to silence log messages for change feed session.") + b.log.DebugContext(ctx, "Failed to silence log messages for change feed session.", "error", err) } // this can be useful on Azure if we have azure_pg_admin permissions but not @@ -174,12 +174,12 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { // // HACK(espadolini): ALTER ROLE CURRENT_USER crashes Postgres on Azure, so // we have to use an explicit username - if b.cfg.AuthMode == AzureADAuth && connConfig.User != "" { + if b.cfg.AuthMode == pgcommon.AzureADAuth && connConfig.User != "" { if _, err := conn.Exec(ctx, fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()), pgx.QueryExecModeExec, ); err != nil { - b.log.WithError(err).Debug("Failed to enable replication for the current user.") + b.log.DebugContext(ctx, "Failed to enable replication for the current user.", "error", err) } } @@ -188,7 +188,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { // https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194 slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New())) - b.log.WithField("slot_name", slotName).Info("Setting up change feed.") + b.log.InfoContext(ctx, "Setting up change feed.", "slot_name", slotName) // be noisy about pg_create_logical_replication_slot taking too long, since // hanging here leaves the backend non-functional @@ -202,7 +202,7 @@ func (b *Backend) runChangeFeed(ctx context.Context) error { } cancel() - b.log.WithField("slot_name", slotName).Info("Change feed started.") + b.log.InfoContext(ctx, "Change feed started.", "slot_name", slotName) b.buf.SetInit() defer b.buf.Reset() @@ -260,10 +260,10 @@ func (b *Backend) pollChangeFeed(ctx context.Context, conn *pgx.Conn, addTables, messages := tag.RowsAffected() if messages > 0 { - b.log.WithFields(logrus.Fields{ - "messages": messages, - "elapsed": time.Since(t0).String(), - }).Debug("Fetched change feed events.") + b.log.LogAttrs(ctx, slog.LevelDebug, "Fetched change feed events.", + slog.Int64("messages", messages), + slog.Duration("elapsed", time.Since(t0)), + ) } return messages, nil diff --git a/lib/backend/pgbk/common/auth.go b/lib/backend/pgbk/common/auth.go new file mode 100644 index 0000000000000..ba150ce635bca --- /dev/null +++ b/lib/backend/pgbk/common/auth.go @@ -0,0 +1,122 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package pgcommon + +import ( + "context" + "log/slog" + "slices" + + "github.com/gravitational/trace" + "github.com/jackc/pgx/v5/pgxpool" + + apiutils "github.com/gravitational/teleport/api/utils" +) + +// AuthMode determines if we should use some environment-specific authentication +// mechanism or credentials. +type AuthMode string + +const ( + // StaticAuth uses the static credentials as defined in the connection + // string. + StaticAuth AuthMode = "" + // AzureADAuth gets a connection token from Azure and uses it as the + // password when connecting. + AzureADAuth AuthMode = "azure" + // GCPCloudSQLIAMAuth fetches an access token and uses it as password when + // connecting to GCP Cloud SQL PostgreSQL. + GCPCloudSQLIAMAuth AuthMode = "gcp-cloudsql" +) + +var authModes = []AuthMode{ + StaticAuth, + AzureADAuth, + GCPCloudSQLIAMAuth, +} + +// Check returns an error if the AuthMode is invalid. +func (a AuthMode) Check() error { + if slices.Contains(authModes, a) { + return nil + } + return trace.BadParameter("invalid authentication mode %q, should be one of \"%v\"", a, apiutils.JoinStrings(authModes, `", "`)) +} + +// AuthConfig contains common auth configs. +type AuthConfig struct { + // AuthMode is the authentication mode. + AuthMode AuthMode `json:"auth_mode"` + // GCPConnectionName is the GCP connection name in format of + // project:region:instance. The connection name is required by the + // connector libraries as the connection target. + GCPConnectionName string `json:"gcp_connection_name"` + // GCPIPType specifies the type of IP used for GCP connection. + GCPIPType GCPIPType `json:"gcp_ip_type"` +} + +// Check returns an error if the AuthMode is invalid. +func (a AuthConfig) Check() error { + if err := a.AuthMode.Check(); err != nil { + return trace.Wrap(err) + } + + if a.AuthMode == GCPCloudSQLIAMAuth { + if a.GCPConnectionName == "" { + return trace.NotFound("empty GCP connection name (hint: project:region:instance)") + } + if err := a.GCPIPType.check(); err != nil { + return trace.Wrap(err) + } + } + return nil +} + +// ApplyToPoolConfigs configures pgxpool.Config based on the authMode. +func (a AuthConfig) ApplyToPoolConfigs(ctx context.Context, logger *slog.Logger, configs ...*pgxpool.Config) error { + switch a.AuthMode { + case StaticAuth: + // Nothing to do + return nil + + case AzureADAuth: + bc, err := AzureBeforeConnect(ctx, logger) + if err != nil { + return trace.Wrap(err) + } + + for _, config := range configs { + config.BeforeConnect = bc + } + return nil + + case GCPCloudSQLIAMAuth: + for _, config := range configs { + dialFunc, err := GCPCloudSQLDialFunc(ctx, a, config.ConnConfig.User, logger) + if err != nil { + return trace.Wrap(err) + } + config.ConnConfig.DialFunc = dialFunc + } + return nil + + default: + return trace.BadParameter("invalid authentication mode %q", a) + } +} diff --git a/lib/backend/pgbk/common/auth_test.go b/lib/backend/pgbk/common/auth_test.go new file mode 100644 index 0000000000000..c67eb68e87fdd --- /dev/null +++ b/lib/backend/pgbk/common/auth_test.go @@ -0,0 +1,217 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package pgcommon + +import ( + "context" + "log/slog" + "os" + "testing" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" + + "github.com/gravitational/teleport/lib/utils" +) + +func TestMain(m *testing.M) { + utils.InitLoggerForTests() + os.Exit(m.Run()) +} + +func TestAuthConfig(t *testing.T) { + mustSetGoogleApplicationCredentialsEnv(t) + mustSetAzureEnvironmentCredential(t) + + emptyPoolConfig := func(t *testing.T) *pgxpool.Config { + t.Helper() + return &pgxpool.Config{} + } + gcpCloudSQLPoolConfig := func(t *testing.T) *pgxpool.Config { + t.Helper() + config, err := pgxpool.ParseConfig("postgres://user@project.iam@/#gcp_connection_name=project:location:instance") + require.NoError(t, err) + // Unset dial func to verify that it will be overwritten. + config.ConnConfig.DialFunc = nil + return config + } + + verifyBeforeConnectIsSet := func(t *testing.T, config *pgxpool.Config) { + t.Helper() + require.NotNil(t, config.BeforeConnect) + } + verifyDialFuncIsSet := func(t *testing.T, config *pgxpool.Config) { + t.Helper() + require.NotNil(t, config.ConnConfig.DialFunc) + } + verifyNothingIsSet := func(t *testing.T, config *pgxpool.Config) { + t.Helper() + require.Equal(t, emptyPoolConfig(t), config) + } + + tests := []struct { + name string + authConfig AuthConfig + makePoolConfig func(*testing.T) *pgxpool.Config + requireCheckError require.ErrorAssertionFunc + verifyPoolConfigAfterApply func(*testing.T, *pgxpool.Config) + }{ + { + name: "unknown mode", + authConfig: AuthConfig{ + AuthMode: AuthMode("unknown-mode"), + }, + makePoolConfig: emptyPoolConfig, + requireCheckError: require.Error, + }, + { + name: "static auth", + authConfig: AuthConfig{ + AuthMode: StaticAuth, + }, + makePoolConfig: emptyPoolConfig, + requireCheckError: require.NoError, + verifyPoolConfigAfterApply: verifyNothingIsSet, + }, + { + name: "Azure AD Auth", + authConfig: AuthConfig{ + AuthMode: AzureADAuth, + }, + makePoolConfig: emptyPoolConfig, + requireCheckError: require.NoError, + verifyPoolConfigAfterApply: verifyBeforeConnectIsSet, + }, + { + name: "GCP IAM Auth", + authConfig: AuthConfig{ + AuthMode: GCPCloudSQLIAMAuth, + GCPConnectionName: "project:location:instance", + }, + makePoolConfig: gcpCloudSQLPoolConfig, + requireCheckError: require.NoError, + verifyPoolConfigAfterApply: verifyDialFuncIsSet, + }, + { + name: "GCP IAM Auth with IP type", + authConfig: AuthConfig{ + AuthMode: GCPCloudSQLIAMAuth, + GCPConnectionName: "project:location:instance", + GCPIPType: GCPIPTypePrivateIP, + }, + makePoolConfig: gcpCloudSQLPoolConfig, + requireCheckError: require.NoError, + verifyPoolConfigAfterApply: verifyDialFuncIsSet, + }, + { + name: "missing GCP connection name", + authConfig: AuthConfig{ + AuthMode: GCPCloudSQLIAMAuth, + }, + makePoolConfig: gcpCloudSQLPoolConfig, + requireCheckError: require.Error, + }, + { + name: "invalid GCP IP Type", + authConfig: AuthConfig{ + AuthMode: GCPCloudSQLIAMAuth, + GCPConnectionName: "project:location:instance", + GCPIPType: GCPIPType("unknown-ip-type"), + }, + makePoolConfig: gcpCloudSQLPoolConfig, + requireCheckError: require.Error, + }, + } + + ctx := context.Background() + logger := slog.Default() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Run("check", func(t *testing.T) { + err := tc.authConfig.Check() + if err != nil { + // Just checking out how the error message looks like. + t.Log(err) + } + tc.requireCheckError(t, err) + }) + + if tc.verifyPoolConfigAfterApply != nil { + t.Run("ApplyToPoolConfigs", func(t *testing.T) { + configs := []*pgxpool.Config{tc.makePoolConfig(t), tc.makePoolConfig(t), tc.makePoolConfig(t)} + err := tc.authConfig.ApplyToPoolConfigs(ctx, logger, configs...) + require.NoError(t, err) + + for _, config := range configs { + tc.verifyPoolConfigAfterApply(t, config) + } + }) + } + }) + } +} + +func TestGCPIPType(t *testing.T) { + tests := []struct { + ipTypeStr string + requireCheck require.ErrorAssertionFunc + requireCloudSQLConnOption require.ValueAssertionFunc + }{ + { + ipTypeStr: "", + requireCheck: require.NoError, + requireCloudSQLConnOption: require.Nil, + }, + { + ipTypeStr: "unknown", + requireCheck: require.Error, + requireCloudSQLConnOption: require.Nil, + }, + { + ipTypeStr: "public", + requireCheck: require.NoError, + requireCloudSQLConnOption: require.NotNil, + }, + { + ipTypeStr: "private", + requireCheck: require.NoError, + requireCloudSQLConnOption: require.NotNil, + }, + { + ipTypeStr: "psc", + requireCheck: require.NoError, + requireCloudSQLConnOption: require.NotNil, + }, + } + + for _, tc := range tests { + t.Run(tc.ipTypeStr, func(t *testing.T) { + ipType := GCPIPType(tc.ipTypeStr) + tc.requireCheck(t, ipType.check()) + tc.requireCloudSQLConnOption(t, ipType.cloudsqlconnOption()) + }) + } +} + +func mustSetAzureEnvironmentCredential(t *testing.T) { + t.Helper() + t.Setenv("AZURE_TENANT_ID", "teleport-test-tenant-id") + t.Setenv("AZURE_CLIENT_ID", "teleport-test-client-id") + t.Setenv("AZURE_CLIENT_SECRET", "teleport-test-client-secret") +} diff --git a/lib/backend/pgbk/common/azure.go b/lib/backend/pgbk/common/azure.go index 9fef75a0c5fd9..c5c05857fa034 100644 --- a/lib/backend/pgbk/common/azure.go +++ b/lib/backend/pgbk/common/azure.go @@ -20,19 +20,19 @@ package pgcommon import ( "context" + "log/slog" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/gravitational/trace" "github.com/jackc/pgx/v5" - "github.com/sirupsen/logrus" ) // AzureBeforeConnect will return a pgx BeforeConnect function suitable for // Azure AD authentication. The returned function will set the password of the // connection to a token for the relevant scope. -func AzureBeforeConnect(log logrus.FieldLogger) (func(ctx context.Context, config *pgx.ConnConfig) error, error) { +func AzureBeforeConnect(ctx context.Context, logger *slog.Logger) (func(ctx context.Context, config *pgx.ConnConfig) error, error) { cred, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { return nil, trace.Wrap(err, "creating Azure credentials") @@ -48,7 +48,7 @@ func AzureBeforeConnect(log logrus.FieldLogger) (func(ctx context.Context, confi return trace.Wrap(err, "obtaining Azure authentication token") } - log.WithField("ttl", time.Until(token.ExpiresOn).String()).Debug("Acquired Azure access token.") + logger.DebugContext(ctx, "Acquired Azure access token.", "ttl", time.Until(token.ExpiresOn).String()) config.Password = token.Token return nil diff --git a/lib/backend/pgbk/common/gcp.go b/lib/backend/pgbk/common/gcp.go new file mode 100644 index 0000000000000..2af30efcf8dbc --- /dev/null +++ b/lib/backend/pgbk/common/gcp.go @@ -0,0 +1,180 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package pgcommon + +import ( + "context" + "log/slog" + "net" + "slices" + + "cloud.google.com/go/cloudsqlconn" + "github.com/gravitational/trace" + "github.com/jackc/pgx/v5/pgconn" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" + "google.golang.org/api/impersonate" + + apiutils "github.com/gravitational/teleport/api/utils" + gcputils "github.com/gravitational/teleport/lib/utils/gcp" +) + +// GCPIPType specifies the type of IP used for GCP connection. +// +// Values are sourced from: +// https://github.com/GoogleCloudPlatform/cloud-sql-go-connector/blob/main/internal/cloudsql/refresh.go +// https://github.com/GoogleCloudPlatform/alloydb-go-connector/blob/main/internal/alloydb/refresh.go +// +// Note that AutoIP is not recommended for Cloud SQL and not present for +// AlloyDB. So we are not supporting AutoIP. Values are also lower-cased for +// simplicity. If not specified, the library defaults to public. +type GCPIPType string + +const ( + GCPIPTypeUnspecified GCPIPType = "" + GCPIPTypePublicIP GCPIPType = "public" + GCPIPTypePrivateIP GCPIPType = "private" + GCPIPTypePrivateServiceConnect GCPIPType = "psc" +) + +var gcpIPTypes = []GCPIPType{ + GCPIPTypeUnspecified, + GCPIPTypePublicIP, + GCPIPTypePrivateIP, + GCPIPTypePrivateServiceConnect, +} + +func (g GCPIPType) check() error { + if slices.Contains(gcpIPTypes, g) { + return nil + } + return trace.BadParameter("invalid GCP IP type %q, should be one of \"%v\"", g, apiutils.JoinStrings(gcpIPTypes, `", "`)) +} + +func (g GCPIPType) cloudsqlconnOption() cloudsqlconn.DialOption { + switch g { + case GCPIPTypePublicIP: + return cloudsqlconn.WithPublicIP() + case GCPIPTypePrivateIP: + return cloudsqlconn.WithPrivateIP() + case GCPIPTypePrivateServiceConnect: + return cloudsqlconn.WithPSC() + default: + return nil + } +} + +// GCPCloudSQLDialFunc creates a pgconn.DialFunc to use cloudsqlconn for +// "automatic" IAM database authentication. +// +// https://cloud.google.com/sql/docs/postgres/iam-authentication +func GCPCloudSQLDialFunc(ctx context.Context, config AuthConfig, dbUser string, logger *slog.Logger) (pgconn.DialFunc, error) { + // IAM auth users have the PostgreSQL username of their emails minus + // the ".gserviceaccount.com" part. Now add the suffix back for the + // full service account email. + targetServiceAccount := dbUser + ".gserviceaccount.com" + if err := gcputils.ValidateGCPServiceAccountName(targetServiceAccount); err != nil { + return nil, trace.Wrap(err, "IAM database user for service account should have usernames in format of @.iam but got %s", dbUser) + } + + iamAuthOptions, err := makeGCPCloudSQLAuthOptionsForServiceAccount(ctx, targetServiceAccount, gcpServiceAccountImpersonatorImpl{}, logger) + if err != nil { + return nil, trace.Wrap(err) + } + + dialer, err := cloudsqlconn.NewDialer(ctx, iamAuthOptions...) + if err != nil { + return nil, trace.Wrap(err) + } + + var dialOptions []cloudsqlconn.DialOption + if ipTypeOption := config.GCPIPType.cloudsqlconnOption(); ipTypeOption != nil { + dialOptions = append(dialOptions, ipTypeOption) + } + + return func(ctx context.Context, _, _ string) (net.Conn, error) { + // Use connection name and ignore network and host address. + logger.DebugContext(ctx, "Dialing GCP Cloud SQL.", "connection_name", config.GCPConnectionName, "service_account", targetServiceAccount, "ip_type", config.GCPIPType) + conn, err := dialer.Dial(ctx, config.GCPConnectionName, dialOptions...) + return conn, trace.Wrap(err) + }, nil +} + +func makeGCPCloudSQLAuthOptionsForServiceAccount(ctx context.Context, targetServiceAccount string, impersonator gcpServiceAccountImpersonator, logger *slog.Logger) ([]cloudsqlconn.Option, error) { + defaultCred, err := google.FindDefaultCredentials(ctx) + if err != nil { + // google.FindDefaultCredentials gives pretty error descriptions already. + return nil, trace.Wrap(err) + } + + // This function tries to capture service account emails from various + // credentials methods but may fail for some unknown scenarios. + defaultServiceAccount, err := gcputils.GetServiceAccountFromCredentials(defaultCred) + if err != nil || defaultServiceAccount == "" { + logger.WarnContext(ctx, "Failed to get service account email from default google credentials. Teleport will assume the database user in the PostgreSQL connection string matches the service account of the default google credentials.", "err", err, "sa", defaultServiceAccount) + return []cloudsqlconn.Option{cloudsqlconn.WithIAMAuthN()}, nil + } + + // If the requested db user is for another service account, the default + // service account can impersonate the target service account as a Token + // Creator. This is useful when using a different database user for change + // feed. Otherwise, let cloudsqlconn use the default credentials. + if defaultServiceAccount == targetServiceAccount { + logger.InfoContext(ctx, "Using google default credentials for Cloud SQL backend.") + return []cloudsqlconn.Option{cloudsqlconn.WithIAMAuthN()}, nil + } + + // For simplicity, we assume the target service account will be used for + // both API and IAM auth. See description of + // cloudsqlconn.WithIAMAuthNTokenSources on the required scopes. + logger.InfoContext(ctx, "Impersonating a service account for Cloud SQL backend.", "service_account", targetServiceAccount) + + apiTokenSource, err := impersonator.makeTokenSource(ctx, targetServiceAccount, "https://www.googleapis.com/auth/sqlservice.admin") + if err != nil { + return nil, trace.Wrap(err) + } + iamAuthTokenSource, err := impersonator.makeTokenSource(ctx, targetServiceAccount, "https://www.googleapis.com/auth/sqlservice.login") + if err != nil { + return nil, trace.Wrap(err) + } + + return []cloudsqlconn.Option{ + cloudsqlconn.WithIAMAuthN(), + cloudsqlconn.WithIAMAuthNTokenSources(apiTokenSource, iamAuthTokenSource), + }, nil +} + +type gcpServiceAccountImpersonator interface { + makeTokenSource(context.Context, string, ...string) (oauth2.TokenSource, error) +} + +type gcpServiceAccountImpersonatorImpl struct { +} + +func (g gcpServiceAccountImpersonatorImpl) makeTokenSource(ctx context.Context, targetServiceAccount string, scopes ...string) (oauth2.TokenSource, error) { + tokenSource, err := impersonate.CredentialsTokenSource( + ctx, + impersonate.CredentialsConfig{ + TargetPrincipal: targetServiceAccount, + Scopes: scopes, + }, + ) + // tokenSource caches the access token and only refreshes when expired. + return tokenSource, trace.Wrap(err) +} diff --git a/lib/backend/pgbk/common/gcp_test.go b/lib/backend/pgbk/common/gcp_test.go new file mode 100644 index 0000000000000..c1d94991e4490 --- /dev/null +++ b/lib/backend/pgbk/common/gcp_test.go @@ -0,0 +1,103 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package pgcommon + +import ( + "context" + "log/slog" + "os" + "path" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "golang.org/x/oauth2" +) + +type mockGCPServiceAccountImpersonator struct { + calledForServiceAccount []string +} + +func (m *mockGCPServiceAccountImpersonator) makeTokenSource(_ context.Context, serviceAccount string, _ ...string) (oauth2.TokenSource, error) { + m.calledForServiceAccount = append(m.calledForServiceAccount, serviceAccount) + return oauth2.StaticTokenSource(&oauth2.Token{ + AccessToken: "access_token", + Expiry: time.Now().Add(time.Hour), + }), nil +} + +func Test_makeGCPCloudSQLAuthOptionsForServiceAccount(t *testing.T) { + mustSetGoogleApplicationCredentialsEnv(t) + ctx := context.Background() + logger := slog.Default() + m := &mockGCPServiceAccountImpersonator{} + + t.Run("using default credentials", func(t *testing.T) { + defaultServiceAccount := "my-service-account@teleport-example-123456.iam.gserviceaccount.com" + options, err := makeGCPCloudSQLAuthOptionsForServiceAccount(ctx, defaultServiceAccount, m, logger) + require.NoError(t, err) + + // Cannot validate the actual options. Just check that the count of + // options is correct and impersonator is NOT called. + require.Len(t, options, 1) + require.Empty(t, m.calledForServiceAccount) + }) + + t.Run("impersonate a service account", func(t *testing.T) { + otherServiceAccount := "my-other-service-account@teleport-example-123456.iam" + options, err := makeGCPCloudSQLAuthOptionsForServiceAccount(ctx, otherServiceAccount, m, logger) + require.NoError(t, err) + + // Cannot validate the actual options. Just check that the count of + // options is correct and impersonator is called twice (once for API + // client and once for IAM auth). + require.Len(t, options, 2) + require.Equal(t, + []string{otherServiceAccount, otherServiceAccount}, + m.calledForServiceAccount, + ) + }) +} + +func mustSetGoogleApplicationCredentialsEnv(t *testing.T) { + t.Helper() + + file := path.Join(t.TempDir(), uuid.New().String()) + err := os.WriteFile(file, []byte(fakeServiceAccountCredentialsJSON), 0644) + require.NoError(t, err) + + t.Setenv("GOOGLE_APPLICATION_CREDENTIALS", file) +} + +const ( + fakeServiceAccountCredentialsJSON = `{ + "type": "service_account", + "project_id": "teleport-example-123456", + "private_key_id": "1234569890abcdef1234567890abcdef12345678", + "private_key": "fake-private-key", + "client_email": "my-service-account@teleport-example-123456.iam.gserviceaccount.com", + "client_id": "111111111111111111111", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/my-service-account%40teleport-example-123456.iam.gserviceaccount.com", + "universe_domain": "googleapis.com" +}` +) diff --git a/lib/backend/pgbk/common/utils.go b/lib/backend/pgbk/common/utils.go index 314b5a1b76a31..44d22fb1f02f5 100644 --- a/lib/backend/pgbk/common/utils.go +++ b/lib/backend/pgbk/common/utils.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "log/slog" "time" "github.com/gravitational/trace" @@ -29,7 +30,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/utils/retryutils" ) @@ -63,10 +63,10 @@ func ConnectPostgres(ctx context.Context, poolConfig *pgxpool.Config) (*pgx.Conn // TryEnsureDatabase will connect to the "postgres" database and attempt to // create the database named in the pool's configuration. -func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logrus.FieldLogger) { +func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log *slog.Logger) { pgConn, err := ConnectPostgres(ctx, poolConfig) if err != nil { - log.WithError(err).Warn("Failed to connect to the \"postgres\" database.") + log.WarnContext(ctx, "Failed to connect to the \"postgres\" database.", "error", err) return } @@ -81,13 +81,13 @@ func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logr // will fail immediately if we can't connect, anyway, so we can log // permission errors at debug level here. if IsCode(err, pgerrcode.InsufficientPrivilege) { - log.WithError(err).Debug("Error creating database due to insufficient privileges.") + log.DebugContext(ctx, "Error creating database due to insufficient privileges.", "error", err) } else { - log.WithError(err).Warn("Error creating database.") + log.WarnContext(ctx, "Error creating database.", "error", err) } } if err := pgConn.Close(ctx); err != nil { - log.WithError(err).Warn("Error closing connection to the \"postgres\" database.") + log.WarnContext(ctx, "Error closing connection to the \"postgres\" database.", "error", err) } } @@ -97,7 +97,7 @@ func TryEnsureDatabase(ctx context.Context, poolConfig *pgxpool.Config, log logr // any data has been sent. It will retry unique constraint violation and // exclusion constraint violations, so the closure should not rely on those for // normal behavior. -func Retry[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, error)) (T, error) { +func Retry[T any](ctx context.Context, log *slog.Logger, f func() (T, error)) (T, error) { const idempotent = false v, err := retry(ctx, log, idempotent, f) return v, trace.Wrap(err) @@ -108,13 +108,13 @@ func Retry[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, erro // assumes that f is idempotent, so it will retry even in ambiguous situations. // It will retry unique constraint violation and exclusion constraint // violations, so the closure should not rely on those for normal behavior. -func RetryIdempotent[T any](ctx context.Context, log logrus.FieldLogger, f func() (T, error)) (T, error) { +func RetryIdempotent[T any](ctx context.Context, log *slog.Logger, f func() (T, error)) (T, error) { const idempotent = true v, err := retry(ctx, log, idempotent, f) return v, trace.Wrap(err) } -func retry[T any](ctx context.Context, log logrus.FieldLogger, isIdempotent bool, f func() (T, error)) (T, error) { +func retry[T any](ctx context.Context, log *slog.Logger, isIdempotent bool, f func() (T, error)) (T, error) { var v T var err error v, err = f() @@ -143,18 +143,22 @@ func retry[T any](ctx context.Context, log logrus.FieldLogger, isIdempotent bool _ = errors.As(err, &pgErr) if pgErr != nil && isSerializationErrorCode(pgErr.Code) { - log.WithError(err). - WithField("attempt", i). - Debug("Operation failed due to conflicts, retrying quickly.") + log.LogAttrs(ctx, slog.LevelDebug, + "Operation failed due to conflicts, retrying quickly.", + slog.Int("attempt", i), + slog.Any("error", err), + ) retry.Reset() // the very first attempt gets instant retry on serialization failure if i > 1 { retry.Inc() } } else if (isIdempotent && pgErr == nil) || pgconn.SafeToRetry(err) { - log.WithError(err). - WithField("attempt", i). - Debug("Operation failed, retrying.") + log.LogAttrs(ctx, slog.LevelDebug, + "Operation failed, retrying.", + slog.Int("attempt", i), + slog.Any("error", err), + ) retry.Inc() } else { // we either know we shouldn't retry (on a database error), or we @@ -207,7 +211,7 @@ func isSerializationErrorCode(code string) bool { // [pgx.BeginTxFunc]. func RetryTx( ctx context.Context, - log logrus.FieldLogger, + log *slog.Logger, db interface { BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error) }, @@ -233,7 +237,7 @@ func IsCode(err error, code string) bool { // the name of a table used to hold schema version numbers. func SetupAndMigrate( ctx context.Context, - log logrus.FieldLogger, + log *slog.Logger, db interface { BeginTx(context.Context, pgx.TxOptions) (pgx.Tx, error) Exec(context.Context, string, ...any) (pgconn.CommandTag, error) @@ -259,7 +263,10 @@ func SetupAndMigrate( }); err != nil { // the very first SELECT in the next transaction will fail, we don't // need anything higher than debug here - log.WithError(err).Debugf("Failed to confirm the existence of the %v table.", tableName) + log.DebugContext(ctx, "Failed to confirm the existence of the configured table.", + "table", tableName, + "error", err, + ) } const idempotent = true @@ -307,10 +314,10 @@ func SetupAndMigrate( } if int(version) != len(schemas) { - log.WithFields(logrus.Fields{ - "previous_version": version, - "current_version": len(schemas), - }).Info("Migrated database schema.") + log.InfoContext(ctx, "Migrated database schema.", + "previous_version", version, + "current_version", len(schemas), + ) } return nil diff --git a/lib/backend/pgbk/pgbk.go b/lib/backend/pgbk/pgbk.go index 41029aef45bce..f25bf8250c928 100644 --- a/lib/backend/pgbk/pgbk.go +++ b/lib/backend/pgbk/pgbk.go @@ -22,6 +22,7 @@ import ( "bytes" "context" "errors" + "log/slog" "sync" "time" @@ -30,7 +31,6 @@ import ( "github.com/jackc/pgx/v5/pgtype/zeronull" "github.com/jackc/pgx/v5/pgxpool" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -64,36 +64,13 @@ const ( defaultExpiryInterval = 30 * time.Second ) -// AuthMode determines if we should use some environment-specific authentication -// mechanism or credentials. -type AuthMode string - -const ( - // StaticAuth uses the static credentials as defined in the connection - // string. - StaticAuth AuthMode = "" - // AzureADAuth gets a connection token from Azure and uses it as the - // password when connecting. - AzureADAuth AuthMode = "azure" -) - -// Check returns an error if the AuthMode is invalid. -func (a AuthMode) Check() error { - switch a { - case StaticAuth, AzureADAuth: - return nil - default: - return trace.BadParameter("invalid authentication mode %q, should be %q or %q", a, StaticAuth, AzureADAuth) - } -} - // Config is the configuration struct for [Backend]; outside of tests or custom // code, it's usually generated by converting the [backend.Params] from the // Teleport configuration file. type Config struct { - ConnString string `json:"conn_string"` + pgcommon.AuthConfig - AuthMode AuthMode `json:"auth_mode"` + ConnString string `json:"conn_string"` ChangeFeedConnString string `json:"change_feed_conn_string"` ChangeFeedPollInterval types.Duration `json:"change_feed_poll_interval"` @@ -105,7 +82,7 @@ type Config struct { } func (c *Config) CheckAndSetDefaults() error { - if err := c.AuthMode.Check(); err != nil { + if err := c.AuthConfig.Check(); err != nil { return trace.Wrap(err) } @@ -172,26 +149,22 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) { return nil, trace.Wrap(err) } - log := logrus.WithField(teleport.ComponentKey, componentName) + log := slog.With(teleport.ComponentKey, componentName) - if cfg.AuthMode == AzureADAuth { - bc, err := pgcommon.AzureBeforeConnect(log) - if err != nil { - return nil, trace.Wrap(err) - } - poolConfig.BeforeConnect = bc - feedConfig.BeforeConnect = bc + if err := cfg.AuthConfig.ApplyToPoolConfigs(ctx, log, poolConfig, feedConfig); err != nil { + return nil, trace.Wrap(err) } const defaultTxIsoParamName = "default_transaction_isolation" if defaultTxIso := poolConfig.ConnConfig.RuntimeParams[defaultTxIsoParamName]; defaultTxIso != "" { - log.WithField(defaultTxIsoParamName, defaultTxIso). - Error("The " + defaultTxIsoParamName + " parameter was overridden in the connection string; proceeding with an unsupported configuration.") + const message = "The " + defaultTxIsoParamName + " parameter was overridden in the connection string; proceeding with an unsupported configuration." + log.ErrorContext(ctx, message, + defaultTxIsoParamName, defaultTxIso) } else { poolConfig.ConnConfig.RuntimeParams[defaultTxIsoParamName] = "serializable" } - log.Info("Setting up backend.") + log.InfoContext(ctx, "Setting up backend.") pgcommon.TryEnsureDatabase(ctx, poolConfig, log) @@ -238,7 +211,7 @@ type Backend struct { cfg Config feedConfig *pgxpool.Config - log logrus.FieldLogger + log *slog.Logger pool *pgxpool.Pool buf *backend.CircularBuffer diff --git a/lib/events/pgevents/pgevents.go b/lib/events/pgevents/pgevents.go index b622ebf7e4dd6..7711d584ae063 100644 --- a/lib/events/pgevents/pgevents.go +++ b/lib/events/pgevents/pgevents.go @@ -21,6 +21,7 @@ package pgevents import ( "context" "fmt" + "log/slog" "net/url" "strconv" "strings" @@ -31,7 +32,6 @@ import ( "github.com/gravitational/trace" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -58,42 +58,21 @@ const ( // URL parameters for configuration. const ( - authModeParam = "auth_mode" + authModeParam = "auth_mode" + gcpConnectionNameParam = "gcp_connection_name" + gcpIPTypeParam = "gcp_ip_type" disableCleanupParam = "disable_cleanup" cleanupIntervalParam = "cleanup_interval" retentionPeriodParam = "retention_period" ) -// AuthMode determines if we should use some environment-specific authentication -// mechanism or credentials. -type AuthMode string - -const ( - // FixedAuth uses the static credentials as defined in the connection - // string. - FixedAuth AuthMode = "" - // AzureADAuth gets a connection token from Azure and uses it as the - // password when connecting. - AzureADAuth AuthMode = "azure" -) - -// Check returns an error if the AuthMode is invalid. -func (a AuthMode) Check() error { - switch a { - case FixedAuth, AzureADAuth: - return nil - default: - return trace.BadParameter("invalid authentication mode %q", a) - } -} - // Config is the configuration struct to pass to New. type Config struct { - Log logrus.FieldLogger - PoolConfig *pgxpool.Config + pgcommon.AuthConfig - AuthMode AuthMode + Log *slog.Logger + PoolConfig *pgxpool.Config DisableCleanup bool RetentionPeriod time.Duration @@ -122,7 +101,9 @@ func (c *Config) SetFromURL(u *url.URL) error { } c.PoolConfig = poolConfig - c.AuthMode = AuthMode(params.Get(authModeParam)) + c.AuthMode = pgcommon.AuthMode(params.Get(authModeParam)) + c.GCPConnectionName = params.Get(gcpConnectionNameParam) + c.GCPIPType = pgcommon.GCPIPType(params.Get(gcpIPTypeParam)) if s := params.Get(disableCleanupParam); s != "" { b, err := strconv.ParseBool(s) @@ -158,7 +139,7 @@ func (c *Config) CheckAndSetDefaults() error { return trace.BadParameter("missing pool config") } - if err := c.AuthMode.Check(); err != nil { + if err := c.AuthConfig.Check(); err != nil { return trace.Wrap(err) } @@ -177,7 +158,7 @@ func (c *Config) CheckAndSetDefaults() error { } if c.Log == nil { - c.Log = logrus.WithField(teleport.ComponentKey, componentName) + c.Log = slog.With(teleport.ComponentKey, componentName) } return nil @@ -194,15 +175,11 @@ func New(ctx context.Context, cfg Config) (*Log, error) { return nil, trace.Wrap(err, "registering prometheus collectors") } - if cfg.AuthMode == AzureADAuth { - bc, err := pgcommon.AzureBeforeConnect(cfg.Log) - if err != nil { - return nil, trace.Wrap(err) - } - cfg.PoolConfig.BeforeConnect = bc + if err := cfg.AuthConfig.ApplyToPoolConfigs(ctx, cfg.Log, cfg.PoolConfig); err != nil { + return nil, trace.Wrap(err) } - cfg.Log.Info("Setting up events backend.") + cfg.Log.InfoContext(ctx, "Setting up events backend.") pgcommon.TryEnsureDatabase(ctx, cfg.PoolConfig, cfg.Log) @@ -228,14 +205,14 @@ func New(ctx context.Context, cfg Config) (*Log, error) { go l.periodicCleanup(periodicCtx, cfg.CleanupInterval, cfg.RetentionPeriod) } - l.log.Info("Started events backend.") + l.log.InfoContext(ctx, "Started events backend.") return l, nil } // Log is an external [events.AuditLogger] backed by a PostgreSQL database. type Log struct { - log logrus.FieldLogger + log *slog.Logger pool *pgxpool.Pool cancel context.CancelFunc @@ -280,7 +257,7 @@ func (l *Log) periodicCleanup(ctx context.Context, cleanupInterval, retentionPer case <-tk.C: } - l.log.Debug("Executing periodic cleanup.") + l.log.DebugContext(ctx, "Executing periodic cleanup.") start := time.Now() deleted, err := pgcommon.RetryIdempotent(ctx, l.log, func() (int64, error) { tag, err := l.pool.Exec(ctx, @@ -297,10 +274,10 @@ func (l *Log) periodicCleanup(ctx context.Context, cleanupInterval, retentionPer if err != nil { batchDeleteRequestsFailure.Inc() - l.log.WithError(err).Error("Failed to execute periodic cleanup.") + l.log.ErrorContext(ctx, "Failed to execute periodic cleanup.", "error", err) } else { batchDeleteRequestsSuccess.Inc() - l.log.WithField("deleted_rows", deleted).Debug("Executed periodic cleanup.") + l.log.DebugContext(ctx, "Executed periodic cleanup.", "deleted", deleted) } } } diff --git a/lib/events/pgevents/pgevents_test.go b/lib/events/pgevents/pgevents_test.go index 07e193f9ed62f..49fb005ef2170 100644 --- a/lib/events/pgevents/pgevents_test.go +++ b/lib/events/pgevents/pgevents_test.go @@ -28,6 +28,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" + pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common" "github.com/gravitational/teleport/lib/events/test" "github.com/gravitational/teleport/lib/utils" ) @@ -86,7 +87,18 @@ func TestPostgresEvents(t *testing.T) { func TestConfig(t *testing.T) { configs := map[string]*Config{ "postgres://foo#auth_mode=azure": { - AuthMode: AzureADAuth, + AuthConfig: pgcommon.AuthConfig{ + AuthMode: pgcommon.AzureADAuth, + }, + RetentionPeriod: defaultRetentionPeriod, + CleanupInterval: defaultCleanupInterval, + }, + "postgres://foo#auth_mode=gcp-cloudsql&gcp_connection_name=project:location:instance&gcp_ip_type=private": { + AuthConfig: pgcommon.AuthConfig{ + AuthMode: pgcommon.GCPCloudSQLIAMAuth, + GCPConnectionName: "project:location:instance", + GCPIPType: pgcommon.GCPIPTypePrivateIP, + }, RetentionPeriod: defaultRetentionPeriod, CleanupInterval: defaultCleanupInterval, }, diff --git a/lib/utils/gcp/gcp.go b/lib/utils/gcp/gcp.go index 00d71db3e89b5..141a74edbd680 100644 --- a/lib/utils/gcp/gcp.go +++ b/lib/utils/gcp/gcp.go @@ -19,9 +19,12 @@ package gcp import ( + "encoding/json" "strings" + "cloud.google.com/go/compute/metadata" "github.com/gravitational/trace" + "golang.org/x/oauth2/google" ) // SortedGCPServiceAccounts sorts service accounts by project and service account name. @@ -56,7 +59,7 @@ func (s SortedGCPServiceAccounts) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -const expectedParentDomain = "iam.gserviceaccount.com" +const serviceAccountParentDomain = "iam.gserviceaccount.com" func ProjectIDFromServiceAccountName(serviceAccount string) (string, error) { if serviceAccount == "" { @@ -80,8 +83,8 @@ func ProjectIDFromServiceAccountName(serviceAccount string) (string, error) { return "", trace.BadParameter("invalid service account format: missing project ID") } - if iamDomain != expectedParentDomain { - return "", trace.BadParameter("invalid service account format: expected suffix %q, got %q", expectedParentDomain, iamDomain) + if iamDomain != serviceAccountParentDomain { + return "", trace.BadParameter("invalid service account format: expected suffix %q, got %q", serviceAccountParentDomain, iamDomain) } return projectID, nil @@ -91,3 +94,62 @@ func ValidateGCPServiceAccountName(serviceAccount string) error { _, err := ProjectIDFromServiceAccountName(serviceAccount) return err } + +// GetServiceAccountFromCredentials attempts to retrieve service account email +// from provided credentials. +func GetServiceAccountFromCredentials(credentials *google.Credentials) (string, error) { + // When credentials JSON file is provided through either + // GOOGLE_APPLICATION_CREDENTIALS env var or a well known file. + if len(credentials.JSON) > 0 { + sa, err := GetServiceAccountFromCredentialsJSON(credentials.JSON) + return sa, trace.Wrap(err) + } + + // No credentials from JSON files but using metadata endpoints when on + // Google Compute Engine. + if metadata.OnGCE() { + email, err := metadata.Email("") + return email, trace.Wrap(err) + } + + return "", trace.NotImplemented("unknown environment for getting service account") +} + +// GetServiceAccountFromCredentialsJSON attempts to retrieve service account +// email from provided credentials JSON. +func GetServiceAccountFromCredentialsJSON(credentialsJSON []byte) (string, error) { + content := struct { + // ClientEmail defines the service account email for service_account + // credentials. + // + // Reference: https://google.aip.dev/auth/4112 + ClientEmail string `json:"client_email"` + + // ServiceAccountImpersonationURL is used for external + // account_credentials (e.g. Workload Identity Federation) when using + // service account personation. + // + // Reference: https://google.aip.dev/auth/4117 + ServiceAccountImpersonationURL string `json:"service_account_impersonation_url"` + }{} + + if err := json.Unmarshal(credentialsJSON, &content); err != nil { + return "", trace.Wrap(err) + } + + if content.ClientEmail != "" { + return content.ClientEmail, nil + } + + // Format: + // https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/$EMAIL:generateAccessToken + if _, after, ok := strings.Cut(content.ServiceAccountImpersonationURL, "/serviceAccounts/"); ok { + index := strings.LastIndex(after, serviceAccountParentDomain) + if index < 0 { + return "", trace.BadParameter("invalid service_account_impersonation_url %q", content.ServiceAccountImpersonationURL) + } + return after[:index+len(serviceAccountParentDomain)], nil + } + + return "", trace.NotImplemented("unknown environment for getting service account") +} diff --git a/lib/utils/gcp/gcp_test.go b/lib/utils/gcp/gcp_test.go index 23918f3f7ddda..bcf2b71e2002b 100644 --- a/lib/utils/gcp/gcp_test.go +++ b/lib/utils/gcp/gcp_test.go @@ -192,3 +192,73 @@ func TestProjectIDFromServiceAccountName(t *testing.T) { }) } } + +func TestGetServiceAccountFromCredentialsJSON(t *testing.T) { + tests := []struct { + name string + credentialsJSON []byte + checkError require.ErrorAssertionFunc + wantServiceAccount string + }{ + { + name: "service_account credentials", + credentialsJSON: []byte(fakeServiceAccountCredentialsJSON), + checkError: require.NoError, + wantServiceAccount: "my-service-account@teleport-example-123456.iam.gserviceaccount.com", + }, + { + name: "external_account credentials with sa impersonation", + credentialsJSON: []byte(fakeExternalAccountCredentialsJSON), + checkError: require.NoError, + wantServiceAccount: "my-service-account@teleport-example-987654.iam.gserviceaccount.com", + }, + { + name: "unknown credentials", + credentialsJSON: []byte(`{}`), + checkError: require.Error, + }, + { + name: "bad json", + credentialsJSON: []byte(`{}`), + checkError: require.Error, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sa, err := GetServiceAccountFromCredentialsJSON(tc.credentialsJSON) + tc.checkError(t, err) + require.Equal(t, tc.wantServiceAccount, sa) + }) + } +} + +const ( + fakeServiceAccountCredentialsJSON = `{ + "type": "service_account", + "project_id": "teleport-example-123456", + "private_key_id": "1234569890abcdef1234567890abcdef12345678", + "private_key": "fake-private-key", + "client_email": "my-service-account@teleport-example-123456.iam.gserviceaccount.com", + "client_id": "111111111111111111111", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/my-service-account%40teleport-example-123456.iam.gserviceaccount.com", + "universe_domain": "googleapis.com" +}` + fakeExternalAccountCredentialsJSON = `{ + "type": "external_account", + "audience": "//iam.googleapis.com/projects/111111111111/locations/global/workloadIdentityPools/my-identity-pool/providers/my-provider", + "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request", + "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/my-service-account@teleport-example-987654.iam.gserviceaccount.com:generateAccessToken", + "token_url": "https://sts.googleapis.com/v1/token", + "credential_source": { + "environment_id": "aws1", + "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone", + "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials", + "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15", + "imdsv2_session_token_url": "http://169.254.169.254/latest/api/token" + } +}` +)