From 28c2aec45493873534d4e695bd007d541599580c Mon Sep 17 00:00:00 2001 From: Calvin McLean Date: Mon, 26 Aug 2024 21:47:37 -0700 Subject: [PATCH 1/2] Move MQTT handler for water notification to Worker --- garden-app/pkg/mqtt/mock_Client.go | 31 +++++++++++++++- garden-app/pkg/mqtt/mqtt.go | 34 +++++++++++++---- garden-app/server/api.go | 5 +-- garden-app/server/notification_clients.go | 3 +- .../testdata/fixtures/pushover_fail.yaml | 0 .../testdata/fixtures/pushover_success.yaml | 0 .../water_notification_handler.go | 37 +++++++------------ .../water_notification_handler_test.go | 16 ++++---- garden-app/worker/worker.go | 9 +++++ 9 files changed, 89 insertions(+), 46 deletions(-) rename garden-app/{server => worker}/testdata/fixtures/pushover_fail.yaml (100%) rename garden-app/{server => worker}/testdata/fixtures/pushover_success.yaml (100%) rename garden-app/{server => worker}/water_notification_handler.go (74%) rename garden-app/{server => worker}/water_notification_handler_test.go (86%) diff --git a/garden-app/pkg/mqtt/mock_Client.go b/garden-app/pkg/mqtt/mock_Client.go index 77669ef0..2feca33e 100644 --- a/garden-app/pkg/mqtt/mock_Client.go +++ b/garden-app/pkg/mqtt/mock_Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.23.4. DO NOT EDIT. +// Code generated by mockery v2.45.0. DO NOT EDIT. package mqtt @@ -9,10 +9,19 @@ type MockClient struct { mock.Mock } +// AddHandler provides a mock function with given fields: _a0 +func (_m *MockClient) AddHandler(_a0 TopicHandler) { + _m.Called(_a0) +} + // Connect provides a mock function with given fields: func (_m *MockClient) Connect() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Connect") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -32,6 +41,10 @@ func (_m *MockClient) Disconnect(_a0 uint) { func (_m *MockClient) LightTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for LightTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { @@ -56,6 +69,10 @@ func (_m *MockClient) LightTopic(_a0 string) (string, error) { func (_m *MockClient) Publish(_a0 string, _a1 []byte) error { ret := _m.Called(_a0, _a1) + if len(ret) == 0 { + panic("no return value specified for Publish") + } + var r0 error if rf, ok := ret.Get(0).(func(string, []byte) error); ok { r0 = rf(_a0, _a1) @@ -70,6 +87,10 @@ func (_m *MockClient) Publish(_a0 string, _a1 []byte) error { func (_m *MockClient) StopAllTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for StopAllTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { @@ -94,6 +115,10 @@ func (_m *MockClient) StopAllTopic(_a0 string) (string, error) { func (_m *MockClient) StopTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for StopTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { @@ -118,6 +143,10 @@ func (_m *MockClient) StopTopic(_a0 string) (string, error) { func (_m *MockClient) WaterTopic(_a0 string) (string, error) { ret := _m.Called(_a0) + if len(ret) == 0 { + panic("no return value specified for WaterTopic") + } + var r0 string var r1 error if rf, ok := ret.Get(0).(func(string) (string, error)); ok { diff --git a/garden-app/pkg/mqtt/mqtt.go b/garden-app/pkg/mqtt/mqtt.go index 1f1e52c2..184fbd93 100644 --- a/garden-app/pkg/mqtt/mqtt.go +++ b/garden-app/pkg/mqtt/mqtt.go @@ -1,5 +1,7 @@ package mqtt +//go:generate mockery --all --inpackage + import ( "bytes" "errors" @@ -12,6 +14,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +const QOS = byte(1) + var mqttClientSummary = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "garden_app", Name: "mqtt_client_duration_seconds", @@ -39,12 +43,16 @@ type Client interface { LightTopic(string) (string, error) Connect() error Disconnect(uint) + AddHandler(TopicHandler) } // client is a wrapper struct for connecting our config and MQTT Client. It implements the Client interface type client struct { mu sync.Mutex mqtt.Client + + handlers []TopicHandler + Config } @@ -58,17 +66,21 @@ type TopicHandler struct { // using the supplied functions to handle incoming messages. It really should be used with only one function, // but I wanted to make it an optional argument, which required using the variadic function argument func NewClient(config Config, defaultHandler mqtt.MessageHandler, handlers ...TopicHandler) (Client, error) { + client := &client{ + Config: config, + handlers: handlers, + } + opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:%d", config.Broker, config.Port)) opts.ClientID = config.ClientID opts.AutoReconnect = true opts.CleanSession = false - if len(handlers) > 0 { - opts.OnConnect = func(c mqtt.Client) { - for _, handler := range handlers { - if token := c.Subscribe(handler.Topic, byte(1), handler.Handler); token.Wait() && token.Error() != nil { - // TODO: can I return an error instead of panicking (recover maybe?) - panic(token.Error()) - } + opts.OnConnect = func(c mqtt.Client) { + for _, handler := range client.handlers { + token := c.Subscribe(handler.Topic, QOS, handler.Handler) + if token.Wait() && token.Error() != nil { + // TODO: can I return an error instead of panicking (recover maybe?) + panic(token.Error()) } } } @@ -79,7 +91,13 @@ func NewClient(config Config, defaultHandler mqtt.MessageHandler, handlers ...To return nil, err } - return &client{Client: mqtt.NewClient(opts), Config: config}, nil + client.Client = mqtt.NewClient(opts) + + return client, nil +} + +func (c *client) AddHandler(handler TopicHandler) { + c.handlers = append(c.handlers, handler) } // Connect uses the MQTT Client's Connect function but returns the error instead of Token diff --git a/garden-app/server/api.go b/garden-app/server/api.go index 61f9f9a4..68e2be65 100644 --- a/garden-app/server/api.go +++ b/garden-app/server/api.go @@ -117,10 +117,7 @@ func (api *API) Setup(cfg Config, validateData bool) error { "broker", cfg.MQTTConfig.Broker, "port", cfg.MQTTConfig.Port, ).Info("initializing MQTT client") - mqttClient, err := mqtt.NewClient(cfg.MQTTConfig, mqtt.DefaultHandler(logger), mqtt.TopicHandler{ - Topic: "+/data/water", - Handler: NewWaterNotificationHandler(storageClient, logger).HandleMessage, - }) + mqttClient, err := mqtt.NewClient(cfg.MQTTConfig, mqtt.DefaultHandler(logger)) if err != nil { return fmt.Errorf("unable to initialize MQTT client: %v", err) } diff --git a/garden-app/server/notification_clients.go b/garden-app/server/notification_clients.go index 08b758f2..6abf227f 100644 --- a/garden-app/server/notification_clients.go +++ b/garden-app/server/notification_clients.go @@ -12,8 +12,7 @@ import ( ) const ( - notificationClientsBasePath = "/notification_clients" - notificationClientIDLogField = "notification_client_id" + notificationClientsBasePath = "/notification_clients" ) // NotificationClientsAPI encapsulates the structs and dependencies necessary for the NotificationClients API diff --git a/garden-app/server/testdata/fixtures/pushover_fail.yaml b/garden-app/worker/testdata/fixtures/pushover_fail.yaml similarity index 100% rename from garden-app/server/testdata/fixtures/pushover_fail.yaml rename to garden-app/worker/testdata/fixtures/pushover_fail.yaml diff --git a/garden-app/server/testdata/fixtures/pushover_success.yaml b/garden-app/worker/testdata/fixtures/pushover_success.yaml similarity index 100% rename from garden-app/server/testdata/fixtures/pushover_success.yaml rename to garden-app/worker/testdata/fixtures/pushover_success.yaml diff --git a/garden-app/server/water_notification_handler.go b/garden-app/worker/water_notification_handler.go similarity index 74% rename from garden-app/server/water_notification_handler.go rename to garden-app/worker/water_notification_handler.go index 9af26e57..77fae719 100644 --- a/garden-app/server/water_notification_handler.go +++ b/garden-app/worker/water_notification_handler.go @@ -1,30 +1,21 @@ -package server +package worker import ( "context" "errors" "fmt" - "log/slog" "strconv" "strings" "time" "github.com/calvinmclean/automated-garden/garden-app/pkg" - "github.com/calvinmclean/automated-garden/garden-app/pkg/storage" mqtt "github.com/eclipse/paho.mqtt.golang" ) -type WaterNotificationHandler struct { - storageClient *storage.Client - logger *slog.Logger -} - -func NewWaterNotificationHandler(storageClient *storage.Client, logger *slog.Logger) *WaterNotificationHandler { - return &WaterNotificationHandler{storageClient, logger} -} +const notificationClientIDLogField = "notification_client_id" -func (h *WaterNotificationHandler) getGarden(topicPrefix string) (*pkg.Garden, error) { - gardens, err := h.storageClient.Gardens.GetAll(context.Background(), nil) +func (w *Worker) getGarden(topicPrefix string) (*pkg.Garden, error) { + gardens, err := w.storageClient.Gardens.GetAll(context.Background(), nil) if err != nil { return nil, fmt.Errorf("error getting all gardens: %w", err) } @@ -42,8 +33,8 @@ func (h *WaterNotificationHandler) getGarden(topicPrefix string) (*pkg.Garden, e return garden, nil } -func (h *WaterNotificationHandler) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) { - zones, err := h.storageClient.Zones.GetAll(context.Background(), nil) +func (w *Worker) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) { + zones, err := w.storageClient.Zones.GetAll(context.Background(), nil) if err != nil { return nil, fmt.Errorf("error getting all zones: %w", err) } @@ -63,15 +54,15 @@ func (h *WaterNotificationHandler) getZone(gardenID string, zonePosition int) (* return zone, nil } -func (h *WaterNotificationHandler) HandleMessage(_ mqtt.Client, msg mqtt.Message) { - err := h.handle(msg.Topic(), msg.Payload()) +func (w *Worker) handleWaterCompleteMessage(_ mqtt.Client, msg mqtt.Message) { + err := w.doWaterCompleteMessage(msg.Topic(), msg.Payload()) if err != nil { - h.logger.With("topic", msg.Topic(), "error", err).Error("error handling message") + w.logger.With("topic", msg.Topic(), "error", err).Error("error handling message") } } -func (h *WaterNotificationHandler) handle(topic string, payload []byte) error { - logger := h.logger.With("topic", topic) +func (w *Worker) doWaterCompleteMessage(topic string, payload []byte) error { + logger := w.logger.With("topic", topic) logger.Info("received message", "message", string(payload)) zonePosition, waterDuration, err := parseWaterMessage(payload) @@ -85,7 +76,7 @@ func (h *WaterNotificationHandler) handle(topic string, payload []byte) error { } logger = logger.With("topic_prefix", topicPrefix) - garden, err := h.getGarden(topicPrefix) + garden, err := w.getGarden(topicPrefix) if err != nil { return fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err) } @@ -98,13 +89,13 @@ func (h *WaterNotificationHandler) handle(topic string, payload []byte) error { } logger = logger.With(notificationClientIDLogField, garden.GetNotificationClientID()) - zone, err := h.getZone(garden.GetID(), zonePosition) + zone, err := w.getZone(garden.GetID(), zonePosition) if err != nil { return fmt.Errorf("error getting zone with position %d: %w", zonePosition, err) } logger.Info("found zone with position", "zone_position", zonePosition, "zone_id", zone.GetID()) - notificationClient, err := h.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID()) + notificationClient, err := w.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID()) if err != nil { return fmt.Errorf("error getting all notification clients: %w", err) } diff --git a/garden-app/server/water_notification_handler_test.go b/garden-app/worker/water_notification_handler_test.go similarity index 86% rename from garden-app/server/water_notification_handler_test.go rename to garden-app/worker/water_notification_handler_test.go index 0f014d6c..3b997b6d 100644 --- a/garden-app/server/water_notification_handler_test.go +++ b/garden-app/worker/water_notification_handler_test.go @@ -1,4 +1,4 @@ -package server +package worker import ( "context" @@ -52,16 +52,16 @@ func TestHandleMessage(t *testing.T) { }) require.NoError(t, err) - handler := NewWaterNotificationHandler(storageClient, slog.Default()) + handler := NewWorker(storageClient, nil, nil, slog.Default()) t.Run("ErrorParsingMessage", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte{}) + err = handler.doWaterCompleteMessage("garden/data/water", []byte{}) require.Error(t, err) require.Equal(t, `error parsing message: error parsing zone position: invalid integer: strconv.Atoi: parsing "": invalid syntax`, err.Error()) }) t.Run("ErrorGettingGarden", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.Error(t, err) require.Equal(t, "error getting garden with topic-prefix \"garden\": no garden found", err.Error()) }) @@ -84,7 +84,7 @@ func TestHandleMessage(t *testing.T) { require.NoError(t, err) t.Run("SuccessfulWithNoNotificationClients", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.NoError(t, err) }) @@ -109,7 +109,7 @@ func TestHandleMessage(t *testing.T) { }) t.Run("ErrorGettingZone", func(t *testing.T) { - err = handler.handle("garden/data/water", []byte("water,zone=1 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=1 millis=6000")) require.Error(t, err) require.Equal(t, "error getting zone with position 1: no zone found", err.Error()) }) @@ -130,7 +130,7 @@ func TestHandleMessage(t *testing.T) { // github.com/gregdel/pushover uses http.DefaultClient http.DefaultClient = r.GetDefaultClient() - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.Error(t, err) require.Equal(t, "Errors:\napplication token is invalid, see https://pushover.net/api", err.Error()) }) @@ -162,7 +162,7 @@ func TestHandleMessage(t *testing.T) { // github.com/gregdel/pushover uses http.DefaultClient http.DefaultClient = r.GetDefaultClient() - err = handler.handle("garden/data/water", []byte("water,zone=0 millis=6000")) + err = handler.doWaterCompleteMessage("garden/data/water", []byte("water,zone=0 millis=6000")) require.NoError(t, err) // ensure a message is sent by API diff --git a/garden-app/worker/worker.go b/garden-app/worker/worker.go index fe01d1ab..75ca9000 100644 --- a/garden-app/worker/worker.go +++ b/garden-app/worker/worker.go @@ -65,6 +65,15 @@ func NewWorker( // StartAsync starts the Worker's background jobs func (w *Worker) StartAsync() { w.scheduler.StartAsync() + + // Skip adding handler when mocked since it's not used + _, isMock := w.mqttClient.(*mqtt.MockClient) + if !isMock && w.mqttClient != nil { + w.mqttClient.AddHandler(mqtt.TopicHandler{ + Topic: "+/data/water", + Handler: w.handleWaterCompleteMessage, + }) + } } // Stop stops the Worker's background jobs From 0f41e82f358c3399877cc7e9a6e5d725986d0ab0 Mon Sep 17 00:00:00 2001 From: Calvin McLean Date: Mon, 26 Aug 2024 22:20:05 -0700 Subject: [PATCH 2/2] Add handler to notify when controller starts up - This will allow detecting if the controller unexpectedly reboots which would cause interruption to lighting or watering --- garden-app/controller/controller.go | 13 ++ garden-app/integration_tests/main_test.go | 56 +++++++- .../worker/notification_handler_utils.go | 125 ++++++++++++++++++ .../worker/startup_notification_handler.go | 40 ++++++ .../startup_notification_handler_test.go | 13 ++ .../worker/water_notification_handler.go | 104 +-------------- garden-app/worker/worker.go | 20 ++- 7 files changed, 260 insertions(+), 111 deletions(-) create mode 100644 garden-app/worker/notification_handler_utils.go create mode 100644 garden-app/worker/startup_notification_handler.go create mode 100644 garden-app/worker/startup_notification_handler_test.go diff --git a/garden-app/controller/controller.go b/garden-app/controller/controller.go index 6259093b..7822b535 100644 --- a/garden-app/controller/controller.go +++ b/garden-app/controller/controller.go @@ -297,6 +297,19 @@ func (c *Controller) publishTemperatureHumidityData() { } } +// PublishStartupLog publishes the message that controllers use to signal that they started up +func (c *Controller) PublishStartupLog(topicPrefix string) error { + topic := fmt.Sprintf("%s/data/logs", topicPrefix) + msg := "logs message=\"garden-controller setup complete\"" + + err := c.mqttClient.Publish(topic, []byte(msg)) + if err != nil { + return fmt.Errorf("error publishing startup log %w", err) + } + + return nil +} + // addNoise will take a base value and introduce some += variance based on the provided percentage range. This will // produce sensor data that is relatively consistent but not totally flat func addNoise(baseValue float64, percentRange float64) float64 { diff --git a/garden-app/integration_tests/main_test.go b/garden-app/integration_tests/main_test.go index 0193f24b..1f770b2c 100644 --- a/garden-app/integration_tests/main_test.go +++ b/garden-app/integration_tests/main_test.go @@ -14,6 +14,8 @@ import ( "github.com/calvinmclean/automated-garden/garden-app/controller" "github.com/calvinmclean/automated-garden/garden-app/pkg" "github.com/calvinmclean/automated-garden/garden-app/pkg/action" + "github.com/calvinmclean/automated-garden/garden-app/pkg/notifications" + fake_notification "github.com/calvinmclean/automated-garden/garden-app/pkg/notifications/fake" "github.com/calvinmclean/automated-garden/garden-app/pkg/weather" "github.com/calvinmclean/automated-garden/garden-app/pkg/weather/fake" "github.com/calvinmclean/automated-garden/garden-app/server" @@ -59,6 +61,7 @@ func TestIntegration(t *testing.T) { t.Run("Garden", GardenTests) t.Run("Zone", ZoneTests) t.Run("WaterSchedule", WaterScheduleTests) + t.Run("ControllerStartupNotification", ControllerStartupNotificationTest) } func getConfigs(t *testing.T) (server.Config, controller.Config) { @@ -409,8 +412,8 @@ func ZoneTests(t *testing.T) { }) } -func floatPointer(n float32) *float32 { - return &n +func pointer[T any](v T) *T { + return &v } func WaterScheduleTests(t *testing.T) { @@ -443,9 +446,9 @@ func WaterScheduleTests(t *testing.T) { status, err = makeRequest(http.MethodPatch, "/water_schedules/"+waterScheduleID, pkg.WaterSchedule{ WeatherControl: &weather.Control{ Rain: &weather.ScaleControl{ - BaselineValue: floatPointer(0), - Factor: floatPointer(0), - Range: floatPointer(25.4), + BaselineValue: pointer[float32](0), + Factor: pointer[float32](0), + Range: pointer[float32](25.4), ClientID: weatherClientWithRain, }, }, @@ -468,6 +471,49 @@ func WaterScheduleTests(t *testing.T) { }) } +func ControllerStartupNotificationTest(t *testing.T) { + var g server.GardenResponse + t.Run("CreateGarden", func(t *testing.T) { + status, err := makeRequest(http.MethodPost, "/gardens", `{ + "name": "Notification", + "topic_prefix": "notification", + "max_zones": 3 + }`, &g) + assert.NoError(t, err) + assert.Equal(t, http.StatusCreated, status) + }) + + var nc notifications.Client + t.Run("CreateNotificationClient", func(t *testing.T) { + status, err := makeRequest(http.MethodPost, "/notification_clients", `{ + "name": "fake client", + "type": "fake", + "options": {} + }`, &nc) + assert.NoError(t, err) + assert.Equal(t, http.StatusCreated, status) + }) + + t.Run("EnableNotificationsForGarden", func(t *testing.T) { + status, err := makeRequest(http.MethodPatch, "/gardens/"+g.GetID(), pkg.Garden{ + NotificationClientID: pointer(nc.GetID()), + }, &g) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, status) + }) + + t.Run("PublishStartupLogAndCheckNotification", func(t *testing.T) { + err := c.PublishStartupLog(g.TopicPrefix) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + lastMsg := fake_notification.LastMessage() + require.Equal(t, "Notification connected", lastMsg.Title) + require.Equal(t, "garden-controller setup complete", lastMsg.Message) + }) +} + func makeRequest(method, path string, body, response interface{}) (int, error) { // TODO: Use babyapi Client var reqBody io.Reader diff --git a/garden-app/worker/notification_handler_utils.go b/garden-app/worker/notification_handler_utils.go new file mode 100644 index 00000000..6228a643 --- /dev/null +++ b/garden-app/worker/notification_handler_utils.go @@ -0,0 +1,125 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "log/slog" + "strconv" + "strings" + + "github.com/calvinmclean/automated-garden/garden-app/pkg" +) + +const notificationClientIDLogField = "notification_client_id" + +func (w *Worker) sendNotificationForGarden(garden *pkg.Garden, title, message string, logger *slog.Logger) error { + if garden.GetNotificationClientID() == "" { + logger.Info("garden does not have notification client", "garden_id", garden.GetID()) + return nil + } + logger = logger.With(notificationClientIDLogField, garden.GetNotificationClientID()) + + notificationClient, err := w.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID()) + if err != nil { + return fmt.Errorf("error getting all notification clients: %w", err) + } + + err = notificationClient.SendMessage(title, message) + if err != nil { + logger.Error("error sending message", "error", err) + return err + } + + logger.Info("successfully send notification") + return nil +} + +func (w *Worker) getGardenForTopic(topic string) (*pkg.Garden, error) { + splitTopic := strings.SplitN(topic, "/", 2) + if len(splitTopic) != 2 { + return nil, fmt.Errorf("unexpected short topic: %q", topic) + } + + topicPrefix := splitTopic[0] + if topicPrefix == "" { + return nil, errors.New("received message on empty topic") + } + + garden, err := w.getGarden(topicPrefix) + if err != nil { + return nil, fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err) + } + return garden, nil +} + +func (w *Worker) getGarden(topicPrefix string) (*pkg.Garden, error) { + gardens, err := w.storageClient.Gardens.GetAll(context.Background(), nil) + if err != nil { + return nil, fmt.Errorf("error getting all gardens: %w", err) + } + var garden *pkg.Garden + for _, g := range gardens { + if g.TopicPrefix == topicPrefix { + garden = g + break + } + } + if garden == nil { + return nil, errors.New("no garden found") + } + + return garden, nil +} + +func (w *Worker) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) { + zones, err := w.storageClient.Zones.GetAll(context.Background(), nil) + if err != nil { + return nil, fmt.Errorf("error getting all zones: %w", err) + } + var zone *pkg.Zone + for _, z := range zones { + if z.GardenID.String() == gardenID && + z.Position != nil && + *z.Position == uint(zonePosition) { + zone = z + break + } + } + if zone == nil { + return nil, errors.New("no zone found") + } + + return zone, nil +} + +type parser struct { + data []byte + i int +} + +func (p *parser) readNextInt() (int, error) { + reading := false + var n []byte + for ; p.i < len(p.data); p.i++ { + c := p.data[p.i] + if c == ' ' { + p.i++ + break + } + if reading { + n = append(n, c) + continue + } + if c == '=' { + reading = true + continue + } + } + + result, err := strconv.Atoi(string(n)) + if err != nil { + return 0, fmt.Errorf("invalid integer: %w", err) + } + return result, nil +} diff --git a/garden-app/worker/startup_notification_handler.go b/garden-app/worker/startup_notification_handler.go new file mode 100644 index 00000000..0039fbf9 --- /dev/null +++ b/garden-app/worker/startup_notification_handler.go @@ -0,0 +1,40 @@ +package worker + +import ( + "fmt" + "strings" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +func (w *Worker) handleGardenStartupMessage(_ mqtt.Client, msg mqtt.Message) { + err := w.doGardenStartupMessage(msg.Topic(), msg.Payload()) + if err != nil { + w.logger.With("topic", msg.Topic(), "error", err).Error("error handling message") + } +} + +func (w *Worker) doGardenStartupMessage(topic string, payload []byte) error { + logger := w.logger.With("topic", topic) + + msg := parseStartupMessage(payload) + if msg != "garden-controller setup complete" { + logger.Warn("unexpected message from controller", "message", string(payload)) + return nil + } + logger.Info("received message", "message", string(payload)) + + garden, err := w.getGardenForTopic(topic) + if err != nil { + return err + } + logger = logger.With("garden_id", garden.GetID()) + logger.Info("found garden with topic-prefix") + + title := fmt.Sprintf("%s connected", garden.Name) + return w.sendNotificationForGarden(garden, title, msg, logger) +} + +func parseStartupMessage(msg []byte) string { + return strings.TrimSuffix(strings.TrimPrefix(string(msg), "logs message=\""), "\"") +} diff --git a/garden-app/worker/startup_notification_handler_test.go b/garden-app/worker/startup_notification_handler_test.go new file mode 100644 index 00000000..ac536b8c --- /dev/null +++ b/garden-app/worker/startup_notification_handler_test.go @@ -0,0 +1,13 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseStartupMessage(t *testing.T) { + input := "logs message=\"garden-controller setup complete\"" + msg := parseStartupMessage([]byte(input)) + require.Equal(t, "garden-controller setup complete", msg) +} diff --git a/garden-app/worker/water_notification_handler.go b/garden-app/worker/water_notification_handler.go index 77fae719..3c93d396 100644 --- a/garden-app/worker/water_notification_handler.go +++ b/garden-app/worker/water_notification_handler.go @@ -1,59 +1,12 @@ package worker import ( - "context" - "errors" "fmt" - "strconv" - "strings" "time" - "github.com/calvinmclean/automated-garden/garden-app/pkg" mqtt "github.com/eclipse/paho.mqtt.golang" ) -const notificationClientIDLogField = "notification_client_id" - -func (w *Worker) getGarden(topicPrefix string) (*pkg.Garden, error) { - gardens, err := w.storageClient.Gardens.GetAll(context.Background(), nil) - if err != nil { - return nil, fmt.Errorf("error getting all gardens: %w", err) - } - var garden *pkg.Garden - for _, g := range gardens { - if g.TopicPrefix == topicPrefix { - garden = g - break - } - } - if garden == nil { - return nil, errors.New("no garden found") - } - - return garden, nil -} - -func (w *Worker) getZone(gardenID string, zonePosition int) (*pkg.Zone, error) { - zones, err := w.storageClient.Zones.GetAll(context.Background(), nil) - if err != nil { - return nil, fmt.Errorf("error getting all zones: %w", err) - } - var zone *pkg.Zone - for _, z := range zones { - if z.GardenID.String() == gardenID && - z.Position != nil && - *z.Position == uint(zonePosition) { - zone = z - break - } - } - if zone == nil { - return nil, errors.New("no zone found") - } - - return zone, nil -} - func (w *Worker) handleWaterCompleteMessage(_ mqtt.Client, msg mqtt.Message) { err := w.doWaterCompleteMessage(msg.Topic(), msg.Payload()) if err != nil { @@ -70,15 +23,9 @@ func (w *Worker) doWaterCompleteMessage(topic string, payload []byte) error { return fmt.Errorf("error parsing message: %w", err) } - topicPrefix := strings.TrimSuffix(topic, "/data/water") - if topicPrefix == "" { - return fmt.Errorf("received message on invalid topic: %w", err) - } - logger = logger.With("topic_prefix", topicPrefix) - - garden, err := w.getGarden(topicPrefix) + garden, err := w.getGardenForTopic(topic) if err != nil { - return fmt.Errorf("error getting garden with topic-prefix %q: %w", topicPrefix, err) + return err } logger = logger.With("garden_id", garden.GetID()) logger.Info("found garden with topic-prefix") @@ -95,23 +42,9 @@ func (w *Worker) doWaterCompleteMessage(topic string, payload []byte) error { } logger.Info("found zone with position", "zone_position", zonePosition, "zone_id", zone.GetID()) - notificationClient, err := w.storageClient.NotificationClientConfigs.Get(context.Background(), garden.GetNotificationClientID()) - if err != nil { - return fmt.Errorf("error getting all notification clients: %w", err) - } - title := fmt.Sprintf("%s finished watering", zone.Name) message := fmt.Sprintf("watered for %s", waterDuration.String()) - - err = notificationClient.SendMessage(title, message) - if err != nil { - logger.Error("error sending message", "error", err) - return err - } - - logger.Info("successfully send notification") - - return nil + return w.sendNotificationForGarden(garden, title, message, logger) } func parseWaterMessage(msg []byte) (int, time.Duration, error) { @@ -129,34 +62,3 @@ func parseWaterMessage(msg []byte) (int, time.Duration, error) { return zonePosition, waterDuration, nil } - -type parser struct { - data []byte - i int -} - -func (p *parser) readNextInt() (int, error) { - reading := false - var n []byte - for ; p.i < len(p.data); p.i++ { - c := p.data[p.i] - if c == ' ' { - p.i++ - break - } - if reading { - n = append(n, c) - continue - } - if c == '=' { - reading = true - continue - } - } - - result, err := strconv.Atoi(string(n)) - if err != nil { - return 0, fmt.Errorf("invalid integer: %w", err) - } - return result, nil -} diff --git a/garden-app/worker/worker.go b/garden-app/worker/worker.go index 75ca9000..57a3fe74 100644 --- a/garden-app/worker/worker.go +++ b/garden-app/worker/worker.go @@ -68,11 +68,21 @@ func (w *Worker) StartAsync() { // Skip adding handler when mocked since it's not used _, isMock := w.mqttClient.(*mqtt.MockClient) - if !isMock && w.mqttClient != nil { - w.mqttClient.AddHandler(mqtt.TopicHandler{ - Topic: "+/data/water", - Handler: w.handleWaterCompleteMessage, - }) + if isMock || w.mqttClient == nil { + return + } + + w.mqttClient.AddHandler(mqtt.TopicHandler{ + Topic: "+/data/water", + Handler: w.handleWaterCompleteMessage, + }) + w.mqttClient.AddHandler(mqtt.TopicHandler{ + Topic: "+/data/logs", + Handler: w.handleGardenStartupMessage, + }) + + if err := w.mqttClient.Connect(); err != nil { + w.logger.Error("failed to connect to MQTT broker", "error", err) } }