Database Data Migration Scripts
Migrate data between databases with schema mapping, validation, and rollback support.
Python · Migration · SQL · ETL
Overview
Migrate data between databases with schema mapping, validation, and rollback support.
Pipeline Architecture
text
Raw Data → Ingestion → Validation → Cleaning → Transformation → Output
               │            │           │             │
               ▼            ▼           ▼             ▼
          Log errors   Flag issues  Fix/Remove   Enrich data
Installation
bash
pip install pandas numpy openpyxl pyarrow
Data Cleaning Pipeline
python
import pandas as pd
import numpy as np
from typing import Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataCleaner:
    """Chainable cleaning pipeline that works on a private copy of a DataFrame.

    Each cleaning method replaces or updates ``self.df``, appends an entry to
    ``self.report["steps"]`` and returns ``self`` so that calls can be chained.

    Attributes:
        df: The working DataFrame (a copy of the frame passed to ``__init__``).
        report: Dict holding ``"original_rows"`` and the list of ``"steps"``;
            ``get_report`` adds ``"final_rows"`` and ``"rows_removed"``.
    """

    _MISSING_STRATEGIES = ("drop", "fill", "median", "mode")
    _OUTLIER_METHODS = ("iqr", "zscore")

    def __init__(self, df: pd.DataFrame):
        # Copy so the caller's frame is never mutated by the pipeline.
        self.df = df.copy()
        self.report = {"original_rows": len(df), "steps": []}

    def remove_duplicates(self, subset: Optional[list] = None) -> "DataCleaner":
        """Drop duplicate rows, optionally comparing only the ``subset`` columns."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        self._log("remove_duplicates", before - len(self.df))
        return self

    def handle_missing(
        self, strategy: str = "drop", fill_value=None, columns: Optional[list] = None
    ) -> "DataCleaner":
        """Drop or fill missing values in the selected columns.

        Args:
            strategy: ``"drop"`` removes rows with a null in any selected
                column; ``"fill"`` replaces nulls with ``fill_value``;
                ``"median"`` fills numeric columns with their median
                (non-numeric columns are left untouched); ``"mode"`` fills
                each column with its most frequent value.
            fill_value: Replacement used by the ``"fill"`` strategy.
            columns: Columns to process; ``None`` or an empty list means all
                columns.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: If ``strategy`` is not one of the supported names.
        """
        if strategy not in self._MISSING_STRATEGIES:
            raise ValueError(
                f"Unknown strategy {strategy!r}; "
                f"expected one of {self._MISSING_STRATEGIES}"
            )

        cols = columns or self.df.columns.tolist()
        nulls_before = int(self.df[cols].isnull().sum().sum())

        if strategy == "drop":
            self.df = self.df.dropna(subset=cols)
        elif strategy == "fill":
            self.df[cols] = self.df[cols].fillna(fill_value)
        elif strategy == "median":
            for col in cols:
                if pd.api.types.is_numeric_dtype(self.df[col]):
                    self.df[col] = self.df[col].fillna(self.df[col].median())
        else:  # "mode"
            for col in cols:
                modes = self.df[col].mode()
                # mode() is empty when the column holds only nulls; there is
                # nothing to fill with, so leave the column as it is.
                if not modes.empty:
                    self.df[col] = self.df[col].fillna(modes.iloc[0])

        # Report the nulls that were actually resolved, not merely found.
        nulls_after = int(self.df[cols].isnull().sum().sum())
        self._log(f"handle_missing ({strategy})", nulls_before - nulls_after)
        return self

    def remove_outliers(
        self, columns: list, method: str = "iqr", factor: float = 1.5
    ) -> "DataCleaner":
        """Drop rows whose value in any of ``columns`` is an outlier.

        Columns are processed in order, each on the rows that survived the
        previous one. Rows with a null in a processed column are dropped as
        well, because comparisons against a null evaluate to False.

        Args:
            columns: Columns to check.
            method: ``"iqr"`` keeps values within
                ``[Q1 - factor * IQR, Q3 + factor * IQR]``; ``"zscore"`` keeps
                values whose absolute z-score is below ``factor``.
            factor: IQR multiplier or z-score threshold, depending on
                ``method``.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method not in self._OUTLIER_METHODS:
            raise ValueError(
                f"Unknown method {method!r}; expected one of {self._OUTLIER_METHODS}"
            )

        before = len(self.df)
        for col in columns:
            series = self.df[col]
            if method == "iqr":
                q1 = series.quantile(0.25)
                q3 = series.quantile(0.75)
                margin = factor * (q3 - q1)
                keep = (series >= q1 - margin) & (series <= q3 + margin)
            else:  # "zscore"
                std = series.std()
                # A zero or undefined std (constant column, fewer than two
                # values) makes every z-score NaN, which would drop all rows.
                # Such a column has no outliers, so skip it.
                if pd.isna(std) or std == 0:
                    continue
                keep = ((series - series.mean()) / std).abs() < factor
            self.df = self.df[keep]

        self._log("remove_outliers", before - len(self.df))
        return self

    def standardize_text(self, columns: list) -> "DataCleaner":
        """Strip, lower-case and collapse whitespace runs in text columns.

        The logged ``affected`` figure is the number of non-null values whose
        content changed.
        """
        changed = 0
        for col in columns:
            original = self.df[col]
            cleaned = (
                original.str.strip()
                .str.lower()
                .str.replace(r"\s+", " ", regex=True)
            )
            changed += int((original.notna() & (original != cleaned)).sum())
            self.df[col] = cleaned
        self._log("standardize_text", changed)
        return self

    def get_report(self) -> dict:
        """Return the report dict, updated with the final row counts."""
        self.report["final_rows"] = len(self.df)
        self.report["rows_removed"] = self.report["original_rows"] - len(self.df)
        return self.report

    def _log(self, step: str, affected: int):
        """Log one pipeline step and record it in the report."""
        logger.info("[%s] affected %s rows", step, affected)
        self.report["steps"].append({"step": step, "affected": affected})


