package pipeline import ( "context" "sync" "testing" "time" "mqqt-scrubber/internal/config" "mqqt-scrubber/internal/model" ) type fakeWriter struct { mu sync.Mutex batches [][]model.Record flushed chan struct{} } func newFakeWriter() *fakeWriter { return &fakeWriter{flushed: make(chan struct{}, 1)} } func (writer *fakeWriter) Write(_ context.Context, records []model.Record) error { writer.mu.Lock() copyBatch := append([]model.Record(nil), records...) writer.batches = append(writer.batches, copyBatch) writer.mu.Unlock() select { case writer.flushed <- struct{}{}: default: } return nil } func (writer *fakeWriter) firstBatch() []model.Record { writer.mu.Lock() defer writer.mu.Unlock() if len(writer.batches) == 0 { return nil } return writer.batches[0] } func TestServiceFlushesParsedRecords(t *testing.T) { fake := newFakeWriter() service := NewService(config.Config{ DeviceAliases: map[string]string{ "tasmota-896001": "Garage Plug", "Tasmota C88994": "Office Plug", }, App: config.AppConfig{ BatchSize: 2, BufferSize: 8, FlushInterval: config.DurationValue{Duration: time.Hour}, FlushTimeout: config.DurationValue{Duration: time.Second}, }, }, fake) ctx, cancel := context.WithCancel(context.Background()) defer cancel() errCh := make(chan error, 1) go func() { errCh <- service.Run(ctx) }() service.Enqueue(model.RawMessage{ Topic: "tele/tasmota_896001/LWT", Payload: []byte("Online"), ReceivedAt: time.Date(2026, time.March, 12, 15, 21, 39, 0, time.UTC), }) service.Enqueue(model.RawMessage{ Topic: "tele/tasmota_C88994/SENSOR", Payload: []byte(`{"Time":"2026-03-12T16:23:13","ENERGY":{"TotalStartTime":"2026-02-04T19:13:40","Total":41.385,"Yesterday":1.124,"Today":0.799,"Period":0,"Power":1,"ApparentPower":4,"ReactivePower":4,"Factor":0.22,"Voltage":231,"Current":0.016}}`), ReceivedAt: time.Date(2026, time.March, 12, 16, 23, 13, 0, time.UTC), }) select { case <-fake.flushed: case <-time.After(2 * time.Second): t.Fatal("timed out waiting for pipeline flush") } batch := fake.firstBatch() if len(batch) != 2 { t.Fatalf("expected 2 records in flushed batch, got %d", len(batch)) } if batch[0].Measurement != "tasmota_lwt" { t.Fatalf("unexpected first measurement: %s", batch[0].Measurement) } if batch[0].Fields["online"] != true { t.Fatalf("unexpected lwt online field: %#v", batch[0].Fields["online"]) } if batch[0].Fields["device_alias"] != "Garage Plug" { t.Fatalf("unexpected lwt device_alias field: %#v", batch[0].Fields["device_alias"]) } if batch[1].Measurement != "tasmota_sensor" { t.Fatalf("unexpected second measurement: %s", batch[1].Measurement) } if batch[1].Fields["energy_total"] != float64(41.385) { t.Fatalf("unexpected sensor energy_total field: %#v", batch[1].Fields["energy_total"]) } if batch[1].Fields["device_alias"] != "Office Plug" { t.Fatalf("unexpected sensor device_alias field: %#v", batch[1].Fields["device_alias"]) } cancel() select { case err := <-errCh: if err != nil { t.Fatalf("service returned error: %v", err) } case <-time.After(2 * time.Second): t.Fatal("timed out waiting for service shutdown") } }