Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: s3 FormData upload #1614

Merged
merged 16 commits into from
Dec 26, 2023
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.19

require (
firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.39
github.com/OpenIMSDK/tools v0.0.21
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1
Expand Down Expand Up @@ -33,8 +35,6 @@ require github.com/google/uuid v1.3.1

require (
github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.36
github.com/OpenIMSDK/tools v0.0.21
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.2.1
Expand Down Expand Up @@ -133,7 +133,7 @@ require (
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
Expand All @@ -156,5 +156,3 @@ require (
golang.org/x/crypto v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

replace github.com/OpenIMSDK/protocol v0.0.36 => github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.39 h1:DfvFcNGBcfj2vtT7W3uw4U/ipnI7NecTzQdlSYGuQz8=
github.com/OpenIMSDK/protocol v0.0.39/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48=
github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
Expand Down Expand Up @@ -225,8 +227,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5 h1:nmrJmAgQsCAxKgw109kaTcBV4rMWDRvqOson0ehw708=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down Expand Up @@ -453,8 +453,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
2 changes: 2 additions & 0 deletions internal/api/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
objectGroup.POST("/auth_sign", t.AuthSign)
objectGroup.POST("/complete_multipart_upload", t.CompleteMultipartUpload)
objectGroup.POST("/access_url", t.AccessURL)
objectGroup.POST("/initiate_form_data", t.InitiateFormData)
objectGroup.POST("/complete_form_data", t.CompleteFormData)
objectGroup.GET("/*name", t.ObjectRedirect)
}
// Message
Expand Down
8 changes: 8 additions & 0 deletions internal/api/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ func (o *ThirdApi) AccessURL(c *gin.Context) {
a2r.Call(third.ThirdClient.AccessURL, o.Client, c)
}

func (o *ThirdApi) InitiateFormData(c *gin.Context) {
a2r.Call(third.ThirdClient.InitiateFormData, o.Client, c)
}

func (o *ThirdApi) CompleteFormData(c *gin.Context) {
a2r.Call(third.ThirdClient.CompleteFormData, o.Client, c)
}

func (o *ThirdApi) ObjectRedirect(c *gin.Context) {
name := c.Param("name")
if name == "" {
Expand Down
3 changes: 1 addition & 2 deletions internal/rpc/friend/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type friendServer struct {
}

func (s *friendServer) UpdateFriends(ctx context.Context, req *pbfriend.UpdateFriendsReq) (*pbfriend.UpdateFriendsResp, error) {
//TODO implement me
panic("implement me")
return nil, errs.ErrInternalServer.Wrap("not implemented")
}

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
Expand Down
113 changes: 113 additions & 0 deletions internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv"
"time"

Expand Down Expand Up @@ -179,6 +185,113 @@
}, nil
}

func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
if req.Name == "" {
return nil, errs.ErrArgs.Wrap("name is empty")
}
if req.Size <= 0 {
return nil, errs.ErrArgs.Wrap("size must be greater than 0")
}
if err := checkUploadName(ctx, req.Name); err != nil {
return nil, err
}
var duration time.Duration
opUserID := mcontext.GetOpUserID(ctx)
var key string
if authverify.IsManagerUserID(opUserID) {
if req.Millisecond <= 0 {
duration = time.Minute * 10
} else {
duration = time.Millisecond * time.Duration(req.Millisecond)
}
if req.Absolute {
key = req.Name
}
} else {
duration = time.Minute * 10
}
uid, err := uuid.NewRandom()
if err != nil {
return nil, err
}
if key == "" {
date := time.Now().Format("20060102")
key = path.Join(cont.DirectPath, date, opUserID, hex.EncodeToString(uid[:])+path.Ext(req.Name))
}
mate := FormDataMate{
Name: req.Name,
Size: req.Size,
ContentType: req.ContentType,
Group: req.Group,
Key: key,
}
mateData, err := json.Marshal(&mate)
if err != nil {
return nil, err
}
resp, err := t.s3dataBase.FormData(ctx, key, req.Size, req.ContentType, duration)
if err != nil {
return nil, err
}
return &third.InitiateFormDataResp{
Id: base64.RawStdEncoding.EncodeToString(mateData),
Url: resp.URL,
File: resp.File,
Header: toPbMapArray(resp.Header),
FormData: resp.FormData,
Expires: resp.Expires.UnixMilli(),
SuccessCodes: utils.Slice(resp.SuccessCodes, func(code int) int32 {
return int32(code)
}),
}, nil
}

func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
if req.Id == "" {
return nil, errs.ErrArgs.Wrap("id is empty")
}
data, err := base64.RawStdEncoding.DecodeString(req.Id)
if err != nil {
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
var mate FormDataMate
if err := json.Unmarshal(data, &mate); err != nil {

Check failure on line 258 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 253 (govet)
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
if err := checkUploadName(ctx, mate.Name); err != nil {

Check failure on line 261 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 253 (govet)
return nil, err
}
info, err := t.s3dataBase.StatObject(ctx, mate.Key)
if err != nil {
return nil, err
}
if info.Size > 0 && info.Size != mate.Size {
return nil, errs.ErrData.Wrap("file size mismatch")
}
obj := &relation.ObjectModel{
Name: mate.Name,
UserID: mcontext.GetOpUserID(ctx),
Hash: "etag_" + info.ETag,
Key: info.Key,
Size: info.Size,
ContentType: mate.ContentType,
Group: mate.Group,
CreateTime: time.Now(),
}
if err := t.s3dataBase.SetObject(ctx, obj); err != nil {
return nil, err
}
return &third.CompleteFormDataResp{Url: t.apiAddress(mate.Name)}, nil
}

func (t *thirdServer) apiAddress(name string) string {
return t.apiURL + name
}

type FormDataMate struct {
Name string `json:"name"`
Size int64 `json:"size"`
ContentType string `json:"contentType"`
Group string `json:"group"`
Key string `json:"key"`
}
10 changes: 0 additions & 10 deletions internal/rpc/third/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,6 @@ type thirdServer struct {
defaultExpire time.Duration
}

func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
//TODO implement me
panic("implement me")
}

func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
//TODO implement me
panic("implement me")
}

func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/rpc/third/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
)

