From 9a4fc577e572e1db97a385378bf3a5401253e55f Mon Sep 17 00:00:00 2001 From: totegamma Date: Sun, 13 Oct 2024 00:51:29 +0900 Subject: [PATCH] fix commitlog exporter --- core/dbschema.go | 2 +- x/store/repository.go | 62 +++++++++++++++++++++++-------------------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/core/dbschema.go b/core/dbschema.go index a140bd6d..0f78cff5 100644 --- a/core/dbschema.go +++ b/core/dbschema.go @@ -220,7 +220,7 @@ type Job struct { type CommitOwner struct { ID uint `json:"id" gorm:"primaryKey;auto_increment"` CommitLogID uint `json:"commitLogID" gorm:"index"` - Owner string `json:"owner" gorm:"type:char(42)"` + Owner string `json:"owner" gorm:"type:char(42);index"` } type CommitLog struct { diff --git a/x/store/repository.go b/x/store/repository.go index e01a3652..9116bc33 100644 --- a/x/store/repository.go +++ b/x/store/repository.go @@ -203,14 +203,28 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error { } defer r.rdb.Del(ctx, lockKey) - var logs string lastSignedAt, err := r.getLatestCommitDateByOwner(ctx, owner) if err != nil { span.RecordError(err) return err } - var pageSize = 10 + userlogPath := filepath.Join("/tmp/concrnt", "/user") + err = os.MkdirAll(userlogPath, 0755) + if err != nil { + slog.Error("failed to create repository directory:", slog.String("error", err.Error())) + panic(err) + } + + filename := fmt.Sprintf("%s.log", owner) + userStore, err := os.OpenFile(filepath.Join(userlogPath, filename), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) + if err != nil { + slog.Error("failed to open user log file:", slog.String("error", err.Error())) + return err + } + defer userStore.Close() + + var pageSize = 1000 var firstCommitDate time.Time err = r.db.WithContext(ctx). @@ -247,6 +261,9 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error { progressCtx, cancel := context.WithCancel(ctx) defer cancel() + progress := float64(lastSignedAt.Sub(firstCommitDate).Seconds()) / float64(latestCommitDate.Sub(firstCommitDate).Seconds()) + r.rdb.SetNX(ctx, fmt.Sprintf("store:progress:%s", owner), fmt.Sprintf("%.2f%%", progress*100), 10*time.Minute) + // log dump progress go func() { for { @@ -256,7 +273,7 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error { case <-time.After(10 * time.Second): progress := float64(lastSignedAt.Sub(firstCommitDate).Seconds()) / float64(latestCommitDate.Sub(firstCommitDate).Seconds()) fmt.Printf("dumping %s logs. (%.2f%%)\n", owner, progress*100) - r.rdb.SetNX(ctx, fmt.Sprintf("store:progress:%s", owner), fmt.Sprintf("%.2f%%", progress), time.Hour) + r.rdb.SetNX(ctx, fmt.Sprintf("store:progress:%s", owner), fmt.Sprintf("%.2f%%", progress*100), 10*time.Minute) // re-set lock r.rdb.SetNX(ctx, lockKey, "1", time.Minute) @@ -265,29 +282,37 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error { }() for { - fmt.Printf("dump lastSignedAt: %v\n", lastSignedAt) var commits []core.CommitLog - query := r.db.WithContext(ctx). + query := r.db.Debug().WithContext(ctx). Joins("JOIN commit_owners ON commit_owners.commit_log_id = commit_logs.id"). Where("commit_owners.owner = ?", owner). Where("commit_logs.is_ephemeral = ?", false) - if lastSignedAt.IsZero() { - query = query.Order("commit_logs.signed_at ASC") + if !lastSignedAt.IsZero() { + query = query.Where("commit_logs.signed_at > ?", lastSignedAt) } - err = query.Find(&commits).Limit(pageSize).Error + q := query. + Order("commit_logs.signed_at ASC"). + Limit(pageSize).Find(&commits) + err = q.Error if err != nil { span.RecordError(err) return err } + var logs string for _, commit := range commits { // ID Owner Signature Document logs += fmt.Sprintf("%s %s %s %s\n", commit.DocumentID, owner, commit.Signature, commit.Document) } + _, err = userStore.WriteString(logs) + if err != nil { + slog.Error("failed to write to user log file:", slog.String("error", err.Error())) + return err + } if len(commits) > 0 { lastSignedAt = commits[len(commits)-1].SignedAt @@ -298,26 +323,5 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error { } } - userlogPath := filepath.Join("/tmp/concrnt", "/user") - err = os.MkdirAll(userlogPath, 0755) - if err != nil { - slog.Error("failed to create repository directory:", slog.String("error", err.Error())) - panic(err) - } - - filename := fmt.Sprintf("%s.log", owner) - userStore, err := os.OpenFile(filepath.Join(userlogPath, filename), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644) - if err != nil { - slog.Error("failed to open user log file:", slog.String("error", err.Error())) - return err - } - defer userStore.Close() - - _, err = userStore.WriteString(logs) - if err != nil { - slog.Error("failed to write to user log file:", slog.String("error", err.Error())) - return err - } - return nil }