package parser

import (
	"encoding/json"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"

	"mqqt-scrubber/internal/model"
)

var (
	// invalidNameCharacters matches every run of characters that is not a
	// lowercase ASCII letter, a digit or an underscore.
	invalidNameCharacters = regexp.MustCompile(`[^a-z0-9_]+`)

	// acronymBoundary finds the seam between a run of capitals and a following
	// capitalised word, so "HTTPServer" can be split into "HTTP_Server".
	acronymBoundary = regexp.MustCompile(`([A-Z]+)([A-Z][a-z])`)

	// camelBoundary finds the seam between a lowercase letter or digit and a
	// following capital, so "LoadAvg" can be split into "Load_Avg".
	camelBoundary = regexp.MustCompile(`([a-z0-9])([A-Z])`)

	// tasmotaTimeLayouts lists the layouts tried, in order, when parsing the
	// "Time" value of a payload.
	tasmotaTimeLayouts = []string{
		time.RFC3339Nano,
		time.RFC3339,
		"2006-01-02T15:04:05",
	}
)

// ParseTasmota converts a raw message published on a topic of the form
// tele/<device>/<message type> into a single-element slice of records.
//
// The measurement name is "tasmota_" followed by the sanitized message type.
// Every record is tagged with the sanitized device name, the sanitized message
// type and the source "tasmota". A message type of "LWT" (compared
// case-insensitively) is handled as a plain-text payload by parseLWT; every
// other payload must be a JSON object, whose values are flattened into fields.
//
// An error is returned when the topic does not have exactly three segments,
// when its first segment is not "tele", when the payload is not valid JSON, or
// when the payload yields no fields once the top-level time entry is removed.
func ParseTasmota(message model.RawMessage) ([]model.Record, error) {
	parts := strings.Split(message.Topic, "/")
	if len(parts) != 3 {
		return nil, fmt.Errorf("unsupported topic shape: %s", message.Topic)
	}
	if parts[0] != "tele" {
		return nil, fmt.Errorf("unsupported topic root: %s", message.Topic)
	}

	device, rawType := parts[1], parts[2]
	messageType := sanitizeName(rawType)
	measurement := "tasmota_" + messageType
	tags := map[string]string{
		"device":       sanitizeDeviceName(device),
		"message_type": messageType,
		"source":       "tasmota",
	}

	if strings.EqualFold(rawType, "LWT") {
		return []model.Record{parseLWT(message, measurement, tags)}, nil
	}

	var payload map[string]any
	if err := json.Unmarshal(message.Payload, &payload); err != nil {
		return nil, fmt.Errorf("invalid json payload: %w", err)
	}

	fields := flattenPayload(payload, nil)
	// The top-level "Time" key is sanitized to "time" by flattenPayload. It is
	// used as the record timestamp below, so it is not stored as a field.
	delete(fields, "time")
	if len(fields) == 0 {
		return nil, fmt.Errorf("no usable fields in payload for topic %s", message.Topic)
	}

	record := model.Record{
		Measurement: measurement,
		Tags:        tags,
		Fields:      fields,
		Timestamp:   parsePayloadTimestamp(payload, message.ReceivedAt),
	}
	return []model.Record{record}, nil
}

// parseLWT builds the record for an LWT message. The payload is treated as
// plain text: the "state" field holds the payload with surrounding whitespace
// removed, and "online" reports whether that state equals "Online" ignoring
// case. The record is stamped with the time the message was received.
func parseLWT(message model.RawMessage, measurement string, tags map[string]string) model.Record {
	state := strings.TrimSpace(string(message.Payload))
	return model.Record{
		Measurement: measurement,
		Tags:        tags,
		Fields: map[string]any{
			"state":  state,
			"online": strings.EqualFold(state, "Online"),
		},
		Timestamp: message.ReceivedAt,
	}
}

