Files
mqqt-scrubber/internal/config/config.go
T
2026-03-14 22:42:27 +01:00

279 lines
7.0 KiB
Go

package config
import (
"encoding/json"
"errors"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
)
// envPrefix is prepended to every environment variable name that
// applyEnvOverrides reads (for example MQTT_SCRUBBER_MQTT_BROKER).
const envPrefix = "MQTT_SCRUBBER_"
// invalidDeviceCharacters matches each run of one or more characters that are
// not lowercase ASCII letters, digits, or underscores. normalizeDeviceKey
// replaces every such run with a single underscore.
var invalidDeviceCharacters = regexp.MustCompile(`[^a-z0-9_]+`)
// Config is the root configuration value returned by Load. Load fills it from
// built-in defaults, an optional JSON file, and environment variable
// overrides, in that order.
type Config struct {
// MQTT groups the settings declared in MQTTConfig.
MQTT MQTTConfig `json:"mqtt"`
// Influx groups the settings declared in InfluxConfig.
Influx InfluxConfig `json:"influx"`
// App groups the settings declared in AppConfig.
App AppConfig `json:"app"`
// DeviceAliases maps a device name to an alias. Load passes it through
// normalizeDeviceAliases, so keys are stored in normalized form and blank
// entries are dropped; the map is nil when no usable entry remains.
DeviceAliases map[string]string `json:"device_aliases"`
}
// MQTTConfig holds the MQTT-related settings. Every field can be set from the
// JSON config file and overridden through a MQTT_SCRUBBER_MQTT_* environment
// variable.
type MQTTConfig struct {
// Broker is required by Validate; the built-in default is
// "tcp://127.0.0.1:1883".
Broker string `json:"broker"`
// Username has no default and is not validated.
Username string `json:"username"`
// Password has no default and is not validated.
Password string `json:"password"`
// ClientID is required by Validate; the built-in default is "mqqt-scrubber".
ClientID string `json:"client_id"`
// Topics must contain at least one entry. The environment override is a
// comma-separated list. Defaults: "tele/+/SENSOR" and "tele/+/STATE".
Topics []string `json:"topics"`
// QoS defaults to 0.
// NOTE(review): presumably the MQTT quality-of-service level used when
// subscribing — confirm against the subscriber code.
QoS byte `json:"qos"`
}
// InfluxConfig holds the InfluxDB-related settings. Every field can be set
// from the JSON config file and overridden through a MQTT_SCRUBBER_INFLUX_*
// environment variable.
type InfluxConfig struct {
// URL is required by Validate; the built-in default is
// "http://127.0.0.1:8181".
URL string `json:"url"`
// Database is required by Validate; the built-in default is "home".
Database string `json:"database"`
// Token has no default and is not validated.
Token string `json:"token"`
// Precision is required by Validate; the built-in default is "ns".
// NOTE(review): the set of accepted precision values is not checked in
// this package — confirm against the writer.
Precision string `json:"precision"`
}
// AppConfig holds application-level settings. Every field can be set from the
// JSON config file and overridden through a MQTT_SCRUBBER_APP_* environment
// variable.
type AppConfig struct {
// BatchSize must be greater than zero; the built-in default is 200.
// NOTE(review): presumably the number of points per write — confirm.
BatchSize int `json:"batch_size"`
// BufferSize must be greater than zero; the built-in default is 1000.
// NOTE(review): presumably the capacity of an in-memory queue — confirm.
BufferSize int `json:"buffer_size"`
// FlushInterval must be greater than zero; the built-in default is 10s.
// In JSON it is a string accepted by time.ParseDuration.
FlushInterval DurationValue `json:"flush_interval"`
// FlushTimeout must be greater than zero; the built-in default is 10s.
// In JSON it is a string accepted by time.ParseDuration.
FlushTimeout DurationValue `json:"flush_timeout"`
// LogLevel defaults to "info"; it is not validated in this package.
LogLevel string `json:"log_level"`
// HealthAddress defaults to ":8080"; it is not validated in this package.
// NOTE(review): presumably the listen address of a health endpoint — confirm.
HealthAddress string `json:"health_address"`
}
// DurationValue wraps time.Duration so that JSON configuration can express a
// duration as a string accepted by time.ParseDuration (for example "10s" or
// "1m30s") instead of a raw nanosecond count.
type DurationValue struct {
	time.Duration
}

