Data Warehouse Schema Design
Design star and snowflake schemas with fact tables, dimension tables, and slowly changing dimensions (SCDs).
SQL · Data Warehouse · Star Schema · Dimensional
Overview
Design star and snowflake schemas with fact tables, dimension tables, and slowly changing dimensions (SCDs).
Pipeline Architecture
text
Raw Data → Ingestion → Validation → Cleaning → Transformation → Output
               │           │           │             │
               ▼           ▼           ▼             ▼
          Log errors  Flag issues Fix/Remove    Enrich data
Installation
bash
pip install pandas numpy openpyxl pyarrow
Data Cleaning Pipeline
python
import pandas as pd
import numpy as np
from typing import Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataCleaner:
    """Automated data cleaning pipeline for Data Warehouse Schema Design.

    The cleaner works on a private copy of the DataFrame passed to the
    constructor. Every cleaning method returns ``self`` so that steps can be
    chained, and appends an entry to ``self.report["steps"]`` describing how
    much data the step touched. The current data is available as ``self.df``.
    """

    def __init__(self, df: pd.DataFrame):
        """Copy *df* and start a report that records the original row count."""
        self.df = df.copy()
        self.report = {"original_rows": len(df), "steps": []}

    def remove_duplicates(self, subset: Optional[list] = None) -> "DataCleaner":
        """Drop duplicate rows, optionally comparing only the *subset* columns.

        The number of removed rows is recorded in the report.
        """
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset)
        removed = before - len(self.df)
        self._log("remove_duplicates", removed)
        return self

    def handle_missing(
        self, strategy: str = "drop", fill_value=None, columns: Optional[list] = None
    ) -> "DataCleaner":
        """Remove or fill missing values in *columns* (all columns if omitted).

        Strategies:
            ``"drop"``   -- drop every row that has a null in any target column.
            ``"fill"``   -- replace nulls with *fill_value*.
            ``"median"`` -- fill numeric target columns with their median;
                            non-numeric columns are left untouched.
            ``"mode"``   -- fill each target column with its most frequent
                            value; columns with no non-null value are skipped.

        An unrecognised strategy changes nothing and logs a warning.

        The report entry records how many null cells in the target columns
        were actually eliminated by the step.

        Raises:
            ValueError: if ``strategy="fill"`` is used without a *fill_value*.
        """
        cols = columns or self.df.columns.tolist()
        before = int(self.df[cols].isnull().sum().sum())
        if strategy == "drop":
            self.df = self.df.dropna(subset=cols)
        elif strategy == "fill":
            if fill_value is None:
                # pandas rejects fillna(None) as well; fail with a message
                # that names the actual problem.
                raise ValueError('strategy="fill" requires a fill_value')
            self.df[cols] = self.df[cols].fillna(fill_value)
        elif strategy == "median":
            for col in cols:
                if pd.api.types.is_numeric_dtype(self.df[col]):
                    self.df[col] = self.df[col].fillna(self.df[col].median())
        elif strategy == "mode":
            for col in cols:
                modes = self.df[col].mode()
                if modes.empty:
                    # Column has no non-null value, so there is no mode to
                    # fill with; indexing .iloc[0] here would raise IndexError.
                    continue
                self.df[col] = self.df[col].fillna(modes.iloc[0])
        else:
            logger.warning(
                "handle_missing: unknown strategy %r; no changes made", strategy
            )
        # Count what was really handled: skipped columns (non-numeric under
        # "median", all-null under "mode") must not be reported as affected.
        remaining = int(self.df[cols].isnull().sum().sum())
        self._log(f"handle_missing ({strategy})", before - remaining)
        return self

    def remove_outliers(self, columns: list, method: str = "iqr", factor: float = 1.5) -> "DataCleaner":
        """Drop rows whose value in any of *columns* is an outlier.

        Methods:
            ``"iqr"``    -- keep values within
                            ``[Q1 - factor * IQR, Q3 + factor * IQR]``.
            ``"zscore"`` -- keep values whose absolute z-score is below
                            *factor*. Columns whose standard deviation is zero
                            or undefined are skipped, because no z-score can
                            be computed for them.

        Rows whose value is missing are kept: a missing value is not an
        outlier, and removing it is the job of ``handle_missing``. An
        unrecognised method changes nothing and logs a warning.

        The number of removed rows is recorded in the report.
        """
        before = len(self.df)
        if method not in ("iqr", "zscore"):
            logger.warning(
                "remove_outliers: unknown method %r; no changes made", method
            )
            self._log("remove_outliers", 0)
            return self
        for col in columns:
            series = self.df[col]
            if method == "iqr":
                q1 = series.quantile(0.25)
                q3 = series.quantile(0.75)
                spread = factor * (q3 - q1)
                # between() is inclusive on both ends by default.
                inliers = series.between(q1 - spread, q3 + spread)
            else:
                std = series.std()
                if not std > 0:
                    # std is 0 (constant column) or NaN (fewer than two
                    # values): every z-score would be NaN/inf and the
                    # comparison below would discard all rows.
                    continue
                inliers = ((series - series.mean()).abs() / std) < factor
            # Comparisons against NaN are False, so add missing values back.
            self.df = self.df[inliers | series.isna()]
        self._log("remove_outliers", before - len(self.df))
        return self

    def standardize_text(self, columns: list) -> "DataCleaner":
        """Normalise text in *columns*.

        Each value is stripped of leading/trailing whitespace, lower-cased,
        and has internal runs of whitespace collapsed to a single space.
        The step does not count changed values; it is recorded with 0.
        """
        for col in columns:
            self.df[col] = (
                self.df[col]
                .str.strip()
                .str.lower()
                .str.replace(r"\s+", " ", regex=True)
            )
        self._log("standardize_text", 0)
        return self

    def get_report(self) -> dict:
        """Return the report, updated with the final and removed row counts."""
        self.report["final_rows"] = len(self.df)
        self.report["rows_removed"] = self.report["original_rows"] - len(self.df)
        return self.report

    def _log(self, step: str, affected: int):
        """Log one pipeline step and append it to the report."""
        # Lazy %-style arguments: the message is only formatted if emitted.
        logger.info("[%s] affected %s rows", step, affected)
        self.report["steps"].append({"step": step, "affected": affected})


