185 lines
4.2 KiB
Go
185 lines
4.2 KiB
Go
package influx
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"mqqt-scrubber/internal/config"
|
|
"mqqt-scrubber/internal/model"
|
|
)
|
|
|
|
// Client sends batches of records to an InfluxDB HTTP write endpoint.
// All fields are unexported; NewClient is the constructor in this file.
type Client struct {
	// baseURL is the server root the write path is appended to.
	baseURL string
	// database is sent as the "db" query parameter of a write.
	database string
	// token, when non-empty, is sent as a bearer Authorization header.
	token string
	// precision is forwarded as the "precision" query parameter of a write.
	precision string
	// httpClient executes the write requests.
	httpClient *http.Client
}
|
|
|
|
func NewClient(cfg config.InfluxConfig) *Client {
|
|
return &Client{
|
|
baseURL: strings.TrimRight(cfg.URL, "/"),
|
|
database: cfg.Database,
|
|
token: cfg.Token,
|
|
precision: cfg.Precision,
|
|
httpClient: &http.Client{
|
|
Timeout: 30 * time.Second,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (client *Client) Write(ctx context.Context, records []model.Record) error {
|
|
if len(records) == 0 {
|
|
return nil
|
|
}
|
|
|
|
lines := make([]string, 0, len(records))
|
|
for _, record := range records {
|
|
line, err := toLineProtocol(record)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
lines = append(lines, line)
|
|
}
|
|
|
|
writeURL, err := url.Parse(client.baseURL)
|
|
if err != nil {
|
|
return fmt.Errorf("parse influx url: %w", err)
|
|
}
|
|
|
|
writeURL.Path = strings.TrimRight(writeURL.Path, "/") + "/api/v3/write_lp"
|
|
query := writeURL.Query()
|
|
query.Set("db", client.database)
|
|
query.Set("precision", client.precision)
|
|
writeURL.RawQuery = query.Encode()
|
|
|
|
body := strings.Join(lines, "\n")
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, writeURL.String(), bytes.NewBufferString(body))
|
|
if err != nil {
|
|
return fmt.Errorf("create write request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
|
|
if client.token != "" {
|
|
req.Header.Set("Authorization", "Bearer "+client.token)
|
|
}
|
|
|
|
resp, err := client.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("execute write request: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode >= http.StatusBadRequest {
|
|
responseBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
|
return fmt.Errorf("influx write failed with status %d: %s", resp.StatusCode, strings.TrimSpace(string(responseBody)))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func toLineProtocol(record model.Record) (string, error) {
|
|
if record.Measurement == "" {
|
|
return "", fmt.Errorf("record measurement is required")
|
|
}
|
|
if len(record.Fields) == 0 {
|
|
return "", fmt.Errorf("record fields are required")
|
|
}
|
|
|
|
tagKeys := sortedKeys(record.Tags)
|
|
fieldKeys := sortedKeysAny(record.Fields)
|
|
|
|
var builder strings.Builder
|
|
builder.WriteString(escapeMeasurement(record.Measurement))
|
|
|
|
for _, key := range tagKeys {
|
|
builder.WriteByte(',')
|
|
builder.WriteString(escapeTagOrKey(key))
|
|
builder.WriteByte('=')
|
|
builder.WriteString(escapeTagOrKey(record.Tags[key]))
|
|
}
|
|
|
|
builder.WriteByte(' ')
|
|
for index, key := range fieldKeys {
|
|
if index > 0 {
|
|
builder.WriteByte(',')
|
|
}
|
|
|
|
builder.WriteString(escapeTagOrKey(key))
|
|
builder.WriteByte('=')
|
|
|
|
formatted, err := formatFieldValue(record.Fields[key])
|
|
if err != nil {
|
|
return "", fmt.Errorf("format field %q: %w", key, err)
|
|
}
|
|
builder.WriteString(formatted)
|
|
}
|
|
|
|
builder.WriteByte(' ')
|
|
builder.WriteString(strconv.FormatInt(record.Timestamp.UnixNano(), 10))
|
|
|
|
return builder.String(), nil
|
|
}
|
|
|
|
// sortedKeys returns the keys of values in ascending lexical order.
// A nil or empty map yields an empty, non-nil slice.
func sortedKeys(values map[string]string) []string {
	names := make([]string, len(values))
	next := 0
	for name := range values {
		names[next] = name
		next++
	}
	sort.Strings(names)
	return names
}
|
|
|
|
// sortedKeysAny returns the keys of values in ascending lexical order,
// ignoring the values themselves. A nil or empty map yields an empty,
// non-nil slice.
func sortedKeysAny(values map[string]any) []string {
	names := make([]string, len(values))
	next := 0
	for name := range values {
		names[next] = name
		next++
	}
	sort.Strings(names)
	return names
}
|
|
|
|
// formatFieldValue renders value as a line-protocol field value.
//
//   - string: double-quoted, with backslashes and double quotes escaped
//   - bool: "true" or "false"
//   - signed integers (int, int8..int64): decimal digits with an "i" suffix
//   - unsigned integers (uint, uint8..uint64): decimal digits with a "u" suffix
//   - float32/float64: shortest decimal form that round-trips, no exponent
//
// NaN and infinite floats are rejected with an error because they are not
// valid line-protocol numbers. Any other dynamic type, including nil, yields
// an "unsupported field type" error.
func formatFieldValue(value any) (string, error) {
	switch typed := value.(type) {
	case string:
		// Backslashes first, so the ones added for quotes are not doubled.
		escaped := strings.ReplaceAll(typed, `\`, `\\`)
		escaped = strings.ReplaceAll(escaped, `"`, `\"`)
		return `"` + escaped + `"`, nil
	case bool:
		return strconv.FormatBool(typed), nil
	case int:
		return strconv.FormatInt(int64(typed), 10) + "i", nil
	case int8:
		return strconv.FormatInt(int64(typed), 10) + "i", nil
	case int16:
		return strconv.FormatInt(int64(typed), 10) + "i", nil
	case int32:
		return strconv.FormatInt(int64(typed), 10) + "i", nil
	case int64:
		return strconv.FormatInt(typed, 10) + "i", nil
	case uint:
		return strconv.FormatUint(uint64(typed), 10) + "u", nil
	case uint8:
		return strconv.FormatUint(uint64(typed), 10) + "u", nil
	case uint16:
		return strconv.FormatUint(uint64(typed), 10) + "u", nil
	case uint32:
		return strconv.FormatUint(uint64(typed), 10) + "u", nil
	case uint64:
		return strconv.FormatUint(typed, 10) + "u", nil
	case float64:
		return formatFloatField(typed, 64)
	case float32:
		return formatFloatField(float64(typed), 32)
	default:
		return "", fmt.Errorf("unsupported field type %T", value)
	}
}

// formatFloatField formats value without an exponent using the shortest
// representation that round-trips at bitSize (32 or 64) bits, and returns an
// error for NaN and ±Inf.
func formatFloatField(value float64, bitSize int) (string, error) {
	formatted := strconv.FormatFloat(value, 'f', -1, bitSize)
	switch formatted {
	case "NaN", "+Inf", "-Inf":
		// strconv spells the non-finite values exactly this way.
		return "", fmt.Errorf("unsupported float value %s", formatted)
	}
	return formatted, nil
}
|
|
|
|
// measurementEscaper backslash-escapes commas and spaces. It is built once
// rather than on every call; a strings.Replacer is safe for concurrent use.
var measurementEscaper = strings.NewReplacer(
	",", `\,`,
	" ", `\ `,
)

// escapeMeasurement returns value with every comma and space prefixed by a
// backslash, for use as the measurement name of a line-protocol line.
// All other characters are returned unchanged.
func escapeMeasurement(value string) string {
	return measurementEscaper.Replace(value)
}
|
|
|
|
// tagOrKeyEscaper backslash-escapes commas, equals signs and spaces. It is
// built once rather than on every call; a strings.Replacer is safe for
// concurrent use.
var tagOrKeyEscaper = strings.NewReplacer(
	",", `\,`,
	"=", `\=`,
	" ", `\ `,
)

// escapeTagOrKey returns value with every comma, equals sign and space
// prefixed by a backslash, for use as a tag key, tag value or field key of a
// line-protocol line. All other characters are returned unchanged.
func escapeTagOrKey(value string) string {
	return tagOrKeyEscaper.Replace(value)
}
|