"""
title: RSS Feed Parser and Ollama API Interaction
author: Your Name
version: 0.1.0
license: MIT
"""
import os
import requests
from datetime import datetime
import json
from bs4 import BeautifulSoup
import concurrent.futures
from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin
import re
import unicodedata
from pydantic import BaseModel, Field
import asyncio
from typing import Callable, Any
class HelpFunctions:
    """Stateless URL and text helpers used while scraping RSS feed entries."""

    def __init__(self):
        pass

    def get_base_url(self, url):
        """Return the ``scheme://netloc`` prefix of *url*."""
        parsed_url = urlparse(url)
        return f"{parsed_url.scheme}://{parsed_url.netloc}"

    def generate_excerpt(self, content, max_length=200):
        """Return *content* cut to *max_length* characters.

        ``"..."`` is appended only when something was actually cut off.
        """
        if len(content) > max_length:
            return content[:max_length] + "..."
        return content

    def format_text(self, original_text):
        """Turn an HTML (or plain-text) string into a single clean line of text.

        Markup is removed, the text is NFKC-normalized, every run of
        whitespace collapses to one space, and ``So``-category symbols are
        removed.
        """
        soup = BeautifulSoup(original_text, "html.parser")
        formatted_text = soup.get_text(separator=" ", strip=True)
        formatted_text = unicodedata.normalize("NFKC", formatted_text)
        formatted_text = re.sub(r"\s+", " ", formatted_text).strip()
        return self.remove_emojis(formatted_text)

    def remove_emojis(self, text):
        """Drop every character whose Unicode category is ``So`` (Symbol, other)."""
        # Unicode general categories are always two letters, so an equality
        # test is all that is needed here.
        return "".join(c for c in text if unicodedata.category(c) != "So")

    @staticmethod
    def _child_text(entry, name):
        """Return the text of child element *name* of *entry*, or ``""`` if absent."""
        child = getattr(entry, name, None)
        return child.text if child is not None else ""

    def process_rss_entry(self, entry, valves):
        """Download the page an RSS ``<item>`` links to and return its cleaned text.

        :param entry: Parsed ``<item>`` element whose ``title``, ``link`` and
            ``description`` children are read; any of them may be missing.
        :param valves: Settings object; ``PAGE_CONTENT_WORDS_LIMIT`` caps the
            number of words kept from the page.
        :return: A dict with ``title``, ``url``, ``content`` and ``snippet``
            keys, or ``None`` when the item has no link or the page could not
            be downloaded.
        """
        # Feeds often pad element text with newlines; a missing <link> leaves
        # nothing to fetch, so skip the item instead of raising.
        url_site = self._child_text(entry, "link").strip()
        if not url_site:
            return None

        title_site = self.remove_emojis(self._child_text(entry, "title"))
        snippet = self._child_text(entry, "description")

        try:
            response_site = requests.get(url_site, timeout=20)
            response_site.raise_for_status()
        except requests.exceptions.RequestException:
            # Best effort: an unreachable page only drops this one entry.
            return None

        # format_text() strips the markup itself; extracting the text first
        # and parsing it a second time would discard literal "<...>" text that
        # came from decoded entities.
        content_site = self.format_text(response_site.text)
        truncated_content = self.truncate_to_n_words(
            content_site, valves.PAGE_CONTENT_WORDS_LIMIT
        )
        return {
            "title": title_site,
            "url": url_site,
            "content": truncated_content,
            "snippet": self.remove_emojis(snippet),
        }

    def truncate_to_n_words(self, text, token_limit):
        """Return the first *token_limit* whitespace-separated words of *text*.

        A zero or negative limit yields an empty string; ``None`` means no
        limit. Words are re-joined with single spaces.
        """
        # A negative slice bound would silently drop words from the END of the
        # text rather than limiting its length.
        if token_limit is not None and token_limit <= 0:
            return ""
        return " ".join(text.split()[:token_limit])
# The emit helpers live at module level rather than on ``Tools`` so they do not
# add to the class's method surface (relevant if a loader enumerates the
# methods of ``Tools`` -- TODO confirm against the host application).
async def _emit_event(emitter, event_type, data):
    """Send one event through *emitter*; a missing emitter is silently ignored."""
    if emitter is not None:
        await emitter({"type": event_type, "data": data})