// parsePayloadTimestamp returns the time stored under the payload's "Time"
// key, or fallback when the key is missing, is not a string, is blank, or
// matches none of tasmotaTimeLayouts.
//
// Surrounding whitespace is removed before parsing, so a padded timestamp is
// still honoured instead of silently falling back.
func parsePayloadTimestamp(payload map[string]any, fallback time.Time) time.Time {
	// Layout without any zone information; such values are read as UTC.
	// NOTE(review): confirm that devices really report this form in UTC.
	const zonelessLayout = "2006-01-02T15:04:05"

	rawTime, ok := payload["Time"].(string)
	if !ok {
		return fallback
	}
	rawTime = strings.TrimSpace(rawTime)
	if rawTime == "" {
		return fallback
	}

	for _, layout := range tasmotaTimeLayouts {
		var (
			parsed time.Time
			err    error
		)
		if layout == zonelessLayout {
			parsed, err = time.ParseInLocation(layout, rawTime, time.UTC)
		} else {
			parsed, err = time.Parse(layout, rawTime)
		}
		if err == nil {
			return parsed
		}
	}
	return fallback
}

// flattenPayload turns a decoded JSON object into a flat field map. Each field
// name is the underscore-joined path of sanitized keys leading to the value,
// starting with the entries of prefix. Nested objects are flattened
// recursively; only float64, bool and string values are kept, and every other
// value (arrays, null, ...) is dropped.
//
// Keys are visited in sorted order so that, when two keys sanitize to the same
// field name, the one kept does not depend on map iteration order.
func flattenPayload(payload map[string]any, prefix []string) map[string]any {
	result := make(map[string]any)

	keys := make([]string, 0, len(payload))
	for key := range payload {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	for _, key := range keys {
		// Build the path in a fresh slice: appending to prefix directly could
		// write into a backing array shared with the caller and with the
		// paths of sibling keys.
		nameParts := make([]string, 0, len(prefix)+1)
		nameParts = append(nameParts, prefix...)
		nameParts = append(nameParts, sanitizeName(key))

		switch typed := payload[key].(type) {
		case map[string]any:
			for nestedKey, nestedValue := range flattenPayload(typed, nameParts) {
				result[nestedKey] = nestedValue
			}
		case float64, bool, string:
			result[strings.Join(nameParts, "_")] = typed
		}
	}
	return result
}

// sanitizeName converts a topic segment or payload key into a lowercase
// snake_case name: surrounding whitespace is trimmed, hyphens and spaces
// become underscores, acronym and camel-case boundaries are split with an
// underscore, the result is lowercased, each remaining run of characters
// outside [a-z0-9_] is replaced by a single underscore, and leading and
// trailing underscores are removed. For example "LoadAvg" becomes "load_avg".
// An input that ends up empty yields "unknown".
func sanitizeName(value string) string {
	normalized := strings.TrimSpace(value)
	normalized = strings.ReplaceAll(normalized, "-", "_")
	normalized = strings.ReplaceAll(normalized, " ", "_")
	// The boundary patterns rely on letter case, so they must run before the
	// name is lowercased.
	normalized = acronymBoundary.ReplaceAllString(normalized, `${1}_${2}`)
	normalized = camelBoundary.ReplaceAllString(normalized, `${1}_${2}`)
	normalized = strings.ToLower(normalized)
	normalized = invalidNameCharacters.ReplaceAllString(normalized, "_")
	normalized = strings.Trim(normalized, "_")
	if normalized == "" {
		return "unknown"
	}
	return normalized
}

// sanitizeDeviceName normalises a device name for use as a tag value. It
// behaves like sanitizeName except that no underscores are inserted at
// camel-case or acronym boundaries: the name is trimmed and lowercased,
// hyphens and spaces become underscores, each remaining run of characters
// outside [a-z0-9_] is replaced by a single underscore, and leading and
// trailing underscores are removed. An input that ends up empty yields
// "unknown".
func sanitizeDeviceName(value string) string {
	normalized := strings.ToLower(strings.TrimSpace(value))
	normalized = strings.ReplaceAll(normalized, "-", "_")
	normalized = strings.ReplaceAll(normalized, " ", "_")
	normalized = invalidNameCharacters.ReplaceAllString(normalized, "_")
	normalized = strings.Trim(normalized, "_")
	if normalized == "" {
		return "unknown"
	}
	return normalized
}