// UnmarshalJSON implements json.Unmarshaler.
//
// data must be a JSON string holding a duration in time.ParseDuration syntax.
// A JSON null is treated as a no-op and leaves the current value (typically a
// default) untouched, following the encoding/json convention for
// Unmarshalers. Any other JSON value, or a string that does not parse as a
// duration, yields an error.
func (d *DurationValue) UnmarshalJSON(data []byte) error {
	// encoding/json hands a literal null to custom unmarshalers of
	// non-pointer fields; without this guard it would be decoded as "" and
	// rejected with a confusing "invalid duration" error.
	if string(data) == "null" {
		return nil
	}

	var raw string
	if err := json.Unmarshal(data, &raw); err != nil {
		return fmt.Errorf("duration must be a string: %w", err)
	}

	parsed, err := time.ParseDuration(raw)
	if err != nil {
		return fmt.Errorf("invalid duration %q: %w", raw, err)
	}

	d.Duration = parsed
	return nil
}
// Load assembles the effective configuration.
//
// It starts from the built-in defaults, overlays the JSON file at path when
// path is non-empty, applies environment variable overrides, normalizes the
// device aliases, and finally validates the result. On any failure it returns
// the zero Config together with the error.
func Load(path string) (Config, error) {
	var none Config

	loaded := defaultConfig()

	// The file is optional: an empty path means "defaults plus environment".
	if len(path) > 0 {
		raw, readErr := os.ReadFile(path)
		if readErr != nil {
			return none, fmt.Errorf("read config file: %w", readErr)
		}

		parseErr := json.Unmarshal(raw, &loaded)
		if parseErr != nil {
			return none, fmt.Errorf("parse config file: %w", parseErr)
		}
	}

	// Environment variables take precedence over file contents.
	envErr := applyEnvOverrides(&loaded)
	if envErr != nil {
		return none, envErr
	}

	loaded.DeviceAliases = normalizeDeviceAliases(loaded.DeviceAliases)

	validationErr := loaded.Validate()
	if validationErr != nil {
		return none, validationErr
	}

	return loaded, nil
}
// Validate reports the first problem found in cfg, or nil when cfg is usable.
//
// It requires a non-empty MQTT broker and client ID, at least one MQTT topic,
// an MQTT QoS of 0, 1, or 2, a non-empty Influx URL, database and precision,
// and strictly positive batch size, buffer size, flush interval and flush
// timeout.
func (cfg Config) Validate() error {
	if cfg.MQTT.Broker == "" {
		return errors.New("mqtt broker is required")
	}
	if cfg.MQTT.ClientID == "" {
		return errors.New("mqtt client_id is required")
	}
	if len(cfg.MQTT.Topics) == 0 {
		return errors.New("at least one mqtt topic is required")
	}
	// MQTT defines only QoS levels 0, 1 and 2; reject anything else here so
	// a bad value in the config file fails at load time rather than at
	// subscribe time.
	if cfg.MQTT.QoS > 2 {
		return errors.New("mqtt qos must be 0, 1, or 2")
	}
	if cfg.Influx.URL == "" {
		return errors.New("influx url is required")
	}
	if cfg.Influx.Database == "" {
		return errors.New("influx database is required")
	}
	if cfg.Influx.Precision == "" {
		return errors.New("influx precision is required")
	}
	if cfg.App.BatchSize <= 0 {
		return errors.New("app batch_size must be greater than zero")
	}
	if cfg.App.BufferSize <= 0 {
		return errors.New("app buffer_size must be greater than zero")
	}
	if cfg.App.FlushInterval.Duration <= 0 {
		return errors.New("app flush_interval must be greater than zero")
	}
	if cfg.App.FlushTimeout.Duration <= 0 {
		return errors.New("app flush_timeout must be greater than zero")
	}
	return nil
}
// defaultConfig returns the built-in baseline that Load starts from before
// the config file and environment variables are applied. Fields not assigned
// here (credentials, token, device aliases) keep their zero values.
func defaultConfig() Config {
	var cfg Config

	// MQTT defaults.
	cfg.MQTT.Broker = "tcp://127.0.0.1:1883"
	cfg.MQTT.ClientID = "mqqt-scrubber"
	cfg.MQTT.Topics = []string{"tele/+/SENSOR", "tele/+/STATE"}
	cfg.MQTT.QoS = 0

	// Influx defaults.
	cfg.Influx.URL = "http://127.0.0.1:8181"
	cfg.Influx.Database = "home"
	cfg.Influx.Precision = "ns"

	// Application defaults.
	cfg.App.BatchSize = 200
	cfg.App.BufferSize = 1000
	cfg.App.FlushInterval.Duration = 10 * time.Second
	cfg.App.FlushTimeout.Duration = 10 * time.Second
	cfg.App.LogLevel = "info"
	cfg.App.HealthAddress = ":8080"

	return cfg
}
// applyEnvOverrides overwrites fields of cfg with values taken from
// environment variables named envPrefix + <SECTION>_<FIELD>. A variable that
// is not set leaves the corresponding field untouched; a variable that is set
// (even to the empty string) replaces it.
//
// Special cases:
//   - DEVICE_ALIASES is a JSON object; a blank value clears the aliases.
//   - MQTT_TOPICS is a comma-separated list; blank entries are dropped.
//   - MQTT_QOS must be an integer in the range 0-2.
//   - APP_BATCH_SIZE and APP_BUFFER_SIZE are base-10 integers.
//   - APP_FLUSH_INTERVAL and APP_FLUSH_TIMEOUT use time.ParseDuration syntax.
//
// It returns an error naming the offending variable when a value cannot be
// parsed. cfg may have been partially updated when an error is returned.
func applyEnvOverrides(cfg *Config) error {
	setString(&cfg.MQTT.Broker, envPrefix+"MQTT_BROKER")
	setString(&cfg.MQTT.Username, envPrefix+"MQTT_USERNAME")
	setString(&cfg.MQTT.Password, envPrefix+"MQTT_PASSWORD")
	setString(&cfg.MQTT.ClientID, envPrefix+"MQTT_CLIENT_ID")
	setString(&cfg.Influx.URL, envPrefix+"INFLUX_URL")
	setString(&cfg.Influx.Database, envPrefix+"INFLUX_DATABASE")
	setString(&cfg.Influx.Token, envPrefix+"INFLUX_TOKEN")
	setString(&cfg.Influx.Precision, envPrefix+"INFLUX_PRECISION")
	setString(&cfg.App.LogLevel, envPrefix+"APP_LOG_LEVEL")
	setString(&cfg.App.HealthAddress, envPrefix+"APP_HEALTH_ADDRESS")

	if raw, ok := os.LookupEnv(envPrefix + "DEVICE_ALIASES"); ok {
		if strings.TrimSpace(raw) == "" {
			// An explicitly blank variable removes aliases from the file.
			cfg.DeviceAliases = nil
		} else {
			aliases := make(map[string]string)
			if err := json.Unmarshal([]byte(raw), &aliases); err != nil {
				return fmt.Errorf("parse %sDEVICE_ALIASES: %w", envPrefix, err)
			}
			cfg.DeviceAliases = aliases
		}
	}

	if raw, ok := os.LookupEnv(envPrefix + "MQTT_TOPICS"); ok {
		cfg.MQTT.Topics = splitAndTrim(raw)
	}

	if raw, ok := os.LookupEnv(envPrefix + "MQTT_QOS"); ok {
		parsed, err := strconv.Atoi(raw)
		if err != nil {
			return fmt.Errorf("parse %sMQTT_QOS: %w", envPrefix, err)
		}
		// Check the range before narrowing: byte(parsed) would silently wrap
		// values such as 256 or -1 into a different, valid-looking QoS.
		if parsed < 0 || parsed > 2 {
			return fmt.Errorf("parse %sMQTT_QOS: value %d is outside the valid range 0-2", envPrefix, parsed)
		}
		cfg.MQTT.QoS = byte(parsed)
	}

	if err := overrideInt(&cfg.App.BatchSize, envPrefix+"APP_BATCH_SIZE"); err != nil {
		return err
	}
	if err := overrideInt(&cfg.App.BufferSize, envPrefix+"APP_BUFFER_SIZE"); err != nil {
		return err
	}
	if err := overrideDuration(&cfg.App.FlushInterval, envPrefix+"APP_FLUSH_INTERVAL"); err != nil {
		return err
	}
	if err := overrideDuration(&cfg.App.FlushTimeout, envPrefix+"APP_FLUSH_TIMEOUT"); err != nil {
		return err
	}
	return nil
}

