Traditional data pipelines move and transform data. AI pipelines need to do more—embed, chunk, index, and keep everything synchronized. Building data infrastructure for AI requires new patterns.
Here’s how to build AI-native data pipelines.
AI Data Requirements
Beyond ETL
```yaml
ai_data_needs:
  traditional_etl:
    - Extract from sources
    - Transform to schema
    - Load to warehouse
  ai_pipeline_additions:
    - Chunking documents
    - Generating embeddings
    - Indexing for retrieval
    - Maintaining freshness
    - Handling multi-modal content
```
Pipeline Architecture
Core Components
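The pipeline below is written against a handful of supporting types: Document, Chunk, and IngestionResult data objects, plus Chunker, Embedder, VectorStore, and MetadataStore interfaces. Here is a minimal sketch of the data objects; the field names are illustrative assumptions, not a fixed schema.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    id: str
    content: str
    content_hash: str   # hash of `content`, used to detect changes
    source: str         # e.g. "notion", "confluence", "s3://docs-bucket"


@dataclass
class Chunk:
    text: str
    index: int = 0
    metadata: dict = field(default_factory=dict)


@dataclass
class IngestionResult:
    status: str         # "processed" or "unchanged"
    chunks: int = 0
```

With those in place, the pipeline itself looks like this: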
```python
from datetime import datetime


class AIDataPipeline:
    """End-to-end AI data pipeline."""

    def __init__(
        self,
        chunker: Chunker,
        embedder: Embedder,
        vector_store: VectorStore,
        metadata_store: MetadataStore
    ):
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store
        self.metadata_store = metadata_store

    async def ingest_document(
        self,
        document: Document
    ) -> IngestionResult:
        # Skip documents whose content has not changed since the last run
        existing = await self.metadata_store.get(document.id)
        if existing and existing.hash == document.content_hash:
            return IngestionResult(status="unchanged")

        # Chunk the document
        chunks = await self.chunker.chunk(document)

        # Generate embeddings for all chunks in one batch
        embeddings = await self.embedder.embed_batch(
            [chunk.text for chunk in chunks]
        )

        # Store each chunk and its embedding in the vector database
        for chunk, embedding in zip(chunks, embeddings):
            await self.vector_store.upsert(
                id=f"{document.id}_{chunk.index}",
                embedding=embedding,
                metadata={
                    "document_id": document.id,
                    "chunk_index": chunk.index,
                    "text": chunk.text,
                    "source": document.source,
                    "updated_at": datetime.utcnow().isoformat()
                }
            )

        # Record the content hash so unchanged documents are skipped next time
        await self.metadata_store.upsert(
            document_id=document.id,
            hash=document.content_hash,
            chunk_count=len(chunks),
            processed_at=datetime.utcnow()
        )

        return IngestionResult(
            status="processed",
            chunks=len(chunks)
        )
```
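Wiring the pipeline together might look like the following. The concrete chunker, embedder, and store classes here are placeholders for whatever you actually use; nothing about the pipeline prescribes them.

```python
import asyncio


async def main():
    pipeline = AIDataPipeline(
        chunker=SmartChunker(max_chunk_size=1000),  # defined later in this post
        embedder=MyEmbedder(),                      # hypothetical: wraps your embedding API
        vector_store=MyVectorStore(),               # hypothetical: wraps your vector database
        metadata_store=MyMetadataStore(),           # hypothetical: tracks document hashes
    )

    doc = Document(
        id="runbook-42",
        content="# Incident runbook\n\nSteps to mitigate a failed deploy...",
        content_hash="3b5d...",  # hash of `content`, computed upstream
        source="confluence",
    )

    result = await pipeline.ingest_document(doc)
    print(result.status, result.chunks)


asyncio.run(main())
```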
Incremental Processing
```python
class IncrementalPipeline:
    """Process only changed documents."""

    async def sync(self, source: DataSource) -> SyncResult:
        # Get changes since last sync
        last_sync = await self.state.get_last_sync(source.id)
        changes = await source.get_changes(since=last_sync)

        processed = 0
        deleted = 0
        for change in changes:
            if change.type == "upsert":
                await self.pipeline.ingest_document(change.document)
                processed += 1
            elif change.type == "delete":
                await self.delete_document(change.document_id)
                deleted += 1

        # Update sync state
        await self.state.set_last_sync(source.id, datetime.utcnow())

        return SyncResult(
            processed=processed,
            deleted=deleted
        )

    async def delete_document(self, document_id: str):
        # Delete all chunks for the document, then its metadata record
        await self.vector_store.delete_by_filter(
            filter={"document_id": document_id}
        )
        await self.metadata_store.delete(document_id)
```
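The sync loop above assumes each source can report what changed since a timestamp. A minimal sketch of that contract; the Change and DataSource names and fields are illustrative, not a specific connector API.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Protocol


@dataclass
class Change:
    type: str                           # "upsert" or "delete"
    document: Optional[Document] = None  # populated for upserts
    document_id: Optional[str] = None    # populated for deletes


class DataSource(Protocol):
    id: str

    async def get_changes(self, since: Optional[datetime]) -> list[Change]:
        """Return upserts and deletes that happened after `since`."""
        ...
```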
Multi-Source Aggregation
```yaml
multi_source_pipeline:
  sources:
    notion:
      type: "api"
      sync_frequency: "1 hour"
      transformer: "notion_to_markdown"
    confluence:
      type: "api"
      sync_frequency: "4 hours"
      transformer: "confluence_to_markdown"
    github:
      type: "webhook"
      events: ["push", "pull_request"]
      transformer: "code_to_searchable"
    pdfs:
      type: "storage"
      location: "s3://docs-bucket"
      transformer: "pdf_to_text"
  unified_index:
    - All sources indexed together
    - Source metadata preserved
    - Cross-source search enabled
```
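One way to connect a config like this to the pipeline is a registry that maps transformer names to functions producing Document objects. A sketch; the registry, the transformer functions, and the payload keys are assumptions, not a specific framework.

```python
from typing import Callable

# Map transformer names from the config to functions that turn a raw
# source payload into a Document the pipeline can ingest.
TRANSFORMERS: dict[str, Callable[[dict], Document]] = {
    "notion_to_markdown": lambda raw: Document(
        id=raw["id"],
        content=raw["markdown"],
        content_hash=raw["hash"],
        source="notion",
    ),
    # "confluence_to_markdown", "code_to_searchable", "pdf_to_text", ...
}


async def ingest_from_source(
    pipeline: AIDataPipeline,
    source_config: dict,
    raw_items: list[dict],
) -> None:
    transform = TRANSFORMERS[source_config["transformer"]]
    for raw in raw_items:
        await pipeline.ingest_document(transform(raw))
```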
Chunking Strategies
```python
class SmartChunker:
    """Context-aware document chunking."""

    def __init__(self, max_chunk_size: int = 1000):
        # Maximum chunk length in characters; the default here is
        # illustrative, tune it to your embedding model.
        self.max_chunk_size = max_chunk_size

    async def chunk(
        self,
        document: Document
    ) -> list[Chunk]:
        doc_type = self._detect_type(document)

        if doc_type == "code":
            return await self._chunk_code(document)
        elif doc_type == "markdown":
            return await self._chunk_markdown(document)
        else:
            return await self._chunk_text(document)

    async def _chunk_markdown(
        self,
        document: Document
    ) -> list[Chunk]:
        sections = self._split_by_headers(document.content)
        chunks = []

        for section in sections:
            # Keep sections together if small enough
            if len(section.text) <= self.max_chunk_size:
                chunks.append(Chunk(
                    text=f"# {section.header}\n\n{section.text}",
                    metadata={"section": section.header}
                ))
            else:
                # Split large sections and prepend the header to each piece
                sub_chunks = self._split_by_paragraphs(section)
                for sub in sub_chunks:
                    sub.text = f"# {section.header}\n\n{sub.text}"
                chunks.extend(sub_chunks)

        return chunks
```
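The header-splitting helper is elided above. Here is a rough sketch of one way to implement it with a regex over Markdown heading lines; the Section shape and the function itself are illustrative, not the chunker's actual internals.

```python
import re
from dataclasses import dataclass


@dataclass
class Section:
    header: str
    text: str


def split_by_headers(content: str) -> list[Section]:
    """Split Markdown content into sections at each heading line."""
    sections: list[Section] = []
    current_header = ""
    current_lines: list[str] = []

    for line in content.splitlines():
        match = re.match(r"^(#{1,6})\s+(.*)$", line)
        if match:
            # Close out the previous section before starting a new one
            if current_lines or current_header:
                sections.append(Section(current_header, "\n".join(current_lines).strip()))
            current_header = match.group(2)
            current_lines = []
        else:
            current_lines.append(line)

    sections.append(Section(current_header, "\n".join(current_lines).strip()))
    return sections
```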
Freshness and Quality
```yaml
data_quality:
  freshness:
    - Track last sync time per source
    - Alert on stale data
    - Prioritize frequently accessed content
  quality_checks:
    - Embedding validity
    - Chunk coherence
    - Metadata completeness
  monitoring:
    - Index size over time
    - Query performance
    - Source health
```
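A freshness check can be as simple as comparing each source's last sync time against its expected sync frequency. A minimal sketch; the SourceSyncState shape and the grace factor are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class SourceSyncState:
    source_id: str
    last_sync: datetime
    expected_frequency: timedelta


def find_stale_sources(
    states: list[SourceSyncState],
    grace_factor: float = 2.0,
    now: datetime | None = None,
) -> list[str]:
    """Return ids of sources whose last sync is older than grace_factor x expected frequency."""
    now = now or datetime.utcnow()
    stale = []
    for state in states:
        if now - state.last_sync > state.expected_frequency * grace_factor:
            stale.append(state.source_id)
    return stale
```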
Key Takeaways
- AI data pipelines extend traditional ETL
- Chunking strategy affects retrieval quality
- Incremental processing is essential at scale
- Multi-source aggregation enables comprehensive RAG
- Monitor freshness and quality continuously
- Version your embeddings
- Plan for re-indexing scenarios, such as embedding model upgrades (see the sketch below)
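Recording the embedding model version alongside each document makes re-indexing a query rather than a guessing game. A minimal sketch of the idea; the embedding_version field on the metadata record is an assumption layered onto the pipeline above.

```python
EMBEDDING_MODEL_VERSION = "text-embedding-3-small@2024-01"  # illustrative version tag

# At ingestion time, store EMBEDDING_MODEL_VERSION in each chunk's metadata
# and on the document's metadata record, then make the "skip unchanged"
# check require both a matching content hash and a matching version.


def needs_processing(existing, document) -> bool:
    """True when a document must be (re-)ingested."""
    if existing is None:
        return True
    if existing.hash != document.content_hash:
        return True
    # A model upgrade naturally triggers re-embedding of old documents.
    return getattr(existing, "embedding_version", None) != EMBEDDING_MODEL_VERSION
```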
Good data pipelines make good AI applications.