// Package mqtt connects to an MQTT broker and forwards every received message
// to a caller-supplied handler.
package mqtt

import (
	"log/slog"
	"time"

	paho "github.com/eclipse/paho.mqtt.golang"

	"mqqt-scrubber/internal/config"
	"mqqt-scrubber/internal/model"
)

// disconnectQuiesceMillis is the value passed to paho's Disconnect, which the
// paho documentation describes as the number of milliseconds to wait for
// existing work to complete before the connection is closed.
const disconnectQuiesceMillis = 250

// Subscriber subscribes to the configured MQTT topics and hands each received
// message to a callback as a model.RawMessage.
type Subscriber struct {
	config  config.MQTTConfig      // broker address, credentials, topics and QoS
	handle  func(model.RawMessage) // invoked for every received message
	client  paho.Client            // created by Start; nil before the first Start
	started bool                   // true between a successful Start and Stop
}

// NewSubscriber returns a Subscriber for cfg that passes every received
// message to handle. No connection is made until Start is called.
//
// handle is invoked without a nil check, so it must be non-nil.
func NewSubscriber(cfg config.MQTTConfig, handle func(model.RawMessage)) *Subscriber {
	return &Subscriber{
		config: cfg,
		handle: handle,
	}
}

// Start creates the MQTT client, connects to the broker and waits for the
// connect token to complete. It returns the token's error, if any.
//
// Calling Start on a Subscriber that is already started is a no-op that
// returns nil.
//
// Connect retry is enabled on the client; per the paho documentation the
// connect token then does not complete until the connection is up or the
// attempt is cancelled, so Start may block while the broker is unreachable.
func (s *Subscriber) Start() error {
	// A second client would overwrite s.client, leaving the first one
	// unreachable from Stop, and would connect with the same client ID as the
	// one that is already running.
	if s.started {
		return nil
	}

	s.client = paho.NewClient(s.clientOptions())

	token := s.client.Connect()
	token.Wait()
	if err := token.Error(); err != nil {
		return err
	}

	s.started = true
	return nil
}

// Stop disconnects the client if the Subscriber is started; otherwise it does
// nothing. After Stop the Subscriber is marked as not started.
func (s *Subscriber) Stop() {
	if !s.started || s.client == nil {
		return
	}

	s.client.Disconnect(disconnectQuiesceMillis)
	s.started = false
}

// clientOptions builds the paho client options from the subscriber's
// configuration and wires up the connection lifecycle handlers.
func (s *Subscriber) clientOptions() *paho.ClientOptions {
	options := paho.NewClientOptions()
	options.AddBroker(s.config.Broker)
	options.SetClientID(s.config.ClientID)
	options.SetAutoReconnect(true)
	options.SetConnectRetry(true)
	options.SetConnectRetryInterval(5 * time.Second)
	options.SetKeepAlive(30 * time.Second)
	options.SetPingTimeout(10 * time.Second)
	// NOTE(review): per the paho documentation, disabling ordering lets the
	// message handler run in its own goroutine, so handle may be called
	// concurrently and out of order — confirm handle tolerates that.
	options.SetOrderMatters(false)

	// Credentials are optional; each is only set when configured.
	if s.config.Username != "" {
		options.SetUsername(s.config.Username)
	}
	if s.config.Password != "" {
		options.SetPassword(s.config.Password)
	}

	// Subscriptions are made from the on-connect handler rather than once
	// after Connect, so they are issued every time that handler fires.
	options.SetOnConnectHandler(s.subscribe)
	options.SetConnectionLostHandler(func(_ paho.Client, err error) {
		slog.Warn("mqtt connection lost", "error", err)
	})
	options.SetReconnectingHandler(func(_ paho.Client, _ *paho.ClientOptions) {
		slog.Info("mqtt reconnecting")
	})

	return options
}

// subscribe subscribes client to every configured topic at the configured QoS
// and logs the outcome. A subscription failure is logged, not returned.
func (s *Subscriber) subscribe(client paho.Client) {
	filters := make(map[string]byte, len(s.config.Topics))
	for _, topic := range s.config.Topics {
		filters[topic] = s.config.QoS
	}

	token := client.SubscribeMultiple(filters, s.deliver)
	token.Wait()
	if err := token.Error(); err != nil {
		slog.Error("failed to subscribe after connect", "error", err)
		return
	}

	slog.Info("subscribed to mqtt topics", "topics", s.config.Topics)
}

// deliver converts an incoming paho message into a model.RawMessage, stamped
// with the current UTC time, and passes it to the handler.
func (s *Subscriber) deliver(_ paho.Client, message paho.Message) {
	s.handle(model.RawMessage{
		Topic: message.Topic(),
		// Copy the payload so the RawMessage does not alias the slice
		// returned by the paho message.
		Payload:    append([]byte(nil), message.Payload()...),
		ReceivedAt: time.Now().UTC(),
	})
}