diff --git a/.travis.yml b/.travis.yml
index 7dd0006..627541a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,6 +3,7 @@ python:
- "3.5"
- "3.6"
- "3.7"
+ - "3.8"
install:
- pip install -r dev-requirements.txt
- pip install coveralls
diff --git a/README.rst b/README.rst
index 265dd37..b3efbb7 100644
--- a/README.rst
+++ b/README.rst
@@ -15,7 +15,7 @@ This library is inspired by imaplib_ and imaplib2_ from Piers Lauder, Nicolas Se
The aim is to port the imaplib with asyncio_, to benefit from the sleep or treat model.
-It runs with python 3.4 and 3.5.
+It runs with python 3.4, 3.5 and 3.8.
Example
-------
@@ -26,17 +26,17 @@ Example
from aioimaplib import aioimaplib
- @asyncio.coroutine
- def check_mailbox(host, user, password):
+
+ async def check_mailbox(host, user, password):
imap_client = aioimaplib.IMAP4_SSL(host=host)
- yield from imap_client.wait_hello_from_server()
+ await imap_client.wait_hello_from_server()
- yield from imap_client.login(user, password)
+ await imap_client.login(user, password)
- res, data = yield from imap_client.select()
+ res, data = await imap_client.select()
print('there is %s messages INBOX' % data[0])
- yield from imap_client.logout()
+ await imap_client.logout()
if __name__ == '__main__':
@@ -54,23 +54,22 @@ The RFC2177_ is implemented, to be able to wait for new mail messages without us
::
- @asyncio.coroutine
- def wait_for_new_message(host, user, password):
+ async def wait_for_new_message(host, user, password):
imap_client = aioimaplib.IMAP4_SSL(host=host)
- yield from imap_client.wait_hello_from_server()
+ await imap_client.wait_hello_from_server()
- yield from imap_client.login(user, password)
- yield from imap_client.select()
+ await imap_client.login(user, password)
+ await imap_client.select()
- idle = yield from imap_client.idle_start(timeout=10)
+ idle = await imap_client.idle_start(timeout=10)
while imap_client.has_pending_idle():
- msg = yield from imap_client.wait_server_push()
+ msg = await imap_client.wait_server_push()
print(msg)
if msg == STOP_WAIT_SERVER_PUSH:
imap_client.idle_done()
- yield from asyncio.wait_for(idle, 1)
+ await asyncio.wait_for(idle, 1)
- yield from imap_client.logout()
+ await imap_client.logout()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
@@ -80,22 +79,21 @@ Or in a more event based style (the IDLE command is closed at each message from
::
- @asyncio.coroutine
- def idle_loop(host, user, password):
+ async def idle_loop(host, user, password):
imap_client = aioimaplib.IMAP4_SSL(host=host, timeout=30)
- yield from imap_client.wait_hello_from_server()
+ await imap_client.wait_hello_from_server()
- yield from imap_client.login(user, password)
- yield from imap_client.select()
+ await imap_client.login(user, password)
+ await imap_client.select()
while True:
- print((yield from imap_client.uid('fetch', '1:*', 'FLAGS')))
+ print((await imap_client.uid('fetch', '1:*', 'FLAGS')))
- idle = yield from imap_client.idle_start(timeout=60)
- print((yield from imap_client.wait_server_push()))
+ idle = await imap_client.idle_start(timeout=60)
+ print((await imap_client.wait_server_push()))
imap_client.idle_done()
- yield from asyncio.wait_for(idle, 30)
+ await asyncio.wait_for(idle, 30)
Threading
---------
@@ -184,6 +182,7 @@ To add an imaplib or imaplib2 command you can :
Not unit tested
---------------
- PREAUTH
+- 'SORT' and 'THREAD' from the rfc5256_
TODO
----
@@ -200,10 +199,10 @@ TODO
- 'COMPRESS' from rfc4978_
- 'SETACL' 'DELETEACL' 'GETACL' 'MYRIGHTS' 'LISTRIGHTS' from ACL rfc4314_
- 'GETQUOTA': 'GETQUOTAROOT': 'SETQUOTA' from quota rfc2087_
-- 'SORT' and 'THREAD' from the rfc5256_
- 'ID' from the rfc2971_
- 'NAMESPACE' from rfc2342_
- 'CATENATE' from rfc4469_
+- make esearch and esort command concurrent, because their response contains tag
- tests with other servers
If it goes wrong
diff --git a/aioimaplib/aioimaplib.py b/aioimaplib/aioimaplib.py
index 75f03f2..f24286f 100644
--- a/aioimaplib/aioimaplib.py
+++ b/aioimaplib/aioimaplib.py
@@ -15,39 +15,48 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import asyncio
+import functools
import logging
+import random
+import re
import ssl
-import sys
+import time
+from asyncio import set_event_loop
+from collections import namedtuple
from copy import copy
from datetime import datetime, timezone, timedelta
-import time
from enum import Enum
-import re
-
-import functools
-
-import random
-from collections import namedtuple
+try:
+ from asyncio import get_running_loop
+except ImportError:
+ def get_running_loop() -> asyncio.AbstractEventLoop:
+ loop = asyncio.get_event_loop()
+ if not loop.is_running():
+ raise RuntimeError("no running event loop")
+ return loop
# to avoid imap servers to kill the connection after 30mn idling
# cf https://www.imapwiki.org/ClientImplementation/Synchronization
TWENTY_NINE_MINUTES = 1740
STOP_WAIT_SERVER_PUSH = 'stop_wait_server_push'
-PY37_OR_LATER = sys.version_info[:2] >= (3, 7)
log = logging.getLogger(__name__)
-IMAP4_PORT = 143
-IMAP4_SSL_PORT = 993
-STARTED, CONNECTED, NONAUTH, AUTH, SELECTED, LOGOUT = 'STARTED', 'CONNECTED', 'NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'
-CRLF = b'\r\n'
+STARTED = 'STARTED'
+CONNECTED = 'CONNECTED'
+NONAUTH = 'NONAUTH'
+AUTH = 'AUTH'
+SELECTED = 'SELECTED'
+LOGOUT = 'LOGOUT'
ID_MAX_PAIRS_COUNT = 30
ID_MAX_FIELD_LEN = 30
ID_MAX_VALUE_LEN = 1024
+CRLF = b'\r\n'
+
AllowedVersions = ('IMAP4REV1', 'IMAP4')
Exec = Enum('Exec', 'is_sync is_async')
Cmd = namedtuple('Cmd', 'name valid_states exec')
@@ -67,6 +76,7 @@
'EXPUNGE': Cmd('EXPUNGE', (SELECTED,), Exec.is_async),
'FETCH': Cmd('FETCH', (SELECTED,), Exec.is_async),
'GETACL': Cmd('GETACL', (AUTH, SELECTED), Exec.is_async),
+ 'GETMETADATA': Cmd('GETMETADATA', (AUTH, SELECTED), Exec.is_async),
'GETQUOTA': Cmd('GETQUOTA', (AUTH, SELECTED), Exec.is_async),
'GETQUOTAROOT': Cmd('GETQUOTAROOT', (AUTH, SELECTED), Exec.is_async),
'ID': Cmd('ID', (NONAUTH, AUTH, LOGOUT, SELECTED), Exec.is_async),
@@ -83,6 +93,7 @@
'SEARCH': Cmd('SEARCH', (SELECTED,), Exec.is_async),
'SELECT': Cmd('SELECT', (AUTH, SELECTED), Exec.is_sync),
'SETACL': Cmd('SETACL', (AUTH, SELECTED), Exec.is_sync),
+ 'SETMETADATA': Cmd('SETMETADATA', (AUTH, SELECTED), Exec.is_async),
'SETQUOTA': Cmd('SETQUOTA', (AUTH, SELECTED), Exec.is_sync),
'SORT': Cmd('SORT', (SELECTED,), Exec.is_async),
'STARTTLS': Cmd('STARTTLS', (NONAUTH,), Exec.is_sync),
@@ -99,35 +110,23 @@
Response = namedtuple('Response', 'result lines')
-def get_running_loop() -> asyncio.AbstractEventLoop:
- if PY37_OR_LATER:
- return asyncio.get_running_loop()
-
- loop = asyncio.get_event_loop()
- if not loop.is_running():
- raise RuntimeError("no running event loop")
-
- return loop
-
+def quoted(s: str) -> str:
+ """Return quoted string as per RFC 3501, section 9."""
+ return '"' + s.replace('\\', '\\\\').replace('"', '\\"') + '"'
-def quoted(arg):
- """ Given a string, return a quoted string as per RFC 3501, section 9.
-
- Implementation copied from https://github.com/mjs/imapclient
- (imapclient/imapclient.py), 3-clause BSD license
- """
- if isinstance(arg, str):
- arg = arg.replace('\\', '\\\\')
- arg = arg.replace('"', '\\"')
- q = '"'
+def unquoted(s):
+ """ Given a string, return an unquoted string as per RFC 3501, section 9."""
+ if isinstance(s, str):
+ if (s[0], s[-1]) == ('"', '"'):
+ return s[1:-1].replace('\\"', '"').replace('\\\\', '\\')
+ return s
else:
- arg = arg.replace(b'\\', b'\\\\')
- arg = arg.replace(b'"', b'\\"')
- q = b'"'
- return q + arg + q
+ if (s[0], s[-1]) == (b'"', b'"'):
+ return s[1:-1].replace(b'\\"', '"').replace(b'\\\\', b'\\')
+ return s
-def arguments_rfs2971(**kwargs):
+def arguments_rfc2971(**kwargs):
if kwargs:
if len(kwargs) > ID_MAX_PAIRS_COUNT:
raise ValueError('Must not send more than 30 field-value pairs')
@@ -147,17 +146,22 @@ def arguments_rfs2971(**kwargs):
class Command(object):
- def __init__(self, name, tag, *args, prefix=None, untagged_resp_name=None, loop=None, timeout=None):
+ def __init__(self, name, tag, *args, by_uid=False, untagged_name=None, loop=None, timeout=None):
self.name = name
self.tag = tag
self.args = args
- self.prefix = prefix + ' ' if prefix else None
- self.untagged_resp_name = untagged_resp_name or name
+ self.by_uid = by_uid
+ if untagged_name is None:
+ self.untagged_names = (name,)
+ elif isinstance(untagged_name, str):
+ self.untagged_names = (untagged_name,)
+ else: # expecting iterable
+ self.untagged_names = untagged_name
self.response = None
self._exception = None
self._loop = loop if loop is not None else get_running_loop()
- self._event = asyncio.Event(loop=self._loop)
+ self._event = asyncio.Event()
self._timeout = timeout
self._timer = asyncio.Handle(lambda: None, None, self._loop) # fake timer
self._set_timer()
@@ -166,8 +170,12 @@ def __init__(self, name, tag, *args, prefix=None, untagged_resp_name=None, loop=
def __repr__(self):
return '{tag} {prefix}{name}{space}{args}'.format(
- tag=self.tag, prefix=self.prefix or '', name=self.name,
- space=' ' if self.args else '', args=' '.join(self.args))
+ tag=self.tag,
+ prefix='UID ' if self.by_uid else '',
+ name=self.name,
+ space=' ' if self.args else '',
+ args=' '.join(self.args),
+ )
# for tests
def __eq__(self, other):
@@ -180,7 +188,7 @@ def close(self, line, result):
def begin_literal_data(self, expected_size, literal_data=b''):
self._expected_size = expected_size
- self._literal_data = b''
+ self._literal_data = bytearray()
return self.append_literal_data(literal_data)
def wait_literal_data(self):
@@ -199,16 +207,16 @@ def append_literal_data(self, data):
return data[nb_bytes_to_add:]
def append_to_resp(self, line, result='Pending'):
- if self.response is None:
+ try:
+ self.response.lines.append(line)
+ if result != self.response.result:
+ self.response = Response(result, self.response.lines)
+ except AttributeError:
self.response = Response(result, [line])
- else:
- old = self.response
- self.response = Response(result, old.lines + [line])
self._reset_timer()
- @asyncio.coroutine
- def wait(self):
- yield from self._event.wait()
+ async def wait(self):
+ await self._event.wait()
if self._exception is not None:
raise self._exception
@@ -235,31 +243,26 @@ def _reset_timer(self):
class FetchCommand(Command):
FETCH_MESSAGE_DATA_RE = re.compile(r'[0-9]+ FETCH \(')
- def __init__(self, tag, *args, prefix=None, untagged_resp_name=None,
- loop=None, timeout=None):
- super().__init__('FETCH', tag, *args, prefix=prefix, untagged_resp_name=untagged_resp_name,
- loop=loop, timeout=timeout)
+ def __init__(self, tag, message_set, parts, modifiers=None, untagged_name=None, **kwargs):
+ if modifiers:
+ args = (message_set, parts, modifiers)
+ if 'VANISHED' in modifiers.upper():
+ untagged_name = ('FETCH', 'VANISHED')
+ else:
+ args = (message_set, parts)
+ super().__init__('FETCH', tag, *args, untagged_name=untagged_name, **kwargs)
def wait_data(self):
if self.response is None:
return False
- last_fetch_index = 0
- for index, line in enumerate(self.response.lines):
- if isinstance(line, str) and self.FETCH_MESSAGE_DATA_RE.match(line):
- last_fetch_index = index
- return not matched_parenthesis(''.join(filter(lambda l: isinstance(l, str),
- self.response.lines[last_fetch_index:])))
-
-
-def matched_parenthesis(string):
- return string.count('(') == string.count(')')
+ last_line = self.response.lines[-1]
+ return not isinstance(last_line, str) or \
+ not (last_line.endswith(')') or last_line.startswith('(EARLIER)'))
class IdleCommand(Command):
- def __init__(self, tag, queue, *args, prefix=None, untagged_resp_name=None,
- loop=None, timeout=None):
- super().__init__('IDLE', tag, *args, prefix=prefix, untagged_resp_name=untagged_resp_name,
- loop=loop, timeout=timeout)
+ def __init__(self, tag, queue, *args, **kwargs):
+ super().__init__('IDLE', tag, *args, **kwargs)
self.queue = queue
self.buffer = list()
@@ -281,18 +284,15 @@ def __init__(self, reason):
class Error(AioImapException):
- def __init__(self, reason):
- super().__init__(reason)
+ pass
class Abort(Error):
- def __init__(self, reason):
- super().__init__(reason)
+ pass
class CommandTimeout(AioImapException):
- def __init__(self, command):
- self.command = command
+ pass
class IncompleteRead(AioImapException):
@@ -303,10 +303,9 @@ def __init__(self, cmd, data=b''):
def change_state(coro):
@functools.wraps(coro)
- @asyncio.coroutine
- def wrapper(self, *args, **kargs):
- with (yield from self.state_condition):
- res = yield from coro(self, *args, **kargs)
+ async def wrapper(self, *args, **kargs):
+ async with self.state_condition:
+ res = await coro(self, *args, **kargs)
log.debug('state -> %s' % self.state)
self.state_condition.notify_all()
return res
@@ -317,13 +316,15 @@ def wrapper(self, *args, **kargs):
# cf https://tools.ietf.org/html/rfc3501#section-9
# untagged responses types
literal_data_re = re.compile(rb'.*\{(?P\d+)\}$')
-message_data_re = re.compile(r'[0-9]+ ((FETCH)|(EXPUNGE))')
-tagged_status_response_re = re.compile(r'[A-Z0-9]+ ((OK)|(NO)|(BAD))')
+message_data_re = re.compile(r'[0-9]+ (FETCH|EXPUNGE)')
+tagged_status_response_re = re.compile(r'[A-Z0-9]+ (OK|NO|BAD)')
+capability_re = re.compile(r'\[CAPABILITY ([^\]]+)\]')
class IMAP4ClientProtocol(asyncio.Protocol):
def __init__(self, loop, conn_lost_cb=None):
- self.loop = loop
+ self.loop = loop if loop is not None else get_running_loop()
+ set_event_loop(self.loop)
self.transport = None
self.state = STARTED
self.state_condition = asyncio.Condition()
@@ -347,7 +348,7 @@ def connection_made(self, transport):
def data_received(self, d):
log.debug('Received : %s' % d)
try:
- self._handle_responses(self.incomplete_line + d, self._handle_line, self.current_command)
+ self._handle_responses(self.incomplete_line + d, self.current_command)
self.incomplete_line = b''
self.current_command = None
except IncompleteRead as incomplete_read:
@@ -359,7 +360,7 @@ def connection_lost(self, exc):
if self.conn_lost_cb is not None:
self.conn_lost_cb(exc)
- def _handle_responses(self, data, line_handler, current_cmd=None):
+ def _handle_responses(self, data, current_cmd=None):
if not data:
if self.pending_sync_command is not None:
self.pending_sync_command.flush()
@@ -376,7 +377,7 @@ def _handle_responses(self, data, line_handler, current_cmd=None):
if not separator:
raise IncompleteRead(current_cmd, data)
- cmd = line_handler(line.decode(), current_cmd)
+ cmd = self._handle_line(line.decode(), current_cmd)
begin_literal = literal_data_re.match(line)
if begin_literal:
@@ -384,16 +385,15 @@ def _handle_responses(self, data, line_handler, current_cmd=None):
if cmd is None:
cmd = Command('NIL', 'unused')
cmd.begin_literal_data(size)
- self._handle_responses(tail, line_handler, current_cmd=cmd)
+ self._handle_responses(tail, current_cmd=cmd)
elif cmd is not None and cmd.wait_data():
- self._handle_responses(tail, line_handler, current_cmd=cmd)
+ self._handle_responses(tail, current_cmd=cmd)
else:
- self._handle_responses(tail, line_handler)
+ self._handle_responses(tail)
def _handle_line(self, line, current_cmd):
if not line:
return
-
if self.state == CONNECTED:
asyncio.ensure_future(self.welcome(line))
elif tagged_status_response_re.match(line):
@@ -413,71 +413,75 @@ def send(self, line):
log.debug('Sending : %s' % data)
self.transport.write(data)
- @asyncio.coroutine
- def execute(self, command):
- if self.state not in Commands.get(command.name).valid_states:
- raise Abort('command %s illegal in state %s' % (command.name, self.state))
-
+ async def execute(self, command):
if self.pending_sync_command is not None:
- yield from self.pending_sync_command.wait()
+ await self.pending_sync_command.wait()
- if Commands.get(command.name).exec == Exec.is_sync:
+ if Commands[command.name].exec == Exec.is_sync:
if self.pending_async_commands:
- yield from self.wait_async_pending_commands()
+ await self.wait_async_pending_commands()
self.pending_sync_command = command
else:
- if self.pending_async_commands.get(command.untagged_resp_name) is not None:
- yield from self.pending_async_commands[command.untagged_resp_name].wait()
- self.pending_async_commands[command.untagged_resp_name] = command
+ for untagged_name in command.untagged_names:
+ pending_same_name = self.pending_async_commands.get(untagged_name)
+ if pending_same_name is not None:
+ await pending_same_name.wait()
+ self.pending_async_commands[untagged_name] = command
self.send(str(command))
try:
- yield from command.wait()
+ await command.wait()
except CommandTimeout:
- if Commands.get(command.name).exec == Exec.is_sync:
+ if Commands[command.name].exec == Exec.is_sync:
self.pending_sync_command = None
else:
- self.pending_async_commands.pop(command.untagged_resp_name, None)
+ for untagged_name in command.untagged_names:
+ self.pending_async_commands.pop(untagged_name, None)
raise
return command.response
@change_state
- @asyncio.coroutine
- def welcome(self, command):
+ async def welcome(self, command):
if 'PREAUTH' in command:
self.state = AUTH
elif 'OK' in command:
self.state = NONAUTH
+ match = capability_re.search(command)
+ if match:
+ self.update_capabilities(match.group(1))
else:
raise Error(command)
- yield from self.capability()
+
+ if not self.capabilities:
+ await self.capability()
@change_state
- @asyncio.coroutine
- def login(self, user, password):
- response = yield from self.execute(
+ async def login(self, user, password):
+ if self.state not in Commands['LOGIN'].valid_states:
+ raise Error('command LOGIN illegal in state %s' % (self.state,))
+
+ response = await self.execute(
Command('LOGIN', self.new_tag(), user, '%s' % quoted(password), loop=self.loop))
if 'OK' == response.result:
self.state = AUTH
for line in response.lines:
- if 'CAPABILITY' in line:
- self.capabilities = self.capabilities.union(set(line.replace('CAPABILITY', '').strip().split()))
+ match = capability_re.search(line)
+ if match:
+ self.update_capabilities(match.group(1))
return response
@change_state
- @asyncio.coroutine
- def logout(self):
- response = (yield from self.execute(Command('LOGOUT', self.new_tag(), loop=self.loop)))
+ async def logout(self):
+ response = await self.execute(Command('LOGOUT', self.new_tag(), loop=self.loop))
if 'OK' == response.result:
self.state = LOGOUT
return response
@change_state
- @asyncio.coroutine
- def select(self, mailbox='INBOX'):
- response = yield from self.execute(
+ async def select(self, mailbox='INBOX'):
+ response = await self.execute(
Command('SELECT', self.new_tag(), mailbox, loop=self.loop))
if 'OK' == response.result:
@@ -485,18 +489,16 @@ def select(self, mailbox='INBOX'):
return response
@change_state
- @asyncio.coroutine
- def close(self):
- response = yield from self.execute(Command('CLOSE', self.new_tag(), loop=self.loop))
+ async def close(self):
+ response = await self.execute(Command('CLOSE', self.new_tag(), loop=self.loop))
if response.result == 'OK':
self.state = AUTH
return response
- @asyncio.coroutine
- def idle(self):
+ async def idle(self):
if 'IDLE' not in self.capabilities:
raise Abort('server has not IDLE capability')
- return (yield from self.execute(IdleCommand(self.new_tag(), self.idle_queue, loop=self.loop)))
+ return await self.execute(IdleCommand(self.new_tag(), self.idle_queue, loop=self.loop))
def has_pending_idle_command(self):
return self.pending_sync_command is not None and self.pending_sync_command.name == 'IDLE'
@@ -504,78 +506,114 @@ def has_pending_idle_command(self):
def idle_done(self):
self.send('DONE')
- @asyncio.coroutine
- def search(self, *criteria, charset='utf-8', by_uid=False):
- args = ('CHARSET', charset) + criteria if charset is not None else criteria
- prefix = 'UID' if by_uid else ''
-
- return (yield from self.execute(
- Command('SEARCH', self.new_tag(), *args, prefix=prefix, loop=self.loop)))
-
- @asyncio.coroutine
- def fetch(self, message_set, message_parts, by_uid=False, timeout=None):
- return (yield from self.execute(
- FetchCommand(self.new_tag(), message_set, message_parts,
- prefix='UID' if by_uid else '', loop=self.loop, timeout=timeout)))
-
- @asyncio.coroutine
- def store(self, *args, by_uid=False):
- return (yield from self.execute(
- Command('STORE', self.new_tag(), *args,
- prefix='UID' if by_uid else '', untagged_resp_name='FETCH', loop=self.loop)))
-
- @asyncio.coroutine
- def expunge(self, *args, by_uid=False):
- return (yield from self.execute(
- Command('EXPUNGE', self.new_tag(), *args,
- prefix='UID' if by_uid else '', loop=self.loop)))
-
- @asyncio.coroutine
- def uid(self, command, *criteria, timeout=None):
- if self.state not in Commands.get('UID').valid_states:
+ async def search(self, *criteria, charset='UTF-8', by_uid=False, ret=None, timeout=None):
+ # TODO: don't wait for concurrent esearch commands
+ # because esearch untagged response contains tag
+ if charset:
+ criteria = ('CHARSET', charset) + criteria
+ if ret:
+ if 'ESEARCH' not in self.capabilities:
+ raise Abort('server has not ESEARCH capability')
+ criteria = ('RETURN', '(%s)' % ret) + criteria
+ return await self.execute(
+ Command('SEARCH',
+ self.new_tag(),
+ *criteria,
+ untagged_name='ESEARCH' if ret else 'SEARCH',
+ by_uid=by_uid,
+ loop=self.loop,
+ timeout=timeout,
+ ))
+
+ async def thread(self, algorithm, *criteria, charset='UTF-8', by_uid=False, timeout=None):
+ if 'THREAD='+algorithm.upper() not in self.capabilities:
+ raise Abort('server has not THREAD=%s capability' % algorithm.upper())
+ return await self.execute(
+ Command('THREAD',
+ self.new_tag(),
+ algorithm,
+ charset,
+ *criteria,
+ by_uid=by_uid,
+ loop=self.loop,
+ timeout=timeout,
+ ))
+
+ async def sort(self, sort, search='ALL', charset='UTF-8', by_uid=False, ret=None, timeout=None):
+ # TODO: don't wait for concurrent esort commands
+ # because esort untagged response contains tag
+
+ args = ['(%s)' % sort, charset, search]
+ if ret:
+ if 'ESORT' not in self.capabilities:
+ raise Abort('server has not ESORT capability')
+ args.insert(0, 'RETURN (%s)' % ret)
+ return await self.execute(
+ Command('SORT',
+ self.new_tag(),
+ *args,
+ untagged_name='ESEARCH' if ret else 'SORT',
+ by_uid=by_uid,
+ loop=self.loop,
+ timeout=timeout,
+ ))
+
+ async def fetch(self, message_set, parts, modifiers=None, by_uid=False, timeout=None):
+ return await self.execute(
+ FetchCommand(self.new_tag(), message_set, parts, modifiers,
+ by_uid=by_uid, loop=self.loop, timeout=timeout))
+
+ async def store(self, *args, by_uid=False, timeout=None):
+ return await self.execute(
+ Command('STORE', self.new_tag(), *args, by_uid=by_uid,
+ untagged_name='FETCH', loop=self.loop, timeout=timeout))
+
+ async def expunge(self, *args, by_uid=False, timeout=None):
+ return await self.execute(
+ Command('EXPUNGE', self.new_tag(), *args, by_uid=by_uid,
+ loop=self.loop, timeout=timeout))
+
+ async def uid(self, command, *criteria, timeout=None):
+ if self.state not in Commands['UID'].valid_states:
raise Abort('command UID illegal in state %s' % self.state)
- if command.upper() == 'FETCH':
- return (yield from self.fetch(criteria[0], criteria[1], by_uid=True, timeout=timeout))
- if command.upper() == 'STORE':
- return (yield from self.store(*criteria, by_uid=True))
- if command.upper() == 'COPY':
- return (yield from self.copy(*criteria, by_uid=True))
- if command.upper() == 'MOVE':
- return (yield from self.move(*criteria, by_uid=True))
- if command.upper() == 'EXPUNGE':
+ command = command.lower()
+ if command == 'expunge':
if 'UIDPLUS' not in self.capabilities:
raise Abort('EXPUNGE with uids is only valid with UIDPLUS capability. UIDPLUS not in (%s)' % self.capabilities)
- return (yield from self.expunge(*criteria, by_uid=True))
- raise Abort('command UID only possible with COPY, FETCH, EXPUNGE (w/UIDPLUS) or STORE (was %s)' % command.upper())
-
- @asyncio.coroutine
- def copy(self, *args, by_uid=False):
- return (yield from self.execute(
- Command('COPY', self.new_tag(), *args, prefix='UID' if by_uid else '', loop=self.loop)))
-
- @asyncio.coroutine
- def move(self, uid_set, mailbox, by_uid=False):
+ elif command not in {'fetch', 'store', 'copy', 'move', 'search', 'sort'}:
+ raise Abort('command UID only possible with COPY, FETCH, MOVE,'
+ ' SEARCH, SORT, EXPUNGE (w/UIDPLUS) or STORE'
+ ' (was %s)' % (command.upper(),))
+ return await getattr(self, command)(*criteria, by_uid=True, timeout=timeout)
+
+ async def copy(self, *args, by_uid=False, timeout=None):
+ return await self.execute(
+ Command('COPY', self.new_tag(), *args, by_uid=by_uid,
+ loop=self.loop, timeout=timeout))
+
+ async def move(self, uid_set, mailbox, by_uid=False, timeout=None):
if 'MOVE' not in self.capabilities:
raise Abort('server has not MOVE capability')
- return (yield from self.execute(
- Command('MOVE', self.new_tag(), uid_set, mailbox, prefix='UID' if by_uid else '', loop=self.loop)))
+ return await self.execute(
+ Command('MOVE', self.new_tag(), uid_set, mailbox, by_uid=by_uid,
+ loop=self.loop, timeout=timeout))
- @asyncio.coroutine
- def capability(self):
- response = yield from self.execute(Command('CAPABILITY', self.new_tag(), loop=self.loop))
+ async def capability(self):
+ response = await self.execute(Command('CAPABILITY', self.new_tag(), loop=self.loop))
+ self.update_capabilities(response.lines[0])
- capability_list = response.lines[0].split()
- self.capabilities = set(capability_list)
- version = capability_list[0].upper()
- if version not in AllowedVersions:
- raise Error('server not IMAP4 compliant')
+ def update_capabilities(self, string):
+ self.capabilities = set(string.upper().strip().split())
+ for version in AllowedVersions:
+ if version in self.capabilities:
+ self.imap_version = version
+ break
else:
- self.imap_version = version
+ raise Error('server not IMAP4 compliant')
- @asyncio.coroutine
- def append(self, message_bytes, mailbox='INBOX', flags=None, date=None, timeout=None):
+ async def append(self, message_bytes, mailbox='INBOX', flags=None, date=None, timeout=None):
args = [mailbox]
if flags is not None:
if (flags[0], flags[-1]) != ('(', ')'):
@@ -586,40 +624,54 @@ def append(self, message_bytes, mailbox='INBOX', flags=None, date=None, timeout=
args.append(time2internaldate(date))
args.append('{%s}' % len(message_bytes))
self.literal_data = message_bytes
- return (yield from self.execute(Command('APPEND', self.new_tag(), *args, loop=self.loop, timeout=timeout)))
+ return await self.execute(Command('APPEND', self.new_tag(), *args, loop=self.loop, timeout=timeout))
- @asyncio.coroutine
- def id(self, **kwargs):
- args = arguments_rfs2971(**kwargs)
- return (yield from self.execute(Command('ID', self.new_tag(), *args, loop=self.loop)))
+ async def getmetadata(self, mailbox, metadata, options=None, timeout=None):
+ args = () if options is None else (options)
+ return await self.execute(Command('GETMETADATA', self.new_tag(), mailbox, metadata, *args,
+ untagged_name='METADATA', loop=self.loop, timeout=timeout))
+
+ async def setmetadata(self, mailbox, metadata, timeout=None):
+ return await self.execute(Command('SETMETADATA', self.new_tag(), mailbox, metadata, loop=self.loop, timeout=None))
+
+ async def id(self, **kwargs):
+ args = arguments_rfc2971(**kwargs)
+ return await self.execute(Command('ID', self.new_tag(), *args, loop=self.loop))
simple_commands = {'NOOP', 'CHECK', 'STATUS', 'CREATE', 'DELETE', 'RENAME',
'SUBSCRIBE', 'UNSUBSCRIBE', 'LSUB', 'LIST', 'EXAMINE', 'ENABLE'}
- @asyncio.coroutine
- def namespace(self):
+ async def namespace(self):
if 'NAMESPACE' not in self.capabilities:
raise Abort('server has not NAMESPACE capability')
- return (yield from self.execute(Command('NAMESPACE', self.new_tag(), loop=self.loop)))
-
- @asyncio.coroutine
- def simple_command(self, name, *args):
+ return (await self.execute(Command('NAMESPACE', self.new_tag(), loop=self.loop)))
+
+ async def list(self, reference_name='""', mailbox_pattern='*', ret=None, timeout=None):
+ args = [reference_name, mailbox_pattern]
+ untagged_name = None
+ if ret:
+ args.append('RETURN (%s)' % ret)
+ if 'STATUS' in ret.upper():
+ untagged_name = ('LIST', 'STATUS')
+ return await self.execute(
+ Command('LIST', self.new_tag(), *args,
+ untagged_name=untagged_name,
+ timeout=timeout))
+
+ async def simple_command(self, name, *args):
if name not in self.simple_commands:
raise NotImplementedError('simple command only available for %s' % self.simple_commands)
- return (yield from self.execute(Command(name, self.new_tag(), *args, loop=self.loop)))
+ return await self.execute(Command(name, self.new_tag(), *args, loop=self.loop))
- @asyncio.coroutine
- def wait_async_pending_commands(self):
- yield from asyncio.wait([asyncio.ensure_future(cmd.wait()) for cmd in self.pending_async_commands.values()])
+ async def wait_async_pending_commands(self):
+ await asyncio.wait([asyncio.ensure_future(cmd.wait()) for cmd in self.pending_async_commands.values()])
- @asyncio.coroutine
- def wait(self, state_regexp):
- state_re = re.compile(state_regexp)
- with (yield from self.state_condition):
- yield from self.state_condition.wait_for(lambda: state_re.match(self.state))
+ async def wait(self, states):
+ async with self.state_condition:
+ await self.state_condition.wait_for(lambda: self.state in states)
def _untagged_response(self, line):
- line = line.replace('* ', '')
+ line = line[2:] # remove '* '
if self.pending_sync_command is not None:
self.pending_sync_command.append_to_resp(line)
command = self.pending_sync_command
@@ -655,13 +707,15 @@ def _response_done(self, line):
cmds = self._find_pending_async_cmd_by_tag(tag)
if len(cmds) == 0:
raise Abort('unexpected tagged (%s) response: %s' % (tag, response))
- elif len(cmds) > 1:
+ elif len(cmds) > 1 and cmds[0] is not cmds[1]:
+ # LIST-STATUS and FETCH VANISHED is 2 times in pending_async_commands
raise Error('inconsistent state : two commands have the same tag (%s)' % cmds)
command = cmds.pop()
- self.pending_async_commands.pop(command.untagged_resp_name)
+ for untagged_name in command.untagged_names:
+ self.pending_async_commands.pop(untagged_name)
- response_result, _, response_text = response.partition(' ')
- command.close(response_text, result=response_result)
+ result, _, text = response.partition(' ')
+ command.close(text, result)
def _continuation(self, line):
if self.pending_sync_command is not None and self.pending_sync_command.name == 'APPEND':
@@ -689,13 +743,16 @@ def _find_pending_async_cmd_by_tag(self, tag):
class IMAP4(object):
TIMEOUT_SECONDS = 10
- def __init__(self, host='127.0.0.1', port=IMAP4_PORT, loop=None, timeout=TIMEOUT_SECONDS, conn_lost_cb=None, ssl_context=None):
- self.timeout = timeout
- self.port = port
+ def __init__(self, host='127.0.0.1', port=143, loop=None, timeout=TIMEOUT_SECONDS, conn_lost_cb=None, ssl_context=None):
self.host = host
+ self.port = port
+ self.loop = asyncio.get_running_loop() if loop is None else loop
+ self.timeout = timeout
+ self.conn_lost_cb = conn_lost_cb
+ self.ssl_context = ssl_context
self.protocol = None
self._idle_waiter = None
- self.create_client(host, port, loop, conn_lost_cb, ssl_context)
+ self.create_client(host, port, self.loop, conn_lost_cb, ssl_context)
def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
local_loop = loop if loop is not None else get_running_loop()
@@ -703,155 +760,160 @@ def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
local_loop.create_task(local_loop.create_connection(lambda: self.protocol, host, port, ssl=ssl_context))
def get_state(self):
- return self.protocol.state
+ return self.protocol and self.protocol.state
+
+ async def wait_hello_from_server(self):
+ await asyncio.wait_for(self.protocol.wait({AUTH, NONAUTH}), self.timeout)
+
+ async def login(self, user, password):
+ return await asyncio.wait_for(self.protocol.login(user, password), self.timeout)
+
+ async def logout(self):
+ if self.protocol is not None:
+ return await asyncio.wait_for(self.protocol.logout(), self.timeout)
+
+ async def select(self, mailbox='INBOX'):
+ return await asyncio.wait_for(self.protocol.select(mailbox), self.timeout)
+
+ async def search(self, *criteria, charset='UTF-8', ret=None):
+ return await self.protocol.search(*criteria, charset=charset, ret=ret, timeout=self.timeout)
+
+ async def uid_search(self, *criteria, charset='UTF-8', ret=None):
+ return await self.protocol.search(*criteria, by_uid=True, charset=charset, ret=ret, timeout=self.timeout)
+
+ async def thread(self, algorithm='REFERENCES', search='ALL', charset='UTF-8'):
+ return await self.protocol.thread(algorithm, search, charset=charset, timeout=self.timeout)
+
+ async def uid_thread(self, algorithm='REFERENCES', search='ALL', charset='UTF-8'):
+ return await self.protocol.thread(algorithm, search, charset=charset, by_uid=True, timeout=self.timeout)
- @asyncio.coroutine
- def wait_hello_from_server(self):
- yield from asyncio.wait_for(self.protocol.wait('AUTH|NONAUTH'), self.timeout)
+ async def sort(self, search, sort='ALL', charset='UTF-8', ret=None):
+ return await self.protocol.sort(search, sort, charset=charset, ret=ret, timeout=self.timeout)
- @asyncio.coroutine
- def login(self, user, password):
- return (yield from asyncio.wait_for(self.protocol.login(user, password), self.timeout))
+ async def uid_sort(self, search, sort='ALL', charset='UTF-8', ret=None):
+ return await self.protocol.sort(search, sort, by_uid=True, charset=charset, ret=ret, timeout=self.timeout)
- @asyncio.coroutine
- def logout(self):
- return (yield from asyncio.wait_for(self.protocol.logout(), self.timeout))
+ async def uid(self, command, *criteria):
+ return await self.protocol.uid(command, *criteria, timeout=self.timeout)
- @asyncio.coroutine
- def select(self, mailbox='INBOX'):
- return (yield from asyncio.wait_for(self.protocol.select(mailbox), self.timeout))
+ async def store(self, *criteria):
+ return await self.protocol.store(*criteria, timeout=self.timeout)
- @asyncio.coroutine
- def search(self, *criteria, charset='utf-8'):
- return (yield from asyncio.wait_for(self.protocol.search(*criteria, charset=charset), self.timeout))
+ async def uid_store(self, *criteria):
+ return await self.protocol.store(*criteria, by_uid=True, timeout=self.timeout)
- @asyncio.coroutine
- def uid_search(self, *criteria, charset='utf-8'):
- return (
- yield from asyncio.wait_for(self.protocol.search(*criteria, by_uid=True, charset=charset), self.timeout))
+ async def copy(self, *criteria):
+ return await self.protocol.copy(*criteria, timeout=self.timeout)
- @asyncio.coroutine
- def uid(self, command, *criteria):
- return (yield from self.protocol.uid(command, *criteria, timeout=self.timeout))
+ async def uid_copy(self, *criteria):
+ return await self.protocol.copy(*criteria, by_uid=True, timeout=self.timeout)
- @asyncio.coroutine
- def store(self, *criteria):
- return (yield from asyncio.wait_for(self.protocol.store(*criteria), self.timeout))
+ async def expunge(self):
+ return await self.protocol.expunge(timeout=self.timeout)
- @asyncio.coroutine
- def copy(self, *criteria):
- return (yield from asyncio.wait_for(self.protocol.copy(*criteria), self.timeout))
+ async def uid_expunge(self, *args):
+ return await self.protocol.expunge(*args, by_uid=True, timeout=self.timeout)
- @asyncio.coroutine
- def expunge(self):
- return (yield from asyncio.wait_for(self.protocol.expunge(), self.timeout))
+ async def fetch(self, message_set, message_parts, modifiers=None):
+ return await self.protocol.fetch(message_set, message_parts, modifiers,
+ timeout=self.timeout)
- @asyncio.coroutine
- def fetch(self, message_set, message_parts):
- return (yield from self.protocol.fetch(message_set, message_parts, timeout=self.timeout))
+ async def uid_fetch(self, message_set, message_parts, modifiers=None):
+ return await self.protocol.fetch(message_set, message_parts, modifiers,
+ by_uid=True, timeout=self.timeout)
- @asyncio.coroutine
- def idle(self):
- return (yield from self.protocol.idle())
+ async def idle(self):
+ return await self.protocol.idle()
def idle_done(self):
if self._idle_waiter is not None:
self._idle_waiter.cancel()
self.protocol.idle_done()
- @asyncio.coroutine
- def stop_wait_server_push(self):
+ async def stop_wait_server_push(self):
if self.protocol.has_pending_idle_command():
- yield from self.protocol.idle_queue.put(STOP_WAIT_SERVER_PUSH)
+ await self.protocol.idle_queue.put(STOP_WAIT_SERVER_PUSH)
return True
return False
- @asyncio.coroutine
- def wait_server_push(self, timeout=TWENTY_NINE_MINUTES):
- return (yield from asyncio.wait_for(self.protocol.idle_queue.get(), timeout=timeout))
+ async def wait_server_push(self, timeout=TWENTY_NINE_MINUTES):
+ return await asyncio.wait_for(self.protocol.idle_queue.get(), timeout=timeout)
- @asyncio.coroutine
- def idle_start(self, timeout=TWENTY_NINE_MINUTES):
+ async def idle_start(self, timeout=TWENTY_NINE_MINUTES):
if self._idle_waiter is not None:
self._idle_waiter.cancel()
idle = asyncio.ensure_future(self.idle())
self._idle_waiter = self.protocol.loop.call_later(timeout, lambda: asyncio.ensure_future(self.stop_wait_server_push()))
- yield from self.wait_server_push(self.timeout) # idling continuation
+ await self.wait_server_push(self.timeout) # idling continuation
return idle
def has_pending_idle(self):
return self.protocol.has_pending_idle_command()
- @asyncio.coroutine
- def id(self, **kwargs):
- return (yield from asyncio.wait_for(self.protocol.id(**kwargs), self.timeout))
+ async def id(self, **kwargs):
+ if self.protocol is None:
+ await self.create_client()
+ await self.wait_hello_from_server()
+ return await asyncio.wait_for(self.protocol.id(**kwargs), self.timeout)
- @asyncio.coroutine
- def namespace(self):
- return (yield from asyncio.wait_for(self.protocol.namespace(), self.timeout))
+ async def namespace(self):
+ return await asyncio.wait_for(self.protocol.namespace(), self.timeout)
- @asyncio.coroutine
- def noop(self):
- return (yield from asyncio.wait_for(self.protocol.simple_command('NOOP'), self.timeout))
+ async def noop(self):
+ return await asyncio.wait_for(self.protocol.simple_command('NOOP'), self.timeout)
- @asyncio.coroutine
- def check(self):
- return (yield from asyncio.wait_for(self.protocol.simple_command('CHECK'), self.timeout))
+ async def check(self):
+ return await asyncio.wait_for(self.protocol.simple_command('CHECK'), self.timeout)
- @asyncio.coroutine
- def examine(self, mailbox='INBOX'):
- return (yield from asyncio.wait_for(self.protocol.simple_command('EXAMINE', mailbox), self.timeout))
+ async def examine(self, mailbox='INBOX'):
+ return await asyncio.wait_for(self.protocol.simple_command('EXAMINE', mailbox), self.timeout)
- @asyncio.coroutine
- def status(self, mailbox, names):
- return (yield from asyncio.wait_for(self.protocol.simple_command('STATUS', mailbox, names), self.timeout))
+ async def status(self, mailbox, names):
+ return await asyncio.wait_for(self.protocol.simple_command('STATUS', mailbox, names), self.timeout)
- @asyncio.coroutine
- def subscribe(self, mailbox):
- return (yield from asyncio.wait_for(self.protocol.simple_command('SUBSCRIBE', mailbox), self.timeout))
+ async def subscribe(self, mailbox):
+ return await asyncio.wait_for(self.protocol.simple_command('SUBSCRIBE', mailbox), self.timeout)
- @asyncio.coroutine
- def unsubscribe(self, mailbox):
- return (yield from asyncio.wait_for(self.protocol.simple_command('UNSUBSCRIBE', mailbox), self.timeout))
+ async def unsubscribe(self, mailbox):
+ return await asyncio.wait_for(self.protocol.simple_command('UNSUBSCRIBE', mailbox), self.timeout)
- @asyncio.coroutine
- def lsub(self, reference_name, mailbox_name):
- return (yield from asyncio.wait_for(self.protocol.simple_command('LSUB', reference_name, mailbox_name), self.timeout))
+ async def lsub(self, reference_name, mailbox_name):
+ return await asyncio.wait_for(self.protocol.simple_command('LSUB', reference_name, mailbox_name), self.timeout)
- @asyncio.coroutine
- def create(self, mailbox_name):
- return (yield from asyncio.wait_for(self.protocol.simple_command('CREATE', mailbox_name), self.timeout))
+ async def create(self, mailbox_name):
+ return await asyncio.wait_for(self.protocol.simple_command('CREATE', mailbox_name), self.timeout)
- @asyncio.coroutine
- def delete(self, mailbox_name):
- return (yield from asyncio.wait_for(self.protocol.simple_command('DELETE', mailbox_name), self.timeout))
+ async def delete(self, mailbox_name):
+ return await asyncio.wait_for(self.protocol.simple_command('DELETE', mailbox_name), self.timeout)
- @asyncio.coroutine
- def rename(self, old_mailbox_name, new_mailbox_name):
- return (yield from asyncio.wait_for(self.protocol.simple_command('RENAME', old_mailbox_name, new_mailbox_name), self.timeout))
+ async def rename(self, old_mailbox_name, new_mailbox_name):
+ return await asyncio.wait_for(self.protocol.simple_command('RENAME', old_mailbox_name, new_mailbox_name), self.timeout)
- @asyncio.coroutine
- def list(self, reference_name, mailbox_pattern):
- return (yield from asyncio.wait_for(self.protocol.simple_command('LIST', reference_name, mailbox_pattern), self.timeout))
+ async def list(self, reference_name='""', mailbox_pattern='*', ret=None):
+ return await self.protocol.list(reference_name, mailbox_pattern, ret, timeout=self.timeout)
- @asyncio.coroutine
- def append(self, message_bytes, mailbox='INBOX', flags=None, date=None):
- return (yield from self.protocol.append(message_bytes, mailbox, flags, date, timeout=self.timeout))
+ async def append(self, message_bytes, mailbox='INBOX', flags=None, date=None):
+ return await self.protocol.append(message_bytes, mailbox, flags, date, timeout=self.timeout)
- @asyncio.coroutine
- def close(self):
- return (yield from asyncio.wait_for(self.protocol.close(), self.timeout))
+ async def close(self):
+ return await asyncio.wait_for(self.protocol.close(), self.timeout)
- @asyncio.coroutine
- def move(self, uid_set, mailbox):
- return (yield from asyncio.wait_for(self.protocol.move(uid_set, mailbox), self.timeout))
+ async def move(self, seq_set, mailbox):
+ return await asyncio.wait_for(self.protocol.move(seq_set, mailbox), self.timeout)
- @asyncio.coroutine
- def enable(self, capability):
+ async def uid_move(self, uid_set, mailbox):
+ return await asyncio.wait_for(self.protocol.move(uid_set, mailbox, by_uid=True), self.timeout)
+
+ async def enable(self, capability):
if 'ENABLE' not in self.protocol.capabilities:
raise Abort('server has not ENABLE capability')
+ return await asyncio.wait_for(self.protocol.simple_command('ENABLE', capability), self.timeout)
+
+ async def getmetadata(self, mailbox, metadata, options=None):
+ return await self.protocol.getmetadata(mailbox, metadata, options, timeout=self.timeout)
- return (yield from asyncio.wait_for(self.protocol.simple_command('ENABLE', capability), self.timeout))
+ async def setmetadata(self, mailbox, metadata, value):
+ return await self.protocol.setmetadata(mailbox, metadata, timeout=self.timeout)
def has_capability(self, capability):
return capability in self.protocol.capabilities
@@ -864,14 +926,11 @@ def extract_exists(response):
class IMAP4_SSL(IMAP4):
- def __init__(self, host='127.0.0.1', port=IMAP4_SSL_PORT, loop=None ,
+ def __init__(self, host='127.0.0.1', port=993, loop=None ,
timeout=IMAP4.TIMEOUT_SECONDS, ssl_context=None):
- super().__init__(host, port, loop, timeout, None, ssl_context)
-
- def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
if ssl_context is None:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- super().create_client(host, port, loop, conn_lost_cb, ssl_context)
+ super().__init__(host, port, loop, timeout, None, ssl_context)
# functions from imaplib
@@ -885,6 +944,7 @@ def int2ap(num):
val += ap[mod:mod + 1]
return val
+
Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
Mon2num = {s.encode():n+1 for n, s in enumerate(Months[1:])}
@@ -924,3 +984,196 @@ def time2internaldate(date_time):
raise ValueError("date_time not of a known type")
fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month])
return dt.strftime(fmt)
+
+
+def iter_messageset(s):
+ """Parses IMAP messageset
+ yields integers in given order without sorting
+ does not remove duplicates
+ example: "1,3:5,1:2" -> 1,3,4,5,1,2
+ raises ValueError if
+ """
+ for pair in s.split(','):
+ start, _, end = pair.partition(':')
+ for i in range(int(start or 1), int(end or start or 0)+1):
+ yield i
+
+
+thread_atom_re = re.compile(r'([()]|[0-9]+)')
+def parse_thread(lines):
+ """returns list of ints|lists from first thread line"""
+ for line in lines:
+ atoms = thread_atom_re.findall(line)
+ return nest_atoms(atoms)[0]
+
+
+def parse_fetch(lines):
+ """Iterates over fetch lines
+ yields (str, dict)
+ Need only lines without last line 'Fetch completed...'
+ """
+ parser = ResponseParser()
+ for line in lines:
+ if parser.feed(line):
+ try:
+ seq, _, vv = parser.values()
+ except ValueError:
+ print(parser.values())
+ yield seq, {vv[i]: vv[i+1] for i in range(0, len(vv), 2)}
+ parser = ResponseParser()
+
+
+def parse_metadata(lines):
+ """Iterates over metadata lines
+ yields (str, dict)
+ Need only lines without last line 'Fetch completed...'
+ """
+ parser = ResponseParser()
+ for line in lines:
+ if parser.feed(line):
+ try:
+ name, vv = parser.values()
+ except ValueError:
+ print(parser.values())
+ yield name, {vv[i]: vv[i+1] for i in range(0, len(vv), 2)}
+ parser = ResponseParser()
+
+
+class ResponseParser:
+ __slots__ = 'atoms', 'literal_next'
+
+ # doesn't work for BODY.PEEK[HEADER.FIELDS (SUBJECT)]
+ # and similar with brackets inside field name
+ atom_re = re.compile(r'''
+ ( # brackets
+ [()]
+ | # quoted
+ \"(?:|.*?[^\\](?:(?:\\\\)+)?)\"
+ | # other value without space
+ [^()\s]+
+ )''', re.VERBOSE)
+
+ def __init__(self):
+ self.atoms = []
+ self.literal_next = False
+
+ def feed(self, line):
+ if self.literal_next:
+ self.atoms[-1] = line
+ self.literal_next = False
+ return False
+ atoms = self.atom_re.findall(line)
+ self.atoms.extend(atoms)
+ if atoms[-1][-1] == '}':
+ self.literal_next = True
+ return False
+ return True
+
+ def values(self):
+ values, i = nest_atoms(self.atoms)
+ return values
+
+def nest_atoms(atoms, i=0):
+ values = []
+ while i < len(atoms):
+ value = atoms[i]
+ if value == '(':
+ value, i = nest_atoms(atoms, i+1)
+ elif value == ')':
+ return values, i
+ values.append(value)
+ i += 1
+ return values, i
+
+
+list_re = re.compile(r'\(([^)]*)\) ([^ ]+) (.+)')
+def parse_list(lines):
+ """
+ Iterate over list lines
+ yields tuples (flags:set, sep:str, name:str)
+ """
+ for line in lines:
+ match = list_re.match(line)
+ if match:
+ flags, sep, name = match.group(1, 2, 3)
+ yield set(flags.split()), unquoted(sep), name
+
+
+status_re = re.compile(r'(\S+|\".*?[^\\](?:(?:\\\\)+)?\")\s+\((.*)\)')
+def parse_status(lines):
+ """
+ Iterate over status lines
+ yields dicts
+ """
+ for line in lines:
+ match = status_re.match(line)
+ if match:
+ ss = match.group(2).split()
+ return {ss[i]: ss[i + 1] for i in range(0, len(ss), 2)}
+ return {}
+
+
+def parse_list_status(lines):
+ """
+ Iterate over list lines
+ yields tuples (flags:set, sep, name, status:dict)
+ """
+ mailboxes = {}
+ for line in lines:
+ match = list_re.match(line)
+ if match:
+ flags, sep, name = match.group(1, 2, 3)
+ mailboxes[name] = (set(flags.split()), unquoted(sep), name, {})
+ else:
+ match = status_re.match(line)
+ if match:
+ name = match.group(1)
+ try:
+ status = mailboxes[name][3]
+ except KeyError:
+ continue
+ ss = match.group(2).split()
+ status.update((ss[i], ss[i+1]) for i in range(0, len(ss), 2))
+ return mailboxes.values()
+
+
+esearch_re = re.compile(r'\(TAG "([^"]+)"\)(?: UID)?\s*(.*)')
+def parse_esearch(lines):
+ """
+ Parses first esearch line
+ returns dict or empty dict
+ """
+ for line in lines:
+ match = esearch_re.match(line)
+ if match:
+ dd = match.group(2).split()
+ return {dd[i]: dd[i+1] for i in range(0, len(dd), 2)}
+ return {}
+
+
+def format_messageset(ints) -> str:
+ "Sorts and compresses sequence of integers to str in IMAP message set format"
+ return encode_messageset(ints).decode()
+
+
+def encode_messageset(ints) -> bytearray:
+ """Sorts and compresses sequence of integers
+ returns bytearray in IMAP message set format"""
+ out = bytearray()
+ last = None
+ skipped = False
+ for i in sorted(ints):
+ if i - 1 == last:
+ skipped = True
+ elif last is None:
+ out += b'%d' % i
+ elif i - 1 > last:
+ if skipped:
+ out += b':%d,%d' % (last, i)
+ skipped = False
+ else:
+ out += b',%d' % (i)
+ last = i
+ if skipped:
+ out += b':%d' % last
+ return out
diff --git a/aioimaplib/tests/example.py b/aioimaplib/tests/example.py
index 0598b15..b9aca28 100644
--- a/aioimaplib/tests/example.py
+++ b/aioimaplib/tests/example.py
@@ -4,36 +4,34 @@
from aioimaplib import aioimaplib
-@asyncio.coroutine
-def wait_for_new_message(host, user, password):
+async def wait_for_new_message(host, user, password):
imap_client = aioimaplib.IMAP4_SSL(host=host)
- yield from imap_client.wait_hello_from_server()
+ await imap_client.wait_hello_from_server()
- yield from imap_client.login(user, password)
- yield from imap_client.select()
+ await imap_client.login(user, password)
+ await imap_client.select()
- asyncio.async(imap_client.idle())
+ await imap_client.idle()
while True:
- msg = yield from imap_client.wait_server_push()
+ msg = await imap_client.wait_server_push()
print('--> received from server: %s' % msg)
if 'EXISTS' in msg:
imap_client.idle_done()
break
- yield from imap_client.logout()
+ await imap_client.logout()
-@asyncio.coroutine
-def fetch_mail(host, user, password):
+async def fetch_mail(host, user, password):
imap_client = aioimaplib.IMAP4_SSL(host=host)
- yield from imap_client.wait_hello_from_server()
+ await imap_client.wait_hello_from_server()
- yield from imap_client.login(user, password)
+ await imap_client.login(user, password)
- response = yield from imap_client.select()
+ response = await imap_client.select()
print('there is %s messages INBOX' % aioimaplib.extract_exists(response))
- yield from imap_client.logout()
+ await imap_client.logout()
if __name__ == '__main__':
diff --git a/aioimaplib/tests/imapserver.py b/aioimaplib/tests/imapserver.py
index 40c4ecd..2d18f15 100644
--- a/aioimaplib/tests/imapserver.py
+++ b/aioimaplib/tests/imapserver.py
@@ -23,7 +23,7 @@
from collections import deque
from copy import deepcopy
from datetime import datetime, timedelta
-from email._policybase import Compat32
+from email.policy import compat32
from email.header import Header
from email.message import Message
from functools import update_wrapper
@@ -168,9 +168,8 @@ def _reindex(self, user, mailbox):
def critical_section(next_state):
- @asyncio.coroutine
- def execute_section(self, state, critical_func, *args, **kwargs):
- with (yield from self.state_condition):
+ async def execute_section(self, state, critical_func, *args, **kwargs):
+ async with self.state_condition:
critical_func(self, *args, **kwargs)
self.state = state
log.debug('state -> %s' % state)
@@ -271,7 +270,7 @@ def send(self, _bytes):
def login(self, tag, *args):
self.user_login = args[0]
self.server_state.login(self.user_login, self)
- self.send_untagged_line('CAPABILITY IMAP4rev1 %s' % self.capabilities)
+ self.send_untagged_line('[CAPABILITY IMAP4rev1 %s]' % self.capabilities)
self.send_tagged_line(tag, 'OK LOGIN completed')
@critical_section(next_state=LOGOUT)
@@ -310,10 +309,9 @@ def close(self, tag, *args):
self.user_mailbox = None
self.send_tagged_line(tag, 'OK CLOSE completed.')
- @asyncio.coroutine
- def wait(self, state):
- with (yield from self.state_condition):
- yield from self.state_condition.wait_for(lambda: self.state == state)
+ async def wait(self, state):
+ async with self.state_condition:
+ await self.state_condition.wait_for(lambda: self.state == state)
def examine(self, tag, *args):
mailbox_name = args[0]
@@ -449,7 +447,7 @@ def _build_fetch_response(self, message, parts, by_uid=True):
fetch_header = FETCH_HEADERS_RE.match(' '.join(parts))
if fetch_header:
headers = fetch_header.group('headers')
- message_headers = Message(policy=Compat32(linesep='\r\n'))
+ message_headers = Message(policy=compat32(linesep='\r\n'))
for hk in headers.split():
message_headers[hk] = message.email.get(hk, '')
response += ('BODY[HEADER.FIELDS (%s)] {%d}\r\n' %
@@ -663,8 +661,7 @@ def receive(self, mail, imap_user=None, mailbox='INBOX'):
uids.append(self._server_state.imap_receive(to, mail, mailbox))
return uids
- @asyncio.coroutine
- def wait_state(self, state, user):
+ async def wait_state(self, state, user):
user_connections = [connection for connection in self._connections if connection.user_login == user]
if len(user_connections) == 0:
other_users = list(map(lambda c: c.user_login, self._connections))
@@ -672,7 +669,7 @@ def wait_state(self, state, user):
if len(user_connections) > 1:
raise ValueError("wait_state can't handle %d connections for user %s" % (len(user_connections), user))
- yield from user_connections[0].wait(state)
+ await user_connections[0].wait(state)
def get_connection(self, user):
return self._server_state.get_connection(user)
diff --git a/aioimaplib/tests/test_acceptance_aioimaplib.py b/aioimaplib/tests/test_acceptance_aioimaplib.py
index 733f99a..982d2fd 100644
--- a/aioimaplib/tests/test_acceptance_aioimaplib.py
+++ b/aioimaplib/tests/test_acceptance_aioimaplib.py
@@ -29,19 +29,17 @@ class TestAioimaplibAcceptance(AioWithImapServer, TestCase):
def setUp(self):
self._init_server(self.loop)
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
+ async def tearDown(self):
+ await self._shutdown_server()
- @asyncio.coroutine
- def test_file_with_attachement(self):
+ async def test_file_with_attachement(self):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/test_attachment.eml'), mode='br') as msg:
- imap_client = yield from self.login_user('user@mail', 'pass', select=True)
+ imap_client = await self.login_user('user@mail', 'pass', select=True)
mail = Mail(email.message_from_binary_file(msg))
self.imapserver.receive(mail, imap_user='user@mail')
- result, data = yield from imap_client.fetch('1', '(RFC822)')
+ result, data = await imap_client.fetch('1', '(RFC822)')
self.assertEqual('OK', result)
self.assertEqual(['1 FETCH (RFC822 {418898}', mail.as_bytes(), ')', 'FETCH completed.'], data)
diff --git a/aioimaplib/tests/test_aioimaplib.py b/aioimaplib/tests/test_aioimaplib.py
index 051c6f2..e69de29 100644
--- a/aioimaplib/tests/test_aioimaplib.py
+++ b/aioimaplib/tests/test_aioimaplib.py
@@ -1,943 +0,0 @@
-# -*- coding: utf-8 -*-
-# aioimaplib : an IMAPrev4 lib using python asyncio
-# Copyright (C) 2016 Bruno Thomas
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
-import asyncio
-import logging
-import os
-import ssl
-import unittest
-from datetime import datetime, timedelta
-
-import asynctest
-from mock import call, MagicMock
-from pytz import utc
-
-from aioimaplib import aioimaplib, CommandTimeout, extract_exists, \
- TWENTY_NINE_MINUTES, STOP_WAIT_SERVER_PUSH, FetchCommand, IdleCommand
-from aioimaplib.aioimaplib import Commands, IMAP4ClientProtocol, Command, Response, Abort, AioImapException
-from aioimaplib.tests import imapserver
-from aioimaplib.tests.imapserver import Mail
-from aioimaplib.tests.ssl_cert import create_temp_self_signed_cert
-from aioimaplib.tests.test_imapserver import WithImapServer
-
-aioimaplib.log.setLevel(logging.WARNING)
-sh = logging.StreamHandler()
-sh.setLevel(logging.INFO)
-sh.setFormatter(logging.Formatter("%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"))
-aioimaplib.log.addHandler(sh)
-
-
-class TestAioimaplibUtils(unittest.TestCase):
- def setUp(self):
- self.imap_protocol = IMAP4ClientProtocol(None)
- self.imap_protocol._handle_line = MagicMock(return_value=None)
- aioimaplib.get_running_loop = asyncio.new_event_loop # monkey patch to avoid Exception "No running loop"
-
- def test_split_responses_no_data(self):
- self.imap_protocol.data_received(b'')
- self.imap_protocol._handle_line.assert_not_called()
-
- def test_split_responses_regular_lines(self):
- self.imap_protocol.data_received(b'* BYE Logging out\r\nCAPB2 OK LOGOUT completed\r\n')
- self.imap_protocol._handle_line.assert_has_calls([call('* BYE Logging out', None), call('CAPB2 OK LOGOUT completed', None)])
-
- def test_split_responses_with_message_data(self):
- cmd = Command('FETCH', 'TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
- self.imap_protocol.data_received(b'* 1 FETCH (UID 1 RFC822 {26}\r\n...\r\n(mail content)\r\n...\r\n)\r\n'
- b'TAG OK FETCH completed.\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* 1 FETCH (UID 1 RFC822 {26}', None)])
- self.imap_protocol._handle_line.assert_has_calls([call(')', cmd)])
- self.imap_protocol._handle_line.assert_has_calls([call('TAG OK FETCH completed.', None)])
- self.assertEqual([b'...\r\n(mail content)\r\n...\r\n'], cmd.response.lines)
-
- def test_split_responses_with_two_messages_data(self):
- cmd = Command('FETCH', 'TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
- self.imap_protocol.data_received(b'* 3 FETCH (UID 3 RFC822 {6}\r\nmail 1)\r\n'
- b'* 4 FETCH (UID 4 RFC822 {6}\r\nmail 2)\r\n'
- b'TAG OK FETCH completed.\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* 3 FETCH (UID 3 RFC822 {6}', None),
- call(')', cmd),
- call('* 4 FETCH (UID 4 RFC822 {6}', None),
- call(')', cmd),
- call('TAG OK FETCH completed.', None)])
- self.assertEqual([b'mail 1', b'mail 2'], cmd.response.lines)
-
- def test_split_responses_with_flag_fetch_message_data(self):
- self.imap_protocol.data_received(b'* 1 FETCH (UID 10 FLAGS (FOO))\r\n'
- b'* 1 FETCH (UID 15 FLAGS (BAR))\r\n'
- b'TAG OK STORE completed.\r\n')
- self.imap_protocol._handle_line.assert_has_calls([call('* 1 FETCH (UID 10 FLAGS (FOO))', None),
- call('* 1 FETCH (UID 15 FLAGS (BAR))', None),
- call('TAG OK STORE completed.', None)])
-
- def test_split_responses_with_message_data_expunge(self):
- self.imap_protocol.data_received(b'* 123 EXPUNGE\r\nTAG OK SELECT completed.\r\n')
- self.imap_protocol._handle_line.assert_has_calls([call('* 123 EXPUNGE', None),
- call('TAG OK SELECT completed.', None)])
-
- def test_unconplete_line_with_litteral_fetch(self):
- cmd = Command('FETCH', 'TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
- self.imap_protocol.data_received(b'* 12 FETCH (BODY[HEADER] {4}\r\nyo\r\n)\r\n* 13 FETCH (BODY[')
- self.imap_protocol.data_received(b'HEADER] {5}\r\nyo2\r\n)\r\nTAG OK STORE completed.\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* 12 FETCH (BODY[HEADER] {4}', None), call(')', cmd)])
- self.imap_protocol._handle_line.assert_has_calls([call('* 13 FETCH (BODY[HEADER] {5}', None),
- call(')', cmd),
- call('TAG OK STORE completed.', None)])
- self.assertEqual([b'yo\r\n', b'yo2\r\n'], cmd.response.lines)
-
- def test_unconplete_lines_during_litteral(self):
- cmd = Command('LIST', 'TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
-
- self.imap_protocol.data_received(b'* LIST () "/" {11}\r\nfoo/')
- self.imap_protocol.data_received(b'bar/')
- self.imap_protocol.data_received(b'baz\r\n* LIST () "/" qux\r\nTAG OK LIST completed\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* LIST () "/" {11}', None)])
- self.imap_protocol._handle_line.assert_has_calls([call('* LIST () "/" qux', None),
- call('TAG OK LIST completed', None)])
- self.assertEqual([b'foo/bar/baz'], cmd.response.lines)
-
- def test_unconplete_line_during_litteral_no_cmd_found(self):
- self.imap_protocol.data_received(b'* LIST () "/" {7}\r\nfoo/')
- self.imap_protocol.data_received(b'bar\r\nTAG OK LIST completed\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* LIST () "/" {7}', None)])
- self.imap_protocol._handle_line.assert_has_calls([call('* LIST () "/" {7}', None),
- call('', Command('NIL', 'unused')),
- call('TAG OK LIST completed', None)])
-
- def test_line_with_litteral_no_cmd_found_no_AttributeError_thrown(self):
- self.imap_protocol.data_received(b'* 3 FETCH (UID 12 RFC822 {4}\r\nmail)\r\n'
- b'TAG OK FETCH completed.\r\n')
- self.imap_protocol._handle_line.assert_has_calls([call('* 3 FETCH (UID 12 RFC822 {4}', None),
- call(')', Command('NIL', 'unused')),
- call('TAG OK FETCH completed.', None)])
-
- def test_line_with_attachment_litterals(self):
- cmd = Command('FETCH', 'TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
-
- self.imap_protocol.data_received(b'* 46 FETCH (UID 46 FLAGS () BODYSTRUCTURE ('
- b'("text" "calendar" ("charset" "UTF-8" "name" {16}\r\nG\xe9n\xe9ration 3.ics)'
- b' "" NIL "quoted-printable" 365 14 NIL '
- b'("attachment" ("filename" {16}\r\nG\xe9n\xe9ration 3.ics)))\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* 46 FETCH (UID 46 FLAGS () BODYSTRUCTURE ('
- '("text" "calendar" ("charset" "UTF-8" "name" {16}', None),
- call(') "" NIL "quoted-printable" 365 14 NIL '
- '("attachment" ("filename" {16}', cmd),
- call(')))', cmd)])
- self.assertEqual([b'G\xe9n\xe9ration 3.ics', b'G\xe9n\xe9ration 3.ics'], cmd.response.lines)
-
- def test_uncomplete_line_followed_by_uncomplete_literal(self):
- cmd = Command('FETCH', 'TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
-
- self.imap_protocol.data_received(b'* 2 FETCH (')
- self.imap_protocol.data_received(b'FLAGS () UID 160016 BODY[] {10}\r\non the ')
- self.imap_protocol.data_received(b'dot)\r\nTAG OK FETCH completed\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls([call('* 2 FETCH (FLAGS () UID 160016 BODY[] {10}', None),
- call(')', cmd), call('TAG OK FETCH completed', None)])
- self.assertEqual([b'on the dot'], cmd.response.lines)
-
- # cf 1st FETCH in https://tools.ietf.org/html/rfc3501#section-8 example
- def test_uncomplete_fetch_message_attributes_without_literal(self):
- cmd = FetchCommand('TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
-
- line = b'* 12 FETCH (FLAGS (\Seen) BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 \r\n'
- cmd.append_to_resp(line.decode())
- self.imap_protocol.data_received(line)
- line = b'92))\r\nTAG OK FETCH completed\r\n'
- cmd.append_to_resp(line.decode())
- self.imap_protocol.data_received(line)
-
- self.imap_protocol._handle_line.assert_has_calls(
- [call('* 12 FETCH (FLAGS (\Seen) BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028 ', None),
- call('92))', cmd), call('TAG OK FETCH completed', None)])
-
- def test_uncomplete_fetch_with_uncomplete_line(self):
- cmd = FetchCommand('TAG')
- self.imap_protocol._handle_line = MagicMock(return_value=cmd)
-
- self.imap_protocol.data_received(b'* 21 FETCH (FLAGS (\Seen) BODY[] {16}\r\nuncomplete fetch')
- self.imap_protocol.data_received(b')\r\nTAG OK FETCH completed\r\n')
-
- self.imap_protocol._handle_line.assert_has_calls(
- [call('* 21 FETCH (FLAGS (\Seen) BODY[] {16}', None),
- call(')', cmd), call('TAG OK FETCH completed', None)])
-
- def test_command_repr(self):
- self.assertEqual('tag NAME', str(Command('NAME', 'tag')))
- self.assertEqual('tag NAME arg1 arg2', str(Command('NAME', 'tag', 'arg1', 'arg2')))
- self.assertEqual('tag UID NAME arg', str(Command('NAME', 'tag', 'arg', prefix='UID')))
- self.assertEqual('tag UID NAME', str(Command('NAME', 'tag', prefix='UID')))
-
-
-class TestDataReceived(unittest.TestCase):
- def setUp(self):
- self.imap_protocol = IMAP4ClientProtocol(None)
-
- def test_when_idle_continuation_line_in_same_dataframe_as_status_update(self):
- queue = asyncio.Queue()
- cmd = IdleCommand('TAG', queue)
- self.imap_protocol.pending_sync_command = cmd
- self.imap_protocol.data_received(b'+ idling\r\n* 1 EXISTS\r\n* 1 RECENT\r\n')
-
- self.assertEqual(['+ idling'], queue.get_nowait())
- self.assertEqual(['1 EXISTS', '1 RECENT'], queue.get_nowait())
-
-
-class TestFetchWaitsForAllMessageAttributes(unittest.TestCase):
- def test_empty_fetch(self):
- self.assertFalse(FetchCommand('TAG').wait_data())
-
- def test_simple_fetch(self):
- fetch = FetchCommand('TAG')
- fetch.append_to_resp('12 FETCH (FLAGS (\Seen))')
-
- self.assertFalse(fetch.wait_data())
-
- def test_simple_fetch_with_two_lines(self):
- fetch = FetchCommand('TAG')
- fetch.append_to_resp('12 FETCH (FLAGS (\Seen) BODY ("TEXT" "PLAIN" ("CHARSET" "US-ASCII") NIL NIL "7BIT" 3028')
- self.assertTrue(fetch.wait_data())
-
- fetch.append_to_resp('92))')
- self.assertFalse(fetch.wait_data())
-
- def test_fetch_with_litteral(self):
- fetch = FetchCommand('TAG')
- fetch.append_to_resp('12 FETCH (FLAGS () BODY[] {13}')
- fetch.begin_literal_data(13, b'literal (data')
- fetch.append_to_resp(')')
-
- self.assertFalse(fetch.wait_data())
-
- def test_fetch_only_the_last_message_data(self):
- fetch = FetchCommand('TAG')
- fetch.append_to_resp('12 FETCH (FLAGS (\Seen)') # not closed on purpose
- self.assertTrue(fetch.wait_data())
-
- fetch.append_to_resp('13 FETCH (FLAGS (\Seen)')
- self.assertTrue(fetch.wait_data())
-
- fetch.append_to_resp(')')
- self.assertFalse(fetch.wait_data())
-
-
-class TestAioimaplibCommand(asynctest.ClockedTestCase):
- @asyncio.coroutine
- def test_command_timeout(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=1)
- yield from self.advance(2)
- with self.assertRaises(AioImapException):
- yield from cmd.wait()
-
- @asyncio.coroutine
- def test_command_close_cancels_timer(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=1)
- cmd.close('line', 'OK')
- yield from self.advance(3)
-
- yield from cmd.wait()
- self.assertEqual(Response('OK', ['line']), cmd.response)
-
- @asyncio.coroutine
- def test_command_begin_literal_data_resets_timer(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=2)
-
- yield from self.advance(1)
- cmd.begin_literal_data(7, b'literal')
-
- yield from self.advance(1.9)
- cmd.close('line', 'OK')
-
- yield from cmd.wait()
- self.assertEqual(Response('OK', [b'literal', 'line']), cmd.response)
-
- @asyncio.coroutine
- def test_command_append_data_resets_timer(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=2)
- cmd.begin_literal_data(4, b'da')
-
- yield from self.advance(1.9)
- cmd.append_literal_data(b'ta')
-
- yield from self.advance(1.9)
- cmd.close('line', 'OK')
-
- yield from cmd.wait()
- self.assertEqual(Response('OK', [b'data', 'line']), cmd.response)
-
- @asyncio.coroutine
- def test_command_append_literal_data_resets_timer(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=2)
- cmd.begin_literal_data(12, b'literal')
-
- yield from self.advance(1.9)
- cmd.append_literal_data(b' data')
-
- yield from self.advance(1.9)
- cmd.close('line', 'OK')
-
- yield from cmd.wait()
- self.assertEqual(Response('OK', [b'literal data', 'line']), cmd.response)
-
- @asyncio.coroutine
- def test_command_append_to_resp_resets_timer(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=2)
-
- yield from self.advance(1.9)
- cmd.append_to_resp('line 1')
-
- yield from self.advance(1.9)
- cmd.close('line 2', 'OK')
-
- yield from cmd.wait()
- self.assertEqual(Response('OK', ['line 1', 'line 2']), cmd.response)
-
- @asyncio.coroutine
- def test_command_timeout_while_receiving_data(self):
- cmd = Command('CMD', 'tag', loop=self.loop, timeout=2)
-
- yield from self.advance(1)
- cmd.begin_literal_data(12, b'literal')
-
- yield from self.advance(3)
- with self.assertRaises(AioImapException):
- yield from cmd.wait()
-
-
-class AioWithImapServer(WithImapServer):
- @asyncio.coroutine
- def login_user(self, login, password, select=False, lib=aioimaplib.IMAP4):
- imap_client = lib(port=12345, loop=self.loop, timeout=3)
- yield from asyncio.wait_for(imap_client.wait_hello_from_server(), 2)
-
- yield from imap_client.login(login, password)
-
- if select:
- yield from imap_client.select()
- return imap_client
-
-
-class TestAioimaplib(AioWithImapServer, asynctest.TestCase):
- def setUp(self):
- self._init_server(self.loop)
-
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
-
- @asyncio.coroutine
- def test_capabilities(self):
- imap_client = aioimaplib.IMAP4(port=12345, loop=self.loop)
- yield from asyncio.wait_for(imap_client.wait_hello_from_server(), 2)
-
- self.assertEquals('IMAP4REV1', imap_client.protocol.imap_version)
- self.assertEquals({'IMAP4rev1', 'YESAUTH'}, imap_client.protocol.capabilities)
- self.assertTrue(imap_client.has_capability('YESAUTH'))
-
- @asyncio.coroutine
- def test_login(self):
- imap_client = aioimaplib.IMAP4(port=12345, loop=self.loop, timeout=3)
- yield from asyncio.wait_for(imap_client.wait_hello_from_server(), 2)
-
- result, data = yield from imap_client.login('user', 'password')
-
- self.assertEquals(aioimaplib.AUTH, imap_client.protocol.state)
- self.assertEqual('OK', result)
- self.assertEqual('LOGIN completed', data[-1])
- self.assertTrue(imap_client.has_capability('IDLE'))
- self.assertTrue(imap_client.has_capability('UIDPLUS'))
-
- @asyncio.coroutine
- def test_login_with_special_characters(self):
- imap_client = aioimaplib.IMAP4(port=12345, loop=self.loop, timeout=3)
- yield from asyncio.wait_for(imap_client.wait_hello_from_server(), 2)
-
- result, data = yield from imap_client.login('user', 'pass"word')
-
- self.assertEquals(aioimaplib.AUTH, imap_client.protocol.state)
- self.assertEqual('OK', result)
- self.assertEqual('LOGIN completed', data[-1])
- self.assertTrue(imap_client.has_capability('IDLE'))
- self.assertTrue(imap_client.has_capability('UIDPLUS'))
-
- @asyncio.coroutine
- def test_login_twice(self):
- with self.assertRaises(aioimaplib.Error) as expected:
- imap_client = yield from self.login_user('user', 'pass')
-
- yield from imap_client.login('user', 'password')
-
- self.assertEqual(expected.exception.args, ('command LOGIN illegal in state AUTH',))
-
- @asyncio.coroutine
- def test_logout(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- result, data = yield from imap_client.logout()
-
- self.assertEqual('OK', result)
- self.assertEqual(['BYE Logging out', 'LOGOUT completed'], data)
- self.assertEquals(aioimaplib.LOGOUT, imap_client.protocol.state)
-
- @asyncio.coroutine
- def test_select_no_messages(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- resp = yield from imap_client.select()
-
- self.assertEqual('OK', resp[0])
- self.assertEqual(0, extract_exists(resp))
- self.assertEquals(aioimaplib.SELECTED, imap_client.protocol.state)
-
- @asyncio.coroutine
- def test_examine_no_messages(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- self.assertEquals(0, extract_exists((yield from imap_client.examine())))
-
- self.assertEquals(aioimaplib.AUTH, imap_client.protocol.state)
-
- @asyncio.coroutine
- def test_search_two_messages(self):
- self.imapserver.receive(Mail.create(['user']))
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- result, data = yield from imap_client.search('ALL')
-
- self.assertEqual('OK', result)
- self.assertEqual('1 2', data[0])
-
- @asyncio.coroutine
- def test_uid_with_illegal_command(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- for command in {'COPY', 'FETCH', 'STORE', 'EXPUNGE', 'MOVE'}.symmetric_difference(Commands.keys()):
- with self.assertRaises(aioimaplib.Abort) as expected:
- yield from imap_client.uid(command)
-
- self.assertEqual(expected.exception.args,
- ('command UID only possible with COPY, FETCH, EXPUNGE (w/UIDPLUS) or STORE (was %s)' % command,))
-
- @asyncio.coroutine
- def test_search_three_messages_by_uid(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- self.imapserver.receive(Mail.create(['user'])) # id=1 uid=1
- self.imapserver.receive(Mail.create(['user']), mailbox='OTHER_MAILBOX') # id=1 uid=1
- self.imapserver.receive(Mail.create(['user'])) # id=2 uid=2
-
- self.assertEqual('1 2', (yield from imap_client.search('ALL')).lines[0])
- self.assertEqual('1 2', (yield from imap_client.uid_search('ALL')).lines[0])
-
- yield from imap_client.select('OTHER_MAILBOX')
- self.assertEqual('1', (yield from imap_client.uid_search('ALL')).lines[0])
-
- @asyncio.coroutine
- def test_fetch(self):
- print('test loop %r' % self.loop)
- imap_client = yield from self.login_user('user', 'pass', select=True)
- mail = Mail.create(['user'], mail_from='me', subject='hello',
- content='pleased to meet you, wont you guess my name ?')
- self.imapserver.receive(mail)
-
- result, data = yield from imap_client.fetch('1', '(RFC822)')
- content = mail.as_bytes()
-
- self.assertEqual('OK', result)
- self.assertEqual([
- '1 FETCH (RFC822 {%s}' % len(content), content, ')',
- 'FETCH completed.'
- ], data)
-
- @asyncio.coroutine
- def test_fetch_by_uid_without_body(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- mail = Mail.create(['user'], mail_from='me', subject='hello',
- content='pleased to meet you, wont you guess my name ?')
- self.imapserver.receive(mail)
-
- response = (yield from imap_client.uid('fetch', '1', '(UID FLAGS)'))
-
- self.assertEqual('OK', response.result)
- self.assertEquals('1 FETCH (UID 1 FLAGS ())', response.lines[0])
-
- @asyncio.coroutine
- def test_fetch_by_uid(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- mail = Mail.create(['user'], mail_from='me', subject='hello',
- content='pleased to meet you, wont you guess my name ?')
- self.imapserver.receive(mail)
-
- response = (yield from imap_client.uid('fetch', '1', '(RFC822)'))
- self.assertEqual('OK', response.result)
- self.assertEquals(mail.as_bytes(), response.lines[1])
-
- @asyncio.coroutine
- def test_idle(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- idle = yield from imap_client.idle_start(timeout=0.3)
- self.imapserver.receive(Mail.create(to=['user'], mail_from='me', subject='hello'))
-
- self.assertEquals(['1 EXISTS', '1 RECENT'], (yield from imap_client.wait_server_push()))
-
- imap_client.idle_done()
- self.assertEquals(('OK', ['IDLE terminated']), (yield from asyncio.wait_for(idle, 1)))
-
- self.assertTrue(imap_client._idle_waiter._cancelled)
- with self.assertRaises(asyncio.TimeoutError):
- yield from imap_client.wait_server_push(timeout=0.1)
-
- @asyncio.coroutine
- def test_idle_loop(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- idle = yield from imap_client.idle_start(timeout=0.3)
- self.imapserver.receive(Mail.create(to=['user'], mail_from='me', subject='hello'))
-
- data = list()
- while imap_client.has_pending_idle():
- data.append((yield from imap_client.wait_server_push()))
- if data[-1] == STOP_WAIT_SERVER_PUSH:
- imap_client.idle_done()
- yield from asyncio.wait_for(idle, 1)
-
- self.assertEqual([['1 EXISTS', '1 RECENT'], STOP_WAIT_SERVER_PUSH], data)
-
- @asyncio.coroutine
- def test_idle_stop(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- idle = yield from imap_client.idle_start()
-
- self.assertTrue((yield from imap_client.stop_wait_server_push()))
-
- self.assertEquals(STOP_WAIT_SERVER_PUSH, (yield from imap_client.wait_server_push()))
-
- imap_client.idle_done()
- yield from asyncio.wait_for(idle, 1)
-
- @asyncio.coroutine
- def test_idle_stop_does_nothing_if_no_pending_idle(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- self.assertFalse((yield from imap_client.stop_wait_server_push()))
-
- @asyncio.coroutine
- def test_store_and_search_by_keyword(self):
- self.imapserver.receive(Mail.create(['user']))
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
- self.assertEqual('', (yield from imap_client.uid_search('KEYWORD FOO', charset=None)).lines[0])
-
- self.assertEquals('OK', (yield from imap_client.uid('store', '1', '+FLAGS (FOO)')).result)
-
- self.assertEqual('1', (yield from imap_client.uid_search('KEYWORD FOO', charset=None)).lines[0])
- self.assertEqual('2', (yield from imap_client.uid_search('UNKEYWORD FOO', charset=None)).lines[0])
-
- @asyncio.coroutine
- def test_expunge_messages(self):
- self.imapserver.receive(Mail.create(['user']))
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- self.assertEquals(('OK', ['1 EXPUNGE', '2 EXPUNGE', 'EXPUNGE completed.']), (yield from imap_client.expunge()))
-
- self.assertEquals(0, extract_exists((yield from imap_client.select())))
-
- @asyncio.coroutine
- def test_copy_messages(self):
- self.imapserver.receive(Mail.create(['user']))
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- result, _ = yield from imap_client.copy('1', '2', 'MAILBOX')
- self.assertEqual('OK', result)
-
- self.assertEquals(2, extract_exists((yield from imap_client.select('MAILBOX'))))
-
- @asyncio.coroutine
- def test_copy_messages_by_uid(self):
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- result, _ = yield from imap_client.uid('copy', '1', 'MAILBOX')
- self.assertEqual('OK', result)
-
- self.assertEquals(1, extract_exists((yield from imap_client.select('MAILBOX'))))
-
- @asyncio.coroutine
- def test_concurrency_1_executing_sync_commands_sequentially(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- f1 = asyncio.ensure_future(imap_client.examine('INBOX'))
- f2 = asyncio.ensure_future(imap_client.examine('MAILBOX'))
-
- yield from asyncio.wait([f1, f2])
- self.assertIsNone(f1.exception())
- self.assertIsNone(f2.exception())
-
- @asyncio.coroutine
- def test_concurrency_2_executing_same_async_commands_sequentially(self):
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- f1 = asyncio.ensure_future(imap_client.fetch('1', '(RFC822)'))
- f2 = asyncio.ensure_future(imap_client.fetch('1', '(RFC822)'))
-
- yield from asyncio.wait([f1, f2])
- self.assertIsNone(f1.exception())
- self.assertIsNone(f2.exception())
-
- @asyncio.coroutine
- def test_concurrency_3_executing_async_commands_in_parallel(self):
- # cf valid example in https://tools.ietf.org/html/rfc3501#section-5.5
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- store = asyncio.ensure_future(imap_client.store('1', '+FLAGS (FOO)'))
- copy = asyncio.ensure_future(imap_client.copy('1', 'MBOX'))
- expunge = asyncio.ensure_future(imap_client.expunge())
-
- yield from asyncio.wait([store, copy, expunge])
- self.assertEquals(0, extract_exists((yield from imap_client.select())))
- self.assertEquals(1, extract_exists((yield from imap_client.select('MBOX'))))
- self.assertEqual('1', (yield from imap_client.search('KEYWORD FOO', charset=None)).lines[0])
-
- @asyncio.coroutine
- def test_concurrency_4_sync_command_waits_for_async_commands_to_finish(self):
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- asyncio.ensure_future(imap_client.copy('1', 'MBOX'))
- asyncio.ensure_future(imap_client.expunge())
- examine = asyncio.ensure_future(imap_client.examine('MBOX'))
-
- self.assertEquals(1, extract_exists((yield from asyncio.wait_for(examine, 1))))
-
- @asyncio.coroutine
- def test_noop(self):
- imap_client = yield from self.login_user('user', 'pass')
- self.assertEquals(('OK', ['NOOP completed.']), (yield from imap_client.noop()))
-
- @asyncio.coroutine
- def test_noop_with_untagged_data(self):
- imap_client = yield from self.login_user('user', 'pass')
- self.imapserver.receive(Mail.create(['user']))
-
- self.assertEquals(('OK', ['1 EXISTS', '1 RECENT', 'NOOP completed.']), (yield from imap_client.noop()))
-
- @asyncio.coroutine
- def test_check(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- self.assertEquals(('OK', ['CHECK completed.']), (yield from imap_client.check()))
-
- @asyncio.coroutine
- def test_close(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- self.assertEquals(imapserver.SELECTED, self.imapserver.get_connection('user').state)
-
- self.assertEquals(('OK', ['CLOSE completed.']), (yield from imap_client.close()))
-
- self.assertEquals(imapserver.AUTH, self.imapserver.get_connection('user').state)
-
- @asyncio.coroutine
- def test_status(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- self.assertEquals('INBOX (MESSAGES 0 UIDNEXT 1)',
- (yield from imap_client.status('INBOX', '(MESSAGES UIDNEXT)')).lines[0])
-
- @asyncio.coroutine
- def test_subscribe_unsubscribe_lsub(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- self.assertEquals(('OK', ['SUBSCRIBE completed.']), (yield from imap_client.subscribe('#fr.soc.feminisme')))
- self.assertEquals(('OK', ['() "." #fr.soc.feminisme', 'LSUB completed.']),
- (yield from imap_client.lsub('#fr.', 'soc.*')))
- self.assertEquals(('OK', ['UNSUBSCRIBE completed.']), (yield from imap_client.unsubscribe('#fr.soc.feminisme')))
- self.assertEquals(('OK', ['LSUB completed.']), (yield from imap_client.lsub('#fr', '.*')))
-
- @asyncio.coroutine
- def test_create_delete_mailbox(self):
- imap_client = yield from self.login_user('user', 'pass')
- self.assertEquals('NO', (yield from imap_client.status('MBOX', '(MESSAGES)')).result)
-
- self.assertEquals(('OK', ['CREATE completed.']), (yield from imap_client.create('MBOX')))
- self.assertEquals('OK', (yield from imap_client.status('MBOX', '(MESSAGES)')).result)
-
- self.assertEquals(('OK', ['DELETE completed.']), (yield from imap_client.delete('MBOX')))
- self.assertEquals('NO', (yield from imap_client.status('MBOX', '(MESSAGES)')).result)
-
- @asyncio.coroutine
- def test_rename_mailbox(self):
- imap_client = yield from self.login_user('user', 'pass')
- self.assertEquals('NO', (yield from imap_client.status('MBOX', '(MESSAGES)')).result)
-
- self.assertEquals(('OK', ['RENAME completed.']), (yield from imap_client.rename('INBOX', 'MBOX')))
-
- self.assertEquals('OK', (yield from imap_client.status('MBOX', '(MESSAGES)')).result)
-
- @asyncio.coroutine
- def test_list(self):
- imap_client = yield from self.login_user('user', 'pass')
- self.assertEquals(('OK', ['() "/" Drafts', '() "/" INBOX', '() "/" Sent', '() "/" Trash',
- 'LIST completed.']), (yield from imap_client.list('""', '.*')))
-
- yield from imap_client.create('MYBOX')
- self.assertEquals(('OK', ['() "/" Drafts', '() "/" INBOX', '() "/" MYBOX', '() "/" Sent', '() "/" Trash',
- 'LIST completed.']),
- (yield from imap_client.list('""', '.*')))
-
- @asyncio.coroutine
- def test_append(self):
- imap_client = yield from self.login_user('user@mail', 'pass')
- self.assertEquals(0, extract_exists((yield from imap_client.examine('INBOX'))))
-
- msg = Mail.create(['user@mail'], subject='append msg', content='do you see me ?')
- response = yield from imap_client.append(msg.as_bytes(), mailbox='INBOX', flags='FOO BAR',
- date=datetime.now(tz=utc), )
- self.assertEquals('OK', response.result)
- self.assertTrue('1] APPEND completed' in response.lines[0])
-
- self.assertEquals(1, extract_exists((yield from imap_client.examine('INBOX'))))
-
- @asyncio.coroutine
- def test_rfc5032_within(self):
- self.imapserver.receive(Mail.create(['user'], date=datetime.now(tz=utc) - timedelta(seconds=84600 * 3))) # 1
- self.imapserver.receive(Mail.create(['user'], date=datetime.now(tz=utc) - timedelta(seconds=84600))) # 2
- self.imapserver.receive(Mail.create(['user'])) # 3
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- self.assertEquals('1', (yield from imap_client.search('OLDER', '84700')).lines[0])
- self.assertEquals('2 3', (yield from imap_client.search('YOUNGER', '84700')).lines[0])
-
- @asyncio.coroutine
- def test_rfc4315_uidplus_expunge(self):
- self.imapserver.receive(Mail.create(['user']))
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- self.assertEquals(('OK', ['1 EXPUNGE', 'UID EXPUNGE completed.']), (yield from imap_client.uid('expunge', '1:1')))
-
- self.assertEquals(1, extract_exists((yield from imap_client.select())))
-
- @asyncio.coroutine
- def test_rfc6851_move(self):
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
- uidvalidity = self.imapserver.get_connection('user').uidvalidity
-
- self.assertEqual(('OK', ['OK [COPYUID %d 1:1 1:1]' % uidvalidity, '1 EXPUNGE', 'Done']),
- (yield from imap_client.move('1:1', 'Trash')))
-
- self.assertEquals(0, extract_exists((yield from imap_client.select())))
- self.assertEquals(1, extract_exists((yield from imap_client.select('Trash'))))
-
- @asyncio.coroutine
- def test_rfc6851_uidmove(self):
- self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
- uidvalidity = self.imapserver.get_connection('user').uidvalidity
-
- self.assertEqual(('OK', ['OK [COPYUID %d 1:1 1:1]' % uidvalidity, '1 EXPUNGE', 'Done']),
- (yield from imap_client.uid('move', '1:1', 'Trash')))
-
- self.assertEquals(0, extract_exists((yield from imap_client.select())))
- self.assertEquals(1, extract_exists((yield from imap_client.select('Trash'))))
-
- @asyncio.coroutine
- def test_rfc5161_enable(self):
- imap_client = yield from self.login_user('user', 'pass')
-
- self.assertEqual(('OK', ['X-GOOD-IDEA CONDSTORE enabled']),
- (yield from imap_client.enable('X-GOOD-IDEA CONDSTORE')))
-
- @asyncio.coroutine
- def test_rfc2342_namespace(self):
- imap_client = yield from self.login_user('user', 'pass')
- response = yield from imap_client.namespace()
-
- self.assertEqual(('OK', ['(("" "/")) NIL NIL', 'NAMESPACE command completed']), response)
-
- @asyncio.coroutine
- def test_rfc2971_id(self):
- imap_client = yield from self.login_user('user', 'pass')
- response = yield from imap_client.id()
- self.assertEqual(('OK', ['ID command completed']), response)
-
-
-class TestImapServerCapabilities(AioWithImapServer, asynctest.TestCase):
- def setUp(self):
- self._init_server(self.loop, capabilities='')
-
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
-
- @asyncio.coroutine
- def test_idle_messages_without_idle_capability_abort_command(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- with self.assertRaises(Abort):
- yield from imap_client.idle()
-
- @asyncio.coroutine
- def test_expunge_messages_without_uidplus_capability_abort_command(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- with self.assertRaises(Abort):
- yield from imap_client.uid('expunge', '1:1')
-
- @asyncio.coroutine
- def test_move_without_move_capability_abort_command(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- with self.assertRaises(Abort):
- yield from imap_client.move('1:1', 'Trash')
-
- @asyncio.coroutine
- def test_uidmove_without_move_capability_abort_command(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- with self.assertRaises(Abort):
- yield from imap_client.uid('move', '1:1', 'Trash')
-
- @asyncio.coroutine
- def test_enable_without_enable_capability_abort_command(self):
- imap_client = yield from self.login_user('user', 'pass')
- with self.assertRaises(Abort):
- yield from imap_client.enable('CAPABILITY')
-
- @asyncio.coroutine
- def test_namespace_without_namespace_capability_abort_command(self):
- imap_client = yield from self.login_user('user', 'pass')
- with self.assertRaises(Abort):
- yield from imap_client.namespace()
-
-
-class TestAioimaplibClocked(AioWithImapServer, asynctest.ClockedTestCase):
-
- def setUp(self):
- self._init_server(self.loop)
-
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
-
- @asyncio.coroutine
- def test_when_async_commands_timeout__they_should_be_removed_from_protocol_state(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
- yield from (imap_client.protocol.execute(Command(
- 'DELAY', imap_client.protocol.new_tag(), '3', loop=self.loop)))
-
- noop_task = asyncio.ensure_future(imap_client.protocol.execute(
- Command('NOOP', imap_client.protocol.new_tag(), '', loop=self.loop, timeout=2)))
- yield from self.advance(1)
- self.assertEqual(1, len(imap_client.protocol.pending_async_commands))
- yield from self.advance(1.1)
-
- finished, pending = yield from asyncio.wait([noop_task], loop=self.loop)
- self.assertTrue(noop_task in finished)
- self.assertTrue(isinstance(noop_task.exception(), CommandTimeout))
- self.assertEqual(0, len(imap_client.protocol.pending_async_commands))
-
- @asyncio.coroutine
- def test_when_sync_commands_timeout__they_should_be_removed_from_protocol_state(self):
- imap_client = yield from self.login_user('user', 'pass')
- yield from (imap_client.protocol.execute(Command(
- 'DELAY', imap_client.protocol.new_tag(), '3', loop=self.loop)))
-
- delay_task = asyncio.ensure_future(imap_client.protocol.execute(
- Command('DELAY', imap_client.protocol.new_tag(), '0', loop=self.loop, timeout=2)))
- yield from self.advance(1)
- self.assertIsNotNone(imap_client.protocol.pending_sync_command)
- yield from self.advance(1.1)
-
- finished, pending = yield from asyncio.wait([delay_task], loop=self.loop)
- self.assertTrue(delay_task in finished)
- self.assertTrue(isinstance(delay_task.exception(), CommandTimeout))
- self.assertIsNone(imap_client.protocol.pending_sync_command)
-
- @asyncio.coroutine
- def test_idle_start__exits_queueget_without_timeout_error(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
-
- idle_timeout = 5
- yield from imap_client.idle_start(idle_timeout)
-
- push_task = asyncio.ensure_future(imap_client.wait_server_push(idle_timeout + 2))
- yield from self.advance(idle_timeout + 1)
-
- r = yield from asyncio.wait_for(push_task, 0)
- self.assertEqual(STOP_WAIT_SERVER_PUSH, r)
-
-
-class TestAioimaplibCallback(AioWithImapServer, asynctest.TestCase):
- def setUp(self):
- self._init_server(self.loop)
-
- @asyncio.coroutine
- def test_callback_is_called_when_connection_is_lost(self):
- queue = asyncio.Queue()
- imap_client = aioimaplib.IMAP4(port=12345, loop=self.loop, timeout=3, conn_lost_cb=(
- lambda m: queue.put_nowait('called with %s' % m)))
- yield from asyncio.wait_for(imap_client.wait_hello_from_server(), 2)
- yield from imap_client.login('login', 'password')
-
- yield from self._shutdown_server()
-
- self.assertEqual('called with None', (yield from asyncio.wait_for(queue.get(), timeout=2)))
-
-
-class TestAioimaplibSSL(WithImapServer, asynctest.TestCase):
- """ Test the aioimaplib with SSL
-
- SSL is handled transparently by asyncio, so we don't
- need to repeat all the tests - just ensure the encrypted
- connection happens
- """
- def setUp(self):
- self._cert_file, self._cert_key = create_temp_self_signed_cert()
-
- ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- ssl_context.load_cert_chain(self._cert_file, self._cert_key)
-
- self._init_server(self.loop, ssl_context=ssl_context)
-
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
- os.remove(self._cert_file)
- os.remove(self._cert_key)
-
- @asyncio.coroutine
- def test_client_can_connect_to_server_over_ssl(self):
- ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self._cert_file)
- imap_client = aioimaplib.IMAP4_SSL(port=12345, loop=self.loop, ssl_context=ssl_context)
- yield from asyncio.wait_for(imap_client.wait_hello_from_server(), 2)
-
- self.assertEquals('IMAP4REV1', imap_client.protocol.imap_version)
- self.assertEquals({'IMAP4rev1', 'YESAUTH'}, imap_client.protocol.capabilities)
- self.assertTrue(imap_client.has_capability('YESAUTH'))
diff --git a/aioimaplib/tests/test_imapserver.py b/aioimaplib/tests/test_imapserver.py
index fbc6813..6a59a09 100644
--- a/aioimaplib/tests/test_imapserver.py
+++ b/aioimaplib/tests/test_imapserver.py
@@ -155,22 +155,20 @@ def _init_server(self, loop, capabilities=None, ssl_context=None):
host='127.0.0.1', port=12345, fetch_chunk_size=64, ssl_context=ssl_context
)
- @asyncio.coroutine
- def _shutdown_server(self):
+ async def _shutdown_server(self):
self.imapserver.reset()
self.server.close()
- yield from asyncio.wait_for(self.server.wait_closed(), 1)
+ await asyncio.wait_for(self.server.wait_closed(), 1)
- @asyncio.coroutine
- def login_user(self, login, password, select=False, lib=imaplib.IMAP4):
- imap_client = yield from asyncio.wait_for(
+ async def login_user(self, login, password, select=False, lib=imaplib.IMAP4):
+ imap_client = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(lib, host='127.0.0.1', port=12345)), 1)
- yield from asyncio.wait_for(
+ await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.login, login, password)), 1)
if select:
- yield from asyncio.wait_for(
+ await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select)), 1)
return imap_client
diff --git a/aioimaplib/tests/test_imapserver_aioimaplib.py b/aioimaplib/tests/test_imapserver_aioimaplib.py
index 307cb5a..27b8a54 100644
--- a/aioimaplib/tests/test_imapserver_aioimaplib.py
+++ b/aioimaplib/tests/test_imapserver_aioimaplib.py
@@ -14,8 +14,6 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-import asyncio
-
import asynctest
from aioimaplib import extract_exists
@@ -27,35 +25,32 @@ class TestAioimaplib(AioWithImapServer, asynctest.TestCase):
def setUp(self):
self._init_server(self.loop)
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
+ async def tearDown(self):
+ await self._shutdown_server()
- @asyncio.coroutine
- def test_append_too_long(self):
- imap_client = yield from self.login_user('user@mail', 'pass')
- self.assertEquals(0, extract_exists((yield from imap_client.examine('INBOX'))))
+ async def test_append_too_long(self):
+ imap_client = await self.login_user('user@mail', 'pass')
+ self.assertEquals(0, extract_exists((await imap_client.examine('INBOX'))))
message_bytes = b'do you see me ?'
imap_client.protocol.literal_data = message_bytes * 2
args = ['INBOX', '{%s}' % len(message_bytes)]
- response = yield from imap_client.protocol.execute(
+ response = await imap_client.protocol.execute(
Command('APPEND', imap_client.protocol.new_tag(), *args, loop=self.loop)
)
self.assertEquals('BAD', response.result)
self.assertTrue('expected CRLF but got' in response.lines[0])
- @asyncio.coroutine
- def test_append_too_short(self):
- imap_client = yield from self.login_user('user@mail', 'pass')
- self.assertEquals(0, extract_exists((yield from imap_client.examine('INBOX'))))
+ async def test_append_too_short(self):
+ imap_client = await self.login_user('user@mail', 'pass')
+ self.assertEquals(0, extract_exists((await imap_client.examine('INBOX'))))
message_bytes = b'do you see me ?' * 2
imap_client.protocol.literal_data = message_bytes[:5]
args = ['INBOX', '{%s}' % len(message_bytes)]
- response = yield from imap_client.protocol.execute(
+ response = await imap_client.protocol.execute(
Command('APPEND', imap_client.protocol.new_tag(), *args, loop=self.loop)
)
self.assertEquals('BAD', response.result)
diff --git a/aioimaplib/tests/test_imapserver_imaplib.py b/aioimaplib/tests/test_imapserver_imaplib.py
index aaf82a5..e8f303b 100644
--- a/aioimaplib/tests/test_imapserver_imaplib.py
+++ b/aioimaplib/tests/test_imapserver_imaplib.py
@@ -37,195 +37,179 @@ class TestImapServerWithImaplib(WithImapServer, TestCase):
def setUp(self):
self._init_server(self.loop)
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
+ async def tearDown(self):
+ await self._shutdown_server()
def __init__(self, methodName='runTest'):
super().__init__(methodName)
add_charset('utf-8', SHORTEST, None, 'utf-8')
add_charset('cp1252', SHORTEST, None, 'cp1252')
- @asyncio.coroutine
- def test_server_greetings_and_capabilities(self):
+ async def test_server_greetings_and_capabilities(self):
pending_imap = self.loop.run_in_executor(None, functools.partial(imaplib.IMAP4, host='127.0.0.1', port=12345))
- imap_client = yield from asyncio.wait_for(pending_imap, 1)
+ imap_client = await asyncio.wait_for(pending_imap, 1)
self.assertEqual('NONAUTH', imap_client.state)
- @asyncio.coroutine
- def test_server_login(self):
+ async def test_server_login(self):
pending_imap = self.loop.run_in_executor(None, functools.partial(imaplib.IMAP4, host='127.0.0.1', port=12345))
- imap_client = yield from asyncio.wait_for(pending_imap, 1)
+ imap_client = await asyncio.wait_for(pending_imap, 1)
pending_login = self.loop.run_in_executor(None, functools.partial(imap_client.login, 'user', 'pass'))
- result, data = yield from asyncio.wait_for(pending_login, 1)
+ result, data = await asyncio.wait_for(pending_login, 1)
self.assertEqual('OK', result)
self.assertEqual([b'LOGIN completed'], data)
self.assertEquals(imapserver.AUTH, self.imapserver.get_connection('user').state)
- @asyncio.coroutine
- def test_select_no_messages_in_mailbox(self):
- imap_client = yield from self.login_user('user@mail', 'pass')
+ async def test_select_no_messages_in_mailbox(self):
+ imap_client = await self.login_user('user@mail', 'pass')
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select)), 1)
self.assertEqual('OK', result)
self.assertEqual([b'0'], data)
self.assertEquals(imapserver.SELECTED, self.imapserver.get_connection('user@mail').state)
- @asyncio.coroutine
- def test_select_one_message_in_mailbox(self):
+ async def test_select_one_message_in_mailbox(self):
self.imapserver.receive(Mail.create(to=['user'], mail_from='me', subject='hello'))
- imap_client = yield from self.login_user('user', 'pass')
+ imap_client = await self.login_user('user', 'pass')
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select)), 1)
self.assertEqual('OK', result)
self.assertEqual([b'1'], data)
- @asyncio.coroutine
- def test_select_one_message_in_INBOX_zero_in_OTHER(self):
+ async def test_select_one_message_in_INBOX_zero_in_OTHER(self):
self.imapserver.receive(Mail.create(to=['user'], mail_from='me', subject='hello'))
- imap_client = yield from self.login_user('user', 'pass')
+ imap_client = await self.login_user('user', 'pass')
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select)), 1)
self.assertEqual([b'1'], data)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select, 'OTHER')), 1)
self.assertEqual([b'0'], data)
- @asyncio.coroutine
- def test_examine_no_messages_in_mailbox(self):
- imap_client = yield from self.login_user('user', 'pass')
+ async def test_examine_no_messages_in_mailbox(self):
+ imap_client = await self.login_user('user', 'pass')
- self.assertEquals(('OK', [b'0']), (yield from asyncio.wait_for(
+ self.assertEquals(('OK', [b'0']), (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select, readonly=True)), 1)))
self.assertEquals(imapserver.AUTH, self.imapserver.get_connection('user').state)
- @asyncio.coroutine
- def test_search_by_uid_two_messages(self):
+ async def test_search_by_uid_two_messages(self):
self.imapserver.receive(Mail.create(['user']))
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', 'utf-8', 'ALL')), 1)
self.assertEqual('OK', result)
self.assertEqual([b'1 2'], data)
- @asyncio.coroutine
- def test_search_by_uid_one_message_two_recipients(self):
+ async def test_search_by_uid_one_message_two_recipients(self):
self.imapserver.receive(Mail.create(['user1', 'user2']))
- imap_client = yield from self.login_user('user1', 'pass', select=True)
+ imap_client = await self.login_user('user1', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, 'ALL')), 1)
self.assertEqual('OK', result)
self.assertEqual([b'1'], data)
- imap_client = yield from self.login_user('user2', 'pass', select=True)
+ imap_client = await self.login_user('user2', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, 'ALL')), 1)
self.assertEqual('OK', result)
self.assertEqual([b'1'], data)
- @asyncio.coroutine
- def test_fetch_one_message_by_uid(self):
+ async def test_fetch_one_message_by_uid(self):
mail = Mail.create(['user'], mail_from='me', subject='hello', content='pleased to meet you, wont you guess my name ?')
self.imapserver.receive(mail)
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', '(RFC822)')), 1)
self.assertEqual('OK', result)
self.assertEqual([(b'1 (UID 1 RFC822 {360}', mail.as_bytes()), b')'], data)
- @asyncio.coroutine
- def test_fetch_bad_range(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ async def test_fetch_bad_range(self):
+ imap_client = await self.login_user('user', 'pass', select=True)
with self.assertRaises(Exception) as expected:
- yield from asyncio.wait_for(
+ await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '0:*', '(RFC822)')), 1)
self.assertEqual('UID command error: BAD [b\'Error in IMAP command: Invalid uidset\']', str(expected.exception))
with self.assertRaises(Exception) as expected:
- yield from asyncio.wait_for(
+ await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '2:0', '(RFC822)')), 1)
self.assertEqual('UID command error: BAD [b\'Error in IMAP command: Invalid uidset\']', str(expected.exception))
- @asyncio.coroutine
- def test_fetch_one_message_by_uid_with_bodypeek(self):
+ async def test_fetch_one_message_by_uid_with_bodypeek(self):
mail = Mail.create(['user'], mail_from='me', subject='hello', content='this mail is still unread')
self.imapserver.receive(mail)
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', '(UID BODY.PEEK[])')), 1)
self.assertEqual('OK', result)
self.assertEqual([(b'1 (UID 1 BODY.PEEK[] {340}', mail.as_bytes()), b')'], data)
- @asyncio.coroutine
- def test_fetch_one_messages_by_uid_without_body(self):
+ async def test_fetch_one_messages_by_uid_without_body(self):
mail = Mail.create(['user'], mail_from='me', subject='hello', content='whatever')
self.imapserver.receive(mail)
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', '(UID FLAGS)')), 1)
self.assertEqual('OK', result)
self.assertEqual([(b'1 (UID 1 FLAGS ())')], data)
- @asyncio.coroutine
- def test_fetch_one_messages_by_id_without_body(self):
+ async def test_fetch_one_messages_by_id_without_body(self):
mail = Mail.create(['user'], mail_from='me', subject='hello', content='whatever')
self.imapserver.receive(mail)
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.fetch, '1', '(UID FLAGS)')), 1)
self.assertEqual([(b'1 (UID 1 FLAGS ())')], data)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.fetch, '1', '(FLAGS)')), 1)
self.assertEqual([(b'1 (FLAGS ())')], data)
- @asyncio.coroutine
- def test_fetch_messages_by_uid_range(self):
+ async def test_fetch_messages_by_uid_range(self):
mail = Mail.create(['user'], mail_from='me', subject='hello', content='whatever')
self.imapserver.receive(mail)
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1:1', '(FLAGS)')), 1)
self.assertEqual([(b'1 (UID 1 FLAGS ())')], data)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.fetch, '1:*', '(UID FLAGS)')), 1)
self.assertEqual([(b'1 (UID 1 FLAGS ())')], data)
- @asyncio.coroutine
- def test_fetch_one_messages_by_uid_encoding_cp1252(self):
+ async def test_fetch_one_messages_by_uid_encoding_cp1252(self):
self.imapserver.receive(Mail.create(['user'], mail_from='me', subject='hello', content='maître', encoding='cp1252'))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', '(RFC822)')), 1)
mail_content = data[0][1]
@@ -233,252 +217,234 @@ def test_fetch_one_messages_by_uid_encoding_cp1252(self):
self.assertTrue(b'ma\xeetre' in mail_content)
self.assertEqual('maître', email.message_from_bytes(mail_content).get_payload().strip())
- @asyncio.coroutine
- def test_fetch_one_messages_out_of_two(self):
+ async def test_fetch_one_messages_out_of_two(self):
self.imapserver.receive(Mail.create(['user'], mail_from='me', subject='hello', content='maître'))
self.imapserver.receive(Mail.create(['user'], mail_from='you', subject='yo', content='bro'))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', '(RFC822)')), 1)
self.assertEqual(2, len(data))
- @asyncio.coroutine
- def test_fetch_one_message_with_headers(self):
+ async def test_fetch_one_message_with_headers(self):
self.imapserver.receive(Mail.create(['user'], mail_from='me', subject='hello', content='maître'))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', '(BODY.PEEK[HEADER.FIELDS (Content-type From)])')), 1)
self.assertEqual(b'1 (UID 1 BODY[HEADER.FIELDS (Content-type From)] {57}', data[0][0])
self.assertEqual(b'Content-type: text/plain; charset="utf-8"\r\nFrom: \r\n\r\n', data[0][1])
- @asyncio.coroutine
- def test_store(self):
+ async def test_store(self):
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'store', '1', '+FLAGS.SILENT (\Seen \Answered)')), 1)
self.assertEqual('OK', result)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'fetch', '1', 'UID (FLAGS)')), 1)
self.assertEqual('OK', result)
self.assertEqual([b'1 (UID 1 FLAGS (\Seen \Answered))'], data)
- @asyncio.coroutine
- def test_store_and_search_by_keyword(self):
+ async def test_store_and_search_by_keyword(self):
self.imapserver.receive(Mail.create(['user']))
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, 'KEYWORD FOO')), 1)
self.assertEqual('OK', result)
self.assertEqual([b''], data)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'store', '1', '+FLAGS (FOO)')), 1)
self.assertEqual('OK', result)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, 'KEYWORD FOO')), 1)
self.assertEqual('OK', result)
self.assertEqual([b'1'], data)
- result, data = yield from asyncio.wait_for(
+ result, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, 'UNKEYWORD FOO')), 1)
self.assertEqual('OK', result)
self.assertEqual([b'2'], data)
- @asyncio.coroutine
- def test_search_by_uid_range(self):
+ async def test_search_by_uid_range(self):
self.imapserver.receive(Mail.create(['user']))
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, '1:2')), 1)
self.assertEqual([b'1 2'], data)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, '1:*')), 1)
self.assertEqual([b'1 2'], data)
- _, data = yield from asyncio.wait_for(
+ _, data = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.uid, 'search', None, '1:1')), 1)
self.assertEqual([b'1'], data)
- @asyncio.coroutine
- def test_expunge_messages(self):
+ async def test_expunge_messages(self):
self.imapserver.receive(Mail.create(['user']))
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- yield from asyncio.wait_for(self.loop.run_in_executor(None, imap_client.expunge), 1)
+ await asyncio.wait_for(self.loop.run_in_executor(None, imap_client.expunge), 1)
- self.assertEquals(('OK', [b'0']), (yield from asyncio.wait_for(
+ self.assertEquals(('OK', [b'0']), (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select)), 1)))
- @asyncio.coroutine
- def test_noop(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ async def test_noop(self):
+ imap_client = await self.login_user('user', 'pass', select=True)
self.assertEquals(('OK', [b'NOOP completed.']),
- (yield from asyncio.wait_for(self.loop.run_in_executor(None, imap_client.noop), 1)))
+ (await asyncio.wait_for(self.loop.run_in_executor(None, imap_client.noop), 1)))
- @asyncio.coroutine
- def test_check(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ async def test_check(self):
+ imap_client = await self.login_user('user', 'pass', select=True)
self.assertEquals(('OK', [b'CHECK completed.']),
- (yield from asyncio.wait_for(self.loop.run_in_executor(None, imap_client.check), 1)))
+ (await asyncio.wait_for(self.loop.run_in_executor(None, imap_client.check), 1)))
- @asyncio.coroutine
- def test_status(self):
- imap_client = yield from self.login_user('user', 'pass')
+ async def test_status(self):
+ imap_client = await self.login_user('user', 'pass')
self.assertEquals(('OK', [b'INBOX (MESSAGES 0 UIDNEXT 1)']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.status, 'INBOX',
'(MESSAGES UIDNEXT)')), 1)))
- @asyncio.coroutine
- def test_subscribe_unsubscribe_lsub(self):
- imap_client = yield from self.login_user('user', 'pass')
+ async def test_subscribe_unsubscribe_lsub(self):
+ imap_client = await self.login_user('user', 'pass')
self.assertEquals(('OK', [b'SUBSCRIBE completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(
imap_client.subscribe, '#fr.soc.feminisme')), 1)))
self.assertEquals(('OK', [b'() "." #fr.soc.feminisme']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(
imap_client.lsub, '#fr', 'soc.*')), 1)))
self.assertEquals(('OK', [b'UNSUBSCRIBE completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(
imap_client.unsubscribe, '#fr.soc.feminisme')), 1)))
self.assertEquals(('OK', [None]),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(
imap_client.lsub, '#fr', '.*')), 1)))
- @asyncio.coroutine
- def test_close(self):
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ async def test_close(self):
+ imap_client = await self.login_user('user', 'pass', select=True)
self.assertEquals(imapserver.SELECTED, self.imapserver.get_connection('user').state)
self.assertEquals(('OK', [b'CLOSE completed.']),
- (yield from asyncio.wait_for(self.loop.run_in_executor(None, imap_client.close), 1)))
+ (await asyncio.wait_for(self.loop.run_in_executor(None, imap_client.close), 1)))
self.assertEquals(imapserver.AUTH, self.imapserver.get_connection('user').state)
- @asyncio.coroutine
- def test_copy_messages(self):
+ async def test_copy_messages(self):
self.imapserver.receive(Mail.create(['user']))
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- result, _ = yield from asyncio.wait_for(
+ result, _ = await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.copy, '1 2', 'MAILBOX')), 20)
self.assertEqual('OK', result)
- self.assertEquals(('OK', [b'2']), (yield from asyncio.wait_for(
+ self.assertEquals(('OK', [b'2']), (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select, 'MAILBOX')), 20)))
- @asyncio.coroutine
- def test_create_delete_mailbox(self):
- imap_client = yield from self.login_user('user', 'pass')
+ async def test_create_delete_mailbox(self):
+ imap_client = await self.login_user('user', 'pass')
self.assertEquals(('NO', [b'STATUS completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.status, 'MBOX', '(MESSAGES)')), 1)))
self.assertEquals(('OK', [b'CREATE completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.create, 'MBOX')), 1)))
self.assertEquals(('OK', [b'MBOX (MESSAGES 0)']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.status, 'MBOX', '(MESSAGES)')), 1)))
self.assertEquals(('OK', [b'DELETE completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.delete, 'MBOX')), 1)))
self.assertEquals(('NO', [b'STATUS completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.status, 'MBOX', '(MESSAGES)')), 1)))
- @asyncio.coroutine
- def test_rename_mailbox(self):
+ async def test_rename_mailbox(self):
self.imapserver.receive(Mail.create(['user']))
- imap_client = yield from self.login_user('user', 'pass')
+ imap_client = await self.login_user('user', 'pass')
self.assertEquals(('NO', [b'STATUS completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.status, 'MBOX', '(MESSAGES)')), 1)))
self.assertEquals(('OK', [b'RENAME completed.']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.rename, 'INBOX', 'MBOX')), 1)))
self.assertEquals(('OK', [b'MBOX (MESSAGES 1)']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.status, 'MBOX', '(MESSAGES)')), 1)))
- @asyncio.coroutine
- def test_list(self):
- imap_client = yield from self.login_user('user', 'pass')
+ async def test_list(self):
+ imap_client = await self.login_user('user', 'pass')
self.assertEquals(('OK', [b'() "/" Drafts', b'() "/" INBOX', b'() "/" Sent', b'() "/" Trash']),
- (yield from asyncio.wait_for(
+ (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.list, '""', '*')), 1)))
- @asyncio.coroutine
- def test_append(self):
- imap_client = yield from self.login_user('user@mail', 'pass')
+ async def test_append(self):
+ imap_client = await self.login_user('user@mail', 'pass')
- self.assertEquals(('OK', [b'0']), (yield from asyncio.wait_for(
+ self.assertEquals(('OK', [b'0']), (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select, 'INBOX', readonly=True)), 2)))
msg = Mail.create(['user@mail'], subject='append msg', content='do you see me ?')
- self.assertEquals('OK', (yield from asyncio.wait_for(
+ self.assertEquals('OK', (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(
imap_client.append, 'INBOX', 'FOO BAR', datetime.now(tz=utc), msg.as_bytes())), 2))[0])
- self.assertEquals(('OK', [b'1']), (yield from asyncio.wait_for(
+ self.assertEquals(('OK', [b'1']), (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.select, 'INBOX', readonly=True)), 2)))
- @asyncio.coroutine
- def test_logout(self):
- imap_client = yield from self.login_user('user', 'pass')
+ async def test_logout(self):
+ imap_client = await self.login_user('user', 'pass')
- result, data = yield from asyncio.wait_for(self.loop.run_in_executor(None, imap_client.logout), 1)
+ result, data = await asyncio.wait_for(self.loop.run_in_executor(None, imap_client.logout), 1)
self.assertEqual('BYE', result) # uhh ?
self.assertEqual([b'Logging out'], data)
self.assertEquals(imapserver.LOGOUT, self.imapserver.get_connection('user').state)
- @asyncio.coroutine
- def test_rfc5032_within(self):
+ async def test_rfc5032_within(self):
self.imapserver.receive(Mail.create(['user'], date=datetime.now(tz=utc) - timedelta(seconds=84600 * 3))) # 1
self.imapserver.receive(Mail.create(['user'], date=datetime.now(tz=utc) - timedelta(seconds=84600))) # 2
self.imapserver.receive(Mail.create(['user'])) # 3
- imap_client = yield from self.login_user('user', 'pass', select=True)
+ imap_client = await self.login_user('user', 'pass', select=True)
- self.assertEquals([b'2 3'], (yield from asyncio.wait_for(
+ self.assertEquals([b'2 3'], (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.search, 'utf-8', 'YOUNGER', '84700')), 1))[1])
- self.assertEquals([b'1'], (yield from asyncio.wait_for(
+ self.assertEquals([b'1'], (await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.search, 'utf-8', 'OLDER', '84700')), 1))[1])
@@ -497,9 +463,8 @@ def setUp(self):
self._init_server(self.loop, ssl_context=ssl_context)
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
+ async def tearDown(self):
+ await self._shutdown_server()
os.remove(self._cert_file)
os.remove(self._cert_key)
@@ -508,8 +473,7 @@ def __init__(self, methodName='runTest'):
add_charset('utf-8', SHORTEST, None, 'utf-8')
add_charset('cp1252', SHORTEST, None, 'cp1252')
- @asyncio.coroutine
- def test_client_can_connect_to_server_over_ssl(self):
+ async def test_client_can_connect_to_server_over_ssl(self):
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=self._cert_file)
pending_imap = self.loop.run_in_executor(None, functools.partial(
@@ -518,6 +482,6 @@ def test_client_can_connect_to_server_over_ssl(self):
port=12345,
ssl_context=ssl_context)
)
- imap_client = yield from asyncio.wait_for(pending_imap, 1)
+ imap_client = await asyncio.wait_for(pending_imap, 1)
self.assertEqual('NONAUTH', imap_client.state)
diff --git a/aioimaplib/tests/test_imapserver_imaplib2.py b/aioimaplib/tests/test_imapserver_imaplib2.py
index c1770a0..e567ebc 100644
--- a/aioimaplib/tests/test_imapserver_imaplib2.py
+++ b/aioimaplib/tests/test_imapserver_imaplib2.py
@@ -31,30 +31,27 @@ class TestImapServerIdle(WithImapServer, TestCase):
def setUp(self):
self._init_server(self.loop)
- @asyncio.coroutine
- def tearDown(self):
- yield from self._shutdown_server()
+ async def tearDown(self):
+ await self._shutdown_server()
- @asyncio.coroutine
- def test_idle(self):
- imap_client = yield from self.login_user('user', 'pass', select=True, lib=imaplib2.IMAP4)
+ async def test_idle(self):
+ imap_client = await self.login_user('user', 'pass', select=True, lib=imaplib2.IMAP4)
idle_callback = Mock()
self.loop.run_in_executor(None, functools.partial(imap_client.idle, callback=idle_callback))
- yield from asyncio.wait_for(self.imapserver.get_connection('user').wait(imapserver.IDLE), 1)
+ await asyncio.wait_for(self.imapserver.get_connection('user').wait(imapserver.IDLE), 1)
self.loop.run_in_executor(None, functools.partial(self.imapserver.receive,
Mail.create(to=['user'], mail_from='me', subject='hello')))
- yield from asyncio.wait_for(self.imapserver.get_connection('user').wait(imapserver.SELECTED), 1)
+ await asyncio.wait_for(self.imapserver.get_connection('user').wait(imapserver.SELECTED), 1)
time.sleep(0.1) # eurk hate sleeps but I don't know how to wait for the lib to receive end of IDLE
idle_callback.assert_called_once()
- @asyncio.coroutine
- def test_login_twice(self):
+ async def test_login_twice(self):
with self.assertRaises(imaplib2.IMAP4.error) as expected:
- imap_client = yield from self.login_user('user', 'pass', lib=imaplib2.IMAP4)
+ imap_client = await self.login_user('user', 'pass', lib=imaplib2.IMAP4)
- yield from asyncio.wait_for(
+ await asyncio.wait_for(
self.loop.run_in_executor(None, functools.partial(imap_client.login, 'user', 'pass')), 1)
self.assertEqual(expected.exception.args, ('command LOGIN illegal in state AUTH',))
diff --git a/aioimaplib/tests/test_utils.py b/aioimaplib/tests/test_utils.py
index cba40d2..ed7f4fd 100644
--- a/aioimaplib/tests/test_utils.py
+++ b/aioimaplib/tests/test_utils.py
@@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
from unittest import TestCase
-from aioimaplib import quoted, arguments_rfs2971, ID_MAX_FIELD_LEN, ID_MAX_VALUE_LEN
+from aioimaplib import quoted, arguments_rfc2971, ID_MAX_FIELD_LEN, ID_MAX_VALUE_LEN
class TestQuote(TestCase):
@@ -36,23 +36,23 @@ def test_quote_returns_bytes_when_input_is_bytes(self):
class TestArgument(TestCase):
- def test_arguments_rfs2971_empty(self):
- self.assertEqual(['NIL'], arguments_rfs2971())
+ def test_arguments_rfc2971_empty(self):
+ self.assertEqual(['NIL'], arguments_rfc2971())
- def test_arguments_rfs2971_with_kwargs(self):
- self.assertEqual(['(', '"name"', '"test"', ')'], arguments_rfs2971(name='test'))
+ def test_arguments_rfc2971_with_kwargs(self):
+ self.assertEqual(['(', '"name"', '"test"', ')'], arguments_rfc2971(name='test'))
- def test_arguments_rfs2971_with_max_items(self):
+ def test_arguments_rfc2971_with_max_items(self):
with self.assertRaises(ValueError):
fields = range(31)
- arguments_rfs2971(**{str(field): field for field in fields})
+ arguments_rfc2971(**{str(field): field for field in fields})
- def test_arguments_rfs2971_with_max_field_length(self):
+ def test_arguments_rfc2971_with_max_field_length(self):
with self.assertRaises(ValueError):
field = 'test' * (ID_MAX_FIELD_LEN + 1)
- arguments_rfs2971(**{field: 'test'})
+ arguments_rfc2971(**{field: 'test'})
- def test_arguments_rfs2971_with_max_value_length(self):
+ def test_arguments_rfc2971_with_max_value_length(self):
with self.assertRaises(ValueError):
value = 'test' * (ID_MAX_VALUE_LEN + 1)
- arguments_rfs2971(field=value)
+ arguments_rfc2971(field=value)