Documentation Index
Fetch the complete documentation index at: https://mintlify.com/spiceai/spiceai/llms.txt
Use this file to discover all available pages before exploring further.
Custom data accelerators allow you to implement local storage engines for accelerated data access. This guide covers the architecture, trait requirements, and implementation patterns for building accelerators.
Architecture Overview
Data accelerators provide local caching and materialization of federated data sources. The accelerator system consists of:
- DataAccelerator: Creates and manages accelerated tables
- TableProvider: DataFusion interface for reads/writes
- Registration: Automatic discovery via linkme-based plugin system
┌───────────────────────────────────────────────┐
│                 Spice Runtime                 │
│  ┌─────────────────────────────────────────┐  │
│  │       Accelerator Engine Registry       │  │
│  │       (linkme distributed slice)        │  │
│  └─────────────────────────────────────────┘  │
│                       ↓                       │
│  ┌─────────────────────────────────────────┐  │
│  │             DataAccelerator             │  │
│  │  - create_external_table()              │  │
│  │  - init()                               │  │
│  │  - acceleration_layout()                │  │
│  └─────────────────────────────────────────┘  │
│                       ↓                       │
│  ┌─────────────────────────────────────────┐  │
│  │              TableProvider              │  │
│  │  - scan()        (read data)            │  │
│  │  - insert_into() (write data)           │  │
│  └─────────────────────────────────────────┘  │
│                       ↓                       │
│  ┌─────────────────────────────────────────┐  │
│  │              Storage Engine             │  │
│  │      (DuckDB, SQLite, Arrow, etc.)      │  │
│  └─────────────────────────────────────────┘  │
└───────────────────────────────────────────────┘
DataAccelerator Trait
Required Methods
use async_trait::async_trait;
use datafusion::{
    datasource::TableProvider,
    execution::runtime_env::RuntimeEnv,
    logical_expr::CreateExternalTable,
};
use runtime::dataaccelerator::{
    AccelerationSource, AccelerationLayout, BootstrapStatus, FilePathError,
};
use runtime::parameters::ParameterSpec;
use runtime_table_partition::expression::PartitionedBy;
use std::{any::Any, path::PathBuf, sync::Arc};
#[async_trait]
pub trait DataAccelerator: Send + Sync {
    /// Returns reference for downcasting
    fn as_any(&self) -> &dyn Any;

    /// Creates a new accelerated table (REQUIRED)
    async fn create_external_table(
        &self,
        cmd: CreateExternalTable,
        source: Option<&dyn AccelerationSource>,
        partition_by: Vec<PartitionedBy>,
        runtime_env: Option<Arc<RuntimeEnv>>,
    ) -> Result<Arc<dyn TableProvider>, Box<dyn std::error::Error + Send + Sync>>;

    /// Name of the accelerator
    fn name(&self) -> &'static str;

    /// Parameter prefix (e.g., "duckdb")
    fn prefix(&self) -> &'static str;

    /// List of accepted parameters
    fn parameters(&self) -> &'static [ParameterSpec];

    /// Storage layout for snapshots and metrics
    fn acceleration_layout(&self, source: &dyn AccelerationSource) -> AccelerationLayout {
        if let Ok(path) = self.file_path(source) {
            AccelerationLayout::file(PathBuf::from(path))
        } else {
            AccelerationLayout::default()
        }
    }

    /// Initialize accelerator (e.g., download snapshots)
    async fn init(
        &self,
        source: &dyn AccelerationSource,
    ) -> Result<BootstrapStatus, Box<dyn std::error::Error + Send + Sync>> {
        Ok(BootstrapStatus::none())
    }

    /// Check if accelerator is initialized
    fn is_initialized(&self, source: &dyn AccelerationSource) -> bool {
        true
    }

    /// Valid file extensions for file-based accelerators
    fn valid_file_extensions(&self) -> Vec<&'static str> {
        vec![]
    }

    /// File path for file-based accelerators
    fn file_path(&self, source: &dyn AccelerationSource) -> Result<String, FilePathError> {
        Err(FilePathError::FileModeUnsupported {})
    }

    /// Shutdown hook for cleanup
    async fn shutdown(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }
}
Implementation Example: DuckDB Accelerator
Accelerator Implementation
use async_trait::async_trait;
use datafusion::{
    datasource::TableProvider,
    execution::runtime_env::RuntimeEnv,
    logical_expr::CreateExternalTable,
};
use runtime::component::dataset::acceleration::Mode;
use runtime::dataaccelerator::{AccelerationSource, DataAccelerator, FilePathError};
use runtime::parameters::ParameterSpec;
use runtime_table_partition::expression::PartitionedBy;
use snafu::OptionExt;
use std::sync::Arc;

pub struct DuckDBAccelerator {
    // Shared state if needed
}

impl DuckDBAccelerator {
    pub fn new() -> Self {
        Self {}
    }
}
#[async_trait]
impl DataAccelerator for DuckDBAccelerator {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    async fn create_external_table(
        &self,
        cmd: CreateExternalTable,
        source: Option<&dyn AccelerationSource>,
        partition_by: Vec<PartitionedBy>,
        runtime_env: Option<Arc<RuntimeEnv>>,
    ) -> Result<Arc<dyn TableProvider>, Box<dyn std::error::Error + Send + Sync>> {
        // Extract configuration from cmd.options
        let mode = cmd.options.get("mode")
            .map(|s| s.as_str())
            .unwrap_or("memory");

        let file_path = if mode == "file" {
            Some(self.file_path(source.ok_or("Source required")?)?)
        } else {
            None
        };

        // Create DuckDB connection
        let conn = if let Some(path) = file_path {
            duckdb::Connection::open(path)?
        } else {
            duckdb::Connection::open_in_memory()?
        };

        // Create table provider
        let provider = DuckDBTableProvider::create(
            conn,
            cmd.name,
            cmd.schema.as_arrow(),
            cmd.options,
        )?;

        Ok(Arc::new(provider) as Arc<dyn TableProvider>)
    }

    fn name(&self) -> &'static str {
        "duckdb"
    }

    fn prefix(&self) -> &'static str {
        "duckdb"
    }
    fn parameters(&self) -> &'static [ParameterSpec] {
        // ParameterSpec construction is not const, so a plain `&[...]` literal
        // cannot satisfy the 'static lifetime; build the list once instead.
        static PARAMETERS: std::sync::LazyLock<Vec<ParameterSpec>> =
            std::sync::LazyLock::new(|| {
                vec![
                    ParameterSpec::accelerator("file")
                        .description("DuckDB database file path"),
                    ParameterSpec::accelerator("threads")
                        .description("Number of threads")
                        .default(Some("4")),
                ]
            });
        &PARAMETERS
    }
    fn valid_file_extensions(&self) -> Vec<&'static str> {
        vec!["db", "duckdb"]
    }

    fn file_path(&self, source: &dyn AccelerationSource) -> Result<String, FilePathError> {
        let acceleration = source.acceleration()
            .context(AccelerationNotEnabledSnafu)?;
        if acceleration.mode != Mode::File {
            return Err(FilePathError::FileModeUnsupported {});
        }

        // Check for custom file parameter
        if let Some(custom_path) = acceleration.params.get("duckdb_file") {
            return Ok(custom_path.clone());
        }

        // Generate default path
        let data_dir = spice_data_base_path();
        let table_name = source.name().table().replace('.', "_");
        Ok(format!("{data_dir}/{table_name}.duckdb"))
    }
}
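The default-path logic is easy to exercise in isolation. A std-only sketch (`default_file_path` is a hypothetical helper, not part of the runtime API) showing how a dotted, schema-qualified table name becomes a flat file name:

```rust
/// Hypothetical helper mirroring the default-path logic above: dots in the
/// table name become underscores, and the accelerator's file extension is
/// appended under the data directory.
fn default_file_path(data_dir: &str, table_name: &str) -> String {
    let sanitized = table_name.replace('.', "_");
    format!("{data_dir}/{sanitized}.duckdb")
}

fn main() {
    // A schema-qualified dataset name maps to a single flat file.
    println!("{}", default_file_path("/var/spice", "catalog.sales"));
}
```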
TableProvider Implementation
The TableProvider implements DataFusion’s interface:
use async_trait::async_trait;
use datafusion::{
    datasource::{TableProvider, TableType},
    execution::{SendableRecordBatchStream, context::SessionState},
    logical_expr::Expr,
    physical_plan::ExecutionPlan,
};
use arrow::datatypes::SchemaRef;
use std::sync::{Arc, Mutex};

pub struct DuckDBTableProvider {
    conn: Arc<Mutex<duckdb::Connection>>,
    table_name: String,
    schema: SchemaRef,
}
#[async_trait]
impl TableProvider for DuckDBTableProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // Create execution plan for reading
        let exec = DuckDBExec::new(
            Arc::clone(&self.conn),
            &self.table_name,
            Arc::clone(&self.schema),
            projection.cloned(),
            filters.to_vec(),
            limit,
        )?;
        Ok(Arc::new(exec))
    }

    async fn insert_into(
        &self,
        state: &SessionState,
        input: Arc<dyn ExecutionPlan>,
        overwrite: bool,
    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
        // Create execution plan for writing
        let insert_exec = DuckDBInsertExec::new(
            Arc::clone(&self.conn),
            &self.table_name,
            input,
            overwrite,
        )?;
        Ok(Arc::new(insert_exec))
    }
}
Registration with Linkme
The register_data_accelerator! macro automatically registers your accelerator:
use runtime::register_data_accelerator;
use runtime::component::dataset::acceleration::Engine;

// Simple form
register_data_accelerator!(Engine::DuckDB, DuckDBAccelerator);

// Explicit form
register_data_accelerator!(
    register_duckdb_accelerator,
    DUCKDB_ACCELERATOR_REGISTRATION,
    Engine::DuckDB,
    DuckDBAccelerator
);
How Registration Works
// Expanded macro (simplified)
fn register_duckdb_accelerator() -> Arc<dyn DataAccelerator> {
    Arc::new(DuckDBAccelerator::new())
}

#[linkme::distributed_slice(DATA_ACCELERATOR_REGISTRATIONS)]
pub static DUCKDB_ACCELERATOR_REGISTRATION: AcceleratorRegistration =
    AcceleratorRegistration::new(Engine::DuckDB, register_duckdb_accelerator);
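At startup, the runtime can walk the distributed slice to resolve an engine to its constructor. A std-only simulation of that lookup (string keys and a plain static slice stand in for the real `Engine` enum, `AcceleratorRegistration` type, and linkme slice):

```rust
// Stand-in for the linkme distributed slice: a flat, link-time list of
// (engine name, constructor) pairs that the runtime scans once at startup.
type Constructor = fn() -> String;

static REGISTRATIONS: &[(&str, Constructor)] = &[
    ("duckdb", || "DuckDBAccelerator".to_string()),
    ("sqlite", || "SqliteAccelerator".to_string()),
];

// Resolve an engine name to a freshly constructed accelerator.
fn find_accelerator(engine: &str) -> Option<String> {
    REGISTRATIONS
        .iter()
        .find(|(name, _)| *name == engine)
        .map(|(_, ctor)| ctor())
}

fn main() {
    println!("{:?}", find_accelerator("duckdb"));
}
```

With linkme, each crate contributes entries to the slice at link time, so no central list needs editing when a new accelerator is added.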
Acceleration Modes
Memory Mode
In-memory storage, fastest but not persistent:
if mode == "memory" {
    let conn = duckdb::Connection::open_in_memory()?;
    // Configure for performance
}
File Mode
Persistent file-based storage:
if mode == "file" {
    let file_path = self.file_path(source)?;
    let conn = duckdb::Connection::open(file_path)?;
    // Configure durability settings
}
Storage Layouts
File-Based Layout
Single file per accelerated table:
fn acceleration_layout(&self, source: &dyn AccelerationSource) -> AccelerationLayout {
    let path = self.file_path(source)
        .unwrap_or_else(|_| "/tmp/default.db".to_string());
    AccelerationLayout::file(PathBuf::from(path))
}
Directory-Based Layout (Cayenne)
Multiple files organized in directories:
fn acceleration_layout(&self, source: &dyn AccelerationSource) -> AccelerationLayout {
    let data_dir = get_cayenne_data_dir(source);
    let metadata_dir = get_cayenne_metadata_dir(source);
    AccelerationLayout::cayenne(
        PathBuf::from(data_dir),
        PathBuf::from(metadata_dir),
    )
}
Snapshot Support
Implement init() to support snapshot bootstrapping:
use runtime::dataaccelerator::BootstrapStatus;
use runtime_acceleration::snapshot::{
    SnapshotDownloadInfo, download_snapshot,
};

async fn init(
    &self,
    source: &dyn AccelerationSource,
) -> Result<BootstrapStatus, Box<dyn std::error::Error + Send + Sync>> {
    let acceleration = source.acceleration()
        .ok_or("Acceleration not enabled")?;

    // Check if snapshot URL is configured
    let snapshot_url = match acceleration.params.get("snapshot_url") {
        Some(url) => url,
        None => return Ok(BootstrapStatus::none()),
    };

    // Download snapshot to file path
    let file_path = self.file_path(source)?;
    let info = download_snapshot(snapshot_url, &file_path).await?;

    tracing::info!(
        "Initialized {} from snapshot (last updated: {:?})",
        source.name(),
        info.last_updated_at
    );

    Ok(BootstrapStatus::bootstrapped(info))
}

fn is_initialized(&self, source: &dyn AccelerationSource) -> bool {
    if let Ok(path) = self.file_path(source) {
        std::path::Path::new(&path).exists()
    } else {
        false
    }
}
Advanced Features
Partitioning Support
Handle partitioned data:
async fn create_external_table(
    &self,
    cmd: CreateExternalTable,
    source: Option<&dyn AccelerationSource>,
    partition_by: Vec<PartitionedBy>,
    runtime_env: Option<Arc<RuntimeEnv>>,
) -> Result<Arc<dyn TableProvider>, Box<dyn std::error::Error + Send + Sync>> {
    // Create partitioned table if partition_by is not empty
    if !partition_by.is_empty() {
        let partition_cols: Vec<String> = partition_by
            .iter()
            .map(|p| p.column_name().to_string())
            .collect();
        return self.create_partitioned_table(
            cmd,
            partition_cols,
            runtime_env,
        ).await;
    }

    // Otherwise create standard table
    self.create_standard_table(cmd, runtime_env).await
}
Upsert Support
Handle conflict resolution:
use datafusion_table_providers::util::on_conflict::OnConflict;

// Extract upsert configuration
let on_conflict = cmd.options.get("on_conflict")
    .and_then(|s| OnConflict::try_from(s.as_str()).ok());

if let Some(OnConflict::Upsert(key_columns)) = on_conflict {
    // Configure primary key constraint
    let pk_columns = key_columns.column_names();
    create_table_with_pk(&conn, table_name, schema, pk_columns)?;
}
Index Support
Create indexes for faster queries:
use runtime::component::dataset::acceleration::IndexType;

// Extract index configuration
let indexes_str = cmd.options.get("indexes");
if let Some(indexes) = parse_indexes(indexes_str) {
    for (column_ref, index_type) in indexes {
        match index_type {
            IndexType::Enabled => {
                create_index(&conn, table_name, column_ref.column_names())?;
            }
            IndexType::Unique => {
                create_unique_index(&conn, table_name, column_ref.column_names())?;
            }
        }
    }
}
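`parse_indexes` is assumed above. A hypothetical std-only sketch, assuming a simple semicolon-separated `column:kind` option format (the runtime's actual `indexes` format may differ):

```rust
/// Hypothetical parser for an `indexes` option string such as
/// "id:enabled;email:unique". The string format here is an assumption
/// made purely for illustration.
fn parse_indexes(raw: Option<&String>) -> Option<Vec<(String, String)>> {
    let raw = raw?;
    let entries: Vec<(String, String)> = raw
        .split(';')
        .filter_map(|pair| {
            // Each entry is "column:kind"; skip anything malformed.
            let (column, kind) = pair.split_once(':')?;
            Some((column.trim().to_string(), kind.trim().to_string()))
        })
        .collect();
    if entries.is_empty() { None } else { Some(entries) }
}

fn main() {
    let opt = Some("id:enabled;email:unique".to_string());
    println!("{:?}", parse_indexes(opt.as_ref()));
}
```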
Connection Pooling
use deadpool::managed::Pool;

pub struct PooledAccelerator {
    // `ConnectionManager` is a deadpool `Manager` implementation for the
    // storage engine's connections.
    pool: Pool<ConnectionManager>,
}

impl PooledAccelerator {
    async fn get_connection(&self) -> Result<Connection> {
        self.pool.get().await.map_err(Into::into)
    }
}
Zero-Copy Operations
use arrow::record_batch::RecordBatch;

// Prefer Arrow-native operations
fn insert_batch(&self, batch: &RecordBatch, offset: usize, length: usize) -> Result<()> {
    // slice() is zero-copy: the subset shares the parent's buffers
    let subset = batch.slice(offset, length);
    // Write directly without intermediate allocations
    self.write_arrow_batch(subset)?;
    Ok(())
}
Batch Processing
const BATCH_SIZE: usize = 8192;

let mut buffer: Vec<RecordBatch> = Vec::new();
let mut buffered_rows = 0;
while let Some(batch) = stream.next().await {
    let batch = batch?;
    buffered_rows += batch.num_rows();
    buffer.push(batch);
    // Flush in arrival order once enough rows have accumulated
    if buffered_rows >= BATCH_SIZE {
        self.write_batches(std::mem::take(&mut buffer)).await?;
        buffered_rows = 0;
    }
}
// Flush any rows remaining after the stream ends
if !buffer.is_empty() {
    self.write_batches(buffer).await?;
}
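The buffering policy can be checked in isolation. A std-only sketch (`plan_flushes` is a hypothetical helper) that returns how many rows each flush writes for a sequence of incoming batch sizes, including the final tail flush:

```rust
const BATCH_SIZE: usize = 8192;

/// Hypothetical model of the buffering policy: given incoming batch row
/// counts, return the row count written by each flush.
fn plan_flushes(batch_rows: &[usize]) -> Vec<usize> {
    let mut flushes = Vec::new();
    let mut buffered = 0;
    for &rows in batch_rows {
        buffered += rows;
        // Flush once the accumulated rows reach the threshold.
        if buffered >= BATCH_SIZE {
            flushes.push(buffered);
            buffered = 0;
        }
    }
    if buffered > 0 {
        flushes.push(buffered); // final tail flush after the stream ends
    }
    flushes
}

fn main() {
    // Two small batches accumulate into one flush; the tail flushes separately.
    println!("{:?}", plan_flushes(&[5000, 5000, 100]));
}
```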
Best Practices
DO:
- ✅ Support both memory and file modes
- ✅ Implement connection pooling for shared resources
- ✅ Use zero-copy Arrow operations
- ✅ Batch writes for better throughput
- ✅ Support snapshot initialization
- ✅ Handle schema evolution gracefully
- ✅ Implement proper shutdown cleanup
- ✅ Use SIMD-optimized Arrow kernels
DON’T:
- ❌ Block async runtime with synchronous I/O
- ❌ Copy RecordBatch data unnecessarily
- ❌ Create connections on every operation
- ❌ Use panics for error handling
- ❌ Hold locks across .await points
- ❌ Ignore schema validation
- ❌ Skip cleanup in shutdown()
- ❌ Perform allocations in hot paths
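The lock-across-`.await` rule has a concrete shape: take what you need in a short lock scope, then do the slow work after the guard is dropped. A std-only sketch (synchronous for brevity; `drain_pending` is a hypothetical helper, and the slow operation would be an `.await` in real accelerator code):

```rust
use std::sync::{Arc, Mutex};

/// Move pending work out under the lock, releasing the guard immediately.
/// The caller can then .await (or do any slow I/O) without holding the lock.
fn drain_pending(state: &Arc<Mutex<Vec<u64>>>) -> Vec<u64> {
    let mut guard = state.lock().unwrap();
    std::mem::take(&mut *guard)
    // `guard` is dropped when this function returns, before any slow work runs
}

fn main() {
    let state = Arc::new(Mutex::new(vec![1, 2, 3]));
    let pending = drain_pending(&state);
    // The shared buffer is now empty; the drained rows can be written lock-free.
    println!("{pending:?} / {:?}", state.lock().unwrap());
}
```

With an async `Mutex` this pattern is still preferable: holding any guard across an `.await` serializes unrelated tasks for the duration of the I/O.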
Testing
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::ToDFSchema;
    use datafusion::logical_expr::CreateExternalTable;
    use std::collections::HashMap;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_create_memory_table() {
        let accelerator = DuckDBAccelerator::new();
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let cmd = CreateExternalTable {
            schema: schema.to_dfschema_ref().unwrap(),
            name: "test_table".into(),
            options: HashMap::from([
                ("mode".to_string(), "memory".to_string()),
            ]),
            // ...
        };

        let provider = accelerator
            .create_external_table(cmd, None, vec![], None)
            .await
            .expect("should create");

        assert_eq!(provider.schema().fields().len(), 2);
    }
}
Feature Flags
Cargo.toml:
[dependencies]
duckdb = { version = "1.0", optional = true }
[features]
duckdb = ["dep:duckdb"]
Code:
#[cfg(feature = "duckdb")]
pub mod duckdb;
#[cfg(feature = "duckdb")]
register_data_accelerator!(Engine::DuckDB, DuckDBAccelerator);
Next Steps