Data Cleaning

Data Warehouse Schema Design

Design star and snowflake schemas with fact tables, dimensions, and slowly changing dimensions (SCDs).

SQL · Data Warehouse · Star Schema · Dimensional

Thumbnail for Data Warehouse Schema Design

Overview

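Design star and snowflake schemas with fact tables, dimensions, and slowly changing dimensions (SCDs). The code on this page covers the data-cleaning stage of the pipeline, which prepares raw data before it is loaded into such a schema.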

Pipeline Architecture

text
Raw Data → Ingestion → Validation → Cleaning → Transformation → Output
              │            │            │            │
              ▼            ▼            ▼            ▼
          Log errors   Flag issues  Fix/Remove   Enrich data

Installation

bash
pip install pandas numpy openpyxl pyarrow

Data Cleaning Pipeline

python
import pandas as pd
import numpy as np
from typing import Optional
import logging

# Configure the root logger at INFO level so the cleaner's per-step
# logger.info(...) messages are actually emitted.
# NOTE(review): basicConfig runs at import time, a module-level side effect;
# an application importing this module may prefer to configure logging itself.
logging.basicConfig(level=logging.INFO)
# Logger named after this module.
logger = logging.getLogger(__name__)


class DataCleaner:
    """Chainable pandas cleaning pipeline for Data Warehouse Schema Design.

    Every step mutates ``self.df`` (a private copy of the input frame), appends
    ``{"step": ..., "affected": ...}`` to ``self.report["steps"]`` and returns
    ``self``, so steps can be chained. Call :meth:`get_report` for a summary.
    """

    def __init__(self, df: pd.DataFrame):
        # Work on a copy so the caller's frame is never modified.
        self.df = df.copy()
        self.report = {"original_rows": len(df), "steps": []}

    def remove_duplicates(self, subset: Optional[list] = None) -> "DataCleaner":
        """Drop duplicate rows, comparing only ``subset`` columns when given.

        The first occurrence of each duplicate group is kept
        (``DataFrame.drop_duplicates`` default). Affected = rows removed.
        """
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        self._log("remove_duplicates", before - len(self.df))
        return self

    def handle_missing(
        self, strategy: str = "drop", fill_value=None, columns: Optional[list] = None
    ) -> "DataCleaner":
        """Handle missing values in ``columns`` (all columns when empty/None).

        Strategies:
          * ``"drop"``   - drop rows with a null in any of ``columns``.
          * ``"fill"``   - replace nulls with ``fill_value``.
          * ``"median"`` - fill numeric columns with their median; non-numeric
            columns are left untouched.
          * ``"mode"``   - fill with the most frequent value; a column with no
            non-null values has no mode and is left untouched.

        Affected = rows dropped (``"drop"``) or rows in which at least one null
        was actually filled (other strategies).

        Raises:
            ValueError: if ``strategy`` is not one of the above.
        """
        if strategy not in ("drop", "fill", "median", "mode"):
            raise ValueError(f"Unknown missing-value strategy: {strategy!r}")

        cols = list(columns) if columns else self.df.columns.tolist()
        # Per-row null counts, used to report how many rows were really changed.
        nulls_before = self.df[cols].isnull().sum(axis=1)

        if strategy == "drop":
            before = len(self.df)
            self.df = self.df.dropna(subset=cols)
            affected = before - len(self.df)
        else:
            if strategy == "fill":
                self.df[cols] = self.df[cols].fillna(fill_value)
            elif strategy == "median":
                for col in cols:
                    if pd.api.types.is_numeric_dtype(self.df[col]):
                        self.df[col] = self.df[col].fillna(self.df[col].median())
            else:  # "mode"
                for col in cols:
                    modes = self.df[col].mode()
                    # mode() is empty for an all-null column: nothing to fill with.
                    if not modes.empty:
                        self.df[col] = self.df[col].fillna(modes.iloc[0])
            nulls_after = self.df[cols].isnull().sum(axis=1)
            affected = int((nulls_after < nulls_before).sum())

        self._log(f"handle_missing ({strategy})", int(affected))
        return self

    def remove_outliers(self, columns: list, method: str = "iqr", factor: float = 1.5) -> "DataCleaner":
        """Drop rows whose value in any of ``columns`` is an outlier.

        Columns are processed in order, each on the rows surviving the
        previous column.

        * ``"iqr"``    - keep values within ``[Q1 - factor*IQR, Q3 + factor*IQR]``.
        * ``"zscore"`` - keep values with ``|z| < factor``. A column with zero or
          undefined standard deviation (constant, or fewer than two values)
          has no outliers and is skipped, instead of having every row dropped.

        Missing values are never treated as outliers; leave them to
        :meth:`handle_missing`. Affected = rows removed.

        Raises:
            ValueError: if ``method`` is not ``"iqr"`` or ``"zscore"``.
        """
        if method not in ("iqr", "zscore"):
            raise ValueError(f"Unknown outlier method: {method!r}")

        before = len(self.df)
        for col in columns:
            values = self.df[col]
            if method == "iqr":
                q1 = values.quantile(0.25)
                q3 = values.quantile(0.75)
                iqr = q3 - q1
                # between() is inclusive on both ends, like the >= / <= bounds.
                inlier = values.between(q1 - factor * iqr, q3 + factor * iqr)
            else:
                std = values.std()
                if pd.isna(std) or std == 0:
                    continue
                inlier = ((values - values.mean()) / std).abs() < factor
            # NaN compares False against the bounds; keep those rows explicitly.
            self.df = self.df[inlier | values.isna()]
        self._log("remove_outliers", before - len(self.df))
        return self

    def standardize_text(self, columns: list) -> "DataCleaner":
        """Strip, lower-case and collapse internal whitespace in text columns.

        Only string cells are rewritten. Non-string cells (numbers, nulls) in
        an object column are kept as-is rather than being turned into NaN by
        the ``.str`` accessor. Affected = rows in which at least one cell changed.
        """
        changed = pd.Series(False, index=self.df.index)
        for col in columns:
            original = self.df[col]
            is_text = original.map(lambda v: isinstance(v, str)).astype(bool)
            cleaned = (
                original
                .str.strip()
                .str.lower()
                .str.replace(r"\s+", " ", regex=True)
            )
            self.df[col] = cleaned.where(is_text, original)
            changed |= is_text & cleaned.ne(original).fillna(False).astype(bool)
        self._log("standardize_text", int(changed.sum()))
        return self

    def get_report(self) -> dict:
        """Finalize and return the report: original/final row counts and steps."""
        self.report["final_rows"] = len(self.df)
        self.report["rows_removed"] = self.report["original_rows"] - len(self.df)
        return self.report

    def _log(self, step: str, affected: int) -> None:
        """Log a finished step at INFO level and record it in the report."""
        # Same module-named logger as the file's module-level one; looked up
        # here so the class does not depend on a module global. Lazy %-args
        # avoid formatting when INFO is disabled.
        logging.getLogger(__name__).info("[%s] affected %d rows", step, affected)
        self.report["steps"].append({"step": step, "affected": affected})

Usage Example

python
def main():
    """Clean ``data/raw_dataset.csv`` and write ``data/clean_dataset.parquet``.

    The pipeline below expects the raw CSV to contain the ``revenue``,
    ``quantity``, ``category`` and ``name`` columns.
    """
    # Load raw data
    df = pd.read_csv("data/raw_dataset.csv")
    print(f"Loaded {len(df)} rows, {len(df.columns)} columns")

    # Run cleaning pipeline
    cleaner = DataCleaner(df)
    clean_df = (
        cleaner
        .remove_duplicates()
        .handle_missing(strategy="median", columns=["revenue", "quantity"])
        .handle_missing(strategy="mode", columns=["category"])
        .remove_outliers(columns=["revenue"], method="iqr")
        .standardize_text(columns=["name", "category"])
        .df
    )

    # Output
    report = cleaner.get_report()
    # Separate the before/after counts; back-to-back they read as one number.
    print(f"Cleaning complete: {report['original_rows']} -> {report['final_rows']} rows")
    clean_df.to_parquet("data/clean_dataset.parquet", index=False)


if __name__ == "__main__":
    main()

Validation Checks

python
def validate(
    df: pd.DataFrame, *, max_null_pct: float = 5.0, numeric_only: bool = True
) -> list[str]:
    """Return human-readable data-quality issues in *df* (empty list = clean).

    Checks:
      * fully duplicated rows (every occurrence after the first is counted);
      * columns whose share of nulls is strictly greater than ``max_null_pct``
        percent. Only numeric columns are checked by default; pass
        ``numeric_only=False`` to check every column.
    """
    issues = []
    # Compute the duplicate mask once rather than once for the test and once for the count.
    n_duplicates = int(df.duplicated().sum())
    if n_duplicates:
        issues.append(f"Found {n_duplicates} duplicates")

    checked = df.select_dtypes(include="number") if numeric_only else df
    for col in checked.columns:
        null_pct = checked[col].isnull().mean() * 100
        if null_pct > max_null_pct:
            issues.append(f"{col}: {null_pct:.1f}% null values")
    return issues

Technologies

- SQL
- Data Warehouse
- Star Schema
- Dimensional

Related Projects

Comments (0)

Leave a Comment

No comments yet. Be the first to comment!