diff --git a/search/indexing.go b/search/indexing.go index a69e21c50..cc54f0373 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -18,7 +18,8 @@ import ( ) func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey string) error { - s.logger.Info("deleting post from index", "repo", ident.DID, "rkey", rkey) + log := s.logger.With("repo", ident.DID, "rkey", rkey, "op", "deletePost") + log.Info("deleting post from index") docID := fmt.Sprintf("%s_%s", ident.DID.String(), rkey) req := esapi.DeleteRequest{ Index: s.postIndex, @@ -31,28 +32,33 @@ func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey return fmt.Errorf("failed to delete post: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil } func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *appbsky.FeedPost, path string, rcid cid.Cid) error { - + log := s.logger.With("repo", ident.DID, "path", path, "op", "indexPost") parts := strings.SplitN(path, "/", 3) // TODO: replace with an atproto/syntax package type for TID var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`) if len(parts) != 2 || !tidRegex.MatchString(parts[1]) { - s.logger.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) + log.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) return nil } rkey := parts[1] + log = log.With("rkey", rkey) + _, err := util.ParseTimestamp(rec.CreatedAt) if err != nil { - s.logger.Warn("post had invalid timestamp", "repo", ident.DID, "rkey", rkey, "createdAt", rec.CreatedAt, "parseErr", err) + log.Warn("post had invalid timestamp", "createdAt", rec.CreatedAt, "parseErr", err) rec.CreatedAt = "" } @@ -62,7 +68,7 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a return err } - s.logger.Debug("indexing post", "did", ident.DID, "rkey", rkey) + log.Debug("indexing post") req := esapi.IndexRequest{ Index: s.postIndex, DocumentID: doc.DocId(), @@ -71,26 +77,31 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a res, err := req.Do(ctx, s.escli) if err != nil { + log.Warn("failed to send indexing request", "err", err) return fmt.Errorf("failed to send indexing request: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + log.Warn("failed to read indexing response", "err", err) + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil } func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec *appbsky.ActorProfile, path string, rcid cid.Cid) error { - + log := s.logger.With("repo", ident.DID, "path", path, "op", "indexProfile") parts := strings.SplitN(path, "/", 3) if len(parts) != 2 || parts[1] != "self" { - s.logger.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) + log.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) return nil } - s.logger.Info("indexing profile", "did", ident.DID, "handle", ident.Handle) + log.Info("indexing profile", "handle", ident.Handle) doc := TransformProfile(rec, ident, rcid.String()) b, err := json.Marshal(doc) @@ -105,18 +116,24 @@ func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec res, err := req.Do(ctx, s.escli) if err != nil { + log.Warn("failed to send indexing request", "err", err) return fmt.Errorf("failed to send indexing request: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + log.Warn("failed to read indexing response", "err", err) + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil } func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { + log := s.logger.With("repo", did, "op", "updateUserHandle", "handle", handle) b, err := json.Marshal(map[string]any{ "script": map[string]any{ "source": "ctx._source.handle = params.handle", @@ -127,6 +144,7 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error }, }) if err != nil { + log.Warn("failed to marshal update script", "err", err) return err } @@ -138,12 +156,17 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error res, err := req.Do(ctx, s.escli) if err != nil { + log.Warn("failed to send indexing request", "err", err) return fmt.Errorf("failed to send indexing request: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + log.Warn("failed to read indexing response", "err", err) + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil diff --git a/search/transform.go b/search/transform.go index 9a922d7df..3f8ed3834 100644 --- a/search/transform.go +++ b/search/transform.go @@ -31,7 +31,7 @@ type PostDoc struct { DID string `json:"did"` RecordRkey string `json:"record_rkey"` RecordCID string `json:"record_cid"` - CreatedAt string `json:"created_at"` + CreatedAt *string `json:"created_at,omitempty"` Text string `json:"text"` LangCode []string `json:"lang_code,omitempty"` LangCodeIso2 []string `json:"lang_code_iso2,omitempty"` @@ -157,12 +157,11 @@ func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid s } } - return PostDoc{ + doc := PostDoc{ DocIndexTs: time.Now().UTC().Format(util.ISO8601), DID: ident.DID.String(), RecordRkey: rkey, RecordCID: cid, - CreatedAt: post.CreatedAt, Text: post.Text, LangCode: post.Langs, LangCodeIso2: langCodeIso2, @@ -177,6 +176,12 @@ func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid s Hashtag: parseHashtags(post.Text), Emoji: parseEmojis(post.Text), } + + if post.CreatedAt != "" { + doc.CreatedAt = &post.CreatedAt + } + + return doc } func parseHashtags(s string) []string {