Secret stores provide secure storage and retrieval of sensitive data like passwords, API keys, and tokens. This guide covers implementing custom secret stores using the SecretStore trait.
Architecture Overview
The secret store system provides:
- SecretStore Trait: Interface for retrieving secrets
- Secret Injection: Automatic substitution of secret references in configuration
- Precedence Order: Multiple stores searched in priority order
- Type Safety: Secrets wrapped in SecretString to prevent accidental logging
┌─────────────────────────────────────────────┐
│                Spice Runtime                │
│  ┌───────────────────────────────────────┐  │
│  │           Secrets Registry            │  │
│  │        (ordered by precedence)        │  │
│  │    1. custom_store                    │  │
│  │    2. kubernetes                      │  │
│  │    3. env (default)                   │  │
│  └───────────────────────────────────────┘  │
│                      ↓                      │
│  ┌───────────────────────────────────────┐  │
│  │           Secret Injection            │  │
│  │  ${ secrets:API_KEY } → actual_value  │  │
│  │  ${ env:DB_PASS }     → actual_value  │  │
│  └───────────────────────────────────────┘  │
│                      ↓                      │
│  ┌───────────────────────────────────────┐  │
│  │        Component Configuration        │  │
│  │     (datasets, models, catalogs)      │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘
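As a rough illustration of the precedence search in the diagram, the sketch below walks an ordered list of stores and returns the first match. `MapStore` and `Registry` are hypothetical in-memory stand-ins written for this example, not the runtime's types, and the lookup is synchronous for brevity.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a secret store (not the Spice API).
pub struct MapStore {
    name: String,
    values: HashMap<String, String>,
}

impl MapStore {
    pub fn new(name: &str, pairs: &[(&str, &str)]) -> Self {
        Self {
            name: name.to_string(),
            values: pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        }
    }
}

/// Stores held in precedence order: index 0 is searched first.
pub struct Registry {
    stores: Vec<MapStore>,
}

impl Registry {
    pub fn new(stores: Vec<MapStore>) -> Self {
        Self { stores }
    }

    /// Returns the name of the first store that has `key`, plus the value.
    /// Returns None when no store has the key.
    pub fn lookup(&self, key: &str) -> Option<(&str, &str)> {
        self.stores.iter().find_map(|store| {
            store
                .values
                .get(key)
                .map(|value| (store.name.as_str(), value.as_str()))
        })
    }
}
```

With stores ordered custom_store, kubernetes, env, a key that exists in both kubernetes and env resolves from kubernetes, because the search stops at the first store that has it.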
SecretStore Trait
Interface Definition
use async_trait::async_trait;
use secrecy::SecretString;
#[async_trait]
pub trait SecretStore: Send + Sync {
/// Retrieves a secret by key
///
/// Returns:
/// - Ok(Some(secret)) if found
/// - Ok(None) if not found
/// - Err(e) if error occurred
async fn get_secret(&self, key: &str) -> AnyErrorResult<Option<SecretString>>;
}
pub type AnyErrorResult<T> =
std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Key Points
- Async: All secret retrieval is asynchronous
- Optional: Return None if the secret doesn't exist (not an error)
- SecretString: Wraps sensitive data to prevent accidental exposure
- Send + Sync: Can be shared across threads safely
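The three return cases and the redaction idea from the key points can be sketched without any external crates. In the example below, `Redacted` is a simplified, hypothetical stand-in for `secrecy::SecretString`, and the free function `get_secret` is a synchronous analogue of the trait method; both are assumptions made for illustration only.

```rust
use std::collections::HashMap;
use std::fmt;

/// Simplified stand-in for `secrecy::SecretString` (illustrative only):
/// the Debug output never contains the wrapped value.
#[derive(Clone)]
pub struct Redacted(String);

impl Redacted {
    pub fn new(value: impl Into<String>) -> Self {
        Self(value.into())
    }

    /// Explicit access to the raw value, easy to find in code review.
    pub fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for Redacted {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Redacted([REDACTED])")
    }
}

/// Synchronous analogue of `get_secret`. `backend` is None when the
/// (hypothetical) backend cannot be reached.
pub fn get_secret(
    backend: Option<&HashMap<String, String>>,
    key: &str,
) -> Result<Option<Redacted>, String> {
    // An unreachable backend is a real failure: Err(e).
    let map = backend.ok_or_else(|| "backend unavailable".to_string())?;
    // A missing key is not a failure: Ok(None). A present key: Ok(Some(..)).
    Ok(map.get(key).map(|v| Redacted::new(v.as_str())))
}
```

Keeping "not found" separate from "failed" lets a caller fall through to the next store on `Ok(None)` while still surfacing backend outages.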
Implementation Example: Environment Variables
Basic Implementation
use async_trait::async_trait;
use runtime_secrets::{SecretStore, AnyErrorResult};
use secrecy::SecretString;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct EnvSecretStore {
// Optional: cache for performance
cache: Arc<RwLock<HashMap<String, SecretString>>>,
}
impl EnvSecretStore {
pub fn new() -> Self {
Self {
cache: Arc::new(RwLock::new(HashMap::new())),
}
}
}
#[async_trait]
impl SecretStore for EnvSecretStore {
async fn get_secret(&self, key: &str) -> AnyErrorResult<Option<SecretString>> {
// Check cache first
{
let cache = self.cache.read().await;
if let Some(secret) = cache.get(key) {
return Ok(Some(secret.clone()));
}
}
// Try original key first (exact match)
if let Ok(value) = env::var(key) {
let secret = SecretString::from(value);
self.cache.write().await.insert(key.to_string(), secret.clone());
return Ok(Some(secret));
}
// Try uppercase version as fallback
let upper_key = key.to_uppercase();
if let Ok(value) = env::var(&upper_key) {
let secret = SecretString::from(value);
self.cache.write().await.insert(key.to_string(), secret.clone());
return Ok(Some(secret));
}
// Not found
Ok(None)
}
}
Builder Pattern
use std::path::{Path, PathBuf};
pub struct EnvSecretStoreBuilder {
file_path: Option<PathBuf>,
}
impl EnvSecretStoreBuilder {
pub fn new() -> Self {
Self { file_path: None }
}
pub fn with_path(mut self, path: PathBuf) -> Self {
self.file_path = Some(path);
self
}
pub fn build(self) -> EnvSecretStore {
let mut store = EnvSecretStore::new();
// Load from .env file if specified
if let Some(path) = self.file_path {
if let Err(e) = store.load_from_file(&path) {
tracing::warn!("Failed to load secrets from {}: {}", path.display(), e);
}
}
store
}
}
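impl EnvSecretStore {
fn load_from_file(&mut self, path: &Path) -> std::io::Result<()> {
let contents = std::fs::read_to_string(path)?;
for line in contents.lines() {
// Skip blank lines and comments, ignoring leading whitespace
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if let Some((key, value)) = line.split_once('=') {
let key = key.trim();
let value = value.trim();
// SAFETY: mutating the process environment is only sound while no other
// thread reads or writes it, so call this during single-threaded startup.
unsafe {
env::set_var(key, value);
}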
}
}
Ok(())
}
}
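The line handling inside `load_from_file` can be pulled out into a pure function, which makes it testable without touching the process environment. The sketch below is one possible shape: `parse_env_line` and `strip_matching_quotes` are hypothetical helper names, and stripping surrounding quotes is an assumption about common .env conventions rather than behavior shown above.

```rust
/// Parses one line of a .env-style file into a (key, value) pair.
/// Returns None for blank lines, comments, and lines without a usable key.
pub fn parse_env_line(line: &str) -> Option<(&str, &str)> {
    let line = line.trim();
    if line.is_empty() || line.starts_with('#') {
        return None;
    }
    // Split on the first '=' only, so values may themselves contain '='.
    let (key, value) = line.split_once('=')?;
    let key = key.trim();
    if key.is_empty() {
        return None;
    }
    Some((key, strip_matching_quotes(value.trim())))
}

/// Removes one pair of matching single or double quotes, if present.
/// (Assumed convention; the original loader keeps quotes as-is.)
fn strip_matching_quotes(value: &str) -> &str {
    for quote in ['"', '\''] {
        if value.len() >= 2 && value.starts_with(quote) && value.ends_with(quote) {
            return &value[1..value.len() - 1];
        }
    }
    value
}
```

A loader could then collect the parsed pairs into its own map instead of calling `env::set_var`, which would avoid mutating global process state.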
Advanced Example: AWS Secrets Manager
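use async_trait::async_trait;
use aws_sdk_secretsmanager::Client;
use runtime_secrets::{SecretStore, AnyErrorResult};
use secrecy::SecretString;
use serde::de::Error as _; // brings `serde_json::Error::custom` into scope
use snafu::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;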
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to initialize AWS Secrets Manager client: {source}"))]
ClientInitFailed {
source: aws_sdk_secretsmanager::Error,
},
#[snafu(display("Failed to retrieve secret {secret_name}: {source}"))]
SecretRetrievalFailed {
secret_name: String,
source: aws_sdk_secretsmanager::Error,
},
#[snafu(display("Secret {secret_name} contains invalid JSON: {source}"))]
InvalidSecretFormat {
secret_name: String,
source: serde_json::Error,
},
}
pub struct AwsSecretsManager {
client: Arc<Client>,
secret_name: String,
cache: Arc<RwLock<Option<HashMap<String, SecretString>>>>,
}
impl AwsSecretsManager {
// Async because loading the AWS config awaits; callers must `.await` this constructor.
pub async fn new(secret_name: String) -> Self {
Self {
client: Arc::new(Client::new(&aws_config::load_from_env().await)),
secret_name,
cache: Arc::new(RwLock::new(None)),
}
}
pub async fn init(&self) -> Result<(), Error> {
// Verify connection and load initial secrets
self.refresh_cache().await?;
Ok(())
}
async fn refresh_cache(&self) -> Result<(), Error> {
let response = self
.client
.get_secret_value()
.secret_id(&self.secret_name)
.send()
.await
.context(SecretRetrievalFailedSnafu {
secret_name: &self.secret_name,
})?;
let secret_string = response
.secret_string()
.ok_or_else(|| Error::InvalidSecretFormat {
secret_name: self.secret_name.clone(),
source: serde_json::Error::custom("No secret string"),
})?;
let secrets: HashMap<String, String> = serde_json::from_str(secret_string)
.context(InvalidSecretFormatSnafu {
secret_name: &self.secret_name,
})?;
let secret_map: HashMap<String, SecretString> = secrets
.into_iter()
.map(|(k, v)| (k, SecretString::from(v)))
.collect();
let mut cache = self.cache.write().await;
*cache = Some(secret_map);
Ok(())
}
}
#[async_trait]
impl SecretStore for AwsSecretsManager {
async fn get_secret(&self, key: &str) -> AnyErrorResult<Option<SecretString>> {
// Ensure cache is loaded
{
let cache = self.cache.read().await;
if cache.is_none() {
drop(cache);
self.refresh_cache().await?;
}
}
// Retrieve from cache
let cache = self.cache.read().await;
let secrets = cache.as_ref().ok_or("Cache not initialized")?;
Ok(secrets.get(key).cloned())
}
}
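The `get_secret` implementation above checks the cache under a read lock, releases it, and then refreshes, so two concurrent callers may both trigger a refresh. One possible way to avoid the duplicate fetch is to re-check the cache after taking the write lock. The sketch below shows that pattern with `std::sync::RwLock` and a synchronous loader closure. `LazyCache` and its `load_count` counter are hypothetical names for illustration, and an async version would presumably follow the same shape with `tokio::sync::RwLock`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;

/// Lazily populated cache that runs its loader at most once on success.
pub struct LazyCache {
    cache: RwLock<Option<HashMap<String, String>>>,
    loads: AtomicUsize, // counts loader invocations, for demonstration
}

impl LazyCache {
    pub fn new() -> Self {
        Self {
            cache: RwLock::new(None),
            loads: AtomicUsize::new(0),
        }
    }

    pub fn load_count(&self) -> usize {
        self.loads.load(Ordering::SeqCst)
    }

    /// Returns the cached value for `key`, calling `loader` first if the
    /// cache has not been filled yet.
    pub fn get<F>(&self, key: &str, loader: F) -> Result<Option<String>, String>
    where
        F: FnOnce() -> Result<HashMap<String, String>, String>,
    {
        // Fast path: cache already filled, a shared read lock is enough.
        {
            let guard = self.cache.read().map_err(|_| "lock poisoned".to_string())?;
            if let Some(map) = guard.as_ref() {
                return Ok(map.get(key).cloned());
            }
        }
        // Slow path: take the write lock, then check again, because another
        // thread may have filled the cache while we were waiting.
        let mut guard = self.cache.write().map_err(|_| "lock poisoned".to_string())?;
        if guard.is_none() {
            self.loads.fetch_add(1, Ordering::SeqCst);
            *guard = Some(loader()?);
        }
        Ok(guard.as_ref().and_then(|map| map.get(key).cloned()))
    }
}
```

If the loader fails, the cache stays empty and the error is returned, so a later call retries the load.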
Complete Example: Kubernetes Secrets
use async_trait::async_trait;
use k8s_openapi::api::core::v1::Secret;
use kube::{Api, Client};
use runtime_secrets::{SecretStore, AnyErrorResult};
use secrecy::SecretString;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct KubernetesSecretStore {
client: Api<Secret>,
secret_name: String,
namespace: String,
cache: Arc<RwLock<Option<HashMap<String, SecretString>>>>,
}
impl KubernetesSecretStore {
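// Async because creating the client awaits; callers must `.await` this constructor.
// NOTE: `unwrap` panics if no kubeconfig or in-cluster config is available;
// production code should return a Result instead.
pub async fn new(secret_name: String) -> Self {
Self {
client: Api::default_namespaced(Client::try_default().await.unwrap()),
secret_name,
namespace: "default".to_string(),
cache: Arc::new(RwLock::new(None)),
}
}
pub async fn with_namespace(mut self, namespace: String) -> Self {
self.namespace = namespace;
self.client = Api::namespaced(
Client::try_default().await.unwrap(),
&self.namespace,
);
self
}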
pub async fn init(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Load secret from Kubernetes
self.refresh_cache().await?;
Ok(())
}
async fn refresh_cache(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let secret = self.client.get(&self.secret_name).await?;
let data = secret.data.ok_or("Secret has no data")?;
let secrets: HashMap<String, SecretString> = data
.into_iter()
.filter_map(|(key, byte_string)| {
String::from_utf8(byte_string.0)
.ok()
.map(|value| (key, SecretString::from(value)))
})
.collect();
let mut cache = self.cache.write().await;
*cache = Some(secrets);
Ok(())
}
}
#[async_trait]
impl SecretStore for KubernetesSecretStore {
async fn get_secret(&self, key: &str) -> AnyErrorResult<Option<SecretString>> {
// Ensure cache is loaded
{
let cache = self.cache.read().await;
if cache.is_none() {
drop(cache);
self.refresh_cache().await?;
}
}
// Retrieve from cache
let cache = self.cache.read().await;
let secrets = cache.as_ref().ok_or("Cache not initialized")?;
Ok(secrets.get(key).cloned())
}
}
Registration and Loading
Store Type Enumeration
pub enum SecretStoreType {
Env,
EnvCustomPath(String),
#[cfg(feature = "keyring-secret-store")]
Keyring,
Kubernetes(String),
#[cfg(feature = "aws-secrets-manager")]
AwsSecretsManager(String),
}
Loading Secret Stores
use runtime_secrets::{Secrets, SecretStoreType};
use spicepod::component::secret::Secret as SpicepodSecret;
impl Secrets {
pub async fn load_from(&mut self, secrets: &[SpicepodSecret]) -> Result<()> {
self.stores.clear();
for secret in secrets {
let store_type = spicepod_secret_store_type(secret)?;
let secret_store = match load_secret_store(store_type).await {
Ok(store) => store,
Err(e) => {
tracing::error!("Error loading secret store {}: {}", secret.name, e);
continue;
}
};
self.stores.insert(secret.name.clone(), secret_store);
}
// Add default env store if no stores configured
if self.stores.is_empty() {
let default_store = Arc::new(EnvSecretStore::new());
self.stores.insert("env".to_string(), default_store);
}
// Reverse order for precedence
self.stores.reverse();
Ok(())
}
}
async fn load_secret_store(
store_type: SecretStoreType,
) -> Result<Arc<dyn SecretStore>> {
match store_type {
SecretStoreType::Env => {
Ok(Arc::new(EnvSecretStore::new()))
}
SecretStoreType::EnvCustomPath(path) => {
Ok(Arc::new(
EnvSecretStoreBuilder::new()
.with_path(path.into())
.build()
))
}
#[cfg(feature = "keyring-secret-store")]
SecretStoreType::Keyring => {
Ok(Arc::new(KeyringSecretStore::new()))
}
SecretStoreType::Kubernetes(secret_name) => {
let mut store = KubernetesSecretStore::new(secret_name).await;
store.init().await?;
Ok(Arc::new(store))
}
#[cfg(feature = "aws-secrets-manager")]
SecretStoreType::AwsSecretsManager(secret_name) => {
let store = AwsSecretsManager::new(secret_name).await;
store.init().await?;
Ok(Arc::new(store))
}
}
}
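The `spicepod_secret_store_type` helper called in `load_from` is not shown in this guide. As a rough sketch of what such a mapping might look like, the function below turns a `from` value such as `kubernetes:my-secret-name` (plus an optional `file_path` param) into a store kind. `StoreKind` and `parse_store_kind` are hypothetical names, the `keyring` provider string is an assumption, and feature gating is omitted.

```rust
/// Illustrative mirror of the store type enumeration (no feature flags).
#[derive(Debug, PartialEq)]
pub enum StoreKind {
    Env,
    EnvCustomPath(String),
    Keyring,
    Kubernetes(String),
    AwsSecretsManager(String),
}

/// Maps a `from` value and an optional `file_path` param to a store kind.
/// The part before the first ':' is the provider; the rest is its selector.
pub fn parse_store_kind(from: &str, file_path: Option<&str>) -> Result<StoreKind, String> {
    let (provider, selector) = match from.split_once(':') {
        Some((p, s)) => (p.trim(), Some(s.trim())),
        None => (from.trim(), None),
    };
    match (provider, selector) {
        // `env` takes no selector; a file_path param switches to a .env file.
        ("env", None) => Ok(match file_path {
            Some(path) => StoreKind::EnvCustomPath(path.to_string()),
            None => StoreKind::Env,
        }),
        // Assumed provider string for the keyring store.
        ("keyring", None) => Ok(StoreKind::Keyring),
        ("kubernetes", Some(name)) if !name.is_empty() => {
            Ok(StoreKind::Kubernetes(name.to_string()))
        }
        ("aws_secrets_manager", Some(name)) if !name.is_empty() => {
            Ok(StoreKind::AwsSecretsManager(name.to_string()))
        }
        _ => Err(format!("unsupported secret store: '{from}'")),
    }
}
```

Rejecting unknown providers and empty selectors at parse time means configuration mistakes surface when the stores are loaded rather than at the first secret lookup.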
Secret Injection
Template Syntax
Secrets are referenced using the syntax: ${ store:key }
datasets:
  - from: postgres:public.users
    name: users
    params:
      pg_host: localhost
      pg_user: ${ env:PG_USER }
      pg_pass: ${ secrets:PG_PASSWORD }
      api_key: ${ aws:OPENAI_API_KEY }
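To make the `${ store:key }` syntax concrete, the sketch below scans a parameter string for such tokens and substitutes each one through a resolver closure. It is a simplified, synchronous illustration: `SecretRef`, `find_secret_refs`, and `substitute` are hypothetical names, not the runtime's matcher, and unresolved references are replaced with an empty string as an assumption.

```rust
/// One `${ store:key }` reference found in a parameter string.
#[derive(Debug, PartialEq)]
pub struct SecretRef<'a> {
    pub store: &'a str,
    pub key: &'a str,
    pub start: usize, // byte offset of "${"
    pub end: usize,   // byte offset just past "}"
}

/// Finds every well-formed `${ store:key }` token, in order of appearance.
pub fn find_secret_refs(input: &str) -> Vec<SecretRef<'_>> {
    let mut refs = Vec::new();
    let mut pos = 0;
    while let Some(open) = input[pos..].find("${").map(|i| i + pos) {
        let close = match input[open..].find('}') {
            Some(i) => open + i,
            None => break, // unterminated token: leave the rest untouched
        };
        let inner = &input[open + 2..close];
        // Tokens without a ':' or with an empty part are skipped and kept as text.
        if let Some((store, key)) = inner.split_once(':') {
            let (store, key) = (store.trim(), key.trim());
            if !store.is_empty() && !key.is_empty() {
                refs.push(SecretRef { store, key, start: open, end: close + 1 });
            }
        }
        pos = close + 1;
    }
    refs
}

/// Replaces each token with the resolver's value (empty string if None).
pub fn substitute<F>(input: &str, mut resolve: F) -> String
where
    F: FnMut(&str, &str) -> Option<String>,
{
    let mut out = String::new();
    let mut last_end = 0;
    for r in find_secret_refs(input) {
        out.push_str(&input[last_end..r.start]); // text before the token
        out.push_str(&resolve(r.store, r.key).unwrap_or_default());
        last_end = r.end;
    }
    out.push_str(&input[last_end..]); // trailing text after the last token
    out
}
```

The resolver receives the store name and key separately, so it can route `secrets` to a precedence search and any other name to a specific store.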
Injection Implementation
impl Secrets {
pub async fn inject_secrets(&self, key: &str, param_str: ParamStr<'_>) -> SecretString {
let mut result = String::new();
let mut last_end = 0;
for secret_replacement in SecretReplacementMatcher::new(param_str.0) {
// Append text before match
result.push_str(&param_str.0[last_end..secret_replacement.span.start]);
// Get secret value
let secret = self
.get_store_secret(
&param_str,
&secret_replacement.store_name,
&secret_replacement.key,
)
.await
.unwrap_or_default();
// Replace token with secret
result.push_str(&secret);
last_end = secret_replacement.span.end;
}
// Append remaining text
result.push_str(&param_str.0[last_end..]);
SecretString::from(result)
}
async fn get_store_secret(
&self,
param_str: &ParamStr<'_>,
store_name: &str,
key: &str,
) -> Option<String> {
// Special case: search all stores in precedence order
if store_name == "secrets" {
return match self.get_secret(key).await {
Ok(Some(secret)) => Some(secret.expose_secret().to_string()),
Ok(None) => {
tracing::error!("Key '{}' not found in any secrets", key);
None
}
Err(e) => {
tracing::error!("Error getting secret: {}", e);
None
}
};
}
// Get from specific store
let store = self.stores.get(store_name)?;
match store.get_secret(key).await {
Ok(Some(secret)) => Some(secret.expose_secret().to_string()),
Ok(None) => {
tracing::error!("Key {} not found in store: {}", key, store_name);
None
}
Err(e) => {
tracing::error!("Error getting secret: {}", e);
None
}
}
}
}
Spicepod Configuration
Defining Secret Stores
version: v1beta1
kind: Spicepod
name: my_app
secrets:
  # Environment variables (default)
  - name: env
    from: env
  # Custom .env file
  - name: dotenv
    from: env
    params:
      file_path: /path/to/.env
  # Kubernetes secret
  - name: k8s
    from: kubernetes:my-secret-name
  # AWS Secrets Manager
  - name: aws
    from: aws_secrets_manager:prod/myapp/secrets
datasets:
  - from: postgres:public.users
    name: users
    params:
      # Searches every store in precedence order (last defined first): aws → k8s → dotenv → env
      pg_pass: ${ secrets:DB_PASSWORD }
      # Explicit store reference
      api_key: ${ aws:OPENAI_API_KEY }
Best Practices
DO:
- ✅ Always return None for missing secrets (not an error)
- ✅ Use SecretString to prevent accidental logging
- ✅ Cache secrets for performance
- ✅ Implement async initialization in init()
- ✅ Log errors at appropriate levels
- ✅ Support multiple secret formats (JSON, key-value)
- ✅ Validate secrets on load, not on retrieval
- ✅ Handle Unicode and special characters
DON’T:
- ❌ Log secret values (even in debug mode)
- ❌ Return errors for missing secrets
- ❌ Block async runtime with synchronous I/O
- ❌ Store secrets in plain String
- ❌ Skip error handling
- ❌ Fetch secrets on every access
- ❌ Call .expose_secret() unnecessarily
- ❌ Use panics for error handling
Testing
#[cfg(test)]
mod tests {
use super::*;
use secrecy::ExposeSecret;
#[tokio::test]
async fn test_secret_store() {
let store = EnvSecretStore::new();
// Set test secret
std::env::set_var("TEST_SECRET", "secret_value");
let result = store.get_secret("TEST_SECRET").await;
assert!(result.is_ok());
let secret = result.unwrap();
assert!(secret.is_some());
assert_eq!(secret.unwrap().expose_secret(), "secret_value");
// Cleanup
std::env::remove_var("TEST_SECRET");
}
#[tokio::test]
async fn test_secret_not_found() {
let store = EnvSecretStore::new();
let result = store.get_secret("NONEXISTENT").await;
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}
#[tokio::test]
async fn test_secret_injection() {
let mut secrets = Secrets::new();
secrets.load_from(&[]).await.unwrap();
std::env::set_var("MY_KEY", "secret123");
let result = secrets
.inject_secrets("test", ParamStr("api_key=${ env:MY_KEY }"))
.await;
assert_eq!(result.expose_secret(), "api_key=secret123");
std::env::remove_var("MY_KEY");
}
}
Feature Flags
Cargo.toml:
[dependencies]
aws-sdk-secretsmanager = { version = "1.0", optional = true }
kube = { version = "0.87", optional = true }
[features]
aws-secrets-manager = ["dep:aws-sdk-secretsmanager"]
keyring-secret-store = ["dep:keyring"]
#[cfg(feature = "aws-secrets-manager")]
pub mod aws_secrets_manager;
#[cfg(feature = "keyring-secret-store")]
pub mod keyring;
Security Considerations
- Never log secret values, even in debug mode
- Use SecretString to wrap all sensitive data
- Clear secrets from memory when no longer needed
- Rotate secrets regularly
- Use encrypted connections to secret backends
- Validate secret permissions at runtime
- Audit secret access patterns
- Use least-privilege principles
Next Steps
- Building Custom Connectors - Data source integration
- Building Custom Accelerators - Local storage engines
- Secret Store Reference - API docs