Documentation Index
Fetch the complete documentation index at: https://mintlify.com/spiceai/spiceai/llms.txt
Use this file to discover all available pages before exploring further.
Arrow Flight SQL is a specialized protocol built on Arrow Flight for database-style SQL query execution. It provides metadata discovery, prepared statements, and efficient query execution.
Overview
Arrow Flight SQL extends Arrow Flight with:
- SQL-specific commands (GetCatalogs, GetSchemas, GetTables)
- Prepared statements with parameter binding
- Metadata discovery for databases, tables, and schemas
- Type information (GetXdbcTypeInfo)
- Efficient data transfer using Arrow columnar format
Connection
Default Endpoint
Default Ports
| Service | Port | Description |
|---|---|---|
| HTTP API | 8090 | SQL queries, health checks |
| Arrow Flight | 50051 | Flight and Flight SQL |
| Metrics | 9090 | Prometheus metrics |
Authentication
Basic Authentication
from adbc_driver_flightsql import dbapi

# Credentials are ADBC database options, so they are supplied through
# ``db_kwargs`` rather than as extra keyword arguments to ``connect``.
conn = dbapi.connect(
    uri="grpc://localhost:50051",
    db_kwargs={
        "username": "your-username",
        "password": "your-password",
    },
)
Bearer Token (API Key)
from adbc_driver_flightsql import dbapi

# The API key is sent as a bearer token in the authorization header option.
auth_options = {"adbc.flight.sql.authorization_header": "Bearer your-api-key"}
conn = dbapi.connect(uri="grpc://localhost:50051", db_kwargs=auth_options)
Python Client (ADBC)
Installation
pip install adbc-driver-flightsql pyarrow
Basic Query
from adbc_driver_flightsql import dbapi

# Context managers close the cursor and the connection even if the query
# raises, so no explicit close() calls are needed.
with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM taxi_trips LIMIT 10")
        result = cursor.fetch_arrow_table()
        print(result.to_pandas())
Prepared Statements
Prepared statements keep parameter values separate from the SQL text, which helps prevent SQL injection and lets the same query be re-executed with different values:
from adbc_driver_flightsql import dbapi

# Prepared statement with positional parameters: each ``?`` is bound, in
# order, from the ``parameters`` sequence.
query = "SELECT * FROM taxi_trips WHERE passenger_count = ? LIMIT ?"

# Context managers close the cursor and the connection even if a query raises.
with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        cursor.execute(query, parameters=[2, 10])
        result = cursor.fetch_arrow_table()
        print(result)

        # Reuse prepared statement with different parameters
        cursor.execute(query, parameters=[3, 20])
        result = cursor.fetch_arrow_table()
        print(result)
Streaming Large Results
from adbc_driver_flightsql import dbapi

with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM large_table")

        # fetch_record_batch() returns a pyarrow.RecordBatchReader, not a
        # single batch. Iterating the reader streams one record batch at a
        # time instead of materializing the whole result set in memory.
        reader = cursor.fetch_record_batch()
        for batch in reader:
            print(f"Processing batch with {len(batch)} rows")
            # Process batch...
from adbc_driver_flightsql import dbapi

# Context managers close the cursor and the connection even if a query raises.
with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        # List all tables
        cursor.execute("SHOW TABLES")
        tables = cursor.fetch_arrow_table()
        print(tables.to_pandas())

        # Get table schema
        cursor.execute("DESCRIBE taxi_trips")
        schema = cursor.fetch_arrow_table()
        print(schema.to_pandas())
Python Client (PyArrow Flight SQL)
Installation
Basic Usage
import pyarrow.flight as flight

# Connect to Flight SQL server
client = flight.FlightClient("grpc://localhost:50051")

# Authenticate. authenticate_basic_token() returns the complete
# (header name, header value) pair for the bearer token, so it is passed to
# FlightCallOptions as-is instead of being wrapped in another tuple.
token_pair = client.authenticate_basic_token("username", "password")
options = flight.FlightCallOptions(headers=[token_pair])

# Execute SQL query
# NOTE(review): the command is sent as a JSON string; confirm the server
# accepts this encoding for statement queries.
sql_command = '{"CommandStatementQuery": {"query": "SELECT * FROM taxi_trips LIMIT 10"}}'
descriptor = flight.FlightDescriptor.for_command(sql_command.encode())

# Get flight info
flight_info = client.get_flight_info(descriptor, options)

# Fetch results: each endpoint carries a ticket that is redeemed with do_get().
for endpoint in flight_info.endpoints:
    reader = client.do_get(endpoint.ticket, options)
    table = reader.read_all()
    print(table.to_pandas())
