Skip to content

Commit

Permalink
Refactor rsslay to proactively retrieve feeds (#21)
Browse files Browse the repository at this point in the history
Goals:

- Download data in the background to always be able to return data quickly.
  - Done.
- Make the project more composable e.g. separate out markdown and html processing from constructing nostr events.
  - Created clear adapter/app/domain layers. Didn't fix everything to keep the pull request small.
- Make testing easier.
  - Testing the domain layer is easy now.
- Eliminate the bug which causes the feed to not be updated under some circumstances.
  - Unclear. Maybe we need to remove caching altogether, it is not really useful right now.

What has been done:
- http handlers no longer contain core domain logic
- http handlers no longer have direct access to the database
- http handlers no longer misuse the same types for database queries and template rendering
- http handlers no longer have access to secrets
- created an adapters layer which encapsulates database access
- created a domain layer for data validation; more core logic needs to be moved there in the future
- created an application layer to glue everything together
- cleaned up duplicated calls to libraries to convert keys to the nip19 format in http handlers
- cleaned up error handling logic in http handlers
- saving feed definitions will no longer fail quietly in case of database errors
- fixed nonsensical error handling when saving feed definitions
- events are properly retrieved in a single place in the application layer and not from multiple places
- removed duplicate code
  • Loading branch information
boreq authored Sep 18, 2023
1 parent f9c763c commit 2d07a44
Show file tree
Hide file tree
Showing 24 changed files with 1,280 additions and 275 deletions.
211 changes: 64 additions & 147 deletions cmd/rsslay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"database/sql"
_ "embed"
"errors"
"flag"
"fmt"
"log"
Expand All @@ -25,13 +24,18 @@ import (
"github.com/nbd-wtf/go-nostr/nip11"
"github.com/piraces/rsslay/internal/handlers"
"github.com/piraces/rsslay/pkg/custom_cache"
"github.com/piraces/rsslay/pkg/events"
"github.com/piraces/rsslay/pkg/feed"
"github.com/piraces/rsslay/pkg/metrics"
"github.com/piraces/rsslay/pkg/new/adapters"
pubsubadapters "github.com/piraces/rsslay/pkg/new/adapters/pubsub"
"github.com/piraces/rsslay/pkg/new/app"
"github.com/piraces/rsslay/pkg/new/domain"
"github.com/piraces/rsslay/pkg/new/ports"
pubsub2 "github.com/piraces/rsslay/pkg/new/ports/pubsub"
"github.com/piraces/rsslay/pkg/replayer"
"github.com/piraces/rsslay/scripts"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/exp/slices"
)

// Command line flags.
Expand Down Expand Up @@ -63,13 +67,14 @@ type Relay struct {
RedisConnectionString string `envconfig:"REDIS_CONNECTION_STRING" default:""`

updates chan nostr.Event
lastEmitted sync.Map
db *sql.DB
healthCheck *health.Health
mutex sync.Mutex
routineQueueLength int
converterSelector *feed.ConverterSelector
cache *cache.Cache[string]
handler *handlers.Handler
store *store
}

var relayInstance = &Relay{
Expand Down Expand Up @@ -114,20 +119,20 @@ func (r *Relay) Name() string {

func (r *Relay) OnInitialized(s *relayer.Server) {
s.Router().Path("/").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleWebpage(writer, request, r.db, &r.MainDomainName)
r.handler.HandleWebpage(writer, request, &r.MainDomainName)
})
s.Router().Path("/create").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleCreateFeed(writer, request, r.db, &r.Secret, dsn)
r.handler.HandleCreateFeed(writer, request, dsn)
})
s.Router().Path("/search").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleSearch(writer, request, r.db)
r.handler.HandleSearch(writer, request)
})
s.Router().
PathPrefix(assetsDir).
Handler(http.StripPrefix(assetsDir, http.FileServer(http.Dir("./web/"+assetsDir))))
s.Router().Path("/healthz").HandlerFunc(relayInstance.healthCheck.HandlerFunc)
s.Router().Path("/api/feed").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleApiFeed(writer, request, r.db, &r.Secret, dsn)
r.handler.HandleApiFeed(writer, request, dsn)
})
s.Router().Path("/.well-known/nostr.json").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
handlers.HandleNip05(writer, request, r.db, &r.OwnerPublicKey, &r.EnableAutoNIP05Registration)
Expand All @@ -136,6 +141,8 @@ func (r *Relay) OnInitialized(s *relayer.Server) {
}

