Catalog connectors enable federated SQL queries across multiple tables from external catalog providers (databases, data warehouses, etc.). They automatically discover and register tables without explicit per-table configuration.
Overview
Catalog connectors differ from regular data connectors:
- Regular Connector: Provides access to a single, explicitly configured dataset
- Catalog Connector: Discovers and provides access to all tables in a catalog/schema
┌─────────────────────────────────────────┐
│ Catalog Connector │
│ ┌───────────────────────────────────┐ │
│ │ Catalog Discovery │ │
│ │ - List schemas │ │
│ │ - List tables │ │
│ │ - Filter by glob patterns │ │
│ └───────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────┐ │
│ │ Table Registration │ │
│ │ - schema.table1 → TableProvider │ │
│ │ - schema.table2 → TableProvider │ │
│ │ - schema.table3 → TableProvider │ │
│ └───────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────┐ │
│ │ Federated Queries │ │
│ │ SELECT * FROM schema.table1 │ │
│ │ JOIN schema.table2 ... │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────┘
Catalog Component
Catalog Structure
use globset::GlobSet;
use runtime::component::access::AccessMode;
use std::collections::HashMap;
pub struct Catalog {
/// Catalog provider name (e.g., "postgres", "mysql")
pub provider: String,
/// Optional catalog identifier
pub catalog_id: Option<String>,
/// Source string (e.g., "postgres://...")
pub from: String,
/// Catalog name in Spice
pub name: String,
/// Access mode (read-only, read-write)
pub access: AccessMode,
/// Include patterns for table filtering
pub include: Option<GlobSet>,
/// Connector-specific parameters
pub params: HashMap<String, String>,
/// Parameters applied to all discovered datasets
pub dataset_params: HashMap<String, String>,
}
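The AccessMode referenced above is used again under Access Modes below. It is not defined on this page, but this is a minimal sketch of the shape this page assumes:
// Assumed shape of AccessMode (sketch; only the variants used on this page).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessMode {
    Read,
    ReadWrite,
}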
Spicepod Configuration
version: v1beta1
kind: Spicepod
name: my_app
catalogs:
- from: postgres:public
name: pg_catalog
params:
pg_host: localhost
pg_port: 5432
pg_db: mydb
pg_user: ${secrets:PG_USER}
pg_pass: ${secrets:PG_PASS}
dataset_params:
# Applied to all discovered tables
pg_sslmode: require
include:
- "users*" # Include tables starting with 'users'
- "orders" # Include 'orders' table
- "products_*" # Include tables matching pattern
Implementation
Catalog functionality is implemented as an optional method on the existing DataConnectorFactory trait:
Add Catalog Support to Factory
use runtime::dataconnector::DataConnectorFactory;
use runtime::component::catalog::Catalog;
use datafusion::catalog::CatalogProvider;
use async_trait::async_trait;
use std::sync::Arc;
#[async_trait]
impl DataConnectorFactory for PostgresFactory {
// ... existing methods ...
/// Returns a CatalogProvider for federated queries
async fn catalog_provider(
&self,
catalog: Arc<Catalog>,
) -> Option<Arc<dyn CatalogProvider>> {
create_postgres_catalog_provider(catalog).await.ok()
}
}
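The runtime calls catalog_provider during startup and registers the result with DataFusion under the catalog's configured name. A minimal sketch of that wiring, assuming a DataFusion SessionContext (register_catalog is the standard DataFusion API; the surrounding function is illustrative):
use datafusion::execution::context::SessionContext;

// Illustrative wiring: obtain a provider from the factory and register it
// so tables resolve as `pg_catalog.<schema>.<table>` in SQL.
async fn register_catalog(
    ctx: &SessionContext,
    factory: &PostgresFactory,
    catalog: Arc<Catalog>,
) {
    if let Some(provider) = factory.catalog_provider(Arc::clone(&catalog)).await {
        ctx.register_catalog(catalog.name.clone(), provider);
    }
}
After registration, a query such as `SELECT * FROM pg_catalog.public.users` resolves through the CatalogProvider and SchemaProvider implemented next.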
Implement CatalogProvider
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use runtime::component::catalog::Catalog;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub struct PostgresCatalogProvider {
catalog: Arc<Catalog>,
schemas: Arc<RwLock<HashMap<String, Arc<dyn SchemaProvider>>>>,
}
impl PostgresCatalogProvider {
pub async fn new(catalog: Arc<Catalog>) -> Result<Self> {
let provider = Self {
catalog: Arc::clone(&catalog),
schemas: Arc::new(RwLock::new(HashMap::new())),
};
// Discover schemas
provider.discover_schemas().await?;
Ok(provider)
}
    async fn discover_schemas(&self) -> Result<()> {
        let conn = self.connect().await?;
        // Query for available schemas
        let rows = conn.query(
            "SELECT schema_name FROM information_schema.schemata",
            &[],
        ).await?;
        // Build the map first so the lock is never held across an await
        let mut discovered = HashMap::new();
        for row in rows {
            let schema_name: String = row.get(0);
            let schema_provider = PostgresSchemaProvider::new(
                Arc::clone(&self.catalog),
                schema_name.clone(),
            ).await?;
            discovered.insert(
                schema_name,
                Arc::new(schema_provider) as Arc<dyn SchemaProvider>,
            );
        }
        *self.schemas.write().expect("schema lock poisoned") = discovered;
        Ok(())
    }
}
impl CatalogProvider for PostgresCatalogProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}
    fn schema_names(&self) -> Vec<String> {
        let schemas = self.schemas.read().expect("schema lock poisoned");
        schemas.keys().cloned().collect()
    }
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let schemas = self.schemas.read().expect("schema lock poisoned");
        schemas.get(name).cloned()
    }
}
Implement SchemaProvider
use async_trait::async_trait;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub struct PostgresSchemaProvider {
catalog: Arc<Catalog>,
schema_name: String,
tables: Arc<RwLock<HashMap<String, Arc<dyn TableProvider>>>>,
}
impl PostgresSchemaProvider {
pub async fn new(
catalog: Arc<Catalog>,
schema_name: String,
) -> Result<Self> {
let provider = Self {
catalog: Arc::clone(&catalog),
schema_name: schema_name.clone(),
tables: Arc::new(RwLock::new(HashMap::new())),
};
// Discover tables in this schema
provider.discover_tables().await?;
Ok(provider)
}
    async fn discover_tables(&self) -> Result<()> {
        let conn = self.connect().await?;
        // Query for tables in schema
        let rows = conn.query(
            "SELECT table_name FROM information_schema.tables
             WHERE table_schema = $1 AND table_type = 'BASE TABLE'",
            &[&self.schema_name],
        ).await?;
        // Build the map first so the lock is never held across an await
        let mut discovered = HashMap::new();
        for row in rows {
            let table_name: String = row.get(0);
            // Apply include filters
            if !self.should_include_table(&table_name) {
                continue;
            }
            // Create a TableProvider for this table
            let table_provider = self.create_table_provider(
                &self.schema_name,
                &table_name,
            ).await?;
            discovered.insert(table_name, table_provider);
        }
        *self.tables.write().expect("table lock poisoned") = discovered;
        Ok(())
    }
fn should_include_table(&self, table_name: &str) -> bool {
match &self.catalog.include {
Some(globset) => globset.is_match(table_name),
None => true, // Include all tables if no filter
}
}
    async fn create_table_provider(
        &self,
        schema_name: &str,
        table_name: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        // Build connector parameters scoped to this specific table
        let factory = PostgresFactory::new();
        let params = build_connector_params(
            Arc::clone(&self.catalog),
            schema_name,
            table_name,
        ).await?;
        let connector = factory.create(params).await?;
        // Build a synthetic dataset for table provider creation
        let dataset = build_catalog_dataset(
            Arc::clone(&self.catalog),
            schema_name,
            table_name,
        );
        connector.read_provider(&dataset).await
            .map_err(Into::into)
    }
}
#[async_trait]
impl SchemaProvider for PostgresSchemaProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}
    fn table_names(&self) -> Vec<String> {
        let tables = self.tables.read().expect("table lock poisoned");
        tables.keys().cloned().collect()
    }
    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        let tables = self.tables.read().expect("table lock poisoned");
        tables.get(name).cloned()
    }
    fn table_exist(&self, name: &str) -> bool {
        let tables = self.tables.read().expect("table lock poisoned");
        tables.contains_key(name)
    }
}
Helper Functions
Build Connector Parameters
use runtime::dataconnector::parameters::ConnectorParamsBuilder;
async fn build_connector_params(
    catalog: Arc<Catalog>,
    schema_name: &str,
    table_name: &str,
) -> Result<ConnectorParams> {
    // Create a synthetic dataset from the catalog config
    let dataset = build_catalog_dataset(
        Arc::clone(&catalog),
        schema_name,
        table_name,
    );
    let mut builder = ConnectorParamsBuilder::new(
        catalog.provider.clone(),
        ConnectorComponent::from(&dataset),
    );
    // Merge catalog params with dataset params (dataset params win)
    let mut all_params = catalog.params.clone();
    all_params.extend(catalog.dataset_params.clone());
    for (key, value) in all_params {
        builder = builder.with_param(key, value);
    }
    // `secrets` and `tokio_handle` are provided by the surrounding runtime context
    builder.build(secrets, tokio_handle).await
}
Build Synthetic Dataset
use runtime::component::dataset::Dataset;
use datafusion::sql::TableReference;
fn build_catalog_dataset(
catalog: Arc<Catalog>,
schema_name: &str,
table_name: &str,
) -> Dataset {
let table_ref = TableReference::partial(
schema_name.to_string(),
table_name.to_string(),
);
    Dataset {
        from: format!("{}:{}", catalog.provider, table_ref),
        name: table_ref,
        params: catalog.dataset_params.clone(),
        // ... remaining fields (app/runtime handles, defaults) are filled by the runtime
    }
}
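For schema `public` and table `users`, the synthetic `from` becomes `postgres:public.users`, so the discovered table flows through the same connector path as an explicitly configured dataset. An illustrative check (not runtime code):
// Illustrative: the synthetic dataset is indistinguishable from a
// hand-written `datasets:` entry as far as the connector is concerned.
fn synthetic_from_example(catalog: Arc<Catalog>) {
    let dataset = build_catalog_dataset(catalog, "public", "users");
    assert_eq!(dataset.from, "postgres:public.users");
}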
Complete Example: MySQL Catalog
use async_trait::async_trait;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use runtime::component::catalog::Catalog;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub struct MySQLCatalogProvider {
catalog: Arc<Catalog>,
pool: Pool<MySqlConnectionManager>,
schemas: Arc<RwLock<HashMap<String, Arc<dyn SchemaProvider>>>>,
}
impl MySQLCatalogProvider {
pub async fn new(catalog: Arc<Catalog>) -> Result<Self> {
// Create connection pool from catalog params
let pool = create_mysql_pool(&catalog.params).await?;
let provider = Self {
catalog: Arc::clone(&catalog),
pool,
schemas: Arc::new(RwLock::new(HashMap::new())),
};
provider.discover_schemas().await?;
Ok(provider)
}
    async fn discover_schemas(&self) -> Result<()> {
        let mut conn = self.pool.get().await?;
        let schemas: Vec<String> = conn
            .query("SHOW DATABASES")
            .await?
            .into_iter()
            .map(|row| row.get(0))
            .collect();
        // Build the map first so the lock is never held across an await
        let mut discovered = HashMap::new();
        for schema_name in schemas {
            // Skip system schemas
            if ["information_schema", "mysql", "performance_schema", "sys"]
                .contains(&schema_name.as_str())
            {
                continue;
            }
            let schema_provider = MySQLSchemaProvider::new(
                Arc::clone(&self.catalog),
                self.pool.clone(),
                schema_name.clone(),
            )
            .await?;
            discovered.insert(
                schema_name,
                Arc::new(schema_provider) as Arc<dyn SchemaProvider>,
            );
        }
        *self.schemas.write().expect("schema lock poisoned") = discovered;
        Ok(())
    }
}
impl CatalogProvider for MySQLCatalogProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}
    fn schema_names(&self) -> Vec<String> {
        let schemas = self.schemas.read().expect("schema lock poisoned");
        schemas.keys().cloned().collect()
    }
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let schemas = self.schemas.read().expect("schema lock poisoned");
        schemas.get(name).cloned()
    }
}
Pattern Filtering
Glob Pattern Matching
use globset::{Glob, GlobSetBuilder};
fn build_include_filter(patterns: &[String]) -> Result<GlobSet> {
let mut builder = GlobSetBuilder::new();
for pattern in patterns {
let glob = Glob::new(pattern)?;
builder.add(glob);
}
builder.build().map_err(Into::into)
}
// Usage
let include = build_include_filter(&[
"users*".to_string(),
"orders".to_string(),
"products_*".to_string(),
])?;
if include.is_match("users") {
// Table matches
}
Complex Filtering
catalogs:
- from: postgres:public
name: pg_catalog
include:
- "fact_*" # Include all fact tables
- "dim_*" # Include all dimension tables
- "!*_temp" # Exclude temporary tables
- "!*_backup" # Exclude backup tables
Access Modes
Read-Only Catalogs
use runtime::component::access::AccessMode;
if catalog.access == AccessMode::Read {
    // Read-only catalogs register the read provider only
    let provider = connector.read_provider(&dataset).await?;
}
Read-Write Catalogs
if catalog.access == AccessMode::ReadWrite {
    // Prefer the writable provider when the connector offers one
    let provider = if let Some(result) = connector.read_write_provider(&dataset).await {
        result?
    } else {
        // Fall back to read-only when write support is unavailable
        connector.read_provider(&dataset).await?
    };
}
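The two branches combine naturally into one helper. A sketch, assuming the connector trait object returned by the factory exposes read_provider and read_write_provider as used above:
// Choose a provider by access mode, falling back to read-only when the
// connector has no write support (sketch; error types elided into Result).
async fn provider_for(
    connector: &dyn DataConnector,
    dataset: &Dataset,
    access: AccessMode,
) -> Result<Arc<dyn TableProvider>> {
    match access {
        AccessMode::ReadWrite => match connector.read_write_provider(dataset).await {
            Some(result) => result.map_err(Into::into),
            None => connector.read_provider(dataset).await.map_err(Into::into),
        },
        _ => connector.read_provider(dataset).await.map_err(Into::into),
    }
}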
Best Practices
DO:
- ✅ Use connection pooling for catalog discovery
- ✅ Cache discovered schemas and tables
- ✅ Filter tables with glob patterns
- ✅ Skip system schemas/tables
- ✅ Handle schema evolution dynamically (see the refresh sketch after this list)
- ✅ Log discovered tables at INFO level
- ✅ Apply dataset_params to all tables
- ✅ Support both qualified and unqualified names
DON’T:
- ❌ Discover tables on every query
- ❌ Include system/internal schemas
- ❌ Block on schema discovery
- ❌ Ignore include/exclude patterns
- ❌ Create new connections per table
- ❌ Panic on missing schemas
- ❌ Skip error handling
- ❌ Forget to apply access modes
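One way to satisfy "handle schema evolution" without discovering on every query is a background task that re-runs discovery on an interval and swaps the cached maps. A sketch; the interval, logging, and function name are illustrative:
use std::time::Duration;

// Hypothetical background refresh: queries keep reading the cached maps
// while discovery rebuilds them and swaps the result in.
fn spawn_catalog_refresh(provider: Arc<PostgresCatalogProvider>) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(300));
        loop {
            ticker.tick().await;
            if let Err(e) = provider.discover_schemas().await {
                tracing::warn!("catalog refresh failed: {e}");
            }
        }
    });
}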
Error Handling
use snafu::prelude::*;
#[derive(Debug, Snafu)]
pub enum CatalogError {
#[snafu(display("Failed to discover schemas: {source}"))]
SchemaDiscoveryFailed { source: sqlx::Error },
#[snafu(display("Failed to discover tables in schema {schema}: {source}"))]
TableDiscoveryFailed {
schema: String,
source: sqlx::Error,
},
#[snafu(display("Schema {schema} not found"))]
SchemaNotFound { schema: String },
#[snafu(display("Table {table} not found in schema {schema}"))]
TableNotFound { schema: String, table: String },
}
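The derive generates a context selector per variant (suffixed with Snafu) for attaching these errors at each fallible call. A sketch using the sqlx source type from the enum; the function itself is illustrative:
use snafu::ResultExt;
use sqlx::{PgPool, Row};

// Attach TableDiscoveryFailed (with its schema field) to a sqlx error.
async fn list_tables(pool: &PgPool, schema: &str) -> Result<Vec<String>, CatalogError> {
    let rows = sqlx::query(
        "SELECT table_name FROM information_schema.tables WHERE table_schema = $1",
    )
    .bind(schema)
    .fetch_all(pool)
    .await
    .context(TableDiscoveryFailedSnafu { schema })?;
    Ok(rows.iter().map(|row| row.get::<String, _>(0)).collect())
}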
Testing
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_catalog_discovery() {
let catalog = create_test_catalog();
let provider = PostgresCatalogProvider::new(catalog)
.await
.expect("should create");
let schemas = provider.schema_names();
assert!(schemas.contains(&"public".to_string()));
let public_schema = provider.schema("public")
.expect("should have public schema");
let tables = public_schema.table_names();
assert!(!tables.is_empty());
}
#[tokio::test]
async fn test_include_filters() {
let catalog = create_catalog_with_filters(vec![
"users*".to_string(),
]);
let provider = PostgresCatalogProvider::new(catalog)
.await
.expect("should create");
let schema = provider.schema("public").unwrap();
let tables = schema.table_names();
assert!(tables.iter().all(|t| t.starts_with("users")));
}
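    // Filter construction itself can be tested without a database, using
    // the build_include_filter helper from the Pattern Filtering section.
    #[test]
    fn test_build_include_filter() {
        let include = build_include_filter(&[
            "users*".to_string(),
            "orders".to_string(),
        ])
        .expect("valid patterns");
        assert!(include.is_match("users"));
        assert!(include.is_match("users_archive"));
        assert!(include.is_match("orders"));
        assert!(!include.is_match("products"));
    }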
}
Next Steps
- Building Custom Connectors - Data source integration
- Building Secret Stores - Secure credential management
- Catalog Provider API - DataFusion docs