Prepared Statements
# Prepare a parameterized query through a Flight client, bind two values,
# execute it, and print each endpoint's result.
# NOTE(review): confirm that pyarrow.flight.FlightClient exposes prepare() and
# that the returned object supports set_parameters()/execute()/close(); these
# calls could not be verified from this page alone.
import pyarrow.flight as flight
import json
client = flight.FlightClient("grpc://localhost:50051")
# Prepare statement
prepare_command = {
"CommandPreparedStatementQuery": {
"query": "SELECT * FROM taxi_trips WHERE passenger_count = ? LIMIT ?"
}
}
# The command dict is serialized to JSON and sent as the descriptor's command bytes.
descriptor = flight.FlightDescriptor.for_command(
json.dumps(prepare_command).encode()
)
prepared_statement = client.prepare(descriptor)
# Bind parameters and execute
import pyarrow as pa
# One single-row column per placeholder, named "$1" and "$2".
parameters = pa.record_batch(
[[2], [10]],
names=["$1", "$2"]
)
prepared_statement.set_parameters(parameters)
flight_info = prepared_statement.execute()
# Fetch results
for endpoint in flight_info.endpoints:
reader = client.do_get(endpoint.ticket)
table = reader.read_all()
print(table.to_pandas())
prepared_statement.close()
Java Client (JDBC)
See the JDBC Connectivity page for Java examples using JDBC drivers.
Go Client
Installation
go get github.com/apache/arrow-go/v18/arrow/flight
go get github.com/apache/arrow-go/v18/arrow/flight/flightsql
Basic Usage
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// main connects to the Flight SQL endpoint, runs one query and prints the
// row count of every record batch read from the first endpoint.
func main() {
	ctx := context.Background()

	// flightsql.NewClient dials the address itself; the gRPC dial options are
	// passed after the (optional) auth handler and middleware.
	client, err := flightsql.NewClient(
		"localhost:50051",
		nil, // no auth handler
		nil, // no client middleware
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer client.Close()

	// Execute query
	info, err := client.Execute(ctx, "SELECT * FROM taxi_trips LIMIT 10")
	if err != nil {
		log.Fatalf("execute: %v", err)
	}
	if len(info.Endpoint) == 0 {
		log.Fatal("execute: server returned no endpoints")
	}

	// Fetch results
	reader, err := client.DoGet(ctx, info.Endpoint[0].Ticket)
	if err != nil {
		log.Fatalf("do_get: %v", err)
	}
	defer reader.Release()

	for reader.Next() {
		record := reader.Record()
		fmt.Printf("Rows: %d\n", record.NumRows())
	}
	// Next() returns false on both end-of-stream and failure, so check Err().
	if err := reader.Err(); err != nil {
		log.Fatalf("read: %v", err)
	}
}
Prepared Statements
package main

import (
	"context"
	"log"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// main prepares a parameterized query, binds one row of parameters, executes
// it and reads the result stream from the first endpoint.
func main() {
	ctx := context.Background()

	// flightsql.NewClient dials the address itself; the gRPC dial options are
	// passed after the (optional) auth handler and middleware.
	client, err := flightsql.NewClient(
		"localhost:50051",
		nil, // no auth handler
		nil, // no client middleware
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer client.Close()

	mem := memory.NewGoAllocator()

	// Prepare statement
	prepared, err := client.Prepare(
		ctx,
		"SELECT * FROM taxi_trips WHERE passenger_count = ? LIMIT ?",
	)
	if err != nil {
		log.Fatalf("prepare: %v", err)
	}
	defer prepared.Close(ctx)

	// Bind parameters: one record whose columns follow the parameter schema
	// reported by the server.
	builder := array.NewRecordBuilder(mem, prepared.ParameterSchema())
	defer builder.Release()

	// Assumes the server reports both parameters as Int64 — TODO confirm;
	// the type assertions panic for any other builder type.
	builder.Field(0).(*array.Int64Builder).Append(2)
	builder.Field(1).(*array.Int64Builder).Append(10)

	params := builder.NewRecord()
	defer params.Release()

	prepared.SetParameters(params)

	// Execute
	info, err := prepared.Execute(ctx)
	if err != nil {
		log.Fatalf("execute: %v", err)
	}
	if len(info.Endpoint) == 0 {
		log.Fatal("execute: server returned no endpoints")
	}

	reader, err := client.DoGet(ctx, info.Endpoint[0].Ticket)
	if err != nil {
		log.Fatalf("do_get: %v", err)
	}
	defer reader.Release()

	for reader.Next() {
		// Process results
	}
	// Next() returns false on both end-of-stream and failure, so check Err().
	if err := reader.Err(); err != nil {
		log.Fatalf("read: %v", err)
	}
}
Rust Client
Dependencies
[dependencies]
arrow = "54"
# The Flight SQL client (`arrow_flight::sql`) is behind this feature flag.
arrow-flight = { version = "54", features = ["flight-sql-experimental"] }
# Stream extension traits used to iterate over the result stream.
futures = "0.3"
tonic = "0.12"
tokio = { version = "1", features = ["full"] }
Basic Usage
use arrow_flight::{
sql::client::FlightSqlServiceClient,
Ticket,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = FlightSqlServiceClient::new(
"http://localhost:50051"
).await?;
// Execute query
let mut stmt = client.execute(
"SELECT * FROM taxi_trips LIMIT 10".to_string(),
None,
).await?;
let flight_info = stmt.into_inner();
// Fetch results
for endpoint in flight_info.endpoint {
let mut stream = client.do_get(endpoint.ticket.unwrap()).await?;
while let Some(batch) = stream.next().await? {
println!("Received {} rows", batch.num_rows());
}
}
Ok(())
}
Flight SQL Commands
GetCatalogs
List available catalogs:
from adbc_driver_flightsql import dbapi
# Open a connection and cursor, then query information_schema.catalogs and
# print the result as an Arrow table.
conn = dbapi.connect(uri="grpc://localhost:50051")
cursor = conn.cursor()
cursor.execute("SELECT * FROM information_schema.catalogs")
print(cursor.fetch_arrow_table())
GetSchemas
List schemas in a catalog:
# Uses an already-open cursor: query information_schema.schemata and print
# the result as an Arrow table.
cursor.execute("SELECT * FROM information_schema.schemata")
print(cursor.fetch_arrow_table())
GetTables
List tables in a schema:
# Uses an already-open cursor: run SHOW TABLES and print the result as an
# Arrow table.
cursor.execute("SHOW TABLES")
print(cursor.fetch_arrow_table())
GetTableTypes
Get supported table types:
# Uses an already-open cursor: query information_schema.table_types and print
# the result as an Arrow table.
cursor.execute("SELECT * FROM information_schema.table_types")
print(cursor.fetch_arrow_table())
- Use prepared statements for repeated queries
- Stream large results instead of fetching all at once
- Reuse connections across multiple queries
- Batch parameter bindings when executing prepared statements multiple times
- Enable compression for network-constrained environments
Error Handling
from adbc_driver_flightsql import dbapi

conn = dbapi.connect(uri="grpc://localhost:50051")
cursor = conn.cursor()

try:
    cursor.execute("INVALID SQL QUERY")
    result = cursor.fetch_arrow_table()
# The DB-API exception classes are exposed on the ``dbapi`` module.
# ProgrammingError is a subclass of DatabaseError (PEP 249), so the more
# specific handler has to come first or it would never be reached.
except dbapi.ProgrammingError as e:
    print(f"Programming error: {e}")
except dbapi.DatabaseError as e:
    print(f"Database error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    # Always release the cursor and the connection.
    cursor.close()
    conn.close()
Testing Your Connection
Test script to verify Flight SQL connectivity:
from adbc_driver_flightsql import dbapi
import sys
def test_connection(host="localhost", port=50051):
    """Check Flight SQL connectivity by running ``SELECT 1 AS test``.

    Args:
        host: Host name used to build the ``grpc://`` URI.
        port: Port used to build the ``grpc://`` URI.

    Returns:
        True if the connection was opened and the query returned a result,
        False if any step raised an exception (the error is printed).
    """
    uri = f"grpc://{host}:{port}"
    try:
        # Context managers release the cursor and the connection even when
        # the query raises.
        with dbapi.connect(uri=uri) as conn:
            with conn.cursor() as cursor:
                # Basic query
                cursor.execute("SELECT 1 AS test")
                result = cursor.fetch_arrow_table()
        print("✓ Connection successful")
        print(f"✓ Query result: {result.to_pydict()}")
        return True
    except Exception as e:
        # Top-level boundary of a diagnostic script: report and signal failure.
        print(f"✗ Connection failed: {e}")
        return False
if __name__ == "__main__":
success = test_connection()
sys.exit(0 if success else 1)
- Arrow Flight API - Base Arrow Flight protocol
- ADBC - Arrow Database Connectivity standard
- JDBC - Java database connectivity
- ODBC - Open Database Connectivity