ircd/cluster.go

69 lines
1.3 KiB
Go
Raw Normal View History

2016-08-12 06:08:38 +00:00
// vim:ts=4:sts=4:sw=4:noet:tw=72
package ircd
import (
"strings"
"code.dnix.de/an/irc"
2016-08-12 21:18:39 +00:00
"code.dnix.de/an/xlog"
2016-08-12 06:08:38 +00:00
"github.com/nats-io/nats"
)
// ClusterConnector bundles a NATS connection with the subscriptions that
// were opened through it.
type ClusterConnector struct {
// conn is the NATS connection; the methods of this type return early
// when it is nil.
conn *nats.Conn
// subs maps a subject to the subscription currently registered for it.
subs map[string]*nats.Subscription
}
func NewClusterConnector(urls string, ssl bool) *ClusterConnector {
2016-08-12 06:08:38 +00:00
opts := nats.DefaultOptions
opts.Servers = strings.Split(urls, ",")
2016-08-12 06:08:38 +00:00
for i, s := range opts.Servers {
opts.Servers[i] = strings.Trim(s, " ")
}
opts.Secure = ssl
conn, err := opts.Connect()
if err != nil {
2016-08-12 21:18:39 +00:00
xlog.Error(err.Error())
2016-08-12 06:08:38 +00:00
}
subs := make(map[string]*nats.Subscription)
2016-08-12 06:24:58 +00:00
return &ClusterConnector{conn: conn, subs: subs}
2016-08-12 06:08:38 +00:00
}
func (cc *ClusterConnector) Subscribe(subj string, ch chan *irc.Message) {
if cc.conn == nil {
return
}
2016-08-12 06:24:58 +00:00
if _, exists := cc.subs[subj]; exists {
2016-08-12 06:08:38 +00:00
return
}
2016-08-12 06:24:58 +00:00
sub, err := cc.conn.Subscribe(subj, func(n *nats.Msg) {
2016-08-12 06:08:38 +00:00
m := irc.Parse(string(n.Data))
ch <- m
})
if err != nil {
2016-08-12 21:18:39 +00:00
xlog.Error(err.Error())
return
2016-08-12 06:08:38 +00:00
}
2016-08-12 21:18:39 +00:00
cc.subs[subj] = sub
2016-08-12 06:08:38 +00:00
}
func (cc *ClusterConnector) Unsubscribe(subj string) {
if cc.conn == nil {
return
}
2016-08-12 06:24:58 +00:00
if sub, exists := cc.subs[subj]; exists {
sub.Unsubscribe()
delete(cc.subs, subj)
}
2016-08-12 06:08:38 +00:00
}
func (cc *ClusterConnector) Publish(msg *irc.Message) {
if cc.conn == nil {
return
}
2016-08-12 21:18:39 +00:00
subj := strings.ToLower(msg.Args[0])
2016-08-12 06:24:58 +00:00
cc.conn.Publish(subj, []byte(msg.String()))
2016-08-12 06:08:38 +00:00
}