from typing import List, Optional
from pydantic import BaseModel
from docx import Document
from docx.shared import Pt, Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Attachment, FileContent, FileName, FileType, Disposition
import re
import os
import base64
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
import smtplib
import json
from jinja2 import Template
import pdfkit
class Filter:
    """Filter that turns a JSON resume found in the assistant reply into a PDF.

    ``inlet`` scans the last user message for e-mail addresses. ``outlet``
    looks for a JSON object in the last assistant message, validates it
    against the resume schema, renders it through a Jinja2 template, writes
    a PDF with ``pdfkit`` and, when the user message contained addresses,
    mails the PDF through Gmail's SMTP-over-SSL endpoint.

    NOTE(review): ``inlet``/``outlet`` are presumably invoked by the host
    pipeline framework before/after the model call - confirm against the
    framework in use.
    """

    class Valves(BaseModel):
        # Pipeline selectors; __init__ sets this to ["*"].
        pipelines: List[str] = []
        # NOTE(review): presumably used by the host to order filters - confirm.
        priority: int = 0
        # Read from the SENDGRID_API_KEY environment variable; only the
        # disabled SendGrid sender further down refers to it.
        SENDGRID_API_KEY: str = ""
        # Password used for the SMTP login of ``self.sender``.
        PASSWORD: str = ""

    # Pattern used to pull e-mail addresses out of free text.
    _EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

    # Required top-level resume fields and their JSON types.
    _TOP_LEVEL_SCHEMA = {
        "name": dict,
        "role": str,
        "seniority_level": str,
        "executive_summary": dict,
        "professional_skills": list,
        "work_experience": list,
        "education": list,
        "certifications": list,
    }

    # Required keys of the two nested objects.
    _NAME_SCHEMA = {"last_name": str, "first_name": str}
    _SUMMARY_SCHEMA = {"about_job_profile": str, "about_professional": str}
    _INTERVAL_SCHEMA = {"start": str, "end": str}

    # Required keys for every entry of each list-valued field.
    _LIST_SCHEMAS = {
        "professional_skills": {"skill": str, "rating": int},
        "work_experience": {
            "role": str, "company_name": str, "responsibilities": str,
            "time_interval": dict, "skills_tools": list,
        },
        "education": {
            "level": str, "topic": str, "institution": str, "description": str,
            "time_interval": dict,
        },
        "certifications": {
            "title": str, "organization": str, "date": str, "description": str,
        },
    }

    def __init__(self):
        """Set the filter metadata, load valves from the environment and
        initialise the per-instance defaults."""
        self.type = "filter"
        self.name = "Generate document & send email"
        self.valves = self.Valves(
            pipelines=["*"],
            SENDGRID_API_KEY=os.getenv("SENDGRID_API_KEY", ""),
            PASSWORD=os.getenv("PASSWORD", ""),
        )
        # Last values seen; kept as attributes for backward compatibility.
        # outlet() no longer relies on them (see its docstring).
        self.recipients = []
        self.resume_content = None
        self.sender = "mariageorgianaborca@gmail.com"

    async def on_startup(self):
        """Log that the filter module has started."""
        print(f"on_startup:{__name__}")

    async def on_shutdown(self):
        """Log that the filter module is shutting down."""
        print(f"on_shutdown:{__name__}")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Record e-mail addresses found in the last user message.

        Requests whose body carries a truthy ``"title"`` entry are passed
        through untouched. ``body`` is always returned unchanged.
        """
        if body.get("title", False):
            return body
        self.recipients = self._find_recipients(body.get("messages") or [])
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Generate the CV PDF from the assistant reply and mail it.

        Nothing happens unless the last assistant message contains a JSON
        object that passes ``extract_resume_content``. Recipients are taken
        from the last user message of *this* ``body`` rather than from the
        instance attribute written by ``inlet``: if one instance is shared
        between conversations, state stored on ``self`` by one request could
        otherwise be used by another. ``body`` is always returned unchanged.

        NOTE(review): PDF rendering and SMTP delivery are blocking calls made
        from a coroutine; consider moving them to an executor.
        """
        print(f"outlet:{__name__}")
        messages = body.get("messages") or []
        resume = self.extract_resume_content(
            self.get_last_assistent_message(messages)
        )
        if resume is None:
            return body

        recipients = self._find_recipients(messages)
        # Mirror the values on the instance for code that still reads them.
        self.resume_content = resume
        self.recipients = recipients

        output_path = self._safe_pdf_name(
            resume["name"]["first_name"], resume["name"]["last_name"]
        )
        html = self.generate_html(resume)
        self.save_html_to_pdf(html, output_path)
        if recipients:
            self.send_cv_via_email(output_path, recipients)
        return body

    def get_last_assistent_message(self, messages):
        """Return the content of the most recent ``assistant`` message, or
        ``None`` when there is none."""
        return self._last_content_for_role(messages, "assistant")

    def get_last_user_message(self, messages):
        """Return the content of the most recent ``user`` message, or
        ``None`` when there is none."""
        return self._last_content_for_role(messages, "user")

    @staticmethod
    def _last_content_for_role(messages, role):
        """Scan ``messages`` from the end and return the first matching
        message's ``content`` (``None`` if no message has that role)."""
        for message in reversed(messages):
            if message.get("role") == role:
                return message.get("content")
        return None

    def _find_recipients(self, messages):
        """Return every e-mail address in the last user message (``[]`` when
        there is no user message or its content is not a string)."""
        user_message = self.get_last_user_message(messages)
        if not isinstance(user_message, str):
            return []
        return self._EMAIL_PATTERN.findall(user_message)

    @staticmethod
    def _safe_pdf_name(first_name, last_name):
        """Build ``<first>_<last>.pdf`` using only word characters and ``-``.

        The names come from model output, so path separators, dots and other
        special characters are collapsed to ``_`` to keep the file inside the
        working directory.
        """
        cleaned = re.sub(r"[^\w\-]+", "_", f"{first_name}_{last_name}").strip("_")
        return (cleaned[:100] or "cv") + ".pdf"

    def send_cv_via_email(self, cv_path, recipients=None):
        """Mail the PDF at ``cv_path`` as an attachment.

        Args:
            cv_path: Path of the PDF file to attach.
            recipients: Addresses to send to; defaults to ``self.recipients``.

        Raises:
            OSError: If ``cv_path`` cannot be read or the connection fails.
            smtplib.SMTPException: If login or delivery is rejected.
        """
        if recipients is None:
            recipients = self.recipients

        body = "This is the CV generated by our assistant."
        msg = MIMEMultipart()
        msg['Subject'] = "CV CC RO Template"
        msg['From'] = self.sender
        msg['To'] = ', '.join(recipients)
        # Attach the body of the email
        msg.attach(MIMEText(body, 'plain'))
        # Attach the PDF file under its real name instead of a fixed "file.pdf"
        with open(cv_path, 'rb') as pdf_file:
            pdf_attachment = MIMEApplication(pdf_file.read(), _subtype='pdf')
        pdf_attachment.add_header(
            'Content-Disposition', 'attachment', filename=os.path.basename(cv_path)
        )
        msg.attach(pdf_attachment)
        # Send the email
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server:
            smtp_server.login(self.sender, self.valves.PASSWORD)
            smtp_server.sendmail(self.sender, recipients, msg.as_string())
        print("Message sent!")

    # NOTE: disabled SendGrid-based sender, kept for reference only.
    # def send_report_via_email(self, docx_path):
    # message = Mail(
    # from_email='hr-admin@computacenter.com.ro',
    # to_emails=self.recipients,
    # subject="Raport de protocol",
    # html_content="This is the protocol report generated by our assistent."
    # )
    # # Read and encode the file as Base64
    # with open(docx_path, 'rb') as f:
    # file_data = f.read()
    # encoded_file = base64.b64encode(file_data).decode()
    # # Create the attachment
    # attached_file = Attachment(
    # FileContent(encoded_file),
    # FileName(os.path.basename(docx_path)),
    # FileType('application/vnd.openxmlformats-officedocument.wordprocessingml.document'),
    # Disposition('attachment')
    # )
    # # Add the attachment to the email
    # message.attachment = attached_file
    # # try:
    # # Send the email
    # sg = SendGridAPIClient(self.valves.SENDGRID_API_KEY)
    # response = sg.send(message)
    # print(f"Email sent! Status Code: {response.status_code}")
    # print(f"Response Body: {response.body}")
    # print(f"Response Headers: {response.headers}")
    # # except Exception as e:
    # # print(f"Error sending email: {str(e)}")

    @staticmethod
    def _has_typed_keys(item, required_keys):
        """Return True when ``item`` is a dict whose ``required_keys`` all
        hold values of the expected type.

        ``bool`` is a subclass of ``int`` in Python, so it is rejected
        explicitly where an ``int`` is required.
        """
        if not isinstance(item, dict):
            return False
        for key, expected_type in required_keys.items():
            value = item.get(key)
            if not isinstance(value, expected_type):
                return False
            if expected_type is int and isinstance(value, bool):
                return False
        return True

    def extract_resume_content(self, text):
        """Extract and validate the resume JSON embedded in ``text``.

        The span from the first ``{`` to the last ``}`` is parsed as JSON and
        checked against the resume schema (top-level fields, ``name``,
        ``executive_summary``, and every entry of the list fields including
        the ``start``/``end`` strings of each ``time_interval``).

        Returns:
            The parsed dict when everything validates, otherwise ``None``
            (also for non-string input, missing or malformed JSON).
        """
        if not isinstance(text, str):
            return None
        json_match = re.search(r'\{.*\}', text, re.DOTALL)
        if json_match is None:
            return None
        try:
            json_data = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            return None
        if not isinstance(json_data, dict):
            return None

        # Validate presence and type of top-level fields
        for key, expected_type in self._TOP_LEVEL_SCHEMA.items():
            if not isinstance(json_data.get(key), expected_type):
                return None

        # Validate nested dictionary structures
        if not self._has_typed_keys(json_data["name"], self._NAME_SCHEMA):
            return None
        if not self._has_typed_keys(json_data["executive_summary"], self._SUMMARY_SCHEMA):
            return None

        # Validate every entry of the list-valued fields
        for key, item_schema in self._LIST_SCHEMAS.items():
            for item in json_data[key]:
                if not self._has_typed_keys(item, item_schema):
                    return None
                if "time_interval" in item_schema and not self._has_typed_keys(
                    item["time_interval"], self._INTERVAL_SCHEMA
                ):
                    return None
        return json_data

    def generate_html(self, resume_content=None):
        """Render the CV template for a validated resume dict.

        Args:
            resume_content: Resume dict as returned by
                ``extract_resume_content``; defaults to ``self.resume_content``.

        Returns:
            The rendered template as a string. Values are HTML-escaped
            because they originate from model output and the result is fed
            to an HTML-to-PDF renderer.
        """
        resume = self.resume_content if resume_content is None else resume_content
        # NOTE(review): the "Professional Skills" loop below has an empty
        # body, so no skill is rendered - confirm whether markup was lost.
        html_template = """
{{ employee_name }}
{{ job_profile }}
{% if seniority_level %}
{{ seniority_level }} Level
{% endif %}
{% if about_job_profile or about_professional %}
Executive Summary
{% if about_job_profile %}
About the Job Profile
{{ about_job_profile }}
{% endif %}
{% if about_professional %}
About the Professional
{{ about_professional }}
{% endif %}
{% endif %}
Professional Skills
{% for skill in professional_skills %}
{% endfor %}
{% if work_experience %}
Work Experience
{% for work in work_experience %}
{{ work.role }} at {{ work.company_name }}
{{ work.time_interval.start }} - {{ work.time_interval.end }}
Responsabilities
{{ work.responsibilities }}
Skills and Tools:
{% for skill in work.skills_tools%}
{{ skill }}
{% endfor %}
{% endfor %}
{% endif %}
{% if education %}
Education
{% for edu in education %}
{{ edu.level }} from {{ edu.institution }}
{{ edu.time_interval.start }} - {{ edu.time_interval.end }}
{{ edu.description }}
{% endfor %}
{% endif %}
{% if certifications %}
Certifications
{% for cert in certifications %}
{{ cert.title }} from {{ cert.organization }}
{{ cert.date }}
{{ cert.description }}
{% endfor %}
{% endif %}
"""
        # Prepare Jinja2 template; autoescape keeps markup in the data inert.
        template = Template(html_template, autoescape=True)
        context = {
            "site_brand": "Your Brand",  # Replace as necessary
            "employee_name": f"{resume['name']['last_name'].upper()}, {resume['name']['first_name']}",
            "job_profile": resume['role'],
            "seniority_level": resume['seniority_level'],
            "about_job_profile": resume['executive_summary']["about_job_profile"],
            "about_professional": resume['executive_summary']["about_professional"],
            "professional_skills": resume['professional_skills'],
            "work_experience": resume['work_experience'],
            "education": resume['education'],
            "certifications": resume['certifications'],
        }
        return template.render(context)

    def save_html_to_pdf(self, html_content, output_pdf_path):
        """Write ``html_content`` to ``output_pdf_path`` as an A4 PDF with
        0.5 inch margins, using ``pdfkit.from_string``."""
        options = {
            'encoding': 'UTF-8',
            'page-size': 'A4',
            'margin-top': '0.5in',
            'margin-right': '0.5in',
            'margin-bottom': '0.5in',
            'margin-left': '0.5in',
        }
        pdfkit.from_string(html_content, output_pdf_path, options=options)