Refactored the handler functions into interface types. Follows the net/http handlers pattern. Handlers are now chainable. Moved database checking out of notifyHandlers. Implemented the database feature as a chainable handler

This commit is contained in:
Ben Davies 2014-10-04 12:45:05 +01:00
parent ddfdb20a68
commit cb314c235b
7 changed files with 273 additions and 31 deletions

View File

@ -3,6 +3,10 @@ Credits go to github.com/SlyMarbo/rss for inspiring this solution.
*/ */
package feeder package feeder
import (
"fmt"
)
type database struct { type database struct {
request chan string request chan string
response chan bool response chan bool
@ -16,8 +20,10 @@ func (d *database) Run() {
for { for {
s = <-d.request s = <-d.request
if _, ok := d.known[s]; ok { if _, ok := d.known[s]; ok {
fmt.Println("Database used: true")
d.response <- true d.response <- true
} else { } else {
fmt.Println("Database used: false")
d.response <- false d.response <- false
d.known[s] = struct{}{} d.known[s] = struct{}{}
} }

63
databaseHandler.go Normal file
View File

@ -0,0 +1,63 @@
package feeder
import (
"fmt"
)
// databaseHandler is a chainable handler that deduplicates feed content:
// channel/item keys already recorded in db are dropped, and anything new
// is forwarded to the wrapped downstream handlers.
type databaseHandler struct {
	db          *database      // set of already-seen keys, queried via its request/response channels
	itemhandler ItemHandler    // downstream item handler; may be nil (items are then only recorded)
	chanhandler ChannelHandler // downstream channel handler; may be nil (channels are then only recorded)
}
// ProcessItems queries the database for each item's key and passes only
// the previously-unseen items on to the wrapped item handler, if one is set.
// Querying also records each new key, so repeat calls skip those items.
func (d *databaseHandler) ProcessItems(f *Feed, ch *Channel, items []*Item) {
	fmt.Println("Processing items")

	var unseen []*Item
	for _, it := range items {
		d.db.request <- it.Key()
		known := <-d.db.response
		if !known {
			unseen = append(unseen, it)
		}
	}

	// Nothing new, or nobody downstream to tell — stop here.
	if len(unseen) == 0 || d.itemhandler == nil {
		return
	}
	d.itemhandler.ProcessItems(f, ch, unseen)
}
// ProcessChannels queries the database for each channel's key and passes
// only the previously-unseen channels on to the wrapped channel handler,
// if one is set. Querying also records each new key.
func (d *databaseHandler) ProcessChannels(f *Feed, ch []*Channel) {
	fmt.Println("Processing channels")

	var unseen []*Channel
	for _, c := range ch {
		d.db.request <- c.Key()
		known := <-d.db.response
		if !known {
			unseen = append(unseen, c)
		}
	}

	// Nothing new, or nobody downstream to tell — stop here.
	if len(unseen) == 0 || d.chanhandler == nil {
		return
	}
	d.chanhandler.ProcessChannels(f, unseen)
}
// NewDatabaseHandler wraps handler so it only ever receives channels and
// items whose keys have not been seen before by this handler's database.
func NewDatabaseHandler(handler Handler) Handler {
	return &databaseHandler{
		db:          NewDatabase(),
		itemhandler: handler,
		chanhandler: handler,
	}
}
// NewDatabaseItemHandler wraps itemhandler so it only ever receives items
// whose keys have not been seen before by this handler's database.
func NewDatabaseItemHandler(itemhandler ItemHandler) ItemHandler {
	return &databaseHandler{
		db:          NewDatabase(),
		itemhandler: itemhandler,
	}
}
// NewDatabaseChannelHandler wraps chanhandler so it only ever receives
// channels whose keys have not been seen before by this handler's database.
func NewDatabaseChannelHandler(chanhandler ChannelHandler) ChannelHandler {
	return &databaseHandler{
		db:          NewDatabase(),
		chanhandler: chanhandler,
	}
}

90
feed.go
View File

@ -43,8 +43,50 @@ func (err *UnsupportedFeedError) Error() string {
return fmt.Sprintf("Unsupported feed: %s, version: %+v", err.Type, err.Version) return fmt.Sprintf("Unsupported feed: %s, version: %+v", err.Type, err.Version)
} }
type ChannelHandler func(f *Feed, newchannels []*Channel) type ChannelHandlerFunc func(f *Feed, newchannels []*Channel)
type ItemHandler func(f *Feed, ch *Channel, newitems []*Item)
// ProcessChannels calls h(f, newchannels), letting an ordinary function
// satisfy the ChannelHandler interface (in the style of http.HandlerFunc).
func (h ChannelHandlerFunc) ProcessChannels(f *Feed, newchannels []*Channel) {
	h(f, newchannels)
}
// ItemHandlerFunc adapts a plain function to the ItemHandler interface,
// in the style of http.HandlerFunc.
type ItemHandlerFunc func(f *Feed, ch *Channel, newitems []*Item)

// ProcessItems calls h(f, ch, newitems).
func (h ItemHandlerFunc) ProcessItems(f *Feed, ch *Channel, newitems []*Item) {
	h(f, ch, newitems)
}
// Handler processes both new channels and new items discovered in a feed.
type Handler interface {
	ChannelHandler
	ItemHandler
}

// ChannelHandler is notified of channels found in a fetched feed.
type ChannelHandler interface {
	ProcessChannels(f *Feed, newchannels []*Channel)
}

// ItemHandler is notified of items found in a channel of a fetched feed.
type ItemHandler interface {
	ProcessItems(f *Feed, ch *Channel, newitems []*Item)
}
// HandlerBonder glues an independent ChannelHandler and ItemHandler
// together into a single Handler.
type HandlerBonder struct {
	itemhandler ItemHandler    // receives ProcessItems calls
	chanhandler ChannelHandler // receives ProcessChannels calls
}

// ProcessChannels delegates to the bonded channel handler.
func (hb *HandlerBonder) ProcessChannels(f *Feed, newchannels []*Channel) {
	hb.chanhandler.ProcessChannels(f, newchannels)
}

// ProcessItems delegates to the bonded item handler.
func (hb *HandlerBonder) ProcessItems(f *Feed, ch *Channel, newitems []*Item) {
	hb.itemhandler.ProcessItems(f, ch, newitems)
}

// NewHandlerBonder combines chanhandler and itemhandler into one Handler.
// Neither handler is nil-checked here; callers must supply both.
func NewHandlerBonder(chanhandler ChannelHandler, itemhandler ItemHandler) Handler {
	return &HandlerBonder{
		itemhandler: itemhandler,
		chanhandler: chanhandler,
	}
}
type Feed struct { type Feed struct {
// Custom cache timeout in minutes. // Custom cache timeout in minutes.
@ -66,30 +108,27 @@ type Feed struct {
// Url from which this feed was created. // Url from which this feed was created.
Url string Url string
// Database containing a list of known Items and Channels for this instance // The channel and item handler
database *database handler Handler
// A notification function, used to notify the host when a new channel
// has been found.
chanhandler ChannelHandler
// A notification function, used to notify the host when a new item
// has been found for a given channel.
itemhandler ItemHandler
// Last time content was fetched. Used in conjunction with CacheTimeout // Last time content was fetched. Used in conjunction with CacheTimeout
// to ensure we don't get content too often. // to ensure we don't get content too often.
lastupdate int64 lastupdate int64
} }
func New(cachetimeout int, enforcecachelimit bool, ch ChannelHandler, ih ItemHandler) *Feed { // New is a helper function to stay semi-compatible with
// the old code
func New(cachetimeout int, enforcecachelimit bool, ch ChannelHandlerFunc, ih ItemHandlerFunc) *Feed {
return NewWithHandler(cachetimeout, enforcecachelimit, NewHandlerBonder(ch, ih))
}
// NewWithHandler creates a new feed with a handler
func NewWithHandler(cachetimeout int, enforcecachelimit bool, h Handler) *Feed {
v := new(Feed) v := new(Feed)
v.CacheTimeout = cachetimeout v.CacheTimeout = cachetimeout
v.EnforceCacheLimit = enforcecachelimit v.EnforceCacheLimit = enforcecachelimit
v.Type = "none" v.Type = "none"
v.database = NewDatabase() v.handler = h
v.chanhandler = ch
v.itemhandler = ih
return v return v
} }
@ -176,25 +215,14 @@ func (this *Feed) makeFeed(doc *xmlx.Document) (err error) {
} }
func (this *Feed) notifyListeners() { func (this *Feed) notifyListeners() {
var newchannels []*Channel
for _, channel := range this.Channels { for _, channel := range this.Channels {
if this.database.request <- channel.Key(); !<-this.database.response { if len(channel.Items) > 0 && this.handler != nil {
newchannels = append(newchannels, channel) this.handler.ProcessItems(this, channel, channel.Items)
}
var newitems []*Item
for _, item := range channel.Items {
if this.database.request <- item.Key(); !<-this.database.response {
newitems = append(newitems, item)
}
}
if len(newitems) > 0 && this.itemhandler != nil {
this.itemhandler(this, channel, newitems)
} }
} }
if len(newchannels) > 0 && this.chanhandler != nil { if len(this.Channels) > 0 && this.handler != nil {
this.chanhandler(this, newchannels) this.handler.ProcessChannels(this, this.Channels)
} }
} }

86
notes.md Normal file
View File

@ -0,0 +1,86 @@
## The problem with the current database solution
The purpose of the database is to ensure that the channel and item handlers are only called once for each new channel and each new item. It is clear that many users of go-pkg-rss are having problems with their channel and item handlers being called multiple times for the same items.
The current solution makes writing handlers very clean and safe as the user can be sure that their handlers are only executed once for each new item/channel.
### Use cases ###
The use cases where the database shines involve long-running goroutines that watch a specific feed url and, over the lifetime of the program, periodically check that feed url for updates. In that situation, having a database prevents the duplication of items, as there is a high likelihood that the refetch of the feed url will contain items already processed by the item handler.
The benefits of this include:
1) Batteries included: If the user is creating a program that processes a set of feed urls that it repeatedly polls on an existing feed then the built in database provides a "batteries included" solution to prevent calling the users channel or item handlers unnecessarily, greatly simplifying development of those handlers.
2) Zero maintenance: The database is maintenance free. There is nothing the developer needs to do to take advantage of it. At the same time the developer doesn't need to query it at any time nor do they need to purge items from it. They need not even know it exists.
### Problems
The problem with the current solution is that it doesn't scale. Time for an example. I'm writing a program that will pull feed urls from a queue system to be processed. In order to execute several feed fetches at once it may run several hundred goroutines, each fetching a feed url from the queue, processing it via go-pkg-rss and then putting the item urls into another queue for processing by another program. The feed url job is then released to the queue which will then delay the feed url from being processed for a set amount of time (usually the lastupdate/cacheTimeout). As there are several thousand feed urls to get through, I will be running my program on several servers, each fetching feed urls from the queue via its several hundred goroutines.
In order to prevent duplication of effort, as across several thousand feed urls there is a very high likelihood that items will be duplicated across feeds, I record a hash for each item in memcached. This provides a very quick and lightweight way of determining if I have already fetched that article before, and therefore do not need to fetch it again. This has the added benefit that the cache can be shared across several servers all collecting feed urls and article urls as a centralised "database" (although I am also leaning on the caching features of memcache to store the entry for a limited time, allowing me to fetch an article again in the future, in case of updates to the article).
In addition to this, I also check and catch errors raised by network issues such as timeouts, unparsable urls, http error codes and unparsable documents. For these I also store a hash in memcache for each feed url, however this is an incrementing value, allowing me to keep track of the number of retry attempts made to fetch that feed url. After a certain threshold is met, and the feed url is still failing, I mark the job as bad in the queue system, which prevents me from constantly refetching a bad feed url.
The current database solution contributes the following issues:
1) The database is too simple: In the above example I need to track (and prevent) the number of fetches I make for article items. I also need to allow a number of retry attempts before preventing refetches. The current database is not sophisticated enough to handle these two different cases at once.
2) The database does not expire or clean up entries: I expect my program to run for a very long time, processing many thousand feeds and even more article urls. The current implementation of the database is simple in that it continues to grow indefinitely, consuming lots of memory.
3) The database replaces a job that is trivial to implement in a handler: The current database doesn't provide anything that couldn't be developed by a user of the package with ease.
4) The database doesn't provide a fine-grained enough key for duplicates: The current version uses a multitude of options for item keys and channels, all of which could very easily be falsely marked as duplicates. For example, the item title is not a very unique key, especially when each item and channel has a unique key in the form of the url.
### Proposed solution
Looking across the stdlib provided with Go we can see an example where similar concerns have been met with elegance. The net/http package uses handlers, much like go-pkg-rss does, to offload implementation complexity to outside of the core http package. It also provides batteries included solutions to common problems that developers may have with built in handlers such as a FileServer, a NotFoundHandler, RedirectHandler, StripPrefix and TimeoutHandler. I propose that the current database implementation be stripped from the package and moved to a set of built in handlers called DedupeChannelHandler and DedupeItemHandler.
The developer will then be provided with two powerful options:
1) Use the built in deduplication handlers as part of a chain along with their own existing handlers to get the same functionality as currently provided by the existing database implementation.
2) Roll their own custom handlers which may be inserted into the handler chain. They even have the option of using the provided deduplication handlers if they want.
This opens up exciting possibilities for developers and the future of the go-pkg-rss package. We could add handlers for caching via memcache, a retry count handler for channels, a whitelist/blacklist handler based on item title, a filter handler that strips out items that have a date older than 3 hours, etc.
type ChannelHandler interface {
	ProcessChannels(f *Feed, newchannels []*Channel)
}
type ItemHandler interface {
	ProcessItems(f *Feed, ch *Channel, newitems []*Item)
}
type ItemCache struct {
mc *memcache.Conn
}
func (ic *ItemCache) ProcessItems(ih rss.ItemHandler) rss.ItemHandler {
return func(f *Feed, ch *Channel, newitems []*Item) {
for _, v := range newitems {
			_ = ic.mc.Add(v)
}
ih(f, ch, newitems)
}
}
func (this *Feed) notifyListeners() {
var newchannels []*Channel
for _, channel := range this.Channels {
if this.database.request <- channel.Key(); !<-this.database.response {
newchannels = append(newchannels, channel)
}
var newitems []*Item
for _, item := range channel.Items {
if this.database.request <- item.Key(); !<-this.database.response {
newitems = append(newitems, item)
}
}
if len(newitems) > 0 && this.itemhandler != nil {
this.itemhandler(this, channel, newitems)
}
}
if len(newchannels) > 0 && this.chanhandler != nil {
this.chanhandler(this, newchannels)
}
}

