add simple mqtt support

This commit is contained in:
Martin/Geno 2019-08-07 12:27:18 +02:00
parent b559702f82
commit 12a2398457
No known key found for this signature in database
GPG key ID: 9D7D3C6BFF600C6A
10 changed files with 185 additions and 70 deletions

View file

@ -3,6 +3,7 @@ package main
import ( import (
"github.com/bdlm/std/logger" "github.com/bdlm/std/logger"
"dev.sum7.eu/ccchb/ccchatbot/mqtt"
"dev.sum7.eu/ccchb/ccchatbot/schalter" "dev.sum7.eu/ccchb/ccchatbot/schalter"
) )
@ -15,5 +16,6 @@ type Config struct {
Password string `toml:"password"` Password string `toml:"password"`
} `toml:"xmpp"` } `toml:"xmpp"`
Schalter schalter.Schalter `toml:"schalter"` Schalter *schalter.Schalter `toml:"schalter"`
MQTT *mqtt.Config `toml:"mqtt"`
} }

View file

@ -9,3 +9,9 @@ password = "test"
url = "https://schalter.ccchb.de/spaceapi.json" url = "https://schalter.ccchb.de/spaceapi.json"
interval = 5000000000 interval = 5000000000
mucs = ["ffhb_events@conference.chat.sum7.eu","#ccchb@irc.hackint.org"] mucs = ["ffhb_events@conference.chat.sum7.eu","#ccchb@irc.hackint.org"]
[mqtt]
broker = ""
client_id = ""
username = ""
password = ""

View file

@ -1,64 +1,18 @@
log_level = 50 log_level = 50
webserver_bind = ":8080"
startup_notify_user = ["user@fireorbit.de"]
startup_notify_muc = []
nickname = "logbot"
[xmpp] [xmpp]
address = "fireorbit.de" jid = "user@example.org"
jid = "bot@fireorbit.de" password = "password"
password = "example"
# suported hooks are, which could be declared multiple times with different `secrets` (see [[hooks.grafana]]):
[[hooks.grafana]]
[[hooks.prometheus]]
[[hooks.git]]
[[hooks.gitlab]]
[[hooks.circleci]]
# every hook could have following attributes:
secret = ""
notify_muc = []
notify_user = []
# for handling webhooks from prometheus alertmanager
[[hooks.prometheus]]
# for handling webhooks from grafana
# at http://localhost:8080/grafana
# for image support you have to enable `external_image_storage` (e.g. `provider = local`)
# see more at http://docs.grafana.org/installation/configuration/#external-image-storage
[[hooks.grafana]]
secret = "dev.sum7.eu-aShared-Secret"
notify_muc = ["monitoring@conference.chat.sum7.eu"]
[[hooks.grafana]]
secret = "dev.sum7.eu-aShared-Secret-for important messages"
notify_user = ["user@fireorbit.de"]
# for handling webhooks from git software (e.g. gitea, gogs, github)
# at http://localhost:8080/git
[[hooks.git]]
secret = "github-FreifunkBremen-yanic-aShared-Secret"
notify_muc = []
notify_user = ["user@fireorbit.de"]
# for handling webhooks from gitlab
# at http://localhost:8080/gitlab
[[hooks.gitlab]]
secret = "dev.sum7.eu-aShared-Secret"
notify_muc = []
notify_user = ["user@fireorbit.de"]
# for handling webhooks from circleci
# at http://localhost:8080/circleci
[[hooks.circleci]]
secret = "dev.sum7.eu-aShared-Secret"
notify_muc = []
notify_user = ["user@fireorbit.de"]
[schalter]
url = "https://schalter.ccchb.de/spaceapi.json"
interval = 5000000000
users = ["annoying@exmaple.com"]
mucs = ["#ccchb@irc.hackint.org"]
[mqtt]
broker = ""
client_id = ""
username = ""
password = ""

24
main.go
View file

