Skip to content

Commit

Permalink
completed sync system with telegram
Browse files Browse the repository at this point in the history
  • Loading branch information
faisalraja committed Jan 5, 2022
1 parent d4ef511 commit 70d076b
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 5 deletions.
2 changes: 2 additions & 0 deletions backend/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/altlimit/dmedia/model"
"github.com/altlimit/dmedia/sync"
"github.com/altlimit/dmedia/util"
"github.com/go-playground/validator/v10"
"github.com/gorilla/handlers"
Expand Down Expand Up @@ -71,6 +72,7 @@ func (ae alertError) Error() string {

// NewServer returns the instance of api server that implements the Handler interface
func NewServer() *Server {
sync.Init()
r := mux.NewRouter()

srv := &Server{
Expand Down
2 changes: 2 additions & 0 deletions backend/api/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"
"strings"

"github.com/altlimit/dmedia/sync"
"github.com/altlimit/dmedia/util"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -72,6 +73,7 @@ func (s *Server) handleDeleteMedia() http.HandlerFunc {
if err := u.DeleteMediaById(intIds); err != nil {
return err
}
go sync.ScheduleSync(u.ID)
return nil
})
}
5 changes: 4 additions & 1 deletion backend/api/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"net/http"

"github.com/altlimit/dmedia/model"
"github.com/altlimit/dmedia/sync"
"github.com/altlimit/dmedia/util"
"github.com/gorilla/mux"
)
Expand All @@ -22,6 +23,7 @@ func (s *Server) handleCreateSyncLocation() http.HandlerFunc {
if err := req.Save(u); err != nil {
return err
}
go sync.ScheduleSync(u.ID)
return req
})
}
Expand Down Expand Up @@ -49,6 +51,7 @@ func (s *Server) handleSaveSyncLocation() http.HandlerFunc {
if err := loc.Save(u); err != nil {
return err
}
go sync.ScheduleSync(u.ID)
return loc
})
}
Expand All @@ -60,7 +63,7 @@ func (s *Server) handleGetSync() http.HandlerFunc {
if u == nil {
return errAuth
}
syncs, err := model.GetSyncs(u.ID)
syncs, err := model.GetSyncs(u.ID, false)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions backend/api/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"

"github.com/altlimit/dmedia/model"
"github.com/altlimit/dmedia/sync"
"github.com/altlimit/dmedia/util"
)

Expand Down Expand Up @@ -79,6 +80,7 @@ func (s *Server) handleUpload() http.HandlerFunc {
}
return err
}
go sync.ScheduleSync(u.ID)
return id
})
}
Expand All @@ -100,6 +102,7 @@ func (s *Server) handleUploadDir() http.HandlerFunc {
id, err := u.AddMediaFromPath(file)
log.Printf("AddMedia: %d -> Err: %v -> %s", id, err, file)
}
go sync.ScheduleSync(u.ID)
}()
}
return nil
Expand Down
83 changes: 81 additions & 2 deletions backend/model/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
)

var (
Expand All @@ -21,6 +23,13 @@ type (
Deleted *DateTime `json:"deleted,omitempty" db:"deleted"`
Config SyncConfig `json:"config" db:"config" validate:"required"`
}

SyncMedia struct {
ID int64 `json:"id" db:"id"`
LocationID int64 `json:"location_id" db:"location_id"`
MediaID int64 `json:"media_id" db:"media_id"`
Meta string `json:"meta" db:"meta"`
}
)

