Files
mqqt-scrubber/internal/pipeline/service_test.go
T

155 lines
4.1 KiB
Go

package pipeline
import (
"context"
"sync"
"testing"
"time"
"mqqt-scrubber/internal/config"
"mqqt-scrubber/internal/model"
)
// fakeWriter is an in-memory test double that remembers every batch handed
// to Write and announces each call on the flushed channel.
type fakeWriter struct {
	// flushed receives a token after a Write call; tests wait on it to
	// learn that a flush has happened.
	flushed chan struct{}

	// mu guards batches.
	mu      sync.Mutex
	batches [][]model.Record
}
// newFakeWriter builds a fakeWriter whose flushed channel can hold one
// pending notification.
func newFakeWriter() *fakeWriter {
	notify := make(chan struct{}, 1)
	return &fakeWriter{flushed: notify}
}
// Write stores a private copy of records as a new batch and then signals
// flushed without blocking. It always returns nil.
func (writer *fakeWriter) Write(_ context.Context, records []model.Record) error {
	// Copy before storing so the recorded batch does not share the
	// caller's backing array.
	snapshot := append([]model.Record(nil), records...)

	writer.mu.Lock()
	writer.batches = append(writer.batches, snapshot)
	writer.mu.Unlock()

	// Non-blocking send: if a notification is already pending, drop this one
	// rather than stall the caller.
	select {
	case writer.flushed <- struct{}{}:
	default:
	}

	return nil
}
// firstBatch returns the earliest recorded batch, or nil when nothing has
// been written yet. The returned slice is the stored one, not a copy.
func (writer *fakeWriter) firstBatch() []model.Record {
	writer.mu.Lock()
	defer writer.mu.Unlock()

	if len(writer.batches) > 0 {
		return writer.batches[0]
	}
	return nil
}
// allBatches returns a copy of every recorded batch, in the order they
// were written. Both the outer slice and each inner batch are fresh slices,
// so callers may inspect them without holding the lock.
func (writer *fakeWriter) allBatches() [][]model.Record {
	writer.mu.Lock()
	defer writer.mu.Unlock()

	snapshot := make([][]model.Record, len(writer.batches))
	for i := range writer.batches {
		snapshot[i] = append([]model.Record(nil), writer.batches[i]...)
	}
	return snapshot
}
// TestServiceFlushesParsedRecords runs a Service against a fakeWriter and
// checks the two flushes it expects to observe:
//
//  1. an initial batch of alias metadata records (one per configured alias),
//  2. a batch holding the parsed LWT and SENSOR messages enqueued by the test,
//     triggered by BatchSize being reached (FlushInterval is set to an hour so
//     the timer does not fire during the test).
//
// Finally it cancels the context and verifies Run returns without error.
func TestServiceFlushesParsedRecords(t *testing.T) {
	fake := newFakeWriter()
	service := NewService(config.Config{
		DeviceAliases: map[string]string{
			"tasmota-896001": "Garage Plug",
			"Tasmota C88994": "Office Plug",
		},
		App: config.AppConfig{
			BatchSize:     2,
			BufferSize:    8,
			FlushInterval: config.DurationValue{Duration: time.Hour},
			FlushTimeout:  config.DurationValue{Duration: time.Second},
		},
	}, fake)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffered so the goroutine can always deliver Run's result and exit,
	// even if the test has already failed and stopped listening.
	errCh := make(chan error, 1)
	go func() {
		errCh <- service.Run(ctx)
	}()

	// Wait for the first flush before enqueueing anything, so the metadata
	// batch and the message batch arrive as two distinct writes.
	select {
	case <-fake.flushed:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for alias metadata flush")
	}

	service.Enqueue(model.RawMessage{
		Topic:      "tele/tasmota_896001/LWT",
		Payload:    []byte("Online"),
		ReceivedAt: time.Date(2026, time.March, 12, 15, 21, 39, 0, time.UTC),
	})
	service.Enqueue(model.RawMessage{
		Topic:      "tele/tasmota_C88994/SENSOR",
		Payload:    []byte(`{"Time":"2026-03-12T16:23:13","ENERGY":{"TotalStartTime":"2026-02-04T19:13:40","Total":41.385,"Yesterday":1.124,"Today":0.799,"Period":0,"Power":1,"ApparentPower":4,"ReactivePower":4,"Factor":0.22,"Voltage":231,"Current":0.016}}`),
		ReceivedAt: time.Date(2026, time.March, 12, 16, 23, 13, 0, time.UTC),
	})

	select {
	case <-fake.flushed:
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for pipeline flush")
	}

	batches := fake.allBatches()
	if len(batches) != 2 {
		t.Fatalf("expected 2 flushed batches, got %d", len(batches))
	}

	metaBatch := batches[0]
	if len(metaBatch) != 2 {
		t.Fatalf("expected 2 alias metadata records, got %d", len(metaBatch))
	}
	if metaBatch[0].Measurement != "tasmota_device_meta" {
		t.Fatalf("unexpected metadata measurement: %s", metaBatch[0].Measurement)
	}
	// A missing key yields a nil interface, which is not equal to "", so a
	// plain `== ""` comparison would let an absent field through. Assert the
	// dynamic type and reject missing, non-string and empty values alike.
	if alias, ok := metaBatch[0].Fields["device_alias"].(string); !ok || alias == "" {
		t.Fatalf("expected metadata batch to include device_alias, got %#v", metaBatch[0].Fields["device_alias"])
	}

	batch := batches[1]
	if len(batch) != 2 {
		t.Fatalf("expected 2 records in flushed batch, got %d", len(batch))
	}
	if batch[0].Measurement != "tasmota_lwt" {
		t.Fatalf("unexpected first measurement: %s", batch[0].Measurement)
	}
	if batch[0].Fields["online"] != true {
		t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"])
	}
	if batch[0].Fields["device_alias"] != "Garage Plug" {
		t.Fatalf("unexpected lwt device_alias field: %#v", batch[0].Fields["device_alias"])
	}
	if batch[1].Measurement != "tasmota_sensor" {
		t.Fatalf("unexpected second measurement: %s", batch[1].Measurement)
	}
	if batch[1].Fields["energy_total"] != float64(41.385) {
		t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"])
	}
	if batch[1].Fields["device_alias"] != "Office Plug" {
		t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"])
	}

	// Stop the service explicitly and make sure Run unwinds cleanly.
	cancel()
	select {
	case err := <-errCh:
		if err != nil {
			t.Fatalf("service returned error: %v", err)
		}
	case <-time.After(2 * time.Second):
		t.Fatal("timed out waiting for service shutdown")
	}
}