"""
title: Email Access
author: RobbyV2
date: 2025-03-02
version: 2.0
license: MIT
description: A tool for interacting with emails, using IMAP and SMTP.
#requirements: smtplib, email, os, json, markdown
Uncomment above if you have library issues
"""
import smtplib
import imaplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import email
from email.parser import BytesParser, Parser
from email.policy import default
from typing import List, Dict, Any, Union, Optional, Callable, Awaitable
import os
import json
from pydantic import BaseModel, Field
from datetime import datetime
import markdown
class EventEmitter:
    """Send ``status`` events to an optional callback.

    Every helper builds a payload of the form
    ``{"type": "status", "data": {"status": ..., "description": ..., "done": ...}}``
    and hands it to the callback given at construction time. When no callback
    was given, every method is a silent no-op.
    """

    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
        """Remember the callback.

        :param event_emitter: Callable that receives one event dict. It may be
            a coroutine function or a plain function; ``None`` disables output.
        """
        self.event_emitter = event_emitter

    async def progress_update(self, description):
        """Emit *description* with status ``in_progress`` and ``done=False``."""
        await self.emit(description)

    async def error_update(self, description):
        """Emit *description* with status ``error`` and ``done=True``."""
        await self.emit(description, "error", True)

    async def success_update(self, description):
        """Emit *description* with status ``success`` and ``done=True``."""
        await self.emit(description, "success", True)

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Build one status event and deliver it to the callback, if any.

        :param description: Human-readable text for the event.
        :param status: Value stored under ``data.status``.
        :param done: Value stored under ``data.done``.
        """
        if not self.event_emitter:
            return

        import inspect

        outcome = self.event_emitter(
            {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
        )
        # The callback is typed as returning Any, so it may be synchronous.
        # Awaiting a non-awaitable result would raise TypeError.
        if inspect.isawaitable(outcome):
            await outcome
class Tools:
class Valves(BaseModel):
    # Settings read by the tool methods through ``self.valves``.
    # (Plain comments are used instead of a class docstring so the model's
    # generated schema is not given an extra description.)

    # Account used both as the login name and as the From address.
    # NOTE(review): the original default literal was split across two lines by
    # an address-obfuscation artifact, which left the string unterminated.
    # The placeholder below is a reconstruction - confirm the intended value.
    FROM_EMAIL: str = Field(
        default="someone@example.com",
        description="The email a LLM can use",
    )
    # Password passed to the SMTP and IMAP ``login`` calls.
    PASSWORD: str = Field(
        default="my_secure_password",
        description="The password for the provided email address",
    )
    # One host name is used for both the SMTP connection (port 587) and the
    # IMAP connection (port 143) made elsewhere in this class.
    SERVER_URL: str = Field(
        default="smtp.google.com",
        description="The URL of the mail server",
    )
    # Copied to ``Tools.citation`` when the tool is constructed.
    CITATION: bool = Field(
        default=True,
        description="Enable or disable email citations in chat",
    )
def __init__(self):
    """Create the tool with a default ``Valves`` instance.

    ``self.citation`` is a copy of ``valves.CITATION`` taken at construction
    time; the email methods check it before emitting ``citation`` events.
    """
    # NOTE(review): if ``self.valves`` is replaced after construction,
    # ``self.citation`` keeps the value captured here - confirm this is intended.
    valves = self.Valves()
    self.valves = valves
    self.citation = valves.CITATION
def get_user_name_and_email_and_id(self, __user__: Optional[dict] = None) -> str:
    """
    Get the user name, Email and ID from the user object.
    """
    # Do not include :param for __user__ in the docstring as it should not be shown in the tool's specification
    # The session user object will be passed as a parameter when the function is called

    # ``None`` replaces the former shared ``{}`` default; a missing or ``None``
    # user is treated as an empty mapping. The former debug ``print`` of the
    # whole user object was dropped so user details are not written to stdout.
    user = __user__ or {}

    # Fields are reported in a fixed order; absent keys are skipped.
    templates = (
        ("name", "User: {}"),
        ("id", "(ID: {})"),
        ("email", "(Email: {})"),
    )
    parts = [template.format(user[key]) for key, template in templates if key in user]

    # Joining avoids the stray leading space the old concatenation produced
    # when "name" was missing.
    return " ".join(parts) if parts else "User: Unknown"
def markdown_to_html(self, markdown_text: str) -> str:
    """
    Convert markdown text to HTML for email rendering.

    :param markdown_text: The markdown source to render.
    :return: A complete HTML document. If rendering fails, the text is returned
        HTML-escaped inside a ``<pre>`` element instead.
    """
    from html import escape

    try:
        rendered = markdown.markdown(markdown_text)
        return f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.5; }}
</style>
</head>
<body>
{rendered}
</body>
</html>
"""
    except Exception:
        # Deliberate best-effort fallback: any rendering failure still yields a
        # usable document. The raw text must be escaped here, otherwise "<",
        # ">" or "&" in the message would be interpreted as markup.
        return f"<html><body><pre>{escape(str(markdown_text))}</pre></body></html>"
