Skip to content

Commit

Permalink
fix commitlog exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
totegamma committed Oct 12, 2024
1 parent 82a3c82 commit 9a4fc57
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 30 deletions.
2 changes: 1 addition & 1 deletion core/dbschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ type Job struct {
type CommitOwner struct {
ID uint `json:"id" gorm:"primaryKey;auto_increment"`
CommitLogID uint `json:"commitLogID" gorm:"index"`
Owner string `json:"owner" gorm:"type:char(42)"`
Owner string `json:"owner" gorm:"type:char(42);index"`
}

type CommitLog struct {
Expand Down
62 changes: 33 additions & 29 deletions x/store/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,28 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error {
}
defer r.rdb.Del(ctx, lockKey)

var logs string
lastSignedAt, err := r.getLatestCommitDateByOwner(ctx, owner)
if err != nil {
span.RecordError(err)
return err
}

var pageSize = 10
userlogPath := filepath.Join("/tmp/concrnt", "/user")
err = os.MkdirAll(userlogPath, 0755)
if err != nil {
slog.Error("failed to create repository directory:", slog.String("error", err.Error()))
panic(err)
}

filename := fmt.Sprintf("%s.log", owner)
userStore, err := os.OpenFile(filepath.Join(userlogPath, filename), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
slog.Error("failed to open user log file:", slog.String("error", err.Error()))
return err
}
defer userStore.Close()

var pageSize = 1000

var firstCommitDate time.Time
err = r.db.WithContext(ctx).
Expand Down Expand Up @@ -247,6 +261,9 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error {
progressCtx, cancel := context.WithCancel(ctx)
defer cancel()

progress := float64(lastSignedAt.Sub(firstCommitDate).Seconds()) / float64(latestCommitDate.Sub(firstCommitDate).Seconds())
r.rdb.SetNX(ctx, fmt.Sprintf("store:progress:%s", owner), fmt.Sprintf("%.2f%%", progress*100), 10*time.Minute)

// log dump progress
go func() {
for {
Expand All @@ -256,7 +273,7 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error {
case <-time.After(10 * time.Second):
progress := float64(lastSignedAt.Sub(firstCommitDate).Seconds()) / float64(latestCommitDate.Sub(firstCommitDate).Seconds())
fmt.Printf("dumping %s logs. (%.2f%%)\n", owner, progress*100)
r.rdb.SetNX(ctx, fmt.Sprintf("store:progress:%s", owner), fmt.Sprintf("%.2f%%", progress), time.Hour)
r.rdb.SetNX(ctx, fmt.Sprintf("store:progress:%s", owner), fmt.Sprintf("%.2f%%", progress*100), 10*time.Minute)

// re-set lock
r.rdb.SetNX(ctx, lockKey, "1", time.Minute)
Expand All @@ -265,29 +282,37 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error {
}()

for {
fmt.Printf("dump lastSignedAt: %v\n", lastSignedAt)
var commits []core.CommitLog

query := r.db.WithContext(ctx).
query := r.db.Debug().WithContext(ctx).
Joins("JOIN commit_owners ON commit_owners.commit_log_id = commit_logs.id").
Where("commit_owners.owner = ?", owner).
Where("commit_logs.is_ephemeral = ?", false)

if lastSignedAt.IsZero() {
query = query.Order("commit_logs.signed_at ASC")
if !lastSignedAt.IsZero() {
query = query.Where("commit_logs.signed_at > ?", lastSignedAt)
}

err = query.Find(&commits).Limit(pageSize).Error
q := query.
Order("commit_logs.signed_at ASC").
Limit(pageSize).Find(&commits)

err = q.Error
if err != nil {
span.RecordError(err)
return err
}

var logs string
for _, commit := range commits {
// ID Owner Signature Document
logs += fmt.Sprintf("%s %s %s %s\n", commit.DocumentID, owner, commit.Signature, commit.Document)
}
_, err = userStore.WriteString(logs)
if err != nil {
slog.Error("failed to write to user log file:", slog.String("error", err.Error()))
return err
}

if len(commits) > 0 {
lastSignedAt = commits[len(commits)-1].SignedAt
Expand All @@ -298,26 +323,5 @@ func (r *repository) SyncCommitFile(ctx context.Context, owner string) error {
}
}

userlogPath := filepath.Join("/tmp/concrnt", "/user")
err = os.MkdirAll(userlogPath, 0755)
if err != nil {
slog.Error("failed to create repository directory:", slog.String("error", err.Error()))
panic(err)
}

filename := fmt.Sprintf("%s.log", owner)
userStore, err := os.OpenFile(filepath.Join(userlogPath, filename), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
slog.Error("failed to open user log file:", slog.String("error", err.Error()))
return err
}
defer userStore.Close()

_, err = userStore.WriteString(logs)
if err != nil {
slog.Error("failed to write to user log file:", slog.String("error", err.Error()))
return err
}

return nil
}

0 comments on commit 9a4fc57

Please sign in to comment.