Skip to content

Commit

Permalink
add start checks
Browse files Browse the repository at this point in the history
  • Loading branch information
bpeng committed Dec 4, 2024
1 parent d39c4e4 commit 363a81b
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 57 deletions.
58 changes: 30 additions & 28 deletions cmd/fdsn-holdings-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ import (
"github.com/GeoNet/kit/cfg"
"github.com/GeoNet/kit/health"
"github.com/GeoNet/kit/metrics"
"github.com/GeoNet/kit/slogger"
)

const (
healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle)
healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time
healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting
healthCheckTimeout = 30 * time.Second //health check timeout
healthCheckService = ":7777" //end point to listen to for SOH checks
Expand All @@ -41,25 +40,48 @@ const (

var (
db *sql.DB
queueURL = os.Getenv("SQS_QUEUE_URL")
queueURL string
sqsClient sqs.SQS
s3Client *s3.S3
saveHoldings *sql.Stmt

sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages
)

type event struct {
s3.Event
}

// init and check aws variables
func initAwsClient() {
queueURL = os.Getenv("SQS_QUEUE_URL")
if queueURL == "" {
log.Fatal("SQS_QUEUE_URL is not set")
}

var err error
sqsClient, err = sqs.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("error creating SQS client: %s", err)
}
if err = sqsClient.CheckQueue(queueURL); err != nil {
log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error())
}

s3c, err := s3.NewWithMaxRetries(3)
if err != nil {
log.Fatalf("error creating S3 client: %s", err)
}
s3Client = &s3c

}

func main() {
//check health
if health.RunningHealthCheck() {
healthCheck()
}

//run as normal service
initAwsClient()
p, err := cfg.PostgresEnv()
if err != nil {
log.Fatalf("error reading DB config from the environment vars: %s", err)
Expand Down Expand Up @@ -117,17 +139,6 @@ ping:
break ping
}

sqsClient, err = sqs.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("error creating SQS client: %s", err)
}

s3c, err := s3.NewWithMaxRetries(3)
if err != nil {
log.Fatalf("error creating S3 client: %s", err)
}
s3Client = &s3c

log.Println("listening for messages")

var r sqs.Raw
Expand All @@ -138,40 +149,31 @@ ping:

loop1:
for {
health.Ok() // update soh
r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600)
if err != nil {
switch {
case sqs.IsNoMessagesError(err):
continue
case sqs.Cancelled(err): //stoped
log.Println("##1 system stop... ")
break loop1
case sqs.IsNoMessagesError(err):
n := sLogger.Log(err)
if n%100 == 0 { //don't log all repeated error messages
log.Printf("no message received for %d times ", n)
}
default:
slog.Warn("problem receiving message, backing off", "err", err)
time.Sleep(time.Second * 20)
}
// update soh
health.Ok()
continue
}

err = metrics.DoProcess(&e, []byte(r.Body))
if err != nil {
log.Printf("problem processing message, skipping deletion for redelivery: %s", err)
// update soh
health.Ok()
continue
}

err = sqsClient.Delete(queueURL, r.ReceiptHandle)
if err != nil {
log.Printf("problem deleting message, continuing: %s", err)
}
// update soh
health.Ok()
}
}

Expand Down
54 changes: 29 additions & 25 deletions cmd/fdsn-quake-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ import (
"github.com/GeoNet/kit/cfg"
"github.com/GeoNet/kit/health"
"github.com/GeoNet/kit/metrics"
"github.com/GeoNet/kit/slogger"
)

const (
healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time (depends on tilde-bundle)
healthCheckAged = 5 * time.Minute //need to have a good heartbeat within this time
healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting
healthCheckTimeout = 30 * time.Second //health check timeout
healthCheckService = ":7777" //end point to listen to for SOH checks
Expand All @@ -38,21 +37,43 @@ var (
s3Client s3.S3
sqsClient sqs.SQS
db *sql.DB

sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages
)

type notification struct {
s3.Event
}

// init and check aws clients
func initAwsClients() {
queueURL = os.Getenv("SQS_QUEUE_URL")
if queueURL == "" {
log.Fatal("SQS_QUEUE_URL is not set")
}

var err error
s3Client, err = s3.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("creating S3 client: %s", err)
}

sqsClient, err = sqs.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("creating SQS client: %s", err)
}
if err = sqsClient.CheckQueue(queueURL); err != nil {
log.Fatalf("error checking queueURL %s: %s", queueURL, err.Error())
}
}

func main() {
//check health
if health.RunningHealthCheck() {
healthCheck()
}

//run as normal service
initAwsClients()

p, err := cfg.PostgresEnv()
if err != nil {
log.Fatalf("error reading DB config from the environment vars: %s", err)
Expand Down Expand Up @@ -83,16 +104,6 @@ ping:
break ping
}

s3Client, err = s3.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("creating S3 client: %s", err)
}

sqsClient, err = sqs.NewWithMaxRetries(100)
if err != nil {
log.Fatalf("creating SQS client: %s", err)
}

log.Println("listening for messages")

var r sqs.Raw
Expand All @@ -103,40 +114,33 @@ ping:

loop1:
for {
health.Ok() // update soh

r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600)
if err != nil {
switch {
case sqs.IsNoMessagesError(err):
continue
case sqs.Cancelled(err): //stoped
log.Println("##1 system stop... ")
break loop1
case sqs.IsNoMessagesError(err):
n := sLogger.Log(err)
if n%100 == 0 { //don't log all repeated error messages
log.Printf("no message received for %d times ", n)
}
default:
slog.Warn("problem receiving message, backing off", "err", err)
time.Sleep(time.Second * 20)
}
// update soh
health.Ok()
continue
}

err = metrics.DoProcess(&n, []byte(r.Body))
if err != nil {
log.Printf("problem processing message, skipping deletion for redelivery: %s", err)
// update soh
health.Ok()
continue
}

err = sqsClient.Delete(queueURL, r.ReceiptHandle)
if err != nil {
log.Printf("problem deleting message, continuing: %s", err)
}
// update soh
health.Ok()
}
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/fdsn-ws/fdsn_dataselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func initDataselectTemplate() {
log.Fatalf("error creating S3 client: %s", err)
}
s3Client = &s3c

if err = s3Client.CheckBucket(S3_BUCKET); err != nil {
log.Fatalf("error checking S3_BUCKET %s: %s", S3_BUCKET, err.Error())
}
}

// fdsnDataMetricsV1Handler handles all datametrics queries.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn
go 1.21

require (
github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268
github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c
github.com/gorilla/schema v1.4.1
github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 h1:SeKMshwK+xOgKLKrMSPhYTQImmLop5tXXei/wOmgO80=
github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo=
github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c h1:yrk9pbtLaPEWmmrx2v5U457PnEzIg1o+Q6X0hOZWWS0=
github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo=
github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0=
github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
Expand Down
9 changes: 9 additions & 0 deletions vendor/github.com/GeoNet/kit/aws/s3/s3.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/GeoNet/kit/aws/sqs/sqs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268
# github.com/GeoNet/kit v0.0.0-20241129025613-745247c4fb1c
## explicit; go 1.21
github.com/GeoNet/kit/aws/s3
github.com/GeoNet/kit/aws/sqs
Expand Down

0 comments on commit 363a81b

Please sign in to comment.