forked from jjordanbaird/EmailVectorDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
email_fetcher.py
87 lines (73 loc) · 3.27 KB
/
email_fetcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import email
import imaplib
import json
import logging
import os
from datetime import datetime
from email.header import decode_header, make_header
from email.utils import parsedate_to_datetime
from typing import Optional

from dotenv import load_dotenv
# Configure the root logger (a no-op if it already has handlers) so that INFO
# and above are emitted with a timestamp and level-name prefix.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
class EmailFetcher:
    """
    Fetch emails sent by one address from an IMAP inbox.

    The fetcher logs in over IMAP-SSL (Gmail by default), selects the inbox,
    searches for messages from a given sender and returns those whose
    Message-ID is not already present in a previously saved JSON file.

    It can also be used as a context manager, in which case the connection is
    opened on entry and closed on exit::

        with EmailFetcher('me@example.com') as fetcher:
            emails = fetcher.fetch_emails('news@example.com', 'emails.json')
    """

    # Strict Date-header layout tried first; anything that does not match it
    # is handed to the lenient RFC 5322 parser.
    _DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"

    def __init__(self, email_address: str, password: Optional[str] = None,
                 imap_host: str = 'imap.gmail.com') -> None:
        """
        Store the credentials; no network traffic happens here.

        Args:
            email_address: Account used to log in.
            password: Account password. When omitted or empty, a ``.env``
                file is loaded and ``EMAIL_PASSWORD`` is read from the
                environment instead.
            imap_host: Host name of the IMAP-over-SSL server.
        """
        if not password:
            load_dotenv()
            password = os.getenv('EMAIL_PASSWORD')
        self.password = password
        self.email_address = email_address
        self.imap_host = imap_host
        self.mail = None

    def __enter__(self) -> "EmailFetcher":
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.disconnect()

    def connect(self) -> None:
        """
        Open the IMAP-SSL connection, log in and select the inbox.

        Raises:
            ValueError: If no password was given and none was found in the
                environment.
            imaplib.IMAP4.error: If the server rejects the login.
        """
        if not self.password:
            raise ValueError(
                "No password supplied and EMAIL_PASSWORD is not set"
            )
        mail = imaplib.IMAP4_SSL(self.imap_host)
        try:
            mail.login(self.email_address, self.password)
            mail.select('inbox')
        except Exception:
            # Do not leave a half-open socket behind when login fails.
            mail.shutdown()
            raise
        self.mail = mail

    def disconnect(self) -> None:
        """Log out from the server; does nothing when not connected."""
        if self.mail is None:
            return
        try:
            self.mail.logout()
        finally:
            self.mail = None

    @staticmethod
    def _find_text_part(msg):
        """Return the first ``text/plain`` part of *msg*, or None."""
        for part in msg.walk():
            if part.get_content_type() == 'text/plain':
                return part
        return None

    @staticmethod
    def _first_literal(response) -> Optional[bytes]:
        """Return the payload of the first ``(envelope, payload)`` tuple in an
        IMAP FETCH response, or None when the response carries none."""
        for item in response or ():
            if isinstance(item, tuple) and len(item) > 1 and item[1]:
                return item[1]
        return None

    @staticmethod
    def _clean_message_id(value) -> Optional[str]:
        """Strip folding whitespace from a Message-ID header value."""
        if value is None:
            return None
        return str(value).strip() or None

    @staticmethod
    def _decode_text(part) -> str:
        """Decode a text part with its declared charset (UTF-8 by default)."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # The message declares a charset Python does not know.
            return payload.decode('utf-8', errors='replace')

    @classmethod
    def _parse_date(cls, raw) -> Optional[str]:
        """Convert a Date header to an ISO-8601 string, or None if that is
        not possible."""
        if not raw:
            return None
        raw = str(raw)
        try:
            return datetime.strptime(raw, cls._DATE_FORMAT).isoformat()
        except ValueError:
            pass
        # Legal variants the strict format rejects: a trailing "(UTC)"
        # comment, a missing weekday, an obsolete zone name such as "GMT".
        try:
            return parsedate_to_datetime(raw).isoformat()
        except (TypeError, ValueError):
            return None

    def _get_text_from_email(self, msg) -> Optional[bytes]:
        """
        Return the decoded bytes of the first ``text/plain`` part of *msg*.

        Returns:
            The payload with its transfer encoding removed, or None when the
            message has no ``text/plain`` part.
        """
        part = self._find_text_part(msg)
        return None if part is None else part.get_payload(decode=True)

    def load_existing_emails(self, path) -> list[dict]:
        """
        Load the already-processed emails stored as JSON at *path*.

        Args:
            path: Location of the JSON file.

        Returns:
            The parsed content of the file, or an empty list when the file
            does not exist or is empty.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                text = f.read()
        except FileNotFoundError:
            text = ''
        existing_emails = json.loads(text) if text.strip() else []
        logging.info("Loaded %d existing emails", len(existing_emails))
        return existing_emails

    def get_message_ids(self, mail, email_ids):
        """
        Map IMAP message numbers to their Message-ID header values.

        Only the Message-ID header is requested (with ``BODY.PEEK``), so the
        messages are not downloaded in full.

        Args:
            mail: Connected IMAP client.
            email_ids: Message numbers as returned by an IMAP search.

        Returns:
            Dict of message number to Message-ID. Messages for which no
            Message-ID could be read are left out.
        """
        message_ids = {}
        for email_id in email_ids:
            _, response = mail.fetch(
                email_id, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)])'
            )
            header_bytes = self._first_literal(response)
            message_id = None
            if header_bytes is not None:
                # The header parser copes with folded and differently-cased
                # header lines, which plain string splitting does not.
                parsed = email.message_from_bytes(header_bytes)
                message_id = self._clean_message_id(parsed['Message-ID'])
            if message_id is None:
                logging.warning(
                    "Skipping email %r: no Message-ID header", email_id
                )
                continue
            message_ids[email_id] = message_id
        return message_ids

    def fetch_emails(self, sender_email: str, processed_email_output_path: str) -> list[dict]:
        """
        Fetch the emails from *sender_email* that have not been processed yet.

        Args:
            sender_email: Address to search for in the From header.
            processed_email_output_path: JSON file holding the emails of
                earlier runs; their ``id`` values are used to skip messages.

        Returns:
            One dict per new email with the keys ``id``, ``subject``,
            ``from``, ``date`` (ISO-8601 string, or None when the Date header
            is missing or unparseable) and ``body``.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            imaplib.IMAP4.error: If the server rejects the search.
        """
        if self.mail is None:
            raise RuntimeError("Not connected: call connect() first")

        # Escape the characters that would terminate the quoted IMAP string.
        quoted_sender = sender_email.replace('\\', '\\\\').replace('"', '\\"')
        status, response = self.mail.search(None, f'FROM "{quoted_sender}"')
        if status != 'OK':
            raise imaplib.IMAP4.error(
                f"IMAP search failed: {status} {response!r}"
            )
        found = response[0] if response else None
        email_binary_ids = (found or b'').split()
        email_ids = self.get_message_ids(self.mail, email_binary_ids)

        existing_emails = self.load_existing_emails(processed_email_output_path)
        existing_ids = {record['id'] for record in existing_emails}
        new_ids = [k for k, v in email_ids.items() if v not in existing_ids]
        logging.info("Found %d new emails", len(new_ids))

        email_list = []
        for email_id in new_ids:
            _, msg_data = self.mail.fetch(email_id, '(RFC822)')
            raw_message = self._first_literal(msg_data)
            if raw_message is None:
                logging.warning("Skipping email %r: empty response", email_id)
                continue
            msg = email.message_from_bytes(raw_message)

            message_id = self._clean_message_id(msg['Message-ID'])
            if message_id in existing_ids:
                continue

            # NOTE(review): messages without a non-empty text/plain part are
            # left out of the result, so they are examined again on every
            # run - confirm this is the intended policy.
            text_part = self._find_text_part(msg)
            body = self._decode_text(text_part) if text_part is not None else ''
            if not body:
                continue

            email_list.append({
                'id': message_id,
                'subject': str(make_header(decode_header(msg['Subject'] or ''))),
                'from': msg['From'],
                'date': self._parse_date(msg['date']),
                'body': body,
            })
            # Guard against the same Message-ID appearing twice in one batch.
            existing_ids.add(message_id)
        return email_list