diff --git a/cmd/fdsn-holdings-consumer/main.go b/cmd/fdsn-holdings-consumer/main.go index fc36d5d..2fb4a0f 100644 --- a/cmd/fdsn-holdings-consumer/main.go +++ b/cmd/fdsn-holdings-consumer/main.go @@ -28,11 +28,10 @@ import ( "github.com/GeoNet/kit/cfg" "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" - "github.com/GeoNet/kit/slogger" ) const ( - healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting healthCheckTimeout = 30 * time.Second //health check timeout healthCheckService = ":7777" //end point to listen to for SOH checks @@ -41,18 +40,40 @@ const ( var ( db *sql.DB - queueURL = os.Getenv("SQS_QUEUE_URL") + queueURL string sqsClient sqs.SQS s3Client *s3.S3 saveHoldings *sql.Stmt - - sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type event struct { s3.Event } +// init and check aws variables +func initAwsClient() { + queueURL = os.Getenv("SQS_QUEUE_URL") + if queueURL == "" { + log.Fatal("SQS_QUEUE_URL is not set") + } + + var err error + sqsClient, err = sqs.NewWithMaxRetries(100) + if err != nil { + log.Fatalf("error creating SQS client: %s", err) + } + if err = sqsClient.CheckQueue(queueURL); err != nil { + log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) + } + + s3c, err := s3.NewWithMaxRetries(3) + if err != nil { + log.Fatalf("error creating S3 client: %s", err) + } + s3Client = &s3c + +} + func main() { //check health if health.RunningHealthCheck() { @@ -60,6 +81,7 @@ func main() { } //run as normal service + initAwsClient() p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -117,17 +139,6 @@ ping: break ping } - sqsClient, err = sqs.NewWithMaxRetries(100) - if err != nil { - log.Fatalf("error creating SQS client: %s", err) - } - - s3c, err := s3.NewWithMaxRetries(3) - if err != nil { - log.Fatalf("error creating S3 client: %s", err) - } - s3Client = &s3c - log.Println("listening for messages") var r sqs.Raw @@ -138,40 +149,31 @@ ping: loop1: for { + health.Ok() // update soh r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { switch { + case sqs.IsNoMessagesError(err): + continue case sqs.Cancelled(err): //stoped log.Println("##1 system stop... ") break loop1 - case sqs.IsNoMessagesError(err): - n := sLogger.Log(err) - if n%100 == 0 { //don't log all repeated error messages - log.Printf("no message received for %d times ", n) - } default: slog.Warn("problem receiving message, backing off", "err", err) time.Sleep(time.Second * 20) } - // update soh - health.Ok() continue } err = metrics.DoProcess(&e, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) - // update soh - health.Ok() continue } - err = sqsClient.Delete(queueURL, r.ReceiptHandle) if err != nil { log.Printf("problem deleting message, continuing: %s", err) } - // update soh - health.Ok() } } diff --git a/cmd/fdsn-quake-consumer/main.go b/cmd/fdsn-quake-consumer/main.go index 9f1ce3a..5961c06 100644 --- a/cmd/fdsn-quake-consumer/main.go +++ b/cmd/fdsn-quake-consumer/main.go @@ -22,11 +22,10 @@ import ( "github.com/GeoNet/kit/cfg" "github.com/GeoNet/kit/health" "github.com/GeoNet/kit/metrics" - "github.com/GeoNet/kit/slogger" ) const ( - healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle) + healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting healthCheckTimeout = 30 * time.Second //health check timeout healthCheckService = ":7777" //end point to listen to for SOH checks @@ -38,14 +37,34 @@ var ( s3Client s3.S3 sqsClient sqs.SQS db *sql.DB - - sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages ) type notification struct { s3.Event } +// init and check aws clients +func initAwsClients() { + queueURL = os.Getenv("SQS_QUEUE_URL") + if queueURL == "" { + log.Fatal("SQS_QUEUE_URL is not set") + } + + var err error + s3Client, err = s3.NewWithMaxRetries(100) + if err != nil { + log.Fatalf("creating S3 client: %s", err) + } + + sqsClient, err = sqs.NewWithMaxRetries(100) + if err != nil { + log.Fatalf("creating SQS client: %s", err) + } + if err = sqsClient.CheckQueue(queueURL); err != nil { + log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error()) + } +} + func main() { //check health if health.RunningHealthCheck() { @@ -53,6 +72,8 @@ func main() { } //run as normal service + initAwsClients() + p, err := cfg.PostgresEnv() if err != nil { log.Fatalf("error reading DB config from the environment vars: %s", err) @@ -83,16 +104,6 @@ ping: break ping } - s3Client, err = s3.NewWithMaxRetries(100) - if err != nil { - log.Fatalf("creating S3 client: %s", err) - } - - sqsClient, err = sqs.NewWithMaxRetries(100) - if err != nil { - log.Fatalf("creating SQS client: %s", err) - } - log.Println("listening for messages") var r sqs.Raw @@ -103,31 +114,26 @@ ping: loop1: for { + health.Ok() // update soh + r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600) if err != nil { switch { + case sqs.IsNoMessagesError(err): + continue case sqs.Cancelled(err): //stoped log.Println("##1 system stop... ") break loop1 - case sqs.IsNoMessagesError(err): - n := sLogger.Log(err) - if n%100 == 0 { //don't log all repeated error messages - log.Printf("no message received for %d times ", n) - } default: slog.Warn("problem receiving message, backing off", "err", err) time.Sleep(time.Second * 20) } - // update soh - health.Ok() continue } err = metrics.DoProcess(&n, []byte(r.Body)) if err != nil { log.Printf("problem processing message, skipping deletion for redelivery: %s", err) - // update soh - health.Ok() continue } @@ -135,8 +141,6 @@ loop1: if err != nil { log.Printf("problem deleting message, continuing: %s", err) } - // update soh - health.Ok() } } diff --git a/cmd/fdsn-ws/fdsn_dataselect.go b/cmd/fdsn-ws/fdsn_dataselect.go index 9e0c979..153da40 100644 --- a/cmd/fdsn-ws/fdsn_dataselect.go +++ b/cmd/fdsn-ws/fdsn_dataselect.go @@ -63,6 +63,10 @@ func initDataselectTemplate() { log.Fatalf("error creating S3 client: %s", err) } s3Client = &s3c + + if err = s3Client.CheckBucket(S3_BUCKET); err != nil { + log.Fatalf("error checking S3_BUCKET %s: %s", S3_BUCKET, err.Error()) + } } // fdsnDataMetricsV1Handler handles all datametrics queries. diff --git a/go.mod b/go.mod index 87342e0..a77ad9f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn go 1.21 require ( - github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 + github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c github.com/gorilla/schema v1.4.1 github.com/joho/godotenv v1.5.1 github.com/lib/pq v1.10.3 diff --git a/go.sum b/go.sum index a5c407f..8e89387 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 h1:SeKMshwK+xOgKLKrMSPhYTQImmLop5tXXei/wOmgO80= -github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= +github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c h1:yrk9pbtLaPEWmmrx2v5U457PnEzIg1o+Q6X0hOZWWS0= +github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo= github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0= github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU= diff --git a/vendor/github.com/GeoNet/kit/aws/s3/s3.go b/vendor/github.com/GeoNet/kit/aws/s3/s3.go index 1cd6baf..9a0cdab 100644 --- a/vendor/github.com/GeoNet/kit/aws/s3/s3.go +++ b/vendor/github.com/GeoNet/kit/aws/s3/s3.go @@ -271,6 +271,15 @@ func (s *S3) PutWithMetadata(bucket, key string, object []byte, metadata Meta) e return err } +// CheckBucket checks if the given S3 bucket exists and is accessible. +func (s *S3) CheckBucket(bucket string) error { + _, err := s.client.HeadBucket(context.TODO(), &s3.HeadBucketInput{ + Bucket: aws.String(bucket), + }) + + return err +} + // Exists checks if an object for key already exists in the bucket. func (s *S3) Exists(bucket, key string) (bool, error) { diff --git a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go index bd7f21c..58c7a4b 100644 --- a/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go +++ b/vendor/github.com/GeoNet/kit/aws/sqs/sqs.go @@ -320,6 +320,18 @@ func (s *SQS) CreateQueue(queueName string, isFifoQueue bool) (string, error) { return aws.ToString(queue.QueueUrl), err } +// CheckQueue checks if the given SQS queue exists and is accessible. +func (s *SQS) CheckQueue(queueUrl string) error { + params := sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(queueUrl), + AttributeNames: []types.QueueAttributeName{ + types.QueueAttributeNameAll, + }, + } + _, err := s.client.GetQueueAttributes(context.TODO(), ¶ms) + return err +} + // DeleteQueue deletes an Amazon SQS queue. func (s *SQS) DeleteQueue(queueUrl string) error { _, err := s.client.DeleteQueue(context.TODO(), &sqs.DeleteQueueInput{ diff --git a/vendor/modules.txt b/vendor/modules.txt index 5dbd532..447cf6a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 +# github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c ## explicit; go 1.21 github.com/GeoNet/kit/aws/s3 github.com/GeoNet/kit/aws/sqs