"""
title: AutoCoder
author: OpenWebUI User
author_url: https://github.com/user
git_url: https://github.com/user/auto-coder
description: An autonomous coding tool that generates complete projects from prompts stored in Knowledge files
required_open_webui_version: 0.4.3
requirements: langchain-openai, langgraph, ollama, langchain_ollama, pydantic
version: 1.0.0
licence: MIT
"""
import asyncio
import datetime
import json
import os
import subprocess
import time
from datetime import datetime  # NOTE: shadows the module import above; code below uses datetime.now()
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Field

# langchain_ollama exposes no "AsyncOllama"; OllamaLLM provides the async
# `ainvoke()` used throughout this tool and returns a plain string.
from langchain_ollama import OllamaLLM
class Tools:
class Valves(BaseModel):
knowledge_base_path: str = Field(
default="/app/knowledge",
description="Path to the knowledge base directory"
)
model_name: str = Field(
default="deepseek-coder",
description="Ollama model to use for code generation"
)
python_version: str = Field(
default="3.10.6",
description="Python version to use (3.10.6, 3.11.9, 3.12.6, 3.13.1)"
)
temperature: float = Field(
default=0.1,
description="Temperature for code generation (0.0-1.0)"
)
max_tokens: int = Field(
default=4096,
description="Maximum tokens for code generation"
)
system_prompt: str = Field(
default="You are an expert software developer. Write clear, efficient, and well-documented code.",
description="System prompt for the LLM"
)
class UserValves(BaseModel):
workspace_name: str = Field(
default="",
description="Name of the workspace (leave empty to use prompt filename)"
)
auto_install_dependencies: bool = Field(
default=True,
description="Automatically install dependencies from requirements.txt"
)
auto_commit_changes: bool = Field(
default=False,
description="Automatically commit changes to git repository"
)
detailed_logging: bool = Field(
default=True,
description="Generate detailed logs during execution"
)
push_updates_to_knowledge: bool = Field(
default=True,
description="Push code updates back to knowledge base"
)
wait_for_review: bool = Field(
default=False,
description="Wait for user review before proceeding to next step"
)
def __init__(self):
"""Initialize the AutoCoder Tool."""
self.valves = self.Valves()
self.user_valves = self.UserValves()
self.citation = False
self.knowledge_files = {}
self.current_workspace = None
self.llm = None
self.execution_log = []
async def initialize_llm(self, __event_emitter__=None):
"""Initialize the LLM client."""
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Initializing LLM ({self.valves.model_name})", "done": False}
})
try:
self.llm = AsyncOllama(model=self.valves.model_name, temperature=self.valves.temperature)
return True
except Exception as e:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Failed to initialize LLM: {str(e)}", "done": True}
})
return False
async def scan_knowledge_base(self, __event_emitter__=None):
"""Scan the knowledge base for _prompt.md files."""
knowledge_path = Path(self.valves.knowledge_base_path)
prompt_files = list(knowledge_path.glob("**/_prompt.md"))
result = []
for file in prompt_files:
rel_path = file.relative_to(knowledge_path)
workspace_name = file.parent.name
result.append({
"path": str(rel_path),
"workspace": workspace_name
})
# Cache contents for later use
self.knowledge_files[str(rel_path)] = file.read_text()
return result
async def create_workspace(self, prompt_path: str, __event_emitter__=None):
"""Create a workspace for a given prompt file."""
knowledge_path = Path(self.valves.knowledge_base_path)
prompt_file = knowledge_path / prompt_path
if not prompt_file.exists():
return {"error": f"Prompt file not found: {prompt_path}"}
workspace_dir = prompt_file.parent
workspace_name = self.user_valves.workspace_name or workspace_dir.name
# Initialize workspace
self.current_workspace = {
"name": workspace_name,
"path": str(workspace_dir),
"prompt_file": str(prompt_file),
"roadmap_file": str(workspace_dir / "_roadmap.md"),
"progress_file": str(workspace_dir / "_progress_tracker.md"),
"created_at": datetime.now().isoformat()
}
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Created workspace: {workspace_name}", "done": True}
})
return self.current_workspace
async def generate_roadmap(self, prompt_path: str, __event_emitter__=None):
"""Generate a roadmap from a prompt file."""
if not self.llm and not await self.initialize_llm(__event_emitter__):
return {"error": "Failed to initialize LLM"}
# Create workspace if needed
if not self.current_workspace or self.current_workspace["prompt_file"] != str(Path(self.valves.knowledge_base_path) / prompt_path):
await self.create_workspace(prompt_path, __event_emitter__)
knowledge_path = Path(self.valves.knowledge_base_path)
prompt_file = knowledge_path / prompt_path
workspace_dir = prompt_file.parent
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Reading prompt file", "done": False}
})
# Read prompt content
prompt_content = prompt_file.read_text()
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Generating roadmap...", "done": False}
})
# Generate roadmap using LLM
roadmap_prompt = (
f"Based on the following project requirements, create a detailed development roadmap with clear steps:\n\n"
f"{prompt_content}\n\n"
f"Format the roadmap as markdown with these sections:\n"
f"1. Project Overview\n"
f"2. Technical Architecture\n"
f"3. Development Phases (with numbered steps)\n"
f"4. Dependencies and Requirements\n"
f"5. Testing Strategy\n"
f"6. Deployment Plan\n\n"
f"Be specific, practical, and comprehensive."
)
roadmap_response = await self.llm.ainvoke(roadmap_prompt)
roadmap_content = roadmap_response
# Write roadmap to file
roadmap_file = workspace_dir / "_roadmap.md"
roadmap_file.write_text(roadmap_content)
# Create progress tracker file if it doesn't exist
progress_file = workspace_dir / "_progress_tracker.md"
if not progress_file.exists():
progress_content = (
f"# Progress Tracker: {self.current_workspace['name']}\n\n"
f"## Project Status\n\n"
f"- **Started**: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
f"- **Current Phase**: Planning\n"
f"- **Status**: In Progress\n\n"
f"## Development Log\n\n"
f"- {datetime.now().strftime('%Y-%m-%d %H:%M')} - Project initialized\n"
f"- {datetime.now().strftime('%Y-%m-%d %H:%M')} - Roadmap generated\n\n"
f"## Completed Tasks\n\n"
f"- [x] Initialize project\n"
f"- [x] Generate roadmap\n\n"
f"## Pending Tasks\n\n"
f"- [ ] Parse roadmap into executable steps\n"
f"- [ ] Set up development environment\n"
f"- [ ] Implement core components\n\n"
f"## Issues and Blockers\n\n"
f"*No issues identified yet*\n"
)
progress_file.write_text(progress_content)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Roadmap generated successfully", "done": True}
})
await __event_emitter__({
"type": "message",
"data": {"content": f"π **Roadmap Generated**\n\nRoadmap has been created at `{roadmap_file.relative_to(knowledge_path)}`. Progress tracker initialized."}
})
return {
"workspace": self.current_workspace["name"],
"roadmap_file": str(roadmap_file.relative_to(knowledge_path)),
"progress_file": str(progress_file.relative_to(knowledge_path))
}
async def parse_roadmap(self, prompt_path: str, __event_emitter__=None):
"""Parse the roadmap into executable steps."""
if not self.current_workspace:
await self.create_workspace(prompt_path, __event_emitter__)
await self.generate_roadmap(prompt_path, __event_emitter__)
knowledge_path = Path(self.valves.knowledge_base_path)
roadmap_file = Path(self.current_workspace["roadmap_file"])
if not (knowledge_path / roadmap_file).exists():
await self.generate_roadmap(prompt_path, __event_emitter__)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Parsing roadmap into executable steps", "done": False}
})
# Read roadmap content
roadmap_content = (knowledge_path / roadmap_file).read_text()
# Parse roadmap using LLM
parse_prompt = (
f"Parse the following project roadmap into a JSON list of executable development tasks:\n\n"
f"{roadmap_content}\n\n"
f"Structure each task with these properties:\n"
f"- id: Unique identifier (e.g., 'task-1')\n"
f"- name: Short task name\n"
f"- description: Detailed description\n"
f"- phase: Development phase it belongs to\n"
f"- dependencies: Array of task IDs this task depends on\n"
f"- estimate_hours: Estimated hours to complete\n"
f"- output_files: Array of expected files to be created/modified\n\n"
f"Return only the JSON array, properly formatted."
)
if not self.llm:
await self.initialize_llm(__event_emitter__)
parse_response = await self.llm.ainvoke(parse_prompt)
# Extract JSON from response
try:
# Try to find JSON in the response
json_start = parse_response.find('[')
json_end = parse_response.rfind(']') + 1
if json_start >= 0 and json_end > json_start:
tasks_json = parse_response[json_start:json_end]
tasks = json.loads(tasks_json)
else:
# If no JSON found, try to parse the whole response
tasks = json.loads(parse_response)
# Store tasks in workspace
self.current_workspace["tasks"] = tasks
# Update progress tracker
progress_file = Path(self.current_workspace["progress_file"])
progress_content = (knowledge_path / progress_file).read_text()
# Add tasks to pending tasks
pending_tasks_section = "## Pending Tasks\n\n"
pending_tasks_content = pending_tasks_section
for task in tasks:
pending_tasks_content += f"- [ ] {task['name']}: {task['description'][:100]}...\n"
# Replace pending tasks section
if "## Pending Tasks" in progress_content:
progress_content = progress_content.split("## Pending Tasks")[0] + pending_tasks_content
if "## Issues and Blockers" in progress_content:
blockers_section = "## Issues and Blockers" + progress_content.split("## Issues and Blockers")[1]
progress_content = progress_content.split("## Issues and Blockers")[0] + blockers_section
else:
progress_content += "\n" + pending_tasks_content
# Update current phase
progress_content = progress_content.replace("**Current Phase**: Planning", "**Current Phase**: Development Preparation")
# Write updated progress
(knowledge_path / progress_file).write_text(progress_content)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Parsed {len(tasks)} tasks from roadmap", "done": True}
})
await __event_emitter__({
"type": "message",
"data": {"content": f"π **Roadmap Parsed**\n\nExtracted {len(tasks)} executable tasks from the roadmap. Progress tracker updated."}
})
return {
"task_count": len(tasks),
"first_tasks": [t["name"] for t in tasks[:3]],
"phases": list(set(t["phase"] for t in tasks))
}
except Exception as e:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Error parsing roadmap: {str(e)}", "done": True}
})
return {"error": f"Failed to parse roadmap: {str(e)}"}
async def setup_environment(self, prompt_path: str, __event_emitter__=None):
"""Set up the development environment based on the roadmap."""
if not self.current_workspace:
await self.create_workspace(prompt_path, __event_emitter__)
await self.generate_roadmap(prompt_path, __event_emitter__)
await self.parse_roadmap(prompt_path, __event_emitter__)
workspace_dir = Path(self.current_workspace["path"])
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Setting up development environment", "done": False}
})
# Set Python version
try:
pyenv_cmd = f"cd {workspace_dir} && pyenv local {self.valves.python_version}"
process = subprocess.Popen(
pyenv_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
if process.returncode != 0:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Warning: Failed to set Python version: {stderr}", "done": False}
})
except Exception as e:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Warning: Failed to set Python version: {str(e)}", "done": False}
})
# Initialize virtual environment
try:
venv_cmd = f"cd {workspace_dir} && python -m venv .venv"
process = subprocess.Popen(
venv_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
if process.returncode != 0:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Warning: Failed to create virtual environment: {stderr}", "done": False}
})
else:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Created virtual environment", "done": False}
})
except Exception as e:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Warning: Failed to create virtual environment: {str(e)}", "done": False}
})
# Create requirements.txt if it doesn't exist
if not (workspace_dir / "requirements.txt").exists() and self.current_workspace.get("tasks"):
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Generating requirements.txt", "done": False}
})
if not self.llm:
await self.initialize_llm(__event_emitter__)
# Extract project details for context
roadmap_content = Path(self.current_workspace["roadmap_file"]).read_text()
# Generate requirements using LLM
req_prompt = (
f"Based on the following project roadmap, generate a requirements.txt file with all necessary Python packages:\n\n"
f"{roadmap_content}\n\n"
f"Format the output as a plain requirements.txt file with one package per line, with version constraints where appropriate."
)
req_response = await self.llm.ainvoke(req_prompt)
# Clean response to get just the requirements
requirements_content = ""
in_requirements = False
for line in req_response.split("\n"):
if line.strip() == "```" or line.strip() == "```requirements.txt":
in_requirements = not in_requirements
continue
if in_requirements or not line.startswith("```"):
if line.strip() and not line.startswith("#"):
requirements_content += line.strip() + "\n"
# Write requirements.txt
(workspace_dir / "requirements.txt").write_text(requirements_content)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Generated requirements.txt", "done": False}
})
# Install dependencies if needed
if self.user_valves.auto_install_dependencies and (workspace_dir / "requirements.txt").exists():
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Installing dependencies", "done": False}
})
# Determine activate script based on OS
activate_script = ".venv/Scripts/activate" if os.name == "nt" else ".venv/bin/activate"
try:
install_cmd = f"cd {workspace_dir} && . {activate_script} && pip install -r requirements.txt"
if os.name == "nt":
install_cmd = f"cd {workspace_dir} && {activate_script} && pip install -r requirements.txt"
process = subprocess.Popen(
install_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate()
if process.returncode != 0:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Warning: Failed to install dependencies: {stderr}", "done": False}
})
else:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Installed dependencies", "done": False}
})
except Exception as e:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Warning: Failed to install dependencies: {str(e)}", "done": False}
})
# Create basic project structure
if not (workspace_dir / "src").exists():
(workspace_dir / "src").mkdir(exist_ok=True)
if not (workspace_dir / "tests").exists():
(workspace_dir / "tests").mkdir(exist_ok=True)
if not (workspace_dir / "docs").exists():
(workspace_dir / "docs").mkdir(exist_ok=True)
# Create README.md if it doesn't exist
if not (workspace_dir / "README.md").exists():
roadmap_content = Path(self.current_workspace["roadmap_file"]).read_text()
if not self.llm:
await self.initialize_llm(__event_emitter__)
readme_prompt = (
f"Based on the following project roadmap, generate a README.md file:\n\n"
f"{roadmap_content}\n\n"
f"Include these sections:\n"
f"- Project Name and Brief Description\n"
f"- Installation Instructions\n"
f"- Usage Examples\n"
f"- Features\n"
f"- License Information\n"
f"Format as Markdown and make it professional and complete."
)
readme_response = await self.llm.ainvoke(readme_prompt)
# Write README.md
(workspace_dir / "README.md").write_text(readme_response)
# Update progress tracker
progress_file = Path(self.current_workspace["progress_file"])
progress_content = Path(progress_file).read_text()
# Update current phase
progress_content = progress_content.replace("**Current Phase**: Development Preparation", "**Current Phase**: Implementation")
# Add log entry
log_entry = f"- {datetime.now().strftime('%Y-%m-%d %H:%M')} - Development environment set up\n"
if "## Development Log" in progress_content:
progress_content = progress_content.replace("## Development Log\n\n", f"## Development Log\n\n{log_entry}")
# Update completed tasks
completed_tasks = [
"- [x] Initialize project",
"- [x] Generate roadmap",
"- [x] Parse roadmap into executable steps",
"- [x] Set up development environment",
]
completed_section = "## Completed Tasks\n\n" + "\n".join(completed_tasks) + "\n\n"
if "## Completed Tasks" in progress_content:
progress_content = progress_content.split("## Completed Tasks")[0] + completed_section + progress_content.split("## Completed Tasks")[1].split("##")[1:]
# Write updated progress
Path(progress_file).write_text(progress_content)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "Development environment set up successfully", "done": True}
})
await __event_emitter__({
"type": "message",
"data": {"content": f"π οΈ **Environment Ready**\n\nDevelopment environment has been set up. Python {self.valves.python_version} configured with virtual environment and dependencies installed. Basic project structure created."}
})
return {
"python_version": self.valves.python_version,
"venv_created": (workspace_dir / ".venv").exists(),
"requirements_created": (workspace_dir / "requirements.txt").exists(),
"structure_created": True
}
async def execute_next_task(self, prompt_path: str, __event_emitter__=None, __event_call__=None):
"""Execute the next pending task from the roadmap."""
if not self.current_workspace or not self.current_workspace.get("tasks"):
await self.create_workspace(prompt_path, __event_emitter__)
await self.generate_roadmap(prompt_path, __event_emitter__)
await self.parse_roadmap(prompt_path, __event_emitter__)
await self.setup_environment(prompt_path, __event_emitter__)
# Find next pending task
next_task = None
for task in self.current_workspace["tasks"]:
# Check if task is already completed
is_completed = False
for completed_task in self.current_workspace.get("completed_tasks", []):
if completed_task["id"] == task["id"]:
is_completed = True
break
if not is_completed:
# Check if all dependencies are satisfied
dependencies_met = True
if task.get("dependencies"):
for dep_id in task["dependencies"]:
dep_completed = False
for completed_task in self.current_workspace.get("completed_tasks", []):
if completed_task["id"] == dep_id:
dep_completed = True
break
if not dep_completed:
dependencies_met = False
break
if dependencies_met:
next_task = task
break
if not next_task:
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": "π **All Tasks Completed**\n\nAll tasks in the roadmap have been completed!"}
})
return {"status": "completed", "message": "All tasks completed"}
# Execute the task
return await self.execute_task(next_task["id"], prompt_path, __event_emitter__, __event_call__)
async def auto_build(self, prompt_path: str, max_tasks: int = 0, __event_emitter__=None, __event_call__=None):
"""Automatically build the entire project based on the prompt."""
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": "π **Starting Autonomous Build**\n\nI'll now process the prompt, create a roadmap, and build your project step by step without requiring further interaction."}
})
# Initialize workspace and environment
await self.create_workspace(prompt_path, __event_emitter__)
await self.generate_roadmap(prompt_path, __event_emitter__)
await self.parse_roadmap(prompt_path, __event_emitter__)
await self.setup_environment(prompt_path, __event_emitter__)
# Execute tasks until completion
task_count = 0
all_completed = False
while not all_completed and (max_tasks == 0 or task_count < max_tasks):
result = await self.execute_next_task(prompt_path, __event_emitter__, __event_call__)
if result.get("status") == "completed" and result.get("message") == "All tasks completed":
all_completed = True
elif "error" in result:
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": f"β **Error During Build**\n\n{result['error']}\n\nAutonomous build process halted."}
})
return result
task_count += 1
# Small delay to avoid overwhelming the system
await asyncio.sleep(1)
# Final summary
if all_completed:
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": f"π **Project Build Complete**\n\nSuccessfully completed all {task_count} tasks in the project roadmap. Your project is now ready!"}
})
else:
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": f"βΈοΈ **Build Paused**\n\nCompleted {task_count} of {len(self.current_workspace['tasks'])} tasks. You can resume building anytime."}
})
return {
"status": "success" if all_completed else "partial",
"tasks_completed": task_count,
"total_tasks": len(self.current_workspace["tasks"]),
"project_name": self.current_workspace["name"]
}
async def list_prompts(self, __event_emitter__=None):
"""List all available prompt files in the knowledge base."""
try:
prompts = await self.scan_knowledge_base(__event_emitter__)
if __event_emitter__ and prompts:
prompt_list = "\n".join([f"- **{p['workspace']}**: `{p['path']}`" for p in prompts])
await __event_emitter__({
"type": "message",
"data": {"content": f"π **Available Prompts**\n\n{prompt_list}"}
})
return prompts
except Exception as e:
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Error listing prompts: {str(e)}", "done": True}
})
return {"error": str(e)}
async def list_tasks(self, prompt_path: str, __event_emitter__=None):
"""List all tasks for a given project."""
if not self.current_workspace or self.current_workspace["prompt_file"] != str(Path(self.valves.knowledge_base_path) / prompt_path):
await self.create_workspace(prompt_path, __event_emitter__)
if not self.current_workspace.get("tasks"):
await self.generate_roadmap(prompt_path, __event_emitter__)
await self.parse_roadmap(prompt_path, __event_emitter__)
if not self.current_workspace.get("tasks"):
return {"error": "No tasks found for this project"}
tasks = self.current_workspace["tasks"]
completed_task_ids = [t["id"] for t in self.current_workspace.get("completed_tasks", [])]
# Group tasks by phase
phases = {}
for task in tasks:
phase = task.get("phase", "Unknown")
if phase not in phases:
phases[phase] = []
task_status = "Completed" if task["id"] in completed_task_ids else "Pending"
phases[phase].append({
"id": task["id"],
"name": task["name"],
"description": task["description"],
"status": task_status
})
if __event_emitter__:
message_content = f"π **Tasks for {self.current_workspace['name']}**\n\n"
for phase, phase_tasks in phases.items():
message_content += f"### {phase}\n\n"
for task in phase_tasks:
status_emoji = "β
" if task["status"] == "Completed" else "β³"
message_content += f"{status_emoji} **{task['name']}** ({task['id']})\n"
message_content += f" {task['description'][:100]}...\n\n"
await __event_emitter__({
"type": "message",
"data": {"content": message_content}
})
return {
"phases": phases,
"completed_count": len(completed_task_ids),
"total_count": len(tasks)
}
async def check_progress(self, prompt_path: str, __event_emitter__=None):
"""Check the current progress of a project."""
if not self.current_workspace or self.current_workspace["prompt_file"] != str(Path(self.valves.knowledge_base_path) / prompt_path):
await self.create_workspace(prompt_path, __event_emitter__)
workspace_dir = Path(self.current_workspace["path"])
progress_file = workspace_dir / "_progress_tracker.md"
if not progress_file.exists():
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": "β οΈ **No Progress Tracker Found**\n\nThis project doesn't have a progress tracker yet. Generate a roadmap first."}
})
return {"error": "No progress tracker found"}
progress_content = progress_file.read_text()
# Parse current phase
current_phase = "Unknown"
if "**Current Phase**:" in progress_content:
current_phase_line = [line for line in progress_content.split("\n") if "**Current Phase**:" in line][0]
current_phase = current_phase_line.split("**Current Phase**:")[1].strip()
# Parse completed tasks
completed_tasks = []
if "## Completed Tasks" in progress_content:
completed_section = progress_content.split("## Completed Tasks")[1].split("##")[0]
completed_lines = [line for line in completed_section.split("\n") if line.strip().startswith("- [x]")]
completed_tasks = [line.strip()[6:] for line in completed_lines]
# Parse pending tasks
pending_tasks = []
if "## Pending Tasks" in progress_content:
pending_section = progress_content.split("## Pending Tasks")[1].split("##")[0]
pending_lines = [line for line in pending_section.split("\n") if line.strip().startswith("- [ ]")]
pending_tasks = [line.strip()[6:] for line in pending_lines]
# Calculate completion percentage
total_tasks = len(completed_tasks) + len(pending_tasks)
completion_percentage = round((len(completed_tasks) / total_tasks * 100) if total_tasks > 0 else 0, 1)
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": f"π **Project Progress: {completion_percentage}%**\n\n**Current Phase:** { async def execute_task(self, task_id: str, prompt_path: str, __event_emitter__=None, __event_call__=None):
"""Execute a specific task from the roadmap."""
if not self.current_workspace or not self.current_workspace.get("tasks"):
await self.create_workspace(prompt_path, __event_emitter__)
await self.generate_roadmap(prompt_path, __event_emitter__)
await self.parse_roadmap(prompt_path, __event_emitter__)
await self.setup_environment(prompt_path, __event_emitter__)
# Find task by ID
task = None
for t in self.current_workspace["tasks"]:
if t["id"] == task_id:
task = t
break
if not task:
return {"error": f"Task not found: {task_id}"}
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Executing task: {task['name']}", "done": False}
})
workspace_dir = Path(self.current_workspace["path"])
# Check dependencies
if task.get("dependencies"):
for dep_id in task["dependencies"]:
# Check if dependency is completed
is_completed = False
for completed_task in self.current_workspace.get("completed_tasks", []):
if completed_task["id"] == dep_id:
is_completed = True
break
if not is_completed:
# Execute dependency first
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Executing dependency: {dep_id}", "done": False}
})
await self.execute_task(dep_id, prompt_path, __event_emitter__, __event_call__)
# Prepare task context
prompt_content = Path(self.current_workspace["prompt_file"]).read_text()
roadmap_content = Path(self.current_workspace["roadmap_file"]).read_text()
# Get list of existing files for context
existing_files = []
for file in workspace_dir.glob("**/*"):
if file.is_file() and not file.name.startswith(".") and not str(file.relative_to(workspace_dir)).startswith(".venv"):
existing_files.append(str(file.relative_to(workspace_dir)))
# Read content of key files for context
file_contents = {}
for file in existing_files:
if len(file_contents) < 5: # Limit to 5 files to avoid context window issues
if file.endswith((".py", ".md", ".txt", ".json", ".yaml", ".yml")):
try:
file_path = workspace_dir / file
if file_path.exists() and file_path.stat().st_size < 10000: # Limit file size
file_contents[file] = file_path.read_text()
except Exception:
pass
# Generate task execution prompt
task_prompt = (
f"You are implementing a specific task in a software development project. Here's your context:\n\n"
f"PROJECT REQUIREMENTS:\n{prompt_content}\n\n"
f"TASK DETAILS:\n"
f"- Name: {task['name']}\n"
f"- Description: {task['description']}\n"
f"- Expected Output Files: {', '.join(task.get('output_files', []))}\n\n"
f"EXISTING PROJECT FILES:\n" + "\n".join(existing_files) + "\n\n"
)
# Add content of key files
for file, content in file_contents.items():
task_prompt += f"CONTENT OF {file}:\n```\n{content}\n```\n\n"
task_prompt += (
f"INSTRUCTIONS:\n"
f"1. Analyze the task requirements and current project state\n"
f"2. For each output file, provide the complete code/content\n"
f"3. Include file paths and explanations for each file\n"
f"4. Provide a summary of what you've implemented\n\n"
f"RESPONSE FORMAT:\n"
f"For each file, use this format:\n"
f"FILE: path/to/file.py\n"
f"```python\n"
f"# Code content here\n"
f"```\n"
f"EXPLANATION: Brief explanation of this file\n\n"
f"Finally, include an IMPLEMENTATION SUMMARY section explaining your approach."
)
if not self.llm:
await self.initialize_llm(__event_emitter__)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Generating implementation for: {task['name']}", "done": False}
})
# Execute task using LLM
implementation_response = await self.llm.ainvoke(task_prompt)
# Process response and write files
created_files = []
current_file = None
file_content = ""
in_code_block = False
implementation_summary = ""
in_summary = False
for line in implementation_response.split('\n'):
if line.startswith("FILE: "):
# Save previous file if there is one
if current_file and file_content:
file_path = workspace_dir / current_file
file_path.parent.mkdir(exist_ok=True, parents=True)
file_path.write_text(file_content)
created_files.append(current_file)
file_content = ""
in_code_block = False
# Start new file
current_file = line[6:].strip()
elif line.startswith("```") and not in_code_block:
in_code_block = True
# Skip language identifier if present
continue
elif line.startswith("```") and in_code_block:
in_code_block = False
elif in_code_block:
file_content += line + "\n"
elif line.startswith("IMPLEMENTATION SUMMARY") or line.startswith("# IMPLEMENTATION SUMMARY"):
in_summary = True
implementation_summary = line + "\n"
elif in_summary:
implementation_summary += line + "\n"
# Save the last file if there is one
if current_file and file_content:
file_path = workspace_dir / current_file
file_path.parent.mkdir(exist_ok=True, parents=True)
file_path.write_text(file_content)
created_files.append(current_file)
# Update progress tracker
progress_file = Path(self.current_workspace["progress_file"])
progress_content = Path(progress_file).read_text()
# Add log entry
log_entry = f"- {datetime.now().strftime('%Y-%m-%d %H:%M')} - Completed task: {task['name']}\n"
if "## Development Log" in progress_content:
progress_content = progress_content.replace("## Development Log\n\n", f"## Development Log\n\n{log_entry}")
# Update completed tasks
completed_task_line = f"- [x] {task['name']}: {task['description'][:50]}...\n"
if "## Completed Tasks" in progress_content:
progress_content = progress_content.replace("## Completed Tasks\n\n", f"## Completed Tasks\n\n{completed_task_line}")
# Remove from pending tasks
pending_task_line = f"- [ ] {task['name']}: {task['description'][:100]}...\n"
if pending_task_line in progress_content:
progress_content = progress_content.replace(pending_task_line, "")
# Write updated progress
Path(progress_file).write_text(progress_content)
# Add to completed tasks
if not self.current_workspace.get("completed_tasks"):
self.current_workspace["completed_tasks"] = []
self.current_workspace["completed_tasks"].append(task)
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": f"Task completed: {task['name']}", "done": True}
})
created_files_msg = "\n".join([f"- `{f}`" for f in created_files])
await __event_emitter__({
"type": "message",
"data": {"content": f"β
**Task Completed**\n\n**{task['name']}**\n\nFiles created/modified:\n{created_files_msg}\n\n**Summary:**\n{implementation_summary}"}
})
# If user wants review, wait for approval
if self.user_valves.wait_for_review and __event_call__:
response = await __event_call__({
"type": "input",
"data": {
"title": "Review Task Implementation",
"message": f"Task '{task['name']}' has been implemented. Do you want to continue to the next task?",
"placeholder": "Yes to continue, or provide feedback"
}
})
# If response is not affirmative, handle feedback
if response and not response.lower().startswith(("y", "ok", "continue", "proceed")):
if __event_emitter__:
await __event_emitter__({
"type": "message",
"data": {"content": f"π **Feedback Received**\n\nYour feedback: \"{response}\"\n\nI'll take this into account for future tasks."}
})
return {
"task_id": task_id,
"task_name": task["name"],
"files_created": created_files,
"status": "completed"
}