-
Notifications
You must be signed in to change notification settings - Fork 0
/
api.services.go
142 lines (125 loc) · 4.34 KB
/
api.services.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"context"
"time"
"go.uber.org/zap"
)
// @title Book Store API
// @version 1.0
// @description This provides a CRUD service on Book.
// @contact.name Jerome Amon
// @contact.url https://learn.cloudmentor-scale.com/contact
// @license.name MIT
// @license.url https://github.com/jeamon/demo-redis/blob/main/LICENSE
// @host localhost:8080
// @BasePath /v1
// @externalDocs.description OpenAPI
// @externalDocs.url https://swagger.io/resources/open-api/
// BookServiceProvider is the set of book operations exposed by the service
// layer. NewBookService returns a *BookService behind this interface.
type BookServiceProvider interface {
// Add stores book under the given id.
Add(ctx context.Context, id string, book Book) error
// GetOne returns the book identified by id.
GetOne(ctx context.Context, id string) (Book, error)
// Delete removes the book identified by id.
Delete(ctx context.Context, id string) error
// Update applies book to the entry identified by id and returns the
// resulting book.
Update(ctx context.Context, id string, book Book) (Book, error)
// GetAll returns every known book.
GetAll(ctx context.Context) ([]Book, error)
// DeleteAll removes all books. It returns nothing, so callers cannot
// observe the outcome; requestid identifies the originating request
// (BookService uses it only as a log field).
DeleteAll(ctx context.Context, requestid string)
}
// BookService implements BookServiceProvider on top of two storages and a
// queue: reads and writes go to the primary storage first, and successful
// mutations are pushed to the queue.
type BookService struct {
logger *zap.Logger // structured logger for best-effort failures and cleanup progress
config *Config // kept from the constructor; not read by the methods visible in this file
clock Clocker // time source used for UpdatedAt stamps and cleanup timing
pstorage BookStorage // primary storage (used as the cache)
bstorage BookStorage // backup storage
queue Queuer // receives created/updated/deleted books after a successful primary write
}
// NewBookService wires the given dependencies into a BookService and returns
// it as a BookServiceProvider. No validation is performed on the arguments.
func NewBookService(logger *zap.Logger, config *Config, clock Clocker, pstorage BookStorage, bstorage BookStorage, queue Queuer) BookServiceProvider {
	svc := new(BookService)
	svc.logger = logger
	svc.config = config
	svc.clock = clock
	svc.pstorage = pstorage
	svc.bstorage = bstorage
	svc.queue = queue
	return svc
}
// Add writes book into primary storage under id and then pushes it onto the
// create queue. A storage failure is returned to the caller; a queue failure
// is only logged, so Add still reports success in that case.
func (bs *BookService) Add(ctx context.Context, id string, book Book) error {
	if err := bs.pstorage.Add(ctx, id, book); err != nil {
		return err
	}

	// Queue publication is best-effort: the book is already stored.
	pushErr := bs.queue.Push(ctx, CreateQueue, book)
	if pushErr != nil {
		bs.logger.Error("service: failed to push book to queue", zap.String("qid", CreateQueue), zap.Error(pushErr))
	}
	return nil
}
// GetOne looks the book up in primary storage and, on any error there, falls
// back to backup storage. A book found only in backup storage is written back
// into primary storage; a failure of that write-back is logged and does not
// affect the returned result.
func (bs *BookService) GetOne(ctx context.Context, id string) (Book, error) {
	if cached, err := bs.pstorage.GetOne(ctx, id); err == nil {
		return cached, nil
	}

	stored, err := bs.bstorage.GetOne(ctx, id)
	if err != nil {
		return stored, err
	}

	// Repopulate primary storage so the next lookup is served from it.
	if cacheErr := bs.pstorage.Add(ctx, id, stored); cacheErr != nil {
		bs.logger.Error("service: failed to cache book into pstorage", zap.String("id", id), zap.Error(cacheErr))
	}
	return stored, nil
}
// Delete removes the book identified by id from primary storage and then
// pushes a Book carrying only that ID onto the delete queue. A storage failure
// is returned; a queue failure is only logged.
func (bs *BookService) Delete(ctx context.Context, id string) error {
	if err := bs.pstorage.Delete(ctx, id); err != nil {
		return err
	}

	// Only the ID is published; the queue message carries no other fields.
	marker := Book{ID: id}
	if pushErr := bs.queue.Push(ctx, DeleteQueue, marker); pushErr != nil {
		bs.logger.Error("service: failed to push to queue", zap.String("qid", DeleteQueue), zap.Error(pushErr))
	}
	return nil
}
// Update stamps book.UpdatedAt with the service clock's current time, applies
// the update in primary storage and returns the book that storage reports.
// After a successful update the book is pushed onto the update queue; a queue
// failure is only logged.
func (bs *BookService) Update(ctx context.Context, id string, book Book) (Book, error) {
	book.UpdatedAt = bs.clock.Now().String()

	updated, err := bs.pstorage.Update(ctx, id, book)
	if err != nil {
		return updated, err
	}

	// NOTE(review): the queue receives the caller-supplied book (with
	// UpdatedAt set), not the value returned by storage — confirm this is
	// intended.
	pushErr := bs.queue.Push(ctx, UpdateQueue, book)
	if pushErr != nil {
		bs.logger.Error("service: failed to push to queue", zap.String("qid", UpdateQueue), zap.Error(pushErr))
	}
	return updated, nil
}
// GetAll returns all books from backup storage. If backup storage fails or
// holds no books, it falls back to whatever primary storage returns
// (including primary storage's own error, if any).
func (bs *BookService) GetAll(ctx context.Context) ([]Book, error) {
	backup, err := bs.bstorage.GetAll(ctx)
	if err == nil && len(backup) > 0 {
		return backup, nil
	}
	return bs.pstorage.GetAll(ctx)
}
// DeleteAll removes all books from primary storage (cache). The cleanup is
// decoupled from the request context — the context argument is ignored — and
// runs under its own 10-minute timeout.
//
// The storage call runs in a separate goroutine while this method waits for
// it, logging a progress message every 30 seconds. The method returns when the
// storage call finishes (logging success or the error) or when the timeout
// expires (logging the timeout). Nothing is returned to the caller; rid is
// attached to every log entry as "request.id".
func (bs *BookService) DeleteAll(_ context.Context, rid string) {
	opsCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	start := bs.clock.Now()

	// Buffered so the worker can always deliver its result and exit, even if
	// this method has already returned because of the timeout.
	errChan := make(chan error, 1)
	go func() {
		errChan <- bs.pstorage.DeleteAll(opsCtx)
	}()

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-opsCtx.Done():
			bs.logger.Error("service: timeout clearing books cache", zap.Duration("duration", time.Since(start)), zap.String("request.id", rid), zap.Error(opsCtx.Err()))
			// A closed Done channel is always ready to receive; without this
			// return the loop would spin and re-log the timeout continuously
			// until the storage call came back.
			return
		case <-ticker.C:
			bs.logger.Info("service: books cache clearing still running ", zap.Duration("duration", time.Since(start)), zap.String("request.id", rid))
		case err := <-errChan:
			if err != nil {
				bs.logger.Error("service: error clearing books cache", zap.Duration("duration", time.Since(start)), zap.String("request.id", rid), zap.Error(err))
			} else {
				bs.logger.Info("service: books cache clearing completed", zap.Duration("duration", time.Since(start)), zap.String("request.id", rid))
			}
			return
		}
	}
}