async def _emit_status(emitter, description, done=False):
    """Send a ``status`` event carrying *description* and the *done* flag."""
    await _emit_event(
        emitter, "status", {"description": description, "done": done}
    )


class Tools:
    """Tools that scrape an RSS feed and forward prompts to an Ollama endpoint."""

    class Valves(BaseModel):
        # Settings read by the tool methods through ``self.valves``.
        RSS_FEED_URL: str = Field(
            default="https://www.lefigaro.fr/rss",
            description="The URL of the RSS feed to parse",
        )
        PAGE_CONTENT_WORDS_LIMIT: int = Field(
            default=5000,
            description="Limit words content for each page.",
        )
        CITATION_LINKS: bool = Field(
            default=False,
            description="If True, send custom citations with links",
        )
        # Endpoint and credential for interact_with_ollama(). The defaults are
        # placeholders and must be replaced with real values.
        OLLAMA_API_URL: str = Field(
            default="https://api.ollama.ai/v1/generate",
            description="The URL of the Ollama API endpoint",
        )
        OLLAMA_API_KEY: str = Field(
            default="your_ollama_api_key",
            description="The API key sent as a Bearer token to the Ollama API",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Browser-like User-Agent sent with the feed request.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }

    async def parse_rss_feed(
        self,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Parse the RSS feed and get the content of the relevant entries.
        :return: The content of the entries in json format.
        """
        functions = HelpFunctions()
        feed_url = self.valves.RSS_FEED_URL
        await _emit_status(
            __event_emitter__, f"Initiating RSS feed parsing for: {feed_url}"
        )

        try:
            await _emit_status(__event_emitter__, "Sending request to RSS feed")
            response = requests.get(feed_url, headers=self.headers, timeout=120)
            response.raise_for_status()

            await _emit_status(__event_emitter__, "Parsing RSS feed content")
            entries = BeautifulSoup(response.text, "xml").find_all("item")
            await _emit_status(
                __event_emitter__,
                f"Retrieved {len(entries)} entries from the RSS feed",
            )
        except requests.exceptions.RequestException as e:
            await _emit_status(
                __event_emitter__,
                f"Error during RSS feed parsing: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})

        results_json = []
        if entries:
            await _emit_status(__event_emitter__, "Processing RSS feed entries")
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        functions.process_rss_entry, entry, self.valves
                    )
                    for entry in entries
                ]
                # Collect in submission order so the output follows the feed's
                # own order; every future has to finish before returning
                # anyway, so this costs no extra time.
                for future in futures:
                    try:
                        result_json = future.result()
                    except Exception:
                        # Worker boundary: one malformed entry must not abort
                        # the remaining ones.
                        continue
                    if not result_json:
                        continue
                    try:
                        # Keep only results that can be serialized later.
                        json.dumps(result_json)
                    except (TypeError, ValueError):
                        continue
                    results_json.append(result_json)

        if self.valves.CITATION_LINKS:
            for result in results_json:
                await _emit_event(
                    __event_emitter__,
                    "citation",
                    {
                        "document": [result["content"]],
                        "metadata": [{"source": result["url"]}],
                        "source": {"name": result["title"]},
                    },
                )

        await _emit_status(
            __event_emitter__,
            f"RSS feed parsing completed. Retrieved content from {len(results_json)} entries",
            done=True,
        )
        return json.dumps(results_json, ensure_ascii=False)

    async def interact_with_ollama(
        self,
        prompt: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Interact with the Ollama API using the provided prompt.
        :params prompt: The prompt to send to the Ollama API.
        :return: The response from the Ollama API in json format.
        """
        await _emit_status(
            __event_emitter__,
            f"Initiating interaction with Ollama API for prompt: {prompt}",
        )

        headers = {
            "Authorization": f"Bearer {self.valves.OLLAMA_API_KEY}",
            "Content-Type": "application/json",
        }
        data = {"prompt": prompt}

        try:
            await _emit_status(__event_emitter__, "Sending request to Ollama API")
            response = requests.post(
                self.valves.OLLAMA_API_URL, json=data, headers=headers, timeout=120
            )
            response.raise_for_status()
            response_data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            await _emit_status(
                __event_emitter__,
                f"Error during interaction with Ollama API: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})

        await _emit_status(
            __event_emitter__,
            "Interaction with Ollama API completed successfully",
            done=True,
        )
        return json.dumps(response_data, ensure_ascii=False)