Skip to content

Performance Tips

Optimizing pytaskwarrior usage can significantly improve application responsiveness and efficiency.

Batch Operation Optimization

Efficient Task Creation

from taskwarrior import TaskWarrior, TaskInputDTO

tw = TaskWarrior()

# Instead of creating tasks one by one
tasks_to_create = [
    TaskInputDTO(description="Task 1", project="work"),
    TaskInputDTO(description="Task 2", project="work"),
    TaskInputDTO(description="Task 3", project="work")
]

# Create all tasks in a batch
for task in tasks_to_create:
    tw.add_task(task)

# Or use bulk operations if available
# (This would depend on the underlying TaskWarrior implementation)

Batch Task Updates

from taskwarrior import task_output_to_input

def batch_update_tasks(tw, tasks_data):
    """Update multiple tasks efficiently"""
    for task_data in tasks_data:
        try:
            # Get the current task
            task = tw.get_task(task_data['uuid'])

            # Convert to input DTO
            input_dto = task_output_to_input(task)

            # Apply changes
            if 'priority' in task_data:
                input_dto.priority = task_data['priority']
            if 'tags' in task_data:
                input_dto.tags = task_data['tags']

            # Save changes
            tw.modify_task(input_dto, task_data['uuid'])
        except Exception as e:
            print(f"Failed to update task {task_data['uuid']}: {e}")

# Usage
tasks_to_update = [
    {'uuid': 'task-1', 'priority': 'H'},
    {'uuid': 'task-2', 'tags': ['urgent', 'review']},
]
batch_update_tasks(tw, tasks_to_update)

Filter Performance Tips

Efficient Filtering Strategies

The default TC adapter supports implicit AND (space-separated tokens). For complex boolean logic, filter in Python after retrieving tasks.

# Good - use specific filter tokens (implicit AND)
tasks = tw.get_tasks("project:work status:pending")

# OR logic — not supported natively in TC adapter; combine in Python
work_tasks = tw.get_tasks("project:work")
filtered_tasks = [t for t in work_tasks if t.priority in ['H', 'M']]

Pre-filtering for Complex Queries

def get_filtered_tasks(tw, project=None, priority=None):
    """Filter by project first, then apply further criteria in Python."""
    base_filter = f"project:{project}" if project else ""
    all_tasks = tw.get_tasks(base_filter)
    if priority:
        all_tasks = [t for t in all_tasks if t.priority == priority]
    return all_tasks

# Usage
tasks = get_filtered_tasks(tw, project="work", priority="H")

Caching Strategies

Task Result Caching

import time
from functools import lru_cache

class CachedTaskWarrior:
    def __init__(self, tw):
        self.tw = tw
        self._task_cache = {}
        self._cache_timeout = 300  # 5 minutes

    @lru_cache(maxsize=128)
    def get_tasks_cached(self, filter_string=""):
        """Cached version of get_tasks"""
        return self.tw.get_tasks(filter_string)

    def get_task_cached(self, uuid):
        """Cached version of get_task"""
        # Simple in-memory cache
        if uuid in self._task_cache:
            cached, timestamp = self._task_cache[uuid]
            if time.time() - timestamp < self._cache_timeout:
                return cached

        # Fetch from TaskWarrior
        task = self.tw.get_task(uuid)
        self._task_cache[uuid] = (task, time.time())
        return task

# Usage
cached_tw = CachedTaskWarrior(tw)
tasks = cached_tw.get_tasks_cached("project:work")

Context Caching

class ContextCachedTaskWarrior:
    def __init__(self, tw):
        self.tw = tw
        self._context_cache = {}

    def get_contexts_cached(self):
        """Cache context list"""
        if 'contexts' not in self._context_cache:
            contexts = self.tw.get_contexts()
            self._context_cache['contexts'] = contexts
            return contexts
        return self._context_cache['contexts']

    def apply_context_cached(self, context_name):
        """Apply context with caching"""
        # In a real implementation, you might want to cache the current context
        return self.tw.apply_context(context_name)

Large Dataset Handling

Memory-Efficient Task Processing

The TC adapter loads all matching tasks into memory — get_tasks() returns a list. For large datasets, use a specific filter to reduce the result set rather than fetching everything and paginating in the caller.

def process_tasks_efficiently(tw, filter_string=""):
    """Use a targeted filter to avoid loading unnecessary tasks."""
    tasks = tw.get_tasks(filter_string)
    for task in tasks:
        print(f"Processing: {task.description}")

# Prefer a focused filter over fetching all tasks
process_tasks_efficiently(tw, "project:work +READY")

Command Execution Efficiency

Minimize External Calls

# Good - minimize calls to TaskWarrior
def get_task_summary(tw):
    """Get summary of tasks with minimal calls"""

    # Get all pending tasks in one call
    tasks = tw.get_tasks("status:pending")

    # Process in memory
    summary = {
        'total': len(tasks),
        'high_priority': len([t for t in tasks if t.priority == 'H']),
        'by_project': {}
    }

    for task in tasks:
        project = task.project or 'unassigned'
        summary['by_project'][project] = summary['by_project'].get(project, 0) + 1

    return summary

# Usage
summary = get_task_summary(tw)
print(f"Total tasks: {summary['total']}")

Batch Commands

def batch_complete_tasks(tw, task_uuids):
    """Complete multiple tasks efficiently"""

    # Instead of calling done_task for each task
    # You could potentially batch these operations

    completed = []
    for uuid in task_uuids:
        try:
            tw.done_task(uuid)
            completed.append(uuid)
        except Exception as e:
            print(f"Failed to complete task {uuid}: {e}")

    return completed

# Usage
task_uuids = ["task1", "task2", "task3"]
completed = batch_complete_tasks(tw, task_uuids)

Connection Pooling (if applicable)

class OptimizedTaskWarrior:
    def __init__(self):
        # Reuse TaskWarrior instance instead of creating new ones
        self.tw = TaskWarrior()

    def get_tasks_optimized(self, filter_string=""):
        """Optimized task retrieval"""
        return self.tw.get_tasks(filter_string)

    def add_task_optimized(self, task):
        """Optimized task creation"""
        return self.tw.add_task(task)

Monitoring and Profiling

Performance Monitoring

import time
from functools import wraps

def monitor_performance(func):
    """Decorator to monitor function performance"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} took {end_time - start_time:.2f} seconds")
        return result
    return wrapper

@monitor_performance
def get_work_tasks(tw):
    return tw.get_tasks("project:work")

# Usage
tasks = get_work_tasks(tw)

Memory Usage Monitoring

import psutil
import os

def monitor_memory_usage():
    """Monitor current memory usage"""
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    print(f"Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB")

# Usage
monitor_memory_usage()
```