381 lines
9.7 KiB
Go
381 lines
9.7 KiB
Go
package pipeline
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"mqqt-scrubber/internal/config"
|
|
"mqqt-scrubber/internal/model"
|
|
"mqqt-scrubber/internal/parser"
|
|
)
|
|
|
|
var (
	// invalidDeviceAliasCharacters matches each run of characters that is not a
	// lowercase ASCII letter, an ASCII digit or an underscore. normalizeDeviceKey
	// replaces every such run with a single underscore.
	invalidDeviceAliasCharacters = regexp.MustCompile(`[^a-z0-9_]+`)
)
|
|
|
|
type writer interface {
|
|
Write(ctx context.Context, records []model.Record) error
|
|
}
|
|
|
|
type Service struct {
|
|
config config.Config
|
|
tasmotaTimeLocation *time.Location
|
|
deviceAliases map[string]string
|
|
aliasRecords []model.Record
|
|
snapshots map[string]*deviceSnapshot
|
|
dirtyDevices map[string]struct{}
|
|
influxClient writer
|
|
input chan model.RawMessage
|
|
received atomic.Uint64
|
|
parsed atomic.Uint64
|
|
written atomic.Uint64
|
|
dropped atomic.Uint64
|
|
failed atomic.Uint64
|
|
}
|
|
|
|
// Snapshot is a point-in-time copy of the pipeline counters, suitable for JSON
// encoding (for example from a status endpoint).
type Snapshot struct {
	// Received counts every message passed to Enqueue.
	Received uint64 `json:"received"`
	// Parsed counts records produced by the parser.
	Parsed uint64 `json:"parsed"`
	// Written counts records successfully written to InfluxDB.
	Written uint64 `json:"written"`
	// Dropped counts messages discarded because the input buffer was full.
	Dropped uint64 `json:"dropped"`
	// Failed counts unparsable messages plus records in failed write attempts.
	Failed uint64 `json:"failed"`
}
|
|
|
|
// deviceSnapshot is the latest known state of one device, keyed by its
// normalized device name. It is turned into a "tasmota_device_snapshot"
// record whenever the device is dirty at flush time.
type deviceSnapshot struct {
	// Device is the normalized device key; Alias is the last non-blank alias seen.
	Device, Alias string
	// LastSeen is the newest timestamp of any record for the device;
	// StateSeen and SensorSeen are the newest "state" and "sensor" timestamps.
	LastSeen, StateSeen, SensorSeen time.Time
	// Fields holds the most recent copied values for the snapshot record.
	Fields map[string]any
}
|
|
|
|
func NewService(cfg config.Config, influxClient writer) *Service {
|
|
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
|
|
return &Service{
|
|
config: cfg,
|
|
tasmotaTimeLocation: cfg.App.TasmotaLocation(),
|
|
deviceAliases: normalizedAliases,
|
|
aliasRecords: buildAliasRecords(normalizedAliases),
|
|
snapshots: make(map[string]*deviceSnapshot),
|
|
dirtyDevices: make(map[string]struct{}),
|
|
influxClient: influxClient,
|
|
input: make(chan model.RawMessage, cfg.App.BufferSize),
|
|
}
|
|
}
|
|
|
|
func (service *Service) Enqueue(message model.RawMessage) {
|
|
service.received.Add(1)
|
|
|
|
select {
|
|
case service.input <- message:
|
|
default:
|
|
service.dropped.Add(1)
|
|
slog.Warn("dropping message because buffer is full", "topic", message.Topic)
|
|
}
|
|
}
|
|
|
|
func (service *Service) Run(ctx context.Context) error {
|
|
ticker := time.NewTicker(service.config.App.FlushInterval.Duration)
|
|
defer ticker.Stop()
|
|
|
|
batch := append(make([]model.Record, 0, service.config.App.BatchSize+len(service.aliasRecords)), service.aliasRecords...)
|
|
if len(batch) > 0 {
|
|
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
|
err := service.flush(flushCtx, batch)
|
|
cancel()
|
|
if err != nil {
|
|
slog.Error("failed to flush alias metadata to influx; will retry on next interval", "count", len(batch), "error", err)
|
|
} else {
|
|
batch = batch[:0]
|
|
}
|
|
}
|
|
|
|
var input <-chan model.RawMessage = service.input
|
|
|
|
for {
|
|
if len(batch) >= service.config.App.BatchSize {
|
|
input = nil
|
|
} else {
|
|
input = service.input
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration)
|
|
err := service.flushPending(flushCtx, batch)
|
|
cancel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
service.logCounters()
|
|
return nil
|
|
case message := <-input:
|
|
records, err := parser.ParseTasmotaInLocation(message, service.tasmotaTimeLocation)
|
|
if err != nil {
|
|
service.failed.Add(1)
|
|
slog.Warn("failed to parse message", "topic", message.Topic, "error", err)
|
|
continue
|
|
}
|
|
|
|
service.applyDeviceAliases(records)
|
|
service.updateDeviceSnapshots(records)
|
|
|
|
service.parsed.Add(uint64(len(records)))
|
|
batch = append(batch, records...)
|
|
|
|
if len(batch) >= service.config.App.BatchSize {
|
|
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
|
err := service.flushPending(flushCtx, batch)
|
|
cancel()
|
|
if err != nil {
|
|
slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err)
|
|
continue
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
case <-ticker.C:
|
|
if len(batch) == 0 && len(service.dirtyDevices) == 0 {
|
|
continue
|
|
}
|
|
|
|
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
|
err := service.flushPending(flushCtx, batch)
|
|
cancel()
|
|
if err != nil {
|
|
slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err)
|
|
continue
|
|
}
|
|
batch = batch[:0]
|
|
}
|
|
}
|
|
}
|
|
|
|
func (service *Service) flushPending(ctx context.Context, batch []model.Record) error {
|
|
flushBatch := append([]model.Record(nil), batch...)
|
|
flushBatch = append(flushBatch, service.buildSnapshotRecords()...)
|
|
|
|
if err := service.flush(ctx, flushBatch); err != nil {
|
|
return err
|
|
}
|
|
|
|
service.clearDirtyDevices()
|
|
return nil
|
|
}
|
|
|
|
func (service *Service) flush(ctx context.Context, batch []model.Record) error {
|
|
if len(batch) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if err := service.influxClient.Write(ctx, batch); err != nil {
|
|
service.failed.Add(uint64(len(batch)))
|
|
return err
|
|
}
|
|
|
|
service.written.Add(uint64(len(batch)))
|
|
slog.Info("flushed records to influx", "count", len(batch))
|
|
return nil
|
|
}
|
|
|
|
func (service *Service) logCounters() {
|
|
slog.Info("service counters",
|
|
"received", service.received.Load(),
|
|
"parsed", service.parsed.Load(),
|
|
"written", service.written.Load(),
|
|
"dropped", service.dropped.Load(),
|
|
"failed", service.failed.Load(),
|
|
)
|
|
}
|
|
|
|
func (service *Service) Snapshot() Snapshot {
|
|
return Snapshot{
|
|
Received: service.received.Load(),
|
|
Parsed: service.parsed.Load(),
|
|
Written: service.written.Load(),
|
|
Dropped: service.dropped.Load(),
|
|
Failed: service.failed.Load(),
|
|
}
|
|
}
|
|
|
|
func (service *Service) applyDeviceAliases(records []model.Record) {
|
|
if len(service.deviceAliases) == 0 {
|
|
return
|
|
}
|
|
|
|
for index := range records {
|
|
device := normalizeDeviceKey(records[index].Tags["device"])
|
|
alias, ok := service.deviceAliases[device]
|
|
if !ok || alias == "" {
|
|
continue
|
|
}
|
|
records[index].Fields["device_alias"] = alias
|
|
}
|
|
}
|
|
|
|
func (service *Service) updateDeviceSnapshots(records []model.Record) {
|
|
for _, record := range records {
|
|
device := normalizeDeviceKey(record.Tags["device"])
|
|
if device == "" {
|
|
continue
|
|
}
|
|
|
|
snapshot, ok := service.snapshots[device]
|
|
if !ok {
|
|
snapshot = &deviceSnapshot{
|
|
Device: device,
|
|
Fields: make(map[string]any),
|
|
}
|
|
service.snapshots[device] = snapshot
|
|
}
|
|
|
|
if record.Timestamp.After(snapshot.LastSeen) {
|
|
snapshot.LastSeen = record.Timestamp
|
|
}
|
|
|
|
if alias, ok := record.Fields["device_alias"].(string); ok && strings.TrimSpace(alias) != "" {
|
|
snapshot.Alias = alias
|
|
snapshot.Fields["device_alias"] = alias
|
|
}
|
|
|
|
switch record.Tags["message_type"] {
|
|
case "state":
|
|
if record.Timestamp.After(snapshot.StateSeen) {
|
|
snapshot.StateSeen = record.Timestamp
|
|
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "wifi_signal", "uptime_sec")
|
|
}
|
|
case "sensor":
|
|
if record.Timestamp.After(snapshot.SensorSeen) {
|
|
snapshot.SensorSeen = record.Timestamp
|
|
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "energy_power", "energy_today", "energy_total")
|
|
}
|
|
}
|
|
|
|
service.dirtyDevices[device] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// copySnapshotFields copies each listed key that is present in source into
// target, overwriting any existing value. Keys absent from source leave target
// unchanged, while a key present with a nil value is still copied.
func copySnapshotFields(target map[string]any, source map[string]any, keys ...string) {
	for _, k := range keys {
		if v, present := source[k]; present {
			target[k] = v
		}
	}
}
|
|
|
|
func (service *Service) buildSnapshotRecords() []model.Record {
|
|
if len(service.dirtyDevices) == 0 {
|
|
return nil
|
|
}
|
|
|
|
devices := make([]string, 0, len(service.dirtyDevices))
|
|
for device := range service.dirtyDevices {
|
|
devices = append(devices, device)
|
|
}
|
|
sort.Strings(devices)
|
|
|
|
records := make([]model.Record, 0, len(devices))
|
|
for _, device := range devices {
|
|
snapshot := service.snapshots[device]
|
|
if snapshot == nil || snapshot.LastSeen.IsZero() {
|
|
continue
|
|
}
|
|
|
|
fields := make(map[string]any, len(snapshot.Fields)+2)
|
|
for key, value := range snapshot.Fields {
|
|
fields[key] = value
|
|
}
|
|
if !snapshot.StateSeen.IsZero() {
|
|
fields["state_last_seen_unix"] = snapshot.StateSeen.Unix()
|
|
}
|
|
if !snapshot.SensorSeen.IsZero() {
|
|
fields["sensor_last_seen_unix"] = snapshot.SensorSeen.Unix()
|
|
}
|
|
|
|
records = append(records, model.Record{
|
|
Measurement: "tasmota_device_snapshot",
|
|
Tags: map[string]string{
|
|
"device": device,
|
|
"source": "derived",
|
|
},
|
|
Fields: fields,
|
|
Timestamp: snapshot.LastSeen,
|
|
})
|
|
}
|
|
|
|
return records
|
|
}
|
|
|
|
func (service *Service) clearDirtyDevices() {
|
|
for device := range service.dirtyDevices {
|
|
delete(service.dirtyDevices, device)
|
|
}
|
|
}
|
|
|
|
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
|
|
if len(aliases) == 0 {
|
|
return nil
|
|
}
|
|
|
|
normalized := make(map[string]string, len(aliases))
|
|
for device, alias := range aliases {
|
|
cleanDevice := normalizeDeviceKey(device)
|
|
cleanAlias := strings.TrimSpace(alias)
|
|
if cleanDevice == "" || cleanAlias == "" {
|
|
continue
|
|
}
|
|
normalized[cleanDevice] = cleanAlias
|
|
}
|
|
|
|
if len(normalized) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return normalized
|
|
}
|
|
|
|
func normalizeDeviceKey(value string) string {
|
|
normalized := strings.ToLower(strings.TrimSpace(value))
|
|
normalized = strings.ReplaceAll(normalized, "-", "_")
|
|
normalized = strings.ReplaceAll(normalized, " ", "_")
|
|
normalized = invalidDeviceAliasCharacters.ReplaceAllString(normalized, "_")
|
|
normalized = strings.Trim(normalized, "_")
|
|
return normalized
|
|
}
|
|
|
|
func buildAliasRecords(aliases map[string]string) []model.Record {
|
|
if len(aliases) == 0 {
|
|
return nil
|
|
}
|
|
|
|
devices := make([]string, 0, len(aliases))
|
|
for device := range aliases {
|
|
devices = append(devices, device)
|
|
}
|
|
sort.Strings(devices)
|
|
|
|
timestamp := time.Now().UTC()
|
|
records := make([]model.Record, 0, len(devices))
|
|
for _, device := range devices {
|
|
records = append(records, model.Record{
|
|
Measurement: "tasmota_device_meta",
|
|
Tags: map[string]string{
|
|
"device": device,
|
|
"source": "config",
|
|
},
|
|
Fields: map[string]any{
|
|
"device_alias": aliases[device],
|
|
},
|
|
Timestamp: timestamp,
|
|
})
|
|
}
|
|
|
|
return records
|
|
}
|