-
Notifications
You must be signed in to change notification settings - Fork 85
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
remove typing dependency; add MailMessage.cc, MailMessage.bcc attrs; specify custom classes email_message_class directly; MailBox._uid_str change type check logic; Change MailMessage attr return types: lists -> tuples; MailBox.fetch add mark_seen param; fix MailMessage.from_ bug when empty;
- Loading branch information
Showing
12 changed files
with
633 additions
and
541 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,4 @@ | ||
from .main import * | ||
from .mailbox import * | ||
from .message import * | ||
from .folder import * | ||
from .utils import * |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
import re | ||
|
||
from . import imap_utf7 | ||
from .utils import ImapToolsError | ||
|
||
|
||
class MailBoxFolderWrongStatusError(ImapToolsError): | ||
"""Wrong folder name error""" | ||
|
||
|
||
class MailBoxFolderManager: | ||
"""Operations with mail box folders""" | ||
|
||
folder_status_options = ['MESSAGES', 'RECENT', 'UIDNEXT', 'UIDVALIDITY', 'UNSEEN'] | ||
|
||
def __init__(self, mailbox): | ||
self.mailbox = mailbox | ||
self._current_folder = None | ||
|
||
def _normalise_folder(self, folder): | ||
"""Normalise folder name""" | ||
if isinstance(folder, bytes): | ||
folder = folder.decode('ascii') | ||
return self._quote(imap_utf7.encode(folder)) | ||
|
||
@staticmethod | ||
def _quote(arg): | ||
if isinstance(arg, str): | ||
return '"' + arg.replace('\\', '\\\\').replace('"', '\\"') + '"' | ||
else: | ||
return b'"' + arg.replace(b'\\', b'\\\\').replace(b'"', b'\\"') + b'"' | ||
|
||
@staticmethod | ||
def _pairs_to_dict(items: list) -> dict: | ||
"""Example: ['MESSAGES', '3', 'UIDNEXT', '4'] -> {'MESSAGES': '3', 'UIDNEXT': '4'}""" | ||
if len(items) % 2 != 0: | ||
raise ValueError('An even-length array is expected') | ||
return dict((items[i * 2], items[i * 2 + 1]) for i in range(len(items) // 2)) | ||
|
||
def set(self, folder): | ||
"""Select current folder""" | ||
result = self.mailbox.box.select(folder) | ||
self.mailbox.check_status('box.select', result) | ||
self._current_folder = folder | ||
return result | ||
|
||
def exists(self, folder: str) -> bool: | ||
"""Checks whether a folder exists on the server.""" | ||
return len(self.list('', folder)) > 0 | ||
|
||
def create(self, folder: str): | ||
""" | ||
Create folder on the server. D | ||
*Use email box delimitor to separate folders. Example for "|" delimitor: "folder|sub folder" | ||
""" | ||
result = self.mailbox.box._simple_command('CREATE', self._normalise_folder(folder)) | ||
self.mailbox.check_status('CREATE', result) | ||
return result | ||
|
||
def get(self): | ||
"""Get current folder""" | ||
return self._current_folder | ||
|
||
def rename(self, old_name: str, new_name: str): | ||
"""Renemae folder from old_name to new_name""" | ||
result = self.mailbox.box._simple_command( | ||
'RENAME', self._normalise_folder(old_name), self._normalise_folder(new_name)) | ||
self.mailbox.check_status('RENAME', result) | ||
return result | ||
|
||
def delete(self, folder: str): | ||
"""Delete folder""" | ||
result = self.mailbox.box._simple_command('DELETE', self._normalise_folder(folder)) | ||
self.mailbox.check_status('DELETE', result) | ||
return result | ||
|
||
def status(self, folder: str, options: [str] or None = None) -> dict: | ||
""" | ||
Get the status of a folder | ||
:param folder: mailbox folder | ||
:param options: [str] with values from MailBoxFolderManager.folder_status_options or None, | ||
by default - get all options | ||
MESSAGES - The number of messages in the mailbox. | ||
RECENT - The number of messages with the \Recent flag set. | ||
UIDNEXT - The next unique identifier value of the mailbox. | ||
UIDVALIDITY - The unique identifier validity value of the mailbox. | ||
UNSEEN - The number of messages which do not have the \Seen flag set. | ||
:return: dict with available options keys | ||
""" | ||
command = 'STATUS' | ||
if not options: | ||
options = self.folder_status_options | ||
if not all([i in self.folder_status_options for i in options]): | ||
raise MailBoxFolderWrongStatusError(str(options)) | ||
status_result = self.mailbox.box._simple_command( | ||
command, self._normalise_folder(folder), '({})'.format(' '.join(options))) | ||
self.mailbox.check_status(command, status_result) | ||
result = self.mailbox.box._untagged_response(status_result[0], status_result[1], command) | ||
self.mailbox.check_status(command, result) | ||
values = result[1][0].decode().split('(')[1].split(')')[0].split(' ') | ||
return self._pairs_to_dict(values) | ||
|
||
def list(self, folder: str = '', search_args: str = '*', subscribed_only: bool = False) -> list: | ||
""" | ||
Get a listing of folders on the server | ||
:param folder: mailbox folder, if empty list shows all content from root | ||
:param search_args: search argumets, is case-sensitive mailbox name with possible wildcards | ||
* is a wildcard, and matches zero or more characters at this position | ||
% is similar to * but it does not match a hierarchy delimiter | ||
:param subscribed_only: bool - get only subscribed folders | ||
:return: [dict( | ||
flags: str - folder flags, | ||
delim: str - delimitor, | ||
name: str - folder name, | ||
)] | ||
""" | ||
folder_item_re = re.compile(r'\((?P<flags>[\S ]*)\) "(?P<delim>[\S ]+)" (?P<name>.+)') | ||
command = 'LSUB' if subscribed_only else 'LIST' | ||
typ, data = self.mailbox.box._simple_command(command, self._normalise_folder(folder), search_args) | ||
typ, data = self.mailbox.box._untagged_response(typ, data, command) | ||
result = list() | ||
for folder_item in data: | ||
if not folder_item: | ||
continue | ||
folder_match = re.search(folder_item_re, imap_utf7.decode(folder_item)) | ||
folder = folder_match.groupdict() | ||
if folder['name'].startswith('"') and folder['name'].endswith('"'): | ||
folder['name'] = folder['name'][1:len(folder['name']) - 1] | ||
result.append(folder) | ||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
import imaplib | ||
import inspect | ||
|
||
from .message import MailMessage | ||
from .folder import MailBoxFolderManager | ||
from .utils import ImapToolsError | ||
|
||
# Maximal line length when calling readline(). This is to prevent reading arbitrary length lines. | ||
imaplib._MAXLINE = 4 * 1024 * 1024 # 4Mb | ||
|
||
|
||
class MailBoxWrongFlagError(ImapToolsError): | ||
"""Wrong flag for "flag" method""" | ||
|
||
|
||
class MailBoxUidParamError(ImapToolsError): | ||
"""Wrong uid param""" | ||
|
||
|
||
class MailBoxUnexpectedStatusError(ImapToolsError): | ||
"""Unexpected status in response""" | ||
|
||
|
||
class StandardMessageFlags: | ||
"""Standard email message flags""" | ||
SEEN = 'SEEN' | ||
ANSWERED = 'ANSWERED' | ||
FLAGGED = 'FLAGGED' | ||
DELETED = 'DELETED' | ||
DRAFT = 'DRAFT' | ||
RECENT = 'RECENT' | ||
all = ( | ||
SEEN, ANSWERED, FLAGGED, DELETED, DRAFT, RECENT | ||
) | ||
|
||
|
||
class MailBox: | ||
"""Working with the email box through IMAP4""" | ||
|
||
email_message_class = MailMessage | ||
folder_manager_class = MailBoxFolderManager | ||
|
||
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None): | ||
""" | ||
:param host: host's name (default: localhost) | ||
:param port: port number (default: standard IMAP4 SSL port) | ||
:param ssl: use client class over SSL connection (IMAP4_SSL) if True, else use IMAP4 | ||
:param keyfile: PEM formatted file that contains your private key (default: None) | ||
:param certfile: PEM formatted certificate chain file (default: None) | ||
:param ssl_context: SSLContext object that contains your certificate chain and private key (default: None) | ||
Note: if ssl_context is provided, then parameters keyfile or | ||
certfile should not be set otherwise ValueError is raised. | ||
""" | ||
self._host = host | ||
self._port = port | ||
self._keyfile = keyfile | ||
self._certfile = certfile | ||
self._ssl_context = ssl_context | ||
if ssl: | ||
self.box = imaplib.IMAP4_SSL( | ||
host, port or imaplib.IMAP4_SSL_PORT, keyfile, certfile, ssl_context) | ||
else: | ||
self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT) | ||
self._username = None | ||
self._password = None | ||
self._initial_folder = None | ||
self.folder = None | ||
|
||
@staticmethod | ||
def check_status(command, command_result, expected='OK'): | ||
""" | ||
Check that command responses status equals <expected> status | ||
If not, raises MailBoxUnexpectedStatusError | ||
""" | ||
typ, data = command_result[0], command_result[1] | ||
if typ != expected: | ||
raise MailBoxUnexpectedStatusError( | ||
'Response status for command "{command}" == "{typ}", "{exp}" expected, data: {data}'.format( | ||
command=command, typ=typ, data=str(data), exp=expected)) | ||
|
||
def login(self, username: str, password: str, initial_folder: str = 'INBOX'): | ||
self._username = username | ||
self._password = password | ||
self._initial_folder = initial_folder | ||
result = self.box.login(self._username, self._password) | ||
self.check_status('box.login', result) | ||
self.folder = self.folder_manager_class(self) | ||
self.folder.set(self._initial_folder) | ||
return result | ||
|
||
def logout(self): | ||
result = self.box.logout() | ||
self.check_status('box.logout', result, expected='BYE') | ||
return result | ||
|
||
def fetch(self, search_criteria: str = 'ALL', limit: int = None, | ||
miss_defect=True, miss_no_uid=True, mark_seen=True) -> iter: | ||
""" | ||
Mail message generator in current folder by search criteria | ||
:param search_criteria: Message search criteria (see examples at ./doc/imap_search_criteria.txt) | ||
:param limit: limit on the number of read emails | ||
:param miss_defect: miss defect emails | ||
:param miss_no_uid: miss emails witout uid | ||
:param mark_seen: mark emails as seen on fetch | ||
:return generator: MailMessage | ||
""" | ||
search_result = self.box.search(None, search_criteria) | ||
self.check_status('box.search', search_result) | ||
# first element is string with email numbers through the gap | ||
for i, message_id in enumerate(search_result[1][0].decode().split(' ') if search_result[1][0] else ()): | ||
if limit and i >= limit: | ||
break | ||
# get message by id | ||
fetch_result = self.box.fetch(message_id, "(BODY[] UID FLAGS)" if mark_seen else "(BODY.PEEK[] UID FLAGS)") | ||
self.check_status('box.fetch', fetch_result) | ||
mail_message = self.email_message_class(message_id, fetch_result[1]) | ||
if miss_defect and mail_message.obj.defects: | ||
continue | ||
if miss_no_uid and not mail_message.uid: | ||
continue | ||
yield mail_message | ||
|
||
@staticmethod | ||
def _uid_str(uid_list: str or [str] or iter) -> str: | ||
""" | ||
Prepare list of uid for use in commands: delete/copy/move/seen | ||
uid_list can be: str, list, tuple, set, fetch generator | ||
""" | ||
if not uid_list: | ||
raise MailBoxUidParamError('uid_list should not be empty') | ||
if type(uid_list) is str: | ||
uid_list = uid_list.split(',') | ||
if inspect.isgenerator(uid_list): | ||
uid_list = tuple(msg.uid for msg in uid_list if msg.uid) | ||
for uid in iter(uid_list): | ||
if type(uid) is not str: | ||
raise MailBoxUidParamError('uid "{}" is not string'.format(str(uid))) | ||
if not uid.strip().isdigit(): | ||
raise MailBoxUidParamError('Wrong uid: {}'.format(uid)) | ||
return ','.join((i.strip() for i in uid_list)) | ||
|
||
def expunge(self) -> tuple: | ||
result = self.box.expunge() | ||
self.check_status('box.expunge', result) | ||
return result | ||
|
||
def delete(self, uid_list) -> tuple: | ||
"""Delete email messages""" | ||
uid_str = self._uid_str(uid_list) | ||
store_result = self.box.uid('STORE', uid_str, '+FLAGS', r'(\Deleted)') | ||
self.check_status('box.delete', store_result) | ||
expunge_result = self.expunge() | ||
return store_result, expunge_result | ||
|
||
def copy(self, uid_list, destination_folder: str) -> tuple or None: | ||
"""Copy email messages into the specified folder""" | ||
uid_str = self._uid_str(uid_list) | ||
copy_result = self.box.uid('COPY', uid_str, destination_folder) | ||
self.check_status('box.copy', copy_result) | ||
return copy_result | ||
|
||
def move(self, uid_list, destination_folder: str) -> tuple: | ||
"""Move email messages into the specified folder""" | ||
# here for avoid double fetch in _uid_str | ||
uid_str = self._uid_str(uid_list) | ||
copy_result = self.copy(uid_str, destination_folder) | ||
delete_result = self.delete(uid_str) | ||
return copy_result, delete_result | ||
|
||
def flag(self, uid_list, flag_set: [str] or str, value: bool) -> tuple: | ||
""" | ||
Set email flags | ||
Standard flags contains in StandardMessageFlags.all | ||
""" | ||
uid_str = self._uid_str(uid_list) | ||
if type(flag_set) is str: | ||
flag_set = [flag_set] | ||
for flag_name in flag_set: | ||
if flag_name.upper() not in StandardMessageFlags.all: | ||
raise MailBoxWrongFlagError('Unsupported flag: {}'.format(flag_name)) | ||
store_result = self.box.uid( | ||
'STORE', uid_str, ('+' if value else '-') + 'FLAGS', | ||
'({})'.format(' '.join(('\\' + i for i in flag_set)))) | ||
self.check_status('box.flag', store_result) | ||
expunge_result = self.expunge() | ||
return store_result, expunge_result | ||
|
||
def seen(self, uid_list, seen_val: bool) -> tuple: | ||
""" | ||
Mark email as read/unread | ||
This is shortcut for flag method | ||
""" | ||
return self.flag(uid_list, StandardMessageFlags.SEEN, seen_val) |
Oops, something went wrong.