func (r *Relay) Init() error {
ctx := context.TODO()

flag.Parse()
err := envconfig.Process("", r)
if err != nil {
Expand All @@ -145,54 +152,58 @@ func (r *Relay) Init() error {
}

ConfigureCache()
r.db = InitDatabase(r)

go r.UpdateListeningFilters()

longFormConverter := feed.NewLongFormConverter()
r.converterSelector = feed.NewConverterSelector(longFormConverter)
db := InitDatabase(r)
feedDefinitionStorage := adapters.NewFeedDefinitionStorage(db)
eventStorage := adapters.NewEventStorage()
receivedEventPubSub := pubsubadapters.NewReceivedEventPubSub()

return nil
}
secret, err := domain.NewSecret(r.Secret)
if err != nil {
return errors.Wrap(err, "error creating a secret")
}

func (r *Relay) UpdateListeningFilters() {
for {
time.Sleep(20 * time.Minute)
metrics.ListeningFiltersOps.Inc()

filters := relayer.GetListeningFilters()
log.Printf("[DEBUG] Checking for updates; %d filters active", len(filters))

var parsedEvents []replayer.EventWithPrivateKey
for _, filter := range filters {
if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindTextNote) {
for _, pubkey := range filter.Authors {
parsedFeed, entity := events.GetParsedFeedForPubKey(pubkey, r.db, r.DeleteFailingFeeds, r.NitterInstances)
if parsedFeed == nil {
continue
}

converter := r.converterSelector.Select(parsedFeed)

for _, item := range parsedFeed.Items {
defaultCreatedAt := time.Unix(time.Now().Unix(), 0)
evt := converter.Convert(pubkey, item, parsedFeed, defaultCreatedAt, entity.URL)
last, ok := r.lastEmitted.Load(entity.URL)
if last == nil {
last = uint32(time.Now().Unix())
}
if !ok || nostr.Timestamp(int64(last.(uint32))) < evt.CreatedAt {
_ = evt.Sign(entity.PrivateKey)
r.updates <- evt
r.lastEmitted.Store(entity.URL, last.(uint32))
parsedEvents = append(parsedEvents, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey})
}
}
}
}
}
r.AttemptReplayEvents(parsedEvents)
r.converterSelector = feed.NewConverterSelector(feed.NewLongFormConverter())

handlerCreateFeedDefinition := app.NewHandlerCreateFeedDefinition(secret, feedDefinitionStorage)
handlerUpdateFeeds := app.NewHandlerUpdateFeeds(
r.DeleteFailingFeeds,
r.NitterInstances,
r.EnableAutoNIP05Registration,
r.DefaultProfilePictureUrl,
r.MainDomainName,
db,
feedDefinitionStorage,
r.converterSelector,
eventStorage,
receivedEventPubSub,
)
handlerGetEvents := app.NewHandlerGetEvents(eventStorage)
handlerOnNewEventCreated := app.NewHandlerOnNewEventCreated(r.updates)
handlerGetTotalFeedCount := app.NewHandlerGetTotalFeedCount(feedDefinitionStorage)
handlerGetRandomFeeds := app.NewHandlerGetRandomFeeds(feedDefinitionStorage)
handlerSearchFeeds := app.NewHandlerSearchFeeds(feedDefinitionStorage)

updateFeedsTimer := ports.NewUpdateFeedsTimer(handlerUpdateFeeds)
receivedEventSubscriber := pubsub2.NewReceivedEventSubscriber(receivedEventPubSub, handlerOnNewEventCreated)

app := app.App{
CreateFeedDefinition: handlerCreateFeedDefinition,
UpdateFeeds: handlerUpdateFeeds,
GetEvents: handlerGetEvents,
GetTotalFeedCount: handlerGetTotalFeedCount,
GetRandomFeeds: handlerGetRandomFeeds,
SearchFeeds: handlerSearchFeeds,
}

r.db = db
r.handler = handlers.NewHandler(app)
r.store = newStore(app)

go updateFeedsTimer.Run(ctx)
go receivedEventSubscriber.Run(ctx)

return nil
}

func (r *Relay) AttemptReplayEvents(events []replayer.EventWithPrivateKey) {
Expand All @@ -217,101 +228,7 @@ func (r *Relay) AcceptEvent(_ *nostr.Event) bool {
}

func (r *Relay) Storage() relayer.Storage {
return store{r.db}
}

type store struct {
db *sql.DB
}

func (b store) Init() error { return nil }
func (b store) SaveEvent(_ *nostr.Event) error {
metrics.InvalidEventsRequests.Inc()
return errors.New("blocked: we don't accept any events")
}

func (b store) DeleteEvent(_, _ string) error {
metrics.InvalidEventsRequests.Inc()
return errors.New("blocked: we can't delete any events")
}

func (b store) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
var parsedEvents []nostr.Event
var eventsToReplay []replayer.EventWithPrivateKey

metrics.QueryEventsRequests.Inc()

if filter.IDs != nil || len(filter.Tags) > 0 {
return parsedEvents, nil
}

for _, pubkey := range filter.Authors {
parsedFeed, entity := events.GetParsedFeedForPubKey(pubkey, relayInstance.db, relayInstance.DeleteFailingFeeds, relayInstance.NitterInstances)

if parsedFeed == nil {
continue
}

converter := relayInstance.converterSelector.Select(parsedFeed)

if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindSetMetadata) {
evt := feed.EntryFeedToSetMetadata(pubkey, parsedFeed, entity.URL, relayInstance.EnableAutoNIP05Registration, relayInstance.DefaultProfilePictureUrl, relayInstance.MainDomainName)

if filter.Since != nil && evt.CreatedAt < *filter.Since {
continue
}
if filter.Until != nil && evt.CreatedAt > *filter.Until {
continue
}

_ = evt.Sign(entity.PrivateKey)
parsedEvents = append(parsedEvents, evt)
if relayInstance.ReplayToRelays {
eventsToReplay = append(eventsToReplay, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey})
}
}

