88
rss/job.go
Normal file
88
rss/job.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package rss
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/l3uddz/nabarr/cmd/nabarr/pvr"
|
||||
"github.com/robfig/cron/v3"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (c *Client) AddJob(feed feedItem) error {
|
||||
// prepare job
|
||||
if feed.Cron == "" {
|
||||
feed.Cron = "*/15 * * * *"
|
||||
}
|
||||
|
||||
if feed.CacheDuration == 0 {
|
||||
feed.CacheDuration = (24 * time.Hour) * 28
|
||||
}
|
||||
|
||||
// create job
|
||||
job := &rssJob{
|
||||
name: feed.Name,
|
||||
log: c.log.With().Str("feed_name", feed.Name).Logger(),
|
||||
url: feed.URL,
|
||||
pvrs: make(map[string]pvr.PVR, 0),
|
||||
|
||||
attempts: 0,
|
||||
errors: make([]error, 0),
|
||||
|
||||
cron: c.cron,
|
||||
cache: c.cache,
|
||||
cacheDuration: feed.CacheDuration,
|
||||
cacheFiltersHash: c.cacheFiltersHash,
|
||||
}
|
||||
|
||||
// add pvrs
|
||||
for _, p := range feed.Pvrs {
|
||||
po, exists := c.pvrs[p]
|
||||
if !exists {
|
||||
return fmt.Errorf("pvr object does not exist: %v", p)
|
||||
}
|
||||
job.pvrs[p] = po
|
||||
}
|
||||
|
||||
// schedule job
|
||||
if id, err := c.cron.AddJob(feed.Cron, cron.NewChain(
|
||||
cron.SkipIfStillRunning(cron.DiscardLogger)).Then(job),
|
||||
); err != nil {
|
||||
return fmt.Errorf("add job: %w", err)
|
||||
} else {
|
||||
job.jobID = id
|
||||
}
|
||||
|
||||
job.log.Info().Msg("Initialised")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *rssJob) Run() {
|
||||
// increase attempt counter
|
||||
j.attempts++
|
||||
|
||||
// run job
|
||||
err := j.process()
|
||||
|
||||
// handle job response
|
||||
switch {
|
||||
case err == nil:
|
||||
// job completed successfully
|
||||
j.attempts = 0
|
||||
j.errors = j.errors[:0]
|
||||
return
|
||||
|
||||
default:
|
||||
j.log.Warn().
|
||||
Err(err).
|
||||
Int("attempts", j.attempts).
|
||||
Msg("Unexpected error occurred")
|
||||
j.errors = append(j.errors, err)
|
||||
}
|
||||
|
||||
if j.attempts > 5 {
|
||||
j.log.Error().
|
||||
Errs("error", j.errors).
|
||||
Int("attempts", j.attempts).
|
||||
Msg("Consecutive errors occurred while refreshing rss, job has been stopped...")
|
||||
j.cron.Remove(j.jobID)
|
||||
}
|
||||
}
|
||||
136
rss/process.go
Normal file
136
rss/process.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package rss
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"github.com/l3uddz/nabarr/media"
|
||||
"github.com/lucperkins/rek"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (j *rssJob) process() error {
|
||||
// retrieve feed items
|
||||
j.log.Debug().Msg("Refreshing")
|
||||
items, err := j.getFeed()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get feed: %w", err)
|
||||
}
|
||||
|
||||
// add feed items to pvrs
|
||||
if len(items) == 0 {
|
||||
j.log.Debug().Msg("Refreshed, no items to queue")
|
||||
return nil
|
||||
}
|
||||
|
||||
for p, _ := range items {
|
||||
j.queueItemWithPvrs(&items[p])
|
||||
}
|
||||
|
||||
j.log.Info().
|
||||
Int("count", len(items)).
|
||||
Msg("Queued items")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *rssJob) queueItemWithPvrs(item *media.FeedItem) {
|
||||
for _, pvr := range j.pvrs {
|
||||
switch {
|
||||
case item.TvdbId != "" && pvr.Type() == "sonarr":
|
||||
// tvdbId is present, queue with sonarr
|
||||
pvr.QueueFeedItem(item)
|
||||
case item.ImdbId != "" && pvr.Type() == "radarr":
|
||||
// imdbId is present, queue with radarr
|
||||
pvr.QueueFeedItem(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (j *rssJob) getFeed() ([]media.FeedItem, error) {
|
||||
// request feed
|
||||
res, err := rek.Get(j.url, rek.Timeout(30*time.Minute))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request feed: %w", err)
|
||||
}
|
||||
defer res.Body().Close()
|
||||
|
||||
// validate response
|
||||
if res.StatusCode() != 200 {
|
||||
return nil, fmt.Errorf("validate response: %s", res.Status())
|
||||
}
|
||||
|
||||
// decode response
|
||||
b := new(media.Rss)
|
||||
if err := xml.NewDecoder(res.Body()).Decode(b); err != nil {
|
||||
return nil, fmt.Errorf("decode feed: %w", err)
|
||||
}
|
||||
|
||||
// prepare result
|
||||
items := make([]media.FeedItem, 0)
|
||||
if len(b.Channel.Items) < 1 {
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// sort response items
|
||||
sort.SliceStable(b.Channel.Items, func(i, j int) bool {
|
||||
return b.Channel.Items[i].PubDate.After(b.Channel.Items[j].PubDate.Time)
|
||||
})
|
||||
|
||||
// process feed items
|
||||
for p, i := range b.Channel.Items {
|
||||
// ignore items
|
||||
if i.GUID == "" {
|
||||
// items must always have a guid
|
||||
continue
|
||||
}
|
||||
|
||||
// guid seen before?
|
||||
cacheKey := fmt.Sprintf("%s_%s", j.name, i.GUID)
|
||||
if cacheValue, err := j.cache.Get(j.name, cacheKey); err == nil {
|
||||
if string(cacheValue) == j.cacheFiltersHash {
|
||||
// item has been seen before and the filters have not changed
|
||||
continue
|
||||
}
|
||||
// item has been seen, however the filters have changed since it was last seen, re-process
|
||||
}
|
||||
|
||||
// process feed item attributes
|
||||
for _, a := range i.Attributes {
|
||||
switch strings.ToLower(a.Name) {
|
||||
case "language":
|
||||
b.Channel.Items[p].Language = a.Value
|
||||
case "tvdb", "tvdbid":
|
||||
b.Channel.Items[p].TvdbId = a.Value
|
||||
case "imdb", "imdbid":
|
||||
if strings.HasPrefix(a.Value, "tt") {
|
||||
b.Channel.Items[p].ImdbId = a.Value
|
||||
} else {
|
||||
b.Channel.Items[p].ImdbId = fmt.Sprintf("tt%s", a.Value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// validate item
|
||||
switch {
|
||||
case b.Channel.Items[p].TvdbId == "", b.Channel.Items[p].TvdbId == "0":
|
||||
continue
|
||||
case b.Channel.Items[p].ImdbId == "":
|
||||
continue
|
||||
}
|
||||
|
||||
// add validated item for processing
|
||||
b.Channel.Items[p].Feed = j.name
|
||||
items = append(items, b.Channel.Items[p])
|
||||
|
||||
// add item to temp cache (to prevent re-processing)
|
||||
if err := j.cache.Put(j.name, cacheKey, []byte(j.cacheFiltersHash), j.cacheDuration); err != nil {
|
||||
j.log.Error().
|
||||
Err(err).
|
||||
Str("guid", i.GUID).
|
||||
Msg("Failed storing item in temp cache")
|
||||
}
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
62
rss/rss.go
Normal file
62
rss/rss.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package rss
|
||||
|
||||
import (
|
||||
"github.com/l3uddz/nabarr/cache"
|
||||
"github.com/l3uddz/nabarr/cmd/nabarr/pvr"
|
||||
"github.com/l3uddz/nabarr/logger"
|
||||
"github.com/lefelys/state"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/rs/zerolog"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
cron *cron.Cron
|
||||
cache *cache.Client
|
||||
cacheFiltersHash string
|
||||
pvrs map[string]pvr.PVR
|
||||
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func New(c Config, cc *cache.Client, cfh string, pvrs map[string]pvr.PVR) *Client {
|
||||
return &Client{
|
||||
cron: cron.New(cron.WithChain(
|
||||
cron.Recover(cron.DefaultLogger),
|
||||
)),
|
||||
cache: cc,
|
||||
cacheFiltersHash: cfh,
|
||||
pvrs: pvrs,
|
||||
|
||||
log: logger.New(c.Verbosity).With().Logger(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Start() state.State {
|
||||
c.cron.Start()
|
||||
|
||||
st, tail := state.WithShutdown()
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-tail.End():
|
||||
ticker.Stop()
|
||||
|
||||
// shutdown cron
|
||||
ctx := c.cron.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
|
||||
tail.Done()
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return st
|
||||
}
|
||||
39
rss/struct.go
Normal file
39
rss/struct.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package rss
|
||||
|
||||
import (
|
||||
"github.com/l3uddz/nabarr/cache"
|
||||
"github.com/l3uddz/nabarr/cmd/nabarr/pvr"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/rs/zerolog"
|
||||
"time"
|
||||
)
|
||||
|
||||
type feedItem struct {
|
||||
Name string `yaml:"name"`
|
||||
URL string `yaml:"url"`
|
||||
Cron string `yaml:"cron"`
|
||||
CacheDuration time.Duration `yaml:"cache_duration"`
|
||||
Pvrs []string `yaml:"pvrs"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Feeds []feedItem `yaml:"feeds"`
|
||||
|
||||
Verbosity string `yaml:"verbosity,omitempty"`
|
||||
}
|
||||
|
||||
type rssJob struct {
|
||||
name string
|
||||
log zerolog.Logger
|
||||
url string
|
||||
pvrs map[string]pvr.PVR
|
||||
|
||||
attempts int
|
||||
errors []error
|
||||
|
||||
cron *cron.Cron
|
||||
cache *cache.Client
|
||||
cacheDuration time.Duration
|
||||
cacheFiltersHash string
|
||||
jobID cron.EntryID
|
||||
}
|
||||
Reference in New Issue
Block a user