Add derived device snapshots for summary dashboard
This commit is contained in:
@@ -24,6 +24,8 @@ type Service struct {
|
||||
config config.Config
|
||||
deviceAliases map[string]string
|
||||
aliasRecords []model.Record
|
||||
snapshots map[string]*deviceSnapshot
|
||||
dirtyDevices map[string]struct{}
|
||||
influxClient writer
|
||||
input chan model.RawMessage
|
||||
received atomic.Uint64
|
||||
@@ -41,12 +43,23 @@ type Snapshot struct {
|
||||
Failed uint64 `json:"failed"`
|
||||
}
|
||||
|
||||
type deviceSnapshot struct {
|
||||
Device string
|
||||
Alias string
|
||||
LastSeen time.Time
|
||||
StateSeen time.Time
|
||||
SensorSeen time.Time
|
||||
Fields map[string]any
|
||||
}
|
||||
|
||||
func NewService(cfg config.Config, influxClient writer) *Service {
|
||||
normalizedAliases := normalizeDeviceAliases(cfg.DeviceAliases)
|
||||
return &Service{
|
||||
config: cfg,
|
||||
deviceAliases: normalizedAliases,
|
||||
aliasRecords: buildAliasRecords(normalizedAliases),
|
||||
snapshots: make(map[string]*deviceSnapshot),
|
||||
dirtyDevices: make(map[string]struct{}),
|
||||
influxClient: influxClient,
|
||||
input: make(chan model.RawMessage, cfg.App.BufferSize),
|
||||
}
|
||||
@@ -91,7 +104,7 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
flushCtx, cancel := context.WithTimeout(context.Background(), service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
err := service.flushPending(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -108,13 +121,14 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
service.applyDeviceAliases(records)
|
||||
service.updateDeviceSnapshots(records)
|
||||
|
||||
service.parsed.Add(uint64(len(records)))
|
||||
batch = append(batch, records...)
|
||||
|
||||
if len(batch) >= service.config.App.BatchSize {
|
||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
err := service.flushPending(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
slog.Error("failed to flush full batch to influx; keeping batch in memory", "count", len(batch), "error", err)
|
||||
@@ -123,12 +137,12 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
batch = batch[:0]
|
||||
}
|
||||
case <-ticker.C:
|
||||
if len(batch) == 0 {
|
||||
if len(batch) == 0 && len(service.dirtyDevices) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
flushCtx, cancel := context.WithTimeout(ctx, service.config.App.FlushTimeout.Duration)
|
||||
err := service.flush(flushCtx, batch)
|
||||
err := service.flushPending(flushCtx, batch)
|
||||
cancel()
|
||||
if err != nil {
|
||||
slog.Error("failed to flush batch to influx; will retry on next interval", "count", len(batch), "error", err)
|
||||
@@ -139,6 +153,18 @@ func (service *Service) Run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) flushPending(ctx context.Context, batch []model.Record) error {
|
||||
flushBatch := append([]model.Record(nil), batch...)
|
||||
flushBatch = append(flushBatch, service.buildSnapshotRecords()...)
|
||||
|
||||
if err := service.flush(ctx, flushBatch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service.clearDirtyDevices()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (service *Service) flush(ctx context.Context, batch []model.Record) error {
|
||||
if len(batch) == 0 {
|
||||
return nil
|
||||
@@ -189,6 +215,107 @@ func (service *Service) applyDeviceAliases(records []model.Record) {
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) updateDeviceSnapshots(records []model.Record) {
|
||||
for _, record := range records {
|
||||
device := normalizeDeviceKey(record.Tags["device"])
|
||||
if device == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
snapshot, ok := service.snapshots[device]
|
||||
if !ok {
|
||||
snapshot = &deviceSnapshot{
|
||||
Device: device,
|
||||
Fields: make(map[string]any),
|
||||
}
|
||||
service.snapshots[device] = snapshot
|
||||
}
|
||||
|
||||
if record.Timestamp.After(snapshot.LastSeen) {
|
||||
snapshot.LastSeen = record.Timestamp
|
||||
}
|
||||
|
||||
if alias, ok := record.Fields["device_alias"].(string); ok && strings.TrimSpace(alias) != "" {
|
||||
snapshot.Alias = alias
|
||||
snapshot.Fields["device_alias"] = alias
|
||||
}
|
||||
|
||||
switch record.Tags["message_type"] {
|
||||
case "state":
|
||||
if record.Timestamp.After(snapshot.StateSeen) {
|
||||
snapshot.StateSeen = record.Timestamp
|
||||
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "wifi_signal", "uptime_sec")
|
||||
}
|
||||
case "sensor":
|
||||
if record.Timestamp.After(snapshot.SensorSeen) {
|
||||
snapshot.SensorSeen = record.Timestamp
|
||||
copySnapshotFields(snapshot.Fields, record.Fields, "device_alias", "energy_power", "energy_today", "energy_total")
|
||||
}
|
||||
}
|
||||
|
||||
service.dirtyDevices[device] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func copySnapshotFields(target map[string]any, source map[string]any, keys ...string) {
|
||||
for _, key := range keys {
|
||||
value, ok := source[key]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
target[key] = value
|
||||
}
|
||||
}
|
||||
|
||||
func (service *Service) buildSnapshotRecords() []model.Record {
|
||||
if len(service.dirtyDevices) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
devices := make([]string, 0, len(service.dirtyDevices))
|
||||
for device := range service.dirtyDevices {
|
||||
devices = append(devices, device)
|
||||
}
|
||||
sort.Strings(devices)
|
||||
|
||||
records := make([]model.Record, 0, len(devices))
|
||||
for _, device := range devices {
|
||||
snapshot := service.snapshots[device]
|
||||
if snapshot == nil || snapshot.LastSeen.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
fields := make(map[string]any, len(snapshot.Fields)+2)
|
||||
for key, value := range snapshot.Fields {
|
||||
fields[key] = value
|
||||
}
|
||||
if !snapshot.StateSeen.IsZero() {
|
||||
fields["state_last_seen_unix"] = snapshot.StateSeen.Unix()
|
||||
}
|
||||
if !snapshot.SensorSeen.IsZero() {
|
||||
fields["sensor_last_seen_unix"] = snapshot.SensorSeen.Unix()
|
||||
}
|
||||
|
||||
records = append(records, model.Record{
|
||||
Measurement: "tasmota_device_snapshot",
|
||||
Tags: map[string]string{
|
||||
"device": device,
|
||||
"source": "derived",
|
||||
},
|
||||
Fields: fields,
|
||||
Timestamp: snapshot.LastSeen,
|
||||
})
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
func (service *Service) clearDirtyDevices() {
|
||||
for device := range service.dirtyDevices {
|
||||
delete(service.dirtyDevices, device)
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeDeviceAliases(aliases map[string]string) map[string]string {
|
||||
if len(aliases) == 0 {
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user