diff --git a/garden-app/controller/controller.go b/garden-app/controller/controller.go index 6259093b..7822b535 100644 --- a/garden-app/controller/controller.go +++ b/garden-app/controller/controller.go @@ -297,6 +297,19 @@ func (c *Controller) publishTemperatureHumidityData() { } } +// PublishStartupLog publishes the message that controllers use to signal that they started up +func (c *Controller) PublishStartupLog(topicPrefix string) error { + topic := fmt.Sprintf("%s/data/logs", topicPrefix) + msg := "logs message=\"garden-controller setup complete\"" + + err := c.mqttClient.Publish(topic, []byte(msg)) + if err != nil { + return fmt.Errorf("error publishing startup log %w", err) + } + + return nil +} + // addNoise will take a base value and introduce some += variance based on the provided percentage range. This will // produce sensor data that is relatively consistent but not totally flat func addNoise(baseValue float64, percentRange float64) float64 { diff --git a/garden-app/integration_tests/main_test.go b/garden-app/integration_tests/main_test.go index 0193f24b..1f770b2c 100644 --- a/garden-app/integration_tests/main_test.go +++ b/garden-app/integration_tests/main_test.go @@ -14,6 +14,8 @@ import ( "github.com/calvinmclean/automated-garden/garden-app/controller" "github.com/calvinmclean/automated-garden/garden-app/pkg" "github.com/calvinmclean/automated-garden/garden-app/pkg/action" + "github.com/calvinmclean/automated-garden/garden-app/pkg/notifications" + fake_notification "github.com/calvinmclean/automated-garden/garden-app/pkg/notifications/fake" "github.com/calvinmclean/automated-garden/garden-app/pkg/weather" "github.com/calvinmclean/automated-garden/garden-app/pkg/weather/fake" "github.com/calvinmclean/automated-garden/garden-app/server" @@ -59,6 +61,7 @@ func TestIntegration(t *testing.T) { t.Run("Garden", GardenTests) t.Run("Zone", ZoneTests) t.Run("WaterSchedule", WaterScheduleTests) + t.Run("ControllerStartupNotification", ControllerStartupNotificationTest) } func getConfigs(t *testing.T) (server.Config, controller.Config) { @@ -409,8 +412,8 @@ func ZoneTests(t *testing.T) { }) } -func floatPointer(n float32) *float32 { - return &n +func pointer[T any](v T) *T { + return &v } func WaterScheduleTests(t *testing.T) { @@ -443,9 +446,9 @@ func WaterScheduleTests(t *testing.T) { status, err = makeRequest(http.MethodPatch, "/water_schedules/"+waterScheduleID, pkg.WaterSchedule{ WeatherControl: &weather.Control{ Rain: &weather.ScaleControl{ - BaselineValue: floatPointer(0), - Factor: floatPointer(0), - Range: floatPointer(25.4), + BaselineValue: pointer[float32](0), + Factor: pointer[float32](0), + Range: pointer[float32](25.4), ClientID: weatherClientWithRain, }, }, @@ -468,6 +471,49 @@ func WaterScheduleTests(t *testing.T) { }) } +func ControllerStartupNotificationTest(t *testing.T) { + var g server.GardenResponse + t.Run("CreateGarden", func(t *testing.T) { + status, err := makeRequest(http.MethodPost, "/gardens", `{ + "name": "Notification", + "topic_prefix": "notification", + "max_zones": 3 + }`, &g) + assert.NoError(t, err) + assert.Equal(t, http.StatusCreated, status) + }) + + var nc notifications.Client + t.Run("CreateNotificationClient", func(t *testing.T) { + status, err := makeRequest(http.MethodPost, "/notification_clients", `{ + "name": "fake client", + "type": "fake", + "options": {} + }`, &nc) + assert.NoError(t, err) + assert.Equal(t, http.StatusCreated, status) + }) + + t.Run("EnableNotificationsForGarden", func(t *testing.T) { + status, err := makeRequest(http.MethodPatch, "/gardens/"+g.GetID(), pkg.Garden{ + NotificationClientID: pointer(nc.GetID()), + }, &g) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, status) + }) + + t.Run("PublishStartupLogAndCheckNotification", func(t *testing.T) { + err := c.PublishStartupLog(g.TopicPrefix) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + lastMsg := fake_notification.LastMessage() + require.Equal(t, "Notification connected", lastMsg.Title) + require.Equal(t, "garden-controller setup complete", lastMsg.Message) + }) +} + func makeRequest(method, path string, body, response interface{}) (int, error) { // TODO: Use babyapi Client var reqBody io.Reader diff --git a/garden-app/pkg/mqtt/mock_Client.go b/garden-app/pkg/mqtt/mock_Client.go index 77669ef0..2feca33e 100644 --- a/garden-app/pkg/mqtt/mock_Client.go +++ b/garden-app/pkg/mqtt/mock_Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.23.4. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package mqtt @@ -9,10 +9,19 @@ type MockClient struct { mock.Mock } +// AddHandler provides a mock function with given fields: _a0 +func (_m *MockClient) AddHandler(_a0 TopicHandler) { + _m.Called(_a0) +} + // Connect provides a mock function with given fields: func (_m *MockClient) Connect() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Connect") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -32,6 +41,10 @@ func (_m *MockClient) Disconnect(_a0 uint) { func (_m *MockClient) LightTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for LightTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { @@ -56,6 +69,10 @@ func (_m *MockClient) LightTopic(_a0 string) (string, error) { func (_m *MockClient) Publish(_a0 string, _a1 []byte) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for Publish") + } + var r0 error if rf, ok := ret.Get(0).(func(string, []byte) error); ok { r0 = rf(_a0, _a1) @@ -70,6 +87,10 @@ func (_m *MockClient) Publish(_a0 string, _a1 []byte) error { func (_m *MockClient) StopAllTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for StopAllTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { @@ -94,6 +115,10 @@ func (_m *MockClient) StopAllTopic(_a0 string) (string, error) { func (_m *MockClient) StopTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for StopTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { @@ -118,6 +143,10 @@ func (_m *MockClient) StopTopic(_a0 string) (string, error) { func (_m *MockClient) WaterTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for WaterTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { diff --git a/garden-app/pkg/mqtt/mqtt.go b/garden-app/pkg/mqtt/mqtt.go index 1f1e52c2..184fbd93 100644 --- a/garden-app/pkg/mqtt/mqtt.go +++ b/garden-app/pkg/mqtt/mqtt.go @@ -1,5 +1,7 @@ package mqtt +//go:generate mockery --all --inpackage + import ( "bytes" "errors" @@ -12,6 +14,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +const QOS = byte(1) + var mqttClientSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "garden_app", Name: "mqtt_client_duration_seconds", @@ -39,12 +43,16 @@ type Client interface { LightTopic(string) (string, error) Connect() error Disconnect(uint) + AddHandler(TopicHandler) } // client is a wrapper struct for connecting our config and MQTT Client. It implements the Client interface type client struct { mu sync.Mutex mqtt.Client + + handlers []TopicHandler + Config } @@ -58,17 +66,21 @@ type TopicHandler struct { // using the supplied functions to handle incoming messages. It really should be used with only one function, // but I wanted to make it an optional argument, which required using the variadic function argument func NewClient(config Config, defaultHandler mqtt.MessageHandler, handlers ...TopicHandler) (Client, error) { + client := &client{ + Config: config, + handlers: handlers, + } + opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", config.Broker, config.Port)) opts.ClientID = config.ClientID opts.AutoReconnect = true opts.CleanSession = false - if len(handlers) > 0 { - opts.OnConnect = func(c mqtt.Client) { - for _, handler := range handlers { - if token := c.Subscribe(handler.Topic, byte(1), handler.Handler); token.Wait() && token.Error() != nil { - // TODO: can I return an error instead of panicking (recover maybe?) - panic(token.Error()) - } + opts.OnConnect = func(c mqtt.Client) { + for _, handler := range client.handlers { + token := c.Subscribe(handler.Topic, QOS, handler.Handler) + if token.Wait() && token.Error() != nil { + // TODO: can I return an error instead of panicking (recover maybe?) + panic(token.Error()) } } } @@ -79,7 +91,13 @@ func NewClient(config Config, defaultHandler mqtt.MessageHandler, handlers ...To return nil, err } - return &client{Client: mqtt.NewClient(opts), Config: config}, nil + client.Client = mqtt.NewClient(opts) + + return client, nil +} + +func (c *client) AddHandler(handler TopicHandler) { + c.handlers = append(c.handlers, handler) } // Connect uses the MQTT Client's Connect function but returns the error instead of Token diff --git a/garden-app/server/api.go b/garden-app/server/api.go index 61f9f9a4..68e2be65 100644 --- a/garden-app/server/api.go +++ b/garden-app/server/api.go @@ -117,10 +117,7 @@ func (api *API) Setup(cfg Config, validateData bool) error { "broker", cfg.MQTTConfig.Broker, "port", cfg.MQTTConfig.Port, ).Info("initializing MQTT client") - mqttClient, err := mqtt.NewClient(cfg.MQTTConfig, mqtt.DefaultHandler(logger), mqtt.TopicHandler{ - Topic: "+/data/water", - Handler: NewWaterNotificationHandler(storageClient, logger).HandleMessage, - }) + mqttClient, err := mqtt.NewClient(cfg.MQTTConfig, mqtt.DefaultHandler(logger)) if err != nil { return fmt.Errorf("unable to initialize MQTT client: %v", err) } diff --git a/garden-app/server/notification_clients.go b/garden-app/server/notification_clients.go index 08b758f2..6abf227f 100644 --- a/garden-app/server/notification_clients.go +++ b/garden-app/server/notification_clients.go @@ -12,8 +12,7 @@ import ( ) const ( - notificationClientsBasePath = "/notification_clients" - notificationClientIDLogField = "notification_client_id" + notificationClientsBasePath = "/notification_clients" ) // NotificationClientsAPI encapsulates the structs and dependencies necessary for the NotificationClients API diff --git a/garden-app/server/water_notification_handler.go b/garden-app/server/water_notification_handler.go deleted file mode 100644 index 9af26e57..00000000 --- a/garden-app/server/water_notification_handler.go +++ /dev/null @@ -1,171 +0,0 @@ -package server - -import ( - "context" - "errors" - "fmt" - "log/slog" - "strconv" - "strings" - "time" - - "github.com/calvinmclean/automated-garden/garden-app/pkg" - "github.com/calvinmclean/automated-garden/garden-app/pkg/storage" - mqtt "github.com/eclipse/paho.mqtt.golang" -) - -type WaterNotificationHandler struct { - storageClient *storage.Client - logger *slog.Logger -} - -func NewWaterNotificationHandler(storageClient *storage.Client, logger *slog.Logger) *WaterNotificationHandler { - return &WaterNotificationHandler{storageClient, logger} -} - -func (h *WaterNotificationHandler) getGarden(topicPrefix string) (*pkg.Garden, error) { - gardens, err := h.storageClient.Gardens.GetAll(context.Background(), nil) - if err != nil { - return nil, fmt.Errorf("error getting all gardens: %w", err) - } - var garden *pkg.Garden - for _, g := range gardens { - if g.TopicPrefix == topicPrefix { - garden = g - break - } - } - if garden == nil { - return nil, errors.New("no garden found") - } - - return garden, nil -} - -func (h *WaterNotificationHandler) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) { - zones, err := h.storageClient.Zones.GetAll(context.Background(), nil) - if err != nil { - return nil, fmt.Errorf("error getting all zones: %w", err) - } - var zone *pkg.Zone - for _, z := range zones { - if z.GardenID.String() == gardenID && - z.Position != nil && - *z.Position == uint(zonePosition) { - zone = z - break - } - } - if zone == nil { - return nil, errors.New("no zone found") - } - - return zone, nil -} - -func (h *WaterNotificationHandler) HandleMessage(_ mqtt.Client, msg mqtt.Message) { - err := h.handle(msg.Topic(), msg.Payload()) - if err != nil { - h.logger.With("topic", msg.Topic(), "error", err).Error("error handling message") - } -} - -func (h *WaterNotificationHandler) handle(topic string, payload []byte) error { - logger := h.logger.With("topic", topic) - logger.Info("received message", "message", string(payload)) - - zonePosition, waterDuration, err := parseWaterMessage(payload) - if err != nil { - return fmt.Errorf("error parsing message: %w", err) - } - - topicPrefix := strings.TrimSuffix(topic, "/data/water") - if topicPrefix == "" { - return fmt.Errorf("received message on invalid topic: %w", err) - } - logger = logger.With("topic_prefix", topicPrefix) - - garden, err := h.getGarden(topicPrefix) - if err != nil { - return fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err) - } - logger = logger.With("garden_id", garden.GetID()) - logger.Info("found garden with topic-prefix") - - if garden.GetNotificationClientID() == "" { - logger.Info("garden does not have notification client", "garden_id", garden.GetID()) - return nil - } - logger = logger.With(notificationClientIDLogField, garden.GetNotificationClientID()) - - zone, err := h.getZone(garden.GetID(), zonePosition) - if err != nil { - return fmt.Errorf("error getting zone with position %d: %w", zonePosition, err) - } - logger.Info("found zone with position", "zone_position", zonePosition, "zone_id", zone.GetID()) - - notificationClient, err := h.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID()) - if err != nil { - return fmt.Errorf("error getting all notification clients: %w", err) - } - - title := fmt.Sprintf("%s finished watering", zone.Name) - message := fmt.Sprintf("watered for %s", waterDuration.String()) - - err = notificationClient.SendMessage(title, message) - if err != nil { - logger.Error("error sending message", "error", err) - return err - } - - logger.Info("successfully send notification") - - return nil -} - -func parseWaterMessage(msg []byte) (int, time.Duration, error) { - p := &parser{msg, 0} - zonePosition, err := p.readNextInt() - if err != nil { - return 0, 0, fmt.Errorf("error parsing zone position: %w", err) - } - - waterMillis, err := p.readNextInt() - if err != nil { - return 0, 0, fmt.Errorf("error parsing watering time: %w", err) - } - waterDuration := time.Duration(waterMillis) * time.Millisecond - - return zonePosition, waterDuration, nil -} - -type parser struct { - data []byte - i int -} - -func (p *parser) readNextInt() (int, error) { - reading := false - var n []byte - for ; p.i < len(p.data); p.i++ { - c := p.data[p.i] - if c == ' ' { - p.i++ - break - } - if reading { - n = append(n, c) - continue - } - if c == '=' { - reading = true - continue - } - } - - result, err := strconv.Atoi(string(n)) - if err != nil { - return 0, fmt.Errorf("invalid integer: %w", err) - } - return result, nil -} diff --git a/garden-app/worker/notification_handler_utils.go b/garden-app/worker/notification_handler_utils.go new file mode 100644 index 00000000..6228a643 --- /dev/null +++ b/garden-app/worker/notification_handler_utils.go @@ -0,0 +1,125 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strconv" + "strings" + + "github.com/calvinmclean/automated-garden/garden-app/pkg" +) + +const notificationClientIDLogField = "notification_client_id" + +func (w *Worker) sendNotificationForGarden(garden *pkg.Garden, title, message string, logger *slog.Logger) error { + if garden.GetNotificationClientID() == "" { + logger.Info("garden does not have notification client", "garden_id", garden.GetID()) + return nil + } + logger = logger.With(notificationClientIDLogField, garden.GetNotificationClientID()) + + notificationClient, err := w.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID()) + if err != nil { + return fmt.Errorf("error getting all notification clients: %w", err) + } + + err = notificationClient.SendMessage(title, message) + if err != nil { + logger.Error("error sending message", "error", err) + return err + } + + logger.Info("successfully send notification") + return nil +} + +func (w *Worker) getGardenForTopic(topic string) (*pkg.Garden, error) { + splitTopic := strings.SplitN(topic, "/", 2) + if len(splitTopic) != 2 { + return nil, fmt.Errorf("unexpected short topic: %q", topic) + } + + topicPrefix := splitTopic[0] + if topicPrefix == "" { + return nil, errors.New("received message on empty topic") + } + + garden, err := w.getGarden(topicPrefix) + if err != nil { + return nil, fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err) + } + return garden, nil +} + +func (w *Worker) getGarden(topicPrefix string) (*pkg.Garden, error) { + gardens, err := w.storageClient.Gardens.GetAll(context.Background(), nil) + if err != nil { + return nil, fmt.Errorf("error getting all gardens: %w", err) + } + var garden *pkg.Garden + for _, g := range gardens { + if g.TopicPrefix == topicPrefix { + garden = g + break + } + } + if garden == nil { + return nil, errors.New("no garden found") + } + + return garden, nil +} + +func (w *Worker) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) { + zones, err := w.storageClient.Zones.GetAll(context.Background(), nil) + if err != nil { + return nil, fmt.Errorf("error getting all zones: %w", err) + } + var zone *pkg.Zone + for _, z := range zones { + if z.GardenID.String() == gardenID && + z.Position != nil && + *z.Position == uint(zonePosition) { + zone = z + break + } + } + if zone == nil { + return nil, errors.New("no zone found") + } + + return zone, nil +} + +type parser struct { + data []byte + i int +} + +func (p *parser) readNextInt() (int, error) { + reading := false + var n []byte + for ; p.i < len(p.data); p.i++ { + c := p.data[p.i] + if c == ' ' { + p.i++ + break + } + if reading { + n = append(n, c) + continue + } + if c == '=' { + reading = true + continue + } + } + + result, err := strconv.Atoi(string(n)) + if err != nil { + return 0, fmt.Errorf("invalid integer: %w", err) + } + return result, nil +} diff --git a/garden-app/worker/startup_notification_handler.go b/garden-app/worker/startup_notification_handler.go new file mode 100644 index 00000000..0039fbf9 --- /dev/null +++ b/garden-app/worker/startup_notification_handler.go @@ -0,0 +1,40 @@ +package worker + +import ( + "fmt" + "strings" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func (w *Worker) handleGardenStartupMessage(_ mqtt.Client, msg mqtt.Message) { + err := w.doGardenStartupMessage(msg.Topic(), msg.Payload()) + if err != nil { + w.logger.With("topic", msg.Topic(), "error", err).Error("error handling message") + } +} + +func (w *Worker) doGardenStartupMessage(topic string, payload []byte) error { + logger := w.logger.With("topic", topic) + + msg := parseStartupMessage(payload) + if msg != "garden-controller setup complete" { + logger.Warn("unexpected message from controller", "message", string(payload)) + return nil + } + logger.Info("received message", "message", string(payload)) + + garden, err := w.getGardenForTopic(topic) + if err != nil { + return err + } + logger = logger.With("garden_id", garden.GetID()) + logger.Info("found garden with topic-prefix") + + title := fmt.Sprintf("%s connected", garden.Name) + return w.sendNotificationForGarden(garden, title, msg, logger) +} + +func parseStartupMessage(msg []byte) string { + return strings.TrimSuffix(strings.TrimPrefix(string(msg), "logs message=\""), "\"") +} diff --git a/garden-app/worker/startup_notification_handler_test.go b/garden-app/worker/startup_notification_handler_test.go new file mode 100644 index 00000000..ac536b8c --- /dev/null +++ b/garden-app/worker/startup_notification_handler_test.go @@ -0,0 +1,13 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseStartupMessage(t *testing.T) { + input := "logs message=\"garden-controller setup complete\"" + msg := parseStartupMessage([]byte(input)) + require.Equal(t, "garden-controller setup complete", msg) +} diff --git a/garden-app/server/testdata/fixtures/pushover_fail.yaml b/garden-app/worker/testdata/fixtures/pushover_fail.yaml similarity index 100% rename from garden-app/server/testdata/fixtures/pushover_fail.yaml rename to garden-app/worker/testdata/fixtures/pushover_fail.yaml diff --git a/garden-app/server/testdata/fixtures/pushover_success.yaml b/garden-app/worker/testdata/fixtures/pushover_success.yaml similarity index 100% rename from garden-app/server/testdata/fixtures/pushover_success.yaml rename to garden-app/worker/testdata/fixtures/pushover_success.yaml diff --git a/garden-app/worker/water_notification_handler.go b/garden-app/worker/water_notification_handler.go new file mode 100644 index 00000000..3c93d396 --- /dev/null +++ b/garden-app/worker/water_notification_handler.go @@ -0,0 +1,64 @@ +package worker + +import ( + "fmt" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func (w *Worker) handleWaterCompleteMessage(_ mqtt.Client, msg mqtt.Message) { + err := w.doWaterCompleteMessage(msg.Topic(), msg.Payload()) + if err != nil { + w.logger.With("topic", msg.Topic(), "error", err).Error("error handling message") + } +} + +func (w *Worker) doWaterCompleteMessage(topic string, payload []byte) error { + logger := w.logger.With("topic", topic) + logger.Info("received message", "message", string(payload)) + + zonePosition, waterDuration, err := parseWaterMessage(payload) + if err != nil { + return fmt.Errorf("error parsing message: %w", err) + } + + garden, err := w.getGardenForTopic(topic) + if err != nil { + return err + } + logger = logger.With("garden_id", garden.GetID()) + logger.Info("found garden with topic-prefix") + + if garden.GetNotificationClientID() == "" { + logger.Info("garden does not have notification client", "garden_id", garden.GetID()) + return nil + } + logger = logger.With(notificationClientIDLogField, garden.GetNotificationClientID()) + + zone, err := w.getZone(garden.GetID(), zonePosition) + if err != nil { + return fmt.Errorf("error getting zone with position %d: %w", zonePosition, err) + } + logger.Info("found zone with position", "zone_position", zonePosition, "zone_id", zone.GetID()) + + title := fmt.Sprintf("%s finished watering", zone.Name) + message := fmt.Sprintf("watered for %s", waterDuration.String()) + return w.sendNotificationForGarden(garden, title, message, logger) +} + +func parseWaterMessage(msg []byte) (int, time.Duration, error) { + p := &parser{msg, 0} + zonePosition, err := p.readNextInt() + if err != nil { + return 0, 0, fmt.Errorf("error parsing zone position: %w", err) + } + + waterMillis, err := p.readNextInt() + if err != nil { + return 0, 0, fmt.Errorf("error parsing watering time: %w", err) + } + waterDuration := time.Duration(waterMillis) * time.Millisecond + + return zonePosition, waterDuration, nil +} diff --git a/garden-app/server/water_notification_handler_test.go b/garden-app/worker/water_notification_handler_test.go similarity index 86% rename from garden-app/server/water_notification_handler_test.go rename to garden-app/worker/water_notification_handler_test.go index 0f014d6c..3b997b6d 100644 --- a/garden-app/server/water_notification_handler_test.go +++ b/garden-app/worker/water_notification_handler_test.go @@ -1,4 +1,4 @@ -package server +package worker import ( "context" @@ -52,16 +52,16 @@ func TestHandleMessage(t *testing.T) { }) require.NoError(t, err) - handler := NewWaterNotificationHandler(storageClient, slog.Default()) + handler := NewWorker(storageClient, nil, nil, slog.Default()) t.Run("ErrorParsingMessage", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte{}) + err = handler.doWaterCompleteMessage("garden/data/water", []byte{}) require.Error(t, err) require.Equal(t, `error parsing message: error parsing zone position: invalid integer: strconv.Atoi: parsing "": invalid syntax`, err.Error()) }) t.Run("ErrorGettingGarden", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.Error(t, err) require.Equal(t, "error getting garden with topic-prefix \"garden\": no garden found", err.Error()) }) @@ -84,7 +84,7 @@ func TestHandleMessage(t *testing.T) { require.NoError(t, err) t.Run("SuccessfulWithNoNotificationClients", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.NoError(t, err) }) @@ -109,7 +109,7 @@ func TestHandleMessage(t *testing.T) { }) t.Run("ErrorGettingZone", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte("water,zone=1 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=1 millis=6000")) require.Error(t, err) require.Equal(t, "error getting zone with position 1: no zone found", err.Error()) }) @@ -130,7 +130,7 @@ func TestHandleMessage(t *testing.T) { // github.com/gregdel/pushover uses http.DefaultClient http.DefaultClient = r.GetDefaultClient() - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.Error(t, err) require.Equal(t, "Errors:\napplication token is invalid, see https://pushover.net/api", err.Error()) }) @@ -162,7 +162,7 @@ func TestHandleMessage(t *testing.T) { // github.com/gregdel/pushover uses http.DefaultClient http.DefaultClient = r.GetDefaultClient() - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.NoError(t, err) // ensure a message is sent by API diff --git a/garden-app/worker/worker.go b/garden-app/worker/worker.go index fe01d1ab..57a3fe74 100644 --- a/garden-app/worker/worker.go +++ b/garden-app/worker/worker.go @@ -65,6 +65,25 @@ func NewWorker( // StartAsync starts the Worker's background jobs func (w *Worker) StartAsync() { w.scheduler.StartAsync() + + // Skip adding handler when mocked since it's not used + _, isMock := w.mqttClient.(*mqtt.MockClient) + if isMock || w.mqttClient == nil { + return + } + + w.mqttClient.AddHandler(mqtt.TopicHandler{ + Topic: "+/data/water", + Handler: w.handleWaterCompleteMessage, + }) + w.mqttClient.AddHandler(mqtt.TopicHandler{ + Topic: "+/data/logs", + Handler: w.handleGardenStartupMessage, + }) + + if err := w.mqttClient.Connect(); err != nil { + w.logger.Error("failed to connect to MQTT broker", "error", err) + } } // Stop stops the Worker's background jobs