Tool
CMS
CMS
Tool ID
CMS
Creator
@kaylis
Downloads
45+

Tool Content
python
from typing import Dict, Any, Optional
import requests
from pydantic import BaseModel, Field

class ApiPipeFilter:
    class Valves(BaseModel):
        priority: int = Field(default=0, description="Priority level for API pipe operations.")

    def __init__(self):
        self.valves = self.Valves()

    def inlet(self, body: dict, __user__: Optional[dict] = None) -> dict:
        if should_call_api(body.get("messages", []), __user__):
            api_response = call_api(body.get("messages", []), __user__)
            processed_text = process_api_response(api_response)
            # Assuming 'body' has a 'text' field to update
            body["text"] = processed_text  
        return body

def should_call_api(messages: list, __user__: Optional[dict] = None) -> bool:
    """
    Determines if an API call is necessary based on the messages and user context.
    """
    keywords = ["coverage", "Medicare", "benefits", "drug", "plan"]
    last_message = messages[-1]["content"] if messages else ""
    return any(keyword in last_message.lower() for keyword in keywords)

def call_api(messages: list, __user__: Optional[dict] = None) -> Any:
    """
    Makes the actual API call to the Coverage API.
    """
    base_url = "https://api.coverage.cms.gov/api/"
    endpoint = "search"
    last_message = messages[-1]["content"] if messages else ""
    params = {"q": last_message}

    try:
        response = requests.get(f"{base_url}{endpoint}", params=params)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error calling Coverage API: {e}")
        return None

def process_api_response(api_response: Any) -> str:
    """
    Processes the Coverage API response.
    """
    if api_response and "items" in api_response:
        items = api_response["items"][:3]
        formatted_items = "\n".join([
            f"- {item['name']}: {item['description']}" for item in items
        ])
        return f"Here's some information I found about Medicare coverage:\n{formatted_items}"
    else:
        return "I couldn't find any specific Medicare coverage information related to your query."

# Create an instance of the filter
api_pipe_filter = ApiPipeFilter()