func (sc *SyncConfig) Scan(src interface{}) error {
Expand Down Expand Up @@ -90,13 +99,17 @@ func GetSyncLocation(userID int64, locID int64) (*SyncLocation, error) {
return loc, nil
}

func GetSyncs(userID int64) ([]SyncLocation, error) {
func GetSyncs(userID int64, all bool) ([]SyncLocation, error) {
db, err := getDB(userID)
if err != nil {
return nil, fmt.Errorf("GetSyncs getDB error: %v", err)
}
var where string
if !all {
where = " WHERE deleted IS NULL"
}
syncs := []SyncLocation{}
if err = db.Select(&syncs, `SELECT * FROM sync_location WHERE deleted IS NULL`); err != nil {
if err = db.Select(&syncs, fmt.Sprintf(`SELECT * FROM sync_location %s`, where)); err != nil {
return nil, fmt.Errorf("GetSyncs select error %v", err)
}
return syncs, nil
Expand All @@ -113,3 +126,69 @@ func DeleteSyncByID(userID int64, locID int64) error {
}
return nil
}

func GetMediaToSync(userID int64, loc *SyncLocation) ([]Media, []SyncMedia, error) {
db, err := getDB(userID)
if err != nil {
return nil, nil, fmt.Errorf("GetMediaToSync getDB error %v", err)
}
medias := []Media{}
toDelete := []SyncMedia{}
if loc.Deleted == nil {
if err := db.Select(&medias, `SELECT * FROM media WHERE id NOT IN(
SELECT media_id FROM sync_media WHERE location_id = $1
) ORDER BY created`, loc.ID); err != nil {
return nil, nil, fmt.Errorf("GetMediaToSync select error %v", err)
}
if err := db.Select(&toDelete, `SELECT * FROM sync_media
WHERE
location_id = $1 AND
media_id NOT IN(
SELECT id FROM media
)`, loc.ID); err != nil {
return nil, nil, fmt.Errorf("GetMediaToSync select meta error %v", err)
}
} else {
if err := db.Select(&toDelete, `SELECT * FROM sync_media
WHERE
location_id = $1`, loc.ID); err != nil {
return nil, nil, fmt.Errorf("GetMediaToSync select * meta error %v", err)
}
}
return medias, toDelete, nil
}

func UpdateSyncMedia(userID int64, loc *SyncLocation, syncMedias []SyncMedia, deletedMedias []SyncMedia) error {
db, err := getDB(userID)
if err != nil {
return fmt.Errorf("UpdateSyncMedia getDB error %v", err)
}
tx, err := db.Beginx()
if err != nil {
return fmt.Errorf("UpdateSyncMedia Beginx error %v", err)
}
for _, sm := range syncMedias {
if _, err := tx.NamedExec(`INSERT INTO sync_media(location_id, media_id, meta)
VALUES (:location_id, :media_id, :meta)`, sm); err != nil {
return fmt.Errorf("UpdateSyncMedia insert err %v -> Rollback: %v", err, tx.Rollback())
}
}
var delIDs []string
for _, dm := range deletedMedias {
delIDs = append(delIDs, strconv.FormatInt(dm.ID, 10))
}
if len(delIDs) > 0 {
if _, err := tx.Exec(fmt.Sprintf(`DELETE FROM sync_media WHERE id IN (%s)`, strings.Join(delIDs, ","))); err != nil {
return fmt.Errorf("UpdateSyncMedia delete error %v -> Rollback: %v", err, tx.Rollback())
}
}
if loc.Deleted != nil {
if _, err := tx.Exec(`DELETE FROM sync_location WHERE id = $1`, loc.ID); err != nil {
return fmt.Errorf("UpdateSyncMedia delete loc err %v -> Rollback: %v", err, tx.Rollback())
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("UpdateSyncMedia commit error %v -> Rollback: %v", err, tx.Rollback())
}
return nil
}
118 changes: 118 additions & 0 deletions backend/sync/sync.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,126 @@
package sync

import (
"log"

"github.com/altlimit/dmedia/model"
)

type (
Sync interface {
Valid() bool
Upload(cType string, path string) (string, error)
Delete(meta string) error
}
)

var (
SyncChannel = make(chan int64)
)

func syncListener() {
log.Println("Started sync listener")
for {
userID := <-SyncChannel
SyncUser(userID)
}
}

func Init() {
go syncListener()

users, err := model.GetUsers()
if err != nil {
log.Fatalf("sync get users error %v", err)
}

for _, u := range users {
go func(uID int64) {
SyncChannel <- uID
}(u.ID)
}
log.Println("Initialized sync for", len(users), "users")
}

func ScheduleSync(userID int64) {
SyncChannel <- userID
}

func SyncUser(userID int64) {
locs, err := model.GetSyncs(userID, true)
if err != nil {
log.Printf("SyncUser error get syncs %v", err)
return
}

for _, loc := range locs {
var syncer Sync
if loc.Type == "telegram" {
syncer = &Telegram{
Token: loc.Config["token"].(string),
Channel: loc.Config["channel"].(string),
}
}
if syncer == nil {
log.Println("SyncUser", userID, "location type", loc.Type, "invalid")
continue
} else if !syncer.Valid() {
log.Println("SyncUser", userID, "invalid config", loc.Name)
continue
}
go SyncLocation(userID, &loc, syncer)
}
}

func SyncLocation(userID int64, loc *model.SyncLocation, syncer Sync) {
medias, delMedias, err := model.GetMediaToSync(userID, loc)
if err != nil {
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "] get sync error ", err)
return
}
toUpload := len(medias)
toDelete := len(delMedias)
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "]", toUpload, "# to upload", toDelete, "# to delete")
var (
addSync []model.SyncMedia
delSync []model.SyncMedia
uploaded int
failedUpload int
deleted int
failedDelete int
)
for _, m := range medias {
meta, err := syncer.Upload(m.ContentType, m.Path(userID))
if err != nil {
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "] upload", m.ID, "error", err)
failedUpload++
continue
}
uploaded++
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "] uploaded", m.ID, "progress", uploaded, "/", toUpload, "failed", failedUpload)
addSync = append(addSync, model.SyncMedia{
LocationID: loc.ID,
MediaID: m.ID,
Meta: meta,
})
}
for _, sm := range delMedias {
err = syncer.Delete(sm.Meta)
if err != nil {
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "] delete", sm.MediaID, "error", err)
failedDelete++
continue
}
deleted++
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "] deleted", sm.MediaID, "progress", deleted, "/", toDelete, "failed", failedDelete)
delSync = append(delSync, sm)
}

