Skip to content

Commit

Permalink
update to add health checks
Browse files Browse the repository at this point in the history
  • Loading branch information
bpeng committed Nov 21, 2024
1 parent e62d85e commit d39c4e4
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 39 deletions.
68 changes: 65 additions & 3 deletions cmd/fdsn-holdings-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,33 @@
package main

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/GeoNet/kit/aws/s3"
"github.com/GeoNet/kit/aws/sqs"
"github.com/GeoNet/kit/cfg"
"github.com/GeoNet/kit/health"
"github.com/GeoNet/kit/metrics"
"github.com/GeoNet/kit/slogger"
)

// Settings shared by the state-of-health (SOH) heartbeat created in main via
// health.New and by the healthCheck client that queries it.
const (
healthCheckAged = 5 * time.Minute //a good heartbeat must have been sent within this time. NOTE(review): the original comment said this "depends on tilde-bundle" - confirm that still applies to this service
healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting
healthCheckTimeout = 30 * time.Second //used by healthCheck both as the context deadline and as the timeout argument of health.Check
healthCheckService = ":7777" //address given to health.New; also the prefix of the URL queried by healthCheck
healthCheckPath = "/soh" //path appended to healthCheckService when healthCheck queries the SOH endpoint
)

var (
Expand All @@ -31,13 +45,21 @@ var (
sqsClient sqs.SQS
s3Client *s3.S3
saveHoldings *sql.Stmt

sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages
)

// event is the value handed to metrics.DoProcess in main together with the
// body of each received SQS message. It embeds s3.Event, so the fields and
// methods of s3.Event are promoted onto event.
type event struct {
s3.Event
}

func main() {
//check health
if health.RunningHealthCheck() {
healthCheck()
}

//run as normal service
p, err := cfg.PostgresEnv()
if err != nil {
log.Fatalf("error reading DB config from the environment vars: %s", err)
Expand Down Expand Up @@ -80,11 +102,15 @@ func main() {
db.SetMaxIdleConns(p.MaxIdle)
db.SetMaxOpenConns(p.MaxOpen)

// provide a soh heartbeat
health := health.New(healthCheckService, healthCheckAged, healthCheckStartup)

ping:
for {
err = db.Ping()
if err != nil {
log.Println("problem pinging DB sleeping and retrying")
health.Ok() //send heartbeat
time.Sleep(time.Second * 30)
continue ping
}
Expand All @@ -107,25 +133,61 @@ ping:
var r sqs.Raw
var e event

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

loop1:
for {
r, err = sqsClient.Receive(queueURL, 600)
r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600)
if err != nil {
log.Printf("problem receiving message, backing off: %s", err)
time.Sleep(time.Second * 20)
switch {
case sqs.Cancelled(err): //stoped
log.Println("##1 system stop... ")
break loop1
case sqs.IsNoMessagesError(err):
n := sLogger.Log(err)
if n%100 == 0 { //don't log all repeated error messages
log.Printf("no message received for %d times ", n)
}
default:
slog.Warn("problem receiving message, backing off", "err", err)
time.Sleep(time.Second * 20)
}
// update soh
health.Ok()
continue
}

err = metrics.DoProcess(&e, []byte(r.Body))
if err != nil {
log.Printf("problem processing message, skipping deletion for redelivery: %s", err)
// update soh
health.Ok()
continue
}

err = sqsClient.Delete(queueURL, r.ReceiptHandle)
if err != nil {
log.Printf("problem deleting message, continuing: %s", err)
}
// update soh
health.Ok()
}
}

// healthCheck queries this service's own SOH HTTP endpoint
// (healthCheckService+healthCheckPath), logs the outcome and terminates the
// process. The exit status is 0 when health.Check succeeds and 1 when it
// returns an error. healthCheck never returns to its caller.
//
// cmd: ./fdsn-holdings-consumer -check
func healthCheck() {
	ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout)
	msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout)
	// os.Exit does not run deferred functions, so the context is released
	// explicitly here rather than with defer.
	cancel()

	if err != nil {
		log.Printf("status: %v", err)
		os.Exit(1)
	}
	log.Printf("status: %s", string(msg))
	os.Exit(0)
}

// Process implements msg.Processor for event.
Expand Down
69 changes: 66 additions & 3 deletions cmd/fdsn-quake-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,54 @@ package main

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"syscall"
"time"

"github.com/GeoNet/kit/aws/s3"
"github.com/GeoNet/kit/aws/sqs"
"github.com/GeoNet/kit/cfg"
"github.com/GeoNet/kit/health"
"github.com/GeoNet/kit/metrics"
"github.com/GeoNet/kit/slogger"
)

