Performance Tips¶
Optimizing pytaskwarrior usage can significantly improve application responsiveness and efficiency.
Batch Operation Optimization¶
Efficient Task Creation¶
from taskwarrior import TaskWarrior, TaskInputDTO
tw = TaskWarrior()
# Instead of creating tasks one by one
tasks_to_create = [
TaskInputDTO(description="Task 1", project="work"),
TaskInputDTO(description="Task 2", project="work"),
TaskInputDTO(description="Task 3", project="work")
]
# Create all tasks in a batch
for task in tasks_to_create:
tw.add_task(task)
# Or use bulk operations if available
# (This would depend on the underlying TaskWarrior implementation)
Batch Task Updates¶
from taskwarrior import task_output_to_input
def batch_update_tasks(tw, tasks_data):
"""Update multiple tasks efficiently"""
for task_data in tasks_data:
try:
# Get the current task
task = tw.get_task(task_data['uuid'])
# Convert to input DTO
input_dto = task_output_to_input(task)
# Apply changes
if 'priority' in task_data:
input_dto.priority = task_data['priority']
if 'tags' in task_data:
input_dto.tags = task_data['tags']
# Save changes
tw.modify_task(input_dto, task_data['uuid'])
except Exception as e:
print(f"Failed to update task {task_data['uuid']}: {e}")
# Usage
tasks_to_update = [
{'uuid': 'task-1', 'priority': 'H'},
{'uuid': 'task-2', 'tags': ['urgent', 'review']},
]
batch_update_tasks(tw, tasks_to_update)
Filter Performance Tips¶
Efficient Filtering Strategies¶
The default TC adapter supports implicit AND (space-separated tokens). For complex boolean logic, filter in Python after retrieving tasks.
# Good - use specific filter tokens (implicit AND)
tasks = tw.get_tasks("project:work status:pending")
# OR logic — not supported natively in TC adapter; combine in Python
work_tasks = tw.get_tasks("project:work")
filtered_tasks = [t for t in work_tasks if t.priority in ['H', 'M']]
Pre-filtering for Complex Queries¶
def get_filtered_tasks(tw, project=None, priority=None):
"""Filter by project first, then apply further criteria in Python."""
base_filter = f"project:{project}" if project else ""
all_tasks = tw.get_tasks(base_filter)
if priority:
all_tasks = [t for t in all_tasks if t.priority == priority]
return all_tasks
# Usage
tasks = get_filtered_tasks(tw, project="work", priority="H")
Caching Strategies¶
Task Result Caching¶
import time
from functools import lru_cache
class CachedTaskWarrior:
def __init__(self, tw):
self.tw = tw
self._task_cache = {}
self._cache_timeout = 300 # 5 minutes
@lru_cache(maxsize=128)
def get_tasks_cached(self, filter_string=""):
"""Cached version of get_tasks"""
return self.tw.get_tasks(filter_string)
def get_task_cached(self, uuid):
"""Cached version of get_task"""
# Simple in-memory cache
if uuid in self._task_cache:
cached, timestamp = self._task_cache[uuid]
if time.time() - timestamp < self._cache_timeout:
return cached
# Fetch from TaskWarrior
task = self.tw.get_task(uuid)
self._task_cache[uuid] = (task, time.time())
return task
# Usage
cached_tw = CachedTaskWarrior(tw)
tasks = cached_tw.get_tasks_cached("project:work")
Context Caching¶
class ContextCachedTaskWarrior:
def __init__(self, tw):
self.tw = tw
self._context_cache = {}
def get_contexts_cached(self):
"""Cache context list"""
if 'contexts' not in self._context_cache:
contexts = self.tw.get_contexts()
self._context_cache['contexts'] = contexts
return contexts
return self._context_cache['contexts']
def apply_context_cached(self, context_name):
"""Apply context with caching"""
# In a real implementation, you might want to cache the current context
return self.tw.apply_context(context_name)
Large Dataset Handling¶
Memory-Efficient Task Processing¶
The TC adapter loads all matching tasks into memory — get_tasks() returns a
list. For large datasets, use a specific filter to reduce the result set rather
than fetching everything and paginating in the caller.
def process_tasks_efficiently(tw, filter_string=""):
"""Use a targeted filter to avoid loading unnecessary tasks."""
tasks = tw.get_tasks(filter_string)
for task in tasks:
print(f"Processing: {task.description}")
# Prefer a focused filter over fetching all tasks
process_tasks_efficiently(tw, "project:work +READY")
Command Execution Efficiency¶
Minimize External Calls¶
# Good - minimize calls to TaskWarrior
def get_task_summary(tw):
"""Get summary of tasks with minimal calls"""
# Get all pending tasks in one call
tasks = tw.get_tasks("status:pending")
# Process in memory
summary = {
'total': len(tasks),
'high_priority': len([t for t in tasks if t.priority == 'H']),
'by_project': {}
}
for task in tasks:
project = task.project or 'unassigned'
summary['by_project'][project] = summary['by_project'].get(project, 0) + 1
return summary
# Usage
summary = get_task_summary(tw)
print(f"Total tasks: {summary['total']}")
Batch Commands¶
def batch_complete_tasks(tw, task_uuids):
"""Complete multiple tasks efficiently"""
# Instead of calling done_task for each task
# You could potentially batch these operations
completed = []
for uuid in task_uuids:
try:
tw.done_task(uuid)
completed.append(uuid)
except Exception as e:
print(f"Failed to complete task {uuid}: {e}")
return completed
# Usage
task_uuids = ["task1", "task2", "task3"]
completed = batch_complete_tasks(tw, task_uuids)
Connection Pooling (if applicable)¶
class OptimizedTaskWarrior:
def __init__(self):
# Reuse TaskWarrior instance instead of creating new ones
self.tw = TaskWarrior()
def get_tasks_optimized(self, filter_string=""):
"""Optimized task retrieval"""
return self.tw.get_tasks(filter_string)
def add_task_optimized(self, task):
"""Optimized task creation"""
return self.tw.add_task(task)
Monitoring and Profiling¶
Performance Monitoring¶
import time
from functools import wraps
def monitor_performance(func):
"""Decorator to monitor function performance"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{func.__name__} took {end_time - start_time:.2f} seconds")
return result
return wrapper
@monitor_performance
def get_work_tasks(tw):
return tw.get_tasks("project:work")
# Usage
tasks = get_work_tasks(tw)