diff --git a/telegram/cache.go b/telegram/cache.go index 1eef9eeb..b1dca949 100644 --- a/telegram/cache.go +++ b/telegram/cache.go @@ -184,16 +184,19 @@ func (c *CACHE) ReadFile() { break } c.logger.Error("error reading from cache file: ", err) + c.Unlock() return } var id, accessHash int64 if err := binary.Read(file, binary.BigEndian, &id); err != nil { c.logger.Error("cache file corrupted: ", err) + c.Unlock() return } if err := binary.Read(file, binary.BigEndian, &accessHash); err != nil { c.logger.Error("cache file corrupted: ", err) + c.Unlock() return } diff --git a/telegram/channels.go b/telegram/channels.go index 12d6ea95..221e9ba4 100755 --- a/telegram/channels.go +++ b/telegram/channels.go @@ -4,6 +4,8 @@ package telegram import ( "math" + "reflect" + "time" "github.com/pkg/errors" ) @@ -196,10 +198,11 @@ func (c *Client) GetChatMember(chatID, userID any) (*Participant, error) { } type ParticipantOptions struct { - Query string `json:"query,omitempty"` - Filter ChannelParticipantsFilter `json:"filter,omitempty"` - Offset int32 `json:"offset,omitempty"` - Limit int32 `json:"limit,omitempty"` + Query string `json:"query,omitempty"` + Filter ChannelParticipantsFilter `json:"filter,omitempty"` + Offset int32 `json:"offset,omitempty"` + Limit int32 `json:"limit,omitempty"` + SleepThresholdMs int32 `json:"sleep_threshold_ms,omitempty"` } // GetChatMembers returns the members of a chat @@ -221,61 +224,236 @@ func (c *Client) GetChatMembers(chatID any, Opts ...*ParticipantOptions) ([]*Par opts := getVariadic(Opts, &ParticipantOptions{Filter: &ChannelParticipantsSearch{}, Limit: 1}) if opts.Query != "" { opts.Filter = &ChannelParticipantsSearch{Q: opts.Query} + } else if opts.Filter == nil { + opts.Filter = &ChannelParticipantsSearch{} } - participants, err := c.ChannelsGetParticipants(&InputChannelObj{ChannelID: chat.ChannelID, AccessHash: chat.AccessHash}, opts.Filter, opts.Offset, opts.Limit, 0) + + var fetched int32 = 0 + var participantsList []*Participant + var reqLimit, reqOffset int32 = 200, opts.Offset + var totalCount int32 + + for { + remaning := opts.Limit - int32(fetched) + reqLimit = min(remaning, 200) + + participants, err := c.ChannelsGetParticipants(&InputChannelObj{ChannelID: chat.ChannelID, AccessHash: chat.AccessHash}, opts.Filter, reqOffset, reqLimit, 0) + if err != nil { + return nil, 0, err + } + cParts, ok := participants.(*ChannelsChannelParticipantsObj) + if opts.Limit == -1 { + opts.Limit = cParts.Count + continue + } + + if !ok { + return nil, 0, errors.New("could not get participants") + } + c.Cache.UpdatePeersToCache(cParts.Users, cParts.Chats) + var ( + status string = Member + rights *ChatAdminRights = &ChatAdminRights{} + rank string = "" + UserID int64 = 0 + ) + for _, p := range cParts.Participants { + switch p := p.(type) { + case *ChannelParticipantCreator: + status = Creator + rights = p.AdminRights + rank = p.Rank + UserID = p.UserID + case *ChannelParticipantAdmin: + status = Admin + rights = p.AdminRights + rank = p.Rank + UserID = p.UserID + case *ChannelParticipantObj: + status = Member + UserID = p.UserID + case *ChannelParticipantSelf: + status = Member + UserID = p.UserID + case *ChannelParticipantBanned: + status = Restricted + UserID = c.GetPeerID(p.Peer) + case *ChannelParticipantLeft: + status = Left + UserID = c.GetPeerID(p.Peer) + } + partUser, err := c.GetUser(UserID) + if err != nil { + return nil, 0, err + } + participantsList = append(participantsList, &Participant{ + User: partUser, + Participant: p, + Status: status, + Rights: rights, + Rank: rank, + }) + + fetched++ + } + + if fetched >= opts.Limit || len(cParts.Participants) == 0 { + break + } + + reqOffset = fetched + totalCount = cParts.Count + + time.Sleep(time.Duration(opts.SleepThresholdMs) * time.Millisecond) + } + return participantsList, totalCount, nil +} + +func (c *Client) IterChatMembers(chatID any, Opts ...*ParticipantOptions) (<-chan *Participant, <-chan error) { + ch := make(chan *Participant) + errCh := make(chan error) + + var peerToAct, err = c.ResolvePeer(chatID) if err != nil { - return nil, 0, err + errCh <- err + close(ch) + return ch, errCh } - cParts, ok := participants.(*ChannelsChannelParticipantsObj) + + var chat, ok = peerToAct.(*InputPeerChannel) if !ok { - return nil, 0, errors.New("could not get participants") + errCh <- errors.New("peer is not a channel") + close(ch) + return ch, errCh } - c.Cache.UpdatePeersToCache(cParts.Users, cParts.Chats) - var ( - status string = Member - rights *ChatAdminRights = &ChatAdminRights{} - rank string = "" - UserID int64 = 0 - ) - participantsList := make([]*Participant, 0) - for _, p := range cParts.Participants { - switch p := p.(type) { - case *ChannelParticipantCreator: - status = Creator - rights = p.AdminRights - rank = p.Rank - UserID = p.UserID - case *ChannelParticipantAdmin: - status = Admin - rights = p.AdminRights - rank = p.Rank - UserID = p.UserID - case *ChannelParticipantObj: - status = Member - UserID = p.UserID - case *ChannelParticipantSelf: - status = Member - UserID = p.UserID - case *ChannelParticipantBanned: - status = Restricted - UserID = c.GetPeerID(p.Peer) - case *ChannelParticipantLeft: - status = Left - UserID = c.GetPeerID(p.Peer) - } - partUser, err := c.GetUser(UserID) - if err != nil { - return nil, 0, err - } - participantsList = append(participantsList, &Participant{ - User: partUser, - Participant: p, - Status: status, - Rights: rights, - Rank: rank, + + go func() { + defer close(ch) + defer close(errCh) + + var opts = getVariadic(Opts, &ParticipantOptions{ + Limit: 1, + SleepThresholdMs: 20, + Filter: &ChannelParticipantsSearch{}, }) - } - return participantsList, cParts.Count, nil + + if opts.Query != "" { + opts.Filter = &ChannelParticipantsSearch{Q: opts.Query} + } else if opts.Filter == nil { + opts.Filter = &ChannelParticipantsSearch{} + } + + var fetched int32 = 0 + req := &ChannelsGetParticipantsParams{ + Channel: &InputChannelObj{ChannelID: chat.ChannelID, AccessHash: chat.AccessHash}, + Filter: opts.Filter, + Offset: opts.Offset, + Limit: 200, + Hash: 0, + } + + for { + if opts.Limit == -1 { + req.Limit = 0 + resp, err := c.MakeRequest(req) + if err != nil { + errCh <- err + return + } + + switch resp := resp.(type) { + case *ChannelsChannelParticipantsObj: + if resp.Count == 0 { + return + } + opts.Limit = resp.Count + case *ChannelsChannelParticipantsNotModified: + default: + } + + continue + } + + remaining := opts.Limit - int32(fetched) + perReqLimit := int32(200) + if remaining < perReqLimit { + perReqLimit = remaining + } + req.Limit = perReqLimit + + resp, err := c.MakeRequest(req) + if err != nil { + if handleIfFlood(err, c) { + continue + } + errCh <- err + return + } + + switch resp := resp.(type) { + case *ChannelsChannelParticipantsObj: + c.Cache.UpdatePeersToCache(resp.Users, resp.Chats) + for _, participant := range resp.Participants { + var ( + status string = Member + rights *ChatAdminRights = &ChatAdminRights{} + rank string = "" + UserID int64 = 0 + ) + switch p := participant.(type) { + case *ChannelParticipantCreator: + status = Creator + rights = p.AdminRights + rank = p.Rank + UserID = p.UserID + case *ChannelParticipantAdmin: + status = Admin + rights = p.AdminRights + rank = p.Rank + UserID = p.UserID + case *ChannelParticipantObj: + status = Member + UserID = p.UserID + case *ChannelParticipantSelf: + status = Member + UserID = p.UserID + case *ChannelParticipantBanned: + status = Restricted + UserID = c.GetPeerID(p.Peer) + case *ChannelParticipantLeft: + status = Left + UserID = c.GetPeerID(p.Peer) + } + partUser, err := c.GetUser(UserID) + if err != nil { + errCh <- err + return + } + ch <- &Participant{ + User: partUser, + Participant: participant, + Status: status, + Rights: rights, + Rank: rank, + } + + fetched++ + } + if len(resp.Participants) < int(perReqLimit) || fetched >= opts.Limit && opts.Limit > 0 { + return + } + + req.Offset = fetched + default: + errCh <- errors.New("unexpected response: " + reflect.TypeOf(resp).String()) + return + } + + time.Sleep(time.Duration(opts.SleepThresholdMs) * time.Millisecond) + } + }() + + return ch, errCh } type AdminOptions struct { diff --git a/telegram/const.go b/telegram/const.go index 76adad11..5366acc5 100644 --- a/telegram/const.go +++ b/telegram/const.go @@ -8,7 +8,7 @@ import ( const ( ApiVersion = 195 - Version = "v1.4.8" + Version = "v1.5.0" LogDebug = utils.DebugLevel LogInfo = utils.InfoLevel diff --git a/telegram/messages.go b/telegram/messages.go index 52b3d08c..6f1346cd 100644 --- a/telegram/messages.go +++ b/telegram/messages.go @@ -1017,15 +1017,13 @@ func (c *Client) GetMessages(PeerID any, Opts ...*SearchOption) ([]NewMessage, e return messages, nil } -func (c *Client) IterMessages(PeerID any, Opts ...*SearchOption) (<-chan NewMessage, <-chan bool, <-chan error) { +func (c *Client) IterMessages(PeerID any, Opts ...*SearchOption) (<-chan NewMessage, <-chan error) { ch := make(chan NewMessage) - doneCh := make(chan bool) errCh := make(chan error) go func() { defer close(ch) defer close(errCh) - defer close(doneCh) opt := getVariadic(Opts, &SearchOption{ Filter: &InputMessagesFilterEmpty{}, @@ -1096,20 +1094,17 @@ func (c *Client) IterMessages(PeerID any, Opts ...*SearchOption) (<-chan NewMess case *MessagesChannelMessages: c.Cache.UpdatePeersToCache(result.Users, result.Chats) for _, msg := range result.Messages { - select { - case ch <- *packMessage(c, msg): - case <-doneCh: - return - } + ch <- *packMessage(c, msg) } case *MessagesMessagesObj: c.Cache.UpdatePeersToCache(result.Users, result.Chats) for _, msg := range result.Messages { - select { - case ch <- *packMessage(c, msg): - case <-doneCh: - return - } + ch <- *packMessage(c, msg) + } + case *MessagesMessagesSlice: + c.Cache.UpdatePeersToCache(result.Users, result.Chats) + for _, msg := range result.Messages { + ch <- *packMessage(c, msg) } } @@ -1176,6 +1171,10 @@ func (c *Client) IterMessages(PeerID any, Opts ...*SearchOption) (<-chan NewMess } } + for _, msg := range messages { + ch <- msg + } + if (len(messages) >= int(opt.Limit) || len(messages) == 0) && opt.Limit > 0 { break } @@ -1183,21 +1182,12 @@ func (c *Client) IterMessages(PeerID any, Opts ...*SearchOption) (<-chan NewMess params.OffsetID = messages[len(messages)-1].ID params.MaxDate = messages[len(messages)-1].Date() - for _, msg := range messages { - select { - case ch <- msg: - case <-doneCh: - return - } - } - time.Sleep(time.Duration(opt.SleepThresholdMs) * time.Millisecond) } } - doneCh <- true }() - return ch, doneCh, errCh + return ch, errCh } func (c *Client) GetMessageByID(PeerID any, MsgID int32) (*NewMessage, error) { @@ -1305,15 +1295,13 @@ func (c *Client) GetHistory(PeerID any, opts ...*HistoryOption) ([]NewMessage, e } } -func (c *Client) IterHistory(PeerID any, opts ...*HistoryOption) (<-chan NewMessage, <-chan bool, <-chan error) { +func (c *Client) IterHistory(PeerID any, opts ...*HistoryOption) (<-chan NewMessage, <-chan error) { ch := make(chan NewMessage) - doneCh := make(chan bool) errCh := make(chan error) go func() { defer close(ch) defer close(errCh) - defer close(doneCh) var opt = getVariadic(opts, &HistoryOption{ Limit: 1, @@ -1361,8 +1349,11 @@ func (c *Client) IterHistory(PeerID any, opts ...*HistoryOption) (<-chan NewMess messages = append(messages, *packMessage(c, msg)) } fetched += len(resp.Messages) + + for _, msg := range messages { + ch <- msg + } if len(resp.Messages) < int(perReqLimit) || fetched >= int(opt.Limit) && opt.Limit > 0 { - doneCh <- true return } @@ -1375,8 +1366,11 @@ func (c *Client) IterHistory(PeerID any, opts ...*HistoryOption) (<-chan NewMess messages = append(messages, *packMessage(c, msg)) } fetched += len(resp.Messages) + + for _, msg := range messages { + ch <- msg + } if len(resp.Messages) < int(perReqLimit) || fetched >= int(opt.Limit) && opt.Limit > 0 { - doneCh <- true return } @@ -1388,8 +1382,11 @@ func (c *Client) IterHistory(PeerID any, opts ...*HistoryOption) (<-chan NewMess messages = append(messages, *packMessage(c, msg)) } fetched += len(resp.Messages) + + for _, msg := range messages { + ch <- msg + } if len(resp.Messages) < int(perReqLimit) || fetched >= int(opt.Limit) && opt.Limit > 0 { - doneCh <- true return } @@ -1400,19 +1397,11 @@ func (c *Client) IterHistory(PeerID any, opts ...*HistoryOption) (<-chan NewMess return } - for _, msg := range messages { - select { - case ch <- msg: - case <-doneCh: - return - } - } - time.Sleep(time.Duration(opt.SleepThresholdMs) * time.Millisecond) } }() - return ch, doneCh, errCh + return ch, errCh } type PinOptions struct { diff --git a/telegram/users.go b/telegram/users.go index 496cd503..e1b6a93c 100644 --- a/telegram/users.go +++ b/telegram/users.go @@ -247,7 +247,7 @@ func (c *Client) GetDialogs(Opts ...*DialogOptions) ([]Dialog, error) { } } -func (c *Client) IterDialogs(Opts ...*DialogOptions) (<-chan Dialog, <-chan bool, <-chan error) { +func (c *Client) IterDialogs(Opts ...*DialogOptions) (<-chan Dialog, <-chan error) { options := getVariadic(Opts, &DialogOptions{ Limit: 1, OffsetPeer: &InputPeerEmpty{}, @@ -272,12 +272,10 @@ func (c *Client) IterDialogs(Opts ...*DialogOptions) (<-chan Dialog, <-chan bool } dialogs := make(chan Dialog) - done := make(chan bool) errs := make(chan error) go func() { defer close(dialogs) - defer close(done) defer close(errs) var fetched int @@ -319,7 +317,6 @@ func (c *Client) IterDialogs(Opts ...*DialogOptions) (<-chan Dialog, <-chan bool fetched += len(p.Dialogs) if len(p.Dialogs) < int(perReqLimit) || fetched >= int(options.Limit) && options.Limit > 0 { - done <- true return } @@ -342,12 +339,10 @@ func (c *Client) IterDialogs(Opts ...*DialogOptions) (<-chan Dialog, <-chan bool fetched += len(p.Dialogs) if len(p.Dialogs) < int(perReqLimit) || fetched >= int(options.Limit) && options.Limit > 0 { - done <- true return } case *MessagesDialogsNotModified: - done <- true return default: @@ -359,7 +354,7 @@ func (c *Client) IterDialogs(Opts ...*DialogOptions) (<-chan Dialog, <-chan bool } }() - return dialogs, done, errs + return dialogs, errs } // GetCommonChats returns the common chats of a user