diff --git a/.gitignore b/.gitignore index d3e24cb4b..b5fc4198c 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,9 @@ test-coverage.out /lexgen /stress /labelmaker +/palomar +/sonar-cli +/supercollider # Don't ignore this file itself, or other specific dotfiles !.gitignore diff --git a/Makefile b/Makefile index e6a3d383b..5058891d6 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,7 @@ build: ## Build all executables go build ./cmd/labelmaker go build ./cmd/supercollider go build -o ./sonar-cli ./cmd/sonar + go build ./cmd/palomar .PHONY: all all: build diff --git a/atproto/identity/base_directory.go b/atproto/identity/base_directory.go index 29323f38d..40577eac3 100644 --- a/atproto/identity/base_directory.go +++ b/atproto/identity/base_directory.go @@ -19,6 +19,8 @@ type BaseDirectory struct { Resolver net.Resolver // when doing DNS handle resolution, should this resolver attempt re-try against an authoritative nameserver if the first TXT lookup fails? TryAuthoritativeDNS bool + // set of handle domain suffixes for which DNS handle resolution will be skipped + SkipDNSDomainSuffixes []string } var _ Directory = (*BaseDirectory)(nil) @@ -61,10 +63,11 @@ func (d *BaseDirectory) LookupDID(ctx context.Context, did syntax.DID) (*Identit return nil, err } resolvedDID, err := d.ResolveHandle(ctx, declared) - if err != nil { + if err != nil && err != ErrHandleNotFound { return nil, err - } - if resolvedDID == did { + } else if ErrHandleNotFound == err || resolvedDID != did { + ident.Handle = syntax.Handle("handle.invalid") + } else { ident.Handle = declared } diff --git a/atproto/identity/handle.go b/atproto/identity/handle.go index 9583cd265..41bb473d2 100644 --- a/atproto/identity/handle.go +++ b/atproto/identity/handle.go @@ -124,24 +124,37 @@ func (d *BaseDirectory) ResolveHandleWellKnown(ctx context.Context, handle synta func (d *BaseDirectory) ResolveHandle(ctx context.Context, handle syntax.Handle) (syntax.DID, error) { // TODO: *could* do resolution in parallel, but expecting that sequential is sufficient to start - start := time.Now() - triedAuthoritative := false - did, dnsErr := d.ResolveHandleDNS(ctx, handle) - if dnsErr == ErrHandleNotFound && d.TryAuthoritativeDNS { - slog.Info("attempting authoritative handle DNS resolution", "handle", handle) - triedAuthoritative = true - // try harder with authoritative lookup - did, dnsErr = d.ResolveHandleDNSAuthoritative(ctx, handle) + var dnsErr error + var did syntax.DID + + tryDNS := true + for _, suffix := range d.SkipDNSDomainSuffixes { + if strings.HasSuffix(handle.String(), suffix) { + tryDNS = false + break + } } - elapsed := time.Since(start) - slog.Debug("resolve handle DNS", "handle", handle, "err", dnsErr, "did", did, "authoritative", triedAuthoritative, "duration_ms", elapsed.Milliseconds()) - if nil == dnsErr { // if *not* an error - return did, nil + + if tryDNS { + start := time.Now() + triedAuthoritative := false + did, dnsErr = d.ResolveHandleDNS(ctx, handle) + if dnsErr == ErrHandleNotFound && d.TryAuthoritativeDNS { + slog.Info("attempting authoritative handle DNS resolution", "handle", handle) + triedAuthoritative = true + // try harder with authoritative lookup + did, dnsErr = d.ResolveHandleDNSAuthoritative(ctx, handle) + } + elapsed := time.Since(start) + slog.Debug("resolve handle DNS", "handle", handle, "err", dnsErr, "did", did, "authoritative", triedAuthoritative, "duration_ms", elapsed.Milliseconds()) + if nil == dnsErr { // if *not* an error + return did, nil + } } - start = time.Now() + start := 
time.Now() did, httpErr := d.ResolveHandleWellKnown(ctx, handle) - elapsed = time.Since(start) + elapsed := time.Since(start) slog.Debug("resolve handle HTTP well-known", "handle", handle, "err", httpErr, "did", did, "duration_ms", elapsed.Milliseconds()) if nil == httpErr { // if *not* an error return did, nil diff --git a/atproto/identity/identity.go b/atproto/identity/identity.go index bbdb6ff89..1e88e72a1 100644 --- a/atproto/identity/identity.go +++ b/atproto/identity/identity.go @@ -61,6 +61,8 @@ func DefaultDirectory() Directory { }, }, TryAuthoritativeDNS: true, + // the primary Bluesky PDS instance only supports the HTTP resolution method + SkipDNSDomainSuffixes: []string{".bsky.social"}, } cached := NewCacheDirectory(&base, 10000, time.Hour*24, time.Minute*2) return &cached diff --git a/cmd/palomar/Dockerfile.opensearch b/cmd/palomar/Dockerfile.opensearch new file mode 100644 index 000000000..ddee16f73 --- /dev/null +++ b/cmd/palomar/Dockerfile.opensearch @@ -0,0 +1,2 @@ +FROM opensearchproject/opensearch:2.5.0 +RUN /usr/share/opensearch/bin/opensearch-plugin install --batch analysis-icu diff --git a/cmd/palomar/README.md b/cmd/palomar/README.md index b74b17627..e1ade205c 100644 --- a/cmd/palomar/README.md +++ b/cmd/palomar/README.md @@ -1,48 +1,92 @@ # Palomar -Palomar is an Elasticsearch/OpenSearch frontend and ATP (AT Protocol) repository crawler designed to provide search services for the Bluesky network. +Palomar is a backend search service for atproto, specifically the `app.bsky` post and profile record types. It works by consuming a repo event stream ("firehose") and updating an OpenSearch cluster (a fork of Elasticsearch) with docs. -## Prerequisites +Almost all the code for this service is actually in the `search/` directory at the top of this repo. -- GoLang (version 1.21) -- Running instance of Elasticsearch or OpenSearch for indexing. +In September 2023, this service was substantially re-written. It no longer stores records in a local database, returns only "skeleton" results (lists of AT-URIs or DIDs) via the HTTP API, and defines index mappings. -## Building ``` go build ``` +## Query String Syntax + +Currently only a simple query string syntax is supported. Double-quotes can surround phrases, a `-` prefix negates a single keyword, and the following filters are supported: + +- `from:<handle>` will filter to results from that account, based on current (cached) identity resolution +- an entire DID as an un-quoted keyword will filter to results from that account + ## Configuration Palomar uses environment variables for configuration. -- `ATP_BGS_HOST`: URL of the Bluesky BGS (e.g., `https://bgs.staging.bsky.dev`). -- `ELASTIC_HTTPS_FINGERPRINT`: Required if using a self-signed cert for your Elasticsearch deployment. -- `ELASTIC_USERNAME`: Elasticsearch username (default: `admin`). -- `ELASTIC_PASSWORD`: Password for Elasticsearch authentication. -- `ELASTIC_HOSTS`: Comma-separated list of Elasticsearch endpoints. -- `READONLY`: Set this if the instance should act as a readonly HTTP server (no indexing). 
+- `ATP_BGS_HOST`: URL of firehose to subscribe to, either global BGS or individual PDS (default: `wss://bsky.social`) +- `ATP_PLC_HOST`: PLC directory for identity lookups (default: `https://plc.directory`) +- `DATABASE_URL`: connection string for database to persist firehose cursor subscription state +- `PALOMAR_BIND`: IP/port to have HTTP API listen on (default: `:3999`) +- `ES_USERNAME`: Elasticsearch username (default: `admin`) +- `ES_PASSWORD`: Password for Elasticsearch authentication +- `ES_CERT_FILE`: Optional, for TLS connections +- `ES_HOSTS`: Comma-separated list of Elasticsearch endpoints +- `ES_POST_INDEX`: name of index for post docs (default: `palomar_post`) +- `ES_PROFILE_INDEX`: name of index for profile docs (default: `palomar_profile`) +- `PALOMAR_READONLY`: Set this if the instance should act as a readonly HTTP server (no indexing) + +## HTTP API + +### Query Posts: `/xrpc/app.bsky.unspecced.searchPostsSkeleton` + +HTTP Query Params: + +- `q`: query string, required +- `limit`: integer, default 25 +- `cursor`: string, for partial pagination (uses offset, not a scroll) + +Response: + +- `posts`: array of AT-URI strings +- `hitsTotal`: integer; optional number of search hits (may not be populated for large result sets, e.g. over 10k hits) +- `cursor`: string; optionally included if there are more results that can be paginated + +### Query Profiles: `/xrpc/app.bsky.unspecced.searchActorsSkeleton` + +HTTP Query Params: + +- `q`: query string, required +- `limit`: integer, default 25 +- `cursor`: string, for partial pagination (uses offset, not a scroll) +- `typeahead`: boolean, for typeahead behavior (vs. full search) + +Response: + +- `actors`: array of DID strings +- `hitsTotal`: integer; optional number of search hits (may not be populated for large result sets, e.g. over 10k hits) +- `cursor`: string; optionally included if there are more results that can be paginated + +## Development Quickstart + +Run an ephemeral OpenSearch instance on local port 9200, with SSL disabled, and the `analysis-icu` plugin installed, using Docker: -## Running the Application + docker build -f Dockerfile.opensearch . -t opensearch-palomar + docker run -p 9200:9200 -p 9600:9600 -e "discovery.type=single-node" -e "plugins.security.disabled=true" opensearch-palomar -Once the environment variables are set properly, you can start Palomar by running: +See [README.opensearch.md](./README.opensearch.md) for more OpenSearch operational tips. -``` -./palomar run -``` +From the top level of the repository: -## Indexing + # run combined indexing and search service + make run-dev-search -For now, there isnt an easy way to get updates from the PDS, so to keep the -index up to date you will periodcally need to scrape the data. + # run just the search service + READONLY=true make run-dev-search -## API +You'll need to get some content into the index. An easy way to do this is to have palomar consume from the public production firehose. -### `/index/:did` +You can run test queries from the top level of the repository: -Indexes the content in the given user's repository. It keeps track of the last repository update and only fetches incremental changes. + go run ./cmd/palomar search-post "hello" + go run ./cmd/palomar search-profile "hello" + go run ./cmd/palomar search-profile -typeahead "h" -### `/search?q=QUERY` -Performs a simple, case-insensitive search across the entire application. 
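+ +You can also hit the HTTP API directly; these examples assume a locally-running instance (`PALOMAR_BIND` defaults to `:3999`) with some content already indexed: + + http get ':3999/xrpc/app.bsky.unspecced.searchPostsSkeleton?q=hello&limit=5' + http get ':3999/xrpc/app.bsky.unspecced.searchActorsSkeleton?q=hello&typeahead=true' + +For more commands and args: 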
+    go run ./cmd/palomar --help diff --git a/cmd/palomar/README.opensearch.md b/cmd/palomar/README.opensearch.md new file mode 100644 index 000000000..1a20f6ed9 --- /dev/null +++ b/cmd/palomar/README.opensearch.md @@ -0,0 +1,90 @@ + +# Basic OpenSearch Operations + +We use OpenSearch version 2.5+, with the `analysis-icu` plugin. This is included automatically on the AWS hosted version of OpenSearch; otherwise you need to install it: + + sudo /usr/share/opensearch/bin/opensearch-plugin install analysis-icu + sudo service opensearch restart + +If you are trying to use Elasticsearch 7.10 instead of OpenSearch, you can install the plugin with: + + sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install analysis-icu + sudo service elasticsearch restart + +## Local Development + +The following commands assume OpenSearch is running locally. + +To manually drop and re-build the indices with new schemas (palomar will create these automatically if they don't exist, but this can be helpful when developing the schema itself): + + http delete :9200/palomar_post + http delete :9200/palomar_profile + http put :9200/palomar_post < post_schema.json + http put :9200/palomar_profile < profile_schema.json + +Put a single object (good for debugging): + + head -n1 examples.json | http post :9200/palomar_post/_doc/0 + http get :9200/palomar_post/_doc/0 + +Bulk insert from a file on disk: + + # esbulk is a golang CLI tool which must be installed separately + esbulk -verbose -id ident -index palomar_post -type _doc examples.json + +## Index Aliases + +To make re-indexing and schema changes easier, we can create versioned (or +time-stamped) Elasticsearch indexes, and then point to them using index +aliases. The index alias updates are fast and atomic, so we can slowly build up +a new index and then cut over with no downtime. + + http put :9200/palomar_post_v04 < post_schema.json + +To atomically swap an alias from one index to another ("zero downtime"): + + http post :9200/_aliases << EOF + { + "actions": [ + { "remove": { "index": "palomar_post_v05", "alias": "palomar_post" }}, + { "add": { "index": "palomar_post_v06", "alias": "palomar_post" }} + ] + } + EOF + +To replace an existing ("real") index with an alias pointer, do two actions +(not truly zero-downtime, but pretty fast): + + http delete :9200/palomar_post + http put :9200/palomar_post_v03/_alias/palomar_post + +## Full-Text Querying + +A generic full-text "query string" query looks like this (replace "blood" with +the actual query string, and the "size" field with the max results to return): + + GET /palomar_post/_search + { + "query": { + "query_string": { + "query": "blood", + "analyzer": "textIcuSearch", + "default_operator": "AND", + "analyze_wildcard": true, + "lenient": true, + "fields": ["handle^5", "text"] + } + }, + "size": 3 + } + +In the results take `.hits.hits[]._source` as the objects; `.hits.total` is the +total number of search hits. 
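+ +For example, to send that query (saved here to a local file named `query.json`, purely for illustration) using httpie, and pull out just the matching docs with jq: + + http post :9200/palomar_post/_search < query.json + http post :9200/palomar_post/_search < query.json | jq '.hits.hits[]._source' 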
+ + +## Index Debugging + +Check index size: + + http get :9200/palomar_post/_count + http get :9200/palomar_profile/_count diff --git a/cmd/palomar/main.go b/cmd/palomar/main.go index 3d9e3d1e0..1b8641c79 100644 --- a/cmd/palomar/main.go +++ b/cmd/palomar/main.go @@ -1,29 +1,30 @@ package main import ( - "bytes" "context" "encoding/json" "fmt" + "log/slog" + "net/http" "os" "strings" + "time" _ "github.com/joho/godotenv/autoload" + "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/search" "github.com/bluesky-social/indigo/util/cliutil" "github.com/bluesky-social/indigo/util/version" - logging "github.com/ipfs/go-log" es "github.com/opensearch-project/opensearch-go/v2" cli "github.com/urfave/cli/v2" ) -var log = logging.Logger("palomar") - func main() { if err := run(os.Args); err != nil { - log.Fatal(err) + slog.Error("exiting", "err", err) + os.Exit(-1) } } @@ -57,18 +58,18 @@ func run(args []string) error { Name: "elastic-hosts", Usage: "elasticsearch hosts (scheme/host/port)", Value: "http://localhost:9200", - EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS"}, + EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS", "OPENSEARCH_URL", "ELASTICSEARCH_URL"}, }, &cli.StringFlag{ Name: "es-post-index", Usage: "ES index for 'post' documents", - Value: "posts", + Value: "palomar_post", EnvVars: []string{"ES_POST_INDEX"}, }, &cli.StringFlag{ Name: "es-profile-index", Usage: "ES index for 'profile' documents", - Value: "profiles", + Value: "palomar_profile", EnvVars: []string{"ES_PROFILE_INDEX"}, }, &cli.StringFlag{ @@ -83,13 +84,6 @@ func run(args []string) error { Value: "https://plc.directory", EnvVars: []string{"ATP_PLC_HOST"}, }, - // TODO(bnewbold): this is a temporary hack to fetch our own blobs - &cli.StringFlag{ - Name: "atp-pds-host", - Usage: "method, hostname, and port of PDS instance", - Value: "https://bsky.social", - EnvVars: []string{"ATP_PDS_HOST"}, - }, &cli.IntFlag{ Name: "max-metadb-connections", EnvVars: []string{"MAX_METADB_CONNECTIONS"}, @@ -98,9 +92,10 @@ } app.Commands = []*cli.Command{ - elasticCheckCmd, - searchCmd, runCmd, + elasticCheckCmd, + searchPostCmd, + searchProfileCmd, } return app.Run(args) @@ -111,14 +106,13 @@ var runCmd = &cli.Command{ Usage: "combined indexing+query server", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "database-url", - // XXX: data/palomar/search.db - Value: "sqlite://data/thecloud.db", + Name: "database-url", + Value: "sqlite://data/palomar/search.db", EnvVars: []string{"DATABASE_URL"}, }, &cli.BoolFlag{ Name: "readonly", - EnvVars: []string{"READONLY"}, + EnvVars: []string{"PALOMAR_READONLY", "READONLY"}, }, &cli.StringFlag{ Name: "bind", @@ -126,8 +120,25 @@ var runCmd = &cli.Command{ Value: ":3999", EnvVars: []string{"PALOMAR_BIND"}, }, + &cli.IntFlag{ + Name: "bgs-sync-rate-limit", + Usage: "max repo sync (checkout) requests per second to upstream (BGS)", + Value: 8, + EnvVars: []string{"PALOMAR_BGS_SYNC_RATE_LIMIT"}, + }, + &cli.IntFlag{ + Name: "index-max-concurrency", + Usage: "max number of concurrent index requests (HTTP POST) to search index", + Value: 20, + EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"}, + }, }, Action: func(cctx *cli.Context) error { + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + slog.SetDefault(logger) + db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections")) if err != nil { return err } @@ -138,12 +149,29 @@ var runCmd = &cli.Command{ return 
fmt.Errorf("failed to get elasticsearch: %w", err) } + // TODO: replace this with "bingo" resolver + base := identity.BaseDirectory{ + PLCURL: cctx.String("atp-plc-host"), + HTTPClient: http.Client{ + Timeout: time.Second * 15, + }, + TryAuthoritativeDNS: true, + SkipDNSDomainSuffixes: []string{".bsky.social"}, + } + dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2) + srv, err := search.NewServer( db, escli, - cctx.String("atp-plc-host"), - cctx.String("atp-pds-host"), - cctx.String("atp-bgs-host"), + &dir, + search.Config{ + BGSHost: cctx.String("atp-bgs-host"), + ProfileIndex: cctx.String("es-profile-index"), + PostIndex: cctx.String("es-post-index"), + Logger: logger, + BGSSyncRateLimit: cctx.Int("bgs-sync-rate-limit"), + IndexMaxConcurrency: cctx.Int("index-max-concurrency"), + }, ) if err != nil { return err @@ -156,7 +184,10 @@ var runCmd = &cli.Command{ if cctx.Bool("readonly") { select {} } else { - ctx := context.TODO() + ctx := context.Background() + if err := srv.EnsureIndices(ctx); err != nil { + return fmt.Errorf("failed to create opensearch indices: %w", err) + } if err := srv.RunIndexer(ctx); err != nil { return fmt.Errorf("failed to run indexer: %w", err) } @@ -184,49 +215,102 @@ var elasticCheckCmd = &cli.Command{ if err != nil { return fmt.Errorf("failed to get info: %w", err) } + defer inf.Body.Close() + if inf.IsError() { + return fmt.Errorf("failed to get info") + } + slog.Info("opensearch client connected", "client_info", inf) + + resp, err := escli.Indices.Exists([]string{cctx.String("es-profile-index"), cctx.String("es-post-index")}) + if err != nil { + return fmt.Errorf("failed to check index existence: %w", err) + } + defer resp.Body.Close() + if inf.IsError() { + return fmt.Errorf("failed to check index existence") + } + slog.Info("index existence", "resp", resp) - fmt.Println(inf) return nil }, } -var searchCmd = &cli.Command{ - Name: "search", - Usage: "run a simple query against search index", +func printHits(resp *search.EsSearchResponse) { + fmt.Printf("%d hits in %d\n", len(resp.Hits.Hits), resp.Took) + for _, hit := range resp.Hits.Hits { + b, _ := json.Marshal(hit.Source) + fmt.Println(string(b)) + } + return +} + +var searchPostCmd = &cli.Command{ + Name: "search-post", + Usage: "run a simple query against posts index", Action: func(cctx *cli.Context) error { escli, err := createEsClient(cctx) if err != nil { return err } - - var buf bytes.Buffer - query := map[string]interface{}{ - "query": map[string]interface{}{ - "match": map[string]interface{}{ - "text": cctx.Args().First(), - }, - }, - } - if err := json.NewEncoder(&buf).Encode(query); err != nil { - log.Fatalf("Error encoding query: %s", err) - } - - // Perform the search request. 
- res, err := escli.Search( - escli.Search.WithContext(context.Background()), - escli.Search.WithIndex(cctx.String("es-posts-index")), - escli.Search.WithBody(&buf), - escli.Search.WithTrackTotalHits(true), - escli.Search.WithPretty(), + res, err := search.DoSearchPosts( + context.Background(), + identity.DefaultDirectory(), // TODO: parse PLC arg + escli, + cctx.String("es-post-index"), + strings.Join(cctx.Args().Slice(), " "), + 0, + 20, ) if err != nil { - log.Fatalf("Error getting response: %s", err) + return err } - - fmt.Println(res) + printHits(res) return nil + }, +} +var searchProfileCmd = &cli.Command{ + Name: "search-profile", + Usage: "run a simple query against profiles index", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "typeahead", + }, + }, + Action: func(cctx *cli.Context) error { + escli, err := createEsClient(cctx) + if err != nil { + return err + } + if cctx.Bool("typeahead") { + res, err := search.DoSearchProfilesTypeahead( + context.Background(), + escli, + cctx.String("es-profile-index"), + strings.Join(cctx.Args().Slice(), " "), + 10, + ) + if err != nil { + return err + } + printHits(res) + } else { + res, err := search.DoSearchProfiles( + context.Background(), + identity.DefaultDirectory(), // TODO: parse PLC arg + escli, + cctx.String("es-profile-index"), + strings.Join(cctx.Args().Slice(), " "), + 0, + 20, + ) + if err != nil { + return err + } + printHits(res) + } + return nil }, } @@ -252,8 +336,10 @@ func createEsClient(cctx *cli.Context) (*es.Client, error) { Addresses: addrs, Username: cctx.String("elastic-username"), Password: cctx.String("elastic-password"), - - CACert: cert, + CACert: cert, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 20, + }, } escli, err := es.NewClient(cfg) @@ -266,7 +352,7 @@ func createEsClient(cctx *cli.Context) (*es.Client, error) { return nil, fmt.Errorf("cannot get escli info: %w", err) } defer info.Body.Close() - log.Info(info) + slog.Debug("opensearch client initialized", "info", info) return escli, nil } diff --git a/go.mod b/go.mod index 2630f8a7d..d4f562e69 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/dustinkirkland/golang-petname v0.0.0-20230626224747-e794b9370d49 github.com/goccy/go-json v0.10.2 github.com/golang-jwt/jwt v3.2.2+incompatible + github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/go-retryablehttp v0.7.2 github.com/hashicorp/golang-lru v0.5.4 @@ -31,7 +32,7 @@ require ( github.com/jackc/pgx/v5 v5.3.0 github.com/joho/godotenv v1.5.1 github.com/labstack/echo-contrib v0.15.0 - github.com/labstack/echo/v4 v4.10.2 + github.com/labstack/echo/v4 v4.11.1 github.com/labstack/gommon v0.4.0 github.com/lestrrat-go/jwx/v2 v2.0.12 github.com/minio/sha256-simd v1.0.0 @@ -42,6 +43,7 @@ require ( github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 + github.com/rivo/uniseg v0.1.0 github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.25.1 github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 @@ -112,7 +114,7 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/mattn/go-sqlite3 v1.14.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/modern-go/concurrent 
v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -128,6 +130,8 @@ require ( github.com/prometheus/procfs v0.9.0 // indirect github.com/prometheus/statsd_exporter v0.22.7 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/samber/lo v1.38.1 // indirect + github.com/samber/slog-echo v1.2.1 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect @@ -141,7 +145,7 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.10.0 // indirect - golang.org/x/net v0.10.0 // indirect + golang.org/x/net v0.14.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/text v0.12.0 // indirect google.golang.org/genproto v0.0.0-20230526015343-6ee61e4f9d5f // indirect diff --git a/go.sum b/go.sum index 239061263..04b0d2c84 100644 --- a/go.sum +++ b/go.sum @@ -209,6 +209,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -390,6 +392,8 @@ github.com/labstack/echo-contrib v0.15.0 h1:9K+oRU265y4Mu9zpRDv3X+DGTqUALY6oRHCS github.com/labstack/echo-contrib v0.15.0/go.mod h1:lei+qt5CLB4oa7VHTE0yEfQSEB9XTJI1LUqko9UWvo4= github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN2M= github.com/labstack/echo/v4 v4.10.2/go.mod h1:OEyqf2//K1DFdE57vw2DRgWY0M7s65IVQO2FzvI4J5k= +github.com/labstack/echo/v4 v4.11.1 h1:dEpLU2FLg4UVmvCGPuk/APjlH6GDpbEPti61srUUUs4= +github.com/labstack/echo/v4 v4.11.1/go.mod h1:YuYRTSM3CHs2ybfrL8Px48bO6BAnYIN4l8wSTMP6BDQ= github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8= github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= github.com/lestrrat-go/blackmagic v1.0.1 h1:lS5Zts+5HIC/8og6cGHb0uCcNCa3OUt1ygh3Qz2Fe80= @@ -431,6 +435,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= @@ -542,6 +548,7 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf 
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI= +github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -551,6 +558,10 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/samber/slog-echo v1.2.1 h1:VO3DuuGhKIqlx9sz02nyLmSPKsUFdrXj1/7eFD2dzZM= +github.com/samber/slog-echo v1.2.1/go.mod h1:4OnV/xNkLTsb5x5GOnze/LZbHHOuVqaiWnyqBQoSN24= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= @@ -757,6 +768,8 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -835,6 +848,7 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/search/firehose.go b/search/firehose.go index 53d48564c..6f6612a1e 100644 --- a/search/firehose.go +++ b/search/firehose.go @@ -5,16 +5,19 @@ import ( "context" "fmt" "net/http" + "net/url" "strings" comatproto "github.com/bluesky-social/indigo/api/atproto" bsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/syntax" 
"github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/events" "github.com/bluesky-social/indigo/events/schedulers/autoscaling" lexutil "github.com/bluesky-social/indigo/lex/util" "github.com/bluesky-social/indigo/repo" "github.com/bluesky-social/indigo/repomgr" + "github.com/gorilla/websocket" "github.com/ipfs/go-cid" typegen "github.com/whyrusleeping/cbor-gen" @@ -50,7 +53,15 @@ func (s *Server) RunIndexer(ctx context.Context) error { go s.bf.Start() d := websocket.DefaultDialer - con, _, err := d.Dial(fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", s.bgshost, cur), http.Header{}) + u, err := url.Parse(s.bgshost) + if err != nil { + return fmt.Errorf("invalid bgshost URI: %w", err) + } + u.Path = "xrpc/com.atproto.sync.subscribeRepos" + if cur != 0 { + u.RawQuery = fmt.Sprintf("cursor=%d", cur) + } + con, _, err := d.Dial(u.String(), http.Header{}) if err != nil { return fmt.Errorf("events dial failed: %w", err) } @@ -60,18 +71,21 @@ func (s *Server) RunIndexer(ctx context.Context) error { defer func() { if evt.Seq%50 == 0 { if err := s.updateLastCursor(evt.Seq); err != nil { - log.Error("Failed to update cursor: ", err) + s.logger.Error("failed to persist cursor", "err", err) } } }() + logEvt := s.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) if evt.TooBig && evt.Prev != nil { - log.Errorf("skipping non-genesis too big events for now: %d", evt.Seq) + // TODO: handle this case (instead of return nil) + logEvt.Error("skipping non-genesis tooBig events for now") return nil } if evt.TooBig { if err := s.processTooBigCommit(ctx, evt); err != nil { - log.Errorf("failed to process tooBig event: %s", err) + // TODO: handle this case (instead of return nil) + logEvt.Error("failed to process tooBig event", "err", err) return nil } @@ -80,34 +94,39 @@ func (s *Server) RunIndexer(ctx context.Context) error { r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) if err != nil { - log.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err) + // TODO: handle this case (instead of return nil) + logEvt.Error("reading repo from car", "size_bytes", len(evt.Blocks), "err", err) return nil } for _, op := range evt.Ops { ek := repomgr.EventKind(op.Action) + logOp := logEvt.With("op_path", op.Path, "op_cid", op.Cid) switch ek { case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: rc, rec, err := r.GetRecord(ctx, op.Path) if err != nil { - e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) - log.Error(e) + // TODO: handle this case (instead of return nil) + logOp.Error("fetching record from event CAR slice", "err", err) return nil } if lexutil.LexLink(rc) != *op.Cid { - log.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) + // TODO: handle this case (instead of return nil) + logOp.Error("mismatch in record and op cid", "record_cid", rc) return nil } if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { - log.Errorf("failed to handle op: %s", err) + // TODO: handle this case (instead of return nil) + logOp.Error("failed to handle event op", "err", err) return nil } case repomgr.EvtKindDeleteRecord: if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { - log.Errorf("failed to handle delete: %s", err) + // TODO: handle this case (instead of return nil) + logOp.Error("failed to handle delete", "err", err) return nil } } @@ -118,7 +137,8 @@ func (s *Server) RunIndexer(ctx 
context.Context) error { }, RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { if err := s.updateUserHandle(ctx, evt.Did, evt.Handle); err != nil { - log.Errorf("failed to update user handle: %s", err) + // TODO: handle this case (instead of return nil) + s.logger.Error("failed to update user handle", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) } return nil }, @@ -133,39 +153,49 @@ func (s *Server) RunIndexer(ctx context.Context) error { ) } -func (s *Server) handleCreateOrUpdate(ctx context.Context, did string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { +func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { // Since this gets called in a backfill job, we need to check if the path is a post or profile if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { return nil } - u, err := s.getOrCreateUser(ctx, did) + did, err := syntax.ParseDID(rawDID) + if err != nil { + return fmt.Errorf("bad DID syntax in event: %w", err) + } + + ident, err := s.dir.LookupDID(ctx, did) if err != nil { - return fmt.Errorf("checking user: %w", err) + return fmt.Errorf("resolving identity: %w", err) } rec := *recP switch rec := rec.(type) { case *bsky.FeedPost: - if err := s.indexPost(ctx, u, rec, path, *rcid); err != nil { - return fmt.Errorf("indexing post: %w", err) + if err := s.indexPost(ctx, ident, rec, path, *rcid); err != nil { + return fmt.Errorf("indexing post for %s: %w", did.String(), err) } case *bsky.ActorProfile: - if err := s.indexProfile(ctx, u, rec); err != nil { - return fmt.Errorf("indexing profile: %w", err) + if err := s.indexProfile(ctx, ident, rec, path, *rcid); err != nil { + return fmt.Errorf("indexing profile for %s: %w", did.String(), err) } default: } return nil } -func (s *Server) handleDelete(ctx context.Context, did string, path string) error { +func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error { // Since this gets called in a backfill job, we need to check if the path is a post or profile if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { return nil } - u, err := s.getOrCreateUser(ctx, did) + did, err := syntax.ParseDID(rawDID) + if err != nil { + return fmt.Errorf("invalid DID in event: %w", err) + } + + ident, err := s.dir.LookupDID(ctx, did) if err != nil { return err } @@ -173,7 +203,7 @@ func (s *Server) handleDelete(ctx context.Context, did string, path string) erro switch { // TODO: handle profile deletes, it's an edge case, but worth doing still case strings.Contains(path, "app.bsky.feed.post"): - if err := s.deletePost(ctx, u, path); err != nil { + if err := s.deletePost(ctx, ident, path); err != nil { return err } } @@ -188,42 +218,42 @@ func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, } if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { - log.Infof("handling create(%d): %s - %s", seq, did, path) + s.logger.Debug("processing create record op", "seq", seq, "did", did, "path", path) // Try to buffer the op, if it fails, we need to create a backfill job _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) if err == backfill.ErrJobNotFound { - log.Infof("no job found for repo %s, creating one", did) + s.logger.Debug("no backfill job found for repo, creating one", "did", did) if err := s.bfs.EnqueueJob(did); err != nil { - return 
fmt.Errorf("enqueueing job: %w", err) + return fmt.Errorf("enqueueing backfill job: %w", err) } // Try to buffer the op again so it gets picked up by the backfill job _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) if err != nil { - return fmt.Errorf("buffering op: %w", err) + return fmt.Errorf("buffering backfill op: %w", err) } } else if err == backfill.ErrJobComplete { // Backfill is done for this repo so we can just index it now err = s.handleCreateOrUpdate(ctx, did, path, &rec, rcid) } } else if op == repomgr.EvtKindDeleteRecord { - log.Infof("handling delete(%d): %s - %s", seq, did, path) + s.logger.Debug("processing delete record op", "seq", seq, "did", did, "path", path) // Try to buffer the op, if it fails, we need to create a backfill job _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) if err == backfill.ErrJobNotFound { - log.Infof("no job found for repo %s, creating one", did) + s.logger.Debug("no backfill job found for repo, creating one", "did", did) if err := s.bfs.EnqueueJob(did); err != nil { - return fmt.Errorf("enqueueing job: %w", err) + return fmt.Errorf("enqueueing backfill job: %w", err) } // Try to buffer the op again so it gets picked up by the backfill job _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) if err != nil { - return fmt.Errorf("buffering op: %w", err) + return fmt.Errorf("buffering backfill op: %w", err) } } else if err == backfill.ErrJobComplete { // Backfill is done for this repo so we can delete imemdiately @@ -249,7 +279,12 @@ func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSu return err } - u, err := s.getOrCreateUser(ctx, evt.Repo) + did, err := syntax.ParseDID(evt.Repo) + if err != nil { + return fmt.Errorf("bad DID in repo event: %w", err) + } + + ident, err := s.dir.LookupDID(ctx, did) if err != nil { return err } @@ -258,17 +293,18 @@ func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSu if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { rcid, rec, err := r.GetRecord(ctx, k) if err != nil { - log.Errorf("failed to get record from repo checkout: %s", err) + // TODO: handle this case (instead of return nil) + s.logger.Error("failed to get record from repo checkout", "path", k, "err", err) return nil } switch rec := rec.(type) { case *bsky.FeedPost: - if err := s.indexPost(ctx, u, rec, k, rcid); err != nil { + if err := s.indexPost(ctx, ident, rec, k, rcid); err != nil { return fmt.Errorf("indexing post: %w", err) } case *bsky.ActorProfile: - if err := s.indexProfile(ctx, u, rec); err != nil { + if err := s.indexProfile(ctx, ident, rec, k, rcid); err != nil { return fmt.Errorf("indexing profile: %w", err) } default: diff --git a/search/handlers.go b/search/handlers.go index 216d93fc6..6232da073 100644 --- a/search/handlers.go +++ b/search/handlers.go @@ -2,69 +2,91 @@ package search import ( "context" + "encoding/json" "fmt" "strconv" "strings" - api "github.com/bluesky-social/indigo/api" - bsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/syntax" + "github.com/labstack/echo/v4" otel "go.opentelemetry.io/otel" ) -type ActorSearchResp struct { - bsky.ActorProfile - DID string `json:"did"` +type SearchPostsSkeletonResp struct { + Cursor string `json:"cursor,omitempty"` + HitsTotal *int `json:"hitsTotal,omitempty"` + Posts []syntax.ATURI `json:"posts"` } -func (s *Server) handleFromDid(ctx context.Context, did string) (string, error) { - phr := 
&api.ProdHandleResolver{} - handle, _, err := api.ResolveDidToHandle(ctx, s.xrpcc, s.plc, phr, did) - if err != nil { - return "", err - } - - return handle, nil +type SearchActorsSkeletonResp struct { + Cursor string `json:"cursor,omitempty"` + HitsTotal *int `json:"hitsTotal,omitempty"` + Actors []syntax.DID `json:"actors"` } -func (s *Server) handleSearchRequestPosts(e echo.Context) error { - ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestPosts") - defer span.End() - - q := strings.TrimSpace(e.QueryParam("q")) - if q == "" { - return e.JSON(400, map[string]any{ - "error": "must pass non-empty search query", - }) - } - +func parseCursorLimit(e echo.Context) (int, int, error) { offset := 0 - if q := strings.TrimSpace(e.QueryParam("offset")); q != "" { - v, err := strconv.Atoi(q) + if c := strings.TrimSpace(e.QueryParam("cursor")); c != "" { + v, err := strconv.Atoi(c) if err != nil { - return &echo.HTTPError{ + return 0, 0, &echo.HTTPError{ Code: 400, - Message: fmt.Sprintf("invalid value for 'offset': %s", err), + Message: fmt.Sprintf("invalid value for 'cursor': %s", err), } } - offset = v } - count := 30 - if q := strings.TrimSpace(e.QueryParam("count")); q != "" { - v, err := strconv.Atoi(q) + if offset < 0 { + offset = 0 + } + if offset > 10000 { + return 0, 0, &echo.HTTPError{ + Code: 400, + Message: fmt.Sprintf("invalid value for 'cursor' (can't paginate so deep)"), + } + } + + limit := 25 + if l := strings.TrimSpace(e.QueryParam("limit")); l != "" { + v, err := strconv.Atoi(l) if err != nil { - return &echo.HTTPError{ + return 0, 0, &echo.HTTPError{ Code: 400, Message: fmt.Sprintf("invalid value for 'count': %s", err), } } - count = v + limit = v + } + + if limit > 100 { + limit = 100 + } + if limit < 0 { + limit = 0 + } + return offset, limit, nil +} + +func (s *Server) handleSearchPostsSkeleton(e echo.Context) error { + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchPostsSkeleton") + defer span.End() + + q := strings.TrimSpace(e.QueryParam("q")) + if q == "" { + return e.JSON(400, map[string]any{ + "error": "must pass non-empty search query", + }) + } + + offset, limit, err := parseCursorLimit(e) + if err != nil { + return err } - out, err := s.SearchPosts(ctx, q, offset, count) + out, err := s.SearchPosts(ctx, q, offset, limit) if err != nil { return err } @@ -72,8 +94,8 @@ func (s *Server) handleSearchRequestPosts(e echo.Context) error { return e.JSON(200, out) } -func (s *Server) handleSearchRequestProfiles(e echo.Context) error { - ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchRequestProfiles") +func (s *Server) handleSearchActorsSkeleton(e echo.Context) error { + ctx, span := otel.Tracer("search").Start(e.Request().Context(), "handleSearchActorsSkeleton") defer span.End() q := strings.TrimSpace(e.QueryParam("q")) @@ -83,10 +105,88 @@ func (s *Server) handleSearchRequestProfiles(e echo.Context) error { }) } - out, err := s.SearchProfiles(ctx, q) + offset, limit, err := parseCursorLimit(e) + if err != nil { + return err + } + + typeahead := false + if q := strings.TrimSpace(e.QueryParam("typeahead")); q == "true" || q == "1" || q == "y" { + typeahead = true + } + + out, err := s.SearchProfiles(ctx, q, typeahead, offset, limit) if err != nil { return err } return e.JSON(200, out) } + +func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (*SearchPostsSkeletonResp, error) { + resp, err := DoSearchPosts(ctx, s.dir, s.escli, s.postIndex, q, offset, size) + if 
err != nil { + return nil, err + } + + posts := []syntax.ATURI{} + for _, r := range resp.Hits.Hits { + var doc PostDoc + if err := json.Unmarshal(r.Source, &doc); err != nil { + return nil, fmt.Errorf("decoding post doc from search response: %w", err) + } + + did, err := syntax.ParseDID(doc.DID) + if err != nil { + return nil, fmt.Errorf("invalid DID in indexed document: %w", err) + } + + posts = append(posts, syntax.ATURI(fmt.Sprintf("at://%s/app.bsky.feed.post/%s", did, doc.RecordRkey))) + } + + out := SearchPostsSkeletonResp{Posts: posts} + if len(posts) == size && (offset+size) < 10000 { + out.Cursor = fmt.Sprintf("%d", offset+size) + } + if resp.Hits.Total.Relation == "eq" { + out.HitsTotal = &resp.Hits.Total.Value + } + return &out, nil +} + +func (s *Server) SearchProfiles(ctx context.Context, q string, typeahead bool, offset, size int) (*SearchActorsSkeletonResp, error) { + var resp *EsSearchResponse + var err error + if typeahead { + resp, err = DoSearchProfilesTypeahead(ctx, s.escli, s.profileIndex, q, size) + } else { + resp, err = DoSearchProfiles(ctx, s.dir, s.escli, s.profileIndex, q, offset, size) + } + if err != nil { + return nil, err + } + + actors := []syntax.DID{} + for _, r := range resp.Hits.Hits { + var doc ProfileDoc + if err := json.Unmarshal(r.Source, &doc); err != nil { + return nil, fmt.Errorf("decoding profile doc from search response: %w", err) + } + + did, err := syntax.ParseDID(doc.DID) + if err != nil { + return nil, fmt.Errorf("invalid DID in indexed document: %w", err) + } + + actors = append(actors, did) + } + + out := SearchActorsSkeletonResp{Actors: actors} + if len(actors) == size && (offset+size) < 10000 { + out.Cursor = fmt.Sprintf("%d", offset+size) + } + if resp.Hits.Total.Relation == "eq" { + out.HitsTotal = &resp.Hits.Total.Value + } + return &out, nil +} diff --git a/search/indexing.go b/search/indexing.go index 6c89a1995..59903712f 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -5,20 +5,24 @@ import ( "context" "encoding/json" "fmt" - "time" + "io" + "regexp" + "strings" - bsky "github.com/bluesky-social/indigo/api/bsky" + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/util" "github.com/ipfs/go-cid" esapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" ) -func (s *Server) deletePost(ctx context.Context, u *User, path string) error { - log.Infof("deleting post: %s", path) +func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, path string) error { + parts := strings.SplitN(path, "/", 3) + if len(parts) != 2 { + s.logger.Warn("skipping delete of record with weird path", "did", ident.DID, "path", path) + return nil + } + rkey := parts[1] + s.logger.Info("deleting post from index", "repo", ident.DID, "rkey", rkey) + docID := fmt.Sprintf("%s_%s", ident.DID.String(), rkey) req := esapi.DeleteRequest{ - Index: "posts", - DocumentID: encodeDocumentID(u.ID, path), + Index: s.postIndex, + DocumentID: docID, Refresh: "true", } @@ -26,104 +30,93 @@ func (s *Server) deletePost(ctx context.Context, u *User, path string) error { if err != nil { return fmt.Errorf("failed to delete post: %w", err) } - - fmt.Println(res) - + defer res.Body.Close() + io.ReadAll(res.Body) + if res.IsError() { + s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + return fmt.Errorf("indexing error, code=%d", res.StatusCode) + } return nil } -func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid string, pcid cid.Cid) error { - if err := s.db.Create(&PostRef{ - Cid: pcid.String(), - Tid: tid, - Uid: u.ID, - }).Error; err != nil { - return err +func (s 
*Server) indexPost(ctx context.Context, ident *identity.Identity, rec *appbsky.FeedPost, path string, rcid cid.Cid) error { + + parts := strings.SplitN(path, "/", 3) + // TODO: replace with an atproto/syntax package type for TID + var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`) + if len(parts) != 2 || !tidRegex.MatchString(parts[1]) { + s.logger.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) + return nil } + rkey := parts[1] - ts, err := time.Parse(util.ISO8601, rec.CreatedAt) + _, err := util.ParseTimestamp(rec.CreatedAt) if err != nil { - return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, tid, rec.CreatedAt, err) + s.logger.Warn("post had invalid timestamp", "repo", ident.DID, "rkey", rkey, "createdAt", rec.CreatedAt, "parseErr", err) + rec.CreatedAt = "" } - blob := map[string]any{ - "text": rec.Text, - "createdAt": ts.UnixNano(), - "user": u.Handle, - } - b, err := json.Marshal(blob) + doc := TransformPost(rec, ident, rkey, rcid.String()) + b, err := json.Marshal(doc) if err != nil { return err } - log.Infof("Indexing post") + s.logger.Debug("indexing post", "did", ident.DID, "rkey", rkey) req := esapi.IndexRequest{ - Index: "posts", - DocumentID: encodeDocumentID(u.ID, tid), + Index: s.postIndex, + DocumentID: doc.DocId(), Body: bytes.NewReader(b), - Refresh: "true", } res, err := req.Do(ctx, s.escli) if err != nil { return fmt.Errorf("failed to send indexing request: %w", err) } - - fmt.Println(res) - + defer res.Body.Close() + io.ReadAll(res.Body) + if res.IsError() { + s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + return fmt.Errorf("indexing error, code=%d", res.StatusCode) + } return nil } -func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile) error { - b, err := json.Marshal(rec) - if err != nil { - return err - } +func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec *appbsky.ActorProfile, path string, rcid cid.Cid) error { - n := "" - if rec.DisplayName != nil { - n = *rec.DisplayName + parts := strings.SplitN(path, "/", 3) + if len(parts) != 2 || parts[1] != "self" { + s.logger.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) + return nil } - blob := map[string]string{ - "displayName": n, - "handle": u.Handle, - "did": u.Did, - } + s.logger.Info("indexing profile", "did", ident.DID, "handle", ident.Handle) - if rec.Description != nil { - blob["description"] = *rec.Description + doc := TransformProfile(rec, ident, rcid.String()) + b, err := json.Marshal(doc) + if err != nil { + return err } - - log.Infof("Indexing profile: %s", n) req := esapi.IndexRequest{ - Index: "profiles", - DocumentID: fmt.Sprint(u.ID), + Index: s.profileIndex, + DocumentID: ident.DID.String(), Body: bytes.NewReader(b), - Refresh: "true", } res, err := req.Do(context.Background(), s.escli) if err != nil { return fmt.Errorf("failed to send indexing request: %w", err) } - fmt.Println(res) - + defer res.Body.Close() + io.ReadAll(res.Body) + if res.IsError() { + s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + return fmt.Errorf("indexing error, code=%d", res.StatusCode) + } return nil } func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { - u, err := s.getOrCreateUser(ctx, did) - if err != nil { - return err - } - - if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("handle", handle).Error; 
err != nil { - return err - } - - u.Handle = handle - b, err := json.Marshal(map[string]any{ "script": map[string]any{ "source": "ctx._source.handle = params.handle", @@ -138,17 +131,20 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error } req := esapi.UpdateRequest{ - Index: "profiles", - DocumentID: fmt.Sprint(u.ID), + Index: s.profileIndex, + DocumentID: did, Body: bytes.NewReader(b), - Refresh: "true", } res, err := req.Do(context.Background(), s.escli) if err != nil { return fmt.Errorf("failed to send indexing request: %w", err) } - fmt.Println(res) - + defer res.Body.Close() + io.ReadAll(res.Body) + if res.IsError() { + s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + return fmt.Errorf("indexing error, code=%d", res.StatusCode) + } return nil } diff --git a/search/parse_query.go b/search/parse_query.go new file mode 100644 index 000000000..894b42088 --- /dev/null +++ b/search/parse_query.go @@ -0,0 +1,70 @@ +package search + +import ( + "context" + "fmt" + "log/slog" + "strings" + + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + + "github.com/google/shlex" +) + +// takes a query string and pulls out some facet patterns ("from:handle.net") as filters +func ParseQuery(ctx context.Context, dir identity.Directory, raw string) (string, []map[string]interface{}) { + var filters []map[string]interface{} + parts, err := shlex.Split(raw) + if err != nil { + // pass-through if failed to parse + return raw, filters + } + keep := make([]string, 0, len(parts)) + for _, p := range parts { + if !strings.ContainsRune(p, ':') || strings.ContainsRune(p, ' ') { + // keep as-is: quoted phrase (contains whitespace) or plain keyword token + keep = append(keep, p) + continue + } + if strings.HasPrefix(p, "did:") { + filters = append(filters, map[string]interface{}{ + "term": map[string]interface{}{"did": p}, + }) + continue + } + if strings.HasPrefix(p, "from:") && len(p) > 6 { + handle, err := syntax.ParseHandle(p[5:]) + if err != nil { + keep = append(keep, p) + continue + } + id, err := dir.LookupHandle(ctx, handle) + if err != nil { + if err != identity.ErrHandleNotFound { + slog.Error("failed to resolve handle", "err", err) + } + continue + } + filters = append(filters, map[string]interface{}{ + "term": map[string]interface{}{"did": id.DID.String()}, + }) + continue + } + keep = append(keep, p) + } + + out := "" + for _, p := range keep { + if strings.ContainsRune(p, ' ') { + p = fmt.Sprintf("\"%s\"", p) + } + if out == "" { + out = p + } else { + out += " " + p + } + } + return out, filters +} diff --git a/search/parse_query_test.go b/search/parse_query_test.go new file mode 100644 index 000000000..18a00bf51 --- /dev/null +++ b/search/parse_query_test.go @@ -0,0 +1,43 @@ +package search + +import ( + "context" + "testing" + + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/atproto/syntax" + + "github.com/stretchr/testify/assert" +) + +func TestParseQuery(t *testing.T) { + ctx := context.Background() + assert := assert.New(t) + dir := identity.NewMockDirectory() + dir.Insert(identity.Identity{ + Handle: syntax.Handle("known.example.com"), + DID: syntax.DID("did:plc:abc222"), + }) + + var q string + var f []map[string]interface{} + + q, f = ParseQuery(ctx, &dir, "") + assert.Equal("", q) + assert.Empty(f) + + p1 := "some +test \"with phrase\" -ok" + q, f = ParseQuery(ctx, &dir, p1) + assert.Equal(p1, q) + assert.Empty(f) + + p2 := "missing 
from:missing.example.com" + q, f = ParseQuery(ctx, &dir, p2) + assert.Equal("missing", q) + assert.Empty(f) + + p3 := "known from:known.example.com" + q, f = ParseQuery(ctx, &dir, p3) + assert.Equal("known", q) + assert.Equal(1, len(f)) +} diff --git a/search/post_schema.json b/search/post_schema.json new file mode 100644 index 000000000..f8d052974 --- /dev/null +++ b/search/post_schema.json @@ -0,0 +1,70 @@ +{ +"settings": { + "index": { + "number_of_shards": 6, + "number_of_replicas": 0, + "analysis": { + "analyzer": { + "default": { + "type": "custom", + "tokenizer": "standard", + "filter": [ "lowercase", "asciifolding" ] + }, + "textIcu": { + "type": "custom", + "tokenizer": "icu_tokenizer", + "char_filter": [ "icu_normalizer" ], + "filter": [ "icu_folding" ] + }, + "textIcuSearch": { + "type": "custom", + "tokenizer": "icu_tokenizer", + "char_filter": [ "icu_normalizer" ], + "filter": [ "icu_folding" ] + } + }, + "normalizer": { + "default": { + "type": "custom", + "char_filter": [], + "filter": ["lowercase"] + }, + "caseSensitive": { + "type": "custom", + "char_filter": [], + "filter": [] + } + } + } + } +}, +"mappings": { + "dynamic": false, + "properties": { + "doc_index_ts": { "type": "date" }, + "did": { "type": "keyword", "normalizer": "default", "doc_values": false }, + "record_rkey": { "type": "keyword", "normalizer": "default", "doc_values": false }, + "record_cid": { "type": "keyword", "normalizer": "default", "doc_values": false }, + + "created_at": { "type": "date" }, + "text": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch", "copy_to": "everything" }, + "lang_code": { "type": "keyword", "normalizer": "default" }, + "lang_code_iso2": { "type": "keyword", "normalizer": "default" }, + "mention_did": { "type": "keyword", "normalizer": "default" }, + "link_url": { "type": "keyword", "normalizer": "default" }, + "embed_url": { "type": "keyword", "normalizer": "default" }, + "embed_aturi": { "type": "keyword", "normalizer": "default" }, + "reply_root_aturi": { "type": "keyword", "normalizer": "default" }, + "embed_img_count": { "type": "integer" }, + "embed_img_alt_text": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch", "copy_to": "everything" }, + "self_label": { "type": "keyword", "normalizer": "default" }, + + "hashtag": { "type": "keyword", "normalizer": "default" }, + "emoji": { "type": "keyword", "normalizer": "caseSensitive" }, + + "everything": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch" }, + + "lang": { "type": "alias", "path": "lang_code_iso2" } + } +} +} diff --git a/search/profile_schema.json b/search/profile_schema.json new file mode 100644 index 000000000..e3c3bbab7 --- /dev/null +++ b/search/profile_schema.json @@ -0,0 +1,63 @@ +{ +"settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 0, + "analysis": { + "analyzer": { + "default": { + "type": "custom", + "tokenizer": "standard", + "filter": [ "lowercase", "asciifolding" ] + }, + "textIcu": { + "type": "custom", + "tokenizer": "icu_tokenizer", + "char_filter": [ "icu_normalizer" ], + "filter": [ "icu_folding" ] + }, + "textIcuSearch": { + "type": "custom", + "tokenizer": "icu_tokenizer", + "char_filter": [ "icu_normalizer" ], + "filter": [ "icu_folding" ] + } + }, + "normalizer": { + "default": { + "type": "custom", + "char_filter": [], + "filter": ["lowercase"] + }, + "caseSensitive": { + "type": "custom", + "char_filter": [], + "filter": [] + } + } + } + } +}, +"mappings": { + "dynamic": false, + 
"properties": { + "doc_index_ts": { "type": "date" }, + "did": { "type": "keyword", "normalizer": "default", "doc_values": false }, + "handle": { "type": "keyword", "normalizer": "default", "copy_to": ["everything", "typeahead"] }, + + "display_name": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch", "copy_to": ["everything", "typeahead"] }, + "description": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch", "copy_to": "everything" }, + "img_alt_text": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch", "copy_to": "everything" }, + "self_label": { "type": "keyword", "normalizer": "default" }, + + "hashtag": { "type": "keyword", "normalizer": "default" }, + "emoji": { "type": "keyword", "normalizer": "caseSensitive" }, + + "has_avatar": { "type": "boolean" }, + "has_banner": { "type": "boolean" }, + + "typeahead": { "type": "search_as_you_type" }, + "everything": { "type": "text", "analyzer": "textIcu", "search_analyzer": "textIcuSearch" } + } +} +} diff --git a/search/query.go b/search/query.go index 1d6194aff..d58ca9adb 100644 --- a/search/query.go +++ b/search/query.go @@ -5,6 +5,10 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" + "log/slog" + + "github.com/bluesky-social/indigo/atproto/identity" es "github.com/opensearch-project/opensearch-go/v2" ) @@ -17,7 +21,7 @@ type EsSearchHit struct { } type EsSearchHits struct { - Total struct { + Total struct { // not used Value int Relation string } `json:"total"` @@ -26,10 +30,9 @@ type EsSearchHits struct { } type EsSearchResponse struct { - Took int `json:"took"` - TimedOut bool `json:"timed_out"` - // Shards ??? - Hits EsSearchHits `json:"hits"` + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Hits EsSearchHits `json:"hits"` } type UserResult struct { @@ -44,60 +47,142 @@ type PostSearchResult struct { Post any `json:"post"` } -func doSearchPosts(ctx context.Context, escli *es.Client, q string, offset int, size int) (*EsSearchResponse, error) { +func checkParams(offset, size int) error { + if offset+size > 10000 || size > 250 || offset > 10000 || offset < 0 || size < 0 { + return fmt.Errorf("disallowed size/offset parameters") + } + return nil +} + +func DoSearchPosts(ctx context.Context, dir identity.Directory, escli *es.Client, index, q string, offset, size int) (*EsSearchResponse, error) { + if err := checkParams(offset, size); err != nil { + return nil, err + } + queryStr, filters := ParseQuery(ctx, dir, q) + basic := map[string]interface{}{ + "simple_query_string": map[string]interface{}{ + "query": queryStr, + "fields": []string{"everything"}, + "flags": "AND|NOT|OR|PHRASE|PRECEDENCE|WHITESPACE", + "default_operator": "and", + "lenient": true, + "analyze_wildcard": false, + }, + } query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": basic, + "filter": filters, + }, + }, "sort": map[string]any{ - "createdAt": map[string]any{ + "created_at": map[string]any{ "order": "desc", }, }, + "size": size, + "from": offset, + } + + return doSearch(ctx, escli, index, query) +} + +func DoSearchProfiles(ctx context.Context, dir identity.Directory, escli *es.Client, index, q string, offset, size int) (*EsSearchResponse, error) { + if err := checkParams(offset, size); err != nil { + return nil, err + } + queryStr, filters := ParseQuery(ctx, dir, q) + basic := map[string]interface{}{ + "simple_query_string": map[string]interface{}{ + "query": queryStr, + "fields": []string{"everything"}, + 
"flags": "AND|NOT|OR|PHRASE|PRECEDENCE|WHITESPACE", + "default_operator": "and", + "lenient": true, + "analyze_wildcard": false, + }, + } + query := map[string]interface{}{ "query": map[string]interface{}{ - "match": map[string]interface{}{ - "text": map[string]any{ - "query": q, - "operator": "and", + "bool": map[string]interface{}{ + "must": basic, + "should": []interface{}{ + map[string]interface{}{"term": map[string]interface{}{"has_avatar": true}}, + map[string]interface{}{"term": map[string]interface{}{"has_banner": true}}, }, + "filter": filters, + "boost": 1.0, }, }, "size": size, "from": offset, } - return doSearch(ctx, escli, "posts", query) + return doSearch(ctx, escli, index, query) } -func doSearchProfiles(ctx context.Context, escli *es.Client, q string) (*EsSearchResponse, error) { +func DoSearchProfilesTypeahead(ctx context.Context, escli *es.Client, index, q string, size int) (*EsSearchResponse, error) { + if err := checkParams(0, size); err != nil { + return nil, err + } query := map[string]interface{}{ "query": map[string]interface{}{ "multi_match": map[string]interface{}{ - "query": q, - "fields": []string{"description", "displayName", "handle"}, - "operator": "or", + "query": q, + "type": "bool_prefix", + "fields": []string{ + "typeahead", + "typeahead._2gram", + "typeahead._3gram", + }, + }, + }, + "size": size, + } + + return doSearch(ctx, escli, index, query) +} + +// helper to do a full-featured Lucene query parser (query_string) search, with all possible facets. Not safe to expose publicly. +func DoSearchGeneric(ctx context.Context, escli *es.Client, index, q string) (*EsSearchResponse, error) { + query := map[string]interface{}{ + "query": map[string]interface{}{ + "query_string": map[string]interface{}{ + "query": q, + "default_operator": "and", + "analyze_wildcard": true, + "allow_leading_wildcard": false, + "lenient": true, + "default_field": "everything", }, }, } - return doSearch(ctx, escli, "profiles", query) + return doSearch(ctx, escli, index, query) } func doSearch(ctx context.Context, escli *es.Client, index string, query interface{}) (*EsSearchResponse, error) { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(query); err != nil { - log.Fatalf("Error encoding query: %s", err) + b, err := json.Marshal(query) + if err != nil { + return nil, fmt.Errorf("failed to serialize query: %w", err) } + slog.Warn("sending query", "index", index, "query", string(b)) // Perform the search request. 
res, err := escli.Search( escli.Search.WithContext(ctx), escli.Search.WithIndex(index), - escli.Search.WithBody(&buf), - escli.Search.WithTrackTotalHits(true), - escli.Search.WithSize(30), + escli.Search.WithBody(bytes.NewBuffer(b)), ) if err != nil { - log.Fatalf("Error getting response: %s", err) + return nil, fmt.Errorf("search query error: %w", err) } defer res.Body.Close() + if res.IsError() { + ioutil.ReadAll(res.Body) + return nil, fmt.Errorf("search query error, code=%d", res.StatusCode) + } var out EsSearchResponse if err := json.NewDecoder(res.Body).Decode(&out); err != nil { diff --git a/search/server.go b/search/server.go index 063dec4bd..8f7f60160 100644 --- a/search/server.go +++ b/search/server.go @@ -2,57 +2,39 @@ package search import ( "context" - "encoding/base32" - "encoding/json" + _ "embed" "fmt" - "strconv" + "io/ioutil" + "log/slog" + "os" "strings" - api "github.com/bluesky-social/indigo/api" - bsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/backfill" "github.com/bluesky-social/indigo/util/version" "github.com/bluesky-social/indigo/xrpc" - lru "github.com/hashicorp/golang-lru" - flatfs "github.com/ipfs/go-ds-flatfs" - blockstore "github.com/ipfs/go-ipfs-blockstore" - logging "github.com/ipfs/go-log" + "github.com/labstack/echo-contrib/echoprometheus" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" es "github.com/opensearch-project/opensearch-go/v2" + slogecho "github.com/samber/slog-echo" gorm "gorm.io/gorm" ) -var log = logging.Logger("search") - type Server struct { - escli *es.Client - db *gorm.DB - bgshost string - xrpcc *xrpc.Client - bgsxrpc *xrpc.Client - plc *api.PLCServer - echo *echo.Echo + escli *es.Client + postIndex string + profileIndex string + db *gorm.DB + bgshost string + bgsxrpc *xrpc.Client + dir identity.Directory + echo *echo.Echo + logger *slog.Logger bfs *backfill.Gormstore bf *backfill.Backfiller - - userCache *lru.Cache -} - -type PostRef struct { - gorm.Model - Cid string - Tid string `gorm:"index"` - Uid uint `gorm:"index"` -} - -type User struct { - gorm.Model - Did string `gorm:"index"` - Handle string - LastCrawl string } type LastSeq struct { @@ -60,24 +42,29 @@ type LastSeq struct { Seq int64 } -func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) (*Server, error) { +type Config struct { + BGSHost string + ProfileIndex string + PostIndex string + Logger *slog.Logger + BGSSyncRateLimit int + IndexMaxConcurrency int +} - log.Info("Migrating database") - db.AutoMigrate(&PostRef{}) - db.AutoMigrate(&User{}) - db.AutoMigrate(&LastSeq{}) - db.AutoMigrate(&backfill.GormDBJob{}) +func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Config) (*Server, error) { - // TODO: robust client - xc := &xrpc.Client{ - Host: pdsHost, + logger := config.Logger + if logger == nil { + logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) } - plc := &api.PLCServer{ - Host: plcHost, - } + logger.Info("running database migrations") + db.AutoMigrate(&LastSeq{}) + db.AutoMigrate(&backfill.GormDBJob{}) - bgsws := bgsHost + bgsws := config.BGSHost if !strings.HasPrefix(bgsws, "ws") { return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'") } @@ -87,19 +74,30 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) Host: bgshttp, } - ucache, _ := lru.New(100000) s := &Server{ - escli: escli, - db: db, - 
bgshost: bgsHost, - xrpcc: xc, - bgsxrpc: bgsxrpc, - plc: plc, - userCache: ucache, + escli: escli, + profileIndex: config.ProfileIndex, + postIndex: config.PostIndex, + db: db, + bgshost: config.BGSHost, // NOTE: the original URL, not 'bgshttp' + bgsxrpc: bgsxrpc, + dir: dir, + logger: logger, } bfstore := backfill.NewGormstore(db) opts := backfill.DefaultBackfillOptions() + if config.BGSSyncRateLimit > 0 { + opts.SyncRequestsPerSecond = config.BGSSyncRateLimit + } else { + opts.SyncRequestsPerSecond = 8 + } + if config.IndexMaxConcurrency > 0 { + opts.ParallelRecordCreates = config.IndexMaxConcurrency + } else { + opts.ParallelRecordCreates = 20 + } + opts.NSIDFilter = "app.bsky." bf := backfill.NewBackfiller( "search", bfstore, @@ -115,144 +113,51 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) return s, nil } -func (s *Server) SearchPosts(ctx context.Context, srch string, offset, size int) ([]PostSearchResult, error) { - resp, err := doSearchPosts(ctx, s.escli, srch, offset, size) - if err != nil { - return nil, err - } - - out := []PostSearchResult{} - for _, r := range resp.Hits.Hits { - uid, tid, err := decodeDocumentID(r.ID) - if err != nil { - return nil, fmt.Errorf("decoding document id: %w", err) - } +//go:embed post_schema.json +var palomarPostSchemaJSON string - var p PostRef - if err := s.db.First(&p, "tid = ? AND uid = ?", tid, uid).Error; err != nil { - log.Infof("failed to find post in database that is referenced by elasticsearch: %s", r.ID) - return nil, err - } - - var u User - if err := s.db.First(&u, "id = ?", p.Uid).Error; err != nil { - return nil, err - } +//go:embed profile_schema.json +var palomarProfileSchemaJSON string - var rec map[string]any - if err := json.Unmarshal(r.Source, &rec); err != nil { - return nil, err - } +func (s *Server) EnsureIndices(ctx context.Context) error { - out = append(out, PostSearchResult{ - Tid: p.Tid, - Cid: p.Cid, - User: UserResult{ - Did: u.Did, - Handle: u.Handle, - }, - Post: &rec, - }) + indices := []struct { + Name string + SchemaJSON string + }{ + {Name: s.postIndex, SchemaJSON: palomarPostSchemaJSON}, + {Name: s.profileIndex, SchemaJSON: palomarProfileSchemaJSON}, } - - return out, nil -} - -func (s *Server) getOrCreateUser(ctx context.Context, did string) (*User, error) { - cu, ok := s.userCache.Get(did) - if ok { - return cu.(*User), nil - } - - var u User - if err := s.db.Find(&u, "did = ?", did).Error; err != nil { - return nil, err - } - if u.ID == 0 { - // TODO: figure out peoples handles - h, err := s.handleFromDid(ctx, did) + for _, idx := range indices { + resp, err := s.escli.Indices.Exists([]string{idx.Name}) if err != nil { - log.Errorw("failed to resolve did to handle", "did", did, "err", err) - } else { - u.Handle = h + return err } - - u.Did = did - if err := s.db.Create(&u).Error; err != nil { - return nil, err + defer resp.Body.Close() + ioutil.ReadAll(resp.Body) + if resp.IsError() && resp.StatusCode != 404 { + return fmt.Errorf("failed to check index existence") } - } - - s.userCache.Add(did, &u) - - return &u, nil -} - -var ErrDoneIterating = fmt.Errorf("done iterating") - -func encodeDocumentID(uid uint, tid string) string { - comb := fmt.Sprintf("%d:%s", uid, tid) - return base32.StdEncoding.EncodeToString([]byte(comb)) -} - -func decodeDocumentID(docid string) (uint, string, error) { - dec, err := base32.StdEncoding.DecodeString(docid) - if err != nil { - return 0, "", err - } - - parts := strings.SplitN(string(dec), ":", 2) - if len(parts) < 2 { - return 0, "", 
fmt.Errorf("invalid document id: %q", string(dec)) - } - - uid, err := strconv.Atoi(parts[0]) - if err != nil { - return 0, "", err - } - - return uint(uid), parts[1], nil -} - -func (s *Server) SearchProfiles(ctx context.Context, srch string) ([]*ActorSearchResp, error) { - resp, err := doSearchProfiles(ctx, s.escli, srch) - if err != nil { - return nil, err - } - - out := []*ActorSearchResp{} - for _, r := range resp.Hits.Hits { - uid, err := strconv.Atoi(r.ID) - if err != nil { - return nil, err - } - - var u User - if err := s.db.First(&u, "id = ?", uid).Error; err != nil { - return nil, err - } - - var rec bsky.ActorProfile - if err := json.Unmarshal(r.Source, &rec); err != nil { - return nil, err + if resp.StatusCode == 404 { + s.logger.Warn("creating opensearch index", "index", idx.Name) + if len(idx.SchemaJSON) < 2 { + return fmt.Errorf("empty schema file (go:embed failed)") + } + buf := strings.NewReader(idx.SchemaJSON) + resp, err := s.escli.Indices.Create( + idx.Name, + s.escli.Indices.Create.WithBody(buf)) + if err != nil { + return err + } + defer resp.Body.Close() + ioutil.ReadAll(resp.Body) + if resp.IsError() { + return fmt.Errorf("failed to create index") + } } - - out = append(out, &ActorSearchResp{ - ActorProfile: rec, - DID: u.Did, - }) } - - return out, nil -} - -func OpenBlockstore(dir string) (blockstore.Blockstore, error) { - fds, err := flatfs.CreateOrOpen(dir, flatfs.IPFS_DEF_SHARD, false) - if err != nil { - return nil, err - } - - return blockstore.NewBlockstoreNoPrefix(fds), nil + return nil } type HealthStatus struct { @@ -263,7 +168,7 @@ type HealthStatus struct { func (s *Server) handleHealthCheck(c echo.Context) error { if err := s.db.Exec("SELECT 1").Error; err != nil { - log.Errorf("healthcheck can't connect to database: %v", err) + s.logger.Error("healthcheck can't connect to database", "err", err) return c.JSON(500, HealthStatus{Status: "error", Version: version.Version, Message: "can't connect to database"}) } else { return c.JSON(200, HealthStatus{Status: "ok", Version: version.Version}) @@ -272,30 +177,31 @@ func (s *Server) handleHealthCheck(c echo.Context) error { func (s *Server) RunAPI(listen string) error { - log.Infof("Configuring HTTP server") + s.logger.Info("Configuring HTTP server") e := echo.New() e.HideBanner = true - - e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ - Format: "method=${method} uri=${uri} status=${status} latency=${latency_human}\n", - })) + e.Use(slogecho.New(s.logger)) + e.Use(middleware.Recover()) + e.Use(echoprometheus.NewMiddleware("palomar")) + e.Use(middleware.BodyLimit("64M")) e.HTTPErrorHandler = func(err error, ctx echo.Context) { code := 500 if he, ok := err.(*echo.HTTPError); ok { code = he.Code } - log.Warnw("HTTP request error", "statusCode", code, "path", ctx.Path(), "err", err) + s.logger.Warn("HTTP request error", "statusCode", code, "path", ctx.Path(), "err", err) ctx.Response().WriteHeader(code) } e.Use(middleware.CORS()) e.GET("/_health", s.handleHealthCheck) - e.GET("/search/posts", s.handleSearchRequestPosts) - e.GET("/search/profiles", s.handleSearchRequestProfiles) + e.GET("/metrics", echoprometheus.NewHandler()) + e.GET("/xrpc/app.bsky.unspecced.searchPostsSkeleton", s.handleSearchPostsSkeleton) + e.GET("/xrpc/app.bsky.unspecced.searchActorsSkeleton", s.handleSearchActorsSkeleton) s.echo = e - log.Infof("starting search API daemon at: %s", listen) + s.logger.Info("starting search API daemon", "bind", listen) return s.echo.Start(listen) } diff --git 
a/search/testdata/transform-post-fixtures.json b/search/testdata/transform-post-fixtures.json new file mode 100644 index 000000000..a1bd12787 --- /dev/null +++ b/search/testdata/transform-post-fixtures.json @@ -0,0 +1,175 @@ +[ + { + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "rkey": "3k4duaz5vfs2b", + "cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "PostRecord": { + "$type": "app.bsky.feed.post", + "text": "post which embeds an external URL as a card", + "createdAt": "2023-08-07T05:46:14.423045Z", + "embed": { + "$type": "app.bsky.embed.external", + "external": { + "uri": "https://bsky.app", + "title": "Bluesky Social", + "description": "See what's next.", + "thumb": { + "$type": "blob", + "ref": { + "$link": "bafkreiash5eihfku2jg4skhyh5kes7j5d5fd6xxloaytdywcvb3r3zrzhu" + }, + "mimeType": "image/png", + "size": 23527 + } + } + } + }, + "doc_id": "did:plc:u5cwb2mwiv2bfq53cjufe6yn_3k4duaz5vfs2b", + "PostDoc": { + "doc_index_ts": "2006-01-02T15:04:05.000Z", + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "record_rkey": "3k4duaz5vfs2b", + "record_cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "created_at": "2023-08-07T05:46:14.423045Z", + "text": "post which embeds an external URL as a card", + "embed_url": "https://bsky.app", + "embed_img_count": 0 + } + }, + { + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "rkey": "3k4duaz5vfs2b", + "cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "PostRecord": { + "$type": "app.bsky.feed.post", + "text": "longer example with #some #hashtags, emoji \u2620 \ud83d\ude42 \ud83c\udf85\ud83c\udfff, flags \ud83c\uddf8\ud83c\udde8 ", + "createdAt": "2023-08-07T05:46:14.423045Z", + "langs": ["th", "en-US"], + "facets": [ + { + "index": { + "byteStart": 23, + "byteEnd": 35 + }, + "features": [ + { + "$type": "app.bsky.richtext.facet#mention", + "did": "did:plc:ewvi7nxzyoun6zhxrhs64oiz" + } + ] + }, + { + "index": { + "byteStart": 74, + "byteEnd": 108 + }, + "features": [ + { + "$type": "app.bsky.richtext.facet#link", + "uri": "https://en.wikipedia.org/wiki/CBOR" + } + ] + } + ], + "labels": { + "$type": "com.atproto.label.defs#selfLabels", + "values": [ + {"val": "nudity"} + ] + }, + "reply": { + "root": { + "uri": "at://did:plc:u5cwb2mwiv2bfq53cjufe6yn/app.bsky.feed.post/3k43tv4rft22g", + "cid": "bafyreig2fjxi3rptqdgylg7e5hmjl6mcke7rn2b6cugzlqq3i4zu6rq52q" + }, + "parent": { + "uri": "at://did:plc:u5cwb2mwiv2bfq53cjufe6yn/app.bsky.feed.post/3k43tv4rft22g", + "cid": "bafyreig2fjxi3rptqdgylg7e5hmjl6mcke7rn2b6cugzlqq3i4zu6rq52q" + } + }, + "embed": { + "$type": "app.bsky.embed.record", + "record": { + "uri": "at://did:plc:u5cwb2mwiv2bfq53cjufe6yn/app.bsky.feed.post/3k44deefqdk2g", + "cid": "bafyreiecx6dujwoeqpdzl27w67z4h46hyklk3an4i4cvvmioaqb2qbyo5u" + } + } + }, + "doc_id": "did:plc:u5cwb2mwiv2bfq53cjufe6yn_3k4duaz5vfs2b", + "PostDoc": { + "doc_index_ts": "2006-01-02T15:04:05.000Z", + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "record_rkey": "3k4duaz5vfs2b", + "record_cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "created_at": "2023-08-07T05:46:14.423045Z", + "text": "longer example with #some #hashtags, emoji \u2620 \ud83d\ude42 \ud83c\udf85\ud83c\udfff, flags \ud83c\uddf8\ud83c\udde8 ", + "reply_root_aturi": "at://did:plc:u5cwb2mwiv2bfq53cjufe6yn/app.bsky.feed.post/3k43tv4rft22g", + "link_url": [ "https://en.wikipedia.org/wiki/CBOR" 
], + "mention_did": [ "did:plc:ewvi7nxzyoun6zhxrhs64oiz" ], + "embed_aturi": "at://did:plc:u5cwb2mwiv2bfq53cjufe6yn/app.bsky.feed.post/3k44deefqdk2g", + "lang_code": ["th", "en-US"], + "lang_code_iso2": ["th", "en"], + "self_label": ["nudity"], + "hashtag": ["some", "hashtags"], + "emoji": ["\u2620", "\ud83d\ude42", "\ud83c\udf85\ud83c\udfff", "\ud83c\uddf8\ud83c\udde8"], + "embed_img_count": 0 + } + }, + { + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "rkey": "3k4duaz5vfs2b", + "cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "PostRecord": { + "$type": "app.bsky.feed.post", + "text": "", + "createdAt": "2023-08-07T05:46:14.423045Z", + "embed": { + "$type": "app.bsky.embed.images", + "images": [ + { + "alt": "brief alt text description of the first image", + "image": { + "$type": "blob", + "ref": { + "$link": "bafkreibabalobzn6cd366ukcsjycp4yymjymgfxcv6xczmlgpemzkz3cfa" + }, + "mimeType": "image/webp", + "size": 760898 + } + }, + { + "alt": "brief alt text description of the second image", + "image": { + "$type": "blob", + "ref": { + "$link": "bafkreif3fouono2i3fmm5moqypwskh3yjtp7snd5hfq5pr453oggygyrte" + }, + "mimeType": "image/png", + "size": 13208 + } + } + ] + } + }, + "doc_id": "did:plc:u5cwb2mwiv2bfq53cjufe6yn_3k4duaz5vfs2b", + "PostDoc": { + "doc_index_ts": "2006-01-02T15:04:05.000Z", + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "record_rkey": "3k4duaz5vfs2b", + "record_cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "created_at": "2023-08-07T05:46:14.423045Z", + "text": "", + "embed_img_alt_text": [ + "brief alt text description of the first image", + "brief alt text description of the second image" + ], + "embed_img_count": 2 + } + } +] diff --git a/search/testdata/transform-profile-fixtures.json b/search/testdata/transform-profile-fixtures.json new file mode 100644 index 000000000..7c037a2a2 --- /dev/null +++ b/search/testdata/transform-profile-fixtures.json @@ -0,0 +1,63 @@ +[ + { + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "rkey": "self", + "cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "handle": "handle.example.com", + "ProfileRecord": { + "$type": "app.bsky.actor.profile" + }, + "doc_id": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "ProfileDoc": { + "doc_index_ts": "2006-01-02T15:04:05.000Z", + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "record_cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "has_avatar": false, + "has_banner": false + } + }, + { + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "rkey": "self", + "cid": "bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "handle": "handle.example.com", + "ProfileRecord": { + "$type": "app.bsky.actor.profile", + "displayName": "Big Bubba", + "description": "Big Description πŸ₯Έ #cheese", + "labels": { + "$type": "com.atproto.label.defs#selfLabels", + "values": [ + {"val": "nudity"} + ] + }, + "avatar": { + "$type": "blob", + "mimeType": "image/jpeg", + "ref": { + "$link": "bafkreiglnysron3h2je7nf6cmvtimuaxi7xe2c7rkxitmks3mzmajnc2ou" + }, + "size": 106760 + }, + "banner": { + "cid": "bafkreih42ufe7ies4zephtkdw4tcb4hvuoc2pjzybpjxpecz3tyymnvkhm", + "mimeType": "image/jpeg" + } + }, + "doc_id": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "ProfileDoc": { + "doc_index_ts": "2006-01-02T15:04:05.000Z", + "did": "did:plc:u5cwb2mwiv2bfq53cjufe6yn", + "handle": "handle.example.com", + "record_cid": 
"bafyreibjifzpqj6o6wcq3hejh7y4z4z2vmiklkvykc57tw3pcbx3kxifpm", + "display_name": "Big Bubba", + "description": "Big Description πŸ₯Έ #cheese", + "self_label": ["nudity"], + "emoji": ["πŸ₯Έ"], + "hashtag": ["cheese"], + "has_avatar": true, + "has_banner": true + } + } +] diff --git a/search/transform.go b/search/transform.go new file mode 100644 index 000000000..9a922d7df --- /dev/null +++ b/search/transform.go @@ -0,0 +1,217 @@ +package search + +import ( + "regexp" + "strings" + "time" + + appbsky "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/indigo/atproto/identity" + "github.com/bluesky-social/indigo/util" + "github.com/rivo/uniseg" +) + +type ProfileDoc struct { + DocIndexTs string `json:"doc_index_ts"` + DID string `json:"did"` + RecordCID string `json:"record_cid"` + Handle string `json:"handle"` + DisplayName *string `json:"display_name,omitempty"` + Description *string `json:"description,omitempty"` + ImgAltText []string `json:"img_alt_text,omitempty"` + SelfLabel []string `json:"self_label,omitempty"` + Hashtag []string `json:"hashtag,omitempty"` + Emoji []string `json:"emoji,omitempty"` + HasAvatar bool `json:"has_avatar"` + HasBanner bool `json:"has_banner"` +} + +type PostDoc struct { + DocIndexTs string `json:"doc_index_ts"` + DID string `json:"did"` + RecordRkey string `json:"record_rkey"` + RecordCID string `json:"record_cid"` + CreatedAt string `json:"created_at"` + Text string `json:"text"` + LangCode []string `json:"lang_code,omitempty"` + LangCodeIso2 []string `json:"lang_code_iso2,omitempty"` + MentionDID []string `json:"mention_did,omitempty"` + LinkURL []string `json:"link_url,omitempty"` + EmbedURL *string `json:"embed_url,omitempty"` + EmbedATURI *string `json:"embed_aturi,omitempty"` + ReplyRootATURI *string `json:"reply_root_aturi,omitempty"` + EmbedImgCount int `json:"embed_img_count"` + EmbedImgAltText []string `json:"embed_img_alt_text,omitempty"` + SelfLabel []string `json:"self_label,omitempty"` + Hashtag []string `json:"hashtag,omitempty"` + Emoji []string `json:"emoji,omitempty"` +} + +// Returns the search index document ID (`_id`) for this document. +// +// This identifier should be URL safe and not contain a slash ("/"). +func (d *ProfileDoc) DocId() string { + return d.DID +} + +// Returns the search index document ID (`_id`) for this document. +// +// This identifier should be URL safe and not contain a slash ("/"). 
+func (d *PostDoc) DocId() string {
+	return d.DID + "_" + d.RecordRkey
+}
+
+func TransformProfile(profile *appbsky.ActorProfile, ident *identity.Identity, cid string) ProfileDoc {
+	// TODO: placeholder for future alt text on profile blobs
+	var altText []string
+	var hashtags []string
+	var emojis []string
+	if profile.Description != nil {
+		hashtags = parseHashtags(*profile.Description)
+		emojis = parseEmojis(*profile.Description)
+	}
+	var selfLabels []string
+	if profile.Labels != nil && profile.Labels.LabelDefs_SelfLabels != nil {
+		for _, le := range profile.Labels.LabelDefs_SelfLabels.Values {
+			selfLabels = append(selfLabels, le.Val)
+		}
+	}
+	handle := ""
+	if !ident.Handle.IsInvalidHandle() {
+		handle = ident.Handle.String()
+	}
+	return ProfileDoc{
+		DocIndexTs:  time.Now().UTC().Format(util.ISO8601),
+		DID:         ident.DID.String(),
+		RecordCID:   cid,
+		Handle:      handle,
+		DisplayName: profile.DisplayName,
+		Description: profile.Description,
+		ImgAltText:  altText,
+		SelfLabel:   selfLabels,
+		Hashtag:     hashtags,
+		Emoji:       emojis,
+		HasAvatar:   profile.Avatar != nil,
+		HasBanner:   profile.Banner != nil,
+	}
+}
+
+func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid string) PostDoc {
+	var langCodeIso2 []string
+	for _, lang := range post.Langs {
+		// TODO: include an actual language code map to go from 3char to 2char
+		prefix := strings.SplitN(lang, "-", 2)[0]
+		if len(prefix) == 2 {
+			langCodeIso2 = append(langCodeIso2, strings.ToLower(prefix))
+		}
+	}
+	var mentionDIDs []string
+	var linkURLs []string
+	for _, facet := range post.Facets {
+		for _, feat := range facet.Features {
+			if feat.RichtextFacet_Mention != nil {
+				mentionDIDs = append(mentionDIDs, feat.RichtextFacet_Mention.Did)
+			}
+			if feat.RichtextFacet_Link != nil {
+				linkURLs = append(linkURLs, feat.RichtextFacet_Link.Uri)
+			}
+		}
+	}
+	var replyRootATURI *string
+	if post.Reply != nil {
+		replyRootATURI = &(post.Reply.Root.Uri)
+	}
+	var embedURL *string
+	if post.Embed != nil && post.Embed.EmbedExternal != nil {
+		embedURL = &post.Embed.EmbedExternal.External.Uri
+	}
+	var embedATURI *string
+	if post.Embed != nil && post.Embed.EmbedRecord != nil {
+		embedATURI = &post.Embed.EmbedRecord.Record.Uri
+	}
+	if post.Embed != nil && post.Embed.EmbedRecordWithMedia != nil {
+		embedATURI = &post.Embed.EmbedRecordWithMedia.Record.Record.Uri
+	}
+	embedImgCount := 0
+	var embedImgAltText []string
+	if post.Embed != nil && post.Embed.EmbedImages != nil {
+		embedImgCount = len(post.Embed.EmbedImages.Images)
+		for _, img := range post.Embed.EmbedImages.Images {
+			if img.Alt != "" {
+				embedImgAltText = append(embedImgAltText, img.Alt)
+			}
+		}
+	}
+	var selfLabels []string
+	if post.Labels != nil && post.Labels.LabelDefs_SelfLabels != nil {
+		for _, le := range post.Labels.LabelDefs_SelfLabels.Values {
+			selfLabels = append(selfLabels, le.Val)
+		}
+	}
+
+	return PostDoc{
+		DocIndexTs:      time.Now().UTC().Format(util.ISO8601),
+		DID:             ident.DID.String(),
+		RecordRkey:      rkey,
+		RecordCID:       cid,
+		CreatedAt:       post.CreatedAt,
+		Text:            post.Text,
+		LangCode:        post.Langs,
+		LangCodeIso2:    langCodeIso2,
+		MentionDID:      mentionDIDs,
+		LinkURL:         linkURLs,
+		EmbedURL:        embedURL,
+		EmbedATURI:      embedATURI,
+		ReplyRootATURI:  replyRootATURI,
+		EmbedImgCount:   embedImgCount,
+		EmbedImgAltText: embedImgAltText,
+		SelfLabel:       selfLabels,
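+		// full-text facets below are parsed out of the raw post text (see parseHashtags and parseEmojis)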
+		Hashtag:         parseHashtags(post.Text),
+		Emoji:           parseEmojis(post.Text),
+	}
+}
+
+// hashtagRegex matches #hashtags of ASCII letters; the \B prevents matching URL
+// fragments like example.com/thing#fragment
+var hashtagRegex = regexp.MustCompile(`\B#([A-Za-z]+)\b`)
+
+func parseHashtags(s string) []string {
+	ret := []string{}
+	seen := make(map[string]bool)
+	for _, m := range hashtagRegex.FindAllStringSubmatch(s, -1) {
+		if !seen[m[1]] {
+			ret = append(ret, m[1])
+			seen[m[1]] = true
+		}
+	}
+	if len(ret) == 0 {
+		return nil
+	}
+	return ret
+}
+
+func parseEmojis(s string) []string {
+	ret := []string{}
+	seen := make(map[string]bool)
+	gr := uniseg.NewGraphemes(s)
+	for gr.Next() {
+		// check if this grapheme cluster starts with an emoji rune (Unicode codepoint, int32)
+		firstRune := gr.Runes()[0]
+		if (firstRune >= 0x1F000 && firstRune <= 0x1FFFF) || (firstRune >= 0x2600 && firstRune <= 0x26FF) {
+			emoji := gr.Str()
+			if !seen[emoji] {
+				ret = append(ret, emoji)
+				seen[emoji] = true
+			}
+		}
+	}
+	if len(ret) == 0 {
+		return nil
+	}
+	return ret
+}
diff --git a/search/transform_test.go b/search/transform_test.go
new file mode 100644
index 000000000..de84fd02e
--- /dev/null
+++ b/search/transform_test.go
@@ -0,0 +1,123 @@
+package search
+
+import (
+	"encoding/json"
+	"io"
+	"os"
+	"testing"
+
+	appbsky "github.com/bluesky-social/indigo/api/bsky"
+	"github.com/bluesky-social/indigo/atproto/identity"
+	"github.com/bluesky-social/indigo/atproto/syntax"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParseHashtags(t *testing.T) {
+	assert := assert.New(t)
+
+	assert.Equal(parseHashtags("#basic post with #HashTag #examples"), []string{"basic", "HashTag", "examples"})
+	assert.Equal(parseHashtags("#dedupe #dedupe"), []string{"dedupe"})
+	assert.Equal(parseHashtags("##double"), []string{"double"})
+	assert.Equal(parseHashtags("#with-punc"), []string{"with"})
+	assert.True(parseHashtags("not https://example.com/thing#fragment") == nil)
+}
+
+func TestParseEmojis(t *testing.T) {
+	assert := assert.New(t)
+
+	assert.Equal(parseEmojis("bunch πŸŽ… of 🏑 emoji 🀰and πŸ«„ some πŸ‘©β€πŸ‘©β€πŸ‘§β€πŸ‘§ compound"), []string{"πŸŽ…", "🏑", "🀰", "πŸ«„", "πŸ‘©β€πŸ‘©β€πŸ‘§β€πŸ‘§"})
+
+	assert.Equal(parseEmojis("more β›„ from ☠ lower β›΄ range"), []string{"β›„", "☠", "β›΄"})
+	assert.True(parseEmojis("blah") == nil)
+}
+
+type profileFixture struct {
+	DID           string `json:"did"`
+	Handle        string `json:"handle"`
+	Rkey          string `json:"rkey"`
+	Cid           string `json:"cid"`
+	DocId         string `json:"doc_id"`
+	ProfileRecord *appbsky.ActorProfile
+	ProfileDoc    ProfileDoc
+}
+
+func TestTransformProfileFixtures(t *testing.T) {
+	f, err := os.Open("testdata/transform-profile-fixtures.json")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+
+	fixBytes, err := io.ReadAll(f)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var fixtures []profileFixture
+	if err := json.Unmarshal(fixBytes, &fixtures); err != nil {
+		t.Fatal(err)
+	}
+
+	for _, row := range fixtures {
+		testProfileFixture(t, row)
+	}
+}
+
+func testProfileFixture(t *testing.T, row profileFixture) {
+	assert := assert.New(t)
+
+	repo := identity.Identity{
+		Handle: syntax.Handle(row.Handle),
+		DID:    syntax.DID(row.DID),
+	}
+	doc := TransformProfile(row.ProfileRecord, &repo, row.Cid)
+	doc.DocIndexTs = "2006-01-02T15:04:05.000Z"
+	assert.Equal(row.ProfileDoc, doc)
+	assert.Equal(row.DocId, doc.DocId())
+}
+
+type postFixture struct {
+	DID        string `json:"did"`
+	Handle     string `json:"handle"`
+	Rkey       string `json:"rkey"`
+	Cid        string `json:"cid"`
+	DocId      string `json:"doc_id"`
+	PostRecord *appbsky.FeedPost
+	PostDoc    PostDoc
+}
+
+func TestTransformPostFixtures(t *testing.T) {
+	f, err := os.Open("testdata/transform-post-fixtures.json")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+
+	fixBytes, err := io.ReadAll(f)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var fixtures []postFixture
+	if err := json.Unmarshal(fixBytes, &fixtures); err != nil {
+		t.Fatal(err)
+	}
+
+	for _, row := range fixtures {
+		testPostFixture(t, row)
+	}
+}
+
+func testPostFixture(t *testing.T, row postFixture) {
+	assert := assert.New(t)
+
+	repo := identity.Identity{
+		Handle: syntax.Handle(row.Handle),
+		DID:    syntax.DID(row.DID),
+	}
+	doc := TransformPost(row.PostRecord, &repo, row.Rkey, row.Cid)
+	doc.DocIndexTs = "2006-01-02T15:04:05.000Z"
+	assert.Equal(row.PostDoc, doc)
+	assert.Equal(row.DocId, doc.DocId())
+}
diff --git a/util/time.go b/util/time.go
index da9a6a6e9..d178f1a06 100644
--- a/util/time.go
+++ b/util/time.go
@@ -13,6 +13,10 @@ const ISO8601_numtz = "2006-01-02T15:04:05.000-07:00"
 
 const ISO8601_numtz_milli = "2006-01-02T15:04:05.000000-07:00"
 
+const ISO8601_sec = "2006-01-02T15:04:05Z"
+
+const ISO8601_numtz_sec = "2006-01-02T15:04:05-07:00"
+
 func ParseTimestamp(s string) (time.Time, error) {
 	t, err := time.Parse(ISO8601, s)
 	if err == nil {
@@ -34,5 +38,15 @@ func ParseTimestamp(s string) (time.Time, error) {
 		return t, nil
 	}
 
+	t, err = time.Parse(ISO8601_sec, s)
+	if err == nil {
+		return t, nil
+	}
+
+	t, err = time.Parse(ISO8601_numtz_sec, s)
+	if err == nil {
+		return t, nil
+	}
+
 	return time.Time{}, fmt.Errorf("failed to parse %q as timestamp", s)
 }
diff --git a/util/time_test.go b/util/time_test.go
index d9d280c78..fd3c4192a 100644
--- a/util/time_test.go
+++ b/util/time_test.go
@@ -8,6 +8,7 @@ func TestTimeParsing(t *testing.T) {
 		"2023-07-19T21:54:14.163Z",
 		"2023-07-19T21:52:02.000+00:00",
 		"2023-07-19T21:52:02.123456+00:00",
+		"2023-09-13T11:23:33+09:00",
 	}
 
 	for _, g := range good {