import os
import requests
from datetime import datetime
import json
from requests import get
from bs4 import BeautifulSoup
import concurrent.futures
from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin
import re
import unicodedata
from pydantic import BaseModel, Field
import asyncio
from typing import Callable, Any, List
import logging
# Configuration de la journalisation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HelpFunctions:
def __init__(self):
pass
def get_base_url(self, url: str) -> str:
"""Retourne l'URL de base."""
parsed_url = urlparse(url)
return f"{parsed_url.scheme}://{parsed_url.netloc}"
def generate_excerpt(self, content: str, max_length: int = 200) -> str:
"""Génère un extrait à partir du contenu fourni."""
return content[:max_length] + "..." if len(content) > max_length else content
def format_text(self, original_text: str) -> str:
"""Formate le texte brut en nettoyant les balises HTML et les espaces."""
soup = BeautifulSoup(original_text, "html.parser")
formatted_text = soup.get_text(separator=" ", strip=True)
formatted_text = unicodedata.normalize("NFKC", formatted_text)
formatted_text = re.sub(r"\s+", " ", formatted_text).strip()
return self.remove_emojis(formatted_text)
def remove_emojis(self, text: str) -> str:
"""Supprime les emojis du texte."""
return "".join(c for c in text if not unicodedata.category(c).startswith("So"))
def process_search_result(self, result: dict, valves) -> dict:
"""Traite un résultat de recherche et récupère le contenu de la page."""
title_site = self.remove_emojis(result["title"])
url_site = result["url"]
snippet = result.get("content", "")
# Vérifie si le site web est dans la liste ignorée
if valves.IGNORED_WEBSITES:
base_url = self.get_base_url(url_site)
if any(
ignored_site.strip() in base_url
for ignored_site in valves.IGNORED_WEBSITES.split(",")
):
return None
try:
response_site = requests.get(url_site, timeout=20)
response_site.raise_for_status()
html_content = response_site.text
soup = BeautifulSoup(html_content, "html.parser")
content_site = self.format_text(soup.get_text(separator=" ", strip=True))
truncated_content = self.truncate_to_n_words(
content_site, valves.PAGE_CONTENT_WORDS_LIMIT
)
return {
"title": title_site,
"url": url_site,
"content": truncated_content,
"snippet": self.remove_emojis(snippet),
}
except requests.exceptions.Timeout:
logger.error(f"Temps de réponse dépassé pour {url_site}.")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Erreur lors de la récupération de {url_site}: {str(e)}")
return None
def truncate_to_n_words(self, text: str, token_limit: int) -> str:
"""Tronque le texte à un nombre de mots spécifié."""
tokens = text.split()
truncated_tokens = tokens[:token_limit]
return " ".join(truncated_tokens)
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def emit(
self,
description: str = "État inconnu",
status: str = "en_cours",
done: bool = False,
):
"""Émet un événement avec l'état actuel."""
if self.event_emitter:
await self.event_emitter(
{
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
)
# Configuration de la journalisation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Tools:
class Valves(BaseModel):
SEARXNG_ENGINE_API_BASE_URL: str = Field(
default="http://host.docker.internal:8080/search",
description="L'URL de base pour le moteur de recherche",
)
IGNORED_WEBSITES: str = Field(
default="",
description="Liste séparée par des virgules des sites web à ignorer",
)
RETURNED_SCRAPPED_PAGES_NO: int = Field(
default=7,
description="Le nombre de résultats de moteur de recherche à analyser",
)
SCRAPPED_PAGES_NO: int = Field(
default=14,
description="Nombre total de pages grattées. Idéalement supérieur à l'un des pages retournées",
)
PAGE_CONTENT_WORDS_LIMIT: int = Field(
default=5000,
description="Limite de mots pour le contenu de chaque page.",
)
CITATION_LINKS: bool = Field(
default=False,
description="Si True, envoyer des citations personnalisées avec des liens",
)
def __init__(self):
self.valves = self.Valves()
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
def build_advanced_query(self, base_query: str, focus_keywords: List[str]) -> str:
"""
Construit une requête avancée avec des opérateurs pour améliorer la pertinence des résultats.
"""
focus_part = " OR ".join([f'"{kw}"' for kw in focus_keywords])
return f"({base_query}) AND ({focus_part})"
async def search_web(
self,
query: str,
focus_keywords: List[str] = None,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
Rechercher sur le web et obtenir le contenu des pages pertinentes.
"""
functions = HelpFunctions()
emitter = EventEmitter(__event_emitter__)
await emitter.emit(f"Initiation de la recherche web pour : {query}")
search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL
# S'assurer que RETURNED_SCRAPPED_PAGES_NO ne dépasse pas SCRAPPED_PAGES_NO
self.valves.RETURNED_SCRAPPED_PAGES_NO = min(
self.valves.RETURNED_SCRAPPED_PAGES_NO, self.valves.SCRAPPED_PAGES_NO
)
if focus_keywords is None:
focus_keywords = [] # Utiliser une liste vide si aucun mot-clé n'est fourni
advanced_query = self.build_advanced_query(query, focus_keywords)
params = {
"q": advanced_query,
"format": "json",
"number_of_results": self.valves.RETURNED_SCRAPPED_PAGES_NO,
}
try:
await emitter.emit("Envoi de la requête au moteur de recherche")
resp = requests.get(
search_engine_url, params=params, headers=self.headers, timeout=120
)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
limited_results = results[: self.valves.SCRAPPED_PAGES_NO]
await emitter.emit(
f"Récupéré {len(limited_results)} résultats de recherche"
)
valid_results = self.validate_search_results(limited_results)
await emitter.emit(f"{len(valid_results)} résultats valides trouvés.")
except requests.exceptions.RequestException as e:
await emitter.emit(
status="error",
description=f"Erreur pendant la recherche : {str(e)}",
done=True,
)
return json.dumps({"error": str(e)})
results_json = []
urls = [] # Liste pour stocker les URLs
if valid_results:
await emitter.emit(f"Traitement des résultats de recherche")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(
functions.process_search_result, result, self.valves
)
for result in valid_results
]
for future in concurrent.futures.as_completed(futures):
result_json = future.result()
if result_json:
cleaned_content = self.clean_and_correct_content(
result_json.get("content", "")
)
result_json["content"] = cleaned_content
results_json.append(result_json)
urls.append(result_json.get("url"))
if len(results_json) >= self.valves.RETURNED_SCRAPPED_PAGES_NO:
break
# Filtrer pour ne garder que les 3 résultats les plus pertinents
results_json = self.get_top_relevant_results(results_json, top_n=3)
# Formater les résultats selon le type de contenu
if "recipe" in query.lower():
formatter = RecipeFormatter()
formatted_response = formatter.format_recipes(results_json)
elif "news" in query.lower():
formatter = NewsFormatter()
formatted_response = formatter.format_news(results_json)
elif "movie" in query.lower():
formatter = MovieFormatter()
formatted_response = formatter.format_movies(results_json)
else:
formatted_response = json.dumps(results_json, ensure_ascii=False)
await emitter.emit(
status="complete",
description=f"La recherche web est terminée. Contenu récupéré de {len(results_json)} pages",
done=True,
)
citation = f"\n\nLiens sources : " + ", ".join(urls)
return formatted_response + citation
def get_top_relevant_results(self, results: List[dict], top_n: int) -> List[dict]:
"""
Sélectionne les résultats les plus pertinents.
"""
sorted_results = sorted(
results, key=lambda x: len(x.get("content", "")), reverse=True
)
return sorted_results[:top_n]
def clean_and_correct_content(self, content: str) -> str:
"""
Nettoie et corrige le contenu scrappé.
"""
paraphrased_content = self.paraphrase(content)
corrected_content = self.correct_content(paraphrased_content)
return corrected_content
def paraphrase(self, content: str) -> str:
"""
Simule le paraphrasing du contenu.
"""
return content.replace("important", "crucial").replace("utiliser", "employer")
def correct_content(self, content: str) -> str:
"""
Simule la correction du contenu.
"""
return content.replace("utiliser", "utiliser").replace(
"erreur", "erreur corrigée"
)
def validate_search_results(self, results: List[dict]) -> List[dict]:
"""
Valide les résultats de recherche.
"""
valid_results = []
for result in results:
if (
"title" in result
and "url" in result
and result["url"].startswith("http")
):
valid_results.append(result)
else:
logger.warning(f"Résultat invalide : {result}")
return valid_results
class BaseFormatter:
def __init__(self, title: str):
self.title = title
self.formatted_content = []
def add_section(self, header: str, content: str):
self.formatted_content.append(f"#### {header}\n")
self.formatted_content.append(f"{content}\n")
def finalize(self) -> str:
return f"### {self.title}\n" + "".join(self.formatted_content) + "\n---\n"
class RecipeFormatter(BaseFormatter):
def __init__(self):
super().__init__("Recettes de Pain Rapides")
def format_recipes(self, results: List[dict]) -> str:
self.formatted_content.append(
"La recherche web est terminée. Voici quelques recettes de pain rapides et faciles à préparer :\n"
)
for result in results:
self.add_section(
result["title"],
f"- **Temps de Préparation** : {result['prep_time']}\n"
f"- **Temps de Cuisson** : {result['cook_time']}\n"
f"- **Température** : {result['temperature']}°C\n"
f"- **Portions** : {result['servings']}\n\n"
f"**Ingrédients** :\n"
+ "\n".join(f"- {ingredient}" for ingredient in result["ingredients"])
+ "\n\n"
+ "**Instructions** :\n"
+ result["instructions"],
)
return self.finalize()
class NewsFormatter(BaseFormatter):
def __init__(self):
super().__init__("Actualités")
def format_news(self, results: List[dict]) -> str:
self.formatted_content.append("Voici les dernières nouvelles :\n")
for result in results:
self.add_section(
result["title"],
f"- **Source** : {result['source']}\n"
f"- **Date** : {result['date']}\n"
f"- **Résumé** : {result['summary']}\n\n"
f"**Lien** : [Lire l'article]({result['url']})\n",
)
return self.finalize()
class MovieFormatter(BaseFormatter):
def __init__(self):
super().__init__("Films")
def format_movies(self, results: List[dict]) -> str:
self.formatted_content.append("Voici quelques films recommandés :\n")
for result in results:
self.add_section(
result["title"],
f"- **Année** : {result['year']}\n"
f"- **Genre** : {', '.join(result['genre'])}\n"
f"- **Résumé** : {result['summary']}\n"
f"- **Note** : {result['rating']}\n\n"
f"**Lien** : [Voir le film]({result['url']})\n",
)
return self.finalize()
async def get_website(
self, url: str, __event_emitter__: Callable[[dict], Any] = None
) -> str:
"""
Gratter le site web fourni et obtenir son contenu.
"""
functions = HelpFunctions()
emitter = EventEmitter(__event_emitter__)
await emitter.emit(f"Récupération du contenu depuis l'URL : {url}")
results_json = []
try:
response_site = requests.get(url, headers=self.headers, timeout=120)
response_site.raise_for_status()
html_content = response_site.text
await emitter.emit("Analyse du contenu du site web")
soup = BeautifulSoup(html_content, "html.parser")
page_title = soup.title.string if soup.title else "Aucun titre trouvé"
page_title = unicodedata.normalize("NFKC", page_title.strip())
page_title = functions.remove_emojis(page_title)
content_site = functions.format_text(soup.get_text(separator=" ", strip=True))
truncated_content = functions.truncate_to_n_words(
content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
)
result_site = {
"title": page_title,
"url": url,
"content": truncated_content,
"excerpt": functions.generate_excerpt(content_site),
}
results_json.append(result_site)
if self.valves.CITATION_LINKS and __event_emitter__:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [truncated_content],
"metadata": [{"source": url}],
"source": {"name": page_title},
},
}
)
await emitter.emit(
status="complete",
description="Contenu du site web récupéré et traité avec succès",
done=True,
)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
logger.warning(f"Ignorer l'erreur 404 pour l'URL : {url}")
results_json.append(
{
"url": url,
"content": "Erreur 404: Page non trouvée.",
}
)
await emitter.emit(
status="info",
description=f"Ignorer l'erreur 404 pour l'URL : {url}",
done=True,
)
else:
results_json.append(
{
"url": url,
"content": f"Échec de la récupération de la page. Erreur : {str(e)}",
}
)
await emitter.emit(
status="error",
description=f"Erreur lors de la récupération du contenu du site web : {str(e)}",
done=True,
)
return json.dumps(results_json, ensure_ascii=False)