Catalog Connectors

Catalog connectors enable federated SQL queries across multiple tables from external catalog providers (databases, data warehouses, etc.). They automatically discover and register tables, so each table does not need to be configured explicitly.

Overview

Catalog connectors differ from regular data connectors:
  • Regular Connector: Provides access to a single, explicitly configured dataset
  • Catalog Connector: Discovers and provides access to all tables in a catalog/schema
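For example, the same Postgres source configured both ways in a spicepod (a minimal sketch; connection parameters are omitted and illustrative):

datasets:
  - from: postgres:public.orders   # one explicitly configured dataset
    name: orders

catalogs:
  - from: postgres:public          # every matching table is discovered
    name: pg_catalog
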
┌─────────────────────────────────────────┐
│  Catalog Connector                      │
│  ┌───────────────────────────────────┐ │
│  │  Catalog Discovery                │ │
│  │  - List schemas                   │ │
│  │  - List tables                    │ │
│  │  - Filter by glob patterns        │ │
│  └───────────────────────────────────┘ │
│           ↓                             │
│  ┌───────────────────────────────────┐ │
│  │  Table Registration               │ │
│  │  - schema.table1 → TableProvider  │ │
│  │  - schema.table2 → TableProvider  │ │
│  │  - schema.table3 → TableProvider  │ │
│  └───────────────────────────────────┘ │
│           ↓                             │
│  ┌───────────────────────────────────┐ │
│  │  Federated Queries                │ │
│  │  SELECT * FROM schema.table1      │ │
│  │    JOIN schema.table2 ...         │ │
│  └───────────────────────────────────┘ │
└─────────────────────────────────────────┘

Catalog Component

Catalog Structure

// Defined in runtime::component::catalog
use globset::GlobSet;
use runtime::component::access::AccessMode;
use std::collections::HashMap;

pub struct Catalog {
    /// Catalog provider name (e.g., "postgres", "mysql")
    pub provider: String,

    /// Optional catalog identifier
    pub catalog_id: Option<String>,

    /// Source string (e.g., "postgres://...")
    pub from: String,

    /// Catalog name in Spice
    pub name: String,

    /// Access mode (read-only, read-write)
    pub access: AccessMode,

    /// Include patterns for table filtering
    pub include: Option<GlobSet>,

    /// Connector-specific parameters
    pub params: HashMap<String, String>,

    /// Parameters applied to all discovered datasets
    pub dataset_params: HashMap<String, String>,
}

Spicepod Configuration

version: v1beta1
kind: Spicepod
name: my_app

catalogs:
  - from: postgres:public
    name: pg_catalog
    params:
      pg_host: localhost
      pg_port: 5432
      pg_db: mydb
      pg_user: ${secrets:PG_USER}
      pg_pass: ${secrets:PG_PASS}
    dataset_params:
      # Applied to all discovered tables
      pg_sslmode: require
    include:
      - "users*"      # Include tables starting with 'users'
      - "orders"      # Include 'orders' table
      - "products_*"  # Include tables matching pattern

Implementation

Catalog functionality is implemented as an optional method on the existing DataConnectorFactory trait:

Add Catalog Support to Factory

use runtime::dataconnector::DataConnectorFactory;
use runtime::component::catalog::Catalog;
use datafusion::catalog::CatalogProvider;
use async_trait::async_trait;
use std::sync::Arc;

#[async_trait]
impl DataConnectorFactory for PostgresFactory {
    // ... existing methods ...

    /// Returns a CatalogProvider for federated queries
    async fn catalog_provider(
        &self,
        catalog: Arc<Catalog>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        create_postgres_catalog_provider(catalog).await.ok()
    }
}
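
The runtime can then register the returned provider with its DataFusion session so discovered tables resolve in SQL. A minimal sketch using DataFusion's SessionContext (the actual wiring inside Spice is runtime-internal):

use datafusion::execution::context::SessionContext;

async fn register_catalog(
    ctx: &SessionContext,
    factory: &PostgresFactory,
    catalog: Arc<Catalog>,
) {
    if let Some(provider) = factory.catalog_provider(Arc::clone(&catalog)).await {
        // Tables become addressable as <catalog.name>.<schema>.<table>
        ctx.register_catalog(catalog.name.as_str(), provider);
    }
}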

Implement CatalogProvider

use datafusion::catalog::{CatalogProvider, SchemaProvider};
use runtime::component::catalog::Catalog;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct PostgresCatalogProvider {
    catalog: Arc<Catalog>,
    schemas: Arc<RwLock<HashMap<String, Arc<dyn SchemaProvider>>>>,
}

impl PostgresCatalogProvider {
    pub async fn new(catalog: Arc<Catalog>) -> Result<Self> {
        let provider = Self {
            catalog: Arc::clone(&catalog),
            schemas: Arc::new(RwLock::new(HashMap::new())),
        };

        // Discover schemas
        provider.discover_schemas().await?;

        Ok(provider)
    }

    async fn discover_schemas(&self) -> Result<()> {
        let conn = self.connect().await?;

        // Query for available schemas
        let schemas = conn.query(
            "SELECT schema_name FROM information_schema.schemata",
            &[],
        ).await?;

        let mut schema_map = self.schemas.write().await;

        for row in schemas {
            let schema_name: String = row.get(0);

            let schema_provider = PostgresSchemaProvider::new(
                Arc::clone(&self.catalog),
                schema_name.clone(),
            ).await?;

            schema_map.insert(
                schema_name,
                Arc::new(schema_provider) as Arc<dyn SchemaProvider>,
            );
        }

        Ok(())
    }
}

impl CatalogProvider for PostgresCatalogProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        let schemas = self.schemas.blocking_read();
        schemas.keys().cloned().collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let schemas = self.schemas.blocking_read();
        schemas.get(name).cloned()
    }
}
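
Note: tokio's blocking_read panics if it is called from inside the async runtime, and DataFusion invokes these synchronous trait methods on runtime threads. Because the schema map is only written during construction, a std::sync::RwLock is a safer fit for the sync accessors (with discover_schemas switching from .write().await to .write().expect(...) accordingly). A sketch of the same two methods under that assumption:

fn schema_names(&self) -> Vec<String> {
    // Assumes schemas: Arc<std::sync::RwLock<HashMap<String, Arc<dyn SchemaProvider>>>>
    let schemas = self.schemas.read().expect("schema lock poisoned");
    schemas.keys().cloned().collect()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
    let schemas = self.schemas.read().expect("schema lock poisoned");
    schemas.get(name).cloned()
}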

Implement SchemaProvider

use async_trait::async_trait;
use datafusion::catalog::SchemaProvider;
use datafusion::datasource::TableProvider;
use runtime::component::catalog::Catalog;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

pub struct PostgresSchemaProvider {
    catalog: Arc<Catalog>,
    schema_name: String,
    tables: Arc<RwLock<HashMap<String, Arc<dyn TableProvider>>>>,
}

impl PostgresSchemaProvider {
    pub async fn new(
        catalog: Arc<Catalog>,
        schema_name: String,
    ) -> Result<Self> {
        let provider = Self {
            catalog: Arc::clone(&catalog),
            schema_name: schema_name.clone(),
            tables: Arc::new(RwLock::new(HashMap::new())),
        };

        // Discover tables in this schema
        provider.discover_tables().await?;

        Ok(provider)
    }

    async fn discover_tables(&self) -> Result<()> {
        let conn = self.connect().await?;

        // Query for tables in schema
        let tables = conn.query(
            "SELECT table_name FROM information_schema.tables 
             WHERE table_schema = $1 AND table_type = 'BASE TABLE'",
            &[&self.schema_name],
        ).await?;

        let mut table_map = self.tables.write().await;

        for row in tables {
            let table_name: String = row.get(0);

            // Apply include filters
            if !self.should_include_table(&table_name) {
                continue;
            }

            // Create TableProvider for this table
            let table_provider = self.create_table_provider(
                &self.schema_name,
                &table_name,
            ).await?;

            table_map.insert(table_name, table_provider);
        }

        Ok(())
    }

    fn should_include_table(&self, table_name: &str) -> bool {
        match &self.catalog.include {
            Some(globset) => globset.is_match(table_name),
            None => true, // Include all tables if no filter
        }
    }

    async fn create_table_provider(
        &self,
        schema_name: &str,
        table_name: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        // Create a TableProvider for this specific table
        let table_ref = format!("{}.{}", schema_name, table_name);

        let factory = PostgresFactory::new();
        let params = build_connector_params(
            Arc::clone(&self.catalog),
            &table_ref,
        ).await?;

        let connector = factory.create(params).await?;

        // Build temporary dataset for table provider creation
        let dataset = build_catalog_dataset(
            Arc::clone(&self.catalog),
            schema_name,
            table_name,
        );

        connector.read_provider(&dataset).await
            .map_err(Into::into)
    }
}

#[async_trait]
impl SchemaProvider for PostgresSchemaProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        let tables = self.tables.blocking_read();
        tables.keys().cloned().collect()
    }

    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        let tables = self.tables.read().await;
        tables.get(name).cloned()
    }

    fn table_exist(&self, name: &str) -> bool {
        let tables = self.tables.blocking_read();
        tables.contains_key(name)
    }
}

Helper Functions

Build Connector Parameters

use runtime::dataconnector::parameters::ConnectorParamsBuilder;

async fn build_connector_params(
    catalog: Arc<Catalog>,
    table_ref: &str,
) -> Result<ConnectorParams> {
    // The caller always passes a "schema.table" reference
    // (see create_table_provider above), so the split cannot fail
    let (schema_name, table_name) = table_ref
        .split_once('.')
        .expect("table_ref is schema-qualified");

    // Create a synthetic dataset from the catalog config
    let dataset = build_catalog_dataset(
        Arc::clone(&catalog),
        schema_name,
        table_name,
    );

    let mut builder = ConnectorParamsBuilder::new(
        catalog.provider.clone(),
        ConnectorComponent::from(&dataset),
    );

    // Merge catalog params with dataset params
    // (dataset_params win on key collisions)
    let mut all_params = catalog.params.clone();
    all_params.extend(catalog.dataset_params.clone());

    for (key, value) in all_params {
        builder = builder.with_param(key, value);
    }

    // `secrets` and `tokio_handle` are assumed to be available from the
    // surrounding runtime context
    builder.build(secrets, tokio_handle).await
}

Build Synthetic Dataset

use runtime::component::dataset::Dataset;
use datafusion::sql::TableReference;

fn build_catalog_dataset(
    catalog: Arc<Catalog>,
    schema_name: &str,
    table_name: &str,
) -> Dataset {
    let table_ref = TableReference::partial(
        schema_name.to_string(),
        table_name.to_string(),
    );

    Dataset {
        from: format!("{}:{}", catalog.provider, table_ref),
        name: table_ref,
        params: catalog.dataset_params.clone(),
        // ... remaining fields take their defaults
        // (assumes the runtime's Dataset implements Default)
        ..Dataset::default()
    }
}

Complete Example: MySQL Catalog

use async_trait::async_trait;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use runtime::component::catalog::Catalog;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

// Pool and MySqlConnectionManager come from the connection-pool
// crate in use (e.g. a bb8-style pool)

pub struct MySQLCatalogProvider {
    catalog: Arc<Catalog>,
    pool: Pool<MySqlConnectionManager>,
    schemas: Arc<RwLock<HashMap<String, Arc<dyn SchemaProvider>>>>,
}

impl MySQLCatalogProvider {
    pub async fn new(catalog: Arc<Catalog>) -> Result<Self> {
        // Create connection pool from catalog params
        let pool = create_mysql_pool(&catalog.params).await?;

        let provider = Self {
            catalog: Arc::clone(&catalog),
            pool,
            schemas: Arc::new(RwLock::new(HashMap::new())),
        };

        provider.discover_schemas().await?;

        Ok(provider)
    }

    async fn discover_schemas(&self) -> Result<()> {
        let mut conn = self.pool.get().await?;

        let schemas: Vec<String> = conn
            .query("SHOW DATABASES")
            .await?
            .into_iter()
            .map(|row| row.get(0))
            .collect();

        let mut schema_map = self.schemas.write().await;

        for schema_name in schemas {
            // Skip system schemas
            if ["information_schema", "mysql", "performance_schema", "sys"]
                .contains(&schema_name.as_str())
            {
                continue;
            }

            let schema_provider = MySQLSchemaProvider::new(
                Arc::clone(&self.catalog),
                Arc::clone(&self.pool),
                schema_name.clone(),
            )
            .await?;

            schema_map.insert(
                schema_name,
                Arc::new(schema_provider) as Arc<dyn SchemaProvider>,
            );
        }

        Ok(())
    }
}

impl CatalogProvider for MySQLCatalogProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        let schemas = self.schemas.blocking_read();
        schemas.keys().cloned().collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let schemas = self.schemas.blocking_read();
        schemas.get(name).cloned()
    }
}
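
To expose the MySQL provider, wire it into the factory exactly as in the Postgres example (MySQLFactory is assumed to exist alongside PostgresFactory):

#[async_trait]
impl DataConnectorFactory for MySQLFactory {
    // ... existing methods ...

    async fn catalog_provider(
        &self,
        catalog: Arc<Catalog>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        let provider = MySQLCatalogProvider::new(catalog).await.ok()?;
        Some(Arc::new(provider) as Arc<dyn CatalogProvider>)
    }
}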

Pattern Filtering

Glob Pattern Matching

use globset::{Glob, GlobSetBuilder};

fn build_include_filter(patterns: &[String]) -> Result<GlobSet> {
    let mut builder = GlobSetBuilder::new();

    for pattern in patterns {
        let glob = Glob::new(pattern)?;
        builder.add(glob);
    }

    builder.build().map_err(Into::into)
}

// Usage
let include = build_include_filter(&[
    "users*".to_string(),
    "orders".to_string(),
    "products_*".to_string(),
])?;

if include.is_match("users") {
    // Table matches
}

Complex Filtering

catalogs:
  - from: postgres:public
    name: pg_catalog
    include:
      - "fact_*"       # Include all fact tables
      - "dim_*"        # Include all dimension tables
      - "!*_temp"      # Exclude temporary tables
      - "!*_backup"    # Exclude backup tables

Access Modes

Read-Only Catalogs

use runtime::component::access::AccessMode;

if catalog.access == AccessMode::Read {
    // Use read_provider() only
    connector.read_provider(&dataset).await?
}

Read-Write Catalogs

if catalog.access == AccessMode::ReadWrite {
    // Use read_write_provider() if available
    if let Some(result) = connector.read_write_provider(&dataset).await {
        result?
    } else {
        connector.read_provider(&dataset).await?
    }
}
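
Combining the two, a helper can select the provider from the configured access mode (a sketch against the connector API shown above; DataConnector is the assumed trait name):

async fn table_provider_for(
    connector: &dyn DataConnector,
    catalog: &Catalog,
    dataset: &Dataset,
) -> Result<Arc<dyn TableProvider>> {
    if catalog.access == AccessMode::ReadWrite {
        // Prefer a writable provider when the connector offers one
        if let Some(result) = connector.read_write_provider(dataset).await {
            return result.map_err(Into::into);
        }
    }

    // Read-only path, also the fallback for read-write catalogs
    connector.read_provider(dataset).await.map_err(Into::into)
}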

Best Practices

DO:

  • ✅ Use connection pooling for catalog discovery
  • ✅ Cache discovered schemas and tables
  • ✅ Filter tables with glob patterns
  • ✅ Skip system schemas/tables
  • ✅ Handle schema evolution dynamically
  • ✅ Log discovered tables at INFO level
  • ✅ Apply dataset_params to all tables
  • ✅ Support both qualified and unqualified names

DON’T:

  • ❌ Discover tables on every query
  • ❌ Include system/internal schemas
  • ❌ Block on schema discovery
  • ❌ Ignore include/exclude patterns
  • ❌ Create new connections per table
  • ❌ Panic on missing schemas
  • ❌ Skip error handling
  • ❌ Forget to apply access modes
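
For example, the "cache discovered tables" and "don't discover on every query" rules can be combined with a time-based refresh so schema evolution is still picked up. A sketch (the wrapper type and ttl field are illustrative):

use std::time::{Duration, Instant};
use tokio::sync::Mutex;

pub struct RefreshingSchemaProvider {
    inner: PostgresSchemaProvider,
    last_refresh: Mutex<Instant>,
    ttl: Duration,
}

impl RefreshingSchemaProvider {
    /// Re-runs table discovery only when the cached result is older than ttl
    async fn maybe_refresh(&self) -> Result<()> {
        let mut last = self.last_refresh.lock().await;
        if last.elapsed() >= self.ttl {
            self.inner.discover_tables().await?;
            *last = Instant::now();
        }
        Ok(())
    }
}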

Error Handling

use snafu::prelude::*;

#[derive(Debug, Snafu)]
pub enum CatalogError {
    #[snafu(display("Failed to discover schemas: {source}"))]
    SchemaDiscoveryFailed { source: sqlx::Error },

    #[snafu(display("Failed to discover tables in schema {schema}: {source}"))]
    TableDiscoveryFailed {
        schema: String,
        source: sqlx::Error,
    },

    #[snafu(display("Schema {schema} not found"))]
    SchemaNotFound { schema: String },

    #[snafu(display("Table {table} not found in schema {schema}"))]
    TableNotFound { schema: String, table: String },
}
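
Snafu generates a context selector per variant, so discovery code can attach context at the call site. For example (Connection is a stand-in for the client type used above):

use snafu::ResultExt;

async fn list_schemas(conn: &Connection) -> Result<Vec<String>, CatalogError> {
    let rows = conn
        .query("SELECT schema_name FROM information_schema.schemata", &[])
        .await
        .context(SchemaDiscoveryFailedSnafu)?;

    Ok(rows.into_iter().map(|row| row.get(0)).collect())
}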

Testing

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_catalog_discovery() {
        let catalog = create_test_catalog();
        let provider = PostgresCatalogProvider::new(catalog)
            .await
            .expect("should create");

        let schemas = provider.schema_names();
        assert!(schemas.contains(&"public".to_string()));

        let public_schema = provider.schema("public")
            .expect("should have public schema");

        let tables = public_schema.table_names();
        assert!(!tables.is_empty());
    }

    #[tokio::test]
    async fn test_include_filters() {
        let catalog = create_catalog_with_filters(vec![
            "users*".to_string(),
        ]);

        let provider = PostgresCatalogProvider::new(catalog)
            .await
            .expect("should create");

        let schema = provider.schema("public").unwrap();
        let tables = schema.table_names();

        assert!(tables.iter().all(|t| t.starts_with("users")));
    }
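
    // A pure unit test for the glob filter (no database required);
    // uses build_include_filter from the Pattern Filtering section
    #[test]
    fn test_build_include_filter() {
        let filter = build_include_filter(&[
            "users*".to_string(),
            "orders".to_string(),
        ])
        .expect("patterns should be valid globs");

        assert!(filter.is_match("users"));
        assert!(filter.is_match("users_archive"));
        assert!(filter.is_match("orders"));
        assert!(!filter.is_match("products"));
    }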
}

Next Steps