Data Cleaning

Log File Processing Pipeline

Parse and analyze server logs with regex patterns, aggregation, and anomaly detection.

Python · Log Analysis · Regex · Pipeline

Thumbnail for Log File Processing Pipeline

Overview

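Parse and analyze server logs with regex patterns, aggregation, and anomaly detection. The code below implements the cleaning stage of that pipeline: de-duplication, missing-value handling, outlier removal, and text normalization on a pandas DataFrame.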

Pipeline Architecture

text
Raw Data → Ingestion → Validation → Cleaning → Transformation → Output
              │            │            │            │
              ▼            ▼            ▼            ▼
          Log errors   Flag issues  Fix/Remove   Enrich data

Installation

bash
pip install pandas numpy openpyxl pyarrow

Data Cleaning Pipeline

python
import pandas as pd
import numpy as np
from typing import Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataCleaner:
    """Chainable cleaning pipeline that works on a private copy of a DataFrame.

    Every cleaning method mutates ``self.df``, records one entry in
    ``self.report["steps"]`` and returns ``self`` so calls can be chained.
    The DataFrame passed to the constructor is never modified.
    """

    def __init__(self, df: pd.DataFrame):
        """Copy *df* and start a report seeded with its row count."""
        self.df = df.copy()
        self.report = {"original_rows": len(df), "steps": []}

    def remove_duplicates(self, subset: Optional[list] = None) -> "DataCleaner":
        """Drop duplicated rows, optionally comparing only the *subset* columns.

        Logs the number of rows removed.
        """
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        self._log("remove_duplicates", before - len(self.df))
        return self

    def handle_missing(
        self, strategy: str = "drop", fill_value=None, columns: Optional[list] = None
    ) -> "DataCleaner":
        """Remove or fill missing values in *columns* (all columns when omitted).

        Args:
            strategy: ``"drop"`` removes rows with a null in any of the columns;
                ``"fill"`` replaces nulls with *fill_value*; ``"median"`` fills
                numeric columns with their median (non-numeric columns are left
                untouched); ``"mode"`` fills each column with its most frequent
                value.
            fill_value: Replacement used by the ``"fill"`` strategy.
            columns: Columns to process; a falsy value means every column.

        Raises:
            ValueError: If *strategy* is unknown, or ``"fill"`` is requested
                without a *fill_value*.

        The logged count is the number of null cells that were actually
        eliminated, not the number that existed before the step.
        """
        if strategy not in ("drop", "fill", "median", "mode"):
            # Fail before touching the data: a typo must not look like a
            # successful cleaning step in the report.
            raise ValueError(
                f"Unknown strategy {strategy!r}; expected 'drop', 'fill', 'median' or 'mode'"
            )
        if strategy == "fill" and fill_value is None:
            raise ValueError("strategy='fill' requires a fill_value")

        cols = columns or self.df.columns.tolist()
        before = int(self.df[cols].isnull().sum().sum())

        if strategy == "drop":
            self.df = self.df.dropna(subset=cols)
        elif strategy == "fill":
            self.df[cols] = self.df[cols].fillna(fill_value)
        elif strategy == "median":
            for col in cols:
                if pd.api.types.is_numeric_dtype(self.df[col]):
                    self.df[col] = self.df[col].fillna(self.df[col].median())
        else:  # "mode"
            for col in cols:
                modes = self.df[col].mode()
                if modes.empty:
                    # An all-null column has no mode; there is nothing to fill with.
                    continue
                self.df[col] = self.df[col].fillna(modes.iloc[0])

        after = int(self.df[cols].isnull().sum().sum())
        self._log(f"handle_missing ({strategy})", before - after)
        return self

    def remove_outliers(self, columns: list, method: str = "iqr", factor: float = 1.5) -> "DataCleaner":
        """Drop rows whose value in any of *columns* is an outlier.

        Columns are processed one after another, so the statistics of a later
        column are computed on the rows that survived the earlier ones.

        Args:
            columns: Numeric columns to check.
            method: ``"iqr"`` keeps values within ``factor`` interquartile
                ranges of the quartiles; ``"zscore"`` keeps values whose
                absolute z-score is below ``factor``.
            factor: IQR multiplier or z-score threshold, depending on *method*.

        Raises:
            ValueError: If *method* is unknown.

        Note: missing values compare as False against the bounds, so rows with
        NaN in a checked column are removed as well.
        """
        if method not in ("iqr", "zscore"):
            raise ValueError(f"Unknown method {method!r}; expected 'iqr' or 'zscore'")

        before = len(self.df)
        for col in columns:
            values = self.df[col]
            if method == "iqr":
                q1 = values.quantile(0.25)
                q3 = values.quantile(0.75)
                iqr = q3 - q1
                keep = (values >= q1 - factor * iqr) & (values <= q3 + factor * iqr)
            else:  # "zscore"
                std = values.std()
                if not std > 0:
                    # Zero (constant column) or NaN (fewer than two values):
                    # z-scores are undefined and would discard every row.
                    continue
                keep = ((values - values.mean()) / std).abs() < factor
            # Copy so later column assignments act on an independent frame
            # rather than a filtered slice (avoids SettingWithCopyWarning).
            self.df = self.df[keep].copy()
        self._log("remove_outliers", before - len(self.df))
        return self

    def standardize_text(self, columns: list) -> "DataCleaner":
        """Trim, lower-case and collapse internal whitespace in text *columns*.

        Non-string values inside a mixed column are left unchanged. The logged
        count is the number of values whose text actually changed.
        """
        changed = 0
        for col in columns:
            raw = self.df[col].astype(object)
            cleaned = (
                self.df[col]
                .str.strip()
                .str.lower()
                .str.replace(r"\s+", " ", regex=True)
            )
            # The .str accessor yields NaN for non-string entries; put the
            # original values back instead of silently erasing them.
            lost = cleaned.isna() & raw.notna()
            if lost.any():
                cleaned = cleaned.astype(object).where(~lost, raw)
            # Ignore nulls when counting: NaN != NaN would count as a change.
            changed += int(((cleaned != raw) & raw.notna()).sum())
            self.df[col] = cleaned
        self._log("standardize_text", changed)
        return self

    def get_report(self) -> dict:
        """Return the report, refreshed with the current and removed row counts."""
        self.report["final_rows"] = len(self.df)
        self.report["rows_removed"] = self.report["original_rows"] - len(self.df)
        return self.report

    def _log(self, step: str, affected: int):
        """Emit an INFO log line for *step* and append it to the report."""
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info("[%s] affected %s rows", step, affected)
        self.report["steps"].append({"step": step, "affected": affected})

Usage Example

python
def main(
    input_path: str = "data/raw_dataset.csv",
    output_path: str = "data/clean_dataset.parquet",
) -> None:
    """Load a CSV, run the cleaning pipeline on it and write the result as Parquet.

    Args:
        input_path: CSV file to read.
        output_path: Parquet file to write the cleaned data to.
    """
    # Load raw data
    df = pd.read_csv(input_path)
    print(f"Loaded {len(df)} rows, {len(df.columns)} columns")

    # Run cleaning pipeline; each step returns the cleaner so calls chain.
    cleaner = DataCleaner(df)
    clean_df = (
        cleaner
        .remove_duplicates()
        .handle_missing(strategy="median", columns=["revenue", "quantity"])
        .handle_missing(strategy="mode", columns=["category"])
        .remove_outliers(columns=["revenue"], method="iqr")
        .standardize_text(columns=["name", "category"])
        .df
    )

    # Output
    report = cleaner.get_report()
    # Separate the two counts; without a separator they print as one number.
    print(f"Cleaning complete: {report['original_rows']} -> {report['final_rows']} rows")
    clean_df.to_parquet(output_path, index=False)


# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Validation Checks

python
def validate(df: pd.DataFrame, max_null_pct: float = 5.0) -> list[str]:
    """Return human-readable data-quality issues found in *df*.

    Two checks are performed: fully duplicated rows, and numeric columns whose
    share of null values exceeds *max_null_pct* percent.

    Args:
        df: DataFrame to inspect; it is not modified.
        max_null_pct: Highest tolerated percentage of nulls per numeric column.

    Returns:
        One message per issue; an empty list when nothing was flagged.
    """
    issues = []

    # Compute the duplicate mask once and reuse it for the check and the count.
    duplicate_count = int(df.duplicated().sum())
    if duplicate_count:
        issues.append(f"Found {duplicate_count} duplicates")

    for col in df.select_dtypes(include="number").columns:
        null_pct = df[col].isnull().mean() * 100
        if null_pct > max_null_pct:
            issues.append(f"{col}: {null_pct:.1f}% null values")
    return issues

Technologies

- Python
- Log Analysis
- Regex
- Pipeline

Related Projects

Comments (0)

Leave a Comment

No comments yet. Be the first to comment!