We're Hiring!
Whitepaper
Docs
Sign In
Tool
Tool
v1.0.0
Vikunja
Last Updated
2 days ago
Created
2 days ago
Tool ID
vikunja
Creator
@whogben
Downloads
3+
Get
Sponsored by Open WebUI Inc.
We are hiring!
Shape the way humanity engages with
intelligence
.
Description
Access Vikunja Task Manager
README
Tool Code
Show
""" title: Vikunja Task Manager author: User description: | Manage your Vikunja tasks and projects (lists). Supported Features: - Projects: Create, update, delete, and nest projects (sub-projects). - Tasks: Create, update, delete, and mark as done. - Task Details: Priorities (1-5), Due/Start/End dates, Recurring tasks, Colors. - Search: Advanced filtering by dates, priority, and status. NOT Supported: - Labels / Tags - User Assignments - Comments & Attachments - Subtasks (Parent/Child tasks) - Kanban Buckets required_open_webui_version: 0.6.0 requirements: - httpx - pydantic version: 1.0.0 license: MIT """ import asyncio import json import os from abc import ABC, abstractmethod from typing import Any, Literal, Annotated from datetime import datetime, timezone import httpx from pydantic import BaseModel, Field, ConfigDict class Tools: class Valves(BaseModel): model_config = ConfigDict(extra="forbid") default_vikunja_api_url: str = Field( "", description="Default all-users API URL, e.g. https://try.vikunja.io/api/v1", ) default_vikunja_api_key: str = Field( "", description="Default all-users API Key" ) class UserValves(BaseModel): model_config = ConfigDict(extra="forbid") vikunja_api_url: str = Field( "", description="Vikunja API URL, e.g. https://try.vikunja.io/api/v1" ) vikunja_api_key: str = Field("", description="Vikunja API Key") def __init__(self): self.valves = self.Valves() self.user_valves = self.UserValves() def _parse_iso_date_strict( self, date_field_name: str, date_str: str | None ) -> datetime | None: if not date_str: return None try: dt = datetime.fromisoformat(date_str) if dt.tzinfo is None: # Explicitly reject naive datetimes to avoid timezone surprises raise ValueError( f"'{date_field_name}' must be timezone-aware. Please specify a timezone in the timestamp." ) return dt except ValueError as e: # Catch malformed strings that aren't dates at all raise ValueError(f"Invalid format for '{date_field_name}': {str(e)}") async def list_lists(self, __user__: dict = {}) -> list[dict]: """ List all available task lists (projects). """ lists = await _get_api(self, __user__).get_lists() return repr_llm(lists) async def get_list( self, list_id: Annotated[str, Field(description="The ID of the list.")], __user__: dict = {}, ) -> dict: """ Get details of a specific list. """ list_obj = await _get_api(self, __user__).get_list(list_id) return repr_llm(list_obj) async def create_list( self, title: Annotated[str, Field(description="Title of the new list.")], description: Annotated[ str | None, Field(description="Description of the list.") ] = None, hex_color: Annotated[str | None, Field(description="Color code.")] = None, parent_list_id: Annotated[ str | None, Field(description="Parent list ID if nested.") ] = None, __user__: dict = {}, ) -> dict: """ Create a new list (project). """ new_list = TaskList( title=title, description=description, hex_color=hex_color, parent_list_id=parent_list_id, ) created = await _get_api(self, __user__).create_list(new_list) return repr_llm(created) async def update_list( self, list_id: Annotated[str, Field(description="ID of the list to update.")], changes: Annotated[ dict, Field(description="Fields to update (title, description, hex_color, etc.)"), ], __user__: dict = {}, ) -> dict: """ Update a list (project). """ updated = await _get_api(self, __user__).update_list(list_id, changes) return repr_llm(updated) async def delete_list( self, list_id: Annotated[str, Field(description="ID of the list to delete.")], __user__: dict = {}, ) -> dict: """ Delete a list and all its tasks. """ deleted = await _get_api(self, __user__).delete_list(list_id) return repr_llm(deleted) async def list_tasks( self, filter: Annotated[ dict, Field(), ] = {}, __user__: dict = {}, ) -> list[dict]: """ Get tasks from all lists (default) or filtered lists. Dictionary of filter options. You may include any combination of the following fields: - search (str): Search term to filter tasks by title and description. - list_ids (list[str]): Filter by specific list IDs. - is_done (bool): Filter by completion status. - is_favorite (bool): Filter by favorite status. - is_archived (bool): Filter by archived status (default False). - min_priority (int): Minimum priority (1-5). - max_priority (int): Maximum priority (1-5). - due_min (str): Tasks due after this date (ISO 8601). - due_max (str): Tasks due before this date (ISO 8601). - start_min (str): Tasks starting after this date. - start_max (str): Tasks starting before this date. - end_min (str): Tasks ending after this date. - end_max (str): Tasks ending before this date. - is_repeating (bool): Filter by repeating tasks. - page (int): Page number (starts at 1). - page_size (int): Tasks per page (default 50). - sort_by (str): Sort field (priority, due_date, start_date, end_date, done_at, created, updated). - order_by (str): Sort order (asc, desc). However you may not include any additional fields beyond this. """ filter_obj = TaskFilter(**filter) tasks = await _get_api(self, __user__).get_tasks(filter_obj) return repr_llm(tasks) async def get_task( self, task_id: Annotated[str, Field(description="The ID of the task.")], __user__: dict = {}, ) -> dict: """ Get details of a specific task. """ task = await _get_api(self, __user__).get_task(task_id) return repr_llm(task) async def create_task( self, list_id: Annotated[ str, Field(description="The ID of the list to add the task to.") ], title: Annotated[str, Field(description="The title of the task.")], description: Annotated[ str | None, Field(description="Optional description.") ] = None, is_done: Annotated[ bool, Field(description="Whether the task is completed.") ] = False, priority: Annotated[ int | None, Field(description="Priority from 1 (lowest) to 5 (highest).") ] = None, due_date: Annotated[ str | None, Field(description="Due date in ISO 8601 format (YYYY-MM-DD).") ] = None, start_date: Annotated[ str | None, Field(description="Start date in ISO 8601 format (YYYY-MM-DD).") ] = None, end_date: Annotated[ str | None, Field(description="End date in ISO 8601 format (YYYY-MM-DD).") ] = None, repeat_mode: Annotated[ Literal["from_start_date", "monthly", "from_done_date"] | None, Field(description="Mode for repeating tasks."), ] = None, repeat_interval_s: Annotated[ int | None, Field(description="Repeat interval in seconds (e.g. 3600 for 1 hour)."), ] = None, hex_color: Annotated[ str | None, Field(description="Color code for the task.") ] = None, is_favorite: Annotated[ bool, Field(description="Whether the task is marked as favorite.") ] = False, is_archived: Annotated[ bool, Field(description="Whether the task is archived.") ] = False, __user__: dict = {}, ) -> dict: """ Create a new task in a specific list. """ dt_due = self._parse_iso_date_strict("due_date", due_date) dt_start = self._parse_iso_date_strict("start_date", start_date) dt_end = self._parse_iso_date_strict("end_date", end_date) task = Task( list_id=list_id, title=title, description=description, is_done=is_done, priority=priority, due_date=dt_due, start_date=dt_start, end_date=dt_end, repeat_mode=repeat_mode, repeat_interval_s=repeat_interval_s, hex_color=hex_color, is_favorite=is_favorite, is_archived=is_archived, ) created = await _get_api(self, __user__).create_task(task) return repr_llm(created) async def update_task( self, task_id: Annotated[str, Field(description="The ID of the task to update.")], changes: Annotated[ dict, Field( description="Dictionary of fields to change. Keys can by any parameter of the create_task function." ), ], __user__: dict = {}, ) -> dict: """ Update specific fields of a task. """ for date_field in ["due_date", "start_date", "end_date"]: if date_field in changes and isinstance(changes[date_field], str): changes[date_field] = self._parse_iso_date_strict( date_field, changes[date_field] ) updated = await _get_api(self, __user__).update_task(task_id, changes) return repr_llm(updated) async def batch_update_tasks( self, changes: Annotated[ dict, Field(description="Fields to update for all matching tasks.") ], filter: Annotated[ dict, Field( description="Filter dictionary to select tasks (see list_tasks for fields).", ), ] = {}, __user__: dict = {}, ) -> list[dict]: """ Update multiple tasks at once based on a filter. """ filter_obj = TaskFilter(**filter) for date_field in ["due_date", "start_date", "end_date"]: if date_field in changes and isinstance(changes[date_field], str): changes[date_field] = self._parse_iso_date_strict( date_field, changes[date_field] ) updated_tasks = await _get_api(self, __user__).update_tasks(filter_obj, changes) return repr_llm(updated_tasks) async def delete_task( self, task_id: Annotated[str, Field(description="Task ID")], __user__: dict = {}, ) -> dict: """ Delete a task. """ deleted = await _get_api(self, __user__).delete_task(task_id) return repr_llm(deleted) def _get_api(self: Tools, user: dict = {}) -> "VikunjaAPI": # Prioritize UserValves from the injected __user__ dictionary user_valves = user.get("valves") if user else None url = ( (user_valves.vikunja_api_url if user_valves else None) or self.user_valves.vikunja_api_url or self.valves.default_vikunja_api_url or os.getenv("VIKUNJA_API_URL", "") ) key = ( (user_valves.vikunja_api_key if user_valves else None) or self.user_valves.vikunja_api_key or self.valves.default_vikunja_api_key or os.getenv("VIKUNJA_API_KEY", "") ) if not url: raise ValueError( "Vikunja API URL is missing. Please configure it in User Valves or Global Valves." ) if not key: raise ValueError( "Vikunja API Key is missing. Please configure it in User Valves or Global Valves." ) return VikunjaAPI(url, key) def _now_utc(): """Return current time in UTC.""" return datetime.now(timezone.utc) def repr_llm( obj_or_list, hide_none_fields: bool = True, insert_classname: bool = False, public_only: bool = False, ) -> dict | list[dict]: """ Gets a LLM-friendly dict describing the object or list of dicts describing the objects. If insert_classname is True, the key "_classname" is inserted at the start of the dict. If public_only is True, only public fields are included (besides optional _classname key). """ if isinstance(obj_or_list, list): return [ repr_llm( item, hide_none_fields=hide_none_fields, insert_classname=insert_classname, public_only=public_only, ) for item in obj_or_list ] contents = {} classname = None if isinstance(obj_or_list, dict): contents = obj_or_list elif isinstance(obj_or_list, BaseModel): contents = obj_or_list.model_dump(mode="json", exclude_none=hide_none_fields) classname = obj_or_list.__class__.__name__ if public_only: contents = {k: v for k, v in contents.items() if not k.startswith("_")} if hide_none_fields: contents = {k: v for k, v in contents.items() if v != ""} if insert_classname and classname: contents = {"_classname": classname, **contents} return contents # TUNING CONSTANTS DEFAULT_PAGE_SIZE = 50 # MODELS class CRUDObj(BaseModel): """A base model for all CRUD objects.""" model_config = ConfigDict(extra="forbid") id: str = Field(default="", description="Auto-assigned by API.") created: datetime = Field(default_factory=_now_utc) updated: datetime = Field(default_factory=_now_utc) deleted: datetime | None = Field(default=None) def __str__(self): return self.__class__.__name__ + ": " + json.dumps(repr_llm(self), indent=2) class TaskList(CRUDObj): """An ordered collection of tasks.""" title: str = Field(...) description: str | None = Field(default=None) parent_list_id: str | None = Field(default=None) hex_color: str | None = Field(default=None) is_favorite: bool | None = Field(default=None) is_archived: bool | None = Field(default=None) class Task(CRUDObj): """A task to be completed.""" list_id: str = Field(..., description="The list this task belongs to.") title: str = Field(..., description="As displayed.") is_done: bool = Field(default=False) description: str | None = Field(default=None, description="As written.") done_at: datetime | None = Field(default=None) priority: int | None = Field(default=None) due_date: datetime | None = Field(default=None) start_date: datetime | None = Field(default=None) end_date: datetime | None = Field(default=None) repeat_mode: Literal["from_start_date", "monthly", "from_done_date"] | None = Field( default=None ) repeat_interval_s: int | None = Field( default=None, description="Seconds added to task dates when marked done." ) hex_color: str | None = Field(default=None) is_favorite: bool | None = Field(default=None) is_archived: bool | None = Field(default=None) class TaskFilter(BaseModel): """ Used to query tasks. Leaving fields empty = include all. """ model_config = ConfigDict(extra="forbid") search: str | None = Field( default=None, description="Search term for task titles and descriptions." ) list_ids: list[str] | None = Field(default=None) task_ids: list[str] | None = Field(default=None) is_done: bool | None = Field(default=None) is_favorite: bool | None = Field(default=None) is_archived: bool | None = Field(default=False) min_priority: int | None = Field(default=None) max_priority: int | None = Field(default=None) due_min: datetime | None = Field(default=None) due_max: datetime | None = Field(default=None) start_min: datetime | None = Field(default=None) start_max: datetime | None = Field(default=None) end_min: datetime | None = Field(default=None) end_max: datetime | None = Field(default=None) is_repeating: bool | None = Field(default=None) page: int | None = Field(default=None, description="Starts at 1.") page_size: int | None = Field( default=None, description=f"Defaults to {DEFAULT_PAGE_SIZE} if unspecified." ) sort_by: ( Literal[ "priority", "due_date", "start_date", "end_date", "done_at", "created", "updated", ] | None ) = Field( default=None, description="Defaults to created. Valid values are done_at, due_date, priority, start_date, end_date, created, updated.", ) order_by: Literal["asc", "desc"] | None = Field( default=None, description="defaults to desc" ) class TaskAPIBase(ABC): """Base class that must be subclassed for specific Task API integrations.""" # LISTS INTERFACE: async def get_list(self, id: str) -> TaskList: """Get a list by its id.""" return await self._get_list(id) async def get_lists(self) -> list[TaskList]: """Gets all lists.""" return await self._get_lists() async def create_list(self, list: TaskList) -> TaskList: """Create a new list.""" return await self._create_list(list) async def update_list(self, id: str, changes: dict[str, Any]) -> TaskList: """Update specific fields of a list.""" task_list = await self._get_list(id) task_list.updated = _now_utc() for key, value in changes.items(): setattr(task_list, key, value) return await self._update_list(id, changes) async def delete_list(self, id: str) -> TaskList: """Delete a list by its id, erasing all tasks in it.""" return await self._delete_list(id) # TASKS INTERFACE: async def get_task(self, id: str) -> Task: """Get a task by its id.""" return await self._get_task(id) async def get_tasks(self, filter: TaskFilter | None = None) -> list[Task]: """Get multiple tasks narrowed by an optional filter.""" if filter is None: filter = TaskFilter() return await self._get_tasks(filter) async def create_task(self, task: Task) -> Task: """Create a new task.""" return await self._create_task(task) async def update_task(self, id: str, changes: dict[str, Any]) -> Task: """Update specific fields of a specific task.""" task = await self._get_task(id) changes["updated"] = _now_utc() for key, value in changes.items(): setattr(task, key, value) return await self._update_task(id, task) async def update_tasks( self, filter: TaskFilter, changes: dict[str, Any] ) -> list[Task]: """Sequentially updates specific fields of a batch of filtered tasks.""" tasks = await self._get_tasks(filter) results = [] changes["updated"] = _now_utc() for task in tasks: for key, value in changes.items(): setattr(task, key, value) results.append(await self._update_task(task.id, task)) return results async def delete_task(self, id: str) -> Task: """Delete a task by its id.""" return await self._delete_task(id) # LISTS IMPLEMENTATION @abstractmethod async def _get_list(self, id: str) -> TaskList: pass @abstractmethod async def _get_lists(self) -> list[TaskList]: pass @abstractmethod async def _create_list(self, list: TaskList) -> TaskList: pass @abstractmethod async def _update_list(self, id: str, changes: dict[str, Any]) -> TaskList: pass @abstractmethod async def _delete_list(self, id: str) -> TaskList: pass # TASKS IMPLEMENTATION @abstractmethod async def _get_task(self, id: str) -> Task: pass @abstractmethod async def _get_tasks(self, filter: TaskFilter) -> list[Task]: pass @abstractmethod async def _update_tasks(self, tasks: list[Task]) -> list[Task]: pass @abstractmethod async def _create_task(self, task: Task) -> Task: pass @abstractmethod async def _update_task(self, id: str, task: Task) -> Task: pass @abstractmethod async def _delete_task(self, id: str) -> Task: pass class TaskAPITester: """ Tests a TaskAPI implementation. Can be run non-destructively against a production implementation, but may leave test data behind on test failures. """ def __init__(self, api: TaskAPIBase): self.api = api self.debug_prints = True self._test_list_ids = [] # Track all test list IDs self._test_task_ids = [] # Track all test task IDs async def test_all(self): """ Tests all methods of the TaskAPI implementation, raising on the first failure. Automatically cleans up test data. """ try: for name in dir(self): if name.startswith("test_") and name != "test_all": method = getattr(self, name) try: if self.debug_prints: print(f"- {name}..", end="") await method() if self.debug_prints: print(" PASS") except Exception as e: if self.debug_prints: print(" FAILED\n") raise Exception(f"Test {name} failed: {e}") from e finally: # Always clean up test data, even if tests fail await self._cleanup_test_data() async def _cleanup_test_data(self): """Clean up all created test data with best-effort approach.""" # Delete all test tasks first (to avoid foreign key constraints) for task_id in self._test_task_ids: try: await self.api.delete_task(task_id) except Exception: pass # Ignore errors during cleanup # Delete all test lists for list_id in self._test_list_ids: try: await self.api.delete_list(list_id) except Exception: pass # Ignore errors during cleanup async def test_create_list(self): """Tests the create_list method.""" task_list = TaskList(title="Test List") created_list = await self.api.create_list(task_list) self._test_list_ids.append(created_list.id) async def test_get_lists(self): """Tests the get_lists method returns our new list""" test_list_id = self._test_list_ids[-1] # Get the most recent test list task_lists = await self.api.get_lists() assert any(task_list.id == test_list_id for task_list in task_lists) async def test_get_list(self): """Tests we can get our new list""" test_list_id = self._test_list_ids[-1] # Get the most recent test list task_list = await self.api.get_list(test_list_id) assert task_list.id == test_list_id async def test_update_list(self): """Tests we can update our new list""" test_list_id = self._test_list_ids[-1] # Get the most recent test list await self.api.update_list(test_list_id, {"title": "Updated Test List"}) task_list = await self.api.get_list(test_list_id) assert task_list.title == "Updated Test List" async def test_delete_list(self): """Tests we can delete our new list.""" # Create a separate list for deletion test to avoid breaking other tests list_to_delete = TaskList(title="List to Delete") created_list = await self.api.create_list(list_to_delete) self._test_list_ids.append(created_list.id) await self.api.delete_list(created_list.id) task_lists = await self.api.get_lists() assert not any(task_list.id == created_list.id for task_list in task_lists) # Remove from tracked IDs so subsequent tests don't try to use this deleted list self._test_list_ids.remove(created_list.id) async def test_create_task(self): """Tests the create_task method.""" # First create a test list if none exists (robustness) if not self._test_list_ids: await self.test_create_list() test_list_id = self._test_list_ids[-1] # Now create a task in that list task = Task(list_id=test_list_id, title="Test Task") created_task = await self.api.create_task(task) self._test_task_ids.append(created_task.id) async def test_get_tasks(self): """Tests the get_tasks method returns our new task""" test_task_id = self._test_task_ids[-1] # Get the most recent test task tasks = await self.api.get_tasks() assert any(task.id == test_task_id for task in tasks) async def test_get_task(self): """Tests we can get our new task""" test_task_id = self._test_task_ids[-1] # Get the most recent test task task = await self.api.get_task(test_task_id) assert task.id == test_task_id async def test_update_task(self): """Tests we can update our new task""" test_task_id = self._test_task_ids[-1] # Get the most recent test task await self.api.update_task(test_task_id, {"title": "Updated Test Task"}) task = await self.api.get_task(test_task_id) assert task.title == "Updated Test Task" async def test_update_tasks(self): """Tests batch updating of tasks""" # Use the most recent test list test_list_id = self._test_list_ids[-1] # Create a second task for batch testing task2 = Task(list_id=test_list_id, title="Second Test Task") created_task2 = await self.api.create_task(task2) self._test_task_ids.append(created_task2.id) # Batch update both tasks filter_obj = TaskFilter(list_ids=[test_list_id]) await self.api.update_tasks(filter_obj, {"is_favorite": True}) # Verify both tasks were updated tasks = await self.api.get_tasks(filter_obj) assert all(task.is_favorite is True for task in tasks) async def test_delete_task(self): """Tests we can delete our new task.""" # Create a separate list for deletion test l = TaskList(title="List for Task Deletion") created_l = await self.api.create_list(l) self._test_list_ids.append(created_l.id) task_to_delete = Task(list_id=created_l.id, title="Task to Delete") created_task = await self.api.create_task(task_to_delete) self._test_task_ids.append(created_task.id) await self.api.delete_task(created_task.id) tasks = await self.api.get_tasks() assert not any(task.id == created_task.id for task in tasks) self._test_task_ids.remove(created_task.id) # Clean up the list await self.api.delete_list(created_l.id) self._test_list_ids.remove(created_l.id) async def test_filter_by_done(self): """Tests filtering tasks by completion status.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] # Create one done and one pending task task_done = Task(list_id=list_id, title="Done Task", is_done=True) created_done = await self.api.create_task(task_done) self._test_task_ids.append(created_done.id) task_pending = Task(list_id=list_id, title="Pending Task", is_done=False) created_pending = await self.api.create_task(task_pending) self._test_task_ids.append(created_pending.id) # Filter for done tasks filter_obj = TaskFilter(list_ids=[list_id], is_done=True) tasks = await self.api.get_tasks(filter_obj) assert any(t.id == created_done.id for t in tasks) assert not any(t.id == created_pending.id for t in tasks) async def test_filter_by_priority(self): """Tests filtering tasks by minimum priority.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] # Vikunja priority: 1 (low) to 5 (high) task_high = Task(list_id=list_id, title="High Pri", priority=5) created_high = await self.api.create_task(task_high) self._test_task_ids.append(created_high.id) task_low = Task(list_id=list_id, title="Low Pri", priority=1) created_low = await self.api.create_task(task_low) self._test_task_ids.append(created_low.id) # Filter for priority >= 4 filter_obj = TaskFilter(list_ids=[list_id], min_priority=4) tasks = await self.api.get_tasks(filter_obj) assert any(t.id == created_high.id for t in tasks) assert not any(t.id == created_low.id for t in tasks) async def test_filter_by_date(self): """Tests filtering tasks by due date range.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] # Use hardcoded disparate years to avoid "current time" complexity date_past = datetime(2020, 1, 1, tzinfo=timezone.utc) date_future = datetime(2030, 1, 1, tzinfo=timezone.utc) task_past = Task(list_id=list_id, title="Past Task", due_date=date_past) created_past = await self.api.create_task(task_past) self._test_task_ids.append(created_past.id) task_future = Task(list_id=list_id, title="Future Task", due_date=date_future) created_future = await self.api.create_task(task_future) self._test_task_ids.append(created_future.id) # Filter for tasks due before 2025 filter_cutoff = datetime(2025, 1, 1, tzinfo=timezone.utc) filter_obj = TaskFilter(list_ids=[list_id], due_max=filter_cutoff) tasks = await self.api.get_tasks(filter_obj) assert any(t.id == created_past.id for t in tasks) assert not any(t.id == created_future.id for t in tasks) async def test_filter_by_ids(self): """Tests filtering tasks by specific IDs.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] t1 = Task(list_id=list_id, title="Task ID 1") ct1 = await self.api.create_task(t1) self._test_task_ids.append(ct1.id) t2 = Task(list_id=list_id, title="Task ID 2") ct2 = await self.api.create_task(t2) self._test_task_ids.append(ct2.id) # Filter for just t1 filter_obj = TaskFilter(list_ids=[list_id], task_ids=[ct1.id]) tasks = await self.api.get_tasks(filter_obj) assert any(t.id == ct1.id for t in tasks) assert not any(t.id == ct2.id for t in tasks) async def test_filter_by_favorite(self): """Tests filtering tasks by favorite status.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] t_fav = Task(list_id=list_id, title="Fav Task", is_favorite=True) ct_fav = await self.api.create_task(t_fav) self._test_task_ids.append(ct_fav.id) t_norm = Task(list_id=list_id, title="Normal Task", is_favorite=False) ct_norm = await self.api.create_task(t_norm) self._test_task_ids.append(ct_norm.id) filter_obj = TaskFilter(list_ids=[list_id], is_favorite=True) tasks = await self.api.get_tasks(filter_obj) assert any(t.id == ct_fav.id for t in tasks) assert not any(t.id == ct_norm.id for t in tasks) async def test_filter_by_repeating(self): """Tests filtering tasks by repeating status.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] t_rep = Task(list_id=list_id, title="Repeating Task", repeat_interval_s=3600) ct_rep = await self.api.create_task(t_rep) self._test_task_ids.append(ct_rep.id) t_once = Task(list_id=list_id, title="One-off Task") ct_once = await self.api.create_task(t_once) self._test_task_ids.append(ct_once.id) filter_obj = TaskFilter(list_ids=[list_id], is_repeating=True) tasks = await self.api.get_tasks(filter_obj) assert any(t.id == ct_rep.id for t in tasks) assert not any(t.id == ct_once.id for t in tasks) async def test_filter_by_start_end_dates(self): """Tests filtering by start and end date ranges.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] date_early = datetime(2024, 1, 1, tzinfo=timezone.utc) date_late = datetime(2024, 12, 31, tzinfo=timezone.utc) t_start = Task(list_id=list_id, title="Start Task", start_date=date_early) ct_start = await self.api.create_task(t_start) self._test_task_ids.append(ct_start.id) t_end = Task(list_id=list_id, title="End Task", end_date=date_late) ct_end = await self.api.create_task(t_end) self._test_task_ids.append(ct_end.id) # Test Start Date Filter f_start = TaskFilter( list_ids=[list_id], start_max=datetime(2024, 6, 1, tzinfo=timezone.utc) ) tasks_start = await self.api.get_tasks(f_start) assert any(t.id == ct_start.id for t in tasks_start) # Test End Date Filter f_end = TaskFilter( list_ids=[list_id], end_min=datetime(2024, 6, 1, tzinfo=timezone.utc) ) tasks_end = await self.api.get_tasks(f_end) assert any(t.id == ct_end.id for t in tasks_end) async def test_pagination(self): """Tests pagination of results.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] # Create 3 tasks ids = [] for i in range(3): t = Task(list_id=list_id, title=f"Page Task {i}") ct = await self.api.create_task(t) self._test_task_ids.append(ct.id) ids.append(ct.id) # Page 1, Size 2 f_p1 = TaskFilter( list_ids=[list_id], page=1, page_size=2, sort_by="created", order_by="desc" ) tasks_p1 = await self.api.get_tasks(f_p1) # Depending on other tests, we might have more tasks in this list if cleanup failed, # but we are in a fresh list. # Ensure we found at least our tasks found_ids_p1 = {t.id for t in tasks_p1} intersection_p1 = found_ids_p1.intersection(set(ids)) assert len(intersection_p1) == 2 # Page 2, Size 2 f_p2 = TaskFilter( list_ids=[list_id], page=2, page_size=2, sort_by="created", order_by="desc" ) tasks_p2 = await self.api.get_tasks(f_p2) found_ids_p2 = {t.id for t in tasks_p2} intersection_p2 = found_ids_p2.intersection(set(ids)) assert len(intersection_p2) == 1 async def test_sorting(self): """Tests sorting of results.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] t1 = Task(list_id=list_id, title="Low", priority=1) ct1 = await self.api.create_task(t1) self._test_task_ids.append(ct1.id) t2 = Task(list_id=list_id, title="High", priority=5) ct2 = await self.api.create_task(t2) self._test_task_ids.append(ct2.id) # Sort by priority DESC f_desc = TaskFilter(list_ids=[list_id], sort_by="priority", order_by="desc") tasks = await self.api.get_tasks(f_desc) relevant_tasks = [t for t in tasks if t.id in {ct1.id, ct2.id}] assert len(relevant_tasks) >= 2 assert relevant_tasks[0].id == ct2.id # High priority first assert relevant_tasks[-1].id == ct1.id async def test_filter_by_search(self): """Tests filtering tasks by search term.""" if not self._test_list_ids: await self.test_create_list() list_id = self._test_list_ids[-1] # Create task with unique search term t_match = Task(list_id=list_id, title="UniqueSearchTerm Task") ct_match = await self.api.create_task(t_match) self._test_task_ids.append(ct_match.id) # Create task without term t_other = Task(list_id=list_id, title="Other Task") ct_other = await self.api.create_task(t_other) self._test_task_ids.append(ct_other.id) # Filter by search term filter_obj = TaskFilter(list_ids=[list_id], search="UniqueSearchTerm") tasks = await self.api.get_tasks(filter_obj) assert any(t.id == ct_match.id for t in tasks) assert not any(t.id == ct_other.id for t in tasks) class VikunjaAPI(TaskAPIBase): """A TaskAPI implementation for Vikunja.""" # Mapping Vikunja's integer repeat modes to our Task model literals # Note: Please verify these integer-to-literal mappings against your Vikunja instance behavior. # Standard Vikunja: 0=Period (Interval), 1=Month, 2=Year REPEAT_MODE_MAP = { 0: "from_start_date", 1: "monthly", 2: "from_done_date", } REPEAT_MODE_MAP_INV = {v: k for k, v in REPEAT_MODE_MAP.items()} def __init__(self, api_url: str, api_key: str): self.api_url = api_url self.api_key = api_key def _get_client(self) -> httpx.AsyncClient: return httpx.AsyncClient( base_url=self.api_url, headers={"Authorization": f"Bearer {self.api_key}"} ) def _parse_date(self, date_str: str | None) -> datetime | None: if not date_str: return None # Handle Vikunja's potentially varied formats (usually ISO 8601 with Z) return datetime.fromisoformat(date_str.replace("Z", "+00:00")) def _map_project_to_list(self, data: dict) -> TaskList: return TaskList( id=str(data["id"]), title=data["title"], description=data.get("description"), created=self._parse_date(data["created"]), updated=self._parse_date(data["updated"]), parent_list_id=( str(data["parent_project_id"]) if data.get("parent_project_id") else None ), hex_color=data.get("hex_color"), is_favorite=data.get("is_favorite", False), is_archived=data.get("is_archived", False), ) def _map_task_to_payload(self, task: Task) -> dict: payload = { "title": task.title, "description": task.description, "done": task.is_done, "priority": task.priority, "hex_color": task.hex_color, "is_favorite": task.is_favorite, "is_archived": task.is_archived, # Added missing field "project_id": int(task.list_id), } # Map repeat_mode literal -> int if task.repeat_mode: mode_int = self.REPEAT_MODE_MAP_INV.get(task.repeat_mode) if mode_int is not None: payload["repeat_mode"] = mode_int if task.due_date: payload["due_date"] = task.due_date.isoformat() if task.start_date: payload["start_date"] = task.start_date.isoformat() if task.end_date: payload["end_date"] = task.end_date.isoformat() if task.repeat_interval_s: payload["repeat_after"] = task.repeat_interval_s return payload def _map_payload_to_task(self, data: dict) -> Task: return Task( id=str(data["id"]), list_id=str(data["project_id"]), title=data["title"], description=data.get("description"), is_done=data.get("done", False), priority=data.get("priority"), due_date=self._parse_date(data.get("due_date")), start_date=self._parse_date(data.get("start_date")), end_date=self._parse_date(data.get("end_date")), done_at=self._parse_date(data.get("done_at")), created=self._parse_date(data.get("created")), updated=self._parse_date(data.get("updated")), hex_color=data.get("hex_color"), is_favorite=data.get("is_favorite", False), is_archived=data.get("is_archived", False), # Added missing field repeat_interval_s=data.get("repeat_after"), repeat_mode=self.REPEAT_MODE_MAP.get(data.get("repeat_mode")), ) # LISTS IMPLEMENTATION (as Vikunja Projects) async def _get_list(self, id: str) -> TaskList: async with self._get_client() as client: response = await client.get(f"/projects/{id}") response.raise_for_status() return self._map_project_to_list(response.json()) async def _get_lists(self) -> list[TaskList]: async with self._get_client() as client: response = await client.get("/projects") response.raise_for_status() data = response.json() # Handle potential pagination or direct list projects = data if isinstance(data, list) else data.get("results", []) return [self._map_project_to_list(p) for p in projects] async def _create_list(self, list: TaskList) -> TaskList: async with self._get_client() as client: response = await client.put( "/projects", json={ "title": list.title, "description": list.description, "hex_color": list.hex_color, "parent_project_id": ( int(list.parent_list_id) if list.parent_list_id else 0 ), }, ) response.raise_for_status() return self._map_project_to_list(response.json()) async def _update_list(self, id: str, changes: dict[str, Any]) -> TaskList: payload = {} for k, v in changes.items(): if k == "parent_list_id": payload["parent_project_id"] = int(v) if v else 0 else: payload[k] = v async with self._get_client() as client: response = await client.post(f"/projects/{id}", json=payload) response.raise_for_status() return self._map_project_to_list(response.json()) async def _delete_list(self, id: str) -> TaskList: async with self._get_client() as client: response = await client.delete(f"/projects/{id}") response.raise_for_status() # Vikunja does not return the deleted object, returning placeholder. return TaskList(id=id, title="Deleted") # TASKS IMPLEMENTATION (as Vikunja Tasks) async def _get_task(self, id: str) -> Task: async with self._get_client() as client: response = await client.get(f"/tasks/{id}") response.raise_for_status() return self._map_payload_to_task(response.json()) async def _get_tasks(self, filter: TaskFilter) -> list[Task]: query_params = [] filter_parts = [] def add_filter_cond(field, value, comparator="="): if isinstance(value, bool): val_str = "true" if value else "false" elif isinstance(value, (int, float)): val_str = str(value) else: # Use double quotes for strings/dates to match Vikunja filter syntax val_str = f'"{value}"' filter_parts.append(f"{field} {comparator} {val_str}") if filter.list_ids: if len(filter.list_ids) == 1: add_filter_cond("project_id", filter.list_ids[0], "=") else: ids_str = ", ".join(filter.list_ids) filter_parts.append(f"project_id in ({ids_str})") if filter.task_ids: if len(filter.task_ids) == 1: add_filter_cond("id", filter.task_ids[0], "=") else: ids_str = ", ".join(filter.task_ids) filter_parts.append(f"id in ({ids_str})") if filter.is_done is not None: add_filter_cond("done", filter.is_done) # is_favorite is not supported in Vikunja filters. # We will filter client-side, noting this breaks pagination guarantees if many non-matches exist. if filter.is_repeating is not None: if filter.is_repeating: add_filter_cond("repeat_after", 0, ">") else: add_filter_cond("repeat_after", 0, "=") # Date filters def format_date(dt): # Try simpler date format YYYY-MM-DD return dt.strftime("%Y-%m-%d") if filter.due_min: add_filter_cond("due_date", format_date(filter.due_min), ">") if filter.due_max: add_filter_cond("due_date", format_date(filter.due_max), "<") if filter.start_min: add_filter_cond("start_date", format_date(filter.start_min), ">") if filter.start_max: add_filter_cond("start_date", format_date(filter.start_max), "<") if filter.end_min: add_filter_cond("end_date", format_date(filter.end_min), ">") if filter.end_max: add_filter_cond("end_date", format_date(filter.end_max), "<") # Priority filters if filter.min_priority is not None: add_filter_cond("priority", filter.min_priority, ">=") if filter.max_priority is not None: add_filter_cond("priority", filter.max_priority, "<=") if filter_parts: # Vikunja documentation uses && for AND query_params.append(("filter", " && ".join(filter_parts))) if filter.page: query_params.append(("page", str(filter.page))) if filter.page_size: query_params.append(("per_page", str(filter.page_size))) if filter.sort_by: query_params.append(("sort_by", filter.sort_by)) query_params.append(("order_by", filter.order_by or "desc")) if filter.search: query_params.append(("s", filter.search)) async with self._get_client() as client: endpoint = "/tasks/all" # Optimization: Use project-specific endpoint if filtering by single project # This often handles pagination more reliably and is faster if filter.list_ids and len(filter.list_ids) == 1: endpoint = f"/projects/{filter.list_ids[0]}/tasks" response = await client.get(endpoint, params=query_params) response.raise_for_status() data = response.json() tasks = data if isinstance(data, list) else data.get("results", []) mapped_tasks = [self._map_payload_to_task(t) for t in tasks] if filter.is_favorite is not None: mapped_tasks = [ t for t in mapped_tasks if t.is_favorite == filter.is_favorite ] return mapped_tasks async def _create_task(self, task: Task) -> Task: async with self._get_client() as client: payload = self._map_task_to_payload(task) response = await client.put( f"/projects/{task.list_id}/tasks", json=payload, ) response.raise_for_status() return self._map_payload_to_task(response.json()) async def _update_task(self, id: str, task: Task) -> Task: payload = self._map_task_to_payload(task) async with self._get_client() as client: response = await client.post(f"/tasks/{id}", json=payload) response.raise_for_status() return self._map_payload_to_task(response.json()) async def _update_tasks(self, tasks: list[Task]) -> list[Task]: results = [] for task in tasks: results.append(await self._update_task(task.id, task)) return results async def _delete_task(self, id: str) -> Task: async with self._get_client() as client: await client.delete(f"/tasks/{id}") return Task(list_id="0", title="Deleted") if __name__ == "__main__": import os from dotenv import load_dotenv from pathlib import Path load_dotenv(Path(__file__).resolve().parent.parent / ".env") api_url = os.getenv("VIKUNJA_API_URL") api_key = os.getenv("VIKUNJA_API_KEY") if api_url is not None and api_key is not None: api = VikunjaAPI(api_url, api_key) tester = TaskAPITester(api) print(f"Testing Vikunja API against {api.api_url} as {api.api_key[:4]}...") asyncio.run(tester.test_all())