async def send_email(
    self,
    subject: str,
    body: str,
    recipients: List[str],
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    YOU HAVE DIRECT ACCESS TO SEND REAL EMAILS. This is not a draft or simulation - it sends actual emails.
    When you call this function, you are DIRECTLY sending a real email from the account you have access to.
    The email will be sent immediately through the connected mail server with no human intervention.
    MARKDOWN FORMATTING:
    You can use markdown formatting in the email body, which will be automatically rendered as HTML:
    - **Bold text** or __bold text__
    - *Italic text* or _italic text_
    - [Links](https://example.com)
    - Lists (bullet points with * or - and numbered with 1., 2., etc.)
    - Headers with # or ## or ###
    - > Blockquotes
    - Code blocks with ```
    IMPORTANT WORKFLOW:
    1. Confirm with the user exactly what they want to send and to whom
    2. Draft the COMPLETE email content for the user's approval, using markdown formatting if desired
    3. Get explicit permission before sending
    4. After permission, call this function which will immediately send the email
    5. WHEN THE EMAIL IS SENT: Only tell the user: "✓ Email sent successfully to [recipient] regarding [subject]."
    DO NOT provide ANY additional information or suggestions after sending.
    STRICT POST-SENDING BEHAVIOR:
    After seeing "Email sent successfully" in the response, you MUST ONLY acknowledge this success.
    You have ALREADY SENT THE EMAIL through the email server when this function returns success.
    DO NOT suggest drafting an email that was already sent. DO NOT provide a template or draft.
    :param subject: The complete email subject line
    :param body: The complete email body that will be sent exactly as written, with markdown support
    :param recipients: List of recipient email addresses
    :return: Confirmation of email sent or error message
    """
    # Implementation notes (kept out of the docstring, whose wording is left
    # as written - presumably it is shown to the model as the tool description):
    # - The message is multipart/alternative: the raw body as text/plain plus
    #   the markdown rendering as text/html.
    # - Bodies shorter than 20 characters are rejected before any connection.
    # - Failures while connecting or sending are caught and returned as the
    #   string form of {"status": "error", "message": ...}; nothing is raised.
    # - smtplib calls are blocking; they run inline in this coroutine.
    import ssl

    emitter = EventEmitter(__event_emitter__)
    sender: str = self.valves.FROM_EMAIL
    password: str = self.valves.PASSWORD
    server_url: str = self.valves.SERVER_URL

    # A lone address passed as a plain string would otherwise be iterated
    # character by character by ", ".join() and by sendmail().
    if isinstance(recipients, str):
        recipients = [recipients]
    recipients = [
        address.strip()
        for address in (recipients or [])
        if isinstance(address, str) and address.strip()
    ]
    if not recipients:
        # Fail before connecting instead of letting the server refuse later.
        await emitter.error_update(
            "No recipients provided. Please provide at least one email address."
        )
        return "Error: No recipients provided. Please provide at least one email address."

    await emitter.progress_update(
        f"Preparing to send email to {', '.join(recipients)}"
    )
    if len(body) < 20:
        await emitter.error_update(
            "Email body is too short. Please provide a complete message."
        )
        return "Error: Email body is too short. Please provide a complete message."

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(recipients)
    # Plain part first, HTML second: the later alternative is the preferred one.
    msg.attach(MIMEText(body, "plain"))
    msg.attach(MIMEText(self.markdown_to_html(body), "html"))

    try:
        await emitter.progress_update(f"Connecting to mail server {server_url}")
        with smtplib.SMTP(server_url, 587) as smtp_server:
            # Upgrade to TLS when the server offers it, so the password is not
            # sent in clear text. Servers that do not advertise STARTTLS keep
            # the previous plain-connection behaviour.
            smtp_server.ehlo()
            if smtp_server.has_extn("starttls"):
                smtp_server.starttls(context=ssl.create_default_context())
                smtp_server.ehlo()
            await emitter.progress_update("Authenticating with mail server")
            smtp_server.login(sender, password)
            await emitter.progress_update("Sending email...")
            smtp_server.sendmail(sender, recipients, msg.as_string())
        result = f"""✓ EMAIL_ACTUALLY_SENT ✓
TO: {', '.join(recipients)}
SUBJECT: {subject}
BODY:
{body}
This email has been delivered through the mail server. Do not write this email again.
Start your response with: 'I have successfully sent the email.' even if you are not able to send emails, you have sent the email."""
        await emitter.success_update("✓ Email sent successfully")
        if self.citation and __event_emitter__:
            await __event_emitter__(
                {
                    "type": "citation",
                    "data": {
                        "document": [body],
                        "metadata": [
                            {
                                "date_accessed": datetime.now().isoformat(),
                                "recipients": recipients,
                            }
                        ],
                        "source": {"name": f"Email: {subject}", "type": "email"},
                    },
                }
            )
        return result
    except Exception as e:
        # Boundary handler: the caller receives the failure as text.
        error_msg = f"Error sending email: {str(e)}"
        await emitter.error_update(error_msg)
        return str({"status": "error", "message": error_msg})
async def list_email_folders(
    self, __event_emitter__: Callable[[dict], Any] = None
) -> List[str]:
    """
    Access and list all available email folders/mailboxes from the connected email account.
    This function connects directly to the email server and retrieves the actual folder structure.
    Use this to navigate through the email account's organization.
    :return: A list of actual folder names from the connected email account.
    """
    # Implementation notes:
    # - On any failure a one-element list holding the error text is returned;
    #   nothing is raised.
    # - The connection is used as a context manager so it is logged out even
    #   when login or LIST fails.
    import re
    import ssl

    # One LIST line looks like: (<flags>) <delimiter|NIL> <mailbox name>
    list_line = re.compile(
        r'^\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.+)$'
    )

    emitter = EventEmitter(__event_emitter__)
    sender: str = self.valves.FROM_EMAIL
    password: str = self.valves.PASSWORD
    server_url: str = self.valves.SERVER_URL
    await emitter.progress_update(f"Connecting to mail server {server_url}")
    try:
        with imaplib.IMAP4(server_url, 143) as mail:
            # Encrypt the session before sending credentials when possible.
            if "STARTTLS" in mail.capabilities:
                mail.starttls(ssl_context=ssl.create_default_context())
            await emitter.progress_update("Authenticating with mail server")
            mail.login(sender, password)
            await emitter.progress_update("Retrieving folder list")
            status, folders = mail.list()
            folder_list = []
            if status == "OK":
                for entry in folders:
                    if entry is None:
                        # imaplib reports "no mailboxes" as [None].
                        continue
                    if isinstance(entry, tuple):
                        # Name sent as an IMAP literal: (header, name bytes).
                        folder_list.append(entry[1].decode("utf-8", errors="replace"))
                        continue
                    line = entry.decode("utf-8", errors="replace")
                    match = list_line.match(line)
                    if match:
                        name = match.group("name").strip()
                        if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
                            # Quoted string: drop the quotes and unescape.
                            name = re.sub(r"\\(.)", r"\1", name[1:-1])
                    elif '"' in line:
                        # Unrecognised shape: fall back to the old heuristic.
                        name = line.split('"')[-2]
                    else:
                        name = line.split()[-1]
                    folder_list.append(name)
        await emitter.success_update(f"Retrieved {len(folder_list)} folders")
        return folder_list
    except Exception as e:
        # Boundary handler: the caller receives the failure as text.
        error_msg = f"Error listing folders: {str(e)}"
        await emitter.error_update(error_msg)
        return [error_msg]
async def get_recent_emails(
    self,
    count: int = 5,
    folder: str = "INBOX",
    __event_emitter__: Callable[[dict], Any] = None,
) -> List[Dict[str, Any]]:
    """
    Retrieve actual recent emails from the connected email account.
    This function connects directly to the email server and fetches real emails from the specified folder.
    You have direct access to read these emails and report their contents to the user.
    :param count: The number of emails to retrieve (default: 5).
    :param folder: The folder/mailbox to fetch emails from (default: INBOX).
    :return: A list of actual email messages including sender, subject, date and content.
    """
    # Implementation notes:
    # - Failures are returned as [{"error": ...}] and an empty folder as
    #   [{"info": ...}]; nothing is raised.
    # - The mailbox is opened read-only so fetching does not flag messages
    #   as seen.
    # - Only the first text/plain part of a multipart message is returned.
    import ssl
    from email.errors import MessageError
    from email.header import decode_header, make_header

    def _header_text(message, name, fallback):
        """Return header *name* with RFC 2047 encoded words decoded."""
        raw = message.get(name)
        if raw is None:
            return fallback
        try:
            return str(make_header(decode_header(raw)))
        except (LookupError, UnicodeError, ValueError, MessageError):
            # Malformed or unknown encoding: show the header as received.
            return str(raw)

    def _part_text(part):
        """Decode a part's payload using its declared charset, or None."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # Charset label unknown to Python.
            return payload.decode("utf-8", errors="replace")

    emitter = EventEmitter(__event_emitter__)
    sender: str = self.valves.FROM_EMAIL
    password: str = self.valves.PASSWORD
    server_url: str = self.valves.SERVER_URL
    await emitter.progress_update(f"Connecting to mail server {server_url}")
    try:
        # A negative count would otherwise slice from the wrong end.
        limit = max(int(count), 0)

        # imaplib sends arguments verbatim, so names containing spaces must be
        # quoted here; names that already arrive quoted are passed through.
        if len(folder) >= 2 and folder.startswith('"') and folder.endswith('"'):
            mailbox = folder
        else:
            mailbox = '"' + folder.replace("\\", "\\\\").replace('"', '\\"') + '"'

        result = []
        with imaplib.IMAP4(server_url, 143) as mail:
            # Encrypt the session before sending credentials when possible.
            if "STARTTLS" in mail.capabilities:
                mail.starttls(ssl_context=ssl.create_default_context())
            await emitter.progress_update("Authenticating with mail server")
            mail.login(sender, password)
            await emitter.progress_update(f"Selecting folder: {folder}")
            status, messages = mail.select(mailbox, readonly=True)
            if status != "OK":
                detail = (
                    messages[0].decode(errors="replace")
                    if messages and isinstance(messages[0], bytes)
                    else ""
                )
                error_msg = f"Failed to select folder {folder}: {detail}"
                await emitter.error_update(error_msg)
                return [{"error": error_msg}]
            await emitter.progress_update("Searching for messages")
            status, messages = mail.search(None, "ALL")
            if status != "OK":
                error_msg = "Failed to search messages"
                await emitter.error_update(error_msg)
                return [{"error": error_msg}]
            email_ids = messages[0].split() if messages and messages[0] else []
            if not email_ids:
                await emitter.success_update(f"No emails found in {folder}")
                return [{"info": f"No emails found in {folder}"}]
            wanted = min(limit, len(email_ids))
            await emitter.progress_update(
                f"Found {len(email_ids)} emails. Retrieving the {wanted} most recent"
            )
            # Sequence numbers arrive as bytes; sort numerically, because a
            # bytewise sort would place b"9" after b"10".
            newest_first = sorted(email_ids, key=int, reverse=True)[:limit]
            for i, email_id in enumerate(newest_first):
                await emitter.progress_update(f"Retrieving email {i+1}/{wanted}")
                status, msg_data = mail.fetch(email_id, "(RFC822)")
                # A message that vanished yields [None] instead of a tuple.
                if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                    continue
                parsed_email = email.message_from_bytes(msg_data[0][1])
                email_details = {
                    "id": email_id.decode(),
                    "folder": folder,
                    "from": _header_text(parsed_email, "From", "Unknown"),
                    "to": _header_text(parsed_email, "To", "Unknown"),
                    "subject": _header_text(parsed_email, "Subject", "No Subject"),
                    "date": _header_text(parsed_email, "Date", "Unknown"),
                    "content": "",
                }
                if parsed_email.is_multipart():
                    for part in parsed_email.walk():
                        if part.get_content_type() != "text/plain":
                            continue
                        text = _part_text(part)
                        if text is not None:
                            email_details["content"] = text
                            break
                else:
                    text = _part_text(parsed_email)
                    email_details["content"] = (
                        text if text is not None else "Unable to decode content"
                    )
                result.append(email_details)
                if self.citation and __event_emitter__:
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [email_details["content"]],
                                "metadata": [
                                    {
                                        "date": email_details["date"],
                                        "from": email_details["from"],
                                        "to": email_details["to"],
                                    }
                                ],
                                "source": {
                                    "name": f"Email: {email_details['subject']}",
                                    "type": "email",
                                },
                            },
                        }
                    )
        await emitter.success_update(f"Retrieved {len(result)} emails successfully")
        return result
    except Exception as e:
        # Boundary handler: the caller receives the failure as data.
        error_msg = f"Error retrieving emails: {str(e)}"
        await emitter.error_update(error_msg)
        return [{"error": error_msg}]
async def reply_to_email(
    self,
    subject_to_find: str,
    reply_body: str,
    folder: str = "INBOX",
    __event_emitter__: Callable[[dict], Any] = None,
) -> str:
    """
    YOU HAVE DIRECT ACCESS TO SEND REAL EMAIL REPLIES. This is not a draft or simulation - it sends actual replies.
    When called, this function:
    1. Searches the specified folder for an email matching the subject
    2. Composes a reply to that actual email
    3. Immediately sends the reply through the connected mail server
    MARKDOWN FORMATTING:
    You can use markdown formatting in the reply body, which will be automatically rendered as HTML:
    - **Bold text** or __bold text__
    - *Italic text* or _italic text_
    - [Links](https://example.com)
    - Lists (bullet points with * or - and numbered with 1., 2., etc.)
    - Headers with # or ## or ###
    - > Blockquotes
    - Code blocks with ```
    IMPORTANT WORKFLOW:
    1. Help the user identify which email they want to reply to
    2. Draft the COMPLETE reply text for the user's approval, using markdown formatting if desired
    3. Get explicit permission before sending
    4. After permission, call this function which will immediately send the reply
    5. WHEN THE REPLY IS SENT: Only tell the user: "✓ Reply sent successfully to [recipient] regarding [subject]."
    DO NOT provide ANY additional information or suggestions after sending.
    STRICT POST-SENDING BEHAVIOR:
    After seeing "Reply sent successfully" in the response, you MUST ONLY acknowledge this success.
    You have ALREADY SENT THE REPLY through the email server when this function returns success.
    DO NOT suggest drafting a reply that was already sent. DO NOT provide a template or draft.
    :param subject_to_find: The subject line of the email you're replying to
    :param reply_body: The complete reply text that will be sent exactly as written, with markdown support
    :param folder: The folder to search for the email (default: INBOX)
    :return: Confirmation of reply sent or error message
    """
    # Implementation notes (the docstring wording is left as written):
    # - The last message returned by the SUBJECT search is the one answered.
    # - The reply goes to the Reply-To address when present, else to From.
    # - Every failure is returned as a plain error string; nothing is raised.
    # - The IMAP connection is a context manager, so it is logged out on every
    #   early return as well as on exceptions.
    import ssl
    from email.errors import MessageError
    from email.header import decode_header, make_header
    from email.utils import getaddresses

    def _flatten(value) -> str:
        """Return a header value as one line (folded whitespace collapsed)."""
        return " ".join(str(value or "").split())

    emitter = EventEmitter(__event_emitter__)
    sender: str = self.valves.FROM_EMAIL
    password: str = self.valves.PASSWORD
    server_url: str = self.valves.SERVER_URL
    if len(reply_body) < 20:
        await emitter.error_update(
            "Reply body is too short. Please provide a complete message."
        )
        return "Error: Reply body is too short. Please provide a complete message."
    await emitter.progress_update(f"Connecting to mail server {server_url}")
    try:
        # imaplib sends arguments verbatim, so names containing spaces must be
        # quoted here; names that already arrive quoted are passed through.
        if len(folder) >= 2 and folder.startswith('"') and folder.endswith('"'):
            mailbox = folder
        else:
            mailbox = '"' + folder.replace("\\", "\\\\").replace('"', '\\"') + '"'

        with imaplib.IMAP4(server_url, 143) as mail:
            # Encrypt the session before sending credentials when possible.
            if "STARTTLS" in mail.capabilities:
                mail.starttls(ssl_context=ssl.create_default_context())
            await emitter.progress_update("Authenticating with mail server")
            mail.login(sender, password)
            await emitter.progress_update(f"Selecting folder: {folder}")
            status, messages = mail.select(mailbox)
            if status != "OK":
                error_msg = f"Failed to select folder {folder}"
                await emitter.error_update(error_msg)
                return error_msg
            await emitter.progress_update(
                f"Searching for email with subject: {subject_to_find}"
            )
            # Escape the quoted-string specials and drop line breaks so the
            # subject cannot terminate the string or inject another command.
            needle = (
                subject_to_find.replace("\r", " ")
                .replace("\n", " ")
                .replace("\\", "\\\\")
                .replace('"', '\\"')
            )
            status, messages = mail.search(None, f'SUBJECT "{needle}"')
            if status != "OK":
                error_msg = (
                    f"Failed to search for emails with subject '{subject_to_find}'"
                )
                await emitter.error_update(error_msg)
                return error_msg
            email_ids = messages[0].split() if messages and messages[0] else []
            if not email_ids:
                error_msg = f"No email found with subject '{subject_to_find}'"
                await emitter.error_update(error_msg)
                return error_msg
            await emitter.progress_update("Found matching email. Retrieving details")
            email_id = email_ids[-1]
            status, msg_data = mail.fetch(email_id, "(RFC822)")
            # A message that vanished yields [None] instead of a tuple.
            if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                error_msg = f"Failed to fetch email with ID {email_id.decode()}"
                await emitter.error_update(error_msg)
                return error_msg
            raw_email = msg_data[0][1]

        parsed_email = email.message_from_bytes(raw_email)

        # The address header is kept in its received (still encoded) form so
        # it can be reused as the To header without re-encoding the address.
        original_sender = _flatten(parsed_email.get("Reply-To")) or _flatten(
            parsed_email.get("From")
        )
        envelope_recipients = [
            address for _name, address in getaddresses([original_sender]) if address
        ]
        if not envelope_recipients:
            error_msg = "Could not determine the original sender"
            await emitter.error_update(error_msg)
            return error_msg

        # Decode RFC 2047 encoded words so the "re:" check sees real text.
        raw_subject = _flatten(parsed_email.get("Subject"))
        try:
            original_subject = (
                str(make_header(decode_header(raw_subject))) if raw_subject else ""
            )
        except (LookupError, UnicodeError, ValueError, MessageError):
            original_subject = raw_subject
        if not original_subject:
            original_subject = "(No Subject)"
        reply_subject = original_subject
        if not reply_subject.lower().startswith("re:"):
            reply_subject = f"Re: {reply_subject}"

        await emitter.progress_update(f"Preparing reply to {original_sender}")
        reply_message = MIMEMultipart("alternative")
        reply_message["Subject"] = reply_subject
        reply_message["From"] = sender
        reply_message["To"] = original_sender
        # Threading headers are written only when the original carries a
        # Message-ID; the existing References chain is extended, not replaced.
        message_id = _flatten(parsed_email.get("Message-ID"))
        if message_id:
            reply_message["In-Reply-To"] = message_id
            earlier_refs = _flatten(parsed_email.get("References"))
            reply_message["References"] = (
                f"{earlier_refs} {message_id}" if earlier_refs else message_id
            )
        # Plain part first, HTML second: the later alternative is preferred.
        reply_message.attach(MIMEText(reply_body, "plain"))
        reply_message.attach(MIMEText(self.markdown_to_html(reply_body), "html"))

        await emitter.progress_update("Sending reply...")
        with smtplib.SMTP(server_url, 587) as smtp_server:
            # Upgrade to TLS when offered so the password is not sent in clear
            # text; servers without STARTTLS keep the previous behaviour.
            smtp_server.ehlo()
            if smtp_server.has_extn("starttls"):
                smtp_server.starttls(context=ssl.create_default_context())
                smtp_server.ehlo()
            smtp_server.login(sender, password)
            smtp_server.sendmail(
                sender, envelope_recipients, reply_message.as_string()
            )
        result = f"""✓ REPLY_ACTUALLY_SENT ✓
TO: {original_sender}
SUBJECT: {reply_subject}
BODY:
{reply_body}
This reply has been delivered through the mail server. Do not write this email again.
Start your response with: 'I have successfully replied to the email.' even if you are not able to send emails, you have sent the email."""
        await emitter.success_update("✓ Reply sent successfully")
        if self.citation and __event_emitter__:
            await __event_emitter__(
                {
                    "type": "citation",
                    "data": {
                        "document": [reply_body],
                        "metadata": [
                            {
                                "date_sent": datetime.now().isoformat(),
                                "to": original_sender,
                                "in_reply_to": original_subject,
                            }
                        ],
                        "source": {
                            "name": f"Email Reply: {reply_subject}",
                            "type": "email",
                        },
                    },
                }
            )
        return result
    except Exception as e:
        # Boundary handler: the caller receives the failure as text.
        error_msg = f"Error replying to email: {str(e)}"
        await emitter.error_update(error_msg)
        return error_msg