# Usage Example
python
def main(
    input_path: str = "data/raw_dataset.csv",
    output_path: str = "data/clean_dataset.parquet",
) -> None:
    """Run the example cleaning pipeline from a CSV file to a Parquet file.

    Args:
        input_path: CSV file to read the raw data from.
        output_path: Parquet file the cleaned data is written to.

    The pipeline expects the columns ``revenue``, ``quantity``, ``category``
    and ``name`` to be present in the input.
    """
    # Load raw data
    df = pd.read_csv(input_path)
    print(f"Loaded {len(df)} rows, {len(df.columns)} columns")
    # Run cleaning pipeline
    # NOTE(review): duplicates are removed and the category mode is computed
    # before text is standardized, so values differing only in case or
    # whitespace are treated as distinct -- confirm this ordering is intended.
    cleaner = DataCleaner(df)
    clean_df = (
        cleaner
        .remove_duplicates()
        .handle_missing(strategy="median", columns=["revenue", "quantity"])
        .handle_missing(strategy="mode", columns=["category"])
        .remove_outliers(columns=["revenue"], method="iqr")
        .standardize_text(columns=["name", "category"])
        .df
    )
    # Output
    report = cleaner.get_report()
    print(f"Cleaning complete: {report['original_rows']} → {report['final_rows']} rows")
    clean_df.to_parquet(output_path, index=False)


if __name__ == "__main__":
    main()


# Validation Checks
python
def validate(df: pd.DataFrame, max_null_pct: float = 5.0) -> list[str]:
    """Return a list of human-readable data-quality issues found in *df*.

    Two checks are performed:
      * fully duplicated rows (reported once, with their count);
      * numeric columns whose share of null values exceeds *max_null_pct*
        percent. Non-numeric columns are not checked.

    Args:
        df: The DataFrame to inspect; it is not modified.
        max_null_pct: Highest tolerated percentage of nulls per numeric
            column. Defaults to 5.0.

    Returns:
        One message per issue; an empty list when no issue was found.
    """
    issues = []
    # Compute the duplicate mask once and reuse the count for the message.
    duplicate_count = int(df.duplicated().sum())
    if duplicate_count:
        issues.append(f"Found {duplicate_count} duplicates")
    for col in df.select_dtypes(include="number"):
        null_pct = df[col].isnull().mean() * 100
        if null_pct > max_null_pct:
            issues.append(f"{col}: {null_pct:.1f}% null values")
    return issues


# Technologies
- SQL - Data Warehouse - Star Schema - Dimensional
Related Projects
Data Cleaning · Dec 28, 2025
Automated Data Quality Monitoring
Build a monitoring system that tracks data freshness, completeness, and schema drift.
Python · Data Quality · Monitoring · Alerting
Read more → Source
Data Cleaning · Dec 23, 2025
Feature Engineering for ML
Create predictive features from raw data: encoding, binning, polynomial, and interaction.
Python · Feature Engineering · ML · Preprocessing
Read more → Source
Data Cleaning · Dec 15, 2025
Real-Time Data Stream Processing
Process streaming data with Apache Kafka consumers and windowed aggregations.
Python · Kafka · Streaming · Real-time
Read more → Source
Comments (0)
No comments yet. Be the first to comment!