Track cmd entrypoint
This commit is contained in:
+1
-1
@@ -1,4 +1,4 @@
|
|||||||
config.json
|
config.json
|
||||||
mqqt-scrubber
|
/mqqt-scrubber
|
||||||
bin/
|
bin/
|
||||||
dist/
|
dist/
|
||||||
@@ -0,0 +1,86 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"mqqt-scrubber/internal/config"
|
||||||
|
"mqqt-scrubber/internal/health"
|
||||||
|
"mqqt-scrubber/internal/influx"
|
||||||
|
"mqqt-scrubber/internal/mqtt"
|
||||||
|
"mqqt-scrubber/internal/pipeline"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
configPath := flag.String("config", "", "path to config JSON file")
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
cfg, err := config.Load(*configPath)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("failed to load config", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := newLogger(cfg.App.LogLevel)
|
||||||
|
slog.SetDefault(logger)
|
||||||
|
|
||||||
|
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||||
|
defer stop()
|
||||||
|
|
||||||
|
influxClient := influx.NewClient(cfg.Influx)
|
||||||
|
service := pipeline.NewService(cfg, influxClient)
|
||||||
|
subscriber := mqtt.NewSubscriber(cfg.MQTT, service.Enqueue)
|
||||||
|
healthServer := health.NewServer(cfg.App.HealthAddress, func() any { return service.Snapshot() })
|
||||||
|
|
||||||
|
if err := subscriber.Start(); err != nil {
|
||||||
|
slog.Error("failed to start mqtt subscriber", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer subscriber.Stop()
|
||||||
|
healthServer.Start()
|
||||||
|
defer func() {
|
||||||
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := healthServer.Shutdown(shutdownCtx); err != nil {
|
||||||
|
slog.Warn("health server shutdown failed", "error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
slog.Info("service started",
|
||||||
|
"mqtt_broker", cfg.MQTT.Broker,
|
||||||
|
"topics", cfg.MQTT.Topics,
|
||||||
|
"influx_url", cfg.Influx.URL,
|
||||||
|
"database", cfg.Influx.Database,
|
||||||
|
"health_address", cfg.App.HealthAddress,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err := service.Run(ctx); err != nil {
|
||||||
|
slog.Error("service stopped with error", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("service stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLogger(level string) *slog.Logger {
|
||||||
|
var slogLevel slog.Level
|
||||||
|
|
||||||
|
switch level {
|
||||||
|
case "debug":
|
||||||
|
slogLevel = slog.LevelDebug
|
||||||
|
case "warn":
|
||||||
|
slogLevel = slog.LevelWarn
|
||||||
|
case "error":
|
||||||
|
slogLevel = slog.LevelError
|
||||||
|
default:
|
||||||
|
slogLevel = slog.LevelInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slogLevel})
|
||||||
|
return slog.New(handler)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user