if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindTextNote) || slices.Contains(filter.Kinds, feed.KindLongFormTextContent) {
var last uint32 = 0
for _, item := range parsedFeed.Items {
defaultCreatedAt := time.Unix(time.Now().Unix(), 0)
evt := converter.Convert(pubkey, item, parsedFeed, defaultCreatedAt, entity.URL)

// Feed need to have a date for each entry...
if evt.CreatedAt == nostr.Timestamp(defaultCreatedAt.Unix()) {
continue
}

if filter.Since != nil && evt.CreatedAt < *filter.Since {
continue
}
if filter.Until != nil && evt.CreatedAt > *filter.Until {
continue
}

_ = evt.Sign(entity.PrivateKey)

if !filter.Matches(&evt) {
continue
}

if evt.CreatedAt > nostr.Timestamp(int64(last)) {
last = uint32(evt.CreatedAt)
}

parsedEvents = append(parsedEvents, evt)
if relayInstance.ReplayToRelays {
eventsToReplay = append(eventsToReplay, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey})
}
}

relayInstance.lastEmitted.Store(entity.URL, last)
}
}

relayInstance.AttemptReplayEvents(eventsToReplay)

return parsedEvents, nil
return r.store
}

func (r *Relay) InjectEvents() chan nostr.Event {
Expand Down
51 changes: 51 additions & 0 deletions cmd/rsslay/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"github.com/nbd-wtf/go-nostr"
"github.com/piraces/rsslay/pkg/metrics"
"github.com/piraces/rsslay/pkg/new/app"
nostrdomain "github.com/piraces/rsslay/pkg/new/domain/nostr"
"github.com/pkg/errors"
)

// store adapts the application layer to the relayer.Storage interface.
// It is a read-only backend: events are produced by the feed-update
// pipeline, never accepted from relay clients.
type store struct {
	app app.App
}

// newStore builds the relayer storage backed by the application layer.
// The parameter is named a (not app) to avoid shadowing the imported
// app package inside the function body.
func newStore(a app.App) *store {
	return &store{app: a}
}

// Init satisfies relayer.Storage; no setup is required because all
// state lives in the application layer injected via newStore.
func (b store) Init() error {
	return nil
}

// SaveEvent always rejects the event: this relay only serves events it
// generates itself from RSS feeds. The rejection is counted in metrics.
func (b store) SaveEvent(_ *nostr.Event) error {
	metrics.InvalidEventsRequests.Inc()
	return errors.New("blocked: we don't accept any events")
}

// DeleteEvent always rejects the request for the same reason as
// SaveEvent: stored events are derived from feeds and not client-owned.
func (b store) DeleteEvent(_, _ string) error {
	metrics.InvalidEventsRequests.Inc()
	return errors.New("blocked: we can't delete any events")
}

// QueryEvents translates the go-nostr filter into the domain filter,
// asks the application layer for matching events, and converts the
// result back into the library's event type.
func (b store) QueryEvents(libfilter *nostr.Filter) ([]nostr.Event, error) {
	metrics.QueryEventsRequests.Inc()

	domainEvents, err := b.app.GetEvents.Handle(nostrdomain.NewFilter(libfilter))
	if err != nil {
		return nil, errors.Wrap(err, "error getting events")
	}

	return b.toEvents(domainEvents), nil
}

// toEvents converts domain events into the go-nostr event type expected
// by the relayer interface. It returns nil for an empty input (matching
// the previous behavior) and pre-sizes the result slice otherwise,
// since the final length is known up front.
func (b store) toEvents(events []nostrdomain.Event) []nostr.Event {
	if len(events) == 0 {
		return nil
	}
	result := make([]nostr.Event, 0, len(events))
	for _, event := range events {
		result = append(result, event.Libevent())
	}
	return result
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ require (
github.com/gorilla/css v1.0.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hellofresh/health-go/v5 v5.3.0 h1:T0tapAAuqVIiagRn0YQzFoIPAQek120/vQYPxpMMJ9M=
Expand Down
Loading

0 comments on commit 2d07a44

Please sign in to comment.