@ -11,6 +11,7 @@ import (
"gosrc.io/xmpp" "gosrc.io/xmpp"
"gosrc.io/xmpp/stanza" "gosrc.io/xmpp/stanza"
"dev.sum7.eu/ccchb/ccchatbot/mqtt"
"dev.sum7.eu/ccchb/ccchatbot/runtime" "dev.sum7.eu/ccchb/ccchatbot/runtime"
) )
@ -27,6 +28,11 @@ func main() {
log.SetLevel(config.LogLevel) log.SetLevel(config.LogLevel)
var mqttService *mqtt.Service
if config.MQTT != nil {
mqttService = mqtt.Connect(config.MQTT)
}
router := xmpp.NewRouter() router := xmpp.NewRouter()
router.HandleFunc("presence", handlePresence) router.HandleFunc("presence", handlePresence)
router.HandleFunc("message", func(s xmpp.Sender, p stanza.Packet) { router.HandleFunc("message", func(s xmpp.Sender, p stanza.Packet) {
@ -35,9 +41,15 @@ func main() {
log.Errorf("ignoring wrong routed packet: %T", p) log.Errorf("ignoring wrong routed packet: %T", p)
return return
} }
if err := config.Schalter.HandleBotMessage(s, msg); err != nil { if ok := config.Schalter.HandleBotMessage(s, msg); ok {
log.Debugf("bot could not handle message: %s", err) return
} }
if mqttService != nil {
if ok := mqttService.HandleBotMessage(s, msg); ok {
return
}
}
log.Debugf("no bot has handle message of %s: %s", msg.From, msg.Body)
}) })
var err error var err error
@ -47,6 +59,10 @@ func main() {
Password: config.XMPP.Password, Password: config.XMPP.Password,
}, router) }, router)
config.Schalter.ChangeEvent = append(config.Schalter.ChangeEvent, config.Schalter.XMPPChangeEvent(client))
if mqttService != nil {
config.Schalter.ChangeEvent = append(config.Schalter.ChangeEvent, mqttService.HandleSchalterStateChange)
}
if err != nil { if err != nil {
log.Panicf("error on startup xmpp client: %s", err) log.Panicf("error on startup xmpp client: %s", err)
} }
@ -64,6 +80,10 @@ func main() {
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs sig := <-sigs
if mqttService != nil {
mqttService.Close()
}
config.Schalter.Close() config.Schalter.Close()
runtime.LeaveAllMUCs(client) runtime.LeaveAllMUCs(client)

70
mqtt/bot.go Normal file
View file

@ -0,0 +1,70 @@
package mqtt
import (
"fmt"
"github.com/mattn/go-shellwords"
"gosrc.io/xmpp"
"gosrc.io/xmpp/stanza"
)
var botCommands map[string]func(*Service, xmpp.Sender, stanza.Attrs, []string) bool
func (s *Service) HandleBotMessage(c xmpp.Sender, msg stanza.Message) bool {
msgParts, err := shellwords.Parse(msg.Body)
if err != nil {
return false
}
if len(msgParts) <= 0 || msgParts[0][0] != '.' {
return false
}
command := msgParts[0][1:]
if f, ok := botCommands[command]; ok {
attrs := stanza.Attrs{To: msg.From, Type: msg.Type}
if msg.Type == stanza.MessageTypeGroupchat {
jid, _ := xmpp.NewJid(msg.From)
attrs.To = jid.Bare()
}
return f(s, c, attrs, msgParts[1:])
}
return false
}
func init() {
botCommands = make(map[string]func(*Service, xmpp.Sender, stanza.Attrs, []string) bool)
botCommands["sub"] = func(s *Service, c xmpp.Sender, replyAttrs stanza.Attrs, leftCommands []string) bool {
c.Send(stanza.Message{
Attrs: replyAttrs,
Body: "not supported yet:\n.sub <topic>",
})
return true
}
botCommands["unsub"] = func(s *Service, c xmpp.Sender, replyAttrs stanza.Attrs, leftCommands []string) bool {
c.Send(stanza.Message{
Attrs: replyAttrs,
Body: "not supported yet:\n.unsub <topic>",
})
return true
}
botCommands["pub"] = func(s *Service, c xmpp.Sender, replyAttrs stanza.Attrs, leftCommands []string) bool {
if len(leftCommands) != 2 {
c.Send(stanza.Message{
Attrs: replyAttrs,
Body: "wrong arguments: need format:\n.pub <topic> <payload>",
})
return true
}
topic := leftCommands[0]
payload := leftCommands[1]
token := s.client.Publish(topic, 0, false, payload)
token.Wait()
c.Send(stanza.Message{
Attrs: replyAttrs,
Body: fmt.Sprintf("published in %s : %s", topic, payload),
})
return true
}
}

8
mqtt/config.go Normal file
View file

@ -0,0 +1,8 @@
package mqtt
type Config struct {
Broker string `toml:"broker"`
ClientID string `toml:"client_id"`
Username string `toml:"username"`
Password string `toml:"passoword"`
}

5
mqtt/schalter.go Normal file
View file

@ -0,0 +1,5 @@
package mqtt
func (s *Service) HandleSchalterStateChange(open bool) {
s.client.Publish("ccchb/schalter", 0, true, open)
}

38
mqtt/service.go Normal file
View file

@ -0,0 +1,38 @@
package mqtt
import (
"github.com/bdlm/log"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
type Service struct {
config *Config
client MQTT.Client
}
func Connect(c *Config) *Service {
if c.Broker == "" {
log.Warn("no broker given, starting without mqtt service")
return nil
}
optsPub := MQTT.NewClientOptions()
optsPub.AddBroker(c.Broker)
optsPub.SetClientID(c.ClientID)
if c.Username != "" {
optsPub.SetUsername(c.Username)
}
if c.Password != "" {
optsPub.SetPassword(c.Password)
}
clientPub := MQTT.NewClient(optsPub)
if tokenPub := clientPub.Connect(); tokenPub.Wait() && tokenPub.Error() != nil {
log.Panic(tokenPub.Error())
}
return &Service{config: c, client: clientPub}
}
func (s *Service) Close() {
s.client.Disconnect(250)
}

View file

@ -7,7 +7,7 @@ import (
"gosrc.io/xmpp/stanza" "gosrc.io/xmpp/stanza"
) )
func (s *Schalter) HandleBotMessage(c xmpp.Sender, msg stanza.Message) error { func (s *Schalter) HandleBotMessage(c xmpp.Sender, msg stanza.Message) bool {
if msg.Body == ".status" { if msg.Body == ".status" {
jid, _ := xmpp.NewJid(msg.From) jid, _ := xmpp.NewJid(msg.From)
reply := stanza.Message{ reply := stanza.Message{
@ -20,7 +20,8 @@ func (s *Schalter) HandleBotMessage(c xmpp.Sender, msg stanza.Message) error {
reply.To = jid.Bare() reply.To = jid.Bare()
reply.Body = fmt.Sprintf("%s: %s", jid.Resource, reply.Body) reply.Body = fmt.Sprintf("%s: %s", jid.Resource, reply.Body)
} }
return c.Send(reply) c.Send(reply)
return true
} }
return fmt.Errorf("not handled by this bot: %v", msg) return false
} }

View file

@ -23,6 +23,8 @@ type Schalter struct {
Interval time.Duration `toml:"interval"` Interval time.Duration `toml:"interval"`
ChangeEvent []func(bool)
state bool state bool
spaceName string spaceName string
worker *worker.Worker worker *worker.Worker
@ -61,19 +63,28 @@ func (s *Schalter) Start(c xmpp.Sender) {
func (s *Schalter) run(c xmpp.Sender) func() { func (s *Schalter) run(c xmpp.Sender) func() {
return func() { return func() {
if s.fetchState() { if s.fetchState() {
s.updatePresence(c) for _, f := range s.ChangeEvent {
text := fmt.Sprintf("%s changed to closed", s.spaceName) f(s.state)
if s.state {
text = fmt.Sprintf("%s changed to open", s.spaceName)
} }
runtime.SendText(c, s.Users, s.MUCs, text, text)
log.Infof("worker detect changes of status: %s", text) log.Infof("worker detect changes of status: %s", s.stateString())
} else { } else {
log.Debug("worker run, but no changes detected") log.Debug("worker run, but no changes detected")
} }
} }
} }
func (s *Schalter) XMPPChangeEvent(c xmpp.Sender) func(bool) {
return func(state bool) {
s.updatePresence(c)
text := fmt.Sprintf("%s changed to closed", s.spaceName)
if state {
text = fmt.Sprintf("%s changed to open", s.spaceName)
}
runtime.SendText(c, s.Users, s.MUCs, text, text)
}
}
func (s *Schalter) Close() { func (s *Schalter) Close() {
if s.worker != nil { if s.worker != nil {
s.worker.Close() s.worker.Close()