diff --git a/validate_email.py b/validate_email.py index 0f18e3e..1bf8007 100644 --- a/validate_email.py +++ b/validate_email.py @@ -95,6 +95,7 @@ class ServerError(Exception): MX_DNS_CACHE = {} MX_CHECK_CACHE = {} +logger = logging.getLogger('validate_email') def get_mx_ip(hostname): if hostname not in MX_DNS_CACHE: @@ -109,7 +110,25 @@ def get_mx_ip(hostname): return MX_DNS_CACHE[hostname] -def validate_email(email, check_mx=False, verify=False, debug=False, smtp_timeout=10): +def check_command(result_tuple, server_name='server', ok_codes=None, debug=False): + status, mes = result_tuple + if not ok_codes: + ok_codes = [250] + if status not in ok_codes: + if debug: + logger.debug(u'%s answer: %s - %s', server_name, status, mes) + return False + return True + + +def check_command_for_server(server_name): + def wrapper(*args, **kwargs): + kwargs['server_name'] = server_name + return check_command(*args, **kwargs) + return wrapper + + +def validate_email(email, check_mx=False, verify=False, debug=False, mail_from='', smtp_timeout=10): """Indicate whether the given string is a valid email address according to the 'addr-spec' portion of RFC 2822 (see section 3.4.1). Parts of the spec that are marked obsolete are *not* @@ -118,10 +137,7 @@ def validate_email(email, check_mx=False, verify=False, debug=False, smtp_timeou general this should correctly identify any email address likely to be in use as of 2011.""" if debug: - logger = logging.getLogger('validate_email') logger.setLevel(logging.DEBUG) - else: - logger = None try: assert re.match(VALID_ADDRESS_REGEXP, email) is not None @@ -136,38 +152,37 @@ def validate_email(email, check_mx=False, verify=False, debug=False, smtp_timeou return False for mx in mx_hosts: try: - if not verify and mx[1] in MX_CHECK_CACHE: - return MX_CHECK_CACHE[mx[1]] + server_name = mx[1] + check = check_command_for_server(server_name) + if not verify and server_name in MX_CHECK_CACHE: + return MX_CHECK_CACHE[server_name] smtp = smtplib.SMTP(timeout=smtp_timeout) - smtp.connect(mx[1]) - MX_CHECK_CACHE[mx[1]] = True + smtp.connect(server_name) + MX_CHECK_CACHE[server_name] = True if not verify: - try: - smtp.quit() - except smtplib.SMTPServerDisconnected: - pass return True - status, _ = smtp.helo() - if status != 250: - smtp.quit() - if debug: - logger.debug(u'%s answer: %s - %s', mx[1], status, _) + + if not check(smtp.helo()): continue - smtp.mail('') - status, _ = smtp.rcpt(email) - if status == 250: - smtp.quit() + + if not check(smtp.mail(mail_from)): + continue + + if check(smtp.rcpt(email)): return True - if debug: - logger.debug(u'%s answer: %s - %s', mx[1], status, _) - smtp.quit() except smtplib.SMTPServerDisconnected: # Server not permits verify user if debug: logger.debug(u'%s disconected.', mx[1]) except smtplib.SMTPConnectError: if debug: logger.debug(u'Unable to connect to %s.', mx[1]) - return None + finally: + try: + smtp.quit() + except smtplib.SMTPServerDisconnected: + pass + + return False except AssertionError: return False except (ServerError, socket.error) as e: @@ -193,9 +208,11 @@ def validate_email(email, check_mx=False, verify=False, debug=False, smtp_timeou else: validate = False + sender_address = raw_input('Use sender address? [] ').strip() + logging.basicConfig() - result = validate_email(email, mx, validate, debug=True, smtp_timeout=1) + result = validate_email(email, mx, validate, debug=True, mail_from=sender_address, smtp_timeout=1) if result: print("Valid!") elif result is None: