We're Hiring!
Whitepaper
Docs
Sign In
@romainneup
·
3 days ago
·
10 months ago
tool
Confluence Search
Get
Last Updated
3 days ago
Created
10 months ago
Tool
v0.4.0
Name
Confluence Search
Downloads
1.2K+
Saves
0+
Description
This tool allows you to search for and retrieve content from Confluence. (An Atlassian tool)
Tool Code
Show
""" title: Confluence search description: This tool allows you to search for and retrieve content from Confluence. repository: https://github.com/RomainNeup/open-webui-utilities author: @romainneup author_url: https://github.com/RomainNeup funding_url: https://github.com/sponsors/RomainNeup requirements: markdownify, sentence-transformers, numpy, rank_bm25, scikit-learn version: 0.4.0 changelog: - 0.0.1 - Initial code base. - 0.0.2 - Fix Valves variables - 0.1.0 - Split Confluence search and Confluence get page - 0.1.1 - Split Confluence search by title and by content - 0.1.2 - Improve search by splitting query into words - 0.1.3 - Add support for Personal Access Token authentication and user settings - 0.1.4 - Limit setting for search results - 0.2.0 - Implement RAG (Retrieval Augmented Generation) approach for better search results - 0.2.1 - Use Open WebUI environment variables - 0.2.2 - Fix confusion between Confluence API limit and RAG parameters - 0.2.3 - Memory optimization to prevent OOM errors - 0.2.4 - Move all hard-coded values to constants - 0.2.5 - Code structure improvements: search type enum, and better error handling - 0.2.6 - Add terms splitting option - 0.3.0 - Add settings for ssl verification - 0.4.0 - Add support for included/exluded confluence spaces in user settings """ import base64 import json import requests import asyncio import numpy as np import os from enum import Enum from typing import Awaitable, Callable, Dict, List, Any, Optional, Iterable from pydantic import BaseModel, Field from markdownify import markdownify from dataclasses import dataclass from sentence_transformers import SentenceTransformer from rank_bm25 import BM25Okapi from sklearn.neighbors import NearestNeighbors # Get environment variables DEFAULT_EMBEDDING_MODEL = os.environ.get( "RAG_EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2" ) RAG_EMBEDDING_MODEL_TRUST_REMOTE_CODE = ( os.environ.get("RAG_EMBEDDING_MODEL_TRUST_REMOTE_CODE", "True").lower() == "true" ) DEFAULT_CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "1000")) DEFAULT_CHUNK_OVERLAP = int(os.environ.get("CHUNK_OVERLAP", "100")) DEFAULT_TOP_K = int(os.environ.get("RAG_TOP_K", "3")) DEFAULT_RELEVANCE_THRESHOLD = float(os.environ.get("RAG_RELEVANCE_THRESHOLD", "0.0")) ENABLE_HYBRID_SEARCH = os.environ.get("ENABLE_RAG_HYBRID_SEARCH", "").lower() == "true" RAG_FULL_CONTEXT = os.environ.get("RAG_FULL_CONTEXT", "False").lower() == "true" # Memory management settings MAX_PAGE_SIZE = int(os.environ.get("RAG_FILE_MAX_SIZE", "10000")) BATCH_SIZE = int(os.environ.get("RAG_FILE_MAX_COUNT", "16")) # Read cache dir from environment CACHE_DIR = os.environ.get("CACHE_DIR", "/tmp/cache") DEFAULT_MODEL_CACHE_DIR = os.path.join(CACHE_DIR, "sentence_transformers") # Additional constant values DEFAULT_RRF_CONSTANT = 60 DEFAULT_DUPLICATE_THRESHOLD = 0.95 DEFAULT_BM25_K = 4 DEFAULT_SIMILARITY_FALLBACK = 0.0 # Field validation constraints CHUNK_SIZE_MIN = 5 CHUNK_SIZE_MAX = 100000 CHUNK_OVERLAP_MIN = 0 CHUNK_OVERLAP_MAX = 1000 MAX_RESULTS_MIN = 1 SIMILARITY_SCORE_MIN = 0.0 SIMILARITY_SCORE_MAX = 1.0 MAX_PAGE_SIZE_MIN = 1000 MAX_PAGE_SIZE_MAX = 1000000 BATCH_SIZE_MIN = 1 BATCH_SIZE_MAX = 100 # Default Confluence API related constants DEFAULT_BASE_URL = "https://example.atlassian.net/wiki" DEFAULT_USERNAME = "example@example.com" DEFAULT_API_KEY = "ABCD1234" DEFAULT_API_RESULT_LIMIT = 5 # Custom exceptions for better error handling class ConfluenceError(Exception): """Base exception for Confluence-related errors""" pass class ConfluenceAuthError(ConfluenceError): """Authentication error for Confluence API""" pass class ConfluenceAPIError(ConfluenceError): """API error for Confluence endpoints""" pass class ConfluenceModelError(ConfluenceError): """Error related to embedding models""" pass # Define an enum for search types class SearchType(str, Enum): """Enum for possible Confluence search types""" TITLE = "title" CONTENT = "content" TITLE_AND_CONTENT = "title_and_content" @classmethod def from_string(cls, search_type: str) -> "SearchType": """Convert string to SearchType enum with error handling""" try: return cls(search_type.lower()) except ValueError: # Default to title and content if invalid return cls.TITLE_AND_CONTENT @dataclass class Document: """Simple document class to store page content and metadata""" page_content: str metadata: Dict class EventEmitter: """Helper class to emit events to the UI""" def __init__(self, event_emitter: Callable[[dict], Awaitable[None]]): self.event_emitter = event_emitter pass async def emit_status(self, description: str, done: bool, error: bool = False): await self.event_emitter( { "data": { "description": f"{done and (error and '❌' or '✅') or '🔎'} {description}", "status": done and "complete" or "in_progress", "done": done, }, "type": "status", } ) async def emit_message(self, content: str): await self.event_emitter({"data": {"content": content}, "type": "message"}) async def emit_source(self, name: str, url: str, content: str, html: bool = False): await self.event_emitter( { "type": "citation", "data": { "document": [content], "metadata": [{"source": url, "html": html}], "source": {"name": name}, }, } ) class TextSplitter: """Text splitter for chunking Confluence pages into manageable sections""" def __init__( self, chunk_size: int = DEFAULT_CHUNK_SIZE, chunk_overlap: int = DEFAULT_CHUNK_OVERLAP, ): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap def split_text(self, text: str) -> List[str]: """Split text into chunks with overlap for better context preservation""" if not text: return [] # Check for extremely large text and enforce hard limit text_length = len(text) if text_length > MAX_PAGE_SIZE: text_length = MAX_PAGE_SIZE text = text[:MAX_PAGE_SIZE] # Truncate text to hard limit chunks = [] start = 0 # Process text in smaller windows to reduce memory pressure while start < text_length: # Calculate window end with extra margin for finding good break points window_end = min(start + self.chunk_size + 100, text_length) window = text[start:window_end] # Find actual chunk end with good break point chunk_end = min(self.chunk_size, len(window)) if chunk_end < len(window): # Look for good break points search_start = max(self.chunk_overlap, chunk_end - 50) for i in range(chunk_end, search_start, -1): if i < len(window) and window[i - 1 : i] in [".", "!", "?", "\n"]: chunk_end = i break # Extract chunk from window chunk = window[:chunk_end] chunks.append(chunk) # Update start position for next window # Ensure we always make progress by advancing at least 1 character # This prevents infinite loops when chunk_end ≤ chunk_overlap progress = max(1, chunk_end - self.chunk_overlap) start += progress # Explicitly clean up the window variable window = None return chunks async def split_documents( self, documents: List[Document], event_emitter: EventEmitter ) -> List[Document]: """Split documents into chunks while preserving metadata""" chunked_documents = [] done = 0 for doc in documents: await event_emitter.emit_status( f"Breaking down document {done+1}/{len(documents)} for better analysis", False, ) done += 1 chunks = self.split_text(doc.page_content) metadata = dict(doc.metadata) for chunk in chunks: chunked_documents.append( Document(page_content=chunk, metadata=metadata) ) return chunked_documents def cosine_similarity(X, Y) -> np.ndarray: """Calculate similarity between matrices X and Y""" if len(X) == 0 or len(Y) == 0: return np.array([]) X = np.array(X) Y = np.array(Y) if X.shape[1] != Y.shape[1]: raise ValueError( f"Number of columns in X and Y must be the same. X has shape {X.shape} " f"and Y has shape {Y.shape}." ) X_norm = np.linalg.norm(X, axis=1) Y_norm = np.linalg.norm(Y, axis=1) # Ignore divide by zero errors run time warnings as those are handled below. with np.errstate(divide="ignore", invalid="ignore"): similarity = np.dot(X, Y.T) / np.outer(X_norm, Y_norm) similarity[np.isnan(similarity) | np.isinf(similarity)] = ( DEFAULT_SIMILARITY_FALLBACK ) return similarity def filter_similar_embeddings( embedded_documents: List[List[float]], similarity_fn: Callable, threshold: float ) -> List[int]: """Filter out redundant documents that are too similar to each other""" similarity = np.tril(similarity_fn(embedded_documents, embedded_documents), k=-1) redundant = np.where(similarity > threshold) redundant_stacked = np.column_stack(redundant) redundant_sorted = np.argsort(similarity[redundant])[::-1] included_idxs = set(range(len(embedded_documents))) for first_idx, second_idx in redundant_stacked[redundant_sorted]: if first_idx in included_idxs and second_idx in included_idxs: # Default to dropping the second document of any highly similar pair. included_idxs.remove(second_idx) return list(sorted(included_idxs)) class DenseRetriever: """Semantic search using document embeddings""" def __init__( self, embedding_model: SentenceTransformer, num_results: int = DEFAULT_TOP_K, similarity_threshold: float = DEFAULT_RELEVANCE_THRESHOLD, batch_size: int = BATCH_SIZE, ): self.embedding_model = embedding_model self.num_results = num_results self.similarity_threshold = similarity_threshold self.batch_size = batch_size self.knn = None self.documents = None self.document_embeddings = None def add_documents(self, documents: List[Document]): """Process documents and prepare embeddings for search""" self.documents = documents # Process documents in batches to avoid memory issues all_embeddings = [] for i in range(0, len(documents), self.batch_size): batch = documents[i : i + self.batch_size] batch_texts = [doc.page_content for doc in batch] batch_embeddings = self.embedding_model.encode(batch_texts) all_embeddings.append(batch_embeddings) # Concatenate all batches self.document_embeddings = ( np.vstack(all_embeddings) if all_embeddings else np.array([]) ) # Create KNN index self.knn = NearestNeighbors(n_neighbors=min(self.num_results, len(documents))) if len(self.document_embeddings) > 0: self.knn.fit(self.document_embeddings) def get_relevant_documents(self, query: str) -> List[Document]: """Find documents most relevant to the query using semantic similarity""" if not self.knn or not self.documents: return [] query_embedding = self.embedding_model.encode(query) _, neighbor_indices = self.knn.kneighbors(query_embedding.reshape(1, -1)) neighbor_indices = neighbor_indices.squeeze(0) # Handle case where we have fewer documents than k if len(neighbor_indices) == 0: return [] relevant_doc_embeddings = self.document_embeddings[neighbor_indices] # Remove duplicative content included_idxs = filter_similar_embeddings( relevant_doc_embeddings, cosine_similarity, threshold=DEFAULT_DUPLICATE_THRESHOLD, ) relevant_doc_embeddings = relevant_doc_embeddings[included_idxs] # Only include sufficiently relevant documents similarity = cosine_similarity([query_embedding], relevant_doc_embeddings)[0] similar_enough = np.where(similarity > self.similarity_threshold)[0] included_idxs = [included_idxs[i] for i in similar_enough] filtered_result_indices = neighbor_indices[included_idxs] return [self.documents[i] for i in filtered_result_indices] def default_preprocessing_func(text: str) -> List[str]: """Split text into words for keyword search""" return text.split() class BM25Retriever: """Keyword-based retrieval using BM25 algorithm""" def __init__( self, vectorizer: Any, docs: List[Document], k: int = DEFAULT_BM25_K, preprocess_func: Callable[[str], List[str]] = default_preprocessing_func, ): self.vectorizer = vectorizer self.docs = docs self.k = k self.preprocess_func = preprocess_func @classmethod def from_documents( cls, documents: Iterable[Document], *, bm25_params: Optional[Dict[str, Any]] = None, preprocess_func: Callable[[str], List[str]] = default_preprocessing_func, **kwargs: Any, ) -> "BM25Retriever": """ Create a BM25Retriever from a list of Documents. """ texts = [preprocess_func(d.page_content) for d in documents] bm25_params = bm25_params or {} vectorizer = BM25Okapi(texts, **bm25_params) return cls( vectorizer=vectorizer, docs=documents, preprocess_func=preprocess_func, **kwargs, ) def get_relevant_documents(self, query: str) -> List[Document]: """Find documents most relevant to the query using keyword search""" processed_query = self.preprocess_func(query) return_docs = self.vectorizer.get_top_n(processed_query, self.docs, n=self.k) return return_docs def weighted_reciprocal_rank( doc_lists: List[List[Document]], weights: List[float], c: int = DEFAULT_RRF_CONSTANT ) -> List[Document]: """Combine multiple ranked document lists into a single ranking""" if len(doc_lists) != len(weights): raise ValueError("Number of rank lists must be equal to the number of weights.") # Associate each doc's content with its RRF score rrf_score = {} for doc_list, weight in zip(doc_lists, weights): for rank, doc in enumerate(doc_list, start=1): if doc.page_content not in rrf_score: rrf_score[doc.page_content] = weight / (rank + c) else: rrf_score[doc.page_content] += weight / (rank + c) # Deduplicate and sort by RRF score unique_docs = {} all_docs = [] for doc_list in doc_lists: for doc in doc_list: if doc.page_content not in unique_docs: unique_docs[doc.page_content] = doc all_docs.append(doc) sorted_docs = sorted( all_docs, reverse=True, key=lambda doc: rrf_score[doc.page_content], ) return sorted_docs class ConfluenceDocumentRetriever: """Handles retrieving and processing documents from Confluence""" def __init__( self, model_cache_dir: str = DEFAULT_MODEL_CACHE_DIR, device: str = "cpu", embedding_model_name: str = DEFAULT_EMBEDDING_MODEL, batch_size: int = BATCH_SIZE, ): self.device = device self.model_cache_dir = model_cache_dir self.embedding_model = None self.embedding_model_name = embedding_model_name self.batch_size = batch_size self.text_splitter = TextSplitter( chunk_size=DEFAULT_CHUNK_SIZE, chunk_overlap=DEFAULT_CHUNK_OVERLAP ) async def load_embedding_model(self, event_emitter): """Load the embedding model for semantic search""" await event_emitter.emit_status( f"Loading embedding model {self.embedding_model_name}...", False ) def load_model(): return SentenceTransformer( self.embedding_model_name, cache_folder=self.model_cache_dir, device=self.device, trust_remote_code=RAG_EMBEDDING_MODEL_TRUST_REMOTE_CODE, ) # Run in an executor to avoid blocking the event loop self.embedding_model = await asyncio.to_thread(load_model) return self.embedding_model async def retrieve_from_confluence_pages( self, query: str, documents: List[Document], event_emitter, num_results: int = DEFAULT_TOP_K, similarity_threshold: float = DEFAULT_RELEVANCE_THRESHOLD, ensemble_weighting: float = 0.5, enable_hybrid_search: bool = ENABLE_HYBRID_SEARCH, ) -> List[Document]: """Find relevant document chunks from Confluence pages using semantic and keyword search""" if not documents: return [] # Chunk documents chunked_docs = await self.text_splitter.split_documents( documents, event_emitter ) await event_emitter.emit_status( f"Prepared {len(chunked_docs)} content sections from {len(documents)} pages for analysis", False, ) if not chunked_docs: return [] results = [] # Determine search approach based on settings if not enable_hybrid_search: ensemble_weighting = 1.0 # Semantic search with embeddings if ensemble_weighting > 0: await event_emitter.emit_status( f"Analyzing content meaning in batches of {self.batch_size}...", False ) dense_retriever = DenseRetriever( self.embedding_model, num_results=num_results, similarity_threshold=similarity_threshold, batch_size=self.batch_size, ) dense_retriever.add_documents(chunked_docs) dense_results = dense_retriever.get_relevant_documents(query) await event_emitter.emit_status( f"Located {len(dense_results)} sections that match your query's meaning", False, ) else: dense_results = [] # Keyword search with BM25 if ensemble_weighting < 1: await event_emitter.emit_status( "Looking for matching keywords in content...", False ) keyword_retriever = BM25Retriever.from_documents( chunked_docs, k=num_results ) sparse_results = keyword_retriever.get_relevant_documents(query) await event_emitter.emit_status( f"Found {len(sparse_results)} sections with matching keywords", False ) else: sparse_results = [] # Combine results from both search methods results = weighted_reciprocal_rank( [dense_results, sparse_results], weights=[ensemble_weighting, 1 - ensemble_weighting], ) return results[:num_results] class Confluence: """Interface for Confluence API operations""" def __init__( self, username: str, api_key: str, base_url: str, included_spaces: Optional[List[str]] = None, excluded_spaces: Optional[List[str]] = None, api_key_auth: bool = True, ssl_verify: bool = True ): self.base_url = base_url self.ssl_verify = ssl_verify self.headers = self.authenticate(username, api_key, api_key_auth) self.included_spaces = included_spaces self.excluded_spaces = excluded_spaces def get(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: """Make a GET request to the Confluence API""" url = f"{self.base_url}/rest/api/{endpoint}" try: response = requests.get(url, params=params, headers=self.headers, verify=self.ssl_verify) if response.status_code == 401: raise ConfluenceAuthError( "Authentication failed. Check your credentials." ) elif not response.ok: raise ConfluenceAPIError( f"Failed to get data from Confluence: {response.text}" ) return response.json() except requests.RequestException as e: raise ConfluenceAPIError( f"Network error when connecting to Confluence: {str(e)}" ) def _build_search_query( self, query: str, field: Optional[str] = None, split_terms: bool = True ) -> str: """Builds a CQL search query from keywords, optionally limiting to specific fields""" terms = query.split() if not terms or not split_terms: if field: return f'{field} ~ "{query}"' return f'text ~ "{query}" OR title ~ "{query}"' if field: cql_terms = " OR ".join([f'{field} ~ "{term}"' for term in terms]) else: cql_terms = " OR ".join( [f'title ~ "{term}" OR text ~ "{term}"' for term in terms] ) if self.included_spaces is not None and len(self.included_spaces) > 0: quoted_spaces = [f"'{space}'" for space in self.included_spaces] cql_terms += f' AND space in ({", ".join(quoted_spaces)})' if self.excluded_spaces is not None and len(self.excluded_spaces) > 0: quoted_excluded_spaces = [f"'{space}'" for space in self.excluded_spaces] cql_terms += f' AND space not in ({", ".join(quoted_excluded_spaces)})' return cql_terms def search_confluence( self, query: str, search_type: SearchType, limit: int = 5, split_terms: bool = True, ) -> List[str]: """Unified search method using the search type enum""" endpoint = "content/search" if search_type == SearchType.TITLE: cql_terms = self._build_search_query(query, "title", split_terms) elif search_type == SearchType.CONTENT: cql_terms = self._build_search_query(query, "text", split_terms) else: # TITLE_AND_CONTENT cql_terms = self._build_search_query(query, split_terms=split_terms) params = {"cql": f'({cql_terms}) AND type="page"', "limit": limit} raw_response = self.get(endpoint, params) return [item["id"] for item in raw_response.get("results", [])] def get_page(self, page_id: str) -> Dict[str, str]: """Get a specific Confluence page by ID""" endpoint = f"content/{page_id}" params = {"expand": "body.view", "include-version": "false"} result = self.get(endpoint, params) # Get page content and limit size if needed body = markdownify(result["body"]["view"]["value"]) if len(body) > MAX_PAGE_SIZE: body = body[:MAX_PAGE_SIZE] return { "id": result["id"], "title": result["title"], "body": body, "link": f'{self.base_url}{result["_links"]["webui"]}', } def authenticate_api_key(self, username: str, api_key: str) -> Dict[str, str]: """Create auth headers using username and API key""" auth_string = f"{username}:{api_key}" encoded_auth_string = base64.b64encode(auth_string.encode("utf-8")).decode( "utf-8" ) return {"Authorization": "Basic " + encoded_auth_string} def authenticate_personal_access_token(self, access_token: str) -> Dict[str, str]: """Create auth headers using personal access token""" return {"Authorization": f"Bearer {access_token}"} def authenticate( self, username: str, api_key: str, api_key_auth: bool ) -> Dict[str, str]: """Set up authentication based on configuration""" if api_key_auth: return self.authenticate_api_key(username, api_key) else: return self.authenticate_personal_access_token(api_key) class Tools: def __init__(self): self.valves = self.Valves() self.document_retriever = None class Valves(BaseModel): """Configuration options for the Confluence search tool""" base_url: str = Field( DEFAULT_BASE_URL, description="The base URL of your Confluence instance", ) ssl_verify: bool = Field( True, description="SSL verification" ) username: str = Field( DEFAULT_USERNAME, description="Default username (leave empty for personal access token)", ) api_key: str = Field( DEFAULT_API_KEY, description="Default API key or personal access token" ) api_result_limit: int = Field( DEFAULT_API_RESULT_LIMIT, description="Maximum number of pages to retrieve from Confluence API", required=True, ) embedding_model_save_path: str = Field( DEFAULT_MODEL_CACHE_DIR, description="Path to the folder in which embedding models will be saved", ) embedding_model_name: str = Field( DEFAULT_EMBEDDING_MODEL, description="Name or path of the embedding model to use", ) cpu_only: bool = Field(default=True, description="Run the tool on CPU only") chunk_size: int = Field( default=DEFAULT_CHUNK_SIZE, description="Max. chunk size for Confluence pages", ge=CHUNK_SIZE_MIN, le=CHUNK_SIZE_MAX, ) chunk_overlap: int = Field( default=DEFAULT_CHUNK_OVERLAP, description="Overlap size between chunks", ge=CHUNK_OVERLAP_MIN, le=CHUNK_OVERLAP_MAX, ) max_results: int = Field( default=DEFAULT_TOP_K, description="Maximum number of relevant chunks to return after RAG processing", ge=MAX_RESULTS_MIN, ) similarity_threshold: float = Field( default=DEFAULT_RELEVANCE_THRESHOLD, description="Similarity Score Threshold. " "Discard chunks that are not similar enough to the " "search query and hence fall below the threshold.", ge=SIMILARITY_SCORE_MIN, le=SIMILARITY_SCORE_MAX, ) ensemble_weighting: float = Field( default=0.5, description="Ensemble Weighting. " "Smaller values = More keyword oriented, Larger values = More focus on semantic similarity. " "Ignored if hybrid search is disabled.", ge=SIMILARITY_SCORE_MIN, le=SIMILARITY_SCORE_MAX, ) enable_hybrid_search: bool = Field( default=ENABLE_HYBRID_SEARCH, description="Enable hybrid search (combine semantic and keyword search)", ) full_context: bool = Field( default=RAG_FULL_CONTEXT, description="Return full document content instead of just the most relevant chunks", ) max_page_size: int = Field( default=MAX_PAGE_SIZE, description="Maximum size in characters for a Confluence page to prevent OOM", ge=MAX_PAGE_SIZE_MIN, le=MAX_PAGE_SIZE_MAX, ) batch_size: int = Field( default=BATCH_SIZE, description="Number of documents to process at once for embedding", ge=BATCH_SIZE_MIN, le=BATCH_SIZE_MAX, ) pass class UserValves(BaseModel): """User-specific configuration options""" api_key_auth: bool = Field( True, description="Use API key authentication; disable this to use a personal access token instead.", ) username: str = Field( "", description="Username, typically your email address; leave empty if using a personal access token or default settings.", ) api_key: str = Field( "", description="API key or personal access token; leave empty to use the default settings.", ) split_terms: bool = Field( True, description="Split search query into words for better search results.", ) included_confluence_spaces: str = Field( "", description="Comma-separated list of Confluence spaces to search in; leave empty to search all spaces.", ) excluded_confluence_spaces: str = Field( "", description="Comma-separated list of Confluence spaces to exclude from the search; leave empty to include all spaces.", ) pass # Get content from Confluence async def search_confluence( self, query: str, type: str, __event_emitter__: Callable[[dict], Awaitable[None]], __user__: dict = {}, ) -> str: """ Search for a query on Confluence. This returns the result of the search on Confluence. Use it to search for a query on Confluence. When a user mentions a search on Confluence, this must be used. It can search by content or by title. Note: This returns a list of pages that match the search query. :param query: The text to search for on Confluence or the title of the page if asked to search by title. MUST be a string. :param type: The type of search to perform ('content' or 'title' or 'title_and_content') :return: A list of search results from Confluence in JSON format (id, title, body, link). If no results are found, an empty list is returned. """ event_emitter = EventEmitter(__event_emitter__) try: # Convert the search type string to enum for better validation search_type = SearchType.from_string(type) # Get the username and API key if __user__ and "valves" in __user__: user_valves = __user__["valves"] api_key_auth = user_valves.api_key_auth api_username = user_valves.username or self.valves.username api_key = user_valves.api_key or self.valves.api_key split_terms = user_valves.split_terms included_confluence_spaces = user_valves.included_confluence_spaces.split(",") if user_valves.included_confluence_spaces else None if included_confluence_spaces: included_confluence_spaces = [space.strip() for space in included_confluence_spaces] excluded_confluence_spaces = user_valves.excluded_confluence_spaces.split(",") if user_valves.excluded_confluence_spaces else None if excluded_confluence_spaces: excluded_confluence_spaces = [space.strip() for space in excluded_confluence_spaces] else: api_username = self.valves.username api_key = self.valves.api_key api_key_auth = True split_terms = True if (api_key_auth and not api_username) or not api_key: await event_emitter.emit_status( "Please provide a username and API key or personal access token.", True, True, ) return "Error: Please provide a username and API key or personal access token." # Apply memory settings from valves global MAX_PAGE_SIZE, BATCH_SIZE MAX_PAGE_SIZE = self.valves.max_page_size BATCH_SIZE = self.valves.batch_size # Ensure cache directory exists model_cache_dir = ( self.valves.embedding_model_save_path or DEFAULT_MODEL_CACHE_DIR ) try: os.makedirs(model_cache_dir, exist_ok=True) except Exception as e: await event_emitter.emit_status( f"Error creating model cache directory: {str(e)}", True, True ) return f"Error: {str(e)}" # Initialize document retriever and load model with proper error handling try: if not self.document_retriever: self.document_retriever = ConfluenceDocumentRetriever( model_cache_dir=model_cache_dir, device="cpu" if self.valves.cpu_only else "cuda", embedding_model_name=self.valves.embedding_model_name, batch_size=BATCH_SIZE, ) if not self.document_retriever.embedding_model: await self.document_retriever.load_embedding_model(event_emitter) except Exception as e: await event_emitter.emit_status( f"Error loading embedding model: {str(e)}", True, True ) return f"Error: Failed to load embedding model: {str(e)}" # Create Confluence client with proper error handling try: confluence = Confluence( username=api_username, api_key=api_key, base_url=self.valves.base_url, api_key_auth=api_key_auth, ssl_verify=self.valves.ssl_verify, included_spaces=included_confluence_spaces, excluded_spaces=excluded_confluence_spaces, ) except ConfluenceAuthError as e: await event_emitter.emit_status( f"Authentication error: {str(e)}", True, True ) return f"Error: Authentication failed: {str(e)}" await event_emitter.emit_status( f"Searching Confluence for '{query}' in {search_type.value}...", False ) # Search using the Confluence API try: searchResponse = confluence.search_confluence( query, search_type, self.valves.api_result_limit, split_terms ) except ConfluenceAPIError as e: await event_emitter.emit_status( f"API error during search: {str(e)}", True, True ) return f"Error: Confluence API error: {str(e)}" if not searchResponse: await event_emitter.emit_status( f"No matching results found in Confluence for '{query}'", True ) return json.dumps([]) # Fetch the full content of each page found raw_documents = [] for i, item in enumerate(searchResponse): await event_emitter.emit_status( f"Retrieving Confluence page {i+1}/{len(searchResponse)}...", False ) page = confluence.get_page(item) raw_documents.append( Document( page_content=page["body"], metadata={ "id": page["id"], "title": page["title"], "source": page["link"], }, ) ) # If full context mode is enabled, skip RAG processing and return complete pages if self.valves.full_context: await event_emitter.emit_status( f"Preparing all {len(raw_documents)} Confluence pages for you...", False, ) # Create results with the complete content results = [] for doc in raw_documents: result = { "id": doc.metadata["id"], "title": doc.metadata["title"], "body": doc.page_content, "link": doc.metadata["source"], } # Add citations for each full document await event_emitter.emit_source( result["title"], result["link"], result["body"] ) results.append(result) await event_emitter.emit_status( f"Found {len(results)} complete Confluence pages matching '{query}'", True, ) return json.dumps(results) # Apply RAG processing to find the most relevant content elif raw_documents: await event_emitter.emit_status( f"Analyzing {len(raw_documents)} pages to find the most relevant content...", False, ) # Update text splitter settings before processing self.document_retriever.text_splitter.chunk_size = ( self.valves.chunk_size ) self.document_retriever.text_splitter.chunk_overlap = ( self.valves.chunk_overlap ) relevant_chunks = ( await self.document_retriever.retrieve_from_confluence_pages( query=query, documents=raw_documents, event_emitter=event_emitter, num_results=self.valves.max_results, similarity_threshold=self.valves.similarity_threshold, ensemble_weighting=self.valves.ensemble_weighting, enable_hybrid_search=self.valves.enable_hybrid_search, ) ) # Clear raw documents to free memory raw_documents = None # Group chunks by page to build results page_chunks = {} for chunk in relevant_chunks: page_id = chunk.metadata["id"] if page_id not in page_chunks: page_chunks[page_id] = { "id": page_id, "title": chunk.metadata["title"], "link": chunk.metadata["source"], "chunks": [], } page_chunks[page_id]["chunks"].append(chunk.page_content) # Create final results results = [] for page_id, page_data in page_chunks.items(): # Join chunks with context into a coherent body body = "\n\n".join(page_data["chunks"]) result = { "id": page_data["id"], "title": page_data["title"], "body": body, "link": page_data["link"], } # Add citations for each relevant chunk await event_emitter.emit_source( result["title"], result["link"], result["body"] ) results.append(result) else: results = [] await event_emitter.emit_status( f"Search complete! Found {len(results)} pages with information about '{query}'", True, ) return json.dumps(results) except Exception as e: await event_emitter.emit_status( f"Unexpected error during search: {str(e)}.", True, True ) return f"Error: {str(e)}"
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.