// Settings shared by the state-of-health (SOH) heartbeat created in main via
// health.New and by the healthCheck client that queries it.
const (
healthCheckAged = 5 * time.Minute //a good heartbeat must have been sent within this time. NOTE(review): the original comment said this "depends on tilde-bundle" - confirm that still applies to this service
healthCheckStartup = 5 * time.Minute //ignore heartbeat messages for this time after starting
healthCheckTimeout = 30 * time.Second //used by healthCheck both as the context deadline and as the timeout argument of health.Check
healthCheckService = ":7777" //address given to health.New; also the prefix of the URL queried by healthCheck
healthCheckPath = "/soh" //path appended to healthCheckService when healthCheck queries the SOH endpoint
)

// Package-level state used by main.
var (
// queueURL is read from the SQS_QUEUE_URL environment variable when the
// package is initialised; it is the queue main receives from and deletes from.
queueURL = os.Getenv("SQS_QUEUE_URL")
s3Client s3.S3
// sqsClient is used by main to receive and delete queue messages.
sqsClient sqs.SQS
// db is the database handle that main configures and pings before the
// receive loop starts.
db *sql.DB

// sLogger.Log is called in main for each "no messages" receive error; main
// only prints a line when the returned count is a multiple of 100.
// NOTE(review): the 10 minute argument is presumably the window over which
// repeated messages are tracked - confirm against kit/slogger.
sLogger = slogger.NewSmartLogger(10*time.Minute, "") // log repeated error messages
)

// notification is the value handed to metrics.DoProcess in main together with
// the body of each received SQS message. It embeds s3.Event, so the fields and
// methods of s3.Event are promoted onto notification.
type notification struct {
s3.Event
}

