Function (pipe)
Deep Research at Home
Function ID: deep_research_at_home
Creator: @radeon
Downloads: 1K+

flick your brain bean with a proper AI research implementation
README
No README available
Function Code
# I had issues with the previous/latest 7000-odd line version of this code so here's an intermediate as a stand-in
# The newest version added citation management and verification but I'm still working that out
# This version does most of the same cool tricks which means it takes a long time to generate the final synthesis
# It's generating it in multiple parts and combining them, then editing and reviewing the result
# Research generally follows a research outline created with an initial research cycle prior to use
# The pipe collects user input after the initial cycle to use as continuous influence throughout the process
# The research outline management balances updates with continuity to keep research grounded yet open-minded
# It also uses a neat semantic compression technique to help manage context size without losing much of relevance
# As-is it's intended for use with two 3090s. You could probably use gemma3:4b and play with compression level,
# max cycle count, max content length in that order to have satisfactory results with 12gb
# Needs my gemma3 prompt to be performant btw: github.com/atineiatte/deep-research-at-home/blob/main/gemma3-12b-prompt
# Apologies to anyone who was inconvenienced by earlier buggy code. If this one doesn't work it's your fault :)

import logging
import json
import asyncio
import re
import numpy as np
import aiohttp
from typing import Dict, List, Callable, Awaitable, Optional, Any, Union, Set, Tuple
from pydantic import BaseModel, Field
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA

from open_webui.constants import TASKS
from open_webui.main import generate_chat_completions
from open_webui.models.users import User

name = "Deep Research at Home"


def setup_logger():
    logger = logging.getLogger(name)
    if not logger.handlers:
        logger.setLevel(logging.DEBUG)
        handler = logging.StreamHandler()
        handler.set_name(name)
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.propagate = False
    return logger


logger = setup_logger()


class EmbeddingCache:
    """Cache for embeddings to avoid redundant API calls"""

    def __init__(self, max_size=100000):
        self.cache = {}
        self.max_size = max_size
        self.hit_count = 0
        self.miss_count = 0

    def get(self, text_key):
        """Get embedding from cache using text as key"""
        # Use a hash of the text as the key to limit memory usage
        key = hash(text_key[:1000])
        result = self.cache.get(key)
        if result is not None:
            self.hit_count += 1
        return result

    def set(self, text_key, embedding):
        """Store embedding in cache"""
        # Use a hash of the text as the key to limit memory usage
        key = hash(text_key[:1000])
        self.cache[key] = embedding
        self.miss_count += 1

        # Simple LRU-like pruning if cache gets too large
        if len(self.cache) > self.max_size:
            # Remove a random key as a simple eviction strategy
            self.cache.pop(next(iter(self.cache)))

    def stats(self):
        """Return cache statistics"""
        total = self.hit_count + self.miss_count
        hit_rate = self.hit_count / total if total > 0 else 0
        return {
            "size": len(self.cache),
            "hits": self.hit_count,
            "misses": self.miss_count,
            "hit_rate": hit_rate,
        }


class Pipe:
    __current_event_emitter__: Callable[[dict], Awaitable[None]]
    __current_event_call__: Callable[[dict], Awaitable[Any]]
    __user__: User
    __model__: str
    __request__: Any

    class Valves(BaseModel):
        ENABLED: bool = Field(
            default=True,
            description="Enable Deep Research pipe",
        )
        LARGE_MODEL: str = Field(
            default="gemma3:12b",
            description="Model for 
generating research queries and synthesizing results", ) SYNTHESIS_MODEL: str = Field( default="gemma3:27b", description="Optional separate model for final synthesis (leave empty to use LARGE_MODEL)", ) EMBEDDING_MODEL: str = Field( default="granite-embedding:30m", description="Model for semantic comparison of content", ) MAX_CYCLES: int = Field( default=10, description="Maximum number of research cycles before terminating", ge=3, le=50, ) MIN_CYCLES: int = Field( default=5, description="Minimum number of research cycles to perform", ge=1, le=10, ) SEARCH_RESULTS_PER_QUERY: int = Field( default=4, description="Number of search results to try per query", ge=1, le=10, ) EXTRA_RESULTS_PER_QUERY: int = Field( default=3, description="Extra results to fetch and select the most relevant from", ge=0, le=5, ) SUCCESSFUL_RESULTS_PER_QUERY: int = Field( default=1, description="Number of successful results to keep per query", ge=1, le=5, ) CHUNK_LEVEL: int = Field( default=2, description="Level of chunking (1=phrase, 2=sentence, 3=paragraph, 4+=multi-paragraph)", ge=1, le=10, ) COMPRESSION_LEVEL: int = Field( default=4, description="Level of compression (1=minimal, 10=maximum)", ge=1, le=10, ) LOCAL_INFLUENCE_RADIUS: int = Field( default=3, description="Number of chunks before and after to consider for local similarity", ge=0, le=5, ) QUERY_WEIGHT: float = Field( default=0.5, description="Weight to give query similarity vs document relevance (0.0-1.0)", ge=0.0, le=1.0, ) FOLLOWUP_WEIGHT: float = Field( default=0.5, description="Weight to give followup query vs previous comprehensive summary (0.0-1.0)", ge=0.0, le=1.0, ) TEMPERATURE: float = Field( default=0.7, description="Temperature for generation", ge=0.0, le=2.0 ) SYNTHESIS_TEMPERATURE: float = Field( default=0.6, description="Temperature for final synthesis", ge=0.0, le=2.0 ) OLLAMA_URL: str = Field( default="http://localhost:11434", description="URL for Ollama API" ) SEARCH_URL: str = Field( default="http://192.168.1.1:8888/search?q=", description="URL for web search API", ) MAX_FAILED_RESULTS: int = Field( default=3, description="Maximum number of failed results before abandoning a query", ge=1, le=10, ) EXTRACT_CONTENT_ONLY: bool = Field( default=True, description="Extract only text content from HTML, removing scripts, styles, etc.", ) PDF_MAX_PAGES: int = Field( default=25, description="Maximum number of pages to extract from a PDF", ge=5, le=500, ) HANDLE_PDFS: bool = Field( default=True, description="Enable processing of PDF files", ) RELEVANCY_SNIPPET_LENGTH: int = Field( default=1000, description="Number of characters to use when comparing extra results for relevance", ge=100, le=5000, ) INTERACTIVE_RESEARCH: bool = Field( default=True, description="Enable user interaction during research", ) USER_PREFERENCE_THROUGHOUT: bool = Field( default=True, description="Use user removal preferences throughout research cycles", ) INTERACTION_CYCLES: int = Field( default=1, description="Number of user interaction cycles", ge=1, le=5, ) SEMANTIC_TRANSFORMATION_STRENGTH: float = Field( default=0.7, description="Strength of semantic transformations for directing research (0.0-1.0)", ge=0.0, le=1.0, ) TRAJECTORY_MOMENTUM: float = Field( default=0.6, description="Weight given to previous research trajectory (0.0-1.0)", ge=0.0, le=1.0, ) GAP_EXPLORATION_WEIGHT: float = Field( default=0.4, description="Weight given to exploring research gaps (0.0-1.0)", ge=0.0, le=1.0, ) STEPPED_SYNTHESIS_COMPRESSION: bool = Field( default=True, description="Enable tiered 
compression for older vs newer research results", ) def __init__(self): self.type = "manifold" self.valves = self.Valves() self.prev_comprehensive_summary = "" # Store the previous comprehensive answer self.url_results_cache = {} # Cache for URL results self.url_considered_count = ( {} ) # Track URLs that were considered but not selected self.url_selected_count = {} # Track URLs that were actually shown to the user self.embedding_cache = EmbeddingCache(max_size=10000) # Larger embedding cache self.user_preferences = { "pdv": None, "strength": 0.0, "impact": 0.0, } # Store user preference direction vector with impact self.research_dimensions = None # For dimension-aware research tracking self.semantic_transformations = None # Store semantic transformation matrices self.research_trajectory = None # Store research trajectory vectors self.vocabulary_cache = None # Cache for vocabulary words self.vocabulary_embeddings = None # Cache for vocabulary embeddings self.section_synthesized_content = ( {} ) # Store synthesized content for each section self._user_input_received = False # Flag to prevent showing duplicate messages self.is_pdf_content = False # Flag for tracking if content is from a PDF # Research state tracking self._waiting_for_outline_feedback = False self._outline_feedback_data = None self._research_state = None self._research_completed = False # Flag to track if research is completed self._follow_up_mode = False # Flag to track if we're in follow-up mode self.total_tokens_used = 0 # Track total tokens used self.memory_stats = { "results_tokens": 0, "section_tokens": {}, "synthesis_tokens": 0, "total_tokens": 0, } # Track token usage statistics def pipes(self) -> list[dict[str, str]]: return [{"id": f"{name}-pipe", "name": f"{name} Pipe"}] async def count_tokens(self, text: str) -> int: """Count tokens in text using Ollama API""" try: # Use Ollama's tokenize endpoint with the specified model async with aiohttp.ClientSession() as session: payload = { "model": self.valves.LARGE_MODEL, "prompt": text[:2000], # Limit length for token counting } async with session.post( f"{self.valves.OLLAMA_URL}/api/tokenize", json=payload, timeout=10 ) as response: if response.status == 200: result = await response.json() tokens = result.get("tokens", []) # If we got only a partial count due to truncation, estimate full count if len(text) > 2000: ratio = len(text) / 2000 return int(len(tokens) * ratio) return len(tokens) except Exception as e: logger.error(f"Error counting tokens with Ollama API: {e}") # Fallback to simple estimation if API call fails words = text.split() return int(len(words) * 1.3) # Approximate token count using words async def get_embedding(self, text: str) -> Optional[List[float]]: """Get embedding for a text string using the configured embedding model with caching""" if not text or not text.strip(): return None # Check cache first cached_embedding = self.embedding_cache.get(text) if cached_embedding is not None: return cached_embedding # If not in cache, get from API async with aiohttp.ClientSession() as session: payload = { "model": self.valves.EMBEDDING_MODEL, "prompt": text, } try: async with session.post( f"{self.valves.OLLAMA_URL}/api/embeddings", json=payload, timeout=30 ) as response: if response.status == 200: result = await response.json() embedding = result.get("embedding", []) if embedding: # Cache the result self.embedding_cache.set(text, embedding) return embedding except Exception as e: logger.error(f"Error getting embedding: {e}") return None async def 
load_vocabulary(self): """Load the 10,000 word vocabulary for semantic analysis""" if self.vocabulary_cache is not None: return self.vocabulary_cache try: url = "https://www.mit.edu/~ecprice/wordlist.10000" async with aiohttp.ClientSession() as session: async with session.get(url, timeout=10) as response: if response.status == 200: text = await response.text() self.vocabulary_cache = [ word.strip() for word in text.splitlines() if word.strip() ] logger.info( f"Loaded {len(self.vocabulary_cache)} words vocabulary" ) return self.vocabulary_cache except Exception as e: logger.error(f"Error loading vocabulary: {e}") # Fallback - create a minimal vocabulary self.vocabulary_cache = ["research", "information", "analysis", "data", "study"] return self.vocabulary_cache async def get_vocabulary_embeddings(self): """Get embeddings for vocabulary words""" if self.vocabulary_embeddings is not None: return self.vocabulary_embeddings self.vocabulary_embeddings = {} # Load vocabulary if not already loaded vocab = await self.load_vocabulary() # Process all words without batch size limitations for word in vocab: embedding = await self.get_embedding(word) if embedding: self.vocabulary_embeddings[word] = embedding logger.info( f"Generated embeddings for {len(self.vocabulary_embeddings)} vocabulary words" ) return self.vocabulary_embeddings def chunk_text(self, text: str) -> List[str]: """Split text into chunks based on the configured chunk level""" chunk_level = self.valves.CHUNK_LEVEL # If no chunking requested, return the whole text as a single chunk if chunk_level <= 0: return [text] # Level 1: Phrase-level chunking (split by commas, colons, semicolons) if chunk_level == 1: # Split by commas, colons, semicolons that are followed by a space # First split by newlines to maintain paragraph structure paragraphs = text.split("\n") # Then split each paragraph by phrases chunks = [] for paragraph in paragraphs: if not paragraph.strip(): continue # Split paragraph into phrases paragraph_phrases = re.split(r"(?<=[,;:])\s+", paragraph) # Only add non-empty phrases for phrase in paragraph_phrases: if phrase.strip(): chunks.append(phrase.strip()) return chunks # Level 2: Sentence-level chunking (split by periods, exclamation, question marks) if chunk_level == 2: # Different handling for PDF vs regular content if self.is_pdf_content: # For PDFs: Don't remove newlines, handle sentences directly chunks = [] # Split by sentences, preserving newlines sentences = re.split(r"(?<=[.!?])\s+", text) # Only add non-empty sentences for sentence in sentences: if sentence.strip(): chunks.append(sentence.strip()) else: # For regular content: First split by paragraphs paragraphs = text.split("\n") chunks = [] for paragraph in paragraphs: if not paragraph.strip(): continue # Split paragraph into sentences sentences = re.split(r"(?<=[.!?])\s+", paragraph) # Only add non-empty sentences for sentence in sentences: if sentence.strip(): chunks.append(sentence.strip()) return chunks # Level 3: Paragraph-level chunking paragraphs = [p.strip() for p in text.split("\n") if p.strip()] if chunk_level == 3: return paragraphs # Level 4-10: Multi-paragraph chunking (4=2 paragraphs, 5=3 paragraphs, etc.) chunks = [] # Calculate how many paragraphs per chunk (chunk_level 4 = 2 paragraphs, 5 = 3 paragraphs, etc.) 
paragraphs_per_chunk = chunk_level - 2 for i in range(0, len(paragraphs), paragraphs_per_chunk): chunk = "\n".join(paragraphs[i : i + paragraphs_per_chunk]) chunks.append(chunk) return chunks async def compute_semantic_eigendecomposition(self, chunks, embeddings): """Perform semantic eigendecomposition on chunk embeddings""" if not chunks or not embeddings or len(chunks) < 3: return None try: # Convert embeddings to numpy array embeddings_array = np.array(embeddings) # Check for invalid values if np.isnan(embeddings_array).any() or np.isinf(embeddings_array).any(): logger.warning( "Invalid values in embeddings, cannot perform eigendecomposition" ) return None # Center the embeddings centered_embeddings = embeddings_array - np.mean(embeddings_array, axis=0) # Compute covariance matrix cov_matrix = np.cov(centered_embeddings, rowvar=False) # Perform eigendecomposition eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix) # Sort by eigenvalues in descending order idx = np.argsort(eigenvalues)[::-1] eigenvalues = eigenvalues[idx] eigenvectors = eigenvectors[:, idx] # Determine how many principal components to keep total_variance = np.sum(eigenvalues) if total_variance <= 0: logger.warning( "Total variance is zero or negative, cannot continue with eigendecomposition" ) return None explained_variance_ratio = eigenvalues / total_variance # Keep components that explain 80% of variance cumulative_variance = np.cumsum(explained_variance_ratio) n_components = np.argmax(cumulative_variance >= 0.8) + 1 n_components = max(3, min(n_components, 10)) # At least 3, at most 10 # Project embeddings onto principal components principal_components = eigenvectors[:, :n_components] projected_embeddings = np.dot(centered_embeddings, principal_components) return { "eigenvalues": eigenvalues[:n_components].tolist(), "eigenvectors": principal_components.tolist(), "explained_variance": explained_variance_ratio[:n_components].tolist(), "projected_embeddings": projected_embeddings.tolist(), "n_components": n_components, } except Exception as e: logger.error(f"Error in semantic eigendecomposition: {e}") return None async def create_semantic_transformation( self, semantic_eigendecomposition, pdv=None, trajectory=None, gap_vector=None ): """Create a semantic transformation matrix based on eigendecomposition and direction vectors""" if not semantic_eigendecomposition: return None try: # Get principal components eigenvectors = np.array(semantic_eigendecomposition["eigenvectors"]) eigenvalues = np.array(semantic_eigendecomposition["eigenvalues"]) # Create initial transformation (identity) embedding_dim = eigenvectors.shape[0] transformation = np.eye(embedding_dim) # Get importance weights for each eigenvector variance_importance = eigenvalues / np.sum(eigenvalues) # Enhance dimensions based on eigenvalues (semantic importance) for i, importance in enumerate(variance_importance): eigenvector = eigenvectors[:, i] # Scale amplification by dimension importance amplification = 1.0 + importance * 2.0 # 1.0 to 3.0 # Add outer product to emphasize this dimension transformation += (amplification - 1.0) * np.outer( eigenvector, eigenvector ) # Calculate weights for different direction vectors pdv_weight = ( self.valves.SEMANTIC_TRANSFORMATION_STRENGTH * self.user_preferences.get("impact", 0.0) if pdv is not None else 0.0 ) trajectory_weight = ( self.valves.TRAJECTORY_MOMENTUM if trajectory is not None else 0.0 ) gap_weight = ( self.valves.GAP_EXPLORATION_WEIGHT if gap_vector is not None else 0.0 ) # Normalize weights to sum to at most 
0.8 (leaving some room for the eigendecomposition base) total_weight = pdv_weight + trajectory_weight + gap_weight if total_weight > 0.8: scale_factor = 0.8 / total_weight pdv_weight *= scale_factor trajectory_weight *= scale_factor gap_weight *= scale_factor # Apply PDV transformation if pdv is not None and pdv_weight > 0.1: pdv_array = np.array(pdv) norm = np.linalg.norm(pdv_array) if norm > 1e-10: pdv_array = pdv_array / norm transformation += pdv_weight * np.outer(pdv_array, pdv_array) # Apply trajectory transformation if trajectory is not None and trajectory_weight > 0.1: trajectory_array = np.array(trajectory) norm = np.linalg.norm(trajectory_array) if norm > 1e-10: trajectory_array = trajectory_array / norm transformation += trajectory_weight * np.outer( trajectory_array, trajectory_array ) # Apply gap vector transformation if gap_vector is not None and gap_weight > 0.1: gap_array = np.array(gap_vector) norm = np.linalg.norm(gap_array) if norm > 1e-10: gap_array = gap_array / norm transformation += gap_weight * np.outer(gap_array, gap_array) return { "matrix": transformation.tolist(), "dimension": embedding_dim, "pdv_weight": pdv_weight, "trajectory_weight": trajectory_weight, "gap_weight": gap_weight, } except Exception as e: logger.error(f"Error creating semantic transformation: {e}") return None async def apply_semantic_transformation(self, embedding, transformation): """Apply semantic transformation to an embedding""" if not transformation or not embedding: return embedding try: # Convert to numpy arrays embedding_array = np.array(embedding) transform_matrix = np.array(transformation["matrix"]) # Check for invalid values if ( np.isnan(embedding_array).any() or np.isnan(transform_matrix).any() or np.isinf(embedding_array).any() or np.isinf(transform_matrix).any() ): logger.warning("Invalid values in embedding or transformation matrix") return embedding # Apply transformation transformed = np.dot(embedding_array, transform_matrix) # Check for valid result if np.isnan(transformed).any() or np.isinf(transformed).any(): logger.warning("Transformation produced invalid values") return embedding # Normalize to unit vector norm = np.linalg.norm(transformed) if norm > 1e-10: # Avoid division by near-zero transformed = transformed / norm return transformed.tolist() else: logger.warning("Transformation produced zero vector") return embedding except Exception as e: logger.error(f"Error applying semantic transformation: {e}") return embedding async def compress_content_with_local_similarity( self, content: str, query_embedding: List[float], summary_embedding: Optional[List[float]] = None, ratio: Optional[float] = None, ) -> str: """Apply semantic compression with local similarity influence""" # Skip compression for very short content if len(content) < 100: return content # Split content into chunks based on chunk_level chunks = self.chunk_text(content) # Skip compression if only one chunk if len(chunks) <= 1: return content # Get embeddings for chunks chunk_embeddings = [] for chunk in chunks: embedding = await self.get_embedding(chunk) if embedding: chunk_embeddings.append(embedding) # Skip compression if not enough embeddings if len(chunk_embeddings) <= 1: return content # Define compression ratio if not provided if ratio is None: compress_ratios = { 1: 0.9, # 90% - minimal compression 2: 0.8, # 80% 3: 0.7, # 70% 4: 0.6, # 60% 5: 0.5, # 50% - moderate compression 6: 0.4, # 40% 7: 0.3, # 30% 8: 0.2, # 20% 9: 0.15, # 15% 10: 0.1, # 10% - maximum compression } level = 
self.valves.COMPRESSION_LEVEL ratio = compress_ratios.get(level, 0.5) # Calculate how many chunks to keep n_chunks = len(chunk_embeddings) n_keep = max(1, min(n_chunks - 1, int(n_chunks * ratio))) # Ensure we're compressing at least a little if n_keep >= n_chunks: n_keep = max(1, n_chunks - 1) try: # Convert embeddings to numpy array embeddings_array = np.array(chunk_embeddings) # Calculate document centroid document_centroid = np.mean(embeddings_array, axis=0) # Calculate local similarity for each chunk local_similarities = [] local_radius = self.valves.LOCAL_INFLUENCE_RADIUS # Get from valve for i in range(len(embeddings_array)): # Calculate similarity to adjacent chunks (local influence) local_sim = 0.0 count = 0 # Check previous chunks within radius for j in range(max(0, i - local_radius), i): local_sim += cosine_similarity( [embeddings_array[i]], [embeddings_array[j]] )[0][0] count += 1 # Check next chunks within radius for j in range(i + 1, min(len(embeddings_array), i + local_radius + 1)): local_sim += cosine_similarity( [embeddings_array[i]], [embeddings_array[j]] )[0][0] count += 1 if count > 0: local_sim /= count local_similarities.append(local_sim) # Calculate importance scores with all factors importance_scores = [] for i, embedding in enumerate(embeddings_array): # Fix any NaN or Inf values if np.isnan(embedding).any() or np.isinf(embedding).any(): embedding = np.nan_to_num( embedding, nan=0.0, posinf=1.0, neginf=-1.0 ) # Calculate similarity to document centroid doc_similarity = cosine_similarity([embedding], [document_centroid])[0][ 0 ] # Calculate similarity to query query_similarity = cosine_similarity([embedding], [query_embedding])[0][ 0 ] # Calculate similarity to previous summary if provided summary_similarity = 0.0 if summary_embedding is not None: summary_similarity = cosine_similarity( [embedding], [summary_embedding] )[0][0] # Blend query and summary similarity query_similarity = ( query_similarity * self.valves.FOLLOWUP_WEIGHT ) + (summary_similarity * (1.0 - self.valves.FOLLOWUP_WEIGHT)) # Include local similarity influence local_influence = local_similarities[i] # Include preference direction vector if available pdv_alignment = 0.5 # Neutral default if ( self.valves.USER_PREFERENCE_THROUGHOUT and self.user_preferences["pdv"] is not None ): chunk_embedding_np = np.array(embedding) pdv_np = np.array(self.user_preferences["pdv"]) alignment = np.dot(chunk_embedding_np, pdv_np) pdv_alignment = (alignment + 1) / 2 # Normalize to 0-1 # Weight by preference strength pdv_influence = min(0.3, self.user_preferences["strength"] / 10) else: pdv_influence = 0.0 # Weight the factors doc_weight = ( 1.0 - self.valves.QUERY_WEIGHT ) * 0.4 # Some preference towards relevance towards query local_weight = ( 1.0 - self.valves.QUERY_WEIGHT ) * 0.8 # More preference towards standout local chunks query_weight = self.valves.QUERY_WEIGHT * (1.0 - pdv_influence) final_score = ( (doc_similarity * doc_weight) + (query_similarity * query_weight) + (local_influence * local_weight) + (pdv_alignment * pdv_influence) ) importance_scores.append((i, final_score)) # Sort chunks by importance (most important first) importance_scores.sort(key=lambda x: x[1], reverse=True) # Select the top n_keep most important chunks selected_indices = [x[0] for x in importance_scores[:n_keep]] # Sort indices to maintain original document order selected_indices.sort() # Get the selected chunks selected_chunks = [chunks[i] for i in selected_indices if i < len(chunks)] # Join compressed chunks back into text with proper 
formatting chunk_level = self.valves.CHUNK_LEVEL if chunk_level == 1: # Phrase level compressed_content = " ".join(selected_chunks) elif chunk_level == 2: # Sentence level processed_sentences = [] for sentence in selected_chunks: if not sentence.endswith((".", "!", "?", ":", ";")): sentence += "." processed_sentences.append(sentence) compressed_content = " ".join(processed_sentences) else: # Paragraph levels compressed_content = "\n".join(selected_chunks) return compressed_content except Exception as e: logger.error(f"Error during compression with local similarity: {e}") return content async def compress_content_with_eigendecomposition( self, content: str, query_embedding: List[float], summary_embedding: Optional[List[float]] = None, ratio: Optional[float] = None, ) -> str: """Apply semantic compression using eigendecomposition""" # Skip compression for very short content if len(content) < 200: return content # Split content into chunks based on chunk_level chunks = self.chunk_text(content) # Skip compression if only one chunk if len(chunks) <= 2: return content # Get embeddings for chunks chunk_embeddings = [] for chunk in chunks: embedding = await self.get_embedding(chunk) if embedding: chunk_embeddings.append(embedding) # Skip compression if not enough embeddings if len(chunk_embeddings) <= 2: return content # Define compression ratio if not provided if ratio is None: compress_ratios = { 1: 0.9, # 90% - minimal compression 2: 0.8, # 80% 3: 0.7, # 70% 4: 0.6, # 60% 5: 0.5, # 50% - moderate compression 6: 0.4, # 40% 7: 0.3, # 30% 8: 0.2, # 20% 9: 0.15, # 15% 10: 0.1, # 10% - maximum compression } level = self.valves.COMPRESSION_LEVEL ratio = compress_ratios.get(level, 0.5) # Calculate how many chunks to keep n_chunks = len(chunks) n_keep = max(1, min(n_chunks - 1, int(n_chunks * ratio))) # Ensure we're compressing at least a little if n_keep >= n_chunks: n_keep = max(1, n_chunks - 1) try: # Perform semantic eigendecomposition eigendecomposition = await self.compute_semantic_eigendecomposition( chunks, chunk_embeddings ) if eigendecomposition: # Calculate importance scores based on the eigendecomposition embeddings_array = np.array(chunk_embeddings) importance_scores = [] # Create basic directions directions = {} if query_embedding: directions["query"] = query_embedding if summary_embedding: directions["summary"] = summary_embedding if self.user_preferences["pdv"] is not None: directions["pdv"] = self.user_preferences["pdv"] # Create transformation transformation = await self.create_semantic_transformation( eigendecomposition, pdv=( self.user_preferences["pdv"] if self.user_preferences["impact"] > 0.1 else None ), ) # Project chunks into the principal component space for better analysis projected_chunks = eigendecomposition["projected_embeddings"] eigenvectors = np.array(eigendecomposition["eigenvectors"]) # Calculate local coherence using the eigenspace local_coherence = [] local_radius = self.valves.LOCAL_INFLUENCE_RADIUS for i in range(len(projected_chunks)): # Calculate similarity to adjacent chunks local_sim = 0.0 count = 0 # Look at previous and next chunks within radius for j in range( max(0, i - local_radius), min(len(projected_chunks), i + local_radius + 1), ): if i == j: continue # Use weighted similarity in eigenspace sim = 0.0 for k in range(eigendecomposition["n_components"]): # Weight by eigenvalue importance weight = eigendecomposition["explained_variance"][k] dim_sim = 1.0 - abs( projected_chunks[i][k] - projected_chunks[j][k] ) sim += weight * dim_sim local_sim += sim 
count += 1 if count > 0: local_sim /= count local_coherence.append(local_sim) # Calculate relevance to query using transformed embeddings query_relevance = [] if query_embedding: try: # Project query into eigenspace query_array = np.array(query_embedding) query_centered = query_array - np.mean(embeddings_array, axis=0) query_projected = np.dot(query_centered, eigenvectors) # Calculate relevance in eigenspace for i in range(len(projected_chunks)): relevance = 0.0 for k in range(eigendecomposition["n_components"]): weight = eigendecomposition["explained_variance"][k] dim_sim = 1.0 - abs( projected_chunks[i][k] - query_projected[k] ) relevance += weight * dim_sim query_relevance.append(relevance) except Exception as e: logger.warning(f"Error calculating query relevance: {e}") query_relevance = [0.5] * len(projected_chunks) else: # Default relevance if no query query_relevance = [0.5] * len(projected_chunks) # Combine scores for i in range(len(chunks)): if i >= len(local_coherence) or i >= len(query_relevance): continue # Weights for different factors coherence_weight = 0.4 relevance_weight = 0.6 # Adjust based on user preferences if ( self.user_preferences["pdv"] is not None and self.user_preferences["impact"] > 0.1 ): # Reduce other weights to make room for preference weight pdv_weight = min(0.3, self.user_preferences["impact"]) coherence_weight *= 1.0 - pdv_weight relevance_weight *= 1.0 - pdv_weight # Calculate PDV alignment if available if i < len(chunk_embeddings): try: chunk_embed = chunk_embeddings[i] pdv_alignment = np.dot( chunk_embed, self.user_preferences["pdv"] ) # Normalize to 0-1 pdv_alignment = (pdv_alignment + 1) / 2 except Exception as e: logger.warning(f"Error calculating PDV alignment: {e}") pdv_alignment = 0.5 else: pdv_alignment = 0.5 final_score = ( (local_coherence[i] * coherence_weight) + (query_relevance[i] * relevance_weight) + (pdv_alignment * pdv_weight) ) else: final_score = (local_coherence[i] * coherence_weight) + ( query_relevance[i] * relevance_weight ) importance_scores.append((i, final_score)) # Sort chunks by importance importance_scores.sort(key=lambda x: x[1], reverse=True) # Select the top n_keep chunks selected_indices = [x[0] for x in importance_scores[:n_keep]] # Sort to maintain document order selected_indices.sort() # Get selected chunks selected_chunks = [ chunks[i] for i in selected_indices if i < len(chunks) ] # Join compressed chunks with proper formatting chunk_level = self.valves.CHUNK_LEVEL if chunk_level == 1: # Phrase level compressed_content = " ".join(selected_chunks) elif chunk_level == 2: # Sentence level processed_sentences = [] for sentence in selected_chunks: if not sentence.endswith((".", "!", "?", ":", ";")): sentence += "." 
processed_sentences.append(sentence) compressed_content = " ".join(processed_sentences) else: # Paragraph levels compressed_content = "\n".join(selected_chunks) return compressed_content # Fallback if eigendecomposition fails logger.warning( "Eigendecomposition compression failed, using original method" ) return await self.compress_content_with_local_similarity( content, query_embedding, summary_embedding, ratio ) except Exception as e: logger.error(f"Error during compression with eigendecomposition: {e}") # Fall back to original compression method try: return await self.compress_content_with_local_similarity( content, query_embedding, summary_embedding, ratio ) except Exception as fallback_error: logger.error(f"Fallback compression also failed: {fallback_error}") return content # Return original content if both methods fail async def handle_repeated_content( self, content: str, url: str, query_embedding: List[float], repeat_count: int ) -> str: """Process repeated content with query-aware compression""" # Only consider URLs that were actually shown to the user selected_count = self.url_selected_count.get(url, 0) # If first occurrence or very short content, return unchanged if selected_count <= 1 or len(content) < 200: return content # Get content embedding content_embedding = await self.get_embedding(content[:2000]) if not content_embedding: return content # Calculate similarity to query similarity = cosine_similarity([content_embedding], [query_embedding])[0][0] # Use increased compression level for repeats compression_level = 6 # Reduce length for repeated content - multiply by 0.7 each time length_reduction_factor = 0.7 ** (selected_count - 1) logger.info( f"Repeat URL {url} (selected count: {selected_count}): relevance {similarity:.3f}, " f"compression level {compression_level}, length reduction {length_reduction_factor:.2f}" ) # Define compression ratios compress_ratios = { 1: 0.9, # 90% - minimal compression 2: 0.8, # 80% 3: 0.7, # 70% 4: 0.6, # 60% 5: 0.5, # 50% - moderate compression 6: 0.4, # 40% 7: 0.3, # 30% 8: 0.2, # 20% 9: 0.15, # 15% 10: 0.1, # 10% - maximum compression } # Get compression ratio and adjust by length reduction factor ratio = compress_ratios.get(compression_level, 0.5) * length_reduction_factor # Use eigendecomposition-based compression compressed = await self.compress_content_with_eigendecomposition( content, query_embedding, ratio=ratio ) # Limit the max content length to 30000 characters if len(compressed) > 30000: compressed = compressed[:30000] return compressed async def apply_stepped_compression( self, results_history: List[Dict], query_embedding: List[float], summary_embedding: Optional[List[float]] = None, ) -> List[Dict]: """Apply tiered compression to all research results based on age""" if not self.valves.STEPPED_SYNTHESIS_COMPRESSION or len(results_history) <= 2: return results_history # Make a copy to avoid modifying the original results = results_history.copy() # Divide results into first 50% (older) and second 50% (newer) mid_point = len(results) // 2 older_results = results[:mid_point] newer_results = results[mid_point:] # Track token counts before and after compression total_tokens_before = 0 total_tokens_after = 0 # Process older results with standard compression processed_older = [] for result in older_results: content = result.get("content", "") url = result.get("url", "") # Count tokens in original content original_tokens = await self.count_tokens(content) total_tokens_before += original_tokens # Skip very short content if len(content) < 
300: result["tokens"] = original_tokens processed_older.append(result) total_tokens_after += original_tokens continue # Apply standard compression compression_level = self.valves.COMPRESSION_LEVEL # Map compression level to ratio compress_ratios = { 1: 0.9, # 90% - minimal compression 2: 0.8, # 80% 3: 0.7, # 70% 4: 0.6, # 60% 5: 0.5, # 50% - moderate compression 6: 0.4, # 40% 7: 0.3, # 30% 8: 0.2, # 20% 9: 0.15, # 15% 10: 0.1, # 10% - maximum compression } ratio = compress_ratios.get(compression_level, 0.5) try: # Compress using eigendecomposition compressed = await self.compress_content_with_eigendecomposition( content, query_embedding, summary_embedding, ratio ) # Count tokens in compressed content compressed_tokens = await self.count_tokens(compressed) total_tokens_after += compressed_tokens # Create new result with compressed content new_result = result.copy() new_result["content"] = compressed new_result["tokens"] = compressed_tokens # Log the token reduction logger.info( f"Standard compression (older result): {original_tokens} → {compressed_tokens} tokens " f"({compressed_tokens/original_tokens:.1%} of original)" ) processed_older.append(new_result) except Exception as e: logger.error(f"Error during standard compression: {e}") # Keep original if compression fails result["tokens"] = original_tokens processed_older.append(result) total_tokens_after += original_tokens # Process newer results with increased compression processed_newer = [] for result in newer_results: content = result.get("content", "") url = result.get("url", "") # Count tokens in original content original_tokens = await self.count_tokens(content) total_tokens_before += original_tokens # Skip very short content if len(content) < 300: result["tokens"] = original_tokens processed_newer.append(result) total_tokens_after += original_tokens continue # Apply one level higher compression for newer results compression_level = min(10, self.valves.COMPRESSION_LEVEL + 1) # Map compression level to ratio compress_ratios = { 1: 0.9, # 90% - minimal compression 2: 0.8, # 80% 3: 0.7, # 70% 4: 0.6, # 60% 5: 0.5, # 50% - moderate compression 6: 0.4, # 40% 7: 0.3, # 30% 8: 0.2, # 20% 9: 0.15, # 15% 10: 0.1, # 10% - maximum compression } ratio = compress_ratios.get(compression_level, 0.5) try: # Compress using eigendecomposition compressed = await self.compress_content_with_eigendecomposition( content, query_embedding, summary_embedding, ratio ) # Count tokens in compressed content compressed_tokens = await self.count_tokens(compressed) total_tokens_after += compressed_tokens # Create new result with compressed content new_result = result.copy() new_result["content"] = compressed new_result["tokens"] = compressed_tokens # Log the token reduction logger.info( f"Higher compression (newer result): {original_tokens} → {compressed_tokens} tokens " f"({compressed_tokens/original_tokens:.1%} of original)" ) processed_newer.append(new_result) except Exception as e: logger.error(f"Error during higher compression: {e}") # Keep original if compression fails result["tokens"] = original_tokens processed_newer.append(result) total_tokens_after += original_tokens # Log the overall token reduction token_reduction = total_tokens_before - total_tokens_after if total_tokens_before > 0: percent_reduction = (token_reduction / total_tokens_before) * 100 logger.info( f"Stepped compression total results: {total_tokens_before} → {total_tokens_after} tokens " f"(saved {token_reduction} tokens, {percent_reduction:.1f}% reduction)" ) # Update memory statistics 
self.memory_stats["results_tokens"] = total_tokens_after # Combine and return all results in original order return processed_older + processed_newer async def calculate_research_trajectory(self, previous_queries, successful_results): """Calculate the research trajectory based on successful searches""" if not previous_queries or not successful_results: return None try: # Get embeddings for previous queries query_embeddings = [] for query in previous_queries: embedding = await self.get_embedding(query) if embedding: query_embeddings.append(embedding) if not query_embeddings: return None # Get embeddings for successful results result_embeddings = [] for result in successful_results: content = result.get("content", "") if content: embedding = await self.get_embedding(content[:1000]) if embedding: result_embeddings.append(embedding) if not result_embeddings: return None # Calculate centroid of queries and results query_centroid = np.mean(query_embeddings, axis=0) result_centroid = np.mean(result_embeddings, axis=0) # Calculate trajectory vector (direction from queries to results) trajectory = result_centroid - query_centroid # Check for NaN or Inf values if np.isnan(trajectory).any() or np.isinf(trajectory).any(): logger.warning("Invalid values in trajectory vector") return None # Normalize norm = np.linalg.norm(trajectory) if norm > 1e-10: trajectory = trajectory / norm return trajectory.tolist() else: logger.warning("Trajectory vector has zero norm") return None except Exception as e: logger.error(f"Error calculating research trajectory: {e}") return None async def calculate_gap_vector(self): """Calculate a vector pointing toward research gaps""" if not self.research_dimensions: return None try: coverage = np.array(self.research_dimensions["coverage"]) components = np.array(self.research_dimensions["pca"].components_) # Create a weighted sum of components based on coverage gaps gap_vector = np.zeros(components.shape[1]) for i, cov in enumerate(coverage): # Calculate gap (1.0 - coverage) gap = 1.0 - cov # Only consider significant gaps if gap > 0.3: # Weight by gap size - larger gaps have more influence gap_vector += gap * components[i] # Check for NaN or Inf values if np.isnan(gap_vector).any() or np.isinf(gap_vector).any(): logger.warning("Invalid values in gap vector") return None # Normalize norm = np.linalg.norm(gap_vector) if norm > 1e-10: gap_vector = gap_vector / norm return gap_vector.tolist() else: logger.warning("Gap vector has zero norm") return None except Exception as e: logger.error(f"Error calculating gap vector: {e}") return None async def calculate_query_similarity( self, content_embedding: List[float], query_embedding: List[float], outline_embedding: Optional[List[float]] = None, summary_embedding: Optional[List[float]] = None, ) -> float: """Calculate similarity to query with optional context embeddings""" # Convert to numpy arrays c_emb = np.array(content_embedding) q_emb = np.array(query_embedding) # Normalize embeddings c_emb = c_emb / np.linalg.norm(c_emb) q_emb = q_emb / np.linalg.norm(q_emb) # Base query similarity using cosine similarity query_sim = np.dot(c_emb, q_emb) # Weight factors query_weight = 0.7 # Primary query importance outline_weight = 0.2 # Research outline importance summary_weight = 0.1 # Previous summary importance # If we have an outline embedding, include it if outline_embedding is not None: o_emb = np.array(outline_embedding) o_emb = o_emb / np.linalg.norm(o_emb) outline_sim = np.dot(c_emb, o_emb) else: outline_sim = 0.0 # Redistribute weight 
query_weight += outline_weight outline_weight = 0.0 # If we have a summary embedding (for follow-ups), include it if summary_embedding is not None: s_emb = np.array(summary_embedding) s_emb = s_emb / np.linalg.norm(s_emb) summary_sim = np.dot(c_emb, s_emb) else: summary_sim = 0.0 # Redistribute weight query_weight += summary_weight summary_weight = 0.0 # Weighted combination of similarities combined_sim = ( (query_sim * query_weight) + (outline_sim * outline_weight) + (summary_sim * summary_weight) ) return combined_sim async def calculate_preference_impact(self, kept_items, removed_items, all_topics): """Calculate the impact of user preferences based on the proportion modified""" if not kept_items or not removed_items: return 0.0 # Calculate impact based on proportion of items removed total_items = len(all_topics) if total_items == 0: return 0.0 impact = len(removed_items) / total_items logger.info( f"User preference impact: {impact:.3f} ({len(removed_items)}/{total_items} items removed)" ) return impact async def calculate_preference_direction_vector( self, kept_items: List[str], removed_items: List[str], all_topics: List[str] ) -> Dict: """Calculate the Preference Direction Vector based on kept and removed items""" if not kept_items or not removed_items: return {"pdv": None, "strength": 0.0, "impact": 0.0} # Get embeddings for kept items kept_embeddings = [] for item in kept_items: embedding = await self.get_embedding(item) if embedding: kept_embeddings.append(embedding) # Get embeddings for removed items removed_embeddings = [] for item in removed_items: embedding = await self.get_embedding(item) if embedding: removed_embeddings.append(embedding) if not kept_embeddings or not removed_embeddings: return {"pdv": None, "strength": 0.0, "impact": 0.0} try: # Calculate mean vectors kept_mean = np.mean(kept_embeddings, axis=0) removed_mean = np.mean(removed_embeddings, axis=0) # Check for NaN or Inf values if ( np.isnan(kept_mean).any() or np.isnan(removed_mean).any() or np.isinf(kept_mean).any() or np.isinf(removed_mean).any() ): logger.warning("Invalid values in kept or removed mean vectors") return {"pdv": None, "strength": 0.0, "impact": 0.0} # Calculate the preference direction vector pdv = kept_mean - removed_mean # Normalize the vector pdv_norm = np.linalg.norm(pdv) if pdv_norm < 1e-10: logger.warning("PDV has near-zero norm") return {"pdv": None, "strength": 0.0, "impact": 0.0} pdv = pdv / pdv_norm # Calculate preference strength (distance between centroids) strength = np.linalg.norm(kept_mean - removed_mean) # Calculate impact factor based on proportion of items removed impact = await self.calculate_preference_impact( kept_items, removed_items, all_topics ) return {"pdv": pdv.tolist(), "strength": float(strength), "impact": impact} except Exception as e: logger.error(f"Error calculating PDV: {e}") return {"pdv": None, "strength": 0.0, "impact": 0.0} async def calculate_preference_alignment(self, content_embedding, pdv): """Calculate alignment between content and preference vector""" if not pdv or not content_embedding: return 0.5 # Neutral value if we can't calculate try: # Calculate dot product between vectors alignment = np.dot(content_embedding, pdv) # Normalize to 0-1 scale (dot product is between -1 and 1) normalized = (alignment + 1) / 2 return normalized except Exception as e: logger.error(f"Error calculating preference alignment: {e}") return 0.5 # Neutral value on error async def initialize_research_dimensions( self, outline_items: List[str], user_query: str ): 
"""Initialize the semantic dimensions for tracking research progress""" # Combine outline items and query all_text = " ".join(outline_items) + " " + user_query # Get embedding for combined text combined_embedding = await self.get_embedding(all_text) if not combined_embedding: self.research_dimensions = None return try: # Get embeddings for each outline item item_embeddings = [] for item in outline_items: embedding = await self.get_embedding(item) if embedding: item_embeddings.append(embedding) if len(item_embeddings) < 3: self.research_dimensions = None return # Apply PCA to reduce to key dimensions pca = PCA(n_components=min(10, len(item_embeddings))) embedding_array = np.array(item_embeddings) pca.fit(embedding_array) # Store the PCA model and progress trackers self.research_dimensions = { "pca": pca, "total_variance": pca.explained_variance_ratio_.sum(), "dimensions": pca.n_components_, "coverage": np.zeros(pca.n_components_), "dimension_names": await self.name_dimensions( pca, outline_items, embedding_array ), } except Exception as e: logger.error(f"Error initializing research dimensions: {e}") self.research_dimensions = None async def name_dimensions(self, pca, outline_items, embedding_array): """Name dimensions based on semantic centroids using vocabulary""" try: n_components = pca.n_components_ # Get vocabulary embeddings if not already cached if not self.vocabulary_embeddings: await self.get_vocabulary_embeddings() if not self.vocabulary_embeddings: # Fallback - use outline items return self.name_dimensions_fallback(pca, outline_items) # Project components into embedding space components = pca.components_ # Name dimensions based on semantic centroids dimension_names = [] for i in range(n_components): try: # Get component vector (this is the direction in embedding space) component = components[i] # Find vocabulary words closest to this component word_similarities = [] for word, embedding in self.vocabulary_embeddings.items(): similarity = np.dot(component, embedding) word_similarities.append((word, similarity)) # Get top positive and negative words top_positive = sorted( word_similarities, key=lambda x: x[1], reverse=True )[:2] top_negative = sorted(word_similarities, key=lambda x: x[1])[:1] # Create dimension name if top_positive: pos_terms = " + ".join([word for word, _ in top_positive]) if ( top_negative and top_negative[0][1] < -0.1 ): # Only include negative if significant neg_terms = " - ".join([word for word, _ in top_negative]) dimension_name = f"{pos_terms} vs {neg_terms}" else: dimension_name = pos_terms else: dimension_name = f"Dimension {i+1}" dimension_names.append(dimension_name) except Exception as dim_error: logger.warning(f"Error naming dimension {i}: {dim_error}") dimension_names.append(f"Dimension {i+1}") return dimension_names except Exception as e: logger.error(f"Error in semantic dimension naming: {e}") return self.name_dimensions_fallback(pca, outline_items) def name_dimensions_fallback(self, pca, outline_items): """Fallback method for naming dimensions using outline items""" try: n_components = pca.n_components_ n_items = len(outline_items) # Get the component loadings components = pca.components_ # For each component, find the outline items with highest loading dimension_names = [] for i in range(n_components): # Get the absolute loadings for this component abs_loadings = np.abs(components[i, :n_items]) # Get indices of top 2 loadings top_indices = np.argsort(abs_loadings)[-2:] # Create a name from these outline items if len(top_indices) > 0: name_parts = [ 
outline_items[idx] for idx in top_indices if idx < len(outline_items) ] dimension_name = " + ".join(name_parts) dimension_names.append(dimension_name) else: dimension_names.append(f"Dimension {i+1}") return dimension_names except Exception as e: logger.error(f"Error in fallback dimension naming: {e}") return [f"Dimension {i+1}" for i in range(pca.n_components_)] async def update_dimension_coverage(self, content: str, quality_score: float): """Update the coverage of research dimensions based on new content""" if not self.research_dimensions: return try: # Get embedding for the content content_embedding = await self.get_embedding(content[:2000]) if not content_embedding: return # Project onto principal components projection = self.research_dimensions["pca"].transform([content_embedding])[ 0 ] # Update coverage based on projection and quality # Higher quality content contributes more to dimension coverage contribution = np.abs(projection) * quality_score # Update coverage with diminishing returns for already-covered dimensions for i in range(len(contribution)): current_coverage = self.research_dimensions["coverage"][i] # Apply diminishing returns formula new_contribution = contribution[i] * (1 - current_coverage / 2) self.research_dimensions["coverage"][i] += new_contribution # Normalize coverage to 0-1 range max_possible = 3.0 # Maximum reasonable coverage value self.research_dimensions["coverage"] = ( np.minimum(self.research_dimensions["coverage"], max_possible) / max_possible ) except Exception as e: logger.error(f"Error updating dimension coverage: {e}") async def identify_research_gaps(self) -> List[str]: """Identify semantic dimensions that need more research""" if not self.research_dimensions: return [] try: # Find dimensions with low coverage coverage = self.research_dimensions["coverage"] dimension_names = self.research_dimensions["dimension_names"] # Sort dimensions by coverage (ascending) sorted_dims = np.argsort(coverage) # Return names of the least covered dimensions gaps = [dimension_names[i] for i in sorted_dims[:3] if coverage[i] < 0.5] return gaps except Exception as e: logger.error(f"Error identifying research gaps: {e}") return [] async def extract_text_from_html(self, html_content: str) -> str: """Extract meaningful text content from HTML""" try: # Quick regex extraction first import re # Remove script and style tags content = re.sub( r"<script[^>]*>.*?</script>", " ", html_content, flags=re.DOTALL ) content = re.sub(r"<style[^>]*>.*?</style>", " ", content, flags=re.DOTALL) content = re.sub(r"<head[^>]*>.*?</head>", " ", content, flags=re.DOTALL) # Remove HTML tags content = re.sub(r"<[^>]*>", " ", content) # Cleanup whitespace content = re.sub(r"\s+", " ", content).strip() # Try BeautifulSoup if available try: from bs4 import BeautifulSoup # Create a task for BS4 extraction def extract_with_bs4(): soup = BeautifulSoup(html_content, "html.parser") for element in soup( ["script", "style", "head", "iframe", "noscript"] ): element.decompose() text = soup.get_text() lines = (line.strip() for line in text.splitlines()) chunks = ( phrase.strip() for line in lines for phrase in line.split(" ") ) return "\n".join(chunk for chunk in chunks if chunk) # Run in executor to avoid blocking loop = asyncio.get_event_loop() bs4_extraction_task = loop.run_in_executor(None, extract_with_bs4) bs4_result = await asyncio.wait_for(bs4_extraction_task, timeout=5.0) # If BS4 extraction gave substantial content, use it if bs4_result and len(bs4_result) > len(content) * 0.5: return bs4_result # 
Otherwise fall back to the regex version return content except (ImportError, asyncio.TimeoutError, Exception): # Use regex version if BS4 fails return content except Exception as e: logger.error(f"Error extracting text from HTML: {e}") # Simple fallback - remove all HTML tags try: import re return re.sub(r"<[^>]*>", " ", html_content) except: return html_content async def fetch_content(self, url: str) -> str: """Fetch content from a URL using Open WebUI's functionality if possible""" try: # Add to considered URLs counter self.url_considered_count[url] = self.url_considered_count.get(url, 0) + 1 # Check if URL is in cache and use that if available if url in self.url_results_cache: logger.info(f"Using cached content for URL: {url}") return self.url_results_cache[url] # Try to use Open WebUI's fetch functionality if available try: from open_webui.routers.retrieval import fetch_url logger.debug(f"Using Open WebUI to fetch URL: {url}") fetch_task = asyncio.create_task(fetch_url(url)) content = await asyncio.wait_for(fetch_task, timeout=20.0) if content: # Check if this is a PDF by examining content if content.startswith(b"%PDF") or ( isinstance(content, str) and content.startswith("%PDF") ): logger.info(f"Detected PDF content from URL: {url}") self.is_pdf_content = True # Set the PDF flag extracted_content = await self.extract_text_from_pdf(content) self.url_results_cache[url] = extracted_content return extracted_content # If extract_content_only is true, extract just the text if ( self.valves.EXTRACT_CONTENT_ONLY and isinstance(content, str) and content.strip().startswith("<") ): self.is_pdf_content = False # Clear the PDF flag extracted = await self.extract_text_from_html(content) self.url_results_cache[url] = extracted return extracted self.is_pdf_content = False # Clear the PDF flag self.url_results_cache[url] = content return content except (ImportError, AttributeError, asyncio.TimeoutError) as e: # Fall back to direct fetch logger.warning(f"Open WebUI fetch failed, using direct fetch: {e}") pass # Direct fetch as fallback logger.debug(f"Using direct fetch for URL: {url}") # Create a ClientSession with SSL verification disabled for problematic sites ssl_context = None if "ssl:" in url or url.startswith("https://"): try: import ssl # Create a default SSL context ssl_context = ssl.create_default_context() # But make it less strict if needed if url.endswith(".org") or "certificate" in url.lower(): ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE logger.info(f"Using relaxed SSL verification for URL: {url}") except ImportError: logger.warning( "SSL module not available, proceeding without SSL context" ) # Create a connector with the SSL context connector = aiohttp.TCPConnector(ssl=ssl_context) async with aiohttp.ClientSession(connector=connector) as session: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml,application/pdf;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", } # Check if URL appears to be a PDF is_pdf = url.lower().endswith(".pdf") if is_pdf: # Use binary mode for PDFs async with session.get( url, headers=headers, timeout=20.0, verify_ssl=False ) as response: if response.status == 200: # Get PDF content as bytes pdf_content = await response.read() self.is_pdf_content = True # Set the PDF flag extracted_content = await self.extract_text_from_pdf( pdf_content ) self.url_results_cache[url] = 
extracted_content return extracted_content else: # Normal text/HTML mode async with session.get( url, headers=headers, timeout=20.0, verify_ssl=False ) as response: if response.status == 200: # Check content type in response headers content_type = response.headers.get( "Content-Type", "" ).lower() if "application/pdf" in content_type: # This is a PDF even though the URL didn't end with .pdf pdf_content = await response.read() self.is_pdf_content = True # Set the PDF flag extracted_content = await self.extract_text_from_pdf( pdf_content ) self.url_results_cache[url] = extracted_content return extracted_content # Handle as normal HTML/text content = await response.text() self.is_pdf_content = False # Clear the PDF flag if ( self.valves.EXTRACT_CONTENT_ONLY and content.strip().startswith("<") ): extracted = await self.extract_text_from_html(content) self.url_results_cache[url] = extracted return extracted self.url_results_cache[url] = content return content logger.error( f"Error fetching URL {url}: HTTP {response.status}" ) return f"Error fetching content: HTTP status {response.status}" except asyncio.TimeoutError: logger.error(f"Timeout fetching content from {url}") return f"Timeout while fetching content from {url}" except Exception as e: logger.error(f"Error fetching content from {url}: {e}") return f"Error fetching content: {str(e)}" async def extract_text_from_pdf(self, pdf_content) -> str: """Extract text from PDF content using PyPDF2 or pdfplumber""" if not self.valves.HANDLE_PDFS: return "PDF processing is disabled in settings." # Ensure we have bytes for the PDF content if isinstance(pdf_content, str): if pdf_content.startswith("%PDF"): pdf_content = pdf_content.encode("utf-8", errors="ignore") else: return "Error: Invalid PDF content format" # Limit extraction to configured max pages to avoid too much processing max_pages = self.valves.PDF_MAX_PAGES try: # Try PyPDF2 first try: import io from PyPDF2 import PdfReader # Create a reader object pdf_file = io.BytesIO(pdf_content) pdf_reader = PdfReader(pdf_file) # Get the total number of pages num_pages = len(pdf_reader.pages) logger.info(f"PDF has {num_pages} pages, extracting up to {max_pages}") # Extract text from each page up to the limit text = [] for page_num in range(min(num_pages, max_pages)): try: page = pdf_reader.pages[page_num] page_text = page.extract_text() or "" if page_text.strip(): text.append(f"Page {page_num + 1}:\n{page_text}") except Exception as e: logger.warning(f"Error extracting page {page_num}: {e}") # Join all pages with spacing full_text = "\n\n".join(text) if full_text.strip(): logger.info( f"Successfully extracted text from PDF using PyPDF2: {len(full_text)} chars" ) # Add a note if we limited the page count if num_pages > max_pages: full_text += f"\n\n[Note: This PDF has {num_pages} pages, but only the first {max_pages} were processed.]" return full_text else: logger.warning( "PyPDF2 extraction returned empty text, trying pdfplumber..." 
                    )
            except Exception as e:
                logger.warning(f"PyPDF2 extraction failed: {e}, trying pdfplumber...")

            # Try pdfplumber as a fallback
            try:
                import io
                import pdfplumber

                pdf_file = io.BytesIO(pdf_content)
                with pdfplumber.open(pdf_file) as pdf:
                    # Get total pages
                    num_pages = len(pdf.pages)

                    text = []
                    for i, page in enumerate(pdf.pages[:max_pages]):
                        try:
                            page_text = page.extract_text() or ""
                            if page_text.strip():
                                text.append(f"Page {i + 1}:\n{page_text}")
                        except Exception as page_error:
                            logger.warning(
                                f"Error extracting page {i} with pdfplumber: {page_error}"
                            )

                    full_text = "\n\n".join(text)

                    if full_text.strip():
                        logger.info(
                            f"Successfully extracted text from PDF using pdfplumber: {len(full_text)} chars"
                        )

                        # Add a note if we limited the page count
                        if num_pages > max_pages:
                            full_text += f"\n\n[Note: This PDF has {num_pages} pages, but only the first {max_pages} were processed.]"

                        return full_text
                    else:
                        logger.warning("pdfplumber extraction returned empty text")
            except Exception as e:
                logger.warning(f"pdfplumber extraction failed: {e}")

            # If both methods failed but we can tell it's a PDF, provide a more useful message
            if pdf_content.startswith(b"%PDF"):
                logger.warning(
                    "PDF detected but text extraction failed. May be scanned or encrypted."
                )
                return "This appears to be a PDF document, but text extraction failed. The PDF may contain scanned images rather than text, or it may be encrypted/protected."

            return "Could not extract text from PDF. The file may not be a valid PDF or may contain security restrictions."

        except Exception as e:
            logger.error(f"PDF text extraction failed: {e}")
            return f"Error extracting text from PDF: {str(e)}"

    async def sanitize_query(self, query: str) -> str:
        """Sanitize search query by removing quotes and handling special characters"""
        # Remove straight and curly quotes that might cause problems with search engines
        sanitized = query.replace('"', " ").replace("“", " ").replace("”", " ")

        # Replace multiple spaces with a single space
        sanitized = " ".join(sanitized.split())

        # Ensure the query isn't too long
        if len(sanitized) > 250:
            sanitized = sanitized[:250]

        logger.info(f"Sanitized query: '{query}' -> '{sanitized}'")
        return sanitized

    async def process_search_result(
        self,
        result: Dict,
        query: str,
        query_embedding: List[float],
        outline_embedding: List[float],
        summary_embedding: Optional[List[float]] = None,
    ) -> Dict:
        """Process a search result to extract and compress content"""
        title = result.get("title", "")
        url = result.get("url", "")
        snippet = result.get("snippet", "")

        await self.emit_status("info", f"Processing result: {title[:50]}...", False)

        try:
            # Check if this is a repeated URL
            repeat_count = 0
            if url:
                repeat_count = self.url_selected_count.get(url, 0)

            # If the snippet is empty or short but we have a URL, try to fetch content
            if (not snippet or len(snippet) < 200) and url:
                await self.emit_status(
                    "info", f"Fetching content from URL: {url}...", False
                )
                content = await self.fetch_content(url)
                if content and len(content) > 200:
                    snippet = content
                    logger.debug(
                        f"Successfully fetched content from URL: {url} ({len(content)} chars)"
                    )
                else:
                    logger.warning(f"Failed to fetch useful content from URL: {url}")

            # If we still don't have useful content, return what we have
            if not snippet or len(snippet) < 200:
                return {
                    "title": title or f"Result for '{query}'",
                    "url": url,
                    "content": snippet
                    or "No substantial content available for this result.",
                    "query": query,
                }

            # For repeated URLs, apply special compression based on query relevance
            if repeat_count > 0:
                snippet = await
self.handle_repeated_content( snippet, url, query_embedding, repeat_count ) # Process the content - apply compression using eigendecomposition if len(snippet) > 300: try: await self.emit_status("info", "Compressing content...", False) # Use eigendecomposition-based compression compressed_content = ( await self.compress_content_with_eigendecomposition( snippet, query_embedding, summary_embedding ) ) # If we got useful compressed content, use it if compressed_content and len(compressed_content) > 100: # Mark URL as actually selected (shown to user) if url: self.url_selected_count[url] = ( self.url_selected_count.get(url, 0) + 1 ) # Count tokens tokens = await self.count_tokens(compressed_content) return { "title": title, "url": url, "content": compressed_content, "query": query, "repeat_count": repeat_count, "tokens": tokens, } except Exception as e: logger.error(f"Error in content compression: {e}") # If compression fails, we'll fall back to using original content # If compression failed or wasn't attempted, return limited original content # Mark URL as actually selected (shown to user) if url: self.url_selected_count[url] = self.url_selected_count.get(url, 0) + 1 # Count tokens in the content limited_content = snippet[:30000] tokens = await self.count_tokens(limited_content) return { "title": title, "url": url, "content": limited_content, "query": query, "repeat_count": repeat_count, "tokens": tokens, } except Exception as e: logger.error(f"Unhandled error in process_search_result: {e}") # Return a failure result error_msg = f"Error processing search result: {str(e)}\n\nOriginal snippet: {snippet[:1000] if snippet else 'No content available'}" tokens = await self.count_tokens(error_msg) return { "title": title or f"Error processing result for '{query}'", "url": url, "content": error_msg, "query": query, "repeat_count": repeat_count if "repeat_count" in locals() else 0, "tokens": tokens, } async def _try_openwebui_search(self, query: str) -> List[Dict]: """Try to use Open WebUI's built-in search functionality""" try: from open_webui.routers.retrieval import process_web_search, SearchForm # Create a search form with the query search_form = SearchForm(query=query) # Call the search function logger.debug(f"Executing built-in search with query: {query}") # Set a timeout for this operation search_task = asyncio.create_task( process_web_search(self.__request__, search_form, user=self.__user__) ) search_results = await asyncio.wait_for(search_task, timeout=15.0) logger.debug(f"Search results received: {type(search_results)}") results = [] # Process the results if search_results: if "docs" in search_results: # Extract information from search results docs = search_results.get("docs", []) urls = search_results.get("filenames", []) logger.debug(f"Found {len(docs)} documents in search results") # Create a result object for each document for i, doc in enumerate( docs[ : self.valves.SEARCH_RESULTS_PER_QUERY + self.valves.EXTRA_RESULTS_PER_QUERY ] ): url = urls[i] if i < len(urls) else "" results.append( { "title": f"'{query}'", "url": url, "snippet": doc, } ) elif "collection_name" in search_results: # For collection-based results collection_name = search_results.get("collection_name") urls = search_results.get("filenames", []) logger.debug( f"Found collection {collection_name} with {len(urls)} documents" ) for i, url in enumerate( urls[ : self.valves.SEARCH_RESULTS_PER_QUERY + self.valves.EXTRA_RESULTS_PER_QUERY ] ): results.append( { "title": f"Search Result {i+1} from {collection_name}", "url": url, 
"snippet": f"Result from collection: {collection_name}", } ) return results except asyncio.TimeoutError: logger.error(f"OpenWebUI search timed out for query: {query}") return [] except Exception as e: logger.error(f"Error in _try_openwebui_search: {str(e)}") return [] async def _fallback_search(self, query: str) -> List[Dict]: """Fallback search method using direct HTTP request to search API""" try: # URL encode the query for safer search from urllib.parse import quote encoded_query = quote(query) search_url = f"{self.valves.SEARCH_URL}{encoded_query}" logger.debug(f"Using fallback search with URL: {search_url}") async with aiohttp.ClientSession() as session: # Set a timeout for this request async with session.get(search_url, timeout=15.0) as response: if response.status == 200: search_json = await response.json() results = [] if isinstance(search_json, list): for i, item in enumerate( search_json[ : self.valves.SEARCH_RESULTS_PER_QUERY + self.valves.EXTRA_RESULTS_PER_QUERY ] ): results.append( { "title": item.get("title", f"Result {i+1}"), "url": item.get("url", ""), "snippet": item.get("snippet", ""), } ) elif isinstance(search_json, dict) and "results" in search_json: for i, item in enumerate( search_json["results"][ : self.valves.SEARCH_RESULTS_PER_QUERY + self.valves.EXTRA_RESULTS_PER_QUERY ] ): results.append( { "title": item.get("title", f"Result {i+1}"), "url": item.get("url", ""), "snippet": item.get("snippet", ""), } ) return results else: logger.error( f"Fallback search returned status code {response.status}" ) return [] except asyncio.TimeoutError: logger.error(f"Fallback search timed out for query: {query}") return [] except Exception as e: logger.error(f"Error in fallback search: {str(e)}") return [] async def search_web(self, query: str) -> List[Dict]: """Perform web search with fallbacks""" logger.debug(f"Starting web search for query: {query}") # First try OpenWebUI search results = await self._try_openwebui_search(query) # If that failed, try fallback search if not results: logger.debug( f"OpenWebUI search returned no results, trying fallback search for: {query}" ) results = await self._fallback_search(query) # If we got results, return them if results: logger.debug( f"Search successful, found {len(results)} results for: {query}" ) return results # No results - create a minimal result to continue logger.warning(f"No search results found for query: {query}") return [ { "title": f"No results for '{query}'", "url": "", "snippet": f"No search results were found for the query: {query}", } ] async def select_most_relevant_results( self, results: List[Dict], query: str, query_embedding: List[float], outline_embedding: List[float], summary_embedding: Optional[List[float]] = None, ) -> List[Dict]: """Select the most relevant results from extra results pool using semantic transformations""" if not results or len(results) <= self.valves.SEARCH_RESULTS_PER_QUERY: return results # Calculate relevance scores for each result relevance_scores = [] # Get transformation if available transformation = self.semantic_transformations for i, result in enumerate(results): try: # Get a snippet for evaluation snippet = result.get("snippet", "") url = result.get("url", "") # If snippet is too short and URL is available, fetch a bit of content if len(snippet) < self.valves.RELEVANCY_SNIPPET_LENGTH and url: try: await self.emit_status( "info", f"Fetching snippet for relevance check: {url[:30]}...", False, ) # Only fetch the first part of the content for evaluation content_preview = await 
self.fetch_content(url) if content_preview: snippet = content_preview[ : self.valves.RELEVANCY_SNIPPET_LENGTH ] except Exception as e: logger.error(f"Error fetching content for relevance check: {e}") # Calculate relevance if we have enough content if snippet and len(snippet) > 100: # Get embedding for the snippet snippet_embedding = await self.get_embedding(snippet) if snippet_embedding: # Calculate similarity using transformation if available if transformation: # Apply transformation to both query and content transformed_query = ( await self.apply_semantic_transformation( query_embedding, transformation ) ) transformed_snippet = ( await self.apply_semantic_transformation( snippet_embedding, transformation ) ) # Calculate similarity in transformed space similarity = cosine_similarity( [transformed_snippet], [transformed_query] )[0][0] else: # Calculate basic similarity if no transformation similarity = cosine_similarity( [snippet_embedding], [query_embedding] )[0][0] # Store score for sorting relevance_scores.append((i, similarity)) # Also store in the result for future use result["similarity"] = similarity else: # No embedding, assign low score relevance_scores.append((i, 0.1)) result["similarity"] = 0.1 else: # Insufficient content, assign low score relevance_scores.append((i, 0.0)) result["similarity"] = 0.0 except Exception as e: logger.error(f"Error calculating relevance for result {i}: {e}") relevance_scores.append((i, 0.0)) result["similarity"] = 0.0 # Sort by relevance score (highest first) relevance_scores.sort(key=lambda x: x[1], reverse=True) # Select top results selected_indices = [ x[0] for x in relevance_scores[: self.valves.SEARCH_RESULTS_PER_QUERY] ] selected_results = [results[i] for i in selected_indices] # Log selection information logger.info( f"Selected {len(selected_results)} most relevant results from {len(results)} total" ) # If dimension tracking is enabled, log this search's dimension coverage if self.research_dimensions and selected_results: for result in selected_results: content = result.get("content", "") if content: # Use similarity as quality score quality = result.get("similarity", 0.5) await self.update_dimension_coverage(content, quality) return selected_results async def process_query( self, query: str, query_embedding: List[float], outline_embedding: List[float], cycle_feedback: Optional[Dict] = None, summary_embedding: Optional[List[float]] = None, ) -> List[Dict]: """Process a single search query and get results""" await self.emit_status("info", f"Searching for: {query}", False) # Sanitize the query to make it safer for search engines sanitized_query = await self.sanitize_query(query) # Get search results for the query search_results = await self.search_web(sanitized_query) if not search_results: await self.emit_message(f"*No results found for query: {query}*\n\n") return [] # If we have extra results enabled, select the most relevant if self.valves.EXTRA_RESULTS_PER_QUERY > 0: search_results = await self.select_most_relevant_results( search_results, query, query_embedding, outline_embedding, summary_embedding, ) # Process each search result until we have enough successful results successful_results = [] failed_count = 0 for result in search_results: # Stop if we've reached our target of successful results if len(successful_results) >= self.valves.SUCCESSFUL_RESULTS_PER_QUERY: break # Stop if we've had too many consecutive failures if failed_count >= self.valves.MAX_FAILED_RESULTS: await self.emit_message( f"*Skipping remaining results for query: 
{query} after {failed_count} failures*\n\n" ) break try: # Process the result processed_result = await self.process_search_result( result, query, query_embedding, outline_embedding, summary_embedding ) # Check if processing was successful (has substantial content) if ( processed_result and processed_result.get("content") and len(processed_result.get("content", "")) > 200 ): # Add to successful results successful_results.append(processed_result) # Get the document title for display document_title = processed_result["title"] if document_title == f"'{query}'" and processed_result["url"]: # Try to get a better title from the URL from urllib.parse import urlparse parsed_url = urlparse(processed_result["url"]) path_parts = parsed_url.path.split("/") if path_parts[-1]: file_name = path_parts[-1] # Clean up filename to use as title if file_name.endswith(".pdf"): document_title = ( file_name[:-4].replace("-", " ").replace("_", " ") ) elif "." in file_name: document_title = ( file_name.split(".")[0] .replace("-", " ") .replace("_", " ") ) else: document_title = file_name.replace("-", " ").replace( "_", " " ) else: # Use domain as title if no useful path document_title = parsed_url.netloc # Get token count for displaying token_count = processed_result.get("tokens", 0) if token_count == 0: token_count = await self.count_tokens( processed_result["content"] ) # Display the result to the user with improved formatting if processed_result["url"]: # Check if this is a PDF based on URL or content flag prefix = "Site: " url = processed_result["url"].lower() # Check if this is a PDF (either by extension or by content type detection) if ( url.endswith(".pdf") or "application/pdf" in url or self.is_pdf_content ): prefix = "PDF: " result_text = f"#### [{prefix}{document_title}]({processed_result['url']}) [{token_count} tokens]\n\n" else: result_text = ( f"#### {document_title} [{token_count} tokens]\n\n" ) result_text += f"*Search query: {query}*\n\n" result_text += f"{processed_result['content'][:2000]}...\n\n" # Add repeat indicator if this is a repeated URL repeat_count = processed_result.get("repeat_count", 0) if repeat_count > 1: result_text += f"*Note: This URL has been processed {repeat_count} times*\n\n" await self.emit_message(result_text) # Reset failed count on success failed_count = 0 else: # Count as a failure failed_count += 1 logger.warning( f"Failed to get substantial content from result {len(successful_results) + failed_count} for query: {query}" ) except Exception as e: # Count as a failure failed_count += 1 logger.error(f"Error processing result for query '{query}': {e}") await self.emit_message( f"*Error processing a result for query: {query}*\n\n" ) return successful_results async def generate_completion( self, model: str, messages: List[Dict], stream: bool = False, temperature: Optional[float] = None, ): """Generate a completion from the specified model""" try: # Use provided temperature or default from valves if temperature is None: temperature = self.valves.TEMPERATURE form_data = { "model": model, "messages": messages, "stream": stream, "temperature": temperature, } response = await generate_chat_completions( self.__request__, form_data, user=self.__user__, ) return response except Exception as e: logger.error(f"Error generating completion with model {model}: {e}") # Return a minimal valid response structure return {"choices": [{"message": {"content": f"Error: {str(e)}"}}]} async def emit_message(self, message: str): """Emit a message to the client""" try: await 
self.__current_event_emitter__( {"type": "message", "data": {"content": message}} ) except Exception as e: logger.error(f"Error emitting message: {e}") # Can't do much if this fails, but we don't want to crash async def emit_status(self, level: str, message: str, done: bool = False): """Emit a status message to the client""" try: # Check if research is completed if self._research_completed and not done: status = "complete" else: status = "complete" if done else "in_progress" await self.__current_event_emitter__( { "type": "status", "data": { "status": status, "level": level, "description": message, "done": done, }, } ) except Exception as e: logger.error(f"Error emitting status: {e}") # Can't do much if this fails, but we don't want to crash async def rank_topics_by_research_priority( self, active_topics: List[str], gap_vector: Optional[List[float]] = None, completed_topics: Optional[Set[str]] = None, research_results: Optional[List[Dict]] = None, ) -> List[str]: """Rank research topics by priority using semantic dimensions and gap analysis""" if not active_topics: return [] # If we only have a few topics, keep the original order if len(active_topics) <= 3: return active_topics # Initialize scores for each topic topic_scores = {} # Get gap vector if not provided if not gap_vector and self.research_dimensions: gap_vector = await self.calculate_gap_vector() # Calculate scores for each topic based on multiple factors for topic in active_topics: # Get topic embedding topic_embedding = await self.get_embedding(topic) if not topic_embedding: topic_scores[topic] = 0.5 # Default neutral score continue # Start with a base score score = 0.5 weights = {} weighted_scores = {} # Factor 1: Alignment with gap vector (most important) # This helps prioritize topics that address research gaps if gap_vector: gap_alignment = np.dot(topic_embedding, gap_vector) # Normalize to 0-1 range gap_alignment = (gap_alignment + 1) / 2 weighted_scores["gap"] = gap_alignment * 0.4 # 40% weight weights["gap"] = 0.4 # Factor 2: User preference alignment # This helps prioritize topics aligned with user preferences if ( self.user_preferences["pdv"] is not None and self.user_preferences["impact"] > 0.1 ): pdv_alignment = np.dot(topic_embedding, self.user_preferences["pdv"]) # Normalize to 0-1 range pdv_alignment = (pdv_alignment + 1) / 2 pdv_weight = min(0.3, self.user_preferences["impact"]) weighted_scores["pdv"] = pdv_alignment * pdv_weight weights["pdv"] = pdv_weight # Factor 3: Research trajectory alignment # This helps maintain momentum in the research direction if self.research_trajectory is not None: traj_alignment = np.dot(topic_embedding, self.research_trajectory) # Normalize to 0-1 range traj_alignment = (traj_alignment + 1) / 2 weighted_scores["trajectory"] = traj_alignment * 0.2 # 20% weight weights["trajectory"] = 0.2 # Factor 4: Topic novelty compared to completed research # This helps prioritize topics that are different from what's already been covered if completed_topics and len(completed_topics) > 0: # Calculate average similarity to completed topics similarity_sum = 0 for completed in completed_topics: completed_embedding = await self.get_embedding(completed) if completed_embedding: sim = cosine_similarity( [topic_embedding], [completed_embedding] )[0][0] similarity_sum += sim if completed_topics: avg_similarity = similarity_sum / len(completed_topics) # Invert - lower similarity means higher novelty novelty = 1.0 - avg_similarity weighted_scores["novelty"] = novelty * 0.2 # 20% weight weights["novelty"] = 
0.2 # Factor 5: Information need based on search results # This helps prioritize topics that haven't been well-addressed by existing results if research_results and len(research_results) > 0: # Calculate average relevance of existing results to this topic relevance_sum = 0 for result in research_results[-10:]: # Consider recent results result_content = result.get("content", "")[:1000] if result_content: result_embedding = await self.get_embedding(result_content) if result_embedding: rel = cosine_similarity( [topic_embedding], [result_embedding] )[0][0] relevance_sum += rel if research_results: avg_relevance = relevance_sum / min(10, len(research_results)) # Invert - lower relevance means higher information need info_need = 1.0 - avg_relevance weighted_scores["info_need"] = info_need * 0.2 # 20% weight weights["info_need"] = 0.2 # Calculate final score as weighted average of all factors if weighted_scores: total_weight = sum(weights.values()) if total_weight > 0: score = sum(weighted_scores.values()) / total_weight # Store the score topic_scores[topic] = score # Sort topics by score (highest first) sorted_topics = sorted(topic_scores.items(), key=lambda x: x[1], reverse=True) ranked_topics = [topic for topic, score in sorted_topics] logger.info(f"Ranked {len(ranked_topics)} topics by research priority") return ranked_topics async def process_user_outline_feedback( self, outline_items: List[Dict], original_query: str ) -> Dict: """Process user feedback on research outline items by asking for feedback in chat""" # Number each outline item (maintain hierarchy but flatten for numbering) numbered_outline = [] flat_items = [] # Process the hierarchical outline structure item_num = 1 for topic_item in outline_items: topic = topic_item.get("topic", "") subtopics = topic_item.get("subtopics", []) # Add main topic with number flat_items.append(topic) numbered_outline.append(f"{item_num}. {topic}") item_num += 1 # Add subtopics with numbers for subtopic in subtopics: flat_items.append(subtopic) numbered_outline.append(f"{item_num}. 
{subtopic}") item_num += 1 # Prepare the outline display outline_display = "\n".join(numbered_outline) # Emit a message with instructions using improved slash commands feedback_message = ( "### Research Outline\n\n" f"{outline_display}\n\n" "**Please provide feedback on this research outline.**\n\n" "You can:\n" "- Use commands like `/keep 1,3,5-7` or `/remove 2,4,8-10` to select specific items by number\n" "- Or simply describe what topics you want to focus on or avoid in natural language\n\n" "Examples:\n" "- `/k 1,3,5-7` (keep only items 1,3,5,6,7)\n" "- `/r 2,4,8-10` (remove items 2,4,8,9,10)\n" '- "Focus on historical aspects and avoid technical details"\n' '- "I\'m more interested in practical applications than theoretical concepts"\n\n' "If you want to continue with all items, just reply 'continue' or leave your message empty.\n\n" "**I'll pause here to await your response before continuing the research.**" ) await self.emit_message(feedback_message) # Set flag to indicate we're waiting for feedback self._waiting_for_outline_feedback = True self._outline_feedback_data = { "outline_items": outline_items, "flat_items": flat_items, "numbered_outline": numbered_outline, "original_query": original_query, } # Return a default response (this will be overridden in the next call) return { "kept_items": flat_items, "removed_items": [], "kept_indices": list(range(len(flat_items))), "removed_indices": [], "preference_vector": {"pdv": None, "strength": 0.0, "impact": 0.0}, } async def process_natural_language_feedback( self, user_message: str, flat_items: List[str] ) -> Dict: """Process natural language feedback to determine which topics to keep/remove""" # Create a prompt for the model to interpret user feedback interpret_prompt = { "role": "system", "content": """You are an AI research assistant analyzing user feedback on a research outline. Based on the user's natural language input, determine which research topics should be kept or removed. The user's message expresses preferences about the research direction. Analyze this to identify: 1. Which specific topics from the outline align with their interests 2. Which specific topics should be removed based on their preferences Your task is to categorize each topic as "keep" or "remove" based on the user's natural language feedback. Provide your response as a JSON object with two lists: "keep" for indices to keep, and "remove" for indices to remove. Indices should be 0-based (first item is index 0).""", } # Prepare context with list of topics and user message topics_list = "\n".join([f"{i}. {topic}" for i, topic in enumerate(flat_items)]) context = f"""Research outline topics: {topics_list} User feedback: "{user_message}" Based on this feedback, categorize each topic (by index) as either "keep" or "remove". If the user clearly expresses a preference to focus on certain topics or avoid others, use that to guide your decisions. If the user's feedback is ambiguous about some topics, categorize them based on their similarity to clearly mentioned preferences. 
""" # Generate interpretation of user feedback try: response = await self.generate_completion( self.get_research_model(), [interpret_prompt, {"role": "user", "content": context}], temperature=0.2, # Low temperature for consistent interpretation ) result_content = response["choices"][0]["message"]["content"] # Extract JSON from response try: json_str = result_content[ result_content.find("{") : result_content.rfind("}") + 1 ] result_data = json.loads(json_str) # Get keep and remove lists keep_indices = result_data.get("keep", []) remove_indices = result_data.get("remove", []) # Ensure each index is in either keep or remove all_indices = set(range(len(flat_items))) missing_indices = all_indices - set(keep_indices) - set(remove_indices) # By default, keep missing indices keep_indices.extend(missing_indices) # Convert to kept and removed items kept_items = [ flat_items[i] for i in keep_indices if i < len(flat_items) ] removed_items = [ flat_items[i] for i in remove_indices if i < len(flat_items) ] logger.info( f"Natural language feedback interpretation: keep {len(kept_items)}, remove {len(removed_items)}" ) return { "kept_items": kept_items, "removed_items": removed_items, "kept_indices": keep_indices, "removed_indices": remove_indices, } except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing feedback interpretation: {e}") # Default to keeping all items return { "kept_items": flat_items, "removed_items": [], "kept_indices": list(range(len(flat_items))), "removed_indices": [], } except Exception as e: logger.error(f"Error interpreting natural language feedback: {e}") # Default to keeping all items return { "kept_items": flat_items, "removed_items": [], "kept_indices": list(range(len(flat_items))), "removed_indices": [], } async def process_outline_feedback_continuation(self, user_message: str): """Process the user feedback received in a continuation call""" # Get the data from the previous call feedback_data = self._outline_feedback_data outline_items = feedback_data["outline_items"] flat_items = feedback_data["flat_items"] original_query = feedback_data["original_query"] # Process the user input user_input = user_message.strip() # If user just wants to continue with all items if user_input.lower() == "continue" or not user_input: await self.emit_message( "\n*Continuing with all research outline items.*\n\n" ) return { "kept_items": flat_items, "removed_items": [], "kept_indices": list(range(len(flat_items))), "removed_indices": [], "preference_vector": {"pdv": None, "strength": 0.0, "impact": 0.0}, } # Check if it's a slash command (keep or remove) slash_keep_patterns = [r"^/k\s", r"^/keep\s"] slash_remove_patterns = [r"^/r\s", r"^/remove\s"] is_keep_cmd = any( re.match(pattern, user_input) for pattern in slash_keep_patterns ) is_remove_cmd = any( re.match(pattern, user_input) for pattern in slash_remove_patterns ) # Process slash commands if is_keep_cmd or is_remove_cmd: # Extract the item indices/ranges part if is_keep_cmd: items_part = re.sub(r"^(/k|/keep)\s+", "", user_input).replace(",", " ") else: items_part = re.sub(r"^(/r|/remove)\s+", "", user_input).replace( ",", " " ) # Process the indices and ranges selected_indices = set() for part in items_part.split(): part = part.strip() if not part: continue # Check if it's a range (e.g., 5-9) if "-" in part: try: start, end = map(int, part.split("-")) # Validate range bounds before converting to 0-indexed if ( start < 1 or start > len(flat_items) or end < 1 or end > len(flat_items) ): await self.emit_message( f"Invalid 
range '{part}': valid range is 1-{len(flat_items)}. Skipping." ) continue # Convert to 0-indexed start = start - 1 end = end - 1 selected_indices.update(range(start, end + 1)) except ValueError: await self.emit_message( f"Invalid range format: '{part}'. Skipping." ) else: # Single number try: idx = int(part) # Validate index before converting to 0-indexed if idx < 1 or idx > len(flat_items): await self.emit_message( f"Index {idx} out of range: valid range is 1-{len(flat_items)}. Skipping." ) continue # Convert to 0-indexed idx = idx - 1 selected_indices.add(idx) except ValueError: await self.emit_message(f"Invalid number: '{part}'. Skipping.") # Convert to lists selected_indices = sorted(list(selected_indices)) # Determine kept and removed indices based on mode if is_keep_cmd: # Keep mode - selected indices are kept, others removed kept_indices = selected_indices removed_indices = [ i for i in range(len(flat_items)) if i not in kept_indices ] else: # Remove mode - selected indices are removed, others kept removed_indices = selected_indices kept_indices = [ i for i in range(len(flat_items)) if i not in removed_indices ] # Get the actual items kept_items = [flat_items[i] for i in kept_indices if i < len(flat_items)] removed_items = [ flat_items[i] for i in removed_indices if i < len(flat_items) ] else: # Process natural language feedback nl_feedback = await self.process_natural_language_feedback( user_input, flat_items ) kept_items = nl_feedback["kept_items"] removed_items = nl_feedback["removed_items"] kept_indices = nl_feedback["kept_indices"] removed_indices = nl_feedback["removed_indices"] # Calculate preference direction vector based on kept and removed items preference_vector = await self.calculate_preference_direction_vector( kept_items, removed_items, flat_items ) # Store for later use throughout research self.user_preferences = preference_vector # Show the user what's happening await self.emit_message("\n### Feedback Processed\n") if kept_items: kept_list = "\n".join([f"✓ {item}" for item in kept_items]) await self.emit_message( f"**Keeping {len(kept_items)} items:**\n{kept_list}\n" ) if removed_items: removed_list = "\n".join([f"✗ {item}" for item in removed_items]) await self.emit_message( f"**Removing {len(removed_items)} items:**\n{removed_list}\n" ) await self.emit_message("Generating replacement items for removed topics...\n") return { "kept_items": kept_items, "removed_items": removed_items, "kept_indices": kept_indices, "removed_indices": removed_indices, "preference_vector": preference_vector, } async def group_replacement_topics(self, replacement_topics): """Group replacement topics semantically into groups of 3-6 topics each""" # Skip if too few topics if len(replacement_topics) <= 6: return [replacement_topics] # Just one group if 6 or fewer topics # Get embeddings for each topic topic_embeddings = [] for topic in replacement_topics: embedding = await self.get_embedding(topic) if embedding: topic_embeddings.append((topic, embedding)) # Calculate similarity matrix similarity_matrix = np.zeros((len(topic_embeddings), len(topic_embeddings))) for i, (topic1, emb1) in enumerate(topic_embeddings): for j, (topic2, emb2) in enumerate(topic_embeddings): if i != j: similarity_matrix[i][j] = cosine_similarity([emb1], [emb2])[0][0] # Calculate optimal number of groups total_topics = len(replacement_topics) # Calculate optimal number of groups to keep each group size between 3-6 min_groups = total_topics // 6 + ( 1 if total_topics % 6 > 0 else 0 ) # At least this many to keep groups 
≤6 max_groups = total_topics // 3 # At most this many to keep groups ≥3 target_groups = max(min(max_groups, 6), min_groups) # Between min_groups and 6 # Use hierarchical clustering to group similar topics from scipy.cluster.hierarchy import linkage, fcluster Z = linkage(similarity_matrix, "average") clusters = fcluster(Z, t=target_groups, criterion="maxclust") # Group topics by cluster grouped_topics = {} for i, cluster_id in enumerate(clusters): if cluster_id not in grouped_topics: grouped_topics[cluster_id] = [] if i < len(topic_embeddings): grouped_topics[cluster_id].append(topic_embeddings[i][0]) # Check if any groups are too small or too large groups_list = list(grouped_topics.values()) # Balance groups if needed - if any group has fewer than 3 topics and we have more than 1 group if len(groups_list) > 1: # Sort groups by size (smallest first) groups_list.sort(key=len) # If smallest group is too small, merge it with the next smallest while len(groups_list) > 1 and len(groups_list[0]) < 3: smallest = groups_list.pop(0) second_smallest = groups_list.pop(0) merged = smallest + second_smallest # Insert the merged group in the right position by size for i, group in enumerate(groups_list): if len(merged) <= len(group): groups_list.insert(i, merged) break else: groups_list.append(merged) # If largest group is too large, split it while groups_list and len(groups_list[-1]) > 6: largest = groups_list.pop() # Get embeddings for topics in the largest group large_embeddings = [] for topic in largest: embedding = await self.get_embedding(topic) if embedding: large_embeddings.append((topic, embedding)) # Split the largest group into two groups if len(large_embeddings) > 6: # Simple split using k-means with k=2 from sklearn.cluster import KMeans kmeans = KMeans(n_clusters=2, random_state=0).fit( [emb for _, emb in large_embeddings] ) # Create the two split groups split1 = [] split2 = [] for i, (topic, _) in enumerate(large_embeddings): if kmeans.labels_[i] == 0: split1.append(topic) else: split2.append(topic) # Add the split groups back to the list groups_list.append(split1) groups_list.append(split2) else: # If we couldn't get embeddings properly, just split in half mid = len(largest) // 2 groups_list.append(largest[:mid]) groups_list.append(largest[mid:]) return groups_list async def generate_group_query(self, topic_group, user_message): """Generate a search query that covers a group of related topics""" if not topic_group: return user_message topics_text = ", ".join(topic_group) # Create a prompt for generating the query prompt = { "role": "system", "content": """You are a research assistant generating an effective search query. Create a search query that will find relevant information for a group of related topics. The query should be specific enough to find targeted information, but broad enough to cover all topics in the group. Make the query concise (maximum 10 words) and focused.""", } # Create the message content message = { "role": "user", "content": f"""Generate a search query for this group of topics: {topics_text} This is related to the original user query: "{user_message}" Generate a single concise search query that will find information relevant to these topics. 
Just respond with the search query text only.""",
        }

        # Generate the query
        try:
            response = await self.generate_completion(
                self.get_research_model(),
                [prompt, message],
                temperature=self.valves.TEMPERATURE * 0.7,
            )

            query = response["choices"][0]["message"]["content"].strip()

            # Clean up the query: strip straight and curly quotes and ensure it's not too long
            query = query.replace('"', "").replace("“", "").replace("”", "")

            # If the query is too long, truncate it
            if len(query.split()) > 12:
                query = " ".join(query.split()[:12])

            return query
        except Exception as e:
            logger.error(f"Error generating group query: {e}")
            # Fallback: combine the first topic with the user message
            return f"{user_message} {topic_group[0]}"

    async def extract_topic_relevant_info(self, results, topics):
        """Extract information from search results specifically relevant to given topics"""
        if not results:
            # Return a string so the caller can embed the result directly in a prompt
            return ""

        # Create a prompt for extracting relevant information
        extraction_prompt = {
            "role": "system",
            "content": """You are a research assistant extracting information from search results.
Identify and extract information that is specifically relevant to the given topics.
Format the extracted information as concise bullet points, focusing on facts, data, and insights.
Ignore general information not directly related to the topics.""",
        }

        # Create context with search results and topics
        topics_str = ", ".join(topics)
        extraction_context = f"Topics: {topics_str}\n\nSearch Results:\n\n"

        for i, result in enumerate(results):
            extraction_context += f"Result {i+1}:\n"
            extraction_context += f"Title: {result.get('title', 'Untitled')}\n"
            extraction_context += f"Content: {result.get('content', '')[:1000]}...\n\n"

        extraction_context += "\nExtract relevant information for the listed topics from these search results."

        # Create messages for extraction
        extraction_messages = [
            extraction_prompt,
            {"role": "user", "content": extraction_context},
        ]

        # Extract relevant information
        try:
            response = await self.generate_completion(
                self.get_research_model(),
                extraction_messages,
                temperature=self.valves.TEMPERATURE * 0.4,  # Lower temperature for factual extraction
            )

            if response and "choices" in response and len(response["choices"]) > 0:
                extracted_info = response["choices"][0]["message"]["content"]
                return extracted_info
            else:
                return "No relevant information found."
        except Exception as e:
            logger.error(f"Error extracting topic-relevant information: {e}")
            return "Error extracting information from search results."

    async def refine_topics_with_research(
        self, topics, relevant_info, pdv, original_query
    ):
        """Refine topics based on both user preferences and research results"""
        # Create a prompt for refining topics
        refine_prompt = {
            "role": "system",
            "content": """You are a research assistant refining research topics.
Based on the extracted information and user preferences, revise each topic to:
1. Be more specific and targeted based on the research findings
2. Maintain alignment with user preferences
3. Focus on aspects that will yield the most valuable information
4. Be phrased as clear, researchable questions or topics

Your refined topics should incorporate new discoveries while preserving the intent of the original topics.""",
        }

        # Create context with topics, research info, and preference direction
        pdv_context = ""
        if pdv is not None:
            pdv_context = "\nUser preferences are directing research toward topics similar to what was kept and away from what was removed."
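
        # Note: only this prose hint about the preference direction reaches the model
        # here; the numeric preference-direction vector itself is applied elsewhere
        # (e.g. when ranking topics and search results), so within this method pdv
        # effectively acts as an on/off switch for the hint.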
refine_context = f"""Original topics: {', '.join(topics)} Original query: {original_query} Extracted research information: {relevant_info} {pdv_context} Refine these topics based on the research findings and user preferences. Provide a list of the same number of refined topics.""" # Create messages for refinement refine_messages = [refine_prompt, {"role": "user", "content": refine_context}] # Generate refined topics try: response = await self.generate_completion( self.get_research_model(), refine_messages, temperature=self.valves.TEMPERATURE * 0.7, # Balanced temperature for creativity with focus ) if response and "choices" in response and len(response["choices"]) > 0: refined_content = response["choices"][0]["message"]["content"] # Extract topics using regex (looking for numbered or bulleted list items) refined_topics = re.findall( r"(?:^|\n)(?:\d+\.\s*|\*\s*|-\s*)([^\n]+)", refined_content ) # If we couldn't extract enough topics, use the original ones if len(refined_topics) < len(topics): logger.warning( f"Not enough refined topics extracted ({len(refined_topics)}), using originals" ) return topics # Limit to the same number as original topics refined_topics = refined_topics[: len(topics)] return refined_topics else: return topics except Exception as e: logger.error(f"Error refining topics: {e}") return topics async def continue_research_after_feedback( self, feedback_result, user_message, outline_items, all_topics, outline_embedding, ): """Continue the research process after receiving user feedback on the outline""" kept_items = feedback_result["kept_items"] removed_items = feedback_result["removed_items"] preference_vector = feedback_result["preference_vector"] # Generate replacement topics for removed items if needed if removed_items: await self.emit_status("info", "Generating replacement topics...", False) replacement_topics = await self.generate_replacement_topics( user_message, kept_items, removed_items, preference_vector, all_topics, ) if replacement_topics: # Group replacement topics semantically topic_groups = await self.group_replacement_topics(replacement_topics) # For each group, generate and execute targeted queries group_results = [] for group in topic_groups: # Generate a query that covers this group of topics group_query = await self.generate_group_query(group, user_message) # Get query embedding query_embedding = await self.get_embedding(group_query) # Execute search for this group await self.emit_message( f"**Researching topics:** {', '.join(group)}\n**Query:** {group_query}\n\n" ) results = await self.process_query( group_query, query_embedding, outline_embedding ) group_results.append( {"topics": group, "query": group_query, "results": results} ) # Now refine each topic based on both PDV and search results refined_topics = [] for group in group_results: topics = group["topics"] results = group["results"] # Extract key information from results relevant to these topics relevant_info = await self.extract_topic_relevant_info( results, topics ) # Generate refined topics that incorporate both user preferences and new research refined = await self.refine_topics_with_research( topics, relevant_info, self.user_preferences["pdv"], user_message, ) refined_topics.extend(refined) # Use these refined topics in place of the original replacement topics replacement_topics = refined_topics # Create new research outline structure new_research_outline = [] new_all_topics = [] # First reconstruct from kept items, maintaining hierarchy for topic_item in outline_items: topic = 
topic_item["topic"] if topic in kept_items: # Filter subtopics to only those that were kept kept_subtopics = [ st for st in topic_item.get("subtopics", []) if st in kept_items ] if kept_subtopics: # Only add if it has kept subtopics new_topic_item = { "topic": topic, "subtopics": kept_subtopics, } new_research_outline.append(new_topic_item) new_all_topics.append(topic) new_all_topics.extend(kept_subtopics) # Add replacement topics as a new section if replacement_topics: # Group replacement topics (put half in one topic, half in another) mid = len(replacement_topics) // 2 if mid > 0: new_research_outline.append( { "topic": "Additional Research Areas (Part 1)", "subtopics": replacement_topics[:mid], } ) new_research_outline.append( { "topic": "Additional Research Areas (Part 2)", "subtopics": replacement_topics[mid:], } ) new_all_topics.append("Additional Research Areas (Part 1)") new_all_topics.extend(replacement_topics[:mid]) new_all_topics.append("Additional Research Areas (Part 2)") new_all_topics.extend(replacement_topics[mid:]) else: # Just one topic if only a few replacements new_research_outline.append( { "topic": "Additional Research Areas", "subtopics": replacement_topics, } ) new_all_topics.append("Additional Research Areas") new_all_topics.extend(replacement_topics) # Update the research outline and topic list if new_research_outline: # Only update if we have valid content research_outline = new_research_outline all_topics = new_all_topics # Update outline embedding outline_text = " ".join(all_topics) outline_embedding = await self.get_embedding(outline_text) # Re-initialize dimension tracking with new topics await self.initialize_research_dimensions(all_topics, user_message) # Show the updated outline to the user updated_outline = "### Updated Research Outline\n\n" for topic_item in research_outline: updated_outline += f"**{topic_item['topic']}**\n" for subtopic in topic_item.get("subtopics", []): updated_outline += f"- {subtopic}\n" updated_outline += "\n" await self.emit_message(updated_outline) # Updated message about continuing with main research await self.emit_message( "\n*Updated research outline with user preferences. Continuing to main research cycles...*\n\n" ) # Store the updated research state self._research_state = { "research_outline": research_outline, "all_topics": all_topics, "outline_embedding": outline_embedding, "user_message": user_message, } # Clear waiting flag self._waiting_for_outline_feedback = False return research_outline, all_topics, outline_embedding else: # If we couldn't create a valid outline, continue with original await self.emit_message( "\n*No valid outline could be created. Continuing with original outline.*\n\n" ) self._research_state = { "research_outline": outline_items, "all_topics": all_topics, "outline_embedding": outline_embedding, "user_message": user_message, } # Clear waiting flag self._waiting_for_outline_feedback = False return outline_items, all_topics, outline_embedding else: # No items were removed, continue with original outline await self.emit_message( "\n*No changes made to research outline. 
Continuing with original outline.*\n\n" ) self._research_state = { "research_outline": outline_items, "all_topics": all_topics, "outline_embedding": outline_embedding, "user_message": user_message, } # Clear waiting flag self._waiting_for_outline_feedback = False return outline_items, all_topics, outline_embedding async def is_follow_up_query(self, messages: List[Dict]) -> bool: """Determine if the current query is a follow-up to a previous research session""" # If we have a previous comprehensive summary and research has been completed, # treat any new query as a follow-up return bool(self.prev_comprehensive_summary and self._research_completed) async def generate_synthesis_outline( self, original_outline: List[Dict], completed_topics: Set[str], user_query: str, research_results: List[Dict], ) -> List[Dict]: """Generate a refined research outline for synthesis that better integrates additional research areas""" # Create a prompt for generating the synthesis outline synthesis_outline_prompt = { "role": "system", "content": """You are a post-grad research assistant tasked with reorganizing a research outline for the final synthesis stage. Create a refined outline that: 1. Maintains all the key topics and insights from the original research 2. Better integrates any "Additional Research Areas" items into the conceptual flow 3. Restructures the outline to reflect the progress and outcome of research activities 4. Maintains the original intent and scope of the research 5. Creates a logical narrative flow for the final comprehensive report that points to the most informative possible response to the user's original query The goal is NOT to change the research focus or redirect the outline, but rather to create a more cohesive structure that naturally incorporates all discovered topics. 
Format your response as a valid JSON object with the following structure: {"outline": [ {"topic": "Main topic 1", "subtopics": ["Subtopic 1.1", "Subtopic 1.2"]}, {"topic": "Main topic 2", "subtopics": ["Subtopic 2.1", "Subtopic 2.2"]} ]}""", } # Identify "Additional Research Areas" sections that need integration additional_areas = [] for topic_item in original_outline: if "Additional Research Areas" in topic_item["topic"]: additional_areas.extend(topic_item.get("subtopics", [])) # Build context from the original outline and research results outline_context = "### Original Research Outline:\n\n" for topic_item in original_outline: outline_context += f"- {topic_item['topic']}\n" for subtopic in topic_item.get("subtopics", []): outline_context += f" - {subtopic}\n" # Include information about completed topics outline_context += "\n### Completed Research Topics:\n" for topic in completed_topics: outline_context += f"- {topic}\n" # Highlight additional research areas that need integration if additional_areas: outline_context += "\n### Additional Research Areas to Integrate:\n" for area in additional_areas: outline_context += f"- {area}\n" # Include summary of research results to inform the reorganization outline_context += "\n### Research Findings Summary:\n" # Group results by query for a more structured summary query_results = {} for result in research_results[-20:]: # Use most recent results query = result.get("query", "Unknown") if query not in query_results: query_results[query] = [] query_results[query].append(result) # Add summaries of findings by query for query, results in query_results.items(): outline_context += f"\nQuery: {query}\n" for i, result in enumerate(results[:2]): # Limit to 2 results per query outline_context += f"- {result.get('title', 'Untitled result')}\n" # Add semantic dimensions if available if self.research_dimensions: try: dimension_names = self.research_dimensions.get("dimension_names", []) coverage = self.research_dimensions.get("coverage", []) if ( dimension_names and coverage and len(dimension_names) == len(coverage) ): outline_context += "\n### Research Dimensions and Coverage:\n" for i, (name, cov) in enumerate(zip(dimension_names, coverage)): outline_context += f"- {name}: {int(cov * 100)}% covered\n" except Exception as e: logger.error( f"Error adding research dimensions to outline context: {e}" ) # Create messages for the model messages = [ synthesis_outline_prompt, { "role": "user", "content": f"Original query: {user_query}\n\n{outline_context}\n\nGenerate a refined research outline for synthesis.", }, ] # Generate the synthesis outline try: await self.emit_status( "info", "Generating refined outline for synthesis...", False ) # Use synthesis model for this task synthesis_model = self.get_synthesis_model() response = await self.generate_completion( synthesis_model, messages, temperature=self.valves.SYNTHESIS_TEMPERATURE ) outline_content = response["choices"][0]["message"]["content"] # Extract JSON from response try: outline_json_str = outline_content[ outline_content.find("{") : outline_content.rfind("}") + 1 ] outline_data = json.loads(outline_json_str) synthesis_outline = outline_data.get("outline", []) return synthesis_outline except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing synthesis outline JSON: {e}") # Fallback: return the original outline return original_outline except Exception as e: logger.error(f"Error generating synthesis outline: {e}") return original_outline def get_synthesis_model(self): """Get the appropriate 
model for synthesis tasks""" if ( self.valves.SYNTHESIS_MODEL and self.valves.SYNTHESIS_MODEL != self.valves.LARGE_MODEL ): return self.valves.SYNTHESIS_MODEL return self.valves.LARGE_MODEL def get_research_model(self): """Get the appropriate model for research/mechanical tasks""" # Always use the main research model return self.valves.LARGE_MODEL async def generate_section_content( self, section_title: str, subtopics: List[str], original_query: str, research_results: List[Dict], synthesis_model: str, is_follow_up: bool = False, previous_summary: str = "", ) -> str: """Generate content for a single section of the research outline""" # Create a prompt specific to this section section_prompt = { "role": "system", "content": f"""You are a post-grad research assistant tasked with writing a comprehensive report section about subject "{section_title}" for a research report addressing original query "{original_query}". Your section should: 1. Thoroughly address the main topic "{section_title}" and all its subtopics. We paid a lot for this research - please use all of it to write this section 2. Synthesize domain-specific concepts, demonstrate equations and problems, show examples, etc. from the provided research results relevant to this topic in a highly detailed and factual way 3. NOT include any section titles, headings, or subheadings. These will be added separately. 4. Not include an introduction, conclusion, or meta text about the section's place in the overall report. This section will be dropped into place. Please just write the section content alone 5. "Solve" the query as far as the subject is concerned. Leave no stone unturned in providing a proper, summarizing answer bursting with information that demonstrates advanced knowledge of the content Focus exclusively on this section and its subtopics as the others will be handled entirely separately. Again, use all research available to you to flesh out relevant details to the greatest extent possible, but do NOT include any headings or titles in your response. 
""", } # Add semantic concepts for this section to enhance contextual understanding if self.research_dimensions and self.vocabulary_embeddings: try: # Get embedding for the section section_embedding = await self.get_embedding( section_title + " " + " ".join(subtopics) ) if section_embedding: # Find vocabulary words that are semantically related to this section word_similarities = [] for word, embedding in self.vocabulary_embeddings.items(): similarity = cosine_similarity( [section_embedding], [embedding] )[0][0] word_similarities.append((word, similarity)) # Get the top related words top_words = sorted( word_similarities, key=lambda x: x[1], reverse=True )[:20] # Add to the prompt as semantic context semantic_context = "Semantically related concepts to consider (for context only):\n" semantic_context += ", ".join([word for word, _ in top_words]) # Add to the prompt section_prompt["content"] += f"\n\n{semantic_context}" except Exception as e: logger.error(f"Error adding semantic concepts to section prompt: {e}") # Build context from research results that might be relevant to this section section_context = f"# Section to Write: {section_title}\n\n" # Add subtopics if subtopics: section_context += "## Subtopics to address:\n" for subtopic in subtopics: section_context += f"- {subtopic}\n" section_context += "\n" # Filter for relevant research results using semantic transformation section_context += "## Research Results Relevant to This Section:\n\n" section_embedding = await self.get_embedding( section_title + " " + " ".join(subtopics) ) # If we have an embedding, rank results by relevance to this section section_results = [] if section_embedding: # Calculate relevance scores using semantic transformation if available result_scores = [] for i, result in enumerate(research_results): content = result.get("content", "") if content: content_embedding = await self.get_embedding(content[:1000]) if content_embedding: # Apply transformation if available if ( self.semantic_transformations and self.user_preferences["impact"] > 0.1 ): transformed_section = ( await self.apply_semantic_transformation( section_embedding, self.semantic_transformations ) ) transformed_content = ( await self.apply_semantic_transformation( content_embedding, self.semantic_transformations ) ) similarity = cosine_similarity( [transformed_content], [transformed_section] )[0][0] else: similarity = cosine_similarity( [content_embedding], [section_embedding] )[0][0] result_scores.append((i, similarity)) # Sort by relevance score result_scores.sort(key=lambda x: x[1], reverse=True) # Take all results ordered by relevance (context window will naturally limit) section_results = [research_results[i] for i, _ in result_scores] else: # If no embedding, just use all results section_results = research_results # Add relevant results to context for i, result in enumerate(section_results): tokens = result.get("tokens", 0) token_info = f" [{tokens} tokens]" if tokens > 0 else "" section_context += f"Result {i+1}: {result['title']}{token_info}\n" section_context += f"Query: {result['query']}\n" section_context += f"Content: {result['content'][:1000]}...\n\n" # Include previous summary if this is a follow-up if is_follow_up and previous_summary: section_context += "## Previous Research Summary:\n\n" section_context += f"{previous_summary[:1000]}...\n\n" # Prepare final message section_context += f"\nWrite a comprehensive section about {section_title} addressing all subtopics. Remember, do NOT include headings or titles in your response." 
# Create messages array for completion messages = [section_prompt, {"role": "user", "content": section_context}] # Generate section content try: # Emit proper status updates await self.emit_status( "info", f"Generating content for section: {section_title}...", False ) # Calculate scaled temperature from the synthesis temperature valve scaled_temperature = self.valves.SYNTHESIS_TEMPERATURE # Use synthesis model for generating sections response = await self.generate_completion( synthesis_model, messages, stream=False, temperature=scaled_temperature, ) if response and "choices" in response and len(response["choices"]) > 0: section_content = response["choices"][0]["message"]["content"] # Count tokens in the section content tokens = await self.count_tokens(section_content) self.memory_stats["section_tokens"][section_title] = tokens # Show section headers during generation await self.emit_status( "info", f"Section generated: {section_title} [{tokens} tokens]", False, ) # Store content for later use self.section_synthesized_content[section_title] = section_content return section_content else: return f"*Error generating content for section: {section_title}*" except Exception as e: logger.error(f"Error generating section content for '{section_title}': {e}") return f"*Error generating content for section: {section_title}*" async def review_synthesis( self, compiled_sections: Dict[str, str], original_query: str, research_outline: List[Dict], synthesis_model: str, ) -> Dict[str, List[Dict]]: """Review the compiled synthesis and suggest edits""" review_prompt = { "role": "system", "content": """You are a post-grad research editor reviewing a comprehensive research report. Your task is to identify issues with the synthesis and combination of multiple sections, and suggest specific edits to improve the informational and organizational flow of the report. Focus on: 1. Finding redundancies repetitions across sections 2. Identifying areas needing better transitions between sections or where topics should be moved from one section to another 3. Making the report read as though it were written by one author who compiled these topics together for good purpose Do not: 1. Impart your own biases, interests, or preferences onto the report 2. Re-interpret the research information or soften its conclusions 3. Make useless or unnecessary revisions beyond the scope of ensuring flow from start to finish Format your response as a JSON object with edits organized by section: { "global_edits": [ {"issue": "Description of the issue", "suggestion": "Specific suggestion to fix it"} ], "section_edits": { "Section Title 1": [ {"issue": "Description of issue in this section", "suggestion": "Specific suggestion to fix it"} ], "Section Title 2": [ {"issue": "Description of issue in this section", "suggestion": "Specific suggestion to fix it"} ] } } Be precise and specific with your suggestions. 
Include the exact text that needs changing when possible.""", } # Add semantic concepts to the review prompt if self.research_dimensions and self.vocabulary_cache: try: # Get key concepts from research dimensions dimension_names = self.research_dimensions.get("dimension_names", []) # Extract individual words from dimension names key_concepts = [] for name in dimension_names: words = name.split() key_concepts.extend( [ w for w in words if len(w) > 3 and w.lower() in self.vocabulary_cache ] ) if key_concepts: # Add to the prompt semantic_context = "\nKey semantic concepts for this research:\n" semantic_context += ", ".join(key_concepts) review_prompt["content"] += semantic_context except Exception as e: logger.error(f"Error adding semantic concepts to review prompt: {e}") # Create context with all sections review_context = f"# Complete Research Report on: {original_query}\n\n" review_context += "## Research Outline:\n" for topic in research_outline: review_context += f"- {topic['topic']}\n" for subtopic in topic.get("subtopics", []): review_context += f" - {subtopic}\n" review_context += "\n" # Add the full content of each section review_context += "## Complete Report Content by Section:\n\n" for section_title, content in compiled_sections.items(): # Get token count for this section tokens = self.memory_stats["section_tokens"].get(section_title, 0) if tokens == 0: tokens = await self.count_tokens(content) self.memory_stats["section_tokens"][section_title] = tokens review_context += f"### {section_title} [{tokens} tokens]\n\n" review_context += f"{content}\n\n" review_context += "\nPlease review this research report and suggest specific edits to improve it." # Create messages array messages = [review_prompt, {"role": "user", "content": review_context}] # Generate the review try: await self.emit_status( "info", "Reviewing and improving the synthesis...", False ) # Scale temperature based on synthesis temperature valve review_temperature = ( self.valves.SYNTHESIS_TEMPERATURE * 0.5 ) # Lower temperature for more consistent review # Use synthesis model for reviewing response = await self.generate_completion( synthesis_model, messages, stream=False, temperature=review_temperature, ) if response and "choices" in response and len(response["choices"]) > 0: review_content = response["choices"][0]["message"]["content"] # Parse the JSON review try: review_json_str = review_content[ review_content.find("{") : review_content.rfind("}") + 1 ] review_data = json.loads(review_json_str) return review_data except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing review JSON: {e}") # Return a minimal structure if parsing fails return {"global_edits": [], "section_edits": {}} else: return {"global_edits": [], "section_edits": {}} except Exception as e: logger.error(f"Error generating synthesis review: {e}") return {"global_edits": [], "section_edits": {}} async def apply_review_edits( self, compiled_sections: Dict[str, str], review_data: Dict[str, Any], synthesis_model: str, ): """Apply the suggested edits from the review to improve the synthesis""" # Create deep copy of sections to modify edited_sections = compiled_sections.copy() # Track if we made any changes changes_made = False # Apply global edits first global_edits = review_data.get("global_edits", []) if global_edits: changes_made = True global_edit_prompt = { "role": "system", "content": """You are a post-grad research editor applying global edits to improve a research report. 
Apply the requested edits to the entire document, maintaining the overall structure and content while improving the issues identified. Return the full improved document.""", } # Create a single document with all sections full_document = "" for section_title, content in compiled_sections.items(): full_document += f"# {section_title}\n\n{content}\n\n" # Format the global edits edits_text = "The following global edits need to be applied:\n\n" for i, edit in enumerate(global_edits): edits_text += f"{i+1}. Issue: {edit.get('issue')}\n" edits_text += f" Suggestion: {edit.get('suggestion')}\n\n" # Create message global_edit_message = f"{edits_text}\n\nDocument to edit:\n\n{full_document}\n\nApply these global edits and return the improved document." # Send to model try: await self.emit_status( "info", "Applying global edits to synthesis...", False ) # Calculate temperature based on synthesis temperature edit_temperature = ( self.valves.SYNTHESIS_TEMPERATURE * 0.3 ) # Low temperature for precise edits # Use research model for applying edits research_model = self.get_research_model() response = await self.generate_completion( research_model, [ global_edit_prompt, {"role": "user", "content": global_edit_message}, ], stream=False, temperature=edit_temperature, ) if response and "choices" in response and len(response["choices"]) > 0: edited_document = response["choices"][0]["message"]["content"] # Split the edited document back into sections sections = re.split(r"(?=# [^\n]+)", edited_document) for section in sections: if section.strip(): # Parse section title and content match = re.match(r"# ([^\n]+)\n\n(.*)", section, re.DOTALL) if match: section_title = match.group(1).strip() section_content = match.group(2).strip() if section_title in edited_sections: edited_sections[section_title] = section_content except Exception as e: logger.error(f"Error applying global edits: {e}") # Apply section-specific edits section_edits = review_data.get("section_edits", {}) for section_title, edits in section_edits.items(): if section_title in edited_sections and edits: changes_made = True section_edit_prompt = { "role": "system", "content": f"""You are a post-grad research editor applying specific edits to the section "{section_title}" of a research report. Apply the requested edits to improve this section, maintaining the overall content while fixing the issues identified. Return only the improved section content.""", } # Format the section edits edits_text = f"The following edits need to be applied to the section '{section_title}':\n\n" for i, edit in enumerate(edits): edits_text += f"{i+1}. Issue: {edit.get('issue')}\n" edits_text += f" Suggestion: {edit.get('suggestion')}\n\n" # Create message section_content = edited_sections[section_title] section_edit_message = f"{edits_text}\n\nSection content to edit:\n\n{section_content}\n\nApply these edits and return the improved section content." 
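                # Note on the per-section pass below: like the global pass, each section is
                # rewritten with the research model at SYNTHESIS_TEMPERATURE * 0.3 so the
                # rewrite stays close to the reviewer's suggestions rather than re-synthesizing
                # the content. Sections with no suggested edits are left untouched.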
# Send to model try: await self.emit_status( "info", f"Applying edits to section: {section_title}...", False ) # Use research model for applying edits research_model = self.get_research_model() response = await self.generate_completion( research_model, [ section_edit_prompt, {"role": "user", "content": section_edit_message}, ], stream=False, temperature=self.valves.SYNTHESIS_TEMPERATURE * 0.3, # Low temperature for precise edits ) if ( response and "choices" in response and len(response["choices"]) > 0 ): edited_section = response["choices"][0]["message"]["content"] edited_sections[section_title] = edited_section except Exception as e: logger.error( f"Error applying edits to section '{section_title}': {e}" ) # Return the edited sections and whether changes were made return edited_sections, changes_made async def generate_replacement_topics( self, query: str, kept_items: List[str], removed_items: List[str], preference_vector: Dict, outline_items: List[str], ) -> List[str]: """Generate replacement topics using semantic transformation""" # If nothing was removed, return empty list if not removed_items: return [] # If nothing was kept, use the full original outline as reference if not kept_items: kept_items = outline_items # Create a prompt to generate replacements replacement_prompt = { "role": "system", "content": """You are a post-grad research assistant generating replacement topics for a research outline. Based on the kept topics, original query, and user's preferences, generate new research topics to replace removed ones. Each new topic should: 1. Be directly relevant to answering or addressing the original query 2. Be conceptually aligned with the kept topics 3. Avoid concepts related to removed topics and their associated themes 4. Be specific and actionable for research Generate EXACTLY the requested number of replacement topics in a numbered list format. Each replacement should be thoughtful and unique, exploring different aspects of the research subject. """, } # Extract preference information pdv = preference_vector.get("pdv") strength = preference_vector.get("strength", 0.0) impact = preference_vector.get("impact", 0.0) # Prepare the request content content = f"""Original query: {query} Kept topics (conceptually preferred): {kept_items} Removed topics (to avoid): {removed_items} """ if pdv is not None: content += ( f"User preference strength: {strength:.2f} (on scale of 0.0-1.0)\n" ) content += f"User preference impact: {impact:.2f} (based on proportion of topics modified)\n\n" content += f"""Please generate EXACTLY {len(removed_items)} replacement research topics in a numbered list. These should align with the kept topics and original query, while avoiding concepts from removed topics. 
""" messages = [replacement_prompt, {"role": "user", "content": content}] # Generate all replacements at once try: await self.emit_status( "info", f"Generating {len(removed_items)} replacement topics...", False ) # Apply semantic transformation to the query before sending to model # This helps bias the model toward the preferred semantic direction if pdv is not None and impact > 0.1: # Create a semantic transformation query_embedding = await self.get_embedding(query) if query_embedding: # Create a simple eigendecomposition for this specific purpose kept_embeddings = [] for item in kept_items: embed = await self.get_embedding(item) if embed: kept_embeddings.append(embed) if len(kept_embeddings) >= 3: kept_array = np.array(kept_embeddings) # Simple PCA try: pca = PCA(n_components=min(3, len(kept_embeddings))) pca.fit(kept_array) eigen_data = { "eigenvectors": pca.components_.tolist(), "eigenvalues": pca.explained_variance_.tolist(), "explained_variance": pca.explained_variance_ratio_.tolist(), } # Create transformation that includes PDV transformation = await self.create_semantic_transformation( eigen_data, pdv=pdv ) # Store for later use self.semantic_transformations = transformation logger.info( f"Created semantic transformation for replacement topics generation" ) except Exception as e: logger.error( f"Error creating PCA for topic replacement: {e}" ) # Generate replacements # Use research model for generating replacements research_model = self.get_research_model() response = await self.generate_completion( research_model, messages, temperature=self.valves.TEMPERATURE * 1.1, # Slightly higher temperature for creative replacements ) if response and "choices" in response and len(response["choices"]) > 0: generated_text = response["choices"][0]["message"]["content"] # Parse the generated text to extract topics (numbered list format) lines = generated_text.split("\n") replacements = [] for line in lines: # Look for numbered list items: 1. 
Topic description match = re.search(r"^\s*\d+\.\s*(.+)$", line) if match: topic = match.group(1).strip() if ( topic and len(topic) > 10 ): # Minimum length to be a valid topic replacements.append(topic) # Ensure we have exactly the right number of replacements if len(replacements) > len(removed_items): replacements = replacements[: len(removed_items)] elif len(replacements) < len(removed_items): # If we didn't get enough, create generic ones to fill the gap while len(replacements) < len(removed_items): missing_count = len(removed_items) - len(replacements) await self.emit_status( "info", f"Generating {missing_count} additional topics...", False, ) replacements.append( f"Additional research on {query} aspect {len(replacements)+1}" ) return replacements except Exception as e: logger.error(f"Error generating replacement topics: {e}") # Fallback - create generic replacements return [ f"Alternative research topic {i+1} for {query}" for i in range(len(removed_items)) ] async def pipe( self, body: dict, __user__: dict, __event_emitter__=None, __event_call__=None, __task__=None, __model__=None, __request__=None, ) -> str: self.__current_event_emitter__ = __event_emitter__ self.__current_event_call__ = __event_call__ self.__user__ = User(**__user__) self.__model__ = __model__ self.__request__ = __request__ # If the pipe is disabled or it's not a default task, return if not self.valves.ENABLED or (__task__ and __task__ != TASKS.DEFAULT): return "" # Get messages from the body messages = body.get("messages", []) if not messages: return "" # Get user query from the latest message user_message = messages[-1].get("content", "").strip() if not user_message: return "" # Check if we're continuing from a previous outline feedback request if self._waiting_for_outline_feedback: # Process the user's feedback self._waiting_for_outline_feedback = False feedback_result = await self.process_outline_feedback_continuation( user_message ) # Get the needed research state for continuation research_state = self._outline_feedback_data original_query = research_state.get("original_query", "") outline_items = research_state.get("outline_items", []) flat_items = research_state.get("flat_items", []) # Retrieve all_topics and outline_embedding if we have them all_topics = [] for topic_item in outline_items: all_topics.append(topic_item["topic"]) all_topics.extend(topic_item.get("subtopics", [])) # Update outline embedding based on all_topics outline_text = " ".join(all_topics) outline_embedding = await self.get_embedding(outline_text) # Continue the research process from the outline feedback research_outline, all_topics, outline_embedding = ( await self.continue_research_after_feedback( feedback_result, original_query, outline_items, all_topics, outline_embedding, ) ) # Now continue with the main research process using the updated research state user_message = original_query # Store research state for the actual research cycles self._research_state = { "research_outline": research_outline, "all_topics": all_topics, "outline_embedding": outline_embedding, "user_message": user_message, } # Regular processing for a new research query # Initialize state tracking variables if not already done if not hasattr(self, "_waiting_for_outline_feedback"): self._waiting_for_outline_feedback = False self._outline_feedback_data = None self._research_state = None self._research_completed = False # Reset research completion flag for new queries if not self._waiting_for_outline_feedback and not self._research_state: self._research_completed = 
False # Check if this is a follow-up query is_follow_up = await self.is_follow_up_query(messages) self._follow_up_mode = is_follow_up # Get summary embedding if this is a follow-up summary_embedding = None if is_follow_up and self.prev_comprehensive_summary: try: await self.emit_status("info", "Processing follow-up query...", False) summary_embedding = await self.get_embedding( self.prev_comprehensive_summary ) await self.emit_message("## Deep Research Mode: Follow-up\n\n") await self.emit_message( "I'll continue researching based on your follow-up query while considering our previous findings.\n\n" ) except Exception as e: logger.error(f"Error getting summary embedding: {e}") # Continue without the summary embedding if there's an error is_follow_up = False self._follow_up_mode = False await self.emit_message("## Deep Research Mode: Activated\n\n") await self.emit_message( "I'll search for comprehensive information about your query. This might take a moment...\n\n" ) else: await self.emit_status("info", "Starting deep research...", False) await self.emit_message("## Deep Research Mode: Activated\n\n") await self.emit_message( "I'll search for comprehensive information about your query. This might take a moment...\n\n" ) # Check if we have research state from previous feedback if self._research_state: # Use the existing research state from feedback research_outline = self._research_state["research_outline"] all_topics = self._research_state["all_topics"] outline_embedding = self._research_state["outline_embedding"] user_message = self._research_state["user_message"] # Display the outline to the user outline_text = "### Research Outline\n\n" for topic in research_outline: outline_text += f"**{topic['topic']}**\n" for subtopic in topic.get("subtopics", []): outline_text += f"- {subtopic}\n" outline_text += "\n" await self.emit_message(outline_text) await self.emit_status( "info", "Continuing research with updated outline...", False ) # Skip to research cycles initial_results = [] # We'll regenerate search results else: # For follow-up queries, we need to generate a new research outline based on the previous summary if is_follow_up: # Step 1: Generate initial search queries for follow-up considering previous summary await self.emit_status( "info", "Generating initial search queries for follow-up...", False ) initial_query_prompt = { "role": "system", "content": """You are a post-grad research assistant generating effective search queries. Based on the user's follow-up question and the previous research summary, generate 3 initial search queries. Each query should be specific, use relevant keywords, and be designed to find new information that builds on the previous research. 
Format your response as a valid JSON object with the following structure: {"queries": [ "search query 1", "search query 2", "search query 3" ]}""", } initial_query_messages = [ initial_query_prompt, { "role": "user", "content": f"Follow-up question: {user_message}\n\nPrevious research summary:\n{self.prev_comprehensive_summary[:2000]}...\n\nGenerate initial search queries that build on the previous research.", }, ] # Get initial search queries query_response = await self.generate_completion( self.get_research_model(), initial_query_messages, temperature=self.valves.TEMPERATURE, ) query_content = query_response["choices"][0]["message"]["content"] # Extract JSON from response try: query_json_str = query_content[ query_content.find("{") : query_content.rfind("}") + 1 ] query_data = json.loads(query_json_str) initial_queries = query_data.get("queries", []) except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing query JSON: {e}") # Fallback: extract queries using regex if JSON parsing fails import re initial_queries = re.findall(r'"([^"]+)"', query_content)[:3] if not initial_queries: initial_queries = ["Information about " + user_message] # Display the queries to the user await self.emit_message(f"### Initial Follow-up Research Queries\n\n") for i, query in enumerate(initial_queries): await self.emit_message(f"**Query {i+1}**: {query}\n\n") # Execute initial searches with the follow-up queries # Use summary embedding for context relevance initial_results = [] for query in initial_queries: # Get query embedding query_embedding = await self.get_embedding(query) # Use previous summary as outline for now if not query_embedding: query_embedding = [0] * 384 # Default embedding size # Process the query with summary embedding for context results = await self.process_query( query, query_embedding, None, None, summary_embedding ) initial_results.extend(results) # Generate research outline that incorporates previous findings and new follow-up await self.emit_status( "info", "Generating research outline for follow-up...", False ) outline_prompt = { "role": "system", "content": """You are a post-grad research assistant creating a structured research outline. Based on the user's follow-up question, previous research summary, and new search results, create a comprehensive outline that builds on the previous research while addressing the new aspects from the follow-up question. The outline should: 1. Include relevant topics from the previous research that provide context 2. Add new topics that specifically address the follow-up question 3. Be organized in a hierarchical structure with main topics and subtopics 4. 
Focus on aspects that weren't covered in depth in the previous research Format your response as a valid JSON object with the following structure: {"outline": [ {"topic": "Main topic 1", "subtopics": ["Subtopic 1.1", "Subtopic 1.2"]}, {"topic": "Main topic 2", "subtopics": ["Subtopic 2.1", "Subtopic 2.2"]} ]}""", } # Build context from initial search results and previous summary outline_context = "### Previous Research Summary:\n\n" outline_context += f"{self.prev_comprehensive_summary[:2000]}...\n\n" outline_context += "### New Search Results:\n\n" for i, result in enumerate(initial_results): outline_context += f"Result {i+1} (Query: '{result['query']}')\n" outline_context += f"Title: {result['title']}\n" outline_context += f"Content: {result['content'][:1000]}...\n\n" outline_messages = [ outline_prompt, { "role": "user", "content": f"Follow-up question: {user_message}\n\n{outline_context}\n\nGenerate a comprehensive research outline that builds on previous research while addressing the follow-up question.", }, ] # Generate the research outline outline_response = await self.generate_completion( self.get_research_model(), outline_messages ) outline_content = outline_response["choices"][0]["message"]["content"] # Extract JSON from response try: outline_json_str = outline_content[ outline_content.find("{") : outline_content.rfind("}") + 1 ] outline_data = json.loads(outline_json_str) research_outline = outline_data.get("outline", []) except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing outline JSON: {e}") # Fallback: create a simple outline if JSON parsing fails research_outline = [ { "topic": "Follow-up Information", "subtopics": ["Key Aspects", "New Developments"], }, { "topic": "Extended Analysis", "subtopics": ["Additional Details", "Further Examples"], }, ] # Create a flat list of all topics for tracking all_topics = [] for topic_item in research_outline: all_topics.append(topic_item["topic"]) all_topics.extend(topic_item.get("subtopics", [])) # Create outline embedding outline_text = " ".join(all_topics) outline_embedding = await self.get_embedding(outline_text) # Initialize research dimensions await self.initialize_research_dimensions(all_topics, user_message) # Display the outline to the user outline_text = "### Research Outline for Follow-up\n\n" for topic in research_outline: outline_text += f"**{topic['topic']}**\n" for subtopic in topic.get("subtopics", []): outline_text += f"- {subtopic}\n" outline_text += "\n" await self.emit_message(outline_text) await self.emit_message( "\n*Continuing with research based on this outline and previous findings...*\n\n" ) else: # Regular new query - generate initial search queries await self.emit_status( "info", "Generating initial search queries...", False ) initial_query_prompt = { "role": "system", "content": """You are a post-grad research assistant generating effective search queries. Based on the user's question, generate 3 initial search queries to begin research. Each query should be specific, use relevant keywords, and be designed to find information to help answer the question. 
Format your response as a valid JSON object with the following structure: {"queries": [ "search query 1", "search query 2", "search query 3" ]}""", } initial_query_messages = [ initial_query_prompt, { "role": "user", "content": f"Generate initial search queries for: {user_message}", }, ] # Get initial search queries query_response = await self.generate_completion( self.get_research_model(), initial_query_messages, temperature=self.valves.TEMPERATURE, ) query_content = query_response["choices"][0]["message"]["content"] # Extract JSON from response try: query_json_str = query_content[ query_content.find("{") : query_content.rfind("}") + 1 ] query_data = json.loads(query_json_str) initial_queries = query_data.get("queries", []) except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing query JSON: {e}") # Fallback: extract queries using regex if JSON parsing fails import re initial_queries = re.findall(r'"([^"]+)"', query_content)[:3] if not initial_queries: initial_queries = ["Information about " + user_message] # Display the queries to the user await self.emit_message(f"### Initial Research Queries\n\n") for i, query in enumerate(initial_queries): await self.emit_message(f"**Query {i+1}**: {query}\n\n") # Step 2: Execute initial searches and collect results # Get outline embedding (placeholder - will be updated after outline is created) outline_embedding = await self.get_embedding(user_message) initial_results = [] for query in initial_queries: # Get query embedding for content comparison try: await self.emit_status( "info", f"Getting embedding for query: {query}", False ) query_embedding = await self.get_embedding(query) if not query_embedding: # If we can't get an embedding from the model, create a default one logger.warning( f"Failed to get embedding for '{query}', using default" ) query_embedding = [0] * 384 # Default embedding size except Exception as e: logger.error(f"Error getting embedding: {e}") query_embedding = [0] * 384 # Default embedding size # Process the query and get results results = await self.process_query( query, query_embedding, outline_embedding, None, summary_embedding, ) # Add successful results to our collection initial_results.extend(results) # Check if we got any useful results useful_results = [ r for r in initial_results if len(r.get("content", "")) > 200 ] # If we didn't get any useful results, create a minimal result to continue if not useful_results: await self.emit_message( f"*Unable to find initial search results. Creating research outline based on the query alone.*\n\n" ) initial_results = [ { "title": f"Information about {user_message}", "url": "", "content": f"This is a placeholder for research about {user_message}. The search failed to return usable results.", "query": user_message, } ] else: # Log the successful results logger.info( f"Found {len(useful_results)} useful results from initial queries" ) # Step 3: Generate research outline based on user query AND initial results await self.emit_status( "info", "Analyzing initial results and generating research outline...", False, ) outline_prompt = { "role": "system", "content": """You are a post-grad research assistant tasked with creating a structured research outline. Based on the user's query and the initial search results, create a comprehensive outline of the information needed to completely and thoroughly address the user's input. The outline should: 1. Break down the query into key concepts that need to be researched and key details about important figures 2. 
Identify specific information needs, questions to answer, and trickier edge cases or technical details to figure out 3. Be organized in a hierarchical structure with main topics and subtopics 4. Include topics discovered in the initial search results as they relate to addressing the user's input 5. Prioritize topics that seem most relevant to answering the query and that will reasonably result in worthwhile expanded research 6. Be realistic. Some questions deserve a philosophical or theoretical approach, but not many. Aim to keep it to things a graduate student could Google and eventually answer Format your response as a valid JSON object with the following structure: {"outline": [ {"topic": "Main topic 1", "subtopics": ["Subtopic 1.1", "Subtopic 1.2"]}, {"topic": "Main topic 2", "subtopics": ["Subtopic 2.1", "Subtopic 2.2"]} ]}""", } # Build context from initial search results outline_context = "### Initial Search Results:\n\n" for i, result in enumerate(initial_results): outline_context += f"Result {i+1} (Query: '{result['query']}')\n" outline_context += f"Title: {result['title']}\n" outline_context += f"Content: {result['content'][:1000]}...\n\n" outline_messages = [ outline_prompt, { "role": "user", "content": f"Original query: {user_message}\n\n{outline_context}\n\nGenerate a comprehensive research outline.", }, ] # Generate the research outline outline_response = await self.generate_completion( self.get_research_model(), outline_messages ) outline_content = outline_response["choices"][0]["message"]["content"] # Extract JSON from response try: outline_json_str = outline_content[ outline_content.find("{") : outline_content.rfind("}") + 1 ] outline_data = json.loads(outline_json_str) research_outline = outline_data.get("outline", []) except (json.JSONDecodeError, ValueError) as e: logger.error(f"Error parsing outline JSON: {e}") # Fallback: create a simple outline if JSON parsing fails research_outline = [ { "topic": "General Information", "subtopics": ["Background", "Key Concepts"], }, { "topic": "Specific Aspects", "subtopics": ["Detailed Analysis", "Examples"], }, ] # Create a flat list of all topics and subtopics for tracking completeness all_topics = [] for topic_item in research_outline: all_topics.append(topic_item["topic"]) all_topics.extend(topic_item.get("subtopics", [])) # Update outline embedding now that we have the actual outline outline_text = " ".join(all_topics) outline_embedding = await self.get_embedding(outline_text) # Initialize dimension-aware research tracking await self.initialize_research_dimensions(all_topics, user_message) # User interaction for outline feedback (if enabled) if self.valves.INTERACTIVE_RESEARCH: # Get user feedback on the research outline if not self._waiting_for_outline_feedback: # Display the outline to the user outline_text = "### Research Outline\n\n" for topic in research_outline: outline_text += f"**{topic['topic']}**\n" for subtopic in topic.get("subtopics", []): outline_text += f"- {subtopic}\n" outline_text += "\n" await self.emit_message(outline_text) # Get user feedback (this will set the flags and state for continuation) feedback_result = await self.process_user_outline_feedback( research_outline, user_message ) # Return empty string to pause execution until next message return "" else: # Regular display of outline if interactive research is disabled # Display the outline to the user outline_text = "### Research Outline\n\n" for topic in research_outline: outline_text += f"**{topic['topic']}**\n" for subtopic in 
topic.get("subtopics", []): outline_text += f"- {subtopic}\n" outline_text += "\n" await self.emit_message(outline_text) # Update status to show we've moved beyond outline generation await self.emit_status( "info", "Research outline generated. Beginning research cycles...", False ) # Initialize research variables for continued cycles cycle = 1 # We've already done one cycle with the initial queries max_cycles = self.valves.MAX_CYCLES min_cycles = self.valves.MIN_CYCLES completed_topics = set() irrelevant_topics = set() # Track irrelevant topics search_history = [] # Start fresh with search history results_history = initial_results.copy() if initial_results else [] active_outline = list(all_topics) # Topics that still need research cycle_summaries = [] # Store all cycle summaries for context # Step 4: Begin research cycles while cycle < max_cycles and active_outline: cycle += 1 await self.emit_status( "info", f"Research cycle {cycle}/{max_cycles}: Generating search queries...", False, ) # Calculate research trajectory from previous cycles if cycle > 2 and results_history: research_trajectory = await self.calculate_research_trajectory( search_history, results_history ) # Update research trajectory self.research_trajectory = research_trajectory # Calculate gap vector for directing research toward uncovered areas gap_vector = await self.calculate_gap_vector() # Rank active topics by priority using semantic analysis prioritized_topics = await self.rank_topics_by_research_priority( active_outline, gap_vector, completed_topics, results_history ) # Get most important topics for this cycle (limited to 10) priority_topics = prioritized_topics[:10] # Generate search queries with the large model query_prompt = { "role": "system", "content": """You are a post-grad research assistant generating effective search queries. Based on the user's original question and the current research needs, generate 3 search queries. Each query should be specific, use relevant keywords, and be designed to find targeted information. Consider: 1. The original user query 2. The research outline topics that still need to be addressed 3. Information already gathered from previous searches (if any) 4. Create queries that are distinct from previous searches Note: detail is king, but brevity is wit. Don't bog your queries down with lots of terms; the rare ones will just be ignored anyway if your query is too dense Do be specific as appropriate and necessary. You may only use quotes in ONE of your three search queries. 
Seriously, keep count Format your response as a valid JSON object with the following structure: {"queries": [ {"query": "search query 1", "topic": "related research topic"}, {"query": "search query 2", "topic": "related research topic"}, {"query": "search query 3", "topic": "related research topic"} ]}""", } # Build context from previous search results search_context = "" if results_history: search_context += "### Previously gathered information:\n\n" for i, result in enumerate( results_history[-5:] ): # Just include the last 5 results search_context += f"Result {i+1} (Query: '{result['query']}')\n" search_context += f"URL: {result['url']}\n" search_context += f"Summary: {result['content'][:500]}...\n\n" # Include previous queries to avoid duplication if search_history: search_context += "### Previous search queries:\n" search_context += ", ".join([f"'{q}'" for q in search_history[-10:]]) search_context += "\n\n" # Include prioritized research topics search_context += "### Priority research topics for this cycle:\n" for topic in priority_topics: search_context += f"- {topic}\n" # Add a separate section for all remaining topics if len(active_outline) > len(priority_topics): search_context += "\n### Additional topics still needing research:\n" for topic in active_outline: if topic not in priority_topics: search_context += f"- {topic}\n" # Include previous cycle summaries if cycle_summaries: search_context += "\n### Previous cycle summaries:\n" for i, summary in enumerate(cycle_summaries): search_context += f"Cycle {i+1} Summary: {summary}\n\n" # Include user preferences if enabled if ( self.valves.USER_PREFERENCE_THROUGHOUT and self.user_preferences["pdv"] is not None ): search_context += ( "\n### User preferences are being applied to search results\n" ) # Include identified research gaps from dimensional analysis if self.research_dimensions: gaps = await self.identify_research_gaps() if gaps: search_context += "\n### Identified research gaps:\n" for gap in gaps: search_context += f"- {gap}\n" # Include previous comprehensive summary if this is a follow-up if is_follow_up and self.prev_comprehensive_summary: search_context += "### Previous Research Summary:\n\n" search_context += f"{self.prev_comprehensive_summary[:1000]}...\n\n" query_messages = [ query_prompt, { "role": "user", "content": f"Original query: {user_message}\n\n{search_context}\n\nGenerate 3 effective search queries to gather information for the remaining research topics.", }, ] # Get search queries query_response = await self.generate_completion( self.get_research_model(), query_messages ) query_content = query_response["choices"][0]["message"]["content"] # Extract JSON from response try: query_json_str = query_content[ query_content.find("{") : query_content.rfind("}") + 1 ] query_data = json.loads(query_json_str) queries = query_data.get("queries", []) # Check if queries is a list of strings or a list of objects if queries and isinstance(queries[0], str): # Convert to objects with query and topic query_strings = queries query_topics = ( priority_topics[: len(queries)] if priority_topics else ["Research"] * len(queries) ) queries = [ {"query": q, "topic": t} for q, t in zip(query_strings, query_topics) ] except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e: logger.error(f"Error parsing query JSON: {e}") # Fallback: generate basic queries for priority topics queries = [] for i, topic in enumerate(priority_topics[:3]): queries.append({"query": f"{user_message} {topic}", "topic": topic}) # Extract query strings 
and topics query_strings = [item["query"] for item in queries] query_topics = [item.get("topic", "Research") for item in queries] # Apply semantic transformations to queries if available if self.semantic_transformations and ( self.user_preferences["pdv"] is not None or self.research_trajectory is not None or gap_vector is not None ): # Get query embeddings query_embeddings = [] for query in query_strings: embedding = await self.get_embedding(query) if embedding: # Apply transformation transformed = await self.apply_semantic_transformation( embedding, self.semantic_transformations ) query_embeddings.append(transformed) # If we have transformed embeddings, try to refine the queries if query_embeddings and len(query_embeddings) == len(query_strings): # Create a prompt for refining the queries based on semantic direction refine_prompt = { "role": "system", "content": """You are a post-grad research assistant refining search queries to better match a specific semantic direction. The queries have been semantically transformed to emphasize certain research dimensions. Your task is to refine these queries to better capture the semantic direction while maintaining their core intent. Do not drastically change the queries, but make subtle refinements that emphasize the semantic dimensions. Return the refined queries in the same format as the original.""", } # Create context for refinement refine_context = ( f"Original query: {user_message}\n\n" "Original search queries:\n" ) for i, (query, topic) in enumerate( zip(query_strings, query_topics) ): refine_context += f"{i+1}. {query} (for topic: {topic})\n" refine_context += "\nThese queries have been semantically transformed to emphasize:" if self.user_preferences["pdv"] is not None: refine_context += "\n- User preferences" if self.research_trajectory is not None: refine_context += "\n- Research trajectory from previous cycles" if gap_vector is not None: refine_context += "\n- Gaps in research coverage" refine_context += "\n\nPlease refine these queries to better capture these semantic directions while maintaining their core intent." 
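                    # Note on the refinement below: the transformed embeddings computed above are
                    # only used to decide that refinement is worth attempting; the refinement itself
                    # asks the research model to lightly reword the queries toward the emphasized
                    # dimensions (user preferences, research trajectory, coverage gaps) while
                    # keeping their core intent. If the refined list cannot be parsed or its length
                    # does not match the originals, the original queries are kept.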
# Get refined queries refine_messages = [ refine_prompt, {"role": "user", "content": refine_context}, ] try: refine_response = await self.generate_completion( self.get_research_model(), refine_messages ) refine_content = refine_response["choices"][0]["message"][ "content" ] # Extract refined queries refined_queries = re.findall( r"(?:^\d+\.|\*)\s*(.+?)(?:\s+\(for topic:|$)", refine_content, re.MULTILINE, ) # Use refined queries if we got them if refined_queries and len(refined_queries) == len( query_strings ): logger.info(f"Using semantically refined queries") query_strings = refined_queries except Exception as e: logger.error(f"Error refining queries: {e}") # Display the queries to the user await self.emit_message(f"### Research Cycle {cycle}: Search Queries\n\n") for i, (query, topic) in enumerate(zip(query_strings, query_topics)): await self.emit_message( f"**Query {i+1}**: {query} (for topic: {topic})\n\n" ) # Add queries to search history search_history.extend(query_strings) # Step 5: Execute searches and process results cycle_results = [] for query, topic in zip(query_strings, query_topics): # Get query embedding for content comparison try: query_embedding = await self.get_embedding(query) if not query_embedding: query_embedding = [0] * 384 # Default embedding size except Exception as e: logger.error(f"Error getting embedding: {e}") query_embedding = [0] * 384 # Default embedding size # Apply semantic transformation if available if self.semantic_transformations: transformed_query = await self.apply_semantic_transformation( query_embedding, self.semantic_transformations ) # Use transformed embedding if available if transformed_query: query_embedding = transformed_query # Process the query and get results results = await self.process_query( query, query_embedding, outline_embedding, None, summary_embedding, ) # Add successful results to the cycle results and history cycle_results.extend(results) results_history.extend(results) # Step 6: Analyze results and update research outline if cycle_results: await self.emit_status( "info", "Analyzing search results and updating research outline...", False, ) analysis_prompt = { "role": "system", "content": """You are a post-grad research assistant analyzing search results and updating a research outline. Examine the search results and the current research outline. Determine which topics have been adequately addressed by the search results. Update the research outline by classifying topics into different categories. Topics should be classified as: - COMPLETED: Topics that have been fully or reasonably addressed with comprehensive information. - PARTIAL: Topics that have minimal information and need more research. Don't let topics languish here! If one hasn't been addressed in a while, consider if maybe it actually has been, or if it's possibly irrelevant. - IRRELEVANT: Topics that are not actually relevant to the main query, are red herrings, based on misidentified subjects, or are website artifacts rather than substantive topics. For example, mark as irrelevant any topics about unrelated subjects that were mistakenly included due to ambiguous terms, acronyms with multiple meanings, or page elements from websites that don't relate to the actual query. - NEW: New topics discovered in the search results that should be added to the research. Topics that feel like a logical extension of the user's line of questioning, or that are clearly important to a specific subject but aren't currently included, belong here. 
Format your response as a valid JSON object with the following structure: { "completed_topics": ["Topic 1", "Subtopic 2.1"], "partial_topics": ["Topic 2"], "irrelevant_topics": ["Topic that's a distraction", "Misidentified subject"], "new_topics": ["New topic discovered"], "analysis": "Brief analysis of what we've learned and what still needs research" }""", } # Create a context with the current outline and search results analysis_context = "### Current Research Outline Topics:\n" analysis_context += "\n".join( [f"- {topic}" for topic in active_outline] ) analysis_context += "\n\n### Latest Search Results:\n\n" for i, result in enumerate(cycle_results): analysis_context += f"Result {i+1} (Query: '{result['query']}')\n" analysis_context += f"Title: {result['title']}\n" analysis_context += f"Content: {result['content'][:1000]}...\n\n" # Include previous cycle summaries for continuity if cycle_summaries: analysis_context += "\n### Previous cycle summaries:\n" for i, summary in enumerate(cycle_summaries): analysis_context += f"Cycle {i+1} Summary: {summary}\n\n" # Include lists of completed and irrelevant topics if completed_topics: analysis_context += "\n### Already completed topics:\n" for topic in completed_topics: analysis_context += f"- {topic}\n" if irrelevant_topics: analysis_context += "\n### Already identified irrelevant topics:\n" for topic in irrelevant_topics: analysis_context += f"- {topic}\n" # Include user preferences if applicable if ( self.valves.USER_PREFERENCE_THROUGHOUT and self.user_preferences["pdv"] is not None ): analysis_context += ( "\n### User preferences are being applied to research\n" ) analysis_messages = [ analysis_prompt, { "role": "user", "content": f"Original query: {user_message}\n\n{analysis_context}\n\nAnalyze these results and update the research outline.", }, ] try: analysis_response = await self.generate_completion( self.get_research_model(), analysis_messages ) analysis_content = analysis_response["choices"][0]["message"][ "content" ] # Extract JSON from response analysis_json_str = analysis_content[ analysis_content.find("{") : analysis_content.rfind("}") + 1 ] analysis_data = json.loads(analysis_json_str) # Update completed topics newly_completed = set(analysis_data.get("completed_topics", [])) completed_topics.update(newly_completed) # Update irrelevant topics newly_irrelevant = set(analysis_data.get("irrelevant_topics", [])) irrelevant_topics.update(newly_irrelevant) # Add any new topics discovered new_topics = analysis_data.get("new_topics", []) for topic in new_topics: if ( topic not in all_topics and topic not in completed_topics and topic not in irrelevant_topics ): active_outline.append(topic) all_topics.append(topic) # Update active outline by removing completed and irrelevant topics active_outline = [ topic for topic in active_outline if topic not in completed_topics and topic not in irrelevant_topics ] # Save the analysis summary cycle_summaries.append( analysis_data.get("analysis", f"Analysis for cycle {cycle}") ) # Create the current checklist for display to the user current_checklist = { "completed": newly_completed, "partial": set(analysis_data.get("partial_topics", [])), "irrelevant": newly_irrelevant, "new": set(new_topics), "remaining": set(active_outline), } # Display analysis to the user analysis_text = f"### Research Analysis (Cycle {cycle})\n\n" analysis_text += f"{analysis_data.get('analysis', 'Analysis not available.')}\n\n" if newly_completed: analysis_text += "**Topics Completed:**\n" for topic in newly_completed: 
analysis_text += f"✓ {topic}\n" analysis_text += "\n" if analysis_data.get("partial_topics"): partial_topics = analysis_data.get("partial_topics") analysis_text += "**Topics Partially Addressed:**\n" # Show only first 5 partial topics for topic in partial_topics[:5]: analysis_text += f"⚪ {topic}\n" # Add count of additional topics if there are more than 5 if len(partial_topics) > 5: analysis_text += f"...and {len(partial_topics) - 5} more\n" analysis_text += "\n" # Add display for irrelevant topics if newly_irrelevant: analysis_text += "**Irrelevant/Distraction Topics:**\n" for topic in newly_irrelevant: analysis_text += f"✗ {topic}\n" analysis_text += "\n" if new_topics: analysis_text += "**New Topics Discovered:**\n" for topic in new_topics: analysis_text += f"+ {topic}\n" analysis_text += "\n" if active_outline: analysis_text += "**Remaining Topics:**\n" for topic in active_outline[:5]: # Show just the first 5 analysis_text += f"□ {topic}\n" if len(active_outline) > 5: analysis_text += f"...and {len(active_outline) - 5} more\n" analysis_text += "\n" # Add dimension coverage information if available if self.research_dimensions: try: # Get the top 3 most covered dimensions coverage = self.research_dimensions["coverage"] dimension_names = self.research_dimensions[ "dimension_names" ] # Sort dimensions by coverage (descending) sorted_dims = np.argsort(coverage)[::-1] # Display the top covered dimensions top_dimensions = [ (dimension_names[i], coverage[i]) for i in sorted_dims[:3] ] if top_dimensions: analysis_text += "**Research Dimensions Coverage:**\n" for name, cov in top_dimensions: analysis_text += ( f"- {name}: {int(cov * 100)}% covered\n" ) analysis_text += "\n" except Exception as e: logger.error(f"Error displaying dimension coverage: {e}") await self.emit_message(analysis_text) except Exception as e: logger.error(f"Error analyzing results: {e}") await self.emit_message( f"### Research Progress (Cycle {cycle})\n\nContinuing research on remaining topics...\n\n" ) # Mark one topic as completed to ensure progress if active_outline: # Find the most covered topic based on similarities to gathered results topic_scores = {} # Only attempt similarity analysis if we have results if cycle_results: for topic in active_outline: topic_embedding = await self.get_embedding(topic) if topic_embedding: # Calculate similarity to each result topic_score = 0.0 for result in cycle_results: content = result.get("content", "")[ :1000 ] # Use first 1000 chars content_embedding = await self.get_embedding( content ) if content_embedding: sim = cosine_similarity( [topic_embedding], [content_embedding] )[0][0] topic_score += sim # Average the score if cycle_results: topic_score /= len(cycle_results) topic_scores[topic] = topic_score # If we have scores, select the highest; otherwise just take the first one if topic_scores: completed_topic = max( topic_scores.items(), key=lambda x: x[1] )[0] logger.info( f"Selected most covered topic: {completed_topic} (score: {topic_scores[completed_topic]:.3f})" ) else: completed_topic = active_outline[0] logger.info( f"No similarity data available, selected first topic: {completed_topic}" ) completed_topics.add(completed_topic) active_outline.remove(completed_topic) await self.emit_message( f"**Topic Addressed:** {completed_topic}\n\n" ) # Add minimal analysis to cycle summaries cycle_summaries.append(f"Completed topic: {completed_topic}") # Check termination criteria if not active_outline or active_outline == []: await self.emit_status( "info", "All research topics have been 
addressed!", False ) break if cycle >= min_cycles and len(completed_topics) / len(all_topics) > 0.7: await self.emit_status( "info", "Most research topics have been addressed. Finalizing...", False, ) break # Continue to next cycle if we haven't hit max_cycles if cycle >= max_cycles: await self.emit_status( "info", f"Maximum research cycles ({max_cycles}) reached. Finalizing...", False, ) break await self.emit_status( "info", f"Research cycle {cycle} complete. Moving to next cycle...", False, ) # Apply stepped compression to all research results if enabled if self.valves.STEPPED_SYNTHESIS_COMPRESSION and len(results_history) > 2: await self.emit_status( "info", "Applying stepped compression to research results...", False ) # Track token counts before compression total_tokens_before = 0 for result in results_history: tokens = await self.count_tokens(result.get("content", "")) total_tokens_before += tokens # Apply stepped compression to results results_history = await self.apply_stepped_compression( results_history, query_embedding if "query_embedding" in locals() else None, summary_embedding, ) # Calculate total tokens after compression total_tokens_after = sum( result.get("tokens", 0) for result in results_history ) # Log token reduction token_reduction = total_tokens_before - total_tokens_after if total_tokens_before > 0: percent_reduction = (token_reduction / total_tokens_before) * 100 logger.info( f"Stepped compression: {total_tokens_before} → {total_tokens_after} tokens " f"(saved {token_reduction} tokens, {percent_reduction:.1f}% reduction)" ) await self.emit_status( "info", f"Compressed research results from {total_tokens_before} to {total_tokens_after} tokens", False, ) # Step 7: Generate refined synthesis outline await self.emit_status( "info", "Generating refined outline for synthesis...", False ) synthesis_outline = await self.generate_synthesis_outline( research_outline, completed_topics, user_message, results_history ) # If synthesis outline generation failed, use original if not synthesis_outline: synthesis_outline = research_outline # Step 8: Synthesize final answer with the selected model - Section by Section await self.emit_status( "info", "Synthesizing comprehensive answer from research results...", False ) await self.emit_message( "\n\n---\n\n### Research Complete\n\nSynthesizing comprehensive answer...\n\n" ) # Determine which model to use for synthesis synthesis_model = self.get_synthesis_model() await self.emit_status("info", f"Using {synthesis_model} for synthesis", False) # Clear section content storage self.section_synthesized_content = {} # Process each main topic and its subtopics compiled_sections = {} # Include only main topics that are not in irrelevant_topics relevant_topics = [ topic for topic in synthesis_outline if topic["topic"] not in irrelevant_topics ] # If we have no relevant topics, use a simple structure if not relevant_topics: relevant_topics = [ {"topic": "Research Summary", "subtopics": ["General Information"]} ] # Generate content for each section with proper status updates for topic_item in relevant_topics: section_title = topic_item["topic"] subtopics = [ st for st in topic_item.get("subtopics", []) if st not in irrelevant_topics ] # Generate content for this section await self.emit_status( "info", f"Generating content for section: {section_title}...", False ) section_content = await self.generate_section_content( section_title, subtopics, user_message, results_history, synthesis_model, is_follow_up, self.prev_comprehensive_summary if 
is_follow_up else "", ) # Store in compiled sections compiled_sections[section_title] = section_content # After all sections are generated, perform synthesis review await self.emit_status( "info", "Reviewing and improving the synthesis...", False ) # Get synthesis review review_data = await self.review_synthesis( compiled_sections, user_message, synthesis_outline, synthesis_model ) # Apply edits from review await self.emit_status("info", "Applying improvements to synthesis...", False) edited_sections, changes_made = await self.apply_review_edits( compiled_sections, review_data, synthesis_model ) # Build final answer comprehensive_answer = "" # Add introduction intro_prompt = { "role": "system", "content": """You are a post-grad research assistant writing a brief introduction for a research report. Create a concise introduction (1-2 paragraphs) that summarizes the purpose of the research and briefly states what it found. Do not include general statements about the research process itself.""", } intro_context = f"Research Query: {user_message}\n\nResearch Outline:" for section in edited_sections: intro_context += f"\n- {section}" intro_message = {"role": "user", "content": intro_context} try: # Use synthesis model for intro intro_response = await self.generate_completion( synthesis_model, [intro_prompt, intro_message], stream=False, temperature=self.valves.SYNTHESIS_TEMPERATURE * 0.83, ) if ( intro_response and "choices" in intro_response and len(intro_response["choices"]) > 0 ): introduction = intro_response["choices"][0]["message"]["content"] comprehensive_answer += f"{introduction}\n\n" except Exception as e: logger.error(f"Error generating introduction: {e}") comprehensive_answer += f"# Comprehensive Research on: {user_message}\n\n" # Add each section with heading for section_title, content in edited_sections.items(): # Get token count for the section section_tokens = self.memory_stats["section_tokens"].get(section_title, 0) if section_tokens == 0: section_tokens = await self.count_tokens(content) self.memory_stats["section_tokens"][section_title] = section_tokens comprehensive_answer += ( f"## {section_title} [{section_tokens} tokens]\n\n{content}\n\n" ) # Add conclusion concl_prompt = { "role": "system", "content": """You are a post-grad research assistant writing a comprehensive but brief conclusion for a research report. Create a concise conclusion (2-4 paragraphs) that summarizes the purpose of, and key findings and insights from, the research. Avoid repeating specific instances and instead focus on the big picture that characterizes the research and topic. 
Do not discuss limitations or future work unless specifically relevant to the findings.""", } concl_context = ( f"Research Query: {user_message}\n\nKey findings from each section:\n" ) for section_title, content in edited_sections.items(): concl_context += f"\n## {section_title}\n{content[:200]}...\n" concl_message = {"role": "user", "content": concl_context} try: await self.emit_status("info", "Generating conclusion...", False) # Use synthesis model for conclusion concl_response = await self.generate_completion( synthesis_model, [concl_prompt, concl_message], stream=False, temperature=self.valves.SYNTHESIS_TEMPERATURE, ) if ( concl_response and "choices" in concl_response and len(concl_response["choices"]) > 0 ): conclusion = concl_response["choices"][0]["message"]["content"] conclusion_tokens = await self.count_tokens(conclusion) comprehensive_answer += ( f"## Conclusion [{conclusion_tokens} tokens]\n\n{conclusion}\n\n" ) except Exception as e: logger.error(f"Error generating conclusion: {e}") # Count total tokens in the comprehensive answer synthesis_tokens = await self.count_tokens(comprehensive_answer) self.memory_stats["synthesis_tokens"] = synthesis_tokens # Calculate total tokens used in the research total_tokens = ( self.memory_stats["results_tokens"] + sum(self.memory_stats["section_tokens"].values()) + synthesis_tokens ) self.memory_stats["total_tokens"] = total_tokens # Mark research as completed self._research_completed = True # Output the final compiled and edited synthesis await self.emit_status("info", "Final synthesis complete!", True) # Output the complete integrated synthesis await self.emit_message("\n\n## Comprehensive Answer\n\n") await self.emit_message(comprehensive_answer) # Add token usage statistics token_stats = ( f"\n\n---\n\n**Token Usage Statistics**\n\n" f"- Research Results: {self.memory_stats['results_tokens']} tokens\n" f"- Final Synthesis: {synthesis_tokens} tokens\n" f"- Total: {total_tokens} tokens\n" ) await self.emit_message(token_stats) # Store the comprehensive answer for potential follow-up queries self.prev_comprehensive_summary = comprehensive_answer # Share embedding cache stats cache_stats = self.embedding_cache.stats() logger.info(f"Embedding cache stats: {cache_stats}") # Complete the process await self.emit_status("success", "Deep research complete!", True) return ""
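
# ---------------------------------------------------------------------------
# Illustrative sketches (not part of the pipe). These are minimal, standalone
# restatements of two patterns the pipe uses repeatedly, shown here only to
# make the logic easier to follow. The embed() parameter below is a
# hypothetical stand-in for the pipe's own embedding call; it is assumed to be
# a plain synchronous callable mapping a string to a fixed-length vector.
# Imports are repeated so the sketch reads as self-contained.

import json
from sklearn.metrics.pairwise import cosine_similarity


def extract_json_object(text: str) -> dict:
    """Pull the first {...} span out of a model response.

    The pipe repeatedly slices from the first "{" to the last "}" and feeds
    the result to json.loads, falling back to a default structure on failure.
    """
    try:
        return json.loads(text[text.find("{") : text.rfind("}") + 1])
    except (json.JSONDecodeError, ValueError):
        return {}


def rank_results_by_relevance(section_text, results, embed):
    """Order research results by cosine similarity to a section description.

    Mirrors the ranking in generate_section_content: embed the section (title
    plus subtopics) and the first 1000 characters of each result, score with
    cosine similarity, and return the results best-first. The model's context
    window, not a hard cutoff, limits how many results are actually used.
    """
    section_vec = embed(section_text)
    scored = []
    for idx, result in enumerate(results):
        content = result.get("content", "")[:1000]
        if not content:
            continue
        sim = cosine_similarity([embed(content)], [section_vec])[0][0]
        scored.append((idx, sim))
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [results[idx] for idx, _ in scored]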