// overrideInt stores the base-10 integer held in environment variable key
// into *target. It does nothing when the variable is not set.
func overrideInt(target *int, key string) error {
	raw, ok := os.LookupEnv(key)
	if !ok {
		return nil
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return fmt.Errorf("parse %s: %w", key, err)
	}
	*target = parsed
	return nil
}

// overrideDuration stores the time.ParseDuration value held in environment
// variable key into *target. It does nothing when the variable is not set.
func overrideDuration(target *DurationValue, key string) error {
	raw, ok := os.LookupEnv(key)
	if !ok {
		return nil
	}
	parsed, err := time.ParseDuration(raw)
	if err != nil {
		return fmt.Errorf("parse %s: %w", key, err)
	}
	*target = DurationValue{Duration: parsed}
	return nil
}
// setString copies the value of environment variable key into *target.
// An unset variable leaves *target unchanged; a variable set to the empty
// string overwrites *target with "".
func setString(target *string, key string) {
	fromEnv, present := os.LookupEnv(key)
	if !present {
		return
	}
	*target = fromEnv
}
// splitAndTrim breaks value at every comma, strips surrounding whitespace
// from each piece, and discards pieces that end up empty. The result is never
// nil; it is an empty slice when no piece survives.
func splitAndTrim(value string) []string {
	// One more slot than there are commas is the upper bound on pieces.
	result := make([]string, 0, strings.Count(value, ",")+1)

	remaining, more := value, true
	for more {
		var piece string
		piece, remaining, more = strings.Cut(remaining, ",")
		if piece = strings.TrimSpace(piece); piece == "" {
			continue
		}
		result = append(result, piece)
	}
	return result
}
// normalizeDeviceAliases returns a copy of aliases whose keys have been passed
// through normalizeDeviceKey and whose values have been whitespace-trimmed.
// Entries whose normalized key or trimmed alias is empty are dropped. It
// returns nil when aliases is empty or when no entry survives.
//
// When several raw keys normalize to the same key (for example "Living Room"
// and "living-room"), the entry whose raw key sorts first lexicographically
// wins, so the result does not depend on Go's unspecified map iteration order.
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
	if len(aliases) == 0 {
		return nil
	}

	normalized := make(map[string]string, len(aliases))
	// chosenFrom remembers which raw key produced each normalized entry so
	// that collisions can be resolved deterministically.
	chosenFrom := make(map[string]string, len(aliases))

	for device, alias := range aliases {
		cleanDevice := normalizeDeviceKey(device)
		cleanAlias := strings.TrimSpace(alias)
		if cleanDevice == "" || cleanAlias == "" {
			continue
		}
		// Keep the existing entry if it came from a smaller raw key.
		if previous, seen := chosenFrom[cleanDevice]; seen && previous < device {
			continue
		}
		chosenFrom[cleanDevice] = device
		normalized[cleanDevice] = cleanAlias
	}

	if len(normalized) == 0 {
		return nil
	}
	return normalized
}
// normalizeDeviceKey converts a device name into its canonical key form:
// surrounding whitespace is removed, letters are lowercased, every hyphen and
// space becomes an underscore, each remaining run of characters outside
// [a-z0-9_] collapses to a single underscore, and leading and trailing
// underscores are stripped. The result may be empty.
func normalizeDeviceKey(value string) string {
	key := strings.TrimSpace(value)
	key = strings.ToLower(key)

	// Hyphens and spaces are mapped one-for-one before the regexp pass, so
	// "a- b" yields "a__b" rather than collapsing to "a_b".
	for _, separator := range []string{"-", " "} {
		key = strings.ReplaceAll(key, separator, "_")
	}

	key = invalidDeviceCharacters.ReplaceAllString(key, "_")
	return strings.Trim(key, "_")
}