for i := 0; i < 3; i++ {
if err := model.UpdateSyncMedia(userID, loc, addSync, delSync); err != nil {
log.Println("SyncLocation[", userID, "][", loc.ID, loc.Name, "] update failed", err)
continue
}
break
}
}
16 changes: 14 additions & 2 deletions backend/sync/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@ type Telegram struct {
Channel string `json:"channel"`
}

func (t *Telegram) Valid() bool {
return t.Token != "" && t.Channel != ""
}

func (t *Telegram) getURL(method string) string {
return fmt.Sprintf("https://api.telegram.org/bot%s/%s", t.Token, method)
}

func (t *Telegram) Upload(cType string, path string) (string, error) {
form := map[string]string{"chat_id": t.Channel}
var field string
var (
field string
method string
)
if strings.HasPrefix(cType, "video/") {
field = "video"
method = "sendVideo"
} else if strings.HasPrefix(cType, "image/") {
field = "photo"
method = "sendPhoto"
} else {
return "", fmt.Errorf("SendMedia not support")
}
Expand All @@ -39,7 +48,7 @@ func (t *Telegram) Upload(cType string, path string) (string, error) {
if err != nil {
return "", fmt.Errorf("SendMedia form error %v", err)
}
resp, err := http.Post(t.getURL("sendPhoto"), ct, data)
resp, err := http.Post(t.getURL(method), ct, data)
if err != nil {
return "", fmt.Errorf("SendMedia post error %v", err)
}
Expand Down Expand Up @@ -77,6 +86,9 @@ func (t *Telegram) Delete(meta string) error {
}
errCode := gjson.Get(json, "error_code")
errDesc := gjson.Get(json, "description")
if strings.Contains(errDesc.String(), "message to delete not found") {
return nil
}
return fmt.Errorf("DeleteMessage error %d %s", errCode.Int(), errDesc.String())
}

Expand Down

0 comments on commit 70d076b

Please sign in to comment.