Python

Python Task Queue with Celery

Implement distributed task processing with Celery, Redis broker, and periodic scheduling.

Tags: Python, Celery, Redis, Queue

Thumbnail for Python Task Queue with Celery

Overview

Implement distributed task processing with Celery, Redis broker, and periodic scheduling.

Installation

bash
# Create virtual environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

Requirements

txt
celery
redis
# queue is part of the Python standard library and needs no install
pytest>=7.0

Core Module

python
from dataclasses import dataclass, field
from typing import List, Optional
from datetime import datetime


@dataclass
class DataProcessor:
    """Chainable, in-memory pipeline over a list of dict records.

    ``load``, ``transform`` and ``filter`` each return the processor itself
    so calls can be chained; ``export`` ends a chain by writing the current
    items to disk as JSON.
    """

    name: str
    # Timestamp taken from datetime.now() when the instance is constructed.
    created_at: datetime = field(default_factory=datetime.now)
    # Working set of records; excluded from the generated repr.
    _items: List[dict] = field(default_factory=list, repr=False)

    def load(self, source: str) -> "DataProcessor":
        """Announce a load from ``source`` and return ``self`` for chaining.

        This is a placeholder: it only prints a message. It does not read
        ``source`` and does not modify the stored items.
        """
        print(f"Loading data from {source}...")
        # Implementation depends on source type
        return self

    def transform(self, func) -> "DataProcessor":
        """Replace every stored item with ``func(item)`` and return ``self``.

        Args:
            func: Callable invoked once per item, in order; its return value
                becomes the new item.
        """
        self._items = [func(item) for item in self._items]
        return self

    def filter(self, predicate) -> "DataProcessor":
        """Keep only the items for which ``predicate(item)`` is truthy.

        Args:
            predicate: Callable invoked once per item, in order.

        Returns:
            This processor, to allow chaining.
        """
        self._items = [item for item in self._items if predicate(item)]
        return self

    def export(self, destination: str, format: str = "json") -> None:
        """Write the current items to ``destination`` as indented JSON.

        Missing parent directories of ``destination`` are created first, so
        exporting to a path such as ``output/results.json`` does not fail
        just because ``output/`` has not been created yet.

        Args:
            destination: Path of the file to write; an existing file is
                overwritten.
            format: Accepted for interface compatibility. Only JSON output
                is implemented and this value is not consulted.

        Raises:
            OSError: If the directory cannot be created or the file cannot
                be opened for writing.
        """
        import json
        import os

        parent = os.path.dirname(destination)
        if parent:
            os.makedirs(parent, exist_ok=True)

        # Explicit encoding keeps the file independent of the locale default.
        with open(destination, "w", encoding="utf-8") as f:
            # default=str stringifies values json cannot encode natively.
            json.dump(self._items, f, indent=2, default=str)
        print(f"Exported {len(self._items)} items to {destination}")

    @property
    def count(self) -> int:
        """Number of items currently held by the processor."""
        return len(self._items)

Usage Example

python
def main():
    """Run the demo pipeline: load, tag, filter, export, then report the count."""
    pipeline = DataProcessor(name="Python Task Queue with Celery")

    # Each step returns the processor, so the whole pipeline is one chained
    # expression; parentheses replace the backslash continuations.
    (
        pipeline.load("data/input.csv")
        .transform(lambda record: {**record, "processed": True})
        .filter(lambda record: record.get("status") == "active")
        .export("output/results.json")
    )

    print(f"Processed {pipeline.count} records")


# Only run the demo when executed as a script, not when imported.
if __name__ == "__main__":
    main()

Testing

python
import pytest
from processor import DataProcessor


class TestDataProcessor:
    """Checks for DataProcessor construction and item transformation."""

    def test_create_processor(self):
        """A new processor keeps the given name and holds no items."""
        processor = DataProcessor(name="test")
        assert processor.name == "test"
        assert processor.count == 0

    def test_transform_chain(self):
        """transform() applies the function to each seeded item in order."""
        processor = DataProcessor(name="test")
        # Seed the private item list directly to bypass load().
        processor._items = [{"value": 1}, {"value": 2}]

        processor.transform(lambda row: {**row, "doubled": row["value"] * 2})

        first = processor._items[0]
        second = processor._items[1]
        assert first["doubled"] == 2
        assert second["doubled"] == 4

Technologies Used

- Python
- Celery
- Redis
- Queue

Related Projects

Comments (0)

Leave a Comment

No comments yet. Be the first to comment!