func toPbMapArray(m map[string][]string) []*third.KeyValues {
if len(m) == 0 {
return nil
}
res := make([]*third.KeyValues, 0, len(m))
for key := range m {
res = append(res, &third.KeyValues{
Expand Down
9 changes: 4 additions & 5 deletions internal/rpc/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type userServer struct {
RegisterCenter registry.SvcDiscoveryRegistry
}

func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (*pbuser.UpdateUserInfoExResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}

func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
Expand Down Expand Up @@ -484,11 +488,6 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: accounts}, nil
}

func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (*pbuser.UpdateUserInfoExResp, error) {
//TODO implement me
panic("implement me")
}

func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.GetNotificationAccountReq) (*pbuser.GetNotificationAccountResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.Wrap("userID is empty")
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type CmdOpts struct {

func WithCronTaskLogName() func(*CmdOpts) {
return func(opts *CmdOpts) {
opts.loggerPrefixName = "OpenIM.CronTask.log.all"
opts.loggerPrefixName = "openim.crontask.log.all"
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/common/db/controller/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type S3Database interface {
CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error)
AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error)
SetObject(ctx context.Context, info *relation.ObjectModel) error
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
}

func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database {
Expand Down Expand Up @@ -100,3 +102,11 @@ func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Dur
}
return expireTime, rawURL, nil
}

func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
return s.s3.StatObject(ctx, name)
}

func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return s.s3.FormData(ctx, name, size, contentType, duration)
}
1 change: 1 addition & 0 deletions pkg/common/db/s3/cont/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cont
const (
hashPath = "openim/data/hash/"
tempPath = "openim/temp/"
DirectPath = "openim/direct"
UploadTypeMultipart = 1 // 分片上传
UploadTypePresigned = 2 // 预签名上传
partSeparator = ","
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/db/s3/cont/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,7 @@ func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Dur
}
return c.impl.AccessURL(ctx, name, expire, opt)
}

func (c *Controller) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return c.impl.FormData(ctx, name, size, contentType, duration)
}
Loading
Loading