From c3b2c45b6039086ae78f2b6b3cb8ee26d4f1efef Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 22 Sep 2023 20:54:08 +0000 Subject: [PATCH 1/3] Add more verbose logging to search indexing operations --- search/indexing.go | 55 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/search/indexing.go b/search/indexing.go index a69e21c50..cc54f0373 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -18,7 +18,8 @@ import ( ) func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey string) error { - s.logger.Info("deleting post from index", "repo", ident.DID, "rkey", rkey) + log := s.logger.With("repo", ident.DID, "rkey", rkey, "op", "deletePost") + log.Info("deleting post from index") docID := fmt.Sprintf("%s_%s", ident.DID.String(), rkey) req := esapi.DeleteRequest{ Index: s.postIndex, @@ -31,28 +32,33 @@ func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey return fmt.Errorf("failed to delete post: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil } func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *appbsky.FeedPost, path string, rcid cid.Cid) error { - + log := s.logger.With("repo", ident.DID, "path", path, "op", "indexPost") parts := strings.SplitN(path, "/", 3) // TODO: replace with an atproto/syntax package type for TID var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`) if len(parts) != 2 || !tidRegex.MatchString(parts[1]) { - s.logger.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) + log.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path) return nil } rkey := parts[1] + log = log.With("rkey", rkey) + _, err := util.ParseTimestamp(rec.CreatedAt) if err != nil { - s.logger.Warn("post had invalid timestamp", "repo", ident.DID, "rkey", rkey, "createdAt", rec.CreatedAt, "parseErr", err) + log.Warn("post had invalid timestamp", "createdAt", rec.CreatedAt, "parseErr", err) rec.CreatedAt = "" } @@ -62,7 +68,7 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a return err } - s.logger.Debug("indexing post", "did", ident.DID, "rkey", rkey) + log.Debug("indexing post") req := esapi.IndexRequest{ Index: s.postIndex, DocumentID: doc.DocId(), @@ -71,26 +77,31 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a res, err := req.Do(ctx, s.escli) if err != nil { + log.Warn("failed to send indexing request", "err", err) return fmt.Errorf("failed to send indexing request: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + log.Warn("failed to read indexing response", "err", err) + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil } func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec *appbsky.ActorProfile, path string, rcid cid.Cid) error { - + log := s.logger.With("repo", ident.DID, "path", path, "op", "indexProfile") parts := strings.SplitN(path, "/", 3) if len(parts) != 2 || parts[1] != "self" { - s.logger.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) + log.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path) return nil } - s.logger.Info("indexing profile", "did", ident.DID, "handle", ident.Handle) + log.Info("indexing profile", "handle", ident.Handle) doc := TransformProfile(rec, ident, rcid.String()) b, err := json.Marshal(doc) @@ -105,18 +116,24 @@ func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec res, err := req.Do(ctx, s.escli) if err != nil { + log.Warn("failed to send indexing request", "err", err) return fmt.Errorf("failed to send indexing request: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + log.Warn("failed to read indexing response", "err", err) + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil } func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error { + log := s.logger.With("repo", did, "op", "updateUserHandle", "handle", handle) b, err := json.Marshal(map[string]any{ "script": map[string]any{ "source": "ctx._source.handle = params.handle", @@ -127,6 +144,7 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error }, }) if err != nil { + log.Warn("failed to marshal update script", "err", err) return err } @@ -138,12 +156,17 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error res, err := req.Do(ctx, s.escli) if err != nil { + log.Warn("failed to send indexing request", "err", err) return fmt.Errorf("failed to send indexing request: %w", err) } defer res.Body.Close() - io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) + if err != nil { + log.Warn("failed to read indexing response", "err", err) + return fmt.Errorf("failed to read indexing response: %w", err) + } if res.IsError() { - s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res) + log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body)) return fmt.Errorf("indexing error, code=%d", res.StatusCode) } return nil From 8911ead778e264ae4df8f33cdaaa4b922708f381 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 22 Sep 2023 21:07:42 +0000 Subject: [PATCH 2/3] Don't try to index posts with invalid timestamps, an empty timestamp violates our ES schema --- search/indexing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/search/indexing.go b/search/indexing.go index cc54f0373..5b0bfb512 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -59,7 +59,7 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a _, err := util.ParseTimestamp(rec.CreatedAt) if err != nil { log.Warn("post had invalid timestamp", "createdAt", rec.CreatedAt, "parseErr", err) - rec.CreatedAt = "" + return fmt.Errorf("post had invalid timestamp: %w", err) } doc := TransformPost(rec, ident, rkey, rcid.String()) From 1c5152e105fde55bdaf349e7ead29e80fd6e2f07 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 22 Sep 2023 21:19:27 +0000 Subject: [PATCH 3/3] Support invalid createdAt by not passing it to ES --- search/indexing.go | 2 +- search/transform.go | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/search/indexing.go b/search/indexing.go index 5b0bfb512..cc54f0373 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -59,7 +59,7 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a _, err := util.ParseTimestamp(rec.CreatedAt) if err != nil { log.Warn("post had invalid timestamp", "createdAt", rec.CreatedAt, "parseErr", err) - return fmt.Errorf("post had invalid timestamp: %w", err) + rec.CreatedAt = "" } doc := TransformPost(rec, ident, rkey, rcid.String()) diff --git a/search/transform.go b/search/transform.go index 9a922d7df..3f8ed3834 100644 --- a/search/transform.go +++ b/search/transform.go @@ -31,7 +31,7 @@ type PostDoc struct { DID string `json:"did"` RecordRkey string `json:"record_rkey"` RecordCID string `json:"record_cid"` - CreatedAt string `json:"created_at"` + CreatedAt *string `json:"created_at,omitempty"` Text string `json:"text"` LangCode []string `json:"lang_code,omitempty"` LangCodeIso2 []string `json:"lang_code_iso2,omitempty"` @@ -157,12 +157,11 @@ func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid s } } - return PostDoc{ + doc := PostDoc{ DocIndexTs: time.Now().UTC().Format(util.ISO8601), DID: ident.DID.String(), RecordRkey: rkey, RecordCID: cid, - CreatedAt: post.CreatedAt, Text: post.Text, LangCode: post.Langs, LangCodeIso2: langCodeIso2, @@ -177,6 +176,12 @@ func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid s Hashtag: parseHashtags(post.Text), Emoji: parseEmojis(post.Text), } + + if post.CreatedAt != "" { + doc.CreatedAt = &post.CreatedAt + } + + return doc } func parseHashtags(s string) []string {