Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more verbose logging to search indexing operations #333

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions search/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey string) error {
s.logger.Info("deleting post from index", "repo", ident.DID, "rkey", rkey)
log := s.logger.With("repo", ident.DID, "rkey", rkey, "op", "deletePost")
log.Info("deleting post from index")
docID := fmt.Sprintf("%s_%s", ident.DID.String(), rkey)
req := esapi.DeleteRequest{
Index: s.postIndex,
Expand All @@ -31,28 +32,33 @@ func (s *Server) deletePost(ctx context.Context, ident *identity.Identity, rkey
return fmt.Errorf("failed to delete post: %w", err)
}
defer res.Body.Close()
io.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("failed to read indexing response: %w", err)
}
if res.IsError() {
s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res)
log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
return fmt.Errorf("indexing error, code=%d", res.StatusCode)
}
return nil
}

func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *appbsky.FeedPost, path string, rcid cid.Cid) error {

log := s.logger.With("repo", ident.DID, "path", path, "op", "indexPost")
parts := strings.SplitN(path, "/", 3)
// TODO: replace with an atproto/syntax package type for TID
var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`)
if len(parts) != 2 || !tidRegex.MatchString(parts[1]) {
s.logger.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path)
log.Warn("skipping index post record with weird path/TID", "did", ident.DID, "path", path)
return nil
}
rkey := parts[1]

log = log.With("rkey", rkey)

_, err := util.ParseTimestamp(rec.CreatedAt)
if err != nil {
s.logger.Warn("post had invalid timestamp", "repo", ident.DID, "rkey", rkey, "createdAt", rec.CreatedAt, "parseErr", err)
log.Warn("post had invalid timestamp", "createdAt", rec.CreatedAt, "parseErr", err)
rec.CreatedAt = ""
}

Expand All @@ -62,7 +68,7 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a
return err
}

s.logger.Debug("indexing post", "did", ident.DID, "rkey", rkey)
log.Debug("indexing post")
req := esapi.IndexRequest{
Index: s.postIndex,
DocumentID: doc.DocId(),
Expand All @@ -71,26 +77,31 @@ func (s *Server) indexPost(ctx context.Context, ident *identity.Identity, rec *a

res, err := req.Do(ctx, s.escli)
if err != nil {
log.Warn("failed to send indexing request", "err", err)
return fmt.Errorf("failed to send indexing request: %w", err)
}
defer res.Body.Close()
io.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
log.Warn("failed to read indexing response", "err", err)
return fmt.Errorf("failed to read indexing response: %w", err)
}
if res.IsError() {
s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res)
log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
return fmt.Errorf("indexing error, code=%d", res.StatusCode)
}
return nil
}

func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec *appbsky.ActorProfile, path string, rcid cid.Cid) error {

log := s.logger.With("repo", ident.DID, "path", path, "op", "indexProfile")
parts := strings.SplitN(path, "/", 3)
if len(parts) != 2 || parts[1] != "self" {
s.logger.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path)
log.Warn("skipping indexing non-canonical profile record", "did", ident.DID, "path", path)
return nil
}

s.logger.Info("indexing profile", "did", ident.DID, "handle", ident.Handle)
log.Info("indexing profile", "handle", ident.Handle)

doc := TransformProfile(rec, ident, rcid.String())
b, err := json.Marshal(doc)
Expand All @@ -105,18 +116,24 @@ func (s *Server) indexProfile(ctx context.Context, ident *identity.Identity, rec

res, err := req.Do(ctx, s.escli)
if err != nil {
log.Warn("failed to send indexing request", "err", err)
return fmt.Errorf("failed to send indexing request: %w", err)
}
defer res.Body.Close()
io.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
log.Warn("failed to read indexing response", "err", err)
return fmt.Errorf("failed to read indexing response: %w", err)
}
if res.IsError() {
s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res)
log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
return fmt.Errorf("indexing error, code=%d", res.StatusCode)
}
return nil
}

func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error {
log := s.logger.With("repo", did, "op", "updateUserHandle", "handle", handle)
b, err := json.Marshal(map[string]any{
"script": map[string]any{
"source": "ctx._source.handle = params.handle",
Expand All @@ -127,6 +144,7 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error
},
})
if err != nil {
log.Warn("failed to marshal update script", "err", err)
return err
}

Expand All @@ -138,12 +156,17 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error

res, err := req.Do(ctx, s.escli)
if err != nil {
log.Warn("failed to send indexing request", "err", err)
return fmt.Errorf("failed to send indexing request: %w", err)
}
defer res.Body.Close()
io.ReadAll(res.Body)
body, err := io.ReadAll(res.Body)
if err != nil {
log.Warn("failed to read indexing response", "err", err)
return fmt.Errorf("failed to read indexing response: %w", err)
}
if res.IsError() {
s.logger.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res)
log.Warn("opensearch indexing error", "status_code", res.StatusCode, "response", res, "body", string(body))
return fmt.Errorf("indexing error, code=%d", res.StatusCode)
}
return nil
Expand Down
11 changes: 8 additions & 3 deletions search/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type PostDoc struct {
DID string `json:"did"`
RecordRkey string `json:"record_rkey"`
RecordCID string `json:"record_cid"`
CreatedAt string `json:"created_at"`
CreatedAt *string `json:"created_at,omitempty"`
Text string `json:"text"`
LangCode []string `json:"lang_code,omitempty"`
LangCodeIso2 []string `json:"lang_code_iso2,omitempty"`
Expand Down Expand Up @@ -157,12 +157,11 @@ func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid s
}
}

return PostDoc{
doc := PostDoc{
DocIndexTs: time.Now().UTC().Format(util.ISO8601),
DID: ident.DID.String(),
RecordRkey: rkey,
RecordCID: cid,
CreatedAt: post.CreatedAt,
Text: post.Text,
LangCode: post.Langs,
LangCodeIso2: langCodeIso2,
Expand All @@ -177,6 +176,12 @@ func TransformPost(post *appbsky.FeedPost, ident *identity.Identity, rkey, cid s
Hashtag: parseHashtags(post.Text),
Emoji: parseEmojis(post.Text),
}

if post.CreatedAt != "" {
doc.CreatedAt = &post.CreatedAt
}

return doc
}

func parseHashtags(s string) []string {
Expand Down
Loading