Data Cleaning

Database Data Migration Scripts

Migrate data between databases with schema mapping, validation, and rollback support.

Python · Migration · SQL · ETL

Thumbnail for Database Data Migration Scripts

Overview

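Migrate data between databases with schema mapping, validation, and rollback support. The code below covers the cleaning and validation stages of such a migration: removing duplicates, handling missing values, removing outliers, normalizing text, and running post-cleaning quality checks.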

Pipeline Architecture

text
Raw Data → Ingestion → Validation → Cleaning → Transformation → Output
              │            │            │            │
              ▼            ▼            ▼            ▼
          Log errors   Flag issues  Fix/Remove   Enrich data

Installation

bash
pip install pandas numpy openpyxl pyarrow

Data Cleaning Pipeline

python
import pandas as pd
import numpy as np
from typing import Optional
import logging

# Script-style logging setup: set the root logger to INFO so the per-step
# messages below are emitted (basicConfig is a no-op if the root logger
# already has handlers).
logging.basicConfig(level="INFO")
# Module-scoped logger used by the cleaning pipeline's step reporting.
logger: logging.Logger = logging.getLogger(__name__)


class DataCleaner:
    """Chainable data-cleaning pipeline over a pandas DataFrame.

    Every step modifies ``self.df`` (a private copy of the input frame), logs
    how many rows it affected, and records that count in
    ``self.report["steps"]``. Each step returns ``self``, so steps can be
    chained. Call :meth:`get_report` at the end for a summary.
    """

    def __init__(self, df: pd.DataFrame):
        # Work on a copy so the caller's DataFrame is never modified.
        self.df = df.copy()
        self.report = {"original_rows": len(df), "steps": []}

    def remove_duplicates(self, subset: Optional[list] = None) -> "DataCleaner":
        """Drop duplicate rows, keeping the first occurrence.

        Args:
            subset: Columns compared when detecting duplicates; ``None`` uses
                all columns.
        """
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        self._log("remove_duplicates", before - len(self.df))
        return self

    def handle_missing(
        self, strategy: str = "drop", fill_value=None, columns: Optional[list] = None
    ) -> "DataCleaner":
        """Drop or impute missing values in *columns*.

        Args:
            strategy: ``"drop"`` removes rows with a missing value in any of
                *columns*; ``"fill"`` replaces missing values with
                *fill_value*; ``"median"`` fills numeric columns with their
                median (non-numeric columns are left untouched); ``"mode"``
                fills each column with its most frequent value (columns with
                no non-missing values are skipped).
            fill_value: Replacement used by ``strategy="fill"`` (required there).
            columns: Columns to consider; ``None`` means all columns, and an
                empty list means none.

        Raises:
            ValueError: If *strategy* is unknown, or ``"fill"`` is requested
                without a *fill_value*.

        The logged/recorded count is the number of rows removed (``"drop"``)
        or the number of rows in which at least one missing cell was filled.
        """
        if strategy not in ("drop", "fill", "median", "mode"):
            raise ValueError(
                f"Unknown missing-value strategy: {strategy!r}; "
                "expected one of 'drop', 'fill', 'median', 'mode'"
            )
        if strategy == "fill" and fill_value is None:
            raise ValueError("fill_value is required when strategy='fill'")

        # `is None` (not `or`) so an explicit empty list is honoured.
        cols = self.df.columns.tolist() if columns is None else list(columns)
        if not cols:
            self._log(f"handle_missing ({strategy})", 0)
            return self

        if strategy == "drop":
            before = len(self.df)
            self.df = self.df.dropna(subset=cols)
            affected = before - len(self.df)
        else:
            was_null = self.df[cols].isnull().to_numpy()
            if strategy == "fill":
                self.df[cols] = self.df[cols].fillna(fill_value)
            elif strategy == "median":
                for col in cols:
                    if pd.api.types.is_numeric_dtype(self.df[col]):
                        self.df[col] = self.df[col].fillna(self.df[col].median())
            else:  # "mode"
                for col in cols:
                    modes = self.df[col].mode()
                    # An all-missing column has no mode; leave it as-is
                    # instead of failing on modes.iloc[0].
                    if not modes.empty:
                        self.df[col] = self.df[col].fillna(modes.iloc[0])
            still_null = self.df[cols].isnull().to_numpy()
            # Count rows, not cells: a row is affected if any of its missing
            # cells actually received a value.
            affected = int((was_null & ~still_null).any(axis=1).sum())

        self._log(f"handle_missing ({strategy})", affected)
        return self

    def remove_outliers(self, columns: list, method: str = "iqr", factor: float = 1.5) -> "DataCleaner":
        """Drop rows whose value is an outlier in any of *columns*.

        Columns are processed in order, so each column's statistics are
        computed on the rows that survived the previous columns. Missing values
        are never treated as outliers; use :meth:`handle_missing` for those.

        Args:
            columns: Numeric columns to check.
            method: ``"iqr"`` keeps values within
                ``[Q1 - factor * IQR, Q3 + factor * IQR]`` (inclusive);
                ``"zscore"`` keeps values with ``|z| < factor``.
            factor: IQR multiplier or z-score threshold, depending on *method*.

        Raises:
            ValueError: If *method* is unknown.
        """
        if method not in ("iqr", "zscore"):
            raise ValueError(
                f"Unknown outlier method: {method!r}; expected 'iqr' or 'zscore'"
            )
        before = len(self.df)
        for col in columns:
            values = self.df[col]
            if method == "iqr":
                q1 = values.quantile(0.25)
                q3 = values.quantile(0.75)
                spread = q3 - q1
                keep = values.between(q1 - factor * spread, q3 + factor * spread)
            else:
                std = values.std()
                if pd.isna(std) or std == 0:
                    # Constant or single-value column: z-scores would be 0/0
                    # (NaN) and every row would be discarded. Nothing here can
                    # be an outlier, so skip the column.
                    continue
                keep = ((values - values.mean()) / std).abs() < factor
            # Comparisons are False for NaN; keep missing values explicitly.
            self.df = self.df[keep | values.isna()]
        self._log("remove_outliers", before - len(self.df))
        return self

    def standardize_text(self, columns: list) -> "DataCleaner":
        """Trim, lowercase, and collapse runs of whitespace in text columns.

        Args:
            columns: Columns holding strings (they must support the ``.str``
                accessor).

        The recorded count is the number of rows where at least one of
        *columns* changed.
        """
        changed = np.zeros(len(self.df), dtype=bool)
        for col in columns:
            original = self.df[col]
            cleaned = (
                original
                .str.strip()
                .str.lower()
                .str.replace(r"\s+", " ", regex=True)
            )
            # NaN != NaN, so "missing before and after" must count as unchanged.
            same = original.eq(cleaned) | (original.isna() & cleaned.isna())
            changed |= ~same.to_numpy(dtype=bool, na_value=False)
            self.df[col] = cleaned
        self._log("standardize_text", int(changed.sum()))
        return self

    def get_report(self) -> dict:
        """Finalize and return the report (original/final row counts and per-step counts)."""
        self.report["final_rows"] = len(self.df)
        self.report["rows_removed"] = self.report["original_rows"] - len(self.df)
        return self.report

    def _log(self, step: str, affected: int) -> None:
        """Log a step's affected-row count and append it to the report."""
        # Lazy %-formatting: the message is only built if INFO is enabled.
        logger.info("[%s] affected %d rows", step, affected)
        self.report["steps"].append({"step": step, "affected": affected})

Usage Example

python
def main():
    """Load the raw CSV, run the cleaning pipeline, and write the result as Parquet.

    Reads ``data/raw_dataset.csv`` and writes ``data/clean_dataset.parquet``.
    Writing Parquet needs a Parquet engine such as pyarrow to be installed.
    """
    # Load raw data
    df = pd.read_csv("data/raw_dataset.csv")
    print(f"Loaded {len(df)} rows, {len(df.columns)} columns")

    # Run cleaning pipeline; each step returns the cleaner, so they chain.
    cleaner = DataCleaner(df)
    clean_df = (
        cleaner
        .remove_duplicates()
        .handle_missing(strategy="median", columns=["revenue", "quantity"])
        .handle_missing(strategy="mode", columns=["category"])
        .remove_outliers(columns=["revenue"], method="iqr")
        .standardize_text(columns=["name", "category"])
        .df
    )

    # Output: separate the before/after counts so they don't run together.
    report = cleaner.get_report()
    print(f"Cleaning complete: {report['original_rows']} -> {report['final_rows']} rows")
    clean_df.to_parquet("data/clean_dataset.parquet", index=False)


if __name__ == "__main__":
    main()

Validation Checks

python
def validate(df: pd.DataFrame, *, max_null_pct: float = 5.0) -> list[str]:
    """Return human-readable data-quality issues found in *df* (empty if none).

    Checks for fully duplicated rows, and for numeric columns whose share of
    missing values is above *max_null_pct*. Non-numeric columns are not
    null-checked.

    Args:
        df: DataFrame to check.
        max_null_pct: Highest tolerated percentage (0-100) of missing values
            per numeric column; columns strictly above it are reported.
    """
    issues = []
    # Compute the duplicate mask once instead of twice.
    dup_count = int(df.duplicated().sum())
    if dup_count:
        issues.append(f"Found {dup_count} duplicates")
    for col in df.select_dtypes(include="number"):
        null_pct = df[col].isnull().mean() * 100
        if null_pct > max_null_pct:
            issues.append(f"{col}: {null_pct:.1f}% null values")
    return issues

Technologies

- Python
- Migration
- SQL
- ETL

Related Projects

Comments (0)

Leave a Comment

No comments yet. Be the first to comment!