diff --git a/ietf/api/tests.py b/ietf/api/tests.py index 97e928dc51..7dc9c42cc8 100644 --- a/ietf/api/tests.py +++ b/ietf/api/tests.py @@ -33,9 +33,8 @@ from ietf.meeting.models import Session from ietf.nomcom.models import Volunteer from ietf.nomcom.factories import NomComFactory, nomcom_kwargs_for_year -from ietf.person.factories import PersonFactory, random_faker, EmailFactory +from ietf.person.factories import PersonFactory, random_faker, EmailFactory, PersonalApiKeyFactory from ietf.person.models import Email, User -from ietf.person.models import PersonalApiKey from ietf.stats.models import MeetingRegistration from ietf.utils.mail import empty_outbox, outbox, get_payload_text from ietf.utils.models import DumpInfo @@ -71,7 +70,7 @@ def test_deprecated_api_set_session_video_url(self): meeting = MeetingFactory(type_id='ietf') session = SessionFactory(group__type_id='wg', meeting=meeting) group = session.group - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) video = 'https://foo.example.com/bar/beer/' # error cases @@ -79,7 +78,7 @@ def test_deprecated_api_set_session_video_url(self): self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id='ietf', name_id='ad') - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {'apikey': badapikey.hash()} ) @@ -151,7 +150,7 @@ def test_api_set_session_video_url(self): recman = recmanrole.person meeting = MeetingFactory(type_id="ietf") session = SessionFactory(group__type_id="wg", meeting=meeting) - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) video = "https://foo.example.com/bar/beer/" # error cases @@ -159,7 +158,7 @@ def test_api_set_session_video_url(self): self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id="ietf", name_id="ad") - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {"apikey": badapikey.hash()}) @@ -228,7 +227,7 @@ def test_api_set_meetecho_recording_name(self): recman = recmanrole.person meeting = MeetingFactory(type_id="ietf") session = SessionFactory(group__type_id="wg", meeting=meeting) - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) name = "testname" # error cases @@ -236,7 +235,7 @@ def test_api_set_meetecho_recording_name(self): self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id="ietf", name_id="ad") - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {"apikey": badapikey.hash()}) @@ -295,10 +294,10 @@ def test_api_add_session_attendees_deprecated(self): recman = recmanrole.person meeting = MeetingFactory(type_id='ietf') session = SessionFactory(group__type_id='wg', meeting=meeting) - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) badrole = RoleFactory(group__type_id='ietf', name_id='ad') - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() @@ -361,10 +360,10 @@ def test_api_add_session_attendees(self): recman = recmanrole.person meeting = MeetingFactory(type_id="ietf") session = SessionFactory(group__type_id="wg", meeting=meeting) - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) badrole = RoleFactory(group__type_id="ietf", name_id="ad") - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() @@ -517,8 +516,8 @@ def test_api_upload_polls_and_chatlog(self): ), ): url = urlreverse(f"ietf.meeting.views.api_upload_{type_id}") - apikey = PersonalApiKey.objects.create(endpoint=url, person=recmanrole.person) - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + apikey = PersonalApiKeyFactory(endpoint=url, person=recmanrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) r = self.client.post(url, {}) self.assertContains(r, "Missing apikey parameter", status_code=400) @@ -562,7 +561,7 @@ def test_deprecated_api_upload_bluesheet(self): meeting = MeetingFactory(type_id='ietf') session = SessionFactory(group__type_id='wg', meeting=meeting) group = session.group - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) people = [ {"name": "Andrea Andreotti", "affiliation": "Azienda"}, @@ -579,7 +578,7 @@ def test_deprecated_api_upload_bluesheet(self): self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id='ietf', name_id='ad') - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {'apikey': badapikey.hash()}) @@ -654,7 +653,7 @@ def test_api_upload_bluesheet(self): meeting = MeetingFactory(type_id="ietf") session = SessionFactory(group__type_id="wg", meeting=meeting) group = session.group - apikey = PersonalApiKey.objects.create(endpoint=url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=url, person=recman) people = [ {"name": "Andrea Andreotti", "affiliation": "Azienda"}, @@ -671,7 +670,7 @@ def test_api_upload_bluesheet(self): self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id="ietf", name_id="ad") - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {"apikey": badapikey.hash()}) @@ -781,14 +780,14 @@ def test_api_v2_person_export_view(self): url = urlreverse('ietf.api.views.ApiV2PersonExportView') robot = PersonFactory(user__is_staff=True) RoleFactory(name_id='robot', person=robot, email=robot.email(), group__acronym='secretariat') - apikey = PersonalApiKey.objects.create(endpoint=url, person=robot) + apikey = PersonalApiKeyFactory(endpoint=url, person=robot) # error cases r = self.client.post(url, {}) self.assertContains(r, "Missing apikey parameter", status_code=400) badrole = RoleFactory(group__type_id='ietf', name_id='ad') - badapikey = PersonalApiKey.objects.create(endpoint=url, person=badrole.person) + badapikey = PersonalApiKeyFactory(endpoint=url, person=badrole.person) badrole.person.user.last_login = timezone.now() badrole.person.user.save() r = self.client.post(url, {'apikey': badapikey.hash()}) @@ -827,7 +826,7 @@ def test_api_new_meeting_registration(self): oidcp = PersonFactory(user__is_staff=True) # Make sure 'oidcp' has an acceptable role RoleFactory(name_id='robot', person=oidcp, email=oidcp.email(), group__acronym='secretariat') - key = PersonalApiKey.objects.create(person=oidcp, endpoint=url) + key = PersonalApiKeyFactory(person=oidcp, endpoint=url) reg['apikey'] = key.hash() # # Test valid POST @@ -911,7 +910,7 @@ def test_api_new_meeting_registration_nomcom_volunteer(self): oidcp = PersonFactory(user__is_staff=True) # Make sure 'oidcp' has an acceptable role RoleFactory(name_id='robot', person=oidcp, email=oidcp.email(), group__acronym='secretariat') - key = PersonalApiKey.objects.create(person=oidcp, endpoint=url) + key = PersonalApiKeyFactory(person=oidcp, endpoint=url) reg['apikey'] = key.hash() # first test is_nomcom_volunteer False @@ -945,28 +944,30 @@ def test_api_version(self): def test_api_appauth(self): - url = urlreverse('ietf.api.views.app_auth') - person = PersonFactory() - apikey = PersonalApiKey.objects.create(endpoint=url, person=person) - - self.client.login(username=person.user.username,password=f'{person.user.username}+password') - self.client.logout() - - # error cases - # missing apikey - r = self.client.post(url, {}) - self.assertContains(r, 'Missing apikey parameter', status_code=400) - - # invalid apikey - r = self.client.post(url, {'apikey': 'foobar'}) - self.assertContains(r, 'Invalid apikey', status_code=403) - - # working case - r = self.client.post(url, {'apikey': apikey.hash()}) - self.assertEqual(r.status_code, 200) - jsondata = r.json() - self.assertEqual(jsondata['success'], True) + for app in ["authortools", "bibxml"]: + url = urlreverse('ietf.api.views.app_auth', kwargs={"app": app}) + person = PersonFactory() + apikey = PersonalApiKeyFactory(endpoint=url, person=person) + self.client.login(username=person.user.username,password=f'{person.user.username}+password') + self.client.logout() + + # error cases + # missing apikey + r = self.client.post(url, {}) + self.assertContains(r, 'Missing apikey parameter', status_code=400) + + # invalid apikey + r = self.client.post(url, {'apikey': 'foobar'}) + self.assertContains(r, 'Invalid apikey', status_code=403) + + # working case + r = self.client.post(url, {'apikey': apikey.hash()}) + self.assertEqual(r.status_code, 200) + jsondata = r.json() + self.assertEqual(jsondata['success'], True) + self.client.logout() + def test_api_get_session_matherials_no_agenda_meeting_url(self): meeting = MeetingFactory(type_id='ietf') session = SessionFactory(meeting=meeting) diff --git a/ietf/api/urls.py b/ietf/api/urls.py index 431ad5c5d4..6846e32747 100644 --- a/ietf/api/urls.py +++ b/ietf/api/urls.py @@ -69,7 +69,7 @@ # Datatracker version url(r'^version/?$', api_views.version), # Application authentication API key - url(r'^appauth/[authortools|bibxml]', api_views.app_auth), + url(r'^appauth/(?Pauthortools|bibxml)$', api_views.app_auth), # latest versions url(r'^rfcdiff-latest-json/%(name)s(?:-%(rev)s)?(\.txt|\.html)?/?$' % settings.URL_REGEXPS, api_views.rfcdiff_latest_json), url(r'^rfcdiff-latest-json/(?P[Rr][Ff][Cc] [0-9]+?)(\.txt|\.html)?/?$', api_views.rfcdiff_latest_json), diff --git a/ietf/api/views.py b/ietf/api/views.py index f8662f9a0e..df67cffd56 100644 --- a/ietf/api/views.py +++ b/ietf/api/views.py @@ -30,7 +30,7 @@ from tastypie.utils.mime import determine_format, build_content_type from textwrap import dedent from traceback import format_exception, extract_tb -from typing import Iterable, Optional +from typing import Iterable, Optional, Literal import ietf from ietf.api import _api_list @@ -251,7 +251,7 @@ def version(request): @require_api_key @csrf_exempt -def app_auth(request): +def app_auth(request, app: Literal["authortools", "bibxml"]): return HttpResponse( json.dumps({'success': True}), content_type='application/json') diff --git a/ietf/doc/tests_ballot.py b/ietf/doc/tests_ballot.py index f95ba8812b..c7362b58e2 100644 --- a/ietf/doc/tests_ballot.py +++ b/ietf/doc/tests_ballot.py @@ -27,8 +27,8 @@ from ietf.ipr.factories import HolderIprDisclosureFactory from ietf.name.models import BallotPositionName from ietf.iesg.models import TelechatDate -from ietf.person.models import Person, PersonalApiKey -from ietf.person.factories import PersonFactory +from ietf.person.models import Person +from ietf.person.factories import PersonFactory, PersonalApiKeyFactory from ietf.person.utils import get_active_ads from ietf.utils.test_utils import TestCase, login_testing_unauthorized from ietf.utils.mail import outbox, empty_outbox, get_payload_text @@ -111,7 +111,7 @@ def test_api_set_position(self): create_ballot_if_not_open(None, draft, ad, 'approve') ad.user.last_login = timezone.now() ad.user.save() - apikey = PersonalApiKey.objects.create(endpoint=url, person=ad) + apikey = PersonalApiKeyFactory(endpoint=url, person=ad) # vote events_before = draft.docevent_set.count() diff --git a/ietf/ietfauth/tests.py b/ietf/ietfauth/tests.py index 722c1e8b6f..fed764e8bd 100644 --- a/ietf/ietfauth/tests.py +++ b/ietf/ietfauth/tests.py @@ -35,7 +35,7 @@ from ietf.meeting.factories import MeetingFactory from ietf.nomcom.factories import NomComFactory from ietf.person.factories import PersonFactory, EmailFactory, UserFactory, PersonalApiKeyFactory -from ietf.person.models import Person, Email, PersonalApiKey +from ietf.person.models import Person, Email from ietf.person.tasks import send_apikey_usage_emails_task from ietf.review.factories import ReviewRequestFactory, ReviewAssignmentFactory from ietf.review.models import ReviewWish, UnavailablePeriod @@ -788,9 +788,8 @@ def test_apikey_errors(self): self.assertContains(r, 'Invalid apikey', status_code=403) # invalid apikey (invalidated api key) - unauthorized_url = urlreverse('ietf.api.views.app_auth') - invalidated_apikey = PersonalApiKey.objects.create( - endpoint=unauthorized_url, person=person, valid=False) + unauthorized_url = urlreverse('ietf.api.views.app_auth', kwargs={'app': 'authortools'}) + invalidated_apikey = PersonalApiKeyFactory(endpoint=unauthorized_url, person=person, valid=False) r = self.client.post(unauthorized_url, {'apikey': invalidated_apikey.hash()}) self.assertContains(r, 'Invalid apikey', status_code=403) @@ -803,7 +802,11 @@ def test_apikey_errors(self): person.user.save() # endpoint mismatch - key2 = PersonalApiKey.objects.create(person=person, endpoint='/') + key2 = PersonalApiKeyFactory( + person=person, + endpoint='/', + validate_model=False, # allow invalid endpoint + ) r = self.client.post(key.endpoint, {'apikey':key2.hash(), 'dummy':'dummy',}) self.assertContains(r, 'Apikey endpoint mismatch', status_code=400) key2.delete() diff --git a/ietf/meeting/tests_views.py b/ietf/meeting/tests_views.py index 642edcb9b4..0d291f12bc 100644 --- a/ietf/meeting/tests_views.py +++ b/ietf/meeting/tests_views.py @@ -38,7 +38,7 @@ from ietf.doc.models import Document, NewRevisionDocEvent from ietf.group.models import Group, Role, GroupFeatures from ietf.group.utils import can_manage_group -from ietf.person.models import Person, PersonalApiKey +from ietf.person.models import Person from ietf.meeting.helpers import can_approve_interim_request, can_request_interim_meeting, can_view_interim_request, preprocess_assignments_for_agenda from ietf.meeting.helpers import send_interim_approval_request, AgendaKeywordTagger from ietf.meeting.helpers import send_interim_meeting_cancellation_notice, send_interim_session_cancellation_notice @@ -56,7 +56,7 @@ from ietf.utils.test_utils import TestCase, login_testing_unauthorized, unicontent from ietf.utils.timezone import date_today, time_now -from ietf.person.factories import PersonFactory +from ietf.person.factories import PersonFactory, PersonalApiKeyFactory from ietf.group.factories import GroupFactory, GroupEventFactory, RoleFactory from ietf.meeting.factories import (SessionFactory, ScheduleFactory, SessionPresentationFactory, MeetingFactory, FloorPlanFactory, @@ -8743,7 +8743,7 @@ def test_session_attendance(self): add_attendees_url = urlreverse('ietf.meeting.views.api_add_session_attendees') recmanrole = RoleFactory(group__type_id='ietf', name_id='recman', person__user__last_login=timezone.now()) recman = recmanrole.person - apikey = PersonalApiKey.objects.create(endpoint=add_attendees_url, person=recman) + apikey = PersonalApiKeyFactory(endpoint=add_attendees_url, person=recman) attendees = [person.user.pk for person in persons] self.client.login(username='recman', password='recman+password') r = self.client.post(add_attendees_url, {'apikey':apikey.hash(), 'attended':f'{{"session_id":{session.pk},"attendees":{attendees}}}'}) diff --git a/ietf/person/factories.py b/ietf/person/factories.py index 2012483c0d..45de554766 100644 --- a/ietf/person/factories.py +++ b/ietf/person/factories.py @@ -158,10 +158,22 @@ class Meta: class PersonalApiKeyFactory(factory.django.DjangoModelFactory): person = factory.SubFactory(PersonFactory) - endpoint = FuzzyChoice(PERSON_API_KEY_ENDPOINTS) - + endpoint = FuzzyChoice(v for v, n in PERSON_API_KEY_ENDPOINTS) + class Meta: model = PersonalApiKey + skip_postgeneration_save = True + + @factory.post_generation + def validate_model(obj, create, extracted, **kwargs): + """Validate the model after creation + + Passing validate_model=False will disable the validation. + """ + do_clean = True if extracted is None else extracted + if do_clean: + obj.full_clean() + class PersonApiKeyEventFactory(factory.django.DjangoModelFactory): key = factory.SubFactory(PersonalApiKeyFactory) diff --git a/ietf/person/migrations/0003_alter_personalapikey_endpoint.py b/ietf/person/migrations/0003_alter_personalapikey_endpoint.py new file mode 100644 index 0000000000..202af4b101 --- /dev/null +++ b/ietf/person/migrations/0003_alter_personalapikey_endpoint.py @@ -0,0 +1,42 @@ +# Generated by Django 4.2.16 on 2024-10-24 21:39 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("person", "0002_alter_historicalperson_ascii_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="personalapikey", + name="endpoint", + field=models.CharField( + choices=[ + ("/api/appauth/authortools", "/api/appauth/authortools"), + ("/api/appauth/bibxml", "/api/appauth/bibxml"), + ("/api/iesg/position", "/api/iesg/position"), + ( + "/api/meeting/session/recording-name", + "/api/meeting/session/recording-name", + ), + ( + "/api/meeting/session/video/url", + "/api/meeting/session/video/url", + ), + ("/api/notify/meeting/bluesheet", "/api/notify/meeting/bluesheet"), + ( + "/api/notify/meeting/registration", + "/api/notify/meeting/registration", + ), + ("/api/notify/session/attendees", "/api/notify/session/attendees"), + ("/api/notify/session/chatlog", "/api/notify/session/chatlog"), + ("/api/notify/session/polls", "/api/notify/session/polls"), + ("/api/v2/person/person", "/api/v2/person/person"), + ], + max_length=128, + ), + ), + ] diff --git a/ietf/person/models.py b/ietf/person/models.py index 0c25152361..85989acfc1 100644 --- a/ietf/person/models.py +++ b/ietf/person/models.py @@ -376,6 +376,7 @@ def salt(): ("/api/iesg/position", "/api/iesg/position", "Area Director"), ("/api/v2/person/person", "/api/v2/person/person", "Robot"), ("/api/meeting/session/video/url", "/api/meeting/session/video/url", "Recording Manager"), + ("/api/meeting/session/recording-name", "/api/meeting/session/recording-name", "Recording Manager"), ("/api/notify/meeting/registration", "/api/notify/meeting/registration", "Robot"), ("/api/notify/meeting/bluesheet", "/api/notify/meeting/bluesheet", "Recording Manager"), ("/api/notify/session/attendees", "/api/notify/session/attendees", "Recording Manager"), diff --git a/pyzmail/__init__.py b/pyzmail/__init__.py deleted file mode 100644 index f6c8854abc..0000000000 --- a/pyzmail/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# -# pyzmail/__init__.py -# (c) Alain Spineux -# http://www.magiksys.net/pyzmail -# Released under LGPL - -from . import utils -from .generate import compose_mail, send_mail, send_mail2 -from .parse import email_address_re, PyzMessage, PzMessage, decode_text -from .parse import message_from_string, message_from_file -from .parse import message_from_bytes, message_from_binary_file # python >= 3.2 -from .version import __version__ - -# to help epydoc to display functions available from top of the package -__all__= [ 'compose_mail', 'send_mail', 'send_mail2', 'email_address_re', \ - 'PyzMessage', 'PzMessage', 'decode_text', '__version__', - 'utils', 'generate', 'parse', 'version', - 'message_from_string','message_from_file', - 'message_from_binary_file', 'message_from_bytes', # python >= 3.2 - ] - diff --git a/pyzmail/generate.py b/pyzmail/generate.py deleted file mode 100644 index 2ad1f62246..0000000000 --- a/pyzmail/generate.py +++ /dev/null @@ -1,529 +0,0 @@ -# -# pyzmail/generate.py -# (c) Alain Spineux -# http://www.magiksys.net/pyzmail -# Released under LGPL - -""" -Useful functions to compose and send emails. - -For short: - ->>> payload, mail_from, rcpt_to, msg_id=compose_mail((u'Me', 'me@foo.com'), -... [(u'Him', 'him@bar.com')], u'the subject', 'iso-8859-1', ('Hello world', 'us-ascii'), -... attachments=[('attached', 'text', 'plain', 'text.txt', 'us-ascii')]) -... #doctest: +SKIP ->>> error=send_mail(payload, mail_from, rcpt_to, 'localhost', smtp_port=25) -... #doctest: +SKIP -""" - -import os, sys -import time -import base64 -import smtplib, socket -import email -import email.encoders -import email.header -import email.utils -import email.mime -import email.mime.base -import email.mime.text -import email.mime.image -import email.mime.multipart - -from . import utils - -def format_addresses(addresses, header_name=None, charset=None): - """ - Convert a list of addresses into a MIME-compliant header for a From, To, Cc, - or any other I{address} related field. - This mixes the use of email.utils.formataddr() and email.header.Header(). - - @type addresses: list - @param addresses: list of addresses, can be a mix of string a tuple of the form - C{[ 'address@domain', (u'Name', 'name@domain'), ...]}. - If C{u'Name'} contains non us-ascii characters, it must be a - unicode string or encoded using the I{charset} argument. - @type header_name: string or None - @keyword header_name: the name of the header. Its length is used to limit - the length of the first line of the header according the RFC's - requirements. (not very important, but it's better to match the - requirements when possible) - @type charset: str - @keyword charset: the encoding charset for non unicode I{name} and a B{hint} - for encoding of unicode string. In other words, - if the I{name} of an address in a byte string containing non - I{us-ascii} characters, then C{name.decode(charset)} - must generate the expected result. If a unicode string - is used instead, charset will be tried to encode the - string, if it fail, I{utf-8} will be used. - With B{Python 3.x} I{charset} is no more a hint and an exception will - be raised instead of using I{utf-8} has a fall back. - @rtype: str - @return: the encoded list of formated addresses separated by commas, - ready to use as I{Header} value. - - >>> print format_addresses([('John', 'john@foo.com') ], 'From', 'us-ascii').encode() - John - >>> print format_addresses([(u'l\\xe9o', 'leo@foo.com') ], 'To', 'iso-8859-1').encode() - =?iso-8859-1?q?l=E9o?= - >>> print format_addresses([(u'l\\xe9o', 'leo@foo.com') ], 'To', 'us-ascii').encode() - ... # don't work in 3.X because charset is more than a hint - ... #doctest: +SKIP - =?utf-8?q?l=C3=A9o?= - >>> # because u'l\xe9o' cannot be encoded into us-ascii, utf8 is used instead - >>> print format_addresses([('No\\xe9', 'noe@f.com'), (u'M\\u0101ori', 'maori@b.com') ], 'Cc', 'iso-8859-1').encode() - ... # don't work in 3.X because charset is more than a hint - ... #doctest: +SKIP - =?iso-8859-1?q?No=E9?= , =?utf-8?b?TcSBb3Jp?= - >>> # 'No\xe9' is already encoded into iso-8859-1, but u'M\\u0101ori' cannot be encoded into iso-8859-1 - >>> # then utf8 is used here - >>> print format_addresses(['a@bar.com', ('John', 'john@foo.com') ], 'From', 'us-ascii').encode() - a@bar.com , John - """ - header=email.header.Header(charset=charset, header_name=header_name) - for i, address in enumerate(addresses): - if i!=0: - # add separator between addresses - header.append(',', charset='us-ascii') - - try: - name, addr=address - except ValueError: - # address is not a tuple, their is no name, only email address - header.append(address, charset='us-ascii') - else: - # check if address name is a unicode or byte string in "pure" us-ascii - if utils.is_usascii(name): - # name is a us-ascii byte string, i can use formataddr - formated_addr=email.utils.formataddr((name, addr)) - # us-ascii must be used and not default 'charset' - header.append(formated_addr, charset='us-ascii') - else: - # this is not as "pure" us-ascii string - # Header will use "RFC2047" to encode the address name - # if name is byte string, charset will be used to decode it first - header.append(name) - # here us-ascii must be used and not default 'charset' - header.append('<%s>' % (addr,), charset='us-ascii') - - return header - - -def build_mail(text, html=None, attachments=None, embeddeds=None): - """ - Generate the core of the email message regarding the parameters. - The structure of the MIME email may vary, but the general one is as follow:: - - multipart/mixed (only if attachments are included) - | - +-- multipart/related (only if embedded contents are included) - | | - | +-- multipart/alternative (only if text AND html are available) - | | | - | | +-- text/plain (text version of the message) - | | +-- text/html (html version of the message) - | | - | +-- image/gif (where to include embedded contents) - | - +-- application/msword (where to add attachments) - - @param text: the text version of the message, under the form of a tuple: - C{(encoded_content, encoding)} where I{encoded_content} is a byte string - encoded using I{encoding}. - I{text} can be None if the message has no text version. - @type text: tuple or None - @keyword html: the HTML version of the message, under the form of a tuple: - C{(encoded_content, encoding)} where I{encoded_content} is a byte string - encoded using I{encoding} - I{html} can be None if the message has no HTML version. - @type html: tuple or None - @keyword attachments: the list of attachments to include into the mail, in the - form [(data, maintype, subtype, filename, charset), ..] where : - - I{data} : is the raw data, or a I{charset} encoded string for 'text' - content. - - I{maintype} : is a MIME main type like : 'text', 'image', 'application' .... - - I{subtype} : is a MIME sub type of the above I{maintype} for example : - 'plain', 'png', 'msword' for respectively 'text/plain', 'image/png', - 'application/msword'. - - I{filename} this is the filename of the attachment, it must be a - 'us-ascii' string or a tuple of the form - C{(encoding, language, encoded_filename)} - following the RFC2231 requirement, for example - C{('iso-8859-1', 'fr', u'r\\xe9pertoir.png'.encode('iso-8859-1'))} - - I{charset} : if I{maintype} is 'text', then I{data} must be encoded - using this I{charset}. It can be None for non 'text' content. - @type attachments: list - @keyword embeddeds: is a list of documents embedded inside the HTML or text - version of the message. It is similar to the I{attachments} list, - but I{filename} is replaced by I{content_id} that is related to - the B{cid} reference into the HTML or text version of the message. - @type embeddeds: list - @rtype: inherit from email.Message - @return: the message in a MIME object - - >>> mail=build_mail(('Hello world', 'us-ascii'), attachments=[('attached', 'text', 'plain', 'text.txt', 'us-ascii')]) - >>> mail.set_boundary('===limit1==') - >>> print mail.as_string(unixfrom=False) - Content-Type: multipart/mixed; boundary="===limit1==" - MIME-Version: 1.0 - - --===limit1== - Content-Type: text/plain; charset="us-ascii" - MIME-Version: 1.0 - Content-Transfer-Encoding: 7bit - - Hello world - --===limit1== - Content-Type: text/plain; charset="us-ascii" - MIME-Version: 1.0 - Content-Transfer-Encoding: 7bit - Content-Disposition: attachment; filename="text.txt" - - attached - --===limit1==-- - """ - - if attachments is None: - attachments = [] - if embeddeds is None: - embeddeds = [] - - main=text_part=html_part=None - if text: - content, charset=text - main=text_part=email.mime.text.MIMEText(content, 'plain', charset) - - if html: - content, charset=html - main=html_part=email.mime.text.MIMEText(content, 'html', charset) - - if not text_part and not html_part: - main=text_part=email.mime.text.MIMEText('', 'plain', 'us-ascii') - elif text_part and html_part: - # need to create a multipart/alternative to include text and html version - main=email.mime.multipart.MIMEMultipart('alternative', None, [text_part, html_part]) - - if embeddeds: - related=email.mime.multipart.MIMEMultipart('related') - related.attach(main) - for part in embeddeds: - if not isinstance(part, email.mime.base.MIMEBase): - data, maintype, subtype, content_id, charset=part - if (maintype=='text'): - part=email.mime.text.MIMEText(data, subtype, charset) - else: - part=email.mime.base.MIMEBase(maintype, subtype) - part.set_payload(data) - email.encoders.encode_base64(part) - part.add_header('Content-ID', '<'+content_id+'>') - part.add_header('Content-Disposition', 'inline') - related.attach(part) - main=related - - if attachments: - mixed=email.mime.multipart.MIMEMultipart('mixed') - mixed.attach(main) - for part in attachments: - if not isinstance(part, email.mime.base.MIMEBase): - data, maintype, subtype, filename, charset=part - if (maintype=='text'): - part=email.mime.text.MIMEText(data, subtype, charset) - else: - part=email.mime.base.MIMEBase(maintype, subtype) - part.set_payload(data) - email.encoders.encode_base64(part) - part.add_header('Content-Disposition', 'attachment', filename=filename) - mixed.attach(part) - main=mixed - - return main - -def complete_mail(message, sender, recipients, subject, default_charset, cc=None, bcc=None, message_id_string=None, date=None, headers=None): - """ - Fill in the From, To, Cc, Subject, Date and Message-Id I{headers} of - one existing message regarding the parameters. - - @type message:email.Message - @param message: the message to fill in - @type sender: tuple - @param sender: a tuple of the form (u'Sender Name', 'sender.address@domain.com') - @type recipients: list - @param recipients: a list of addresses. Address can be tuple or string like - expected by L{format_addresses()}, for example: C{[ 'address@dmain.com', - (u'Recipient Name', 'recipient.address@domain.com'), ... ]} - @type subject: str - @param subject: The subject of the message, can be a unicode string or a - string encoded using I{default_charset} encoding. Prefert unicode to - byte string here. - @type default_charset: str - @param default_charset: The default charset for this email. Arguments - that are non unicode string are supposed to be encoded using this charset. - This I{charset} will be used has an hint when encoding mail content. - @type cc: list - @keyword cc: The I{carbone copy} addresses. Same format as the I{recipients} - argument. - @type bcc: list - @keyword bcc: The I{blind carbone copy} addresses. Same format as the I{recipients} - argument. - @type message_id_string: str or None - @keyword message_id_string: if None, don't append any I{Message-ID} to the - mail, let the SMTP do the job, else use the string to generate a unique - I{ID} using C{email.utils.make_msgid()}. The generated value is - returned as last argument. For example use the name of your application. - @type date: int or None - @keyword date: utc time in second from the epoch or None. If None then - use curent time C{time.time()} instead. - @type headers: list of tuple - @keyword headers: a list of C{(field, value)} tuples to fill in the mail - header fields. Values are encoded using I{default_charset}. - @rtype: tuple - @return: B{(payload, mail_from, rcpt_to, msg_id)} - - I{payload} (str) is the content of the email, generated from the message - - I{mail_from} (str) is the address of the sender to pass to the SMTP host - - I{rcpt_to} (list) is a list of the recipients addresses to pass to the SMTP host - of the form C{[ 'a@b.com', c@d.com', ]}. This combine all recipients, - I{carbone copy} addresses and I{blind carbone copy} addresses. - - I{msg_id} (None or str) None if message_id_string==None else the generated value for - the message-id. If not None, this I{Message-ID} is already written - into the payload. - - >>> import email.mime.text - >>> msg=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii') - >>> # I could use build_mail() instead - >>> payload, mail_from, rcpt_to, msg_id=complete_mail(msg, ('Me', 'me@foo.com'), - ... [ ('Him', 'him@bar.com'), ], 'Non unicode subject', 'iso-8859-1', - ... cc=['her@bar.com',], date=1313558269, headers=[('User-Agent', u'pyzmail'), ]) - >>> print payload - ... # 3.X encode User-Agent: using 'iso-8859-1' even if it contains only us-asccii - ... # doctest: +ELLIPSIS - Content-Type: text/plain; charset="us-ascii" - MIME-Version: 1.0 - Content-Transfer-Encoding: 7bit - From: Me - To: Him - Cc: her@bar.com - Subject: =?iso-8859-1?q?Non_unicode_subject?= - Date: ... - User-Agent: ...pyzmail... - - The text. - >>> print 'mail_from=%r rcpt_to=%r' % (mail_from, rcpt_to) - mail_from='me@foo.com' rcpt_to=['him@bar.com', 'her@bar.com'] - """ - def getaddr(address): - if isinstance(address, tuple): - return address[1] - else: - return address - - if cc is None: - cc=[] - if bcc is None: - bcc=[] - if headers is None: - headers=[] - - mail_from=getaddr(sender[1]) - rcpt_to=list(map(getaddr, recipients)) - rcpt_to.extend(list(map(getaddr, cc))) - rcpt_to.extend(list(map(getaddr, bcc))) - - message['From'] = format_addresses([ sender, ], header_name='from', charset=default_charset) - if recipients: - message['To'] = format_addresses(recipients, header_name='to', charset=default_charset) - if cc: - message['Cc'] = format_addresses(cc, header_name='cc', charset=default_charset) - message['Subject'] = email.header.Header(subject, default_charset) - if date: - utc_from_epoch=date - else: - utc_from_epoch=time.time() - message['Date'] = email.utils.formatdate(utc_from_epoch, localtime=True) - - if message_id_string: - msg_id=message['Message-Id']=email.utils.make_msgid(message_id_string) - else: - msg_id=None - - for field, value in headers: - message[field]=email.header.Header(value, default_charset) - - payload=message.as_string() - - return payload, mail_from, rcpt_to, msg_id - -def compose_mail(sender, recipients, subject, default_charset, text, html=None, attachments=None, embeddeds=None, cc=None, bcc=None, message_id_string=None, date=None, headers=None): - """ - Compose an email regarding the arguments. Call L{build_mail()} and - L{complete_mail()} at once. - - Read the B{parameters} descriptions of both functions L{build_mail()} and L{complete_mail()}. - - Returned value is the same as for L{build_mail()} and L{complete_mail()}. - You can pass the returned values to L{send_mail()} or L{send_mail2()}. - - @rtype: tuple - @return: B{(payload, mail_from, rcpt_to, msg_id)} - - >>> payload, mail_from, rcpt_to, msg_id=compose_mail((u'Me', 'me@foo.com'), [(u'Him', 'him@bar.com')], u'the subject', 'iso-8859-1', ('Hello world', 'us-ascii'), attachments=[('attached', 'text', 'plain', 'text.txt', 'us-ascii')]) - """ - if attachments is None: - attachments=[] - if embeddeds is None: - embeddeds=[] - if cc is None: - cc=[] - if bcc is None: - bcc = [] - if headers is None: - headers=[] - - message=build_mail(text, html, attachments, embeddeds) - return complete_mail(message, sender, recipients, subject, default_charset, cc, bcc, message_id_string, date, headers) - - -def send_mail2(payload, mail_from, rcpt_to, smtp_host, smtp_port=25, smtp_mode='normal', smtp_login=None, smtp_password=None): - """ - Send the message to a SMTP host. Look at the L{send_mail()} documentation. - L{send_mail()} call this function and catch all exceptions to convert them - into a user friendly error message. The returned value - is always a dictionary. It can be empty if all recipients have been - accepted. - - @rtype: dict - @return: This function return the value returnd by C{smtplib.SMTP.sendmail()} - or raise the same exceptions. - - This method will return normally if the mail is accepted for at least one - recipient. Otherwise it will raise an exception. That is, if this - method does not raise an exception, then someone should get your mail. - If this method does not raise an exception, it returns a dictionary, - with one entry for each recipient that was refused. Each entry contains a - tuple of the SMTP error code and the accompanying error message sent by the server. - - @raise smtplib.SMTPException: Look at the standard C{smtplib.SMTP.sendmail()} documentation. - - """ - if smtp_mode=='ssl': - smtp=smtplib.SMTP_SSL(smtp_host, smtp_port) - else: - smtp=smtplib.SMTP(smtp_host, smtp_port) - if smtp_mode=='tls': - smtp.starttls() - - if smtp_login and smtp_password: - if sys.version_info<(3, 0): - # python 2.x - # login and password must be encoded - # because HMAC used in CRAM_MD5 require non unicode string - smtp.login(smtp_login.encode('utf-8'), smtp_password.encode('utf-8')) - else: - #python 3.x - smtp.login(smtp_login, smtp_password) - try: - ret=smtp.sendmail(mail_from, rcpt_to, payload) - finally: - try: - smtp.quit() - except Exception as e: - pass - - return ret - -def send_mail(payload, mail_from, rcpt_to, smtp_host, smtp_port=25, smtp_mode='normal', smtp_login=None, smtp_password=None): - """ - Send the message to a SMTP host. Handle SSL, TLS and authentication. - I{payload}, I{mail_from} and I{rcpt_to} can come from values returned by - L{complete_mail()}. This function call L{send_mail2()} but catch all - exceptions and return friendly error message instead. - - @type payload: str - @param payload: the mail content. - @type mail_from: str - @param mail_from: the sender address, for example: C{'me@domain.com'}. - @type rcpt_to: list - @param rcpt_to: The list of the recipient addresses in the form - C{[ 'a@b.com', c@d.com', ]}. No names here, only email addresses. - @type smtp_host: str - @param smtp_host: the IP address or the name of the SMTP host. - @type smtp_port: int - @keyword smtp_port: the port to connect to on the SMTP host. Default is C{25}. - @type smtp_mode: str - @keyword smtp_mode: the way to connect to the SMTP host, can be: - C{'normal'}, C{'ssl'} or C{'tls'}. default is C{'normal'} - @type smtp_login: str or None - @keyword smtp_login: If authentication is required, this is the login. - Be carefull to I{UTF8} encode your login if it contains - non I{us-ascii} characters. - @type smtp_password: str or None - @keyword smtp_password: If authentication is required, this is the password. - Be carefull to I{UTF8} encode your password if it - contains non I{us-ascii} characters. - - @rtype: dict or str - @return: This function return a dictionary of failed recipients - or a string with an error message. - - If all recipients have been accepted the dictionary is empty. If the - returned value is a string, none of the recipients will get the message. - - The dictionary is exactly of the same sort as - smtplib.SMTP.sendmail() returns with one entry for each recipient that - was refused. Each entry contains a tuple of the SMTP error code and - the accompanying error message sent by the server. - - Example: - - >>> send_mail('Subject: hello\\n\\nmessage', 'a@foo.com', [ 'b@bar.com', ], 'localhost') #doctest: +SKIP - {} - - Here is how to use the returned value:: - if isinstance(ret, dict): - if ret: - print 'failed' recipients: - for recipient, (code, msg) in ret.iteritems(): - print 'code=%d recipient=%s\terror=%s' % (code, recipient, msg) - else: - print 'success' - else: - print 'Error:', ret - - To use your GMail account to send your mail:: - smtp_host='smtp.gmail.com' - smtp_port=587 - smtp_mode='tls' - smtp_login='your.gmail.addresse@gmail.com' - smtp_password='your.gmail.password' - - Use your GMail address for the sender ! - - """ - - error=dict() - try: - ret=send_mail2(payload, mail_from, rcpt_to, smtp_host, smtp_port, smtp_mode, smtp_login, smtp_password) - except (socket.error, ) as e: - error='server %s:%s not responding: %s' % (smtp_host, smtp_port, e) - except smtplib.SMTPAuthenticationError as e: - error='authentication error: %s' % (e, ) - except smtplib.SMTPRecipientsRefused as e: - # code, error=e.recipients[recipient_addr] - error='all recipients refused: '+', '.join(list(e.recipients.keys())) - except smtplib.SMTPSenderRefused as e: - # e.sender, e.smtp_code, e.smtp_error - error='sender refused: %s' % (e.sender, ) - except smtplib.SMTPDataError as e: - error='SMTP protocol mismatch: %s' % (e, ) - except smtplib.SMTPHeloError as e: - error="server didn't reply properly to the HELO greeting: %s" % (e, ) - except smtplib.SMTPException as e: - error='SMTP error: %s' % (e, ) -# except Exception, e: -# raise # unknown error - else: - # failed addresses and error messages - error=ret - - return error - diff --git a/pyzmail/parse.py b/pyzmail/parse.py deleted file mode 100644 index 9f86c37177..0000000000 --- a/pyzmail/parse.py +++ /dev/null @@ -1,817 +0,0 @@ -# -# pyzmail/parse.py -# (c) Alain Spineux -# http://www.magiksys.net/pyzmail -# Released under LGPL - -""" -Useful functions to parse emails - -@var email_address_re: a regex that match well formed email address (from perlfaq9) -@undocumented: atom_rfc2822 -@undocumented: atom_posfix_restricted -@undocumented: atom -@undocumented: dot_atom -@undocumented: local -@undocumented: domain_lit -@undocumented: domain -@undocumented: addr_spec -""" - -import re -import io -import email -import email.errors -import email.header -import email.message -import mimetypes - -from .utils import * - -# email address REGEX matching the RFC 2822 spec from perlfaq9 -# my $atom = qr{[a-zA-Z0-9_!#\$\%&'*+/=?\^`{}~|\-]+}; -# my $dot_atom = qr{$atom(?:\.$atom)*}; -# my $quoted = qr{"(?:\\[^\r\n]|[^\\"])*"}; -# my $local = qr{(?:$dot_atom|$quoted)}; -# my $domain_lit = qr{\[(?:\\\S|[\x21-\x5a\x5e-\x7e])*\]}; -# my $domain = qr{(?:$dot_atom|$domain_lit)}; -# my $addr_spec = qr{$local\@$domain}; -# -# Python's translation -atom_rfc2822=r"[a-zA-Z0-9_!#\$\%&'*+/=?\^`{}~|\-]+" -atom_posfix_restricted=r"[a-zA-Z0-9_#\$&'*+/=?\^`{}~|\-]+" # without '!' and '%' -atom=atom_rfc2822 -dot_atom=atom + r"(?:\." + atom + ")*" -quoted=r'"(?:\\[^\r\n]|[^\\"])*"' -local="(?:" + dot_atom + "|" + quoted + ")" -domain_lit=r"\[(?:\\\S|[\x21-\x5a\x5e-\x7e])*\]" -domain="(?:" + dot_atom + "|" + domain_lit + ")" -addr_spec=local + "@" + domain -# and the result -email_address_re=re.compile('^'+addr_spec+'$') - -class MailPart: - """ - Data related to a mail part (aka message content, attachment or - embedded content in an email) - - @type charset: str or None - @ivar charset: the encoding of the I{get_payload()} content if I{type} is 'text/*' - and charset has been specified in the message - @type content_id: str or None - @ivar content_id: the MIME Content-ID if specified in the message. - @type description: str or None - @ivar description: the MIME Content-Description if specified in the message. - @type disposition: str or None - @ivar disposition: C{None}, C{'inline'} or C{'attachment'} depending - the MIME Content-Disposition value - @type filename: unicode or None - @ivar filename: the name of the file, if specified in the message. - @type part: inherit from email.mime.base.MIMEBase - @ivar part: the related part inside the message. - @type is_body: str or None - @ivar is_body: None if this part is not the mail content itself (an - attachment or embedded content), C{'text/plain'} if this part is the - text content or C{'text/html'} if this part is the HTML version. - @type sanitized_filename: str or None - @ivar sanitized_filename: This field is filled by L{PyzMessage} to store - a valid unique filename related or not with the original filename. - @type type: str - @ivar type: the MIME type, like 'text/plain', 'image/png', 'application/msword' ... - """ - - def __init__(self, part, filename=None, type=None, charset=None, content_id=None, description=None, disposition=None, sanitized_filename=None, is_body=None): - """ - Create an mail part and initialize all attributes - """ - self.part=part # original python part - self.filename=filename # filename in unicode (if any) - self.type=type # the mime-type - self.charset=charset # the charset (if any) - self.description=description # if any - self.disposition=disposition # 'inline', 'attachment' or None - self.sanitized_filename=sanitized_filename # cleanup your filename here (TODO) - self.is_body=is_body # usually in (None, 'text/plain' or 'text/html') - self.content_id=content_id # if any - if self.content_id: - # strip '<>' to ease search and replace in "root" content (TODO) - if self.content_id.startswith('<') and self.content_id.endswith('>'): - self.content_id=self.content_id[1:-1] - - def get_payload(self): - """ - decode and return part payload. if I{type} is 'text/*' and I{charset} - not C{None}, be careful to take care of the text encoding. Use - something like C{part.get_payload().decode(part.charset)} - """ - - payload=None - if self.type.startswith('message/'): - # I don't use msg.as_string() because I want to use mangle_from_=False - if sys.version_info<(3, 0): - # python 2.x - from email.generator import Generator - fp = io.StringIO() - g = Generator(fp, mangle_from_=False) - g.flatten(self.part, unixfrom=False) - payload=fp.getvalue() - else: - # support only for python >= 3.2 - from email.generator import BytesGenerator - import io - fp = io.BytesIO() - g = BytesGenerator(fp, mangle_from_=False) - g.flatten(self.part, unixfrom=False) - payload=fp.getvalue() - - else: - payload=self.part.get_payload(decode=True) - return payload - - def __repr__(self): - st='MailPart<' - if self.is_body: - st+='*' - st+=self.type - if self.charset: - st+=' charset='+self.charset - if self.filename: - st+=' filename='+self.filename - if self.content_id: - st+=' content_id='+self.content_id - st+=' len=%d' % (len(self.get_payload()), ) - st+='>' - return st - - - -_line_end_re=re.compile('\r\n|\n\r|\n|\r') - -def _friendly_header(header): - """ - Convert header returned by C{email.message.Message.get()} into a - user friendly string. - - Py3k C{email.message.Message.get()} return C{header.Header()} with charset - set to C{charset.UNKNOWN8BIT} when the header contains invalid characters, - else it return I{str} as Python 2.X does - - @type header: str or email.header.Header - @param header: the header to convert into a user friendly string - - @rtype: str - @returns: the converter header - """ - - save=header - if isinstance(header, email.header.Header): - header=str(header) - - return re.sub(_line_end_re, ' ', header) - -def decode_mail_header(value, default_charset='us-ascii'): - """ - Decode a header value into a unicode string. - Works like a more smarter python - C{u"".join(email.header.decode_header()} function - - @type value: str - @param value: the value of the header. - @type default_charset: str - @keyword default_charset: if one charset used in the header (multiple charset - can be mixed) is unknown, then use this charset instead. - - >>> decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_en_Fran=E7ais?=') - u'Courrier \\xe8lectronique en Fran\\xe7ais' - """ - -# value=_friendly_header(value) - try: - headers=email.header.decode_header(value) - except email.errors.HeaderParseError: - # this can append in email.base64mime.decode(), for example for this value: - # '=?UTF-8?B?15HXmdeh15jXqNeVINeY15DXpteUINeTJ9eV16jXlSDXkdeg15XXldeUINem15PXpywg15TXptei16bXldei15nXnSDXqdecINek15zXmdeZ?==?UTF-8?B?157XldeR15nXnCwg157Xldek16Ig157Xl9eV15wg15HXodeV15bXnyDXk9ec15DXnCDXldeh15gg157Xl9eR16rXldeqINep15wg15HXmdeQ?==?UTF-8?B?15zXmNeZ?=' - # then return a sanitized ascii string - # TODO: some improvements are possible here, but a failure here is - # unlikely - return value.encode('us-ascii', 'replace').decode('us-ascii') - else: - for i, (text, charset) in enumerate(headers): - # python 3.x - # email.header.decode_header('a') -> [('a', None)] - # email.header.decode_header('a =?ISO-8859-1?Q?foo?= b') - # --> [(b'a', None), (b'foo', 'iso-8859-1'), (b'b', None)] - # in Py3 text is sometime str and sometime byte :-( - # python 2.x - # email.header.decode_header('a') -> [('a', None)] - # email.header.decode_header('a =?ISO-8859-1?Q?foo?= b') - # --> [('a', None), ('foo', 'iso-8859-1'), ('b', None)] - if (charset is None and sys.version_info>=(3, 0)): - # Py3 - if isinstance(text, str): - # convert Py3 string into bytes string to be sure their is no - # non us-ascii chars and because next line expect byte string - text=text.encode('us-ascii', 'replace') - try: - headers[i]=text.decode(charset or 'us-ascii', 'replace') - except LookupError: - # if the charset is unknown, force default - headers[i]=text.decode(default_charset, 'replace') - - return "".join(headers) - -def get_mail_addresses(message, header_name): - """ - retrieve all email addresses from one message header - - @type message: email.message.Message - @param message: the email message - @type header_name: str - @param header_name: the name of the header, can be 'from', 'to', 'cc' or - any other header containing one or more email addresses - @rtype: list - @returns: a list of the addresses in the form of tuples - C{[(u'Name', 'addresse@domain.com'), ...]} - - >>> import email - >>> import email.mime.text - >>> msg=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii') - >>> msg['From']=email.email.utils.formataddr(('Me', 'me@foo.com')) - >>> msg['To']=email.email.utils.formataddr(('A', 'a@foo.com'))+', '+email.email.utils.formataddr(('B', 'b@foo.com')) - >>> print msg.as_string(unixfrom=False) - Content-Type: text/plain; charset="us-ascii" - MIME-Version: 1.0 - Content-Transfer-Encoding: 7bit - From: Me - To: A , B - - The text. - >>> get_mail_addresses(msg, 'from') - [(u'Me', 'me@foo.com')] - >>> get_mail_addresses(msg, 'to') - [(u'A', 'a@foo.com'), (u'B', 'b@foo.com')] - """ - addrs=email.utils.getaddresses([ _friendly_header(h) for h in message.get_all(header_name, [])]) - for i, (addr_name, addr) in enumerate(addrs): - if not addr_name and addr: - # only one string! Is it the address or the address name ? - # use the same for both and see later - addr_name=addr - - if is_usascii(addr): - # address must be ascii only and must match address regex - if not email_address_re.match(addr): - addr='' - else: - addr='' - addrs[i]=(decode_mail_header(addr_name), addr) - return addrs - -def get_filename(part): - """ - Find the filename of a mail part. Many MUA send attachments with the - filename in the I{name} parameter of the I{Content-type} header instead - of in the I{filename} parameter of the I{Content-Disposition} header. - - @type part: inherit from email.mime.base.MIMEBase - @param part: the mail part - @rtype: None or unicode - @returns: the filename or None if not found - - >>> import email.mime.image - >>> attach=email.mime.image.MIMEImage('data', 'png') - >>> attach.add_header('Content-Disposition', 'attachment', filename='image.png') - >>> get_filename(attach) - u'image.png' - >>> print attach.as_string(unixfrom=False) - Content-Type: image/png - MIME-Version: 1.0 - Content-Transfer-Encoding: base64 - Content-Disposition: attachment; filename="image.png" - - ZGF0YQ== - >>> import email.mime.text - >>> attach=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii') - >>> attach.add_header('Content-Disposition', 'attachment', filename=('iso-8859-1', 'fr', u'Fran\\xe7ais.txt'.encode('iso-8859-1'))) - >>> get_filename(attach) - u'Fran\\xe7ais.txt' - >>> print attach.as_string(unixfrom=False) - Content-Type: text/plain; charset="us-ascii" - MIME-Version: 1.0 - Content-Transfer-Encoding: 7bit - Content-Disposition: attachment; filename*="iso-8859-1'fr'Fran%E7ais.txt" - - The text. - """ - filename=part.get_param('filename', None, 'content-disposition') - if not filename: - filename=part.get_param('name', None) # default is 'content-type' - - if filename: - if isinstance(filename, tuple): - # RFC 2231 must be used to encode parameters inside MIME header - filename=email.utils.collapse_rfc2231_value(filename).strip() - else: - # But a lot of MUA erroneously use RFC 2047 instead of RFC 2231 - # in fact anybody missuse RFC2047 here !!! - filename=decode_mail_header(filename) - - return filename - -def _search_message_content(contents, part): - """ - recursive search of message content (text or HTML) inside - the structure of the email. Used by L{search_message_content()} - - @type contents: dict - @param contents: contents already found in parents or brothers I{parts}. - The dictionary will be completed as and when. key is the MIME type of the part. - @type part: inherit email.mime.base.MIMEBase - @param part: the part of the mail to look inside recursively. - """ - type=part.get_content_type() - if part.is_multipart(): # type.startswith('multipart/'): - # explore only True 'multipart/*' - # because 'messages/rfc822' are 'multipart/*' too but - # must not be explored here - if type=='multipart/related': - # the first part or the one pointed by start - start=part.get_param('start', None) - related_type=part.get_param('type', None) - for i, subpart in enumerate(part.get_payload()): - if (not start and i==0) or (start and start==subpart.get('Content-Id')): - _search_message_content(contents, subpart) - return - elif type=='multipart/alternative': - # all parts are candidates and latest is the best - for subpart in part.get_payload(): - _search_message_content(contents, subpart) - elif type in ('multipart/report', 'multipart/signed'): - # only the first part is candidate - try: - subpart=part.get_payload()[0] - except IndexError: - return - else: - _search_message_content(contents, subpart) - return - - elif type=='multipart/encrypted': - # the second part is the good one, but we need to de-crypt it - # using the first part. Do nothing - return - - else: - # unknown types must be handled as 'multipart/mixed' - # This is the peace of code that could probably be improved, - # I use a heuristic : if not already found, use first valid non - # 'attachment' parts found - for subpart in part.get_payload(): - tmp_contents=dict() - _search_message_content(tmp_contents, subpart) - for k, v in tmp_contents.items(): - if not subpart.get_param('attachment', None, 'content-disposition')=='': - # if not an attachment, initiate value if not already found - contents.setdefault(k, v) - return - else: - contents[part.get_content_type().lower()]=part - return - - return - -def search_message_content(mail): - """ - search of message content (text or HTML) inside - the structure of the mail. This function is used by L{get_mail_parts()} - to set the C{is_body} part of the L{MailPart}s - - @type mail: inherit from email.message.Message - @param mail: the message to search in. - @rtype: dict - @returns: a dictionary of the form C{{'text/plain': text_part, 'text/html': html_part}} - where text_part and html_part inherite from C{email.mime.text.MIMEText} - and are respectively the I{text} and I{HTML} version of the message content. - One part can be missing. The dictionay can aven be empty if none of the - parts math the requirements to be considered as the content. - """ - contents=dict() - _search_message_content(contents, mail) - return contents - -def get_mail_parts(msg): - """ - return a list of all parts of the message as a list of L{MailPart}. - Retrieve parts attributes to fill in L{MailPart} object. - - @type msg: inherit email.message.Message - @param msg: the message - @rtype: list - @returns: list of mail parts - - >>> import email.mime.multipart - >>> msg=email.mime.multipart.MIMEMultipart(boundary='===limit1==') - >>> import email.mime.text - >>> txt=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii') - >>> msg.attach(txt) - >>> import email.mime.image - >>> image=email.mime.image.MIMEImage('data', 'png') - >>> image.add_header('Content-Disposition', 'attachment', filename='image.png') - >>> msg.attach(image) - >>> print msg.as_string(unixfrom=False) - Content-Type: multipart/mixed; boundary="===limit1==" - MIME-Version: 1.0 - - --===limit1== - Content-Type: text/plain; charset="us-ascii" - MIME-Version: 1.0 - Content-Transfer-Encoding: 7bit - - The text. - --===limit1== - Content-Type: image/png - MIME-Version: 1.0 - Content-Transfer-Encoding: base64 - Content-Disposition: attachment; filename="image.png" - - ZGF0YQ== - --===limit1==-- - >>> parts=get_mail_parts(msg) - >>> parts - [MailPart<*text/plain charset=us-ascii len=9>, MailPart] - >>> # the star "*" means this is the mail content, not an attachment - >>> parts[0].get_payload().decode(parts[0].charset) - u'The text.' - >>> parts[1].filename, len(parts[1].get_payload()) - (u'image.png', 4) - - """ - mailparts=[] - - # retrieve messages of the email - contents=search_message_content(msg) - # reverse contents dict - parts=dict((v,k) for k, v in contents.items()) - - # organize the stack to handle deep first search - stack=[ msg, ] - while stack: - part=stack.pop(0) - type=part.get_content_type() - if type.startswith('message/'): - # ('message/delivery-status', 'message/rfc822', 'message/disposition-notification'): - # I don't want to explore the tree deeper her and just save source using msg.as_string() - # but I don't use msg.as_string() because I want to use mangle_from_=False - filename='message.eml' - mailparts.append(MailPart(part, filename=filename, type=type, charset=part.get_param('charset'), description=part.get('Content-Description'))) - elif part.is_multipart(): - # insert new parts at the beginning of the stack (deep first search) - stack[:0]=part.get_payload() - else: - charset=part.get_param('charset') - filename=get_filename(part) - - disposition=None - if part.get_param('inline', None, 'content-disposition')=='': - disposition='inline' - elif part.get_param('attachment', None, 'content-disposition')=='': - disposition='attachment' - - mailparts.append(MailPart(part, filename=filename, type=type, charset=charset, content_id=part.get('Content-Id'), description=part.get('Content-Description'), disposition=disposition, is_body=parts.get(part, False))) - - return mailparts - - -def decode_text(payload, charset, default_charset): - """ - Try to decode text content by trying multiple charset until success. - First try I{charset}, else try I{default_charset} finally - try popular charsets in order : ascii, utf-8, utf-16, windows-1252, cp850 - If all fail then use I{default_charset} and replace wrong characters - - @type payload: str - @param payload: the content to decode - @type charset: str or None - @param charset: the first charset to try if != C{None} - @type default_charset: str or None - @param default_charset: the second charset to try if != C{None} - - @rtype: tuple - @returns: a tuple of the form C{(payload, charset)} - - I{payload}: this is the decoded payload if charset is not None and - payload is a unicode string - - I{charset}: the charset that was used to decode I{payload} If charset is - C{None} then something goes wrong: if I{payload} is unicode then - invalid characters have been replaced and the used charset is I{default_charset} - else, if I{payload} is still byte string then nothing has been done. - - - """ - for chset in [ charset, default_charset, 'ascii', 'utf-8', 'utf-16', 'windows-1252', 'cp850' ]: - if chset: - try: - return payload.decode(chset), chset - except UnicodeError: - pass - - if default_charset: - return payload.decode(chset, 'replace'), None - - return payload, None - -class PyzMessage(email.message.Message): - """ - Inherit from email.message.Message. Combine L{get_mail_parts()}, - L{get_mail_addresses()} and L{decode_mail_header()} into a - B{convenient} object to access mail contents and attributes. - This class also B{sanitize} part filenames. - - @type mailparts: list of L{MailPart} - @ivar mailparts: list of L{MailPart} objects composing the email, I{text_part} - and I{html_part} are part of this list as are other attachements and embedded - contents. - @type text_part: L{MailPart} or None - @ivar text_part: the L{MailPart} object that contains the I{text} - version of the message, None if the mail has not I{text} content. - @type html_part: L{MailPart} or None - @ivar html_part: the L{MailPart} object that contains the I{HTML} - version of the message, None if the mail has not I{HTML} content. - - @note: Sample: - - >>> raw='''Content-Type: text/plain; charset="us-ascii" - ... MIME-Version: 1.0 - ... Content-Transfer-Encoding: 7bit - ... Subject: The subject - ... From: Me - ... To: A , B - ... - ... The text. - ... ''' - >>> msg=PyzMessage.factory(raw) - >>> print 'Subject: %r' % (msg.get_subject(), ) - Subject: u'The subject' - >>> print 'From: %r' % (msg.get_address('from'), ) - From: (u'Me', 'me@foo.com') - >>> print 'To: %r' % (msg.get_addresses('to'), ) - To: [(u'A', 'a@foo.com'), (u'B', 'b@foo.com')] - >>> print 'Cc: %r' % (msg.get_addresses('cc'), ) - Cc: [] - >>> for mailpart in msg.mailparts: - ... print ' %sfilename=%r sanitized_filename=%r type=%s charset=%s desc=%s size=%d' % ('*'if mailpart.is_body else ' ', mailpart.filename, mailpart.sanitized_filename, mailpart.type, mailpart.charset, mailpart.part.get('Content-Description'), 0 if mailpart.get_payload()==None else len(mailpart.get_payload())) - ... if mailpart.is_body=='text/plain': - ... payload, used_charset=decode_text(mailpart.get_payload(), mailpart.charset, None) - ... print ' >', payload.split('\\n')[0] - ... - *filename=None sanitized_filename='text.txt' type=text/plain charset=us-ascii desc=None size=10 - > The text. - """ - - @staticmethod - def smart_parser(input): - """ - Use the appropriate parser and return a email.message.Message object - (this is not a L{PyzMessage} object) - - @type input: string, file, bytes, binary_file or email.message.Message - @param input: the source of the message - @rtype: email.message.Message - @returns: the message - """ - if isinstance(input, email.message.Message): - return input - - if sys.version_info<(3, 0): - # python 2.x - if isinstance(input, str): - return email.message_from_string(input) - elif hasattr(input, 'read') and hasattr(input, 'readline'): - return email.message_from_file(input) - else: - raise ValueError('input must be a string, a file or a Message') - else: - # python 3.x - if isinstance(input, str): - return email.message_from_string(input) - elif isinstance(input, bytes): - # python >= 3.2 only - return email.message_from_bytes(input) - elif hasattr(input, 'read') and hasattr(input, 'readline'): - if hasattr(input, 'encoding'): - # python >= 3.2 only - return email.message_from_file(input) - else: - return email.message_from_binary_file(input) - else: - raise ValueError('input must be a string a bytes, a file or a Message') - - @staticmethod - def factory(input): - """ - Use the appropriate parser and return a L{PyzMessage} object - see L{smart_parser} - @type input: string, file, bytes, binary_file or email.message.Message - @param input: the source of the message - @rtype: L{PyzMessage} - @returns: the L{PyzMessage} message - """ - return PyzMessage(PyzMessage.smart_parser(input)) - - - def __init__(self, message): - """ - Initialize the object with data coming from I{message}. - - @type message: inherit email.message.Message - @param message: The message - """ - if not isinstance(message, email.message.Message): - raise ValueError("message must inherit from email.message.Message use PyzMessage.factory() instead") - self.__dict__.update(message.__dict__) - - self.mailparts=get_mail_parts(self) - self.text_part=None - self.html_part=None - - filenames=[] - for part in self.mailparts: - ext=mimetypes.guess_extension(part.type) - if not ext: - # default to .bin - ext='.bin' - elif ext=='.ksh': - # guess_extension() is not very accurate, .txt is more - # appropriate than .ksh - ext='.txt' - - sanitized_filename=sanitize_filename(part.filename, part.type.split('/', 1)[0], ext) - sanitized_filename=handle_filename_collision(sanitized_filename, filenames) - filenames.append(sanitized_filename.lower()) - part.sanitized_filename=sanitized_filename - - if part.is_body=='text/plain': - self.text_part=part - - if part.is_body=='text/html': - self.html_part=part - - def get_addresses(self, name): - """ - return the I{name} header value as an list of addresses tuple as - returned by L{get_mail_addresses()} - - @type name: str - @param name: the name of the header to read value from: 'to', 'cc' are - valid I{name} here. - @rtype: tuple - @returns: a tuple of the form C{('Sender Name', 'sender.address@domain.com')} - or C{('', '')} if no header match that I{name}. - """ - return get_mail_addresses(self, name) - - def get_address(self, name): - """ - return the I{name} header value as an address tuple as returned by - L{get_mail_addresses()} - - @type name: str - @param name: the name of the header to read value from: : C{'from'} can - be used to return the sender address. - @rtype: list of tuple - @returns: a list of tuple of the form C{[('Recipient Name', 'recipient.address@domain.com'), ...]} - or an empty list if no header match that I{name}. - """ - value=get_mail_addresses(self, name) - if value: - return value[0] - else: - return ('', '') - - def get_subject(self, default=''): - """ - return the RFC2047 decoded subject. - - @type default: any - @param default: The value to return if the message has no I{Subject} - @rtype: unicode - @returns: the subject or C{default} - """ - return self.get_decoded_header('subject', default) - - def get_decoded_header(self, name, default=''): - """ - return decoded header I{name} using RFC2047. Always use this function - to access header, because any header can contain invalid characters - and this function sanitize the string and avoid unicode exception later - in your program. - EVEN for date, I already saw a "Center box bar horizontal" instead - of a minus character. - - @type name: str - @param name: the name of the header to read value from. - @type default: any - @param default: The value to return if the I{name} field don't exist - in this message. - @rtype: unicode - @returns: the value of the header having that I{name} or C{default} if no - header have that name. - """ - value=self.get(name) - if value==None: - value=default - else: - value=decode_mail_header(value) - return value - -class PzMessage(PyzMessage): - """ - Old name and interface for PyzMessage. - B{Deprecated} - """ - - def __init__(self, input): - """ - Initialize the object with data coming from I{input}. - - @type input: str or file or email.message.Message - @param input: used as the raw content for the email, can be a string, - a file object or an email.message.Message object. - """ - PyzMessage.__init__(self, self.smart_parser(input)) - - -def message_from_string(s, *args, **kws): - """ - Parse a string into a L{PyzMessage} object model. - @type s: str - @param s: the input string - @rtype: L{PyzMessage} - @return: the L{PyzMessage} object - """ - return PyzMessage(email.message_from_string(s, *args, **kws)) - -def message_from_file(fp, *args, **kws): - """ - Read a file and parse its contents into a L{PyzMessage} object model. - @type fp: text_file - @param fp: the input file (must be open in text mode if Python >= 3.0) - @rtype: L{PyzMessage} - @return: the L{PyzMessage} object - """ - return PyzMessage(email.message_from_file(fp, *args, **kws)) - -def message_from_bytes(s, *args, **kws): - """ - Parse a bytes string into a L{PyzMessage} object model. - B{(Python >= 3.2)} - @type s: bytes - @param s: the input bytes string - @rtype: L{PyzMessage} - @return: the L{PyzMessage} object - """ - return PyzMessage(email.message_from_bytes(s, *args, **kws)) - -def message_from_binary_file(fp, *args, **kws): - """ - Read a binary file and parse its contents into a L{PyzMessage} object model. - B{(Python >= 3.2)} - @type fp: binary_file - @param fp: the input file, must be open in binary mode - @rtype: L{PyzMessage} - @return: the L{PyzMessage} object - """ - return PyzMessage(email.message_from_binary_file(fp, *args, **kws)) - - -if __name__ == "__main__": - import sys - - if len(sys.argv)<=1: - print('usage : %s filename' % sys.argv[0]) - print('read an email from file and display a resume of its content') - sys.exit(1) - - msg=PyzMessage.factory(open(sys.argv[1], 'rb')) - - print('Subject: %r' % (msg.get_subject(), )) - print('From: %r' % (msg.get_address('from'), )) - print('To: %r' % (msg.get_addresses('to'), )) - print('Cc: %r' % (msg.get_addresses('cc'), )) - print('Date: %r' % (msg.get_decoded_header('date', ''), )) - print('Message-Id: %r' % (msg.get_decoded_header('message-id', ''), )) - - for mailpart in msg.mailparts: - # dont forget to be careful to sanitize 'filename' and be carefull - # for filename collision, to before to save : - print(' %sfilename=%r type=%s charset=%s desc=%s size=%d' % ('*'if mailpart.is_body else ' ', mailpart.filename, mailpart.type, mailpart.charset, mailpart.part.get('Content-Description'), 0 if mailpart.get_payload()==None else len(mailpart.get_payload()))) - - if mailpart.is_body=='text/plain': - # print first 3 lines - payload, used_charset=decode_text(mailpart.get_payload(), mailpart.charset, None) - for line in payload.split('\n')[:3]: - # be careful console can be unable to display unicode characters - if line: - print(' >', line) - - - diff --git a/pyzmail/tests/__init__.py b/pyzmail/tests/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/pyzmail/tests/test_both.py b/pyzmail/tests/test_both.py deleted file mode 100644 index 2607ed8d7f..0000000000 --- a/pyzmail/tests/test_both.py +++ /dev/null @@ -1,99 +0,0 @@ -import unittest -import pyzmail -from pyzmail.generate import * -from pyzmail.parse import * - -class TestBoth(unittest.TestCase): - - def setUp(self): - pass - - def test_compose_and_parse(self): - """test generate and parse""" - - sender=('Me', 'me@foo.com') - recipients=[('Him', 'him@bar.com'), 'just@me.com'] - subject='Le sujet en Fran\xe7ais' - text_content='Bonjour aux Fran\xe7ais' - prefered_encoding='iso-8859-1' - text_encoding='iso-8859-1' - attachments=[('attached content', 'text', 'plain', 'textfile1.txt', 'us-ascii'), - ('Fran\xe7ais', 'text', 'plain', 'textfile2.txt', 'iso-8859-1'), - ('Fran\xe7ais', 'text', 'plain', 'textfile3.txt', 'iso-8859-1'), - (b'image', 'image', 'jpg', 'imagefile.jpg', None), - ] - embeddeds=[('embedded content', 'text', 'plain', 'embedded', 'us-ascii'), - (b'picture', 'image', 'png', 'picture', None), - ] - headers=[ ('X-extra', 'extra value'), ('X-extra2', "Seconde ent\xe8te"), ('X-extra3', 'last extra'),] - - message_id_string='pyzmail' - date=1313558269 - - payload, mail_from, rcpt_to, msg_id=pyzmail.compose_mail(\ - sender, \ - recipients, \ - subject, \ - prefered_encoding, \ - (text_content, text_encoding), \ - html=None, \ - attachments=attachments, \ - embeddeds=embeddeds, \ - headers=headers, \ - message_id_string=message_id_string, \ - date=date\ - ) - - msg=PyzMessage.factory(payload) - - self.assertEqual(sender, msg.get_address('from')) - self.assertEqual(recipients[0], msg.get_addresses('to')[0]) - self.assertEqual(recipients[1], msg.get_addresses('to')[1][1]) - self.assertEqual(subject, msg.get_subject()) - self.assertEqual(subject, msg.get_decoded_header('subject')) - - # try to handle different timezone carefully - mail_date=list(email.utils.parsedate(msg.get_decoded_header('date'))) - self.assertEqual(mail_date[:6], list(time.localtime(date))[:6]) - - self.assertNotEqual(msg.get('message-id').find(message_id_string), -1) - for name, value in headers: - self.assertEqual(value, msg.get_decoded_header(name)) - - for mailpart in msg.mailparts: - if mailpart.is_body: - self.assertEqual(mailpart.content_id, None) - self.assertEqual(mailpart.filename, None) - self.assertEqual(type(mailpart.sanitized_filename), str) - if mailpart.type=='text/plain': - self.assertEqual(mailpart.get_payload(), text_content.encode(text_encoding)) - else: - self.fail('found unknown body part') - else: - if mailpart.filename: - lst=attachments - self.assertEqual(mailpart.filename, mailpart.sanitized_filename) - self.assertEqual(mailpart.content_id, None) - elif mailpart.content_id: - lst=embeddeds - self.assertEqual(mailpart.filename, None) - else: - self.fail('found unknown part') - - found=False - for attach in lst: - found=(mailpart.filename and attach[3]==mailpart.filename) \ - or (mailpart.content_id and attach[3]==mailpart.content_id) - if found: - break - - if found: - self.assertEqual(mailpart.type, attach[1]+'/'+attach[2]) - payload=mailpart.get_payload() - if attach[1]=='text' and attach[4] and isinstance(attach[0], str): - payload=payload.decode(attach[4]) - self.assertEqual(payload, attach[0]) - else: - self.fail('found unknown attachment') - - diff --git a/pyzmail/tests/test_generate.py b/pyzmail/tests/test_generate.py deleted file mode 100644 index 7afdebc593..0000000000 --- a/pyzmail/tests/test_generate.py +++ /dev/null @@ -1,30 +0,0 @@ -import unittest, doctest -import pyzmail -from pyzmail.generate import * - -class TestGenerate(unittest.TestCase): - - def setUp(self): - pass - - def test_format_addresses(self): - """test format_addresse""" - self.assertEqual('foo@example.com', str(format_addresses([ 'foo@example.com', ]))) - self.assertEqual('Foo ', str(format_addresses([ ('Foo', 'foo@example.com'), ]))) - # notice the space around the comma - self.assertEqual('foo@example.com , bar@example.com', str(format_addresses([ 'foo@example.com', 'bar@example.com']))) - # notice the space around the comma - self.assertEqual('Foo , Bar ', str(format_addresses([ ('Foo', 'foo@example.com'), ( 'Bar', 'bar@example.com')]))) - -# Add doctest -def load_tests(loader, tests, ignore): - # this works with python 2.7 and 3.x - tests.addTests(doctest.DocTestSuite(pyzmail.generate)) - return tests - -def additional_tests(): - # Add doctest for python 2.6 and below - if sys.version_info<(2, 7): - return doctest.DocTestSuite(pyzmail.generate) - else: - return unittest.TestSuite() diff --git a/pyzmail/tests/test_parse.py b/pyzmail/tests/test_parse.py deleted file mode 100644 index f7a5adb9e2..0000000000 --- a/pyzmail/tests/test_parse.py +++ /dev/null @@ -1,290 +0,0 @@ -import unittest, doctest -import pyzmail -from pyzmail.parse import * - - -class Msg: - """mimic a email.Message""" - def __init__(self, value): - self.value=value - - def get_all(self, header_name, default): - if self.value: - return [self.value, ] - else: - return [] - -class TestParse(unittest.TestCase): - - def setUp(self): - pass - - def test_decode_mail_header(self): - """test decode_mail_header()""" - self.assertEqual(decode_mail_header(''), '') - self.assertEqual(decode_mail_header('hello'), 'hello') - self.assertEqual(decode_mail_header('hello '), 'hello ') - self.assertEqual(decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_Fran=E7ais?='), 'Courrier \xe8lectronique Fran\xe7ais') - self.assertEqual(decode_mail_header('=?utf8?q?Courrier_=C3=A8lectronique_Fran=C3=A7ais?='), 'Courrier \xe8lectronique Fran\xe7ais') - self.assertEqual(decode_mail_header('=?utf-8?b?RnJhbsOnYWlz?='), 'Fran\xe7ais') - self.assertEqual(decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_?= =?utf8?q?Fran=C3=A7ais?='), 'Courrier \xe8lectronique Fran\xe7ais') - self.assertEqual(decode_mail_header('=?iso-8859-1?q?Courrier_=E8lectronique_?= =?utf-8?b?RnJhbsOnYWlz?='), 'Courrier \xe8lectronique Fran\xe7ais') - self.assertEqual(decode_mail_header('h_subject_q_iso_8858_1 : =?ISO-8859-1?Q?Fran=E7ais=E20accentu=E9?= !'), 'h_subject_q_iso_8858_1 :Fran\xe7ais\xe20accentu\xe9!') - - def test_get_mail_addresses(self): - """test get_mail_addresses()""" - self.assertEqual([ ('foo@example.com', 'foo@example.com') ], get_mail_addresses(Msg('foo@example.com'), 'to')) - self.assertEqual([ ('Foo', 'foo@example.com'), ], get_mail_addresses(Msg('Foo '), 'to')) - # notice the space around the comma - self.assertEqual([ ('foo@example.com', 'foo@example.com'), ('bar@example.com', 'bar@example.com')], get_mail_addresses(Msg('foo@example.com , bar@example.com'), 'to')) - self.assertEqual([ ('Foo', 'foo@example.com'), ( 'Bar', 'bar@example.com')], get_mail_addresses(Msg('Foo , Bar '), 'to')) - self.assertEqual([ ('Foo', 'foo@example.com'), ('bar@example.com', 'bar@example.com')], get_mail_addresses(Msg('Foo , bar@example.com'), 'to')) - self.assertEqual([ ('Mr Foo', 'foo@example.com'), ('bar@example.com', 'bar@example.com')], get_mail_addresses(Msg('Mr\nFoo , bar@example.com'), 'to')) - - self.assertEqual([ ('Beno\xeet', 'benoit@example.com')], get_mail_addresses(Msg('=?utf-8?q?Beno=C3=AEt?= '), 'to')) - - # address already encoded into utf8 (bad) - address='Ant\xf3nio Foo '.encode('utf8') - if sys.version_info<(3, 0): - self.assertEqual([('Ant\ufffd\ufffdnio Foo', 'a.foo@example.com')], get_mail_addresses(Msg(address), 'to')) - else: - # Python 3.2 return header when surrogate characters are used in header - self.assertEqual([('Ant??nio Foo', 'a.foo@example.com'), ], get_mail_addresses(Msg(email.header.Header(address, charset=email.charset.UNKNOWN8BIT, header_name='to')), 'to')) - - def test_get_filename(self): - """test get_filename()""" - import email.mime.image - - filename='Fran\xe7ais.png' - if sys.version_info<(3, 0): - encoded_filename=filename.encode('iso-8859-1') - else: - encoded_filename=filename - - payload=b'data' - attach=email.mime.image.MIMEImage(payload, 'png') - attach.add_header('Content-Disposition', 'attachment', filename='image.png') - self.assertEqual('image.png', get_filename(attach)) - - attach=email.mime.image.MIMEImage(payload, 'png') - attach.add_header('Content-Disposition', 'attachment', filename=('iso-8859-1', 'fr', encoded_filename)) - self.assertEqual('Fran\xe7ais.png', get_filename(attach)) - - attach=email.mime.image.MIMEImage(payload, 'png') - attach.set_param('name', 'image.png') - self.assertEqual('image.png', get_filename(attach)) - - attach=email.mime.image.MIMEImage(payload, 'png') - attach.set_param('name', ('iso-8859-1', 'fr', encoded_filename)) - self.assertEqual('Fran\xe7ais.png', get_filename(attach)) - - attach=email.mime.image.MIMEImage(payload, 'png') - attach.add_header('Content-Disposition', 'attachment', filename='image.png') - attach.set_param('name', 'image_wrong.png') - self.assertEqual('image.png', get_filename(attach)) - - def test_get_mailparts(self): - """test get_mailparts()""" - import email.mime.multipart - import email.mime.text - import email.mime.image - msg=email.mime.multipart.MIMEMultipart(boundary='===limit1==') - txt=email.mime.text.MIMEText('The text.', 'plain', 'us-ascii') - msg.attach(txt) - image=email.mime.image.MIMEImage(b'data', 'png') - image.add_header('Content-Disposition', 'attachment', filename='image.png') - image.add_header('Content-Description', 'the description') - image.add_header('Content-ID', '') - msg.attach(image) - - raw=msg.as_string(unixfrom=False) - expected_raw="""Content-Type: multipart/mixed; boundary="===limit1==" -MIME-Version: 1.0 - ---===limit1== -Content-Type: text/plain; charset="us-ascii" -MIME-Version: 1.0 -Content-Transfer-Encoding: 7bit - -The text. ---===limit1== -Content-Type: image/png -MIME-Version: 1.0 -Content-Transfer-Encoding: base64 -Content-Disposition: attachment; filename="image.png" -Content-Description: the description -Content-ID: - -ZGF0YQ== ---===limit1==--""" - - if sys.version_info<(3, 0): - expected_raw=expected_raw.replace('','') - else: - expected_raw=expected_raw.replace('','\n') - - self.assertEqual(raw, expected_raw) - - parts=get_mail_parts(msg) - # [MailPart<*text/plain charset=us-ascii len=9>, MailPart] - - self.assertEqual(len(parts), 2) - - self.assertEqual(parts[0].type, 'text/plain') - self.assertEqual(parts[0].is_body, 'text/plain') # not a error, is_body must be type - self.assertEqual(parts[0].charset, 'us-ascii') - self.assertEqual(parts[0].get_payload().decode(parts[0].charset), 'The text.') - - self.assertEqual(parts[1].type, 'image/png') - self.assertEqual(parts[1].is_body, False) - self.assertEqual(parts[1].charset, None) - self.assertEqual(parts[1].filename, 'image.png') - self.assertEqual(parts[1].description, 'the description') - self.assertEqual(parts[1].content_id, 'this.is.the.normaly.unique.contentid') - self.assertEqual(parts[1].get_payload(), b'data') - - - raw_1='''Content-Type: text/plain; charset="us-ascii" -MIME-Version: 1.0 -Content-Transfer-Encoding: 7bit -Subject: simple test -From: Me -To: A , B -Cc: C , d@foo.com -User-Agent: pyzmail - -The text. -''' - - def check_message_1(self, msg): - self.assertEqual(msg.get_subject(), 'simple test') - self.assertEqual(msg.get_decoded_header('subject'), 'simple test') - self.assertEqual(msg.get_decoded_header('User-Agent'), 'pyzmail') - self.assertEqual(msg.get('User-Agent'), 'pyzmail') - self.assertEqual(msg.get_address('from'), ('Me', 'me@foo.com')) - self.assertEqual(msg.get_addresses('to'), [('A', 'a@foo.com'), ('B', 'b@foo.com')]) - self.assertEqual(msg.get_addresses('cc'), [('C', 'c@foo.com'), ('d@foo.com', 'd@foo.com')]) - self.assertEqual(len(msg.mailparts), 1) - self.assertEqual(msg.text_part, msg.mailparts[0]) - self.assertEqual(msg.html_part, None) - - # use 8bits encoding and 2 different charsets ! python 3.0 & 3.1 are not eable to parse this sample - raw_2=b"""From: sender@domain.com -To: recipient@domain.com -Date: Tue, 7 Jun 2011 16:32:17 +0200 -Subject: contains 8bits attachments using different encoding -Content-Type: multipart/mixed; boundary=mixed - ---mixed -Content-Type: text/plain; charset="us-ascii" -MIME-Version: 1.0 -Content-Transfer-Encoding: 7bit - -body ---mixed -Content-Type: text/plain; charset="windows-1252" -MIME-Version: 1.0 -Content-Transfer-Encoding: 8bit -Content-Disposition: attachment; filename="file1.txt" - -bo\xeete mail = mailbox ---mixed -Content-Type: text/plain; charset="utf-8" -MIME-Version: 1.0 -Content-Transfer-Encoding: 8bit -Content-Disposition: attachment; filename="file2.txt" - -bo\xc3\xaete mail = mailbox ---mixed-- -""" - - def check_message_2(self, msg): - self.assertEqual(msg.get_subject(), 'contains 8bits attachments using different encoding') - - body, file1, file2=msg.mailparts - - self.assertEqual('file1.txt', file1.filename) - self.assertEqual('file2.txt', file2.filename) - self.assertEqual('windows-1252', file1.charset) - self.assertEqual('utf-8', file2.charset) - content=b'bo\xeete mail = mailbox'.decode("windows-1252") - content1=file1.get_payload().decode(file1.charset) - content2=file2.get_payload().decode(file2.charset) - self.assertEqual(content, content1) - self.assertEqual(content, content2) - - # this one contain non us-ascii chars in the header - # py 2x and py3k return different value here - raw_3=b'Content-Type: text/plain; charset="us-ascii"\n' \ - b'MIME-Version: 1.0\n' \ - b'Content-Transfer-Encoding: 7bit\n' \ - + 'Subject: Beno\xeet & Ant\xf3nio\n'.encode('utf8') +\ - b'From: =?utf-8?q?Beno=C3=AEt?= \n' \ - + 'To: Ant\xf3nio Foo \n'.encode('utf8') \ - + 'Cc: Beno\xeet , d@foo.com\n'.encode('utf8') +\ - b'User-Agent: pyzmail\n' \ - b'\n' \ - b'The text.\n' - - def check_message_3(self, msg): - subject='Beno\ufffd\ufffdt & Ant\ufffd\ufffdnio' # if sys.version_info<(3, 0) else u'Beno??t & Ant??nio' - self.assertEqual(msg.get_subject(), subject) - self.assertEqual(msg.get_decoded_header('subject'), subject) - self.assertEqual(msg.get_decoded_header('User-Agent'), 'pyzmail') - self.assertEqual(msg.get('User-Agent'), 'pyzmail') - self.assertEqual(msg.get_address('from'), ('Beno\xeet', 'benoit@example.com')) - - to=msg.get_addresses('to') - self.assertEqual(to[0][1], 'a.foo@example.com') - self.assertEqual(to[0][0], 'Ant\ufffd\ufffdnio Foo' if sys.version_info<(3, 0) else 'Ant??nio Foo') - - cc=msg.get_addresses('cc') - self.assertEqual(cc[0][1], 'benoit@foo.com') - self.assertEqual(cc[0][0], 'Beno\ufffd\ufffdt' if sys.version_info<(3, 0) else 'Beno??t') - self.assertEqual(cc[1], ('d@foo.com', 'd@foo.com')) - - self.assertEqual(len(msg.mailparts), 1) - self.assertEqual(msg.text_part, msg.mailparts[0]) - self.assertEqual(msg.html_part, None) - - - def check_pyzmessage_factories(self, input, check): - """test PyzMessage from different sources""" - if isinstance(input, bytes) and sys.version_info>=(3, 2): - check(PyzMessage.factory(input)) - check(message_from_bytes(input)) - - import io - check(PyzMessage.factory(io.BytesIO(input))) - check(message_from_binary_file(io.BytesIO(input))) - - if isinstance(input, str): - - check(PyzMessage.factory(input)) - check(message_from_string(input)) - - import io - check(PyzMessage.factory(io.StringIO(input))) - check(message_from_file(io.StringIO(input))) - - def test_pyzmessage_factories(self): - """test PyzMessage class different sources""" - self.check_pyzmessage_factories(self.raw_1, self.check_message_1) - self.check_pyzmessage_factories(self.raw_2, self.check_message_2) - self.check_pyzmessage_factories(self.raw_3, self.check_message_3) - - -# Add doctest -def load_tests(loader, tests, ignore): - # this works with python 2.7 and 3.x - if sys.version_info<(3, 0): - tests.addTests(doctest.DocTestSuite(pyzmail.parse)) - return tests - -def additional_tests(): - # Add doctest for python 2.6 and below - if sys.version_info<(2, 7): - return doctest.DocTestSuite(pyzmail.parse) - else: - return unittest.TestSuite() - diff --git a/pyzmail/tests/test_send.py b/pyzmail/tests/test_send.py deleted file mode 100644 index 554f8549a4..0000000000 --- a/pyzmail/tests/test_send.py +++ /dev/null @@ -1,77 +0,0 @@ -import threading, smtpd, asyncore, socket, smtplib, time -import unittest -import pyzmail -from pyzmail.generate import * - - -smtpd_addr='127.0.0.1' -smtpd_port=32525 -smtp_bad_port=smtpd_port-1 - -smtp_mode='normal' -smtp_login=None -smtp_password=None - - -class SMTPServer(smtpd.SMTPServer): - def __init__(self, localaddr, remoteaddr, received): - smtpd.SMTPServer.__init__(self, localaddr, remoteaddr) - self.set_reuse_addr() - # put the received mail into received list - self.received=received - - def process_message(self, peer, mail_from, rcpt_to, data): - ret=None - if mail_from.startswith('data_error'): - ret='552 Requested mail action aborted: exceeded storage allocation' - self.received.append((ret, peer, mail_from, rcpt_to, data)) - return ret - -class TestSend(unittest.TestCase): - - def setUp(self): - self.received=[] - self.smtp_server=SMTPServer((smtpd_addr, smtpd_port), None, self.received) - - def asyncloop(): - # check every sec if all channel are close - asyncore.loop(1) - - - self.payload, self.mail_from, self.rcpt_to, self.msg_id=compose_mail(('Me', 'me@foo.com'), [('Him', 'him@bar.com')], 'the subject', 'iso-8859-1', ('Hello world', 'us-ascii')) - - # start the server after having built the payload, to handle failure in - # the code above - self.smtpd_thread=threading.Thread(target=asyncloop) - self.smtpd_thread.daemon=True - self.smtpd_thread.start() - - - def tearDown(self): - self.smtp_server.close() - self.smtpd_thread.join() - - def test_simple_send(self): - """simple send""" - ret=send_mail(self.payload, self.mail_from, self.rcpt_to, smtpd_addr, smtpd_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password) - self.assertEqual(ret, dict()) - (ret, peer, mail_from, rcpt_to, payload)=self.received[0] - self.assertEqual(self.payload, payload) - self.assertEqual(self.mail_from, mail_from) - self.assertEqual(self.rcpt_to, rcpt_to) - self.assertEqual('127.0.0.1', peer[0]) - - def test_send_to_a_wrong_port(self): - """send to a wrong port""" - self.smtp_server.close() - ret=send_mail(self.payload, self.mail_from, self.rcpt_to, smtpd_addr, smtpd_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password) - self.assertEqual(type(ret), str) - - def test_send_data_error(self): - """smtp server return error code""" - ret=send_mail(self.payload, 'data_error@foo.com', self.rcpt_to, smtpd_addr, smtp_bad_port, smtp_mode=smtp_mode, smtp_login=smtp_login, smtp_password=smtp_password) - self.assertEqual(type(ret), str) - -if __name__ == '__main__': - unittest.main() - diff --git a/pyzmail/tests/test_utils.py b/pyzmail/tests/test_utils.py deleted file mode 100644 index e03d07d0d9..0000000000 --- a/pyzmail/tests/test_utils.py +++ /dev/null @@ -1,24 +0,0 @@ -import unittest, doctest -import pyzmail -from pyzmail.utils import * - -class TestUtils(unittest.TestCase): - - def setUp(self): - pass - - def test_nothing(self): - pass - -# Add doctest -def load_tests(loader, tests, ignore): - # this works with python 2.7 and 3.x - tests.addTests(doctest.DocTestSuite(pyzmail.utils)) - return tests - -def additional_tests(): - # Add doctest for python 2.6 and below - if sys.version_info<(2, 7): - return doctest.DocTestSuite(pyzmail.utils) - else: - return unittest.TestSuite() diff --git a/pyzmail/utils.py b/pyzmail/utils.py deleted file mode 100644 index 436e2a4c34..0000000000 --- a/pyzmail/utils.py +++ /dev/null @@ -1,155 +0,0 @@ -# -# pyzmail/utils.py -# (c) Alain Spineux -# http://www.magiksys.net/pyzmail -# Released under LGPL - -""" -Various functions used by other modules -@var invalid_chars_in_filename: a mix of characters not permitted in most used filesystems -@var invalid_windows_name: a list of unauthorized filenames under Windows -""" - -import sys - -invalid_chars_in_filename=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f' \ - b'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f' \ - b'<>:"/\\|?*%\'' - -invalid_windows_name=[b'CON', b'PRN', b'AUX', b'NUL', b'COM1', b'COM2', b'COM3', - b'COM4', b'COM5', b'COM6', b'COM7', b'COM8', b'COM9', - b'LPT1', b'LPT2', b'LPT3', b'LPT4', b'LPT5', b'LPT6', b'LPT7', - b'LPT8', b'LPT9' ] - -def sanitize_filename(filename, alt_name, alt_ext): - """ - Convert the given filename into a name that should work on all - platform. Remove non us-ascii characters, and drop invalid filename. - Use the I{alternative} filename if needed. - - @type filename: unicode or None - @param filename: the originale filename or None. Can be unicode. - @type alt_name: str - @param alt_name: the alternative filename if filename is None or useless - @type alt_ext: str - @param alt_ext: the alternative filename extension (including the '.') - - @rtype: str - @returns: a valid filename. - - >>> sanitize_filename('document.txt', 'file', '.txt') - 'document.txt' - >>> sanitize_filename('number1.txt', 'file', '.txt') - 'number1.txt' - >>> sanitize_filename(None, 'file', '.txt') - 'file.txt' - >>> sanitize_filename(u'R\\xe9pertoir.txt', 'file', '.txt') - 'Rpertoir.txt' - >>> # the '\\xe9' has been removed - >>> sanitize_filename(u'\\xe9\\xe6.html', 'file', '.txt') - 'file.html' - >>> # all non us-ascii characters have been removed, the alternative name - >>> # has been used the replace empty string. The originale extention - >>> # is still valid - >>> sanitize_filename(u'COM1.txt', 'file', '.txt') - 'COM1A.txt' - >>> # if name match an invalid name or assimilated then a A is added - """ - - if not filename: - return alt_name+alt_ext - - if ((sys.version_info<(3, 0) and isinstance(filename, str)) or \ - (sys.version_info>=(3, 0) and isinstance(filename, str))): - filename=filename.encode('ascii', 'ignore') - - filename=filename.translate(None, invalid_chars_in_filename) - filename=filename.strip() - - upper=filename.upper() - for name in invalid_windows_name: - if upper==name: - filename=filename+b'A' - break - if upper.startswith(name+b'.'): - filename=filename[:len(name)]+b'A'+filename[len(name):] - break - - if sys.version_info>=(3, 0): - # back to string - filename=filename.decode('us-ascii') - - if filename.rfind('.')==0: - filename=alt_name+filename - - return filename - -def handle_filename_collision(filename, filenames): - """ - Avoid filename collision, add a sequence number to the name when required. - 'file.txt' will be renamed into 'file-01.txt' then 'file-02.txt' ... - until their is no more collision. The file is not added to the list. - - Windows don't make the difference between lower and upper case. To avoid - "case" collision, the function compare C{filename.lower()} to the list. - If you provide a list in lower case only, then any collisions will be avoided. - - @type filename: str - @param filename: the filename - @type filenames: list or set - @param filenames: a list of filenames. - - @rtype: str - @returns: the I{filename} or the appropriately I{indexed} I{filename} - - >>> handle_filename_collision('file.txt', [ ]) - 'file.txt' - >>> handle_filename_collision('file.txt', [ 'file.txt' ]) - 'file-01.txt' - >>> handle_filename_collision('file.txt', [ 'file.txt', 'file-01.txt',]) - 'file-02.txt' - >>> handle_filename_collision('foo', [ 'foo',]) - 'foo-01' - >>> handle_filename_collision('foo', [ 'foo', 'foo-01',]) - 'foo-02' - >>> handle_filename_collision('FOO', [ 'foo', 'foo-01',]) - 'FOO-02' - """ - if filename.lower() in filenames: - try: - basename, ext=filename.rsplit('.', 1) - ext='.'+ext - except ValueError: - basename, ext=filename, '' - - i=1 - while True: - filename='%s-%02d%s' % (basename, i, ext) - if filename.lower() not in filenames: - break - i+=1 - - return filename - -def is_usascii(value): - """" - test if string contains us-ascii characters only - - >>> is_usascii('foo') - True - >>> is_usascii(u'foo') - True - >>> is_usascii(u'Fran\xe7ais') - False - >>> is_usascii('bad\x81') - False - """ - try: - # if value is byte string, it will be decoded first using us-ascii - # and will generate UnicodeEncodeError, this is fine too - value.encode('us-ascii') - except UnicodeError: - return False - - return True - \ No newline at end of file diff --git a/pyzmail/version.py b/pyzmail/version.py deleted file mode 100644 index 03e1b0c6dc..0000000000 --- a/pyzmail/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__='1.0.3'