func main() {
//check health
if health.RunningHealthCheck() {
healthCheck()
}

//run as normal service
p, err := cfg.PostgresEnv()
if err != nil {
log.Fatalf("error reading DB config from the environment vars: %s", err)
Expand All @@ -45,11 +67,16 @@ func main() {
db.SetMaxIdleConns(p.MaxIdle)
db.SetMaxOpenConns(p.MaxOpen)

// provide a soh heartbeat
health := health.New(healthCheckService, healthCheckAged, healthCheckStartup)

ping:
for {
err = db.Ping()
if err != nil {
log.Println("problem pinging DB sleeping and retrying")
health.Ok() //send heartbeat

time.Sleep(time.Second * 30)
continue ping
}
Expand All @@ -71,25 +98,61 @@ ping:
var r sqs.Raw
var n notification

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

loop1:
for {
r, err = sqsClient.Receive(queueURL, 600)
r, err = sqsClient.ReceiveWithContext(ctx, queueURL, 600)
if err != nil {
log.Printf("problem receiving message, backing off: %s", err)
time.Sleep(time.Second * 20)
switch {
case sqs.Cancelled(err): //stoped
log.Println("##1 system stop... ")
break loop1
case sqs.IsNoMessagesError(err):
n := sLogger.Log(err)
if n%100 == 0 { //don't log all repeated error messages
log.Printf("no message received for %d times ", n)
}
default:
slog.Warn("problem receiving message, backing off", "err", err)
time.Sleep(time.Second * 20)
}
// update soh
health.Ok()
continue
}

err = metrics.DoProcess(&n, []byte(r.Body))
if err != nil {
log.Printf("problem processing message, skipping deletion for redelivery: %s", err)
// update soh
health.Ok()
continue
}

err = sqsClient.Delete(queueURL, r.ReceiptHandle)
if err != nil {
log.Printf("problem deleting message, continuing: %s", err)
}
// update soh
health.Ok()
}
}

// healthCheck queries this service's own SOH HTTP endpoint
// (healthCheckService+healthCheckPath), logs the outcome and terminates the
// process. The exit status is 0 when health.Check succeeds and 1 when it
// returns an error. healthCheck never returns to its caller.
//
// cmd: ./fdsn-quake-consumer -check
func healthCheck() {
	ctx, cancel := context.WithTimeout(context.Background(), healthCheckTimeout)
	msg, err := health.Check(ctx, healthCheckService+healthCheckPath, healthCheckTimeout)
	// os.Exit does not run deferred functions, so the context is released
	// explicitly here rather than with defer.
	cancel()

	if err != nil {
		log.Printf("status: %v", err)
		os.Exit(1)
	}
	log.Printf("status: %s", string(msg))
	os.Exit(0)
}

// Process implements msg.Processor for event.
Expand Down
24 changes: 24 additions & 0 deletions cmd/fdsn-ws/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"database/sql"
"log"
"net/http"
Expand All @@ -10,6 +11,7 @@ import (
"time"

"github.com/GeoNet/kit/cfg"
"github.com/GeoNet/kit/health"
"github.com/gorilla/schema"
_ "github.com/lib/pq"
)
Expand All @@ -36,6 +38,12 @@ func newDecoder() *schema.Decoder {
}

func main() {
//check health if flagged in cmd
if health.RunningHealthCheck() {
healthCheck()
}

//run as normal service
var err error
if S3_BUCKET = os.Getenv("S3_BUCKET"); S3_BUCKET == "" {
log.Fatal("ERROR: S3_BUCKET environment variable is not set")
Expand Down Expand Up @@ -84,3 +92,19 @@ func main() {
}
log.Fatal(server.ListenAndServe())
}

// healthCheck queries the SOH HTTP endpoint at ":8080/soh" with a 30 second
// timeout, logs the outcome and terminates the process. The exit status is 0
// when health.Check succeeds and 1 when it returns an error. healthCheck never
// returns to its caller.
//
// cmd: ./fdsn-ws -check
func healthCheck() {
	timeout := 30 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	msg, err := health.Check(ctx, ":8080/soh", timeout)
	// os.Exit does not run deferred functions, so the context is released
	// explicitly here rather than with defer.
	cancel()

	if err != nil {
		log.Printf("status: %v", err)
		os.Exit(1)
	}
	log.Printf("status: %s", string(msg))
	os.Exit(0)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/GeoNet/fdsn
go 1.21

require (
github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60
github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268
github.com/gorilla/schema v1.4.1
github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60 h1:BgAWCVg+WxU28mXiy/3le7H9nZUo37QS/+GfXSFWYgo=
github.com/GeoNet/kit v0.0.0-20240512234353-4d4493144f60/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo=
github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268 h1:SeKMshwK+xOgKLKrMSPhYTQImmLop5tXXei/wOmgO80=
github.com/GeoNet/kit v0.0.0-20241117195712-f2c17e5af268/go.mod h1:O5T12WrCE1SOD52A9Ye//Wjl3HX7BFZv3dXzDz3adMo=
github.com/aws/aws-sdk-go-v2 v1.25.3 h1:xYiLpZTQs1mzvz5PaI6uR0Wh57ippuEthxS4iK5v0n0=
github.com/aws/aws-sdk-go-v2 v1.25.3/go.mod h1:35hUlJVYd+M++iLI3ALmVwMOyRYMmRqUXpTtRGW+K9I=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/GeoNet/kit/aws/s3/s3.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d39c4e4

Please sign in to comment.