diff --git a/.gitignore b/.gitignore index e7c45b3..1a4362a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ config.json -mqqt-scrubber +/mqqt-scrubber bin/ dist/ \ No newline at end of file diff --git a/cmd/mqqt-scrubber/main.go b/cmd/mqqt-scrubber/main.go new file mode 100644 index 0000000..b051e24 --- /dev/null +++ b/cmd/mqqt-scrubber/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "context" + "flag" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "mqqt-scrubber/internal/config" + "mqqt-scrubber/internal/health" + "mqqt-scrubber/internal/influx" + "mqqt-scrubber/internal/mqtt" + "mqqt-scrubber/internal/pipeline" +) + +func main() { + configPath := flag.String("config", "", "path to config JSON file") + flag.Parse() + + cfg, err := config.Load(*configPath) + if err != nil { + slog.Error("failed to load config", "error", err) + os.Exit(1) + } + + logger := newLogger(cfg.App.LogLevel) + slog.SetDefault(logger) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + influxClient := influx.NewClient(cfg.Influx) + service := pipeline.NewService(cfg, influxClient) + subscriber := mqtt.NewSubscriber(cfg.MQTT, service.Enqueue) + healthServer := health.NewServer(cfg.App.HealthAddress, func() any { return service.Snapshot() }) + + if err := subscriber.Start(); err != nil { + slog.Error("failed to start mqtt subscriber", "error", err) + os.Exit(1) + } + defer subscriber.Stop() + healthServer.Start() + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := healthServer.Shutdown(shutdownCtx); err != nil { + slog.Warn("health server shutdown failed", "error", err) + } + }() + + slog.Info("service started", + "mqtt_broker", cfg.MQTT.Broker, + "topics", cfg.MQTT.Topics, + "influx_url", cfg.Influx.URL, + "database", cfg.Influx.Database, + "health_address", cfg.App.HealthAddress, + ) + + if err := service.Run(ctx); err != nil { + slog.Error("service stopped with error", "error", err) + os.Exit(1) + } + + slog.Info("service stopped") +} + +func newLogger(level string) *slog.Logger { + var slogLevel slog.Level + + switch level { + case "debug": + slogLevel = slog.LevelDebug + case "warn": + slogLevel = slog.LevelWarn + case "error": + slogLevel = slog.LevelError + default: + slogLevel = slog.LevelInfo + } + + handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slogLevel}) + return slog.New(handler) +}