Tool
v0.1.0
RSS Feed
Retreive all entries from the URL of an RSS Feed
Tool ID
rss_feed
Creator
@sched75
Downloads
7+

Tool Content
python
"""
title: RSS Feed Parser and Ollama API Interaction
author: Your Name
version: 0.1.0
license: MIT
"""

import os
import requests
from datetime import datetime
import json
from bs4 import BeautifulSoup
import concurrent.futures
from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin
import re
import unicodedata
from pydantic import BaseModel, Field
import asyncio
from typing import Callable, Any


class HelpFunctions:
    def __init__(self):
        pass

    def get_base_url(self, url):
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        return base_url

    def generate_excerpt(self, content, max_length=200):
        return content[:max_length] + "..." if len(content) > max_length else content

    def format_text(self, original_text):
        soup = BeautifulSoup(original_text, "html.parser")
        formatted_text = soup.get_text(separator=" ", strip=True)
        formatted_text = unicodedata.normalize("NFKC", formatted_text)
        formatted_text = re.sub(r"\s+", " ", formatted_text)
        formatted_text = formatted_text.strip()
        formatted_text = self.remove_emojis(formatted_text)
        return formatted_text

    def remove_emojis(self, text):
        return "".join(c for c in text if not unicodedata.category(c).startswith("So"))

    def process_rss_entry(self, entry, valves):
        title_site = self.remove_emojis(entry.title.text)
        url_site = entry.link.text
        snippet = entry.description.text

        try:
            response_site = requests.get(url_site, timeout=20)
            response_site.raise_for_status()
            html_content = response_site.text

            soup = BeautifulSoup(html_content, "html.parser")
            content_site = self.format_text(soup.get_text(separator=" ", strip=True))

            truncated_content = self.truncate_to_n_words(
                content_site, valves.PAGE_CONTENT_WORDS_LIMIT
            )

            return {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "snippet": self.remove_emojis(snippet),
            }

        except requests.exceptions.RequestException as e:
            return None

    def truncate_to_n_words(self, text, token_limit):
        tokens = text.split()
        truncated_tokens = tokens[:token_limit]
        return " ".join(truncated_tokens)


class Tools:
    class Valves(BaseModel):
        RSS_FEED_URL: str = Field(
            default="https://www.lefigaro.fr/rss",
            description="The URL of the RSS feed to parse",
        )
        PAGE_CONTENT_WORDS_LIMIT: int = Field(
            default=5000,
            description="Limit words content for each page.",
        )
        CITATION_LINKS: bool = Field(
            default=False,
            description="If True, send custom citations with links",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }

    async def parse_rss_feed(
        self,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Parse the RSS feed and get the content of the relevant entries.
        :return: The content of the entries in json format.
        """
        functions = HelpFunctions()

        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"Initiating RSS feed parsing for: {self.valves.RSS_FEED_URL}", "done": False},
            }
        )

        try:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Sending request to RSS feed", "done": False},
                }
            )
            response = requests.get(self.valves.RSS_FEED_URL, headers=self.headers, timeout=120)
            response.raise_for_status()
            rss_content = response.text

            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Parsing RSS feed content", "done": False},
                }
            )
            soup = BeautifulSoup(rss_content, "xml")
            entries = soup.find_all("item")

            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": f"Retrieved {len(entries)} entries from the RSS feed", "done": False},
                }
            )

        except requests.exceptions.RequestException as e:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": f"Error during RSS feed parsing: {str(e)}", "done": True},
                }
            )
            return json.dumps({"error": str(e)})

        results_json = []
        if entries:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Processing RSS feed entries", "done": False},
                }
            )

            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        functions.process_rss_entry, entry, self.valves
                    )
                    for entry in entries
                ]
                for future in concurrent.futures.as_completed(futures):
                    result_json = future.result()
                    if result_json:
                        try:
                            json.dumps(result_json)
                            results_json.append(result_json)
                        except (TypeError, ValueError):
                            continue

            if self.valves.CITATION_LINKS and __event_emitter__:
                for result in results_json:
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [result["content"]],
                                "metadata": [{"source": result["url"]}],
                                "source": {"name": result["title"]},
                            },
                        }
                    )

        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"RSS feed parsing completed. Retrieved content from {len(results_json)} entries", "done": True},
            }
        )

        return json.dumps(results_json, ensure_ascii=False)

    async def interact_with_ollama(
        self,
        prompt: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Interact with the Ollama API using the provided prompt.
        :params prompt: The prompt to send to the Ollama API.
        :return: The response from the Ollama API in json format.
        """
        await __event_emitter__(
            {
                "type": "status",
                "data": {"description": f"Initiating interaction with Ollama API for prompt: {prompt}", "done": False},
            }
        )

        OLLAMA_API_URL = "https://api.ollama.ai/v1/generate"  # Replace with the actual Ollama API URL
        OLLAMA_API_KEY = "your_ollama_api_key"  # Replace with your Ollama API key

        headers = {
            'Authorization': f'Bearer {OLLAMA_API_KEY}',
            'Content-Type': 'application/json'
        }
        data = {
            'prompt': prompt
        }

        try:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Sending request to Ollama API", "done": False},
                }
            )
            response = requests.post(OLLAMA_API_URL, json=data, headers=headers, timeout=120)
            response.raise_for_status()
            response_data = response.json()

            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": "Interaction with Ollama API completed successfully", "done": True},
                }
            )

            return json.dumps(response_data, ensure_ascii=False)

        except requests.exceptions.RequestException as e:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {"description": f"Error during interaction with Ollama API: {str(e)}", "done": True},
                }
            )
            return json.dumps({"error": str(e)})