AI Data Pipelines: How to Build Data Infrastructure for Machine Learning
Author: ZTABS Team
Every AI team eventually learns the same lesson: model performance is capped by data quality. You can swap GPT-4o for Claude, re-architect your prompts, and tune hyperparameters for weeks — but if the data feeding your system is incomplete, stale, or poorly structured, the AI will underperform.
Data pipelines are the infrastructure that moves data from where it lives (databases, APIs, files, streams) to where your AI needs it (feature stores, vector databases, training datasets). Getting this infrastructure right is the difference between an AI system that works in a demo and one that works in production.
This guide covers how to design, build, and operate data pipelines specifically for AI and machine learning workloads — from batch ETL to real-time streaming to the specialized vector embedding pipelines that power RAG systems.
Why Data Pipelines Matter for AI
In traditional software, the database is relatively static. A user creates a record, the app reads it. The data pipeline is the application itself.
AI systems are different. They consume data from many sources, transform it in complex ways, and the quality of that transformation directly determines the quality of the AI output.
| Without Proper Pipelines | With Proper Pipelines |
|--------------------------|-----------------------|
| Models trained on stale data | Models retrained on fresh data automatically |
| Manual CSV exports and uploads | Automated ingestion from source systems |
| No visibility into data quality | Automated validation and alerting |
| Features computed inconsistently | Feature store ensures consistency |
| Embeddings regenerated from scratch | Incremental embedding updates |
| Hours of manual data prep per experiment | Self-service data access for the team |
If you're spending more time preparing data than building models, your pipeline infrastructure is the bottleneck.
Pipeline Architecture Patterns
Batch Pipelines
Batch pipelines process data on a schedule — hourly, daily, or weekly. They're the simplest pattern and the right starting point for most AI projects.
```
Source DB
  → Extract (daily)
  → Transform (clean, enrich, validate)
  → Load
  → Data Warehouse
  → Feature Store
```
Best for:
- Training data preparation
- Daily model retraining
- Analytics and reporting
- Knowledge base updates for RAG
Tools: Apache Airflow, Dagster, Prefect, dbt
Example: Daily training data refresh with Dagster
```python
from dagster import asset, define_asset_job, ScheduleDefinition
import pandas as pd

# `db`, `clean_text`, `CATEGORY_MAPPING`, and `save_to_feature_store`
# are project-specific helpers.

@asset
def raw_customer_interactions():
    """Extract yesterday's customer interactions from the production DB."""
    query = """
        SELECT id, message, category, resolution, created_at
        FROM support_tickets
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """
    return db.execute(query).fetchall()

@asset
def cleaned_interactions(raw_customer_interactions):
    """Clean and validate interaction data."""
    df = pd.DataFrame(raw_customer_interactions)
    df = df.dropna(subset=["message", "category"])
    df["message"] = df["message"].apply(clean_text)
    df = df[df["message"].str.len() > 10]
    return df

@asset
def training_dataset(cleaned_interactions):
    """Format for model training and save to the feature store."""
    df = cleaned_interactions
    df["label"] = df["category"].map(CATEGORY_MAPPING)
    save_to_feature_store(df, "customer_intent_training")
    return df

daily_refresh = ScheduleDefinition(
    job=define_asset_job("daily_training_refresh"),
    cron_schedule="0 2 * * *",  # 2 AM daily
)
```
Streaming Pipelines
Streaming pipelines process data in real-time or near-real-time as events occur. They're essential when your AI system needs to react to live data.
```
Event Source (user action, sensor data, API webhook)
  → Message Queue (Kafka, SQS)
  → Process (transform, embed, validate)
  → Serve (feature store, vector DB, cache)
```
Best for:
- Real-time personalization
- Fraud detection
- Live monitoring and alerting
- Conversational AI with memory
Tools: Apache Kafka, AWS Kinesis, Redis Streams, Apache Flink
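Whatever broker you choose, the consumer side follows the same consume-deduplicate-transform-serve loop. Below is a minimal in-memory sketch of that loop; in production the queue would be a Kafka topic or Kinesis shard, `seen_ids` a shared store such as Redis, and the event shape (`{"id": ..., "value": ...}`) is purely illustrative:

```python
import queue

def process_stream(events: queue.Queue, seen_ids: set) -> list[dict]:
    """Drain an event queue: deduplicate, transform, collect results."""
    processed = []
    while not events.empty():
        event = events.get()
        if event["id"] in seen_ids:  # at-least-once delivery means replays happen
            continue
        seen_ids.add(event["id"])
        # "transform" step: normalize the payload before serving
        processed.append({**event, "value": event["value"].strip().lower()})
    return processed
```

Deduplication matters because most queues guarantee at-least-once delivery, so your processing step must be idempotent.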
Hybrid Pipelines
Most production AI systems use hybrid architectures: batch pipelines for heavy lifting (training data, knowledge base indexing) and streaming for real-time updates.
| Component | Pattern | Frequency |
|-----------|---------|-----------|
| Knowledge base indexing | Batch | Daily or on document change |
| User behavior features | Streaming | Real-time |
| Model retraining | Batch | Weekly or on data threshold |
| Embedding updates | Batch + incremental | Daily batch + real-time for new docs |
| Anomaly detection | Streaming | Real-time |
| Cost and usage reporting | Batch | Daily |
This is the architecture we recommend for most AI development projects. Start batch, add streaming where latency requirements demand it.
Data Ingestion: Getting Data In
Common Data Sources for AI
| Source Type | Examples | Ingestion Pattern | Challenges |
|-------------|----------|-------------------|------------|
| Relational databases | PostgreSQL, MySQL | CDC or scheduled queries | Schema changes, large tables |
| APIs | REST, GraphQL | Polling or webhooks | Rate limits, pagination, auth |
| File storage | S3, GCS, local files | Event-triggered or scheduled | Format inconsistency, large files |
| SaaS platforms | Salesforce, HubSpot, Zendesk | API + webhooks | API limits, data model complexity |
| Streaming sources | Kafka, webhooks, IoT | Continuous consumption | Ordering, deduplication, backpressure |
| Web scraping | Public websites | Scheduled crawls | Legal considerations, site changes |
| Documents | PDFs, Word, emails | Batch processing | Unstructured, OCR quality |
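Pagination is one of the recurring API-ingestion challenges above. A minimal cursor-pagination loop looks like the sketch below, where `fetch_page` and the response shape (`{"items": [...], "next_cursor": ...}`) are hypothetical stand-ins for your actual HTTP client and API:

```python
def fetch_all_pages(fetch_page, max_pages: int = 1000) -> list:
    """Follow cursor-based pagination until the API returns no cursor.

    `max_pages` is a safety valve against APIs that loop forever.
    """
    items, cursor = [], None
    for _ in range(max_pages):
        page = fetch_page(cursor)
        items.extend(page["items"])
        cursor = page.get("next_cursor")
        if cursor is None:
            break
    return items
```

The same structure handles offset-based pagination if you swap the cursor for a page number and stop when a page comes back empty.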
Change Data Capture (CDC)
For database sources, CDC is the most efficient ingestion pattern. Instead of re-reading entire tables, CDC captures only the changes (inserts, updates, deletes).
Debezium CDC configuration for PostgreSQL:

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "prod-db.internal",
    "database.port": "5432",
    "database.dbname": "app_production",
    "table.include.list": "public.documents,public.user_feedback",
    "plugin.name": "pgoutput",
    "publication.name": "ai_pipeline_pub",
    "slot.name": "ai_pipeline_slot",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
```
CDC ensures your AI system always has fresh data without putting load on your production database.
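On the consuming side, applying change events is a small state machine. The sketch below assumes the raw Debezium-style envelope with `"op"` ("c"reate, "u"pdate, "d"elete, "r" snapshot read) and `"before"`/`"after"` row images; with the ExtractNewRecordState transform you would instead receive the flattened `"after"` state. The row shape is illustrative:

```python
def apply_change(store: dict, event: dict) -> None:
    """Apply one CDC change event to an in-memory replica keyed by id."""
    op = event["op"]
    if op in ("c", "u", "r"):   # create, update, or snapshot read
        row = event["after"]
        store[row["id"]] = row
    elif op == "d":             # delete: remove by the old row's key
        store.pop(event["before"]["id"], None)
```

In a real pipeline `store` would be your warehouse table or vector index, and deletes are the case most often forgotten.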
Transformation and Feature Engineering
Raw data is rarely usable by AI systems directly. The transformation layer cleans, enriches, and reshapes data into features the model can consume.
Common Transformations for AI
| Transformation | Purpose | Example |
|----------------|---------|---------|
| Text cleaning | Remove noise from text data | Strip HTML, normalize whitespace, fix encoding |
| Tokenization | Prepare text for NLP | Split text into tokens, handle special characters |
| Normalization | Scale numeric features | Min-max scaling, z-score normalization |
| Encoding | Convert categories to numbers | One-hot encoding, label encoding, embeddings |
| Aggregation | Create summary features | Average order value, message count per day |
| Window functions | Time-based features | Rolling 7-day average, trend indicators |
| Deduplication | Remove duplicate records | Fuzzy matching, hash-based dedup |
| PII masking | Comply with privacy regulations | Mask emails, phone numbers, SSNs |
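Some of these transformations are only a few lines. As an illustration, here is a minimal PII-masking pass; the regexes are simplified examples, and production masking needs locale-aware patterns and coverage for more identifier types:

```python
import re

# Simplified, illustrative patterns
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
PHONE_RE = re.compile(r"\b(?:\+?1[-. ]?)?\(?\d{3}\)?[-. ]?\d{3}[-. ]?\d{4}\b")

def mask_pii(text: str) -> str:
    """Replace emails and US-style phone numbers with placeholder tokens."""
    text = EMAIL_RE.sub("[EMAIL]", text)
    text = PHONE_RE.sub("[PHONE]", text)
    return text
```

Run masking as early in the pipeline as possible so PII never lands in training sets or embeddings downstream.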
Feature Engineering with dbt
dbt (data build tool) is excellent for SQL-based feature engineering. It provides version control, testing, and documentation for your transformation logic.
```sql
-- models/features/customer_features.sql
WITH customer_orders AS (
    SELECT
        customer_id,
        COUNT(*) AS total_orders,
        AVG(order_value) AS avg_order_value,
        MAX(order_date) AS last_order_date,
        MIN(order_date) AS first_order_date,
        SUM(CASE WHEN order_date >= CURRENT_DATE - INTERVAL '30 days'
                 THEN 1 ELSE 0 END) AS orders_last_30d
    FROM {{ ref('stg_orders') }}
    GROUP BY customer_id
),

customer_support AS (
    SELECT
        customer_id,
        COUNT(*) AS total_tickets,
        AVG(resolution_time_hours) AS avg_resolution_hours,
        SUM(CASE WHEN sentiment = 'negative' THEN 1 ELSE 0 END) AS negative_tickets
    FROM {{ ref('stg_support_tickets') }}
    GROUP BY customer_id
)

SELECT
    o.customer_id,
    o.total_orders,
    o.avg_order_value,
    o.orders_last_30d,
    DATE_PART('day', CURRENT_DATE - o.last_order_date) AS days_since_last_order,
    DATE_PART('day', o.last_order_date - o.first_order_date) AS customer_tenure_days,
    COALESCE(s.total_tickets, 0) AS support_tickets,
    COALESCE(s.avg_resolution_hours, 0) AS avg_resolution_hours,
    COALESCE(s.negative_tickets, 0) AS negative_support_tickets
FROM customer_orders o
LEFT JOIN customer_support s ON o.customer_id = s.customer_id
```
Data Quality and Validation
Bad data is the top reason AI systems fail in production. A single corrupted batch can retrain your model on garbage or fill your vector database with nonsensical embeddings.
Validation Layers
Build validation into every stage of your pipeline:
| Layer | What to Check | Tools |
|-------|---------------|-------|
| Schema validation | Column types, required fields, formats | Great Expectations, Pandera, dbt tests |
| Statistical validation | Value distributions, outliers, null rates | Great Expectations, custom checks |
| Referential integrity | Foreign key relationships, orphaned records | dbt tests, custom SQL |
| Freshness checks | Data is recent enough | Dagster freshness policies, dbt source freshness |
| Volume checks | Expected row counts, no empty batches | Custom alerts |
| Semantic validation | Content makes sense (not corrupted) | LLM-based checks for text data |
Example: Data Quality Checks with Great Expectations
```python
import great_expectations as gx

context = gx.get_context()
validator = context.sources.pandas_default.read_csv("training_data.csv")

validator.expect_column_values_to_not_be_null("text_content")
validator.expect_column_values_to_not_be_null("label")
validator.expect_column_value_lengths_to_be_between(
    "text_content", min_value=10, max_value=50000
)
validator.expect_column_values_to_be_in_set(
    "label", ["positive", "negative", "neutral"]
)
validator.expect_column_values_to_be_unique("document_id")
validator.expect_table_row_count_to_be_between(
    min_value=1000, max_value=1000000
)

results = validator.validate()
if not results.success:
    # DataQualityError is a project-specific exception
    raise DataQualityError(f"Validation failed: {results.statistics}")
```
Vector Embedding Pipelines for RAG
If you're building a RAG system, you need a specialized pipeline that converts documents into vector embeddings and keeps them synchronized with your source data.
RAG Indexing Pipeline
```
Documents → Parse → Chunk → Enrich → Embed → Upsert to Vector DB
(track content versions at each stage so unchanged chunks can be skipped on incremental updates)
```
Chunking Strategies
How you split documents into chunks dramatically affects retrieval quality.
| Strategy | Description | Best For |
|----------|-------------|----------|
| Fixed-size | Split every N tokens with overlap | Simple documents, uniform structure |
| Semantic | Split at topic/section boundaries | Long-form content, articles |
| Recursive | Try multiple separators in order | Mixed content types |
| Document-aware | Use document structure (headings, sections) | Structured docs, technical manuals |
| Sentence-based | Split at sentence boundaries | FAQ content, short passages |
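The fixed-size strategy is simple enough to sketch directly. This version operates on a pre-tokenized list (tokenization itself is assumed to happen upstream), and the overlap keeps shared context between consecutive chunks:

```python
def chunk_fixed(tokens: list[str], size: int = 512, overlap: int = 50) -> list[list[str]]:
    """Fixed-size chunking with overlap: consecutive chunks share
    `overlap` tokens so content isn't cut off without context."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than chunk size")
    step = size - overlap
    return [tokens[i:i + size] for i in range(0, max(len(tokens) - overlap, 1), step)]
```

Typical starting points are 256 to 512 tokens per chunk with roughly 10% overlap; tune against your own retrieval evaluation rather than trusting defaults.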
Embedding Pipeline Example
```python
import hashlib

from openai import OpenAI

client = OpenAI()

# `semantic_chunker`, `has_changed`, `vector_db`, and `update_hash_cache`
# are project-specific helpers.

def process_document(doc_id: str, content: str, metadata: dict):
    """Full pipeline: chunk, embed, and upsert a document."""
    chunks = semantic_chunker(content, max_tokens=512, overlap=50)
    embeddings = []

    for i, chunk in enumerate(chunks):
        chunk_id = f"{doc_id}_chunk_{i}"
        content_hash = hashlib.sha256(chunk.encode()).hexdigest()

        # Skip chunks whose content hasn't changed since the last run
        if not has_changed(chunk_id, content_hash):
            continue

        embedding = client.embeddings.create(
            model="text-embedding-3-small",
            input=chunk
        ).data[0].embedding

        embeddings.append({
            "id": chunk_id,
            "values": embedding,
            "metadata": {
                "doc_id": doc_id,
                "chunk_index": i,
                "text": chunk,
                "content_hash": content_hash,
                **metadata
            }
        })

    if embeddings:
        vector_db.upsert(vectors=embeddings)
        update_hash_cache(embeddings)

    return len(embeddings)
```
The content hash check ensures you only re-embed chunks that have actually changed, reducing API costs significantly for incremental updates.
Pipeline Tooling Comparison
Orchestration Tools
| Tool | Language | Best For | Learning Curve | Hosting |
|------|----------|----------|----------------|---------|
| Apache Airflow | Python | Complex DAGs, mature ecosystem | Steep | Self-hosted or managed (MWAA, Astronomer) |
| Dagster | Python | Data-aware orchestration, asset-based | Moderate | Self-hosted or Dagster Cloud |
| Prefect | Python | Simple workflows, good developer experience | Low | Self-hosted or Prefect Cloud |
| Temporal | Go/Java/Python | Long-running, fault-tolerant workflows | Steep | Self-hosted or Temporal Cloud |
| dbt | SQL | SQL transformations, analytics engineering | Low | dbt Core (free) or dbt Cloud |
For most AI teams, we recommend Dagster for orchestration (its asset-based model maps well to ML pipelines) and dbt for SQL transformations.
Transformation Tools
| Tool | Type | Best For |
|------|------|----------|
| dbt | SQL-based | Warehouse transformations, feature engineering |
| Pandas | Python | Small-medium datasets, complex transformations |
| Polars | Python/Rust | Large datasets, performance-critical |
| Apache Spark | Distributed | Very large datasets (TB+) |
| DuckDB | SQL/Python | Local analytics, medium datasets |
Storage Architecture
Data Storage Layers
| Layer | Purpose | Tools | When to Use |
|-------|---------|-------|-------------|
| Data lake | Raw, unprocessed data | S3, GCS, MinIO | Always — keep raw data |
| Data warehouse | Cleaned, structured data | PostgreSQL, BigQuery, Snowflake | Analytics, feature engineering |
| Feature store | ML-ready features | Feast, Tecton, custom | When features are shared across models |
| Vector database | Embeddings for retrieval | Pinecone, Weaviate, pgvector | RAG systems, semantic search |
| Cache | Low-latency serving | Redis, Memcached | Real-time inference, frequent queries |
For most AI startups, PostgreSQL with pgvector handles both the relational data warehouse and vector storage needs, reducing operational complexity.
The Feature Store Pattern
A feature store provides a centralized repository for ML features with two key capabilities:
- Offline store — Historical features for training (batch)
- Online store — Low-latency features for inference (real-time)
```python
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

customer = Entity(name="customer_id", value_type=ValueType.INT64)

customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    features=[
        Feature(name="total_orders", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_order", dtype=ValueType.INT64),
        Feature(name="support_tickets_30d", dtype=ValueType.INT64),
    ],
    online=True,
    source=FileSource(path="s3://features/customer_features.parquet"),
)
```

Note: this uses Feast's older `Feature`/`ValueType` API; recent Feast versions define features with `Field` and `feast.types` instead.
Monitoring Data Drift
Data drift — when the distribution of incoming data changes over time — is a silent killer of AI performance. Your model was trained on data that looked a certain way. When production data starts looking different, accuracy degrades.
Types of Drift
| Drift Type | What Changes | Detection Method |
|------------|--------------|------------------|
| Data drift | Input feature distributions | Statistical tests (KS, PSI, chi-squared) |
| Concept drift | Relationship between features and target | Model performance monitoring |
| Schema drift | Data structure or types | Schema validation checks |
| Volume drift | Amount of data | Row count monitoring |
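The Population Stability Index (PSI) mentioned in the table can be computed without any dependencies. Below is a minimal sketch; the binning scheme and the 1e-6 floor for empty bins are implementation choices, and the 0.1/0.25 thresholds in the docstring are common rules of thumb, not hard standards:

```python
import math

def population_stability_index(expected, actual, bins: int = 10) -> float:
    """PSI between a reference sample and a current sample.
    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 major shift."""
    lo, hi = min(expected), max(expected)
    edges = [lo + (hi - lo) * i / bins for i in range(bins + 1)]
    edges[0], edges[-1] = float("-inf"), float("inf")  # catch out-of-range values

    def bin_fractions(sample):
        counts = [0] * bins
        for x in sample:
            for b in range(bins):
                if edges[b] <= x < edges[b + 1]:
                    counts[b] += 1
                    break
        n = len(sample)
        # Floor at a tiny value so empty bins don't blow up the log
        return [max(c / n, 1e-6) for c in counts]

    e, a = bin_fractions(expected), bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))
```

PSI complements the KS test: KS gives a p-value for "are these the same distribution?", while PSI gives a magnitude you can threshold and trend over time.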
Monitoring Implementation
```python
import pandas as pd
from scipy import stats

# `alert` is a project-specific notifier (Slack, PagerDuty, etc.)

def detect_drift(reference_data: pd.DataFrame, current_data: pd.DataFrame, threshold=0.05):
    """Detect data drift using the Kolmogorov-Smirnov test."""
    drift_report = {}

    for column in reference_data.columns:
        if reference_data[column].dtype in ["float64", "int64"]:
            stat, p_value = stats.ks_2samp(
                reference_data[column].dropna(),
                current_data[column].dropna()
            )
            drift_report[column] = {
                "statistic": stat,
                "p_value": p_value,
                "drift_detected": p_value < threshold
            }

    drifted_features = [
        col for col, result in drift_report.items()
        if result["drift_detected"]
    ]
    if drifted_features:
        alert(f"Data drift detected in: {drifted_features}")

    return drift_report
```
Run drift detection on every batch pipeline execution and on a schedule for streaming pipelines.
Cost Optimization
Data pipelines can become expensive, especially at scale. Here's how to keep costs under control.
Cost Drivers
| Cost Center | What Drives It | Optimization Strategy |
|-------------|----------------|-----------------------|
| Compute | Data volume, transformation complexity | Right-size instances, use spot/preemptible |
| Storage | Raw data retention, embedding dimensions | Tiered storage, compression, reduce dimensions |
| Embedding API | Number of documents, re-embedding frequency | Content hashing, incremental updates |
| Orchestration | Number of tasks, scheduling frequency | Batch related tasks, reduce schedule frequency |
| Data transfer | Cross-region, cross-cloud movement | Co-locate storage and compute |
Quick Wins
- Incremental processing — Only process new or changed data, not the entire dataset every time
- Content hashing — Skip re-embedding documents that haven't changed
- Smaller embedding models — `text-embedding-3-small` is roughly 5x cheaper than `text-embedding-3-large` with minimal quality loss for most use cases
- Compression — Use Parquet instead of CSV, enable compression on storage
- Scheduled scaling — Scale down compute when pipelines aren't running
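Incremental processing, the first win above, can be as simple as a watermark check: remember the newest timestamp you processed and skip everything at or before it on the next run. The row shape and `updated_at` field here are illustrative:

```python
def incremental_batch(rows: list[dict], last_watermark):
    """Process only rows newer than the stored watermark (for example,
    the max `updated_at` value seen on the previous run), and return
    the new watermark to persist for next time."""
    new_rows = [r for r in rows if r["updated_at"] > last_watermark]
    new_watermark = max((r["updated_at"] for r in new_rows), default=last_watermark)
    return new_rows, new_watermark
```

Persist the watermark somewhere durable (a small state table works fine) so a pipeline restart doesn't trigger a full reprocess.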
Getting Started: A Practical Roadmap
Phase 1: Foundation (Week 1–2)
- Set up a data warehouse (PostgreSQL or BigQuery)
- Create basic batch pipelines for your primary data sources
- Implement schema validation on all ingested data
- Document your data sources and their update frequencies
Phase 2: ML-Ready (Week 3–4)
- Build transformation layers for feature engineering
- Set up a vector embedding pipeline if building RAG
- Implement data quality checks at every stage
- Create a development environment with sample data
Phase 3: Production (Week 5–6)
- Add monitoring and alerting for pipeline failures
- Implement data drift detection
- Set up incremental processing for efficiency
- Document the pipeline architecture and runbooks
Phase 4: Optimization (Ongoing)
- Monitor costs and optimize expensive steps
- Add streaming components where latency matters
- Build a feature store if multiple models share features
- Automate model retraining triggered by data updates
Building robust AI data pipelines is not glamorous work, but it's the foundation that determines whether your AI system delivers consistent results or degrades unpredictably. Invest in your data infrastructure early, and every model you build on top of it will perform better.
Need help designing and building your AI data infrastructure? Get in touch — our team has built data pipelines for everything from early-stage AI startups to enterprise ML platforms.