From 2d07a44d888c56d6078fab162b8c28f2b32451ed Mon Sep 17 00:00:00 2001 From: Filip Borkiewicz Date: Mon, 18 Sep 2023 15:11:16 +0200 Subject: [PATCH] Refactor rsslay to proactively retrieve feeds (#21) Goals: - Download data in the background to always be able to return data quickly. - Done. - Make the project more composable e.g. separate out markdown and html processing from constructing nostr events. - Created clear adapter/app/domain layers. Didn't fix everything to keep the pull request small. - Make testing easier. - Testing the domain layer is easy now. - Eliminate the bug which causes the feed to not be updated under some circumstances. - Unclear. Maybe we need to remove caching altogether, it is not really useful right now. What has been done: - http handlers no longer contain core domain logic - http handlers no longer have direct access to the database - http handlers no longer misuse the same types for database queries and template rendering - http handlers no longer have access to secrets - created an adapters layer which encapsulates database access - created a domain layer for data validation, more core logic needs to be moved there in the future - created an application layer to glue everything together - cleaned up duplicated calls to libraries to convert keys to the nip19 format in http handlers - cleaned up error handling logic in http handlers - saving feed definitions will no longer fail quietly in case of database errors - fixed nonsensical error handling when saving feed definitions - events are properly retrieved in a single place in the application layer and not from multiple places - removed duplicate code --- cmd/rsslay/main.go | 211 +++++------------ cmd/rsslay/store.go | 51 ++++ go.mod | 2 + go.sum | 4 + internal/handlers/handlers.go | 198 ++++++---------- pkg/metrics/registries.go | 8 +- pkg/new/adapters/event_storage.go | 54 +++++ pkg/new/adapters/feed_definition_storage.go | 147 ++++++++++++ 
.../adapters/pubsub/event_created_pubsub.go | 25 ++ pkg/new/adapters/pubsub/pubsub.go | 67 ++++++ pkg/new/app/app.go | 39 ++++ pkg/new/app/handler_create_feed_definition.go | 77 ++++++ pkg/new/app/handler_get_events.go | 17 ++ pkg/new/app/handler_get_random_feeds.go | 23 ++ pkg/new/app/handler_get_total_event_count.go | 15 ++ pkg/new/app/handler_on_new_event_created.go | 30 +++ pkg/new/app/handler_update_feeds.go | 219 ++++++++++++++++++ pkg/new/app/search_feeds.go | 23 ++ pkg/new/domain/domain.go | 19 ++ pkg/new/domain/feed/feed.go | 59 +++++ pkg/new/domain/nostr/nostr.go | 156 +++++++++++++ pkg/new/domain/nostr/nostr_test.go | 41 ++++ pkg/new/ports/pubsub/received_events.go | 36 +++ pkg/new/ports/timer_update_feeds.go | 34 +++ 24 files changed, 1280 insertions(+), 275 deletions(-) create mode 100644 cmd/rsslay/store.go create mode 100644 pkg/new/adapters/event_storage.go create mode 100644 pkg/new/adapters/feed_definition_storage.go create mode 100644 pkg/new/adapters/pubsub/event_created_pubsub.go create mode 100644 pkg/new/adapters/pubsub/pubsub.go create mode 100644 pkg/new/app/app.go create mode 100644 pkg/new/app/handler_create_feed_definition.go create mode 100644 pkg/new/app/handler_get_events.go create mode 100644 pkg/new/app/handler_get_random_feeds.go create mode 100644 pkg/new/app/handler_get_total_event_count.go create mode 100644 pkg/new/app/handler_on_new_event_created.go create mode 100644 pkg/new/app/handler_update_feeds.go create mode 100644 pkg/new/app/search_feeds.go create mode 100644 pkg/new/domain/domain.go create mode 100644 pkg/new/domain/feed/feed.go create mode 100644 pkg/new/domain/nostr/nostr.go create mode 100644 pkg/new/domain/nostr/nostr_test.go create mode 100644 pkg/new/ports/pubsub/received_events.go create mode 100644 pkg/new/ports/timer_update_feeds.go diff --git a/cmd/rsslay/main.go b/cmd/rsslay/main.go index 86edd15..6a0a18c 100644 --- a/cmd/rsslay/main.go +++ b/cmd/rsslay/main.go @@ -4,7 +4,6 @@ import ( "context" 
"database/sql" _ "embed" - "errors" "flag" "fmt" "log" @@ -25,13 +24,18 @@ import ( "github.com/nbd-wtf/go-nostr/nip11" "github.com/piraces/rsslay/internal/handlers" "github.com/piraces/rsslay/pkg/custom_cache" - "github.com/piraces/rsslay/pkg/events" "github.com/piraces/rsslay/pkg/feed" "github.com/piraces/rsslay/pkg/metrics" + "github.com/piraces/rsslay/pkg/new/adapters" + pubsubadapters "github.com/piraces/rsslay/pkg/new/adapters/pubsub" + "github.com/piraces/rsslay/pkg/new/app" + "github.com/piraces/rsslay/pkg/new/domain" + "github.com/piraces/rsslay/pkg/new/ports" + pubsub2 "github.com/piraces/rsslay/pkg/new/ports/pubsub" "github.com/piraces/rsslay/pkg/replayer" "github.com/piraces/rsslay/scripts" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus/promhttp" - "golang.org/x/exp/slices" ) // Command line flags. @@ -63,13 +67,14 @@ type Relay struct { RedisConnectionString string `envconfig:"REDIS_CONNECTION_STRING" default:""` updates chan nostr.Event - lastEmitted sync.Map db *sql.DB healthCheck *health.Health mutex sync.Mutex routineQueueLength int converterSelector *feed.ConverterSelector cache *cache.Cache[string] + handler *handlers.Handler + store *store } var relayInstance = &Relay{ @@ -114,20 +119,20 @@ func (r *Relay) Name() string { func (r *Relay) OnInitialized(s *relayer.Server) { s.Router().Path("/").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - handlers.HandleWebpage(writer, request, r.db, &r.MainDomainName) + r.handler.HandleWebpage(writer, request, &r.MainDomainName) }) s.Router().Path("/create").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - handlers.HandleCreateFeed(writer, request, r.db, &r.Secret, dsn) + r.handler.HandleCreateFeed(writer, request, dsn) }) s.Router().Path("/search").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - handlers.HandleSearch(writer, request, r.db) + r.handler.HandleSearch(writer, request) }) s.Router(). 
PathPrefix(assetsDir). Handler(http.StripPrefix(assetsDir, http.FileServer(http.Dir("./web/"+assetsDir)))) s.Router().Path("/healthz").HandlerFunc(relayInstance.healthCheck.HandlerFunc) s.Router().Path("/api/feed").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - handlers.HandleApiFeed(writer, request, r.db, &r.Secret, dsn) + r.handler.HandleApiFeed(writer, request, dsn) }) s.Router().Path("/.well-known/nostr.json").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { handlers.HandleNip05(writer, request, r.db, &r.OwnerPublicKey, &r.EnableAutoNIP05Registration) @@ -136,6 +141,8 @@ func (r *Relay) OnInitialized(s *relayer.Server) { } func (r *Relay) Init() error { + ctx := context.TODO() + flag.Parse() err := envconfig.Process("", r) if err != nil { @@ -145,54 +152,58 @@ func (r *Relay) Init() error { } ConfigureCache() - r.db = InitDatabase(r) - - go r.UpdateListeningFilters() - longFormConverter := feed.NewLongFormConverter() - r.converterSelector = feed.NewConverterSelector(longFormConverter) + db := InitDatabase(r) + feedDefinitionStorage := adapters.NewFeedDefinitionStorage(db) + eventStorage := adapters.NewEventStorage() + receivedEventPubSub := pubsubadapters.NewReceivedEventPubSub() - return nil -} + secret, err := domain.NewSecret(r.Secret) + if err != nil { + return errors.Wrap(err, "error creating a secret") + } -func (r *Relay) UpdateListeningFilters() { - for { - time.Sleep(20 * time.Minute) - metrics.ListeningFiltersOps.Inc() - - filters := relayer.GetListeningFilters() - log.Printf("[DEBUG] Checking for updates; %d filters active", len(filters)) - - var parsedEvents []replayer.EventWithPrivateKey - for _, filter := range filters { - if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindTextNote) { - for _, pubkey := range filter.Authors { - parsedFeed, entity := events.GetParsedFeedForPubKey(pubkey, r.db, r.DeleteFailingFeeds, r.NitterInstances) - if parsedFeed == nil { - continue - } - - converter 
:= r.converterSelector.Select(parsedFeed) - - for _, item := range parsedFeed.Items { - defaultCreatedAt := time.Unix(time.Now().Unix(), 0) - evt := converter.Convert(pubkey, item, parsedFeed, defaultCreatedAt, entity.URL) - last, ok := r.lastEmitted.Load(entity.URL) - if last == nil { - last = uint32(time.Now().Unix()) - } - if !ok || nostr.Timestamp(int64(last.(uint32))) < evt.CreatedAt { - _ = evt.Sign(entity.PrivateKey) - r.updates <- evt - r.lastEmitted.Store(entity.URL, last.(uint32)) - parsedEvents = append(parsedEvents, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey}) - } - } - } - } - } - r.AttemptReplayEvents(parsedEvents) + r.converterSelector = feed.NewConverterSelector(feed.NewLongFormConverter()) + + handlerCreateFeedDefinition := app.NewHandlerCreateFeedDefinition(secret, feedDefinitionStorage) + handlerUpdateFeeds := app.NewHandlerUpdateFeeds( + r.DeleteFailingFeeds, + r.NitterInstances, + r.EnableAutoNIP05Registration, + r.DefaultProfilePictureUrl, + r.MainDomainName, + db, + feedDefinitionStorage, + r.converterSelector, + eventStorage, + receivedEventPubSub, + ) + handlerGetEvents := app.NewHandlerGetEvents(eventStorage) + handlerOnNewEventCreated := app.NewHandlerOnNewEventCreated(r.updates) + handlerGetTotalFeedCount := app.NewHandlerGetTotalFeedCount(feedDefinitionStorage) + handlerGetRandomFeeds := app.NewHandlerGetRandomFeeds(feedDefinitionStorage) + handlerSearchFeeds := app.NewHandlerSearchFeeds(feedDefinitionStorage) + + updateFeedsTimer := ports.NewUpdateFeedsTimer(handlerUpdateFeeds) + receivedEventSubscriber := pubsub2.NewReceivedEventSubscriber(receivedEventPubSub, handlerOnNewEventCreated) + + app := app.App{ + CreateFeedDefinition: handlerCreateFeedDefinition, + UpdateFeeds: handlerUpdateFeeds, + GetEvents: handlerGetEvents, + GetTotalFeedCount: handlerGetTotalFeedCount, + GetRandomFeeds: handlerGetRandomFeeds, + SearchFeeds: handlerSearchFeeds, } + + r.db = db + r.handler = handlers.NewHandler(app) + r.store 
= newStore(app) + + go updateFeedsTimer.Run(ctx) + go receivedEventSubscriber.Run(ctx) + + return nil } func (r *Relay) AttemptReplayEvents(events []replayer.EventWithPrivateKey) { @@ -217,101 +228,7 @@ func (r *Relay) AcceptEvent(_ *nostr.Event) bool { } func (r *Relay) Storage() relayer.Storage { - return store{r.db} -} - -type store struct { - db *sql.DB -} - -func (b store) Init() error { return nil } -func (b store) SaveEvent(_ *nostr.Event) error { - metrics.InvalidEventsRequests.Inc() - return errors.New("blocked: we don't accept any events") -} - -func (b store) DeleteEvent(_, _ string) error { - metrics.InvalidEventsRequests.Inc() - return errors.New("blocked: we can't delete any events") -} - -func (b store) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { - var parsedEvents []nostr.Event - var eventsToReplay []replayer.EventWithPrivateKey - - metrics.QueryEventsRequests.Inc() - - if filter.IDs != nil || len(filter.Tags) > 0 { - return parsedEvents, nil - } - - for _, pubkey := range filter.Authors { - parsedFeed, entity := events.GetParsedFeedForPubKey(pubkey, relayInstance.db, relayInstance.DeleteFailingFeeds, relayInstance.NitterInstances) - - if parsedFeed == nil { - continue - } - - converter := relayInstance.converterSelector.Select(parsedFeed) - - if filter.Kinds == nil || slices.Contains(filter.Kinds, nostr.KindSetMetadata) { - evt := feed.EntryFeedToSetMetadata(pubkey, parsedFeed, entity.URL, relayInstance.EnableAutoNIP05Registration, relayInstance.DefaultProfilePictureUrl, relayInstance.MainDomainName) - - if filter.Since != nil && evt.CreatedAt < *filter.Since { - continue - } - if filter.Until != nil && evt.CreatedAt > *filter.Until { - continue - } - - _ = evt.Sign(entity.PrivateKey) - parsedEvents = append(parsedEvents, evt) - if relayInstance.ReplayToRelays { - eventsToReplay = append(eventsToReplay, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey}) - } - } - - if filter.Kinds == nil || 
slices.Contains(filter.Kinds, nostr.KindTextNote) || slices.Contains(filter.Kinds, feed.KindLongFormTextContent) { - var last uint32 = 0 - for _, item := range parsedFeed.Items { - defaultCreatedAt := time.Unix(time.Now().Unix(), 0) - evt := converter.Convert(pubkey, item, parsedFeed, defaultCreatedAt, entity.URL) - - // Feed need to have a date for each entry... - if evt.CreatedAt == nostr.Timestamp(defaultCreatedAt.Unix()) { - continue - } - - if filter.Since != nil && evt.CreatedAt < *filter.Since { - continue - } - if filter.Until != nil && evt.CreatedAt > *filter.Until { - continue - } - - _ = evt.Sign(entity.PrivateKey) - - if !filter.Matches(&evt) { - continue - } - - if evt.CreatedAt > nostr.Timestamp(int64(last)) { - last = uint32(evt.CreatedAt) - } - - parsedEvents = append(parsedEvents, evt) - if relayInstance.ReplayToRelays { - eventsToReplay = append(eventsToReplay, replayer.EventWithPrivateKey{Event: &evt, PrivateKey: entity.PrivateKey}) - } - } - - relayInstance.lastEmitted.Store(entity.URL, last) - } - } - - relayInstance.AttemptReplayEvents(eventsToReplay) - - return parsedEvents, nil + return r.store } func (r *Relay) InjectEvents() chan nostr.Event { diff --git a/cmd/rsslay/store.go b/cmd/rsslay/store.go new file mode 100644 index 0000000..1f7bf9f --- /dev/null +++ b/cmd/rsslay/store.go @@ -0,0 +1,51 @@ +package main + +import ( + "github.com/nbd-wtf/go-nostr" + "github.com/piraces/rsslay/pkg/metrics" + "github.com/piraces/rsslay/pkg/new/app" + nostrdomain "github.com/piraces/rsslay/pkg/new/domain/nostr" + "github.com/pkg/errors" +) + +type store struct { + app app.App +} + +func newStore(app app.App) *store { + return &store{app: app} +} + +func (b store) Init() error { + return nil +} + +func (b store) SaveEvent(_ *nostr.Event) error { + metrics.InvalidEventsRequests.Inc() + return errors.New("blocked: we don't accept any events") +} + +func (b store) DeleteEvent(_, _ string) error { + metrics.InvalidEventsRequests.Inc() + return 
errors.New("blocked: we can't delete any events") +} + +func (b store) QueryEvents(libfilter *nostr.Filter) ([]nostr.Event, error) { + metrics.QueryEventsRequests.Inc() + + filter := nostrdomain.NewFilter(libfilter) + events, err := b.app.GetEvents.Handle(filter) + if err != nil { + return nil, errors.Wrap(err, "error getting events") + } + + return b.toEvents(events), nil +} + +func (b store) toEvents(events []nostrdomain.Event) []nostr.Event { + var result []nostr.Event + for _, event := range events { + result = append(result, event.Libevent()) + } + return result +} diff --git a/go.mod b/go.mod index 8e8a848..c1a9a03 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,8 @@ require ( github.com/gorilla/css v1.0.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/go.sum b/go.sum index 7f2d435..7791c69 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,10 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y= github.com/hashicorp/logutils 
v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hellofresh/health-go/v5 v5.3.0 h1:T0tapAAuqVIiagRn0YQzFoIPAQek120/vQYPxpMMJ9M= diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index 4341bf4..4c2a928 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -9,17 +9,14 @@ import ( "net/url" "os" "path/filepath" - "strings" _ "github.com/mattn/go-sqlite3" - "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip05" - "github.com/nbd-wtf/go-nostr/nip19" "github.com/piraces/rsslay/pkg/feed" - "github.com/piraces/rsslay/pkg/helpers" "github.com/piraces/rsslay/pkg/metrics" + "github.com/piraces/rsslay/pkg/new/app" + domainfeed "github.com/piraces/rsslay/pkg/new/domain/feed" "github.com/piraces/rsslay/web/templates" - "github.com/prometheus/client_golang/prometheus" ) var t = template.Must(template.ParseFS(templates.Templates, "*.tmpl")) @@ -40,54 +37,53 @@ type PageData struct { MainDomainName string } -func HandleWebpage(w http.ResponseWriter, r *http.Request, db *sql.DB, mainDomainName *string) { +type FeedDefnitionStorage interface { + ListRandom(n int) ([]domainfeed.FeedDefinition, error) + Search() +} + +type Handler struct { + app app.App +} + +func NewHandler( + app app.App, +) *Handler { + return &Handler{ + app: app, + } +} + +func (f *Handler) HandleWebpage(w http.ResponseWriter, r *http.Request, mainDomainName *string) { mustRedirect := handleOtherRegion(w, r) if mustRedirect { return } metrics.IndexRequests.Inc() - var count uint64 - row := db.QueryRow(`SELECT count(*) FROM feeds`) - err := row.Scan(&count) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - var items []Entry - rows, err := db.Query(`SELECT publickey, url FROM feeds ORDER BY RANDOM() LIMIT 50`) + totalCount, err := f.app.GetTotalFeedCount.Handle() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - for rows.Next() { - var entry Entry - 
if err := rows.Scan(&entry.PubKey, &entry.Url); err != nil { - log.Printf("[ERROR] failed to scan row iterating feeds: %v", err) - metrics.AppErrors.With(prometheus.Labels{"type": "SQL_SCAN"}).Inc() - continue - } - - entry.NPubKey, _ = nip19.EncodePublicKey(entry.PubKey) - items = append(items, entry) - } - if err := rows.Close(); err != nil { + randomFeedDefinitions, err := f.app.GetRandomFeeds.Handle(50) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } data := PageData{ - Count: count, - Entries: items, + Count: uint64(totalCount), + Entries: toEntries(randomFeedDefinitions), MainDomainName: *mainDomainName, } _ = t.ExecuteTemplate(w, "index.html.tmpl", data) } -func HandleSearch(w http.ResponseWriter, r *http.Request, db *sql.DB) { +func (f *Handler) HandleSearch(w http.ResponseWriter, r *http.Request) { mustRedirect := handleOtherRegion(w, r) if mustRedirect { return @@ -100,60 +96,42 @@ func HandleSearch(w http.ResponseWriter, r *http.Request, db *sql.DB) { return } - var count uint64 - row := db.QueryRow(`SELECT count(*) FROM feeds`) - err := row.Scan(&count) + totalCount, err := f.app.GetTotalFeedCount.Handle() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - var items []Entry - rows, err := db.Query(`SELECT publickey, url FROM feeds WHERE url like '%' || $1 || '%' LIMIT 50`, query) + feedDefinitions, err := f.app.SearchFeeds.Handle(query, 50) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - for rows.Next() { - var entry Entry - if err := rows.Scan(&entry.PubKey, &entry.Url); err != nil { - log.Printf("[ERROR] failed to scan row iterating feeds searching: %v", err) - metrics.AppErrors.With(prometheus.Labels{"type": "SQL_SCAN"}).Inc() - continue - } - - entry.NPubKey, _ = nip19.EncodePublicKey(entry.PubKey) - items = append(items, entry) - } - if err := rows.Close(); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return 
- } - data := PageData{ - Count: count, - FilteredCount: uint64(len(items)), - Entries: items, + Count: uint64(totalCount), + FilteredCount: uint64(len(feedDefinitions)), + Entries: toEntries(feedDefinitions), } _ = t.ExecuteTemplate(w, "search.html.tmpl", data) } -func HandleCreateFeed(w http.ResponseWriter, r *http.Request, db *sql.DB, secret *string, dsn *string) { +func (f *Handler) HandleCreateFeed(w http.ResponseWriter, r *http.Request, dsn *string) { mustRedirect := handleRedirectToPrimaryNode(w, dsn) if mustRedirect { return } metrics.CreateRequests.Inc() - entry := createFeedEntry(r, db, secret) + + entry := f.createFeed(r) _ = t.ExecuteTemplate(w, "created.html.tmpl", entry) } -func HandleApiFeed(w http.ResponseWriter, r *http.Request, db *sql.DB, secret *string, dsn *string) { +func (f *Handler) HandleApiFeed(w http.ResponseWriter, r *http.Request, dsn *string) { if r.Method == http.MethodGet || r.Method == http.MethodPost { - handleCreateFeedEntry(w, r, db, secret, dsn) + f.handleCreateFeedEntry(w, r, dsn) } else { http.Error(w, "Method not supported", http.StatusMethodNotAllowed) } @@ -191,14 +169,16 @@ func HandleNip05(w http.ResponseWriter, r *http.Request, db *sql.DB, ownerPubKey _, _ = w.Write(response) } -func handleCreateFeedEntry(w http.ResponseWriter, r *http.Request, db *sql.DB, secret *string, dsn *string) { +func (f *Handler) handleCreateFeedEntry(w http.ResponseWriter, r *http.Request, dsn *string) { mustRedirect := handleRedirectToPrimaryNode(w, dsn) if mustRedirect { return } metrics.CreateRequestsAPI.Inc() - entry := createFeedEntry(r, db, secret) + + entry := f.createFeed(r) + w.Header().Set("Content-Type", "application/json") if entry.ErrorCode >= 400 { @@ -211,6 +191,30 @@ func handleCreateFeedEntry(w http.ResponseWriter, r *http.Request, db *sql.DB, s _, _ = w.Write(response) } +func (f *Handler) createFeed(r *http.Request) Entry { + urlParam := r.URL.Query().Get("url") + + address, err := domainfeed.NewAddress(urlParam) + if err != 
nil { + return Entry{ + Error: true, + ErrorMessage: err.Error(), + ErrorCode: http.StatusBadRequest, + } + } + + feedDefinition, err := f.app.CreateFeedDefinition.Handle(address) + if err != nil { + return Entry{ + Error: true, + ErrorMessage: err.Error(), + ErrorCode: http.StatusInternalServerError, + } + } + + return toEntry(*feedDefinition) +} + func handleOtherRegion(w http.ResponseWriter, r *http.Request) bool { // If a different region is specified, redirect to that region. if region := r.URL.Query().Get("region"); region != "" && region != os.Getenv("FLY_REGION") { @@ -238,72 +242,18 @@ func handleRedirectToPrimaryNode(w http.ResponseWriter, dsn *string) bool { return false } -func createFeedEntry(r *http.Request, db *sql.DB, secret *string) *Entry { - urlParam := r.URL.Query().Get("url") - entry := Entry{ - Error: false, - } - - if !helpers.IsValidHttpUrl(urlParam) { - log.Printf("[DEBUG] tried to create feed from invalid feed url '%q' skipping...", urlParam) - entry.ErrorCode = http.StatusBadRequest - entry.Error = true - entry.ErrorMessage = "Invalid URL provided (must be in absolute format and with https or https scheme)..." - return &entry - } - - feedUrl := feed.GetFeedURL(urlParam) - if feedUrl == "" { - entry.ErrorCode = http.StatusBadRequest - entry.Error = true - entry.ErrorMessage = "Could not find a feed URL in there..." 
- return &entry +func toEntries(definitions []*domainfeed.FeedDefinition) []Entry { + var entries []Entry + for _, definition := range definitions { + entries = append(entries, toEntry(*definition)) } - - parsedFeed, err := feed.ParseFeed(feedUrl) - if err != nil { - entry.ErrorCode = http.StatusBadRequest - entry.Error = true - entry.ErrorMessage = "Bad feed: " + err.Error() - return &entry - } - - sk := feed.PrivateKeyFromFeed(feedUrl, *secret) - publicKey, err := nostr.GetPublicKey(sk) - if err != nil { - entry.ErrorCode = http.StatusInternalServerError - entry.Error = true - entry.ErrorMessage = "bad private key: " + err.Error() - return &entry - } - - publicKey = strings.TrimSpace(publicKey) - isNitterFeed := strings.Contains(parsedFeed.Description, "Twitter feed") - defer insertFeed(err, feedUrl, publicKey, sk, isNitterFeed, db) - - entry.Url = feedUrl - entry.PubKey = publicKey - entry.NPubKey, _ = nip19.EncodePublicKey(publicKey) - return &entry + return entries } -func insertFeed(err error, feedUrl string, publicKey string, sk string, nitter bool, db *sql.DB) { - row := db.QueryRow("SELECT privatekey, url FROM feeds WHERE publickey=$1", publicKey) - - var entity feed.Entity - err = row.Scan(&entity.PrivateKey, &entity.URL) - if err != nil && err == sql.ErrNoRows { - log.Printf("[DEBUG] not found feed at url %q as publicKey %s", feedUrl, publicKey) - if _, err := db.Exec(`INSERT INTO feeds (publickey, privatekey, url, nitter) VALUES (?, ?, ?, ?)`, publicKey, sk, feedUrl, nitter); err != nil { - log.Printf("[ERROR] failure: %v", err) - metrics.AppErrors.With(prometheus.Labels{"type": "SQL_WRITE"}).Inc() - } else { - log.Printf("[DEBUG] saved feed at url %q as publicKey %s", feedUrl, publicKey) - } - } else if err != nil { - metrics.AppErrors.With(prometheus.Labels{"type": "SQL_SCAN"}).Inc() - log.Fatalf("[ERROR] failed when trying to retrieve row with pubkey '%s': %v", publicKey, err) - } else { - log.Printf("[DEBUG] found feed at url %q as publicKey %s", 
feedUrl, publicKey) +func toEntry(definition domainfeed.FeedDefinition) Entry { + return Entry{ + PubKey: definition.PublicKey().Hex(), + NPubKey: definition.PublicKey().Nip19(), + Url: definition.Address().String(), } } diff --git a/pkg/metrics/registries.go b/pkg/metrics/registries.go index cef91d3..27ce558 100644 --- a/pkg/metrics/registries.go +++ b/pkg/metrics/registries.go @@ -38,10 +38,6 @@ var ( Name: "rsslay_processed_invalid_events_ops_total", Help: "The total number of processed invalid events requests", }) - ListeningFiltersOps = promauto.NewCounter(prometheus.CounterOpts{ - Name: "rsslay_processed_listening_filters_ops_total", - Help: "The total number of updated listening filters", - }) CacheHits = promauto.NewCounter(prometheus.CounterOpts{ Name: "rsslay_processed_cache_hits_ops_total", Help: "The total number of cache hits", @@ -66,4 +62,8 @@ var ( Name: "rsslay_replay_events_error_total", Help: "Number of error replayed events by relay.", }, []string{"relay"}) + UpdateResults = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "rsslay_update_results", + Help: "Feed update results", + }, []string{"result"}) ) diff --git a/pkg/new/adapters/event_storage.go b/pkg/new/adapters/event_storage.go new file mode 100644 index 0000000..46f7af2 --- /dev/null +++ b/pkg/new/adapters/event_storage.go @@ -0,0 +1,54 @@ +package adapters + +import ( + "errors" + "log" + "sync" + + domain "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type EventStorage struct { + events map[string][]domain.Event + eventsLock sync.RWMutex +} + +func NewEventStorage() *EventStorage { + return &EventStorage{ + events: make(map[string][]domain.Event), + } +} + +func (e *EventStorage) PutEvents(author domain.PublicKey, events []domain.Event) error { + e.eventsLock.Lock() + defer e.eventsLock.Unlock() + + log.Printf("saving %d events for feed %s", len(events), author.Hex()) + + for _, event := range events { + if !author.Equal(event.PublicKey()) { + return errors.New("one or 
more events weren't created by this author") + } + } + + e.events[author.Hex()] = events + return nil +} + +func (e *EventStorage) GetEvents(filter domain.Filter) ([]domain.Event, error) { + e.eventsLock.RLock() + defer e.eventsLock.RUnlock() + + // todo optimize + var results []domain.Event + + // todo optimize + for _, events := range e.events { + for _, event := range events { + if filter.Matches(event) { + results = append(results, event) + } + } + } + return results, nil +} diff --git a/pkg/new/adapters/feed_definition_storage.go b/pkg/new/adapters/feed_definition_storage.go new file mode 100644 index 0000000..ee8a4da --- /dev/null +++ b/pkg/new/adapters/feed_definition_storage.go @@ -0,0 +1,147 @@ +package adapters + +import ( + "database/sql" + "log" + + "github.com/piraces/rsslay/pkg/feed" + "github.com/piraces/rsslay/pkg/metrics" + domainfeed "github.com/piraces/rsslay/pkg/new/domain/feed" + "github.com/piraces/rsslay/pkg/new/domain/nostr" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" +) + +type FeedDefinitionStorage struct { + db *sql.DB +} + +func NewFeedDefinitionStorage(db *sql.DB) *FeedDefinitionStorage { + return &FeedDefinitionStorage{db: db} +} + +func (f *FeedDefinitionStorage) CountTotal() (int, error) { + var count int + row := f.db.QueryRow(`SELECT count(*) FROM feeds`) + err := row.Scan(&count) + if err != nil { + return 0, errors.Wrap(err, "error counting feeds") + } + + return count, nil +} + +func (f *FeedDefinitionStorage) List() ([]*domainfeed.FeedDefinition, error) { + rows, err := f.db.Query(` + SELECT publickey, privatekey, url, nitter + FROM feeds`, + ) + if err != nil { + return nil, errors.Wrap(err, "error getting feed definitions") + } + defer rows.Close() // not much we can do here + + return f.scan(rows) +} + +func (f *FeedDefinitionStorage) ListRandom(limit int) ([]*domainfeed.FeedDefinition, error) { + rows, err := f.db.Query(` + SELECT publickey, privatekey, url, nitter + FROM feeds + ORDER BY 
RANDOM() + LIMIT $1`, + limit, + ) + if err != nil { + return nil, errors.Wrap(err, "error getting feed definitions") + } + defer rows.Close() // not much we can do here + + return f.scan(rows) +} + +func (f *FeedDefinitionStorage) Search(query string, limit int) ([]*domainfeed.FeedDefinition, error) { + rows, err := f.db.Query(` + SELECT publickey, privatekey, url, nitter + FROM feeds + WHERE url + LIKE '%' || $1 || '%' LIMIT $2`, + query, + limit, + ) + if err != nil { + return nil, errors.Wrap(err, "error getting feed definitions") + } + defer rows.Close() // not much we can do here + + return f.scan(rows) +} + +func (f *FeedDefinitionStorage) Put(definition *domainfeed.FeedDefinition) error { + row := f.db.QueryRow("SELECT privatekey, url FROM feeds WHERE publickey=$1", definition.PublicKey().Hex()) + + var entity feed.Entity + err := row.Scan(&entity.PrivateKey, &entity.URL) + if err != nil && err == sql.ErrNoRows { + log.Printf("[DEBUG] not found feed at url %q as publicKey %s", definition.Address().String(), definition.PublicKey().Hex()) + if _, err := f.db.Exec(`INSERT INTO feeds (publickey, privatekey, url, nitter) VALUES (?, ?, ?, ?)`, definition.PublicKey().Hex(), definition.PrivateKey().Hex(), definition.Address().String(), definition.Nitter()); err != nil { + log.Printf("[ERROR] failure: %v", err) + metrics.AppErrors.With(prometheus.Labels{"type": "SQL_WRITE"}).Inc() + return errors.Wrap(err, "error inserting the new feed") + } else { + log.Printf("[DEBUG] saved feed at url %q as publicKey %s", definition.Address().String(), definition.PublicKey().Hex()) + return nil + } + } else if err != nil { + metrics.AppErrors.With(prometheus.Labels{"type": "SQL_SCAN"}).Inc() + return errors.Wrap(err, "error checking if feed exists") + } + + log.Printf("[DEBUG] found feed at url %q as publicKey %s", definition.Address().String(), definition.PublicKey().Hex()) + return nil +} + +func (f *FeedDefinitionStorage) scan(rows *sql.Rows) ([]*domainfeed.FeedDefinition, 
error) { + var items []*domainfeed.FeedDefinition + for rows.Next() { + var ( + tmppublickey string + tmpprivatekey string + tmpurl string + tmpnitter bool + ) + + if err := rows.Scan(&tmppublickey, &tmpprivatekey, &tmpurl, &tmpnitter); err != nil { + metrics.AppErrors.With(prometheus.Labels{"type": "SQL_SCAN"}).Inc() + return nil, errors.Wrap(err, "error scanning the retrieved rows") + } + + publicKey, err := nostr.NewPublicKeyFromHex(tmppublickey) + if err != nil { + return nil, errors.Wrap(err, "error creating public key") + } + + privateKey, err := nostr.NewPrivateKeyFromHex(tmpprivatekey) + if err != nil { + return nil, errors.Wrap(err, "error creating private key") + } + + address, err := domainfeed.NewAddress(tmpurl) + if err != nil { + return nil, errors.Wrap(err, "error creating address") + } + + feedDefinition, err := domainfeed.NewFeedDefinition( + publicKey, + privateKey, + address, + tmpnitter, + ) + if err != nil { + return nil, errors.Wrap(err, "error loading feed definition") + } + + items = append(items, feedDefinition) + } + return items, nil +} diff --git a/pkg/new/adapters/pubsub/event_created_pubsub.go b/pkg/new/adapters/pubsub/event_created_pubsub.go new file mode 100644 index 0000000..a50a69e --- /dev/null +++ b/pkg/new/adapters/pubsub/event_created_pubsub.go @@ -0,0 +1,25 @@ +package pubsub + +import ( + "context" + + "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type EventCreatedPubSub struct { + pubsub *GoChannelPubSub[nostr.Event] +} + +func NewReceivedEventPubSub() *EventCreatedPubSub { + return &EventCreatedPubSub{ + pubsub: NewGoChannelPubSub[nostr.Event](), + } +} + +func (m *EventCreatedPubSub) PublishNewEventCreated(evt nostr.Event) { + m.pubsub.Publish(evt) +} + +func (m *EventCreatedPubSub) Subscribe(ctx context.Context) <-chan nostr.Event { + return m.pubsub.Subscribe(ctx) +} diff --git a/pkg/new/adapters/pubsub/pubsub.go b/pkg/new/adapters/pubsub/pubsub.go new file mode 100644 index 0000000..653e729 --- /dev/null +++ 
b/pkg/new/adapters/pubsub/pubsub.go @@ -0,0 +1,67 @@ +package pubsub + +import ( + "context" + "sync" +) + +type channelWithContext[T any] struct { + Ch chan T + Ctx context.Context +} + +type GoChannelPubSub[T any] struct { + subscriptions []channelWithContext[T] + lock sync.Mutex +} + +func NewGoChannelPubSub[T any]() *GoChannelPubSub[T] { + return &GoChannelPubSub[T]{} +} + +func (g *GoChannelPubSub[T]) Subscribe(ctx context.Context) <-chan T { + ch := make(chan T) + + g.addSubscription(ctx, ch) + + go func() { + <-ctx.Done() + g.removeSubscription(ch) + close(ch) + }() + + return ch +} + +func (g *GoChannelPubSub[T]) Publish(value T) { + g.lock.Lock() + defer g.lock.Unlock() + + for _, sub := range g.subscriptions { + select { + case sub.Ch <- value: + case <-sub.Ctx.Done(): + } + } +} + +func (g *GoChannelPubSub[T]) addSubscription(ctx context.Context, ch chan T) { + g.lock.Lock() + defer g.lock.Unlock() + + g.subscriptions = append(g.subscriptions, channelWithContext[T]{Ch: ch, Ctx: ctx}) +} + +func (g *GoChannelPubSub[T]) removeSubscription(ch chan T) { + g.lock.Lock() + defer g.lock.Unlock() + + for i := range g.subscriptions { + if g.subscriptions[i].Ch == ch { + g.subscriptions = append(g.subscriptions[:i], g.subscriptions[i+1:]...) 
+ return + } + } + + panic("somehow the subscription was already removed, this must be a bug") +} diff --git a/pkg/new/app/app.go b/pkg/new/app/app.go new file mode 100644 index 0000000..1fafdee --- /dev/null +++ b/pkg/new/app/app.go @@ -0,0 +1,39 @@ +package app + +import ( + "github.com/mmcdole/gofeed" + "github.com/piraces/rsslay/pkg/feed" + feeddomain "github.com/piraces/rsslay/pkg/new/domain/feed" + domain "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type App struct { + CreateFeedDefinition *HandlerCreateFeedDefinition + UpdateFeeds *HandlerUpdateFeeds + + GetEvents *HandlerGetEvents + GetTotalFeedCount *HandlerGetTotalFeedCount + GetRandomFeeds *HandlerGetRandomFeeds + SearchFeeds *HandlerSearchFeeds +} + +type FeedDefinitionStorage interface { + Put(definition *feeddomain.FeedDefinition) error + CountTotal() (int, error) + List() ([]*feeddomain.FeedDefinition, error) + ListRandom(limit int) ([]*feeddomain.FeedDefinition, error) + Search(query string, limit int) ([]*feeddomain.FeedDefinition, error) +} + +type EventStorage interface { + GetEvents(filter domain.Filter) ([]domain.Event, error) + PutEvents(author domain.PublicKey, events []domain.Event) error +} + +type ConverterSelector interface { + Select(feed *gofeed.Feed) feed.ItemToEventConverter +} + +type EventPublisher interface { + PublishNewEventCreated(evt domain.Event) +} diff --git a/pkg/new/app/handler_create_feed_definition.go b/pkg/new/app/handler_create_feed_definition.go new file mode 100644 index 0000000..f70d4da --- /dev/null +++ b/pkg/new/app/handler_create_feed_definition.go @@ -0,0 +1,77 @@ +package app + +import ( + "strings" + + "github.com/nbd-wtf/go-nostr" + "github.com/piraces/rsslay/pkg/feed" + "github.com/piraces/rsslay/pkg/new/domain" + feeddomain "github.com/piraces/rsslay/pkg/new/domain/feed" + nostrdomain "github.com/piraces/rsslay/pkg/new/domain/nostr" + "github.com/pkg/errors" +) + +type HandlerCreateFeedDefinition struct { + secret domain.Secret + 
feedDefinitionStorage FeedDefinitionStorage +} + +func NewHandlerCreateFeedDefinition(secret domain.Secret, feedDefinitionStorage FeedDefinitionStorage) *HandlerCreateFeedDefinition { + return &HandlerCreateFeedDefinition{secret: secret, feedDefinitionStorage: feedDefinitionStorage} +} + +func (h *HandlerCreateFeedDefinition) Handle(address feeddomain.Address) (*feeddomain.FeedDefinition, error) { + feedUrl := feed.GetFeedURL(address.String()) + if feedUrl == "" { + return nil, errors.New("could not find a feed URL in there") + } + + parsedFeed, err := feed.ParseFeed(feedUrl) + if err != nil { + return nil, errors.Wrap(err, "error parsing feed") + } + + privateKey := feed.PrivateKeyFromFeed(feedUrl, h.secret.String()) + publicKey, err := nostr.GetPublicKey(privateKey) + if err != nil { + return nil, errors.Wrap(err, "error creating a public key") + } + + publicKey = strings.TrimSpace(publicKey) + isNitterFeed := strings.Contains(parsedFeed.Description, "Twitter feed") // todo this decision should occur at domain level + + // this function still calls other functions which do not use appropriate + // domain types therefore we need to convert the return values to domain + // types here + + domainPublicKey, err := nostrdomain.NewPublicKeyFromHex(publicKey) + if err != nil { + return nil, errors.Wrap(err, "error creating address from feed url") + } + + domainPrivateKey, err := nostrdomain.NewPrivateKeyFromHex(privateKey) + if err != nil { + return nil, errors.Wrap(err, "error creating address from feed url") + } + + domainFeedUrl, err := feeddomain.NewAddress(feedUrl) + if err != nil { + return nil, errors.Wrap(err, "error creating address from feed url") + } + + definition, err := feeddomain.NewFeedDefinition( + domainPublicKey, + domainPrivateKey, + domainFeedUrl, + isNitterFeed, + ) + if err != nil { + return nil, errors.Wrap(err, "error creating feed definition") + } + + if err := h.feedDefinitionStorage.Put(definition); err != nil { + return nil, errors.Wrap(err, 
"error saving the feed definition") + } + + return definition, nil +} diff --git a/pkg/new/app/handler_get_events.go b/pkg/new/app/handler_get_events.go new file mode 100644 index 0000000..c33ab19 --- /dev/null +++ b/pkg/new/app/handler_get_events.go @@ -0,0 +1,17 @@ +package app + +import ( + domain "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type HandlerGetEvents struct { + eventStorage EventStorage +} + +func NewHandlerGetEvents(eventStorage EventStorage) *HandlerGetEvents { + return &HandlerGetEvents{eventStorage: eventStorage} +} + +func (h *HandlerGetEvents) Handle(domainfilter domain.Filter) ([]domain.Event, error) { + return h.eventStorage.GetEvents(domainfilter) +} diff --git a/pkg/new/app/handler_get_random_feeds.go b/pkg/new/app/handler_get_random_feeds.go new file mode 100644 index 0000000..2e6b704 --- /dev/null +++ b/pkg/new/app/handler_get_random_feeds.go @@ -0,0 +1,23 @@ +package app + +import ( + domainfeed "github.com/piraces/rsslay/pkg/new/domain/feed" + "github.com/pkg/errors" +) + +type HandlerGetRandomFeeds struct { + feedDefinitionStorage FeedDefinitionStorage +} + +func NewHandlerGetRandomFeeds(feedDefinitionStorage FeedDefinitionStorage) *HandlerGetRandomFeeds { + return &HandlerGetRandomFeeds{ + feedDefinitionStorage: feedDefinitionStorage, + } +} + +func (h *HandlerGetRandomFeeds) Handle(limit int) ([]*domainfeed.FeedDefinition, error) { + if limit <= 0 { + return nil, errors.New("limit must be positive") + } + return h.feedDefinitionStorage.ListRandom(limit) +} diff --git a/pkg/new/app/handler_get_total_event_count.go b/pkg/new/app/handler_get_total_event_count.go new file mode 100644 index 0000000..293cb40 --- /dev/null +++ b/pkg/new/app/handler_get_total_event_count.go @@ -0,0 +1,15 @@ +package app + +type HandlerGetTotalFeedCount struct { + feedDefinitionStorage FeedDefinitionStorage +} + +func NewHandlerGetTotalFeedCount(feedDefinitionStorage FeedDefinitionStorage) *HandlerGetTotalFeedCount { + return 
&HandlerGetTotalFeedCount{ + feedDefinitionStorage: feedDefinitionStorage, + } +} + +func (h *HandlerGetTotalFeedCount) Handle() (int, error) { + return h.feedDefinitionStorage.CountTotal() +} diff --git a/pkg/new/app/handler_on_new_event_created.go b/pkg/new/app/handler_on_new_event_created.go new file mode 100644 index 0000000..69e3743 --- /dev/null +++ b/pkg/new/app/handler_on_new_event_created.go @@ -0,0 +1,30 @@ +package app + +import ( + "context" + "time" + + "github.com/nbd-wtf/go-nostr" + domain "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type HandlerOnNewEventCreated struct { + updatesCh chan<- nostr.Event + lastEventTime map[string]time.Time +} + +func NewHandlerOnNewEventCreated(updatesCh chan<- nostr.Event) *HandlerOnNewEventCreated { + return &HandlerOnNewEventCreated{ + updatesCh: updatesCh, + lastEventTime: make(map[string]time.Time), + } +} + +func (h *HandlerOnNewEventCreated) Handle(ctx context.Context, event domain.Event) error { + key := event.PublicKey().Hex() + if last, ok := h.lastEventTime[key]; !ok || last.Before(event.CreatedAt()) { + h.lastEventTime[key] = event.CreatedAt() + h.updatesCh <- event.Libevent() + } + return nil +} diff --git a/pkg/new/app/handler_update_feeds.go b/pkg/new/app/handler_update_feeds.go new file mode 100644 index 0000000..3c0e1fa --- /dev/null +++ b/pkg/new/app/handler_update_feeds.go @@ -0,0 +1,219 @@ +package app + +import ( + "context" + "database/sql" + "log" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/mmcdole/gofeed" + "github.com/nbd-wtf/go-nostr" + "github.com/piraces/rsslay/pkg/events" + "github.com/piraces/rsslay/pkg/feed" + "github.com/piraces/rsslay/pkg/metrics" + domainfeed "github.com/piraces/rsslay/pkg/new/domain/feed" + domain "github.com/piraces/rsslay/pkg/new/domain/nostr" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" +) + +const numWorkers = 10 + +type HandlerUpdateFeeds struct { + deleteFailingFeeds bool + nitterInstances []string + 
enableAutoNIP05Registration bool + defaultProfilePictureUrl string + mainDomainName string + + db *sql.DB // todo remove! + feedDefinitionStorage FeedDefinitionStorage + converterSelector ConverterSelector + eventStorage EventStorage + eventPublisher EventPublisher +} + +func NewHandlerUpdateFeeds( + deleteFailingFeeds bool, + nitterInstances []string, + enableAutoNIP05Registration bool, + defaultProfilePictureUrl string, + mainDomainName string, + db *sql.DB, + feedDefinitionStorage FeedDefinitionStorage, + converterSelector ConverterSelector, + eventStorage EventStorage, + eventPublisher EventPublisher, +) *HandlerUpdateFeeds { + return &HandlerUpdateFeeds{ + deleteFailingFeeds: deleteFailingFeeds, + nitterInstances: nitterInstances, + enableAutoNIP05Registration: enableAutoNIP05Registration, + defaultProfilePictureUrl: defaultProfilePictureUrl, + mainDomainName: mainDomainName, + db: db, + feedDefinitionStorage: feedDefinitionStorage, + converterSelector: converterSelector, + eventStorage: eventStorage, + eventPublisher: eventPublisher, + } +} + +func (h *HandlerUpdateFeeds) Handle(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + definitions, err := h.feedDefinitionStorage.List() + if err != nil { + return errors.Wrap(err, "error getting feed definitions") + } + + chIn := make(chan *domainfeed.FeedDefinition) + chOut := make(chan definitionWithError) + + go func() { + for _, definition := range definitions { + definition := definition + select { + case chIn <- definition: + continue + case <-ctx.Done(): + return + } + } + }() + + h.startWorkers(ctx, chIn, chOut) + + counterSuccess := 0 + counterError := 0 + + var resultErr error + for i := 0; i < len(definitions); i++ { + select { + case definitionWithError := <-chOut: + if err := definitionWithError.Err; err != nil { + resultErr = multierror.Append(resultErr, err) + counterError++ + } else { + counterSuccess++ + } + case <-ctx.Done(): + return ctx.Err() + } + } + + 
log.Printf("updating feeds result success=%d error=%d", counterSuccess, counterError) + metrics.UpdateResults.With(prometheus.Labels{"result": "success"}).Set(float64(counterSuccess)) + metrics.UpdateResults.With(prometheus.Labels{"result": "error"}).Set(float64(counterError)) + + return resultErr +} + +func (h *HandlerUpdateFeeds) startWorkers(ctx context.Context, chIn chan *domainfeed.FeedDefinition, chOut chan definitionWithError) { + for i := 0; i < numWorkers; i++ { + go h.startWorker(ctx, chIn, chOut) + } +} + +func (h *HandlerUpdateFeeds) startWorker(ctx context.Context, chIn chan *domainfeed.FeedDefinition, chOut chan definitionWithError) { + for { + select { + case definition := <-chIn: + err := h.updateFeed(ctx, definition) + select { + case chOut <- definitionWithError{ + Definition: definition, + Err: err, + }: + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } + } +} + +// todo restore the capability to replay events? +func (h *HandlerUpdateFeeds) updateFeed(ctx context.Context, definition *domainfeed.FeedDefinition) error { + log.Printf("updating feed %s", definition.PublicKey().Hex()) + + events, err := h.getFeedEvents(definition) + if err != nil { + return errors.Wrapf(err, "error getting events for feed '%s'", definition.PublicKey().Hex()) + } + + if err := h.eventStorage.PutEvents(definition.PublicKey(), events); err != nil { + return errors.Wrap(err, "error saving events") + } + + for _, event := range events { + h.eventPublisher.PublishNewEventCreated(event) + } + + return nil +} + +func (h *HandlerUpdateFeeds) getFeedEvents(definition *domainfeed.FeedDefinition) ([]domain.Event, error) { + parsedFeed, entity := events.GetParsedFeedForPubKey( + definition.PublicKey().Hex(), + h.db, + h.deleteFailingFeeds, + h.nitterInstances, + ) + if parsedFeed == nil { + return nil, nil // todo why is this not an error? 
+ } + + var events []domain.Event + + metadataEvent, err := h.makeMetadataEvent(definition, parsedFeed, entity) + if err != nil { + return nil, errors.Wrap(err, "error creating the metadata event") + } + events = append(events, metadataEvent) + + converter := h.converterSelector.Select(parsedFeed) + + for _, item := range parsedFeed.Items { + defaultCreatedAt := time.Unix(time.Now().Unix(), 0) + evt := converter.Convert(definition.PublicKey().Hex(), item, parsedFeed, defaultCreatedAt, entity.URL) + + // Feed need to have a date for each entry... + if evt.CreatedAt == nostr.Timestamp(defaultCreatedAt.Unix()) { + continue + } + + if err = evt.Sign(entity.PrivateKey); err != nil { + return nil, errors.Wrap(err, "error signing the event") + } + + domainEvent, err := domain.NewEvent(evt) + if err != nil { + return nil, errors.Wrap(err, "error creating a domain event") + } + + events = append(events, domainEvent) + } + + return events, nil +} + +func (h *HandlerUpdateFeeds) makeMetadataEvent(definition *domainfeed.FeedDefinition, parsedFeed *gofeed.Feed, entity feed.Entity) (domain.Event, error) { + evt := feed.EntryFeedToSetMetadata(definition.PublicKey().Hex(), parsedFeed, entity.URL, h.enableAutoNIP05Registration, h.defaultProfilePictureUrl, h.mainDomainName) + if err := evt.Sign(entity.PrivateKey); err != nil { + return domain.Event{}, errors.Wrap(err, "error signing the event") + } + domainMetadataEvent, err := domain.NewEvent(evt) + if err != nil { + return domain.Event{}, errors.Wrap(err, "error creating a domain event") + } + return domainMetadataEvent, nil +} + +type definitionWithError struct { + Definition *domainfeed.FeedDefinition + Err error +} diff --git a/pkg/new/app/search_feeds.go b/pkg/new/app/search_feeds.go new file mode 100644 index 0000000..cee1cce --- /dev/null +++ b/pkg/new/app/search_feeds.go @@ -0,0 +1,23 @@ +package app + +import ( + domainfeed "github.com/piraces/rsslay/pkg/new/domain/feed" + "github.com/pkg/errors" +) + +type 
HandlerSearchFeeds struct { + feedDefinitionStorage FeedDefinitionStorage +} + +func NewHandlerSearchFeeds(feedDefinitionStorage FeedDefinitionStorage) *HandlerSearchFeeds { + return &HandlerSearchFeeds{ + feedDefinitionStorage: feedDefinitionStorage, + } +} + +func (h *HandlerSearchFeeds) Handle(query string, limit int) ([]*domainfeed.FeedDefinition, error) { + if limit <= 0 { + return nil, errors.New("limit must be positive") + } + return h.feedDefinitionStorage.Search(query, limit) +} diff --git a/pkg/new/domain/domain.go b/pkg/new/domain/domain.go new file mode 100644 index 0000000..361f4ce --- /dev/null +++ b/pkg/new/domain/domain.go @@ -0,0 +1,19 @@ +package domain + +import "errors" + +type Secret struct { + s string +} + +func NewSecret(s string) (Secret, error) { + if s == "" { + return Secret{}, errors.New("secret can't be an empty string") + } + + return Secret{s: s}, nil +} + +func (s Secret) String() string { + return s.s +} diff --git a/pkg/new/domain/feed/feed.go b/pkg/new/domain/feed/feed.go new file mode 100644 index 0000000..6d67cd5 --- /dev/null +++ b/pkg/new/domain/feed/feed.go @@ -0,0 +1,59 @@ +package feed + +import ( + "errors" + + "github.com/piraces/rsslay/pkg/helpers" + "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type FeedDefinition struct { + publicKey nostr.PublicKey + privateKey nostr.PrivateKey + address Address + nitter bool +} + +func NewFeedDefinition(publicKey nostr.PublicKey, privateKey nostr.PrivateKey, address Address, nitter bool) (*FeedDefinition, error) { + if !publicKey.Matches(privateKey) { + return nil, errors.New("public/private key mismatch") + } + + return &FeedDefinition{publicKey: publicKey, privateKey: privateKey, address: address, nitter: nitter}, nil +} + +func (f FeedDefinition) PublicKey() nostr.PublicKey { + return f.publicKey +} + +func (f FeedDefinition) PrivateKey() nostr.PrivateKey { + return f.privateKey +} + +func (f FeedDefinition) Address() Address { + return f.address +} + +func (f 
FeedDefinition) Nitter() bool {
+	return f.nitter
+}
+
+type Address struct {
+	s string
+}
+
+func NewAddress(s string) (Address, error) {
+	if s == "" {
+		return Address{}, errors.New("address can't be an empty string")
+	}
+
+	if !helpers.IsValidHttpUrl(s) {
+		return Address{}, errors.New("invalid URL provided (must be in absolute format and with http or https scheme)")
+	}
+
+	return Address{s: s}, nil
+}
+
+func (a Address) String() string {
+	return a.s
+}
diff --git a/pkg/new/domain/nostr/nostr.go b/pkg/new/domain/nostr/nostr.go
new file mode 100644
index 0000000..8ba40d7
--- /dev/null
+++ b/pkg/new/domain/nostr/nostr.go
@@ -0,0 +1,156 @@
+package nostr
+
+import (
+	"bytes"
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"time"
+
+	"github.com/btcsuite/btcd/btcec/v2"
+	"github.com/btcsuite/btcd/btcec/v2/schnorr"
+	"github.com/nbd-wtf/go-nostr"
+	"github.com/nbd-wtf/go-nostr/nip19"
+	"github.com/pkg/errors"
+)
+
+const (
+	publicKeyBytesLen  = 32
+	privateKeyBytesLen = btcec.PrivKeyBytesLen
+	idBytesLen         = sha256.Size
+)
+
+type Filter struct {
+	filter *nostr.Filter
+}
+
+func NewFilter(filter *nostr.Filter) Filter {
+	return Filter{
+		filter: filter,
+	}
+}
+
+func (f Filter) Matches(event Event) bool {
+	return f.filter.Matches(&event.event)
+}
+
+type Event struct {
+	id        ID
+	publicKey PublicKey
+	event     nostr.Event
+}
+
+func NewEvent(event nostr.Event) (Event, error) {
+	publicKey, err := NewPublicKeyFromHex(event.PubKey)
+	if err != nil {
+		return Event{}, errors.Wrap(err, "error parsing the public key")
+	}
+
+	id, err := NewIDFromHex(event.ID)
+	if err != nil {
+		return Event{}, errors.Wrap(err, "error parsing the id")
+	}
+
+	return Event{
+		id:        id,
+		publicKey: publicKey,
+		event:     event,
+	}, nil
+}
+
+func (e Event) ID() ID {
+	return e.id
+}
+
+func (e Event) PublicKey() PublicKey {
+	return e.publicKey
+}
+
+func (e Event) CreatedAt() time.Time {
+	return e.event.CreatedAt.Time()
+}
+
+func (e Event) Libevent() nostr.Event {
+	return e.event
+}
+
+type ID struct {
+	b []byte
+}
+
+func NewIDFromHex(s string) (ID, error) {
+	b, err := hex.DecodeString(s)
+	if err != nil {
+		return ID{}, errors.Wrap(err, "error decoding hex string")
+	}
+
+	if l := len(b); l != idBytesLen {
+		return ID{}, fmt.Errorf("invalid event id length '%d'", l)
+	}
+
+	return ID{b: b}, nil
+}
+
+func (id ID) Hex() string {
+	return hex.EncodeToString(id.b)
+}
+
+type PublicKey struct {
+	b []byte
+}
+
+func NewPublicKeyFromHex(s string) (PublicKey, error) {
+	b, err := hex.DecodeString(s)
+	if err != nil {
+		return PublicKey{}, errors.Wrap(err, "error decoding hex string")
+	}
+
+	if l := len(b); l != publicKeyBytesLen {
+		return PublicKey{}, fmt.Errorf("invalid public key length '%d'", l)
+	}
+
+	return PublicKey{b: b}, nil
+}
+
+func (p PublicKey) Hex() string {
+	return hex.EncodeToString(p.b)
+}
+
+func (p PublicKey) Nip19() string {
+	nip19, err := nip19.EncodePublicKey(p.Hex())
+	if err != nil {
+		panic(err) // will either always fail or never fail so tests are enough
+	}
+	return nip19
+}
+
+func (p PublicKey) Equal(o PublicKey) bool {
+	return bytes.Equal(p.b, o.b)
+}
+
+func (p PublicKey) Matches(key PrivateKey) bool {
+	_, publicKey := btcec.PrivKeyFromBytes(key.b)
+	hexPublicKey := hex.EncodeToString(schnorr.SerializePubKey(publicKey))
+	return p.Hex() == hexPublicKey
+}
+
+type PrivateKey struct {
+	b []byte
+}
+
+func NewPrivateKeyFromHex(s string) (PrivateKey, error) {
+	b, err := hex.DecodeString(s)
+	if err != nil {
+		return PrivateKey{}, errors.Wrap(err, "error decoding hex string")
+	}
+
+	if l := len(b); l != privateKeyBytesLen {
+		return PrivateKey{}, fmt.Errorf("invalid private key length '%d'", l)
+	}
+
+	return PrivateKey{b: b}, nil
+}
+
+func (k PrivateKey) Hex() string {
+	return hex.EncodeToString(k.b)
+}
diff --git a/pkg/new/domain/nostr/nostr_test.go b/pkg/new/domain/nostr/nostr_test.go
new file mode 100644
index 0000000..fa5fa04
--- /dev/null
+++ b/pkg/new/domain/nostr/nostr_test.go
@@ -0,0 +1,41 @@
+package 
nostr_test + +import ( + "encoding/hex" + "testing" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcec/v2/schnorr" + "github.com/piraces/rsslay/pkg/new/domain/nostr" + "github.com/stretchr/testify/require" +) + +func TestPublicKeyLength(t *testing.T) { + privateKey, err := btcec.NewPrivateKey() + require.NoError(t, err) + + publicKey := privateKey.PubKey() + + hexPublicKey := hex.EncodeToString(schnorr.SerializePubKey(publicKey)) + + _, err = nostr.NewPublicKeyFromHex(hexPublicKey) + require.NoError(t, err) +} + +func TestPrivateKeyLength(t *testing.T) { + privateKey, err := btcec.NewPrivateKey() + require.NoError(t, err) + + hexPrivateKey := hex.EncodeToString(privateKey.Serialize()) + + _, err = nostr.NewPrivateKeyFromHex(hexPrivateKey) + require.NoError(t, err) +} + +func TestNip19DoesNotPanic(t *testing.T) { + publicKey, err := nostr.NewPublicKeyFromHex("6ce3fe33ca1d1c4ab7de95ddf2dcceea7d328ce9c0ff14f5209e10f2db248a6d") + require.NoError(t, err) + + nip19 := publicKey.Nip19() + require.Equal(t, "npub1dn3luv72r5wy4d77jhwl9hxwaf7n9r8fcrl3fafqncg09key3fksk92ep4", nip19) +} diff --git a/pkg/new/ports/pubsub/received_events.go b/pkg/new/ports/pubsub/received_events.go new file mode 100644 index 0000000..17f630b --- /dev/null +++ b/pkg/new/ports/pubsub/received_events.go @@ -0,0 +1,36 @@ +package pubsub + +import ( + "context" + "log" + + "github.com/piraces/rsslay/pkg/new/adapters/pubsub" + domain "github.com/piraces/rsslay/pkg/new/domain/nostr" +) + +type EventCreatedHandler interface { + Handle(ctx context.Context, event domain.Event) error +} + +type ReceivedEventSubscriber struct { + pubsub *pubsub.EventCreatedPubSub + handler EventCreatedHandler +} + +func NewReceivedEventSubscriber( + pubsub *pubsub.EventCreatedPubSub, + handler EventCreatedHandler, +) *ReceivedEventSubscriber { + return &ReceivedEventSubscriber{ + pubsub: pubsub, + handler: handler, + } +} + +func (p *ReceivedEventSubscriber) Run(ctx context.Context) { + for event := 
range p.pubsub.Subscribe(ctx) { + if err := p.handler.Handle(ctx, event); err != nil { + log.Printf("error passing event '%s' to event created handler: %s", event.ID().Hex(), err) + } + } +} diff --git a/pkg/new/ports/timer_update_feeds.go b/pkg/new/ports/timer_update_feeds.go new file mode 100644 index 0000000..768db22 --- /dev/null +++ b/pkg/new/ports/timer_update_feeds.go @@ -0,0 +1,34 @@ +package ports + +import ( + "context" + "log" + "time" +) + +type HandlerUpdateFeeds interface { + Handle(ctx context.Context) error +} + +type UpdateFeedsTimer struct { + handler HandlerUpdateFeeds +} + +func NewUpdateFeedsTimer(handler HandlerUpdateFeeds) *UpdateFeedsTimer { + return &UpdateFeedsTimer{handler: handler} +} + +func (h *UpdateFeedsTimer) Run(ctx context.Context) { + for { + if err := h.handler.Handle(ctx); err != nil { + log.Printf("error updating feeds %s", err) + } + + select { + case <-time.After(30 * time.Minute): + continue + case <-ctx.Done(): + return + } + } +}