# Usage Example
python
def main(
    input_path: str = "data/raw_dataset.csv",
    output_path: str = "data/clean_dataset.parquet",
) -> None:
    """Run the example cleaning pipeline from a CSV file to a Parquet file.

    Args:
        input_path: CSV file to read the raw data from.
        output_path: Parquet file the cleaned data is written to.
    """
    # Load raw data
    df = pd.read_csv(input_path)
    print(f"Loaded {len(df)} rows, {len(df.columns)} columns")

    # Run cleaning pipeline
    cleaner = DataCleaner(df)
    clean_df = (
        cleaner
        .remove_duplicates()
        .handle_missing(strategy="median", columns=["revenue", "quantity"])
        .handle_missing(strategy="mode", columns=["category"])
        .remove_outliers(columns=["revenue"], method="iqr")
        .standardize_text(columns=["name", "category"])
        .df
    )

    # Output
    report = cleaner.get_report()
    print(f"Cleaning complete: {report['original_rows']} → {report['final_rows']} rows")
    clean_df.to_parquet(output_path, index=False)


if __name__ == "__main__":
    main()

# Validation Checks
python
def validate(df: pd.DataFrame, null_threshold_pct: float = 5) -> list[str]:
    """Return human-readable data-quality issues found in ``df``.

    Two checks are performed: fully duplicated rows, and numeric columns whose
    share of null values exceeds ``null_threshold_pct`` percent.

    Args:
        df: Frame to inspect; it is not modified.
        null_threshold_pct: Null percentage above which a numeric column is
            reported.

    Returns:
        A list of issue descriptions; empty when no check fails.
    """
    issues = []

    # Compute the duplicate mask once and reuse it for the count.
    duplicate_count = int(df.duplicated().sum())
    if duplicate_count:
        issues.append(f"Found {duplicate_count} duplicates")

    for col in df.select_dtypes(include="number"):
        null_pct = df[col].isnull().mean() * 100
        if null_pct > null_threshold_pct:
            issues.append(f"{col}: {null_pct:.1f}% null values")

    return issues


# Technologies
- Python - Migration - SQL - ETL
Related Projects
Data Cleaning · Dec 28, 2025
Automated Data Quality Monitoring
Build a monitoring system that tracks data freshness, completeness, and schema drift.
Python · Data Quality · Monitoring · Alerting
Read more → Source
Data Cleaning · Dec 23, 2025
Feature Engineering for ML
Create predictive features from raw data: encoding, binning, polynomial, and interaction.
Python · Feature Engineering · ML · Preprocessing
Read more → Source
Data Cleaning · Dec 19, 2025
Data Warehouse Schema Design
Design star and snowflake schemas with fact tables, dimensions, and slowly changing dims.
SQL · Data Warehouse · Star Schema · Dimensional
Read more → Source
Comments (0)
No comments yet. Be the first to comment!