Clustering via NATS

This commit is contained in:
Andreas Neue 2016-08-12 23:18:39 +02:00
parent 262da96e46
commit 8aa7fe1a49
4 changed files with 75 additions and 61 deletions

View File

@ -6,6 +6,7 @@ import (
"strings"
"code.dnix.de/an/irc"
"code.dnix.de/an/xlog"
"github.com/nats-io/nats"
)
@ -24,7 +25,7 @@ func NewClusterConnector(servers string, ssl bool) *ClusterConnector {
opts.Secure = ssl
conn, err := opts.Connect()
if err != nil {
// foo
xlog.Error(err.Error())
}
subs := make(map[string]*nats.Subscription)
return &ClusterConnector{conn: conn, subs: subs}
@ -39,9 +40,10 @@ func (cc *ClusterConnector) Subscribe(subj string, ch chan *irc.Message) {
ch <- m
})
if err != nil {
cc.subs[subj] = sub
xlog.Error(err.Error())
return
}
cc.subs[subj] = sub
}
func (cc *ClusterConnector) Unsubscribe(subj string) {
@ -52,6 +54,6 @@ func (cc *ClusterConnector) Unsubscribe(subj string) {
}
func (cc *ClusterConnector) Publish(msg *irc.Message) {
subj := strings.ToLower(msg.Pre)
subj := strings.ToLower(msg.Args[0])
cc.conn.Publish(subj, []byte(msg.String()))
}

View File

@ -90,6 +90,7 @@ func handleCmdJoin(sv *Server, msg *irc.Message) {
if !sv.localOrigin(msg) {
return
}
sv.cluster.Subscribe(chid, sv.remoteq)
sv.sendReply(msg.Pre, RPL_TOPIC, msg.Args[0], sv.chTopics[msg.Args[0]])
sv.channelNames(msg.Pre, msg.Args[0])
m, isoper := sv.opers[clid]
@ -124,17 +125,7 @@ func handleCmdPart(sv *Server, msg *irc.Message) {
return
}
sv.sendMsg(msg)
delete(sv.chUsers[chid], clid)
//delete(sv.chModes[chid], "q "+clid)
//delete(sv.chModes[chid], "a "+clid)
delete(sv.chModes[chid], "o "+clid)
delete(sv.chModes[chid], "h "+clid)
delete(sv.chModes[chid], "v "+clid)
if len(sv.chUsers[chid]) == 0 {
delete(sv.chUsers, chid)
delete(sv.chTopics, chid)
delete(sv.chModes, chid)
}
sv.channelRemoveUser(chid, clid)
}
func handleCmdQuit(sv *Server, msg *irc.Message) {
@ -233,12 +224,7 @@ func handleCmdKick(sv *Server, msg *irc.Message) {
sv.sendReply(clid, ERR_CHANOPRIVSNEEDED, chid, "You're not channel operator")
}
sv.sendMsg(msg)
delete(sv.chUsers[chid], target)
//delete(sv.chModes[chid], "q "+target)
//delete(sv.chModes[chid], "a "+target)
delete(sv.chModes[chid], "o "+target)
delete(sv.chModes[chid], "h "+target)
delete(sv.chModes[chid], "v "+target)
sv.channelRemoveUser(chid, target)
}
func handleCmdNames(sv *Server, msg *irc.Message) {

View File

@ -4,19 +4,26 @@ package ircd
import (
"net/http"
"runtime"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
gaugeGoroutinesRunning prometheus.Gauge
gaugePacketsTransferred prometheus.Gauge
gaugeConnectionsCurrent prometheus.Gauge
gaugeConnectionsCount prometheus.Gauge
gaugeQueueLen prometheus.Gauge
gaugeLocalQueueLen prometheus.Gauge
gaugeRemoteQueueLen prometheus.Gauge
)
func monitoringRun(sv *Server) {
gaugeGoroutinesRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ircd_goroutines_running",
Help: "Goroutines runnning",
})
gaugePacketsTransferred = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ircd_packets_transferred",
Help: "Packets handled",
@ -29,14 +36,20 @@ func monitoringRun(sv *Server) {
Name: "ircd_connections_count",
Help: "Client connections",
})
gaugeQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ircd_queue_len",
Help: "Unhandled msgs in dispatcher queue",
gaugeLocalQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ircd_local_queue_len",
Help: "Unhandled msgs in dispatcher local queue",
})
gaugeRemoteQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "ircd_remote_queue_len",
Help: "Unhandled msgs in dispatcher remote queue",
})
prometheus.MustRegister(gaugeGoroutinesRunning)
prometheus.MustRegister(gaugePacketsTransferred)
prometheus.MustRegister(gaugeConnectionsCurrent)
prometheus.MustRegister(gaugeConnectionsCount)
prometheus.MustRegister(gaugeQueueLen)
prometheus.MustRegister(gaugeLocalQueueLen)
prometheus.MustRegister(gaugeRemoteQueueLen)
go monitoringUpdater(sv)
http.Handle("/metrics", prometheus.Handler())
laddr, _ := sv.config.GetString("net", "listen_prom")
@ -46,9 +59,11 @@ func monitoringRun(sv *Server) {
func monitoringUpdater(sv *Server) {
for {
time.Sleep(5 * time.Second)
gaugeGoroutinesRunning.Set(float64(runtime.NumGoroutine()))
gaugePacketsTransferred.Set(sv.packetsTransferred)
gaugeConnectionsCurrent.Set(sv.connectionsCurrent)
gaugeConnectionsCurrent.Set(float64(len(sv.clients)))
gaugeConnectionsCount.Set(sv.connectionsCount)
gaugeQueueLen.Set(sv.queueLen)
gaugeLocalQueueLen.Set(float64(len(sv.localq)))
gaugeRemoteQueueLen.Set(float64(len(sv.remoteq)))
}
}

View File

@ -8,7 +8,6 @@ import (
"fmt"
"net"
"os"
"runtime"
"strings"
"time"
@ -25,13 +24,14 @@ const (
var myinfo string = "%s %s/%s * *"
//var isupport string = "CASEMAPPING=rfc1459 CHANTYPES=# NICKLEN=32 MODES=1 PREFIX=(qaohv)~&@%+"
var isupport string = "CASEMAPPING=rfc1459 CHANTYPES=# NICKLEN=32 MODES=1 PREFIX=(ohv)@%+"
type Server struct {
queue chan *irc.Message
addq chan Client
delq chan Client
localq chan *irc.Message
remoteq chan *irc.Message
addq chan Client
delq chan Client
host string
info string
@ -40,6 +40,8 @@ type Server struct {
created string
motd string
cluster *ClusterConnector
opers map[string]string
clients map[string]Client
@ -53,19 +55,18 @@ type Server struct {
configPath string
packetsTransferred float64
connectionsCurrent float64
connectionsCount float64
queueLen float64
authCallback func(name, pass string) bool
}
// Create a new server instance.
func NewServer(configPath, software, version string) *Server {
sv := &Server{software: software, version: version,
created: time.Now().String()}
sv.queue = make(chan *irc.Message, 1024)
sv.localq = make(chan *irc.Message, 65536)
sv.remoteq = make(chan *irc.Message, 65536)
sv.addq = make(chan Client, 128)
sv.delq = make(chan Client, 128)
@ -88,6 +89,9 @@ func NewServer(configPath, software, version string) *Server {
sv.info, _ = sv.config.GetString("server", "info")
sv.motd, _ = sv.config.GetString("server", "motd")
urls, _ := sv.config.GetString("cluster", "urls")
sv.cluster = NewClusterConnector(urls, false)
nicks, _ := sv.config.GetString("control", "a")
for _, nick := range strings.Split(nicks, ",") {
sv.opers[nick] = "a"
@ -98,9 +102,7 @@ func NewServer(configPath, software, version string) *Server {
}
sv.packetsTransferred = 0
sv.connectionsCurrent = 0
sv.connectionsCount = 0
sv.queueLen = 0
return sv
}
@ -109,7 +111,6 @@ func (sv *Server) SetAuthCallback(authCB func(name, pass string) bool) {
sv.authCallback = authCB
}
// Open the listening port and start the main server loop.
func (sv *Server) Run() {
xlog.Info("%s/%s", sv.software, sv.version)
go monitoringRun(sv)
@ -139,7 +140,7 @@ func (sv *Server) Run() {
}
func (sv *Server) Dispatch(msg *irc.Message) {
sv.queue <- msg
sv.localq <- msg
}
func (sv *Server) AddClient(cl Client) {
@ -202,9 +203,14 @@ func (sv *Server) dispatcher() (err error) {
}
}()
for {
sv.queueLen = float64(len(sv.queue))
select {
case msg := <-sv.queue:
case msg := <-sv.localq:
sv.recvMsg(msg)
sv.packetsTransferred++
case msg := <-sv.remoteq:
if sv.localOrigin(msg) {
continue
}
sv.recvMsg(msg)
sv.packetsTransferred++
case cl := <-sv.addq:
@ -249,27 +255,25 @@ func (sv *Server) addClient(cl Client) {
}
sv.clients[clid] = cl
sv.sendLogon(cl.Name())
sv.connectionsCurrent = float64(len(sv.clients))
cl.Register(true)
sv.cluster.Subscribe(clid, sv.remoteq)
xlog.Info("Client registered: '%s'", clid)
xlog.Info("Server has %d client(s)", len(sv.clients))
xlog.Debug("Goroutines running: %d", runtime.NumGoroutine())
}
func (sv *Server) delClient(cl Client) {
clid := strings.ToLower(cl.Name())
sv.cluster.Unsubscribe(clid)
cl.Destroy()
for chname, ch := range sv.chUsers {
if _, exists := ch[clid]; exists {
delete(ch, clid)
sv.sendMsg(irc.M(cl.Name(), "PART", chname, "quit"))
for chid, _ := range sv.chUsers {
if _, exists := sv.chUsers[chid][clid]; exists {
sv.channelRemoveUser(chid, clid)
sv.sendMsg(irc.M(cl.Name(), "PART", chid, "quit"))
}
}
delete(sv.clients, clid)
sv.connectionsCurrent = float64(len(sv.clients))
xlog.Info("Client deleted: '%s'", clid)
xlog.Info("Server has %d client(s)", len(sv.clients))
xlog.Debug("Goroutines running: %d", runtime.NumGoroutine())
}
func (sv *Server) recvMsg(msg *irc.Message) {
@ -304,13 +308,11 @@ func (sv *Server) recvMsg(msg *irc.Message) {
hook.HookFn(sv, msg)
}
// Forward an irc message to cluster and deliver locally
func (sv *Server) sendMsg(msg *irc.Message) {
sv.deliverMsg(msg)
sv.forwardMsg(msg)
}
// Local delivery of an irc message to channel or client
func (sv *Server) deliverMsg(msg *irc.Message) {
if strings.HasPrefix(msg.Args[0], "#") {
chid := strings.ToLower(msg.Args[0])
@ -337,16 +339,12 @@ func (sv *Server) deliverMsg(msg *irc.Message) {
}
}
// Forward to cluster
func (sv *Server) forwardMsg(msg *irc.Message) {
if sv.localOrigin(msg) {
// TODO: forward to cluster
println("forward:", msg.String())
sv.cluster.Publish(msg)
}
}
// Send irc reply to local client and drop, if client doesnt
// exist locally
func (sv *Server) sendReply(nick, cmd, args, trail string) {
clid := strings.ToLower(nick)
if _, exists := sv.clients[clid]; !exists {
@ -361,7 +359,6 @@ func (sv *Server) sendReply(nick, cmd, args, trail string) {
cl.Receive(irc.M(sv.host, cmd, args, trail))
}
// Check if message's origin is local
func (sv *Server) localOrigin(msg *irc.Message) bool {
_, localClient := sv.clients[strings.ToLower(msg.Pre)]
localServer := (msg.Pre == sv.host)
@ -395,7 +392,6 @@ func (sv *Server) sendLogon(nick string) {
}
}
// Send channel name list to client
func (sv *Server) channelNames(nick, ch string) {
chid := strings.ToLower(ch)
if _, exists := sv.chUsers[chid]; !exists {
@ -425,7 +421,6 @@ func (sv *Server) channelNames(nick, ch string) {
sv.sendReply(nick, RPL_ENDOFNAMES, ch, "End of /NAMES list")
}
// Check if mode exists for the specified channel
func (sv *Server) channelCheckMode(chid, mode string) bool {
if _, exists := sv.chModes[chid]; !exists {
return false
@ -444,3 +439,19 @@ func (sv *Server) channelCheckPerm(chid, clid, modes string) bool {
}
return false
}
func (sv *Server) channelRemoveUser(chid, clid string) {
if _, exists := sv.chUsers[chid]; !exists {
return
}
delete(sv.chUsers[chid], clid)
delete(sv.chModes[chid], "o "+clid)
delete(sv.chModes[chid], "h "+clid)
delete(sv.chModes[chid], "v "+clid)
if len(sv.chUsers[chid]) == 0 {
sv.cluster.Unsubscribe(chid)
delete(sv.chUsers, chid)
delete(sv.chTopics, chid)
delete(sv.chModes, chid)
}
}