Skip to content

Commit

Permalink
Add handler to notify when controller starts up
Browse files Browse the repository at this point in the history
- This will allow detecting if the controller unexpectedly reboots
  which would cause interruption to lighting or watering
  • Loading branch information
calvinmclean committed Aug 30, 2024
1 parent 28c2aec commit b67dcb3
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 106 deletions.
10 changes: 5 additions & 5 deletions garden-app/integration_tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ func ZoneTests(t *testing.T) {
})
}

func floatPointer(n float32) *float32 {
return &n
func pointer[T any](v T) *T {
return &v
}

func WaterScheduleTests(t *testing.T) {
Expand Down Expand Up @@ -443,9 +443,9 @@ func WaterScheduleTests(t *testing.T) {
status, err = makeRequest(http.MethodPatch, "/water_schedules/"+waterScheduleID, pkg.WaterSchedule{
WeatherControl: &weather.Control{
Rain: &weather.ScaleControl{
BaselineValue: floatPointer(0),
Factor: floatPointer(0),
Range: floatPointer(25.4),
BaselineValue: pointer[float32](0),
Factor: pointer[float32](0),
Range: pointer[float32](25.4),
ClientID: weatherClientWithRain,
},
},
Expand Down
125 changes: 125 additions & 0 deletions garden-app/worker/notification_handler_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package worker

import (
"context"
"errors"
"fmt"
"log/slog"
"strconv"
"strings"

"github.com/calvinmclean/automated-garden/garden-app/pkg"
)

const notificationClientIDLogField = "notification_client_id"

func (w *Worker) sendNotificationForGarden(garden *pkg.Garden, title, message string, logger *slog.Logger) error {
if garden.GetNotificationClientID() == "" {
logger.Info("garden does not have notification client", "garden_id", garden.GetID())
return nil
}
logger = logger.With(notificationClientIDLogField, garden.GetNotificationClientID())

notificationClient, err := w.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID())
if err != nil {
return fmt.Errorf("error getting all notification clients: %w", err)
}

err = notificationClient.SendMessage(title, message)
if err != nil {
logger.Error("error sending message", "error", err)
return err
}

logger.Info("successfully send notification")
return nil
}

func (w *Worker) getGardenForTopic(topic string) (*pkg.Garden, error) {
splitTopic := strings.SplitN(topic, "/", 2)
if len(splitTopic) != 2 {
return nil, fmt.Errorf("unexpected short topic: %q", topic)
}

topicPrefix := splitTopic[0]
if topicPrefix == "" {
return nil, errors.New("received message on empty topic")
}

garden, err := w.getGarden(topicPrefix)
if err != nil {
return nil, fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err)
}
return garden, nil
}

func (w *Worker) getGarden(topicPrefix string) (*pkg.Garden, error) {
gardens, err := w.storageClient.Gardens.GetAll(context.Background(), nil)
if err != nil {
return nil, fmt.Errorf("error getting all gardens: %w", err)
}
var garden *pkg.Garden
for _, g := range gardens {
if g.TopicPrefix == topicPrefix {
garden = g
break
}
}
if garden == nil {
return nil, errors.New("no garden found")
}

return garden, nil
}

func (w *Worker) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) {
zones, err := w.storageClient.Zones.GetAll(context.Background(), nil)
if err != nil {
return nil, fmt.Errorf("error getting all zones: %w", err)
}
var zone *pkg.Zone
for _, z := range zones {
if z.GardenID.String() == gardenID &&
z.Position != nil &&
*z.Position == uint(zonePosition) {
zone = z
break
}
}
if zone == nil {
return nil, errors.New("no zone found")
}

return zone, nil
}

type parser struct {
data []byte
i int
}

func (p *parser) readNextInt() (int, error) {
reading := false
var n []byte
for ; p.i < len(p.data); p.i++ {
c := p.data[p.i]
if c == ' ' {
p.i++
break
}
if reading {
n = append(n, c)
continue
}
if c == '=' {
reading = true
continue
}
}

result, err := strconv.Atoi(string(n))
if err != nil {
return 0, fmt.Errorf("invalid integer: %w", err)
}
return result, nil
}
40 changes: 40 additions & 0 deletions garden-app/worker/startup_notification_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package worker

import (
"fmt"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

func (w *Worker) handleGardenStartupMessage(_ mqtt.Client, msg mqtt.Message) {
err := w.doGardenStartupMessage(msg.Topic(), msg.Payload())
if err != nil {
w.logger.With("topic", msg.Topic(), "error", err).Error("error handling message")
}
}

func (w *Worker) doGardenStartupMessage(topic string, payload []byte) error {
logger := w.logger.With("topic", topic)

msg := parseStartupMessage(payload)
if msg != "garden-controller setup complete" {
logger.Warn("unexpected message from controller", "message", string(payload))
return nil
}
logger.Info("received message", "message", string(payload))

garden, err := w.getGardenForTopic(topic)
if err != nil {
return err
}
logger = logger.With("garden_id", garden.GetID())
logger.Info("found garden with topic-prefix")

title := fmt.Sprintf("%s connected", garden.Name)
return w.sendNotificationForGarden(garden, title, msg, logger)
}

func parseStartupMessage(msg []byte) string {
return strings.TrimSuffix(strings.TrimPrefix(string(msg), "logs message=\""), "\"")
}
13 changes: 13 additions & 0 deletions garden-app/worker/startup_notification_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package worker

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestParseStartupMessage(t *testing.T) {
input := "logs message=\"garden-controller setup complete\""
msg := parseStartupMessage([]byte(input))
require.Equal(t, "garden-controller setup complete", msg)
}
104 changes: 3 additions & 101 deletions garden-app/worker/water_notification_handler.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,12 @@
package worker

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/calvinmclean/automated-garden/garden-app/pkg"
mqtt "github.com/eclipse/paho.mqtt.golang"
)

const notificationClientIDLogField = "notification_client_id"

func (w *Worker) getGarden(topicPrefix string) (*pkg.Garden, error) {
gardens, err := w.storageClient.Gardens.GetAll(context.Background(), nil)
if err != nil {
return nil, fmt.Errorf("error getting all gardens: %w", err)
}
var garden *pkg.Garden
for _, g := range gardens {
if g.TopicPrefix == topicPrefix {
garden = g
break
}
}
if garden == nil {
return nil, errors.New("no garden found")
}

return garden, nil
}

func (w *Worker) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) {
zones, err := w.storageClient.Zones.GetAll(context.Background(), nil)
if err != nil {
return nil, fmt.Errorf("error getting all zones: %w", err)
}
var zone *pkg.Zone
for _, z := range zones {
if z.GardenID.String() == gardenID &&
z.Position != nil &&
*z.Position == uint(zonePosition) {
zone = z
break
}
}
if zone == nil {
return nil, errors.New("no zone found")
}

return zone, nil
}

func (w *Worker) handleWaterCompleteMessage(_ mqtt.Client, msg mqtt.Message) {
err := w.doWaterCompleteMessage(msg.Topic(), msg.Payload())
if err != nil {
Expand All @@ -70,15 +23,9 @@ func (w *Worker) doWaterCompleteMessage(topic string, payload []byte) error {
return fmt.Errorf("error parsing message: %w", err)
}

topicPrefix := strings.TrimSuffix(topic, "/data/water")
if topicPrefix == "" {
return fmt.Errorf("received message on invalid topic: %w", err)
}
logger = logger.With("topic_prefix", topicPrefix)

garden, err := w.getGarden(topicPrefix)
garden, err := w.getGardenForTopic(topic)
if err != nil {
return fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err)
return err
}
logger = logger.With("garden_id", garden.GetID())
logger.Info("found garden with topic-prefix")
Expand All @@ -95,23 +42,9 @@ func (w *Worker) doWaterCompleteMessage(topic string, payload []byte) error {
}
logger.Info("found zone with position", "zone_position", zonePosition, "zone_id", zone.GetID())

notificationClient, err := w.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID())
if err != nil {
return fmt.Errorf("error getting all notification clients: %w", err)
}

title := fmt.Sprintf("%s finished watering", zone.Name)
message := fmt.Sprintf("watered for %s", waterDuration.String())

err = notificationClient.SendMessage(title, message)
if err != nil {
logger.Error("error sending message", "error", err)
return err
}

logger.Info("successfully send notification")

return nil
return w.sendNotificationForGarden(garden, title, message, logger)
}

func parseWaterMessage(msg []byte) (int, time.Duration, error) {
Expand All @@ -129,34 +62,3 @@ func parseWaterMessage(msg []byte) (int, time.Duration, error) {

return zonePosition, waterDuration, nil
}

type parser struct {
data []byte
i int
}

func (p *parser) readNextInt() (int, error) {
reading := false
var n []byte
for ; p.i < len(p.data); p.i++ {
c := p.data[p.i]
if c == ' ' {
p.i++
break
}
if reading {
n = append(n, c)
continue
}
if c == '=' {
reading = true
continue
}
}

result, err := strconv.Atoi(string(n))
if err != nil {
return 0, fmt.Errorf("invalid integer: %w", err)
}
return result, nil
}
4 changes: 4 additions & 0 deletions garden-app/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (w *Worker) StartAsync() {
Topic: "+/data/water",
Handler: w.handleWaterCompleteMessage,
})
w.mqttClient.AddHandler(mqtt.TopicHandler{
Topic: "+/data/logs",
Handler: w.handleGardenStartupMessage,
})
}
}

Expand Down

0 comments on commit b67dcb3

Please sign in to comment.