59
testdata/handlers/handlerexample.go vendored Normal file
View File

@ -0,0 +1,59 @@
package main
/*
This is a minimal sample application, demonstrating how to set up an RSS feed
for regular polling of new channels/items.
Build & run with:
$ 6g example.go && 6l example.6 && ./6.out
*/
import (
"fmt"
rss "github.com/JalfResi/go-pkg-rss"
"os"
"time"
)
// main polls a single feed synchronously; see the comment below for how
// to run the polling in the background instead.
func main() {
	// This sets up a new feed and polls it for new channels/items.
	// Invoke it with 'go PollFeed(...)' to have the polling performed in a
	// separate goroutine, so you can continue with the rest of your program.
	PollFeed("http://blog.case.edu/news/feed.atom", 5)
}
// PollFeed repeatedly fetches uri, with the built-in database handler
// deduplicating channels/items before MyHandler sees them. timeout is
// the feed cache timeout in minutes; polling stops on the first fetch error.
func PollFeed(uri string, timeout int) {
	feed := rss.NewWithHandler(timeout, true, rss.NewDatabaseHandler(NewMyHandler()))

	for {
		if err := feed.Fetch(uri, nil); err != nil {
			fmt.Fprintf(os.Stderr, "[e] %s: %s", uri, err)
			return
		}

		// 10 * time.Second is already a time.Duration; the former
		// time.Duration(...) conversion was redundant, and time.Sleep
		// is the idiomatic way to pause (no throwaway channel).
		time.Sleep(10 * time.Second)
	}
}
/*
func itemHandler(feed *rss.Feed, ch *rss.Channel, newitems []*rss.Item) {
fmt.Printf("%d new item(s) in %s\n", len(newitems), feed.Url)
}
*/
// MyHandler is an example Handler implementation that simply reports how
// many new channels/items a fetch produced.
type MyHandler struct{}

// NewMyHandler returns a MyHandler typed as an rss.Handler.
func NewMyHandler() rss.Handler {
	return &MyHandler{}
}

// ProcessChannels prints the count of new channels found in feed.
func (m *MyHandler) ProcessChannels(feed *rss.Feed, newchannels []*rss.Channel) {
	fmt.Printf("%d new channel(s) in %s\n", len(newchannels), feed.Url)
}

// ProcessItems prints the count of new items found in feed.
func (m *MyHandler) ProcessItems(feed *rss.Feed, ch *rss.Channel, newitems []*rss.Item) {
	fmt.Printf("%d new rad item(s) in %s\n", len(newitems), feed.Url)
}

BIN
testdata/handlers/handlers vendored Executable file

Binary file not shown.

BIN
testdata/testdata vendored Executable file

Binary file not shown.