Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/spiceai/spiceai/llms.txt

Use this file to discover all available pages before exploring further.

Arrow Flight SQL is a specialized protocol built on Arrow Flight for database-style SQL query execution. It provides metadata discovery, prepared statements, and efficient query execution.

Overview

Arrow Flight SQL extends Arrow Flight with:
  • SQL-specific commands (GetCatalogs, GetSchemas, GetTables)
  • Prepared statements with parameter binding
  • Metadata discovery for databases, tables, and schemas
  • Type information (GetXdbcTypeInfo)
  • Efficient data transfer using Arrow columnar format

Connection

Default Endpoint

grpc://localhost:50051

Default Ports

| Service      | Port  | Description                |
|--------------|-------|----------------------------|
| HTTP API     | 8090  | SQL queries, health checks |
| Arrow Flight | 50051 | Flight and Flight SQL      |
| Metrics      | 9090  | Prometheus metrics         |

Authentication

Basic Authentication

from adbc_driver_flightsql import dbapi

# dbapi.connect() has no username/password keyword arguments; credentials
# are supplied as ADBC database options through db_kwargs.
conn = dbapi.connect(
    uri="grpc://localhost:50051",
    db_kwargs={
        "username": "your-username",
        "password": "your-password",
    },
)

Bearer Token (API Key)

from adbc_driver_flightsql import dbapi

# The API key is supplied through the driver's authorization-header option.
auth_options = {"adbc.flight.sql.authorization_header": "Bearer your-api-key"}
conn = dbapi.connect(uri="grpc://localhost:50051", db_kwargs=auth_options)

Python Client (ADBC)

Installation

# pandas is required by the examples below that call .to_pandas()
pip install adbc-driver-flightsql pyarrow pandas

Basic Query

from adbc_driver_flightsql import dbapi

# Context managers close the cursor and the connection even when the query
# raises, so no explicit close() calls are needed.
with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM taxi_trips LIMIT 10")
        result = cursor.fetch_arrow_table()
        print(result.to_pandas())

Prepared Statements

Prepared statements provide better performance and prevent SQL injection:
from adbc_driver_flightsql import dbapi

# Prepared statement with positional parameters
query = "SELECT * FROM taxi_trips WHERE passenger_count = ? LIMIT ?"

# Context managers close the cursor and the connection even when a query
# raises, so no explicit close() calls are needed.
with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        cursor.execute(query, parameters=[2, 10])
        result = cursor.fetch_arrow_table()
        print(result)

        # Run the same statement again with different parameters
        cursor.execute(query, parameters=[3, 20])
        result = cursor.fetch_arrow_table()
        print(result)

Streaming Large Results

from adbc_driver_flightsql import dbapi

with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM large_table")

        # fetch_record_batch() returns a pyarrow.RecordBatchReader rather
        # than a single batch; iterating the reader yields one RecordBatch
        # at a time instead of materializing the whole result set.
        reader = cursor.fetch_record_batch()
        for batch in reader:
            print(f"Processing batch with {batch.num_rows} rows")
            # Process batch...

Metadata Discovery

from adbc_driver_flightsql import dbapi

# Context managers close the cursor and the connection even when a query
# raises, so no explicit close() calls are needed.
with dbapi.connect(uri="grpc://localhost:50051") as conn:
    with conn.cursor() as cursor:
        # List all tables
        cursor.execute("SHOW TABLES")
        tables = cursor.fetch_arrow_table()
        print(tables.to_pandas())

        # Get table schema
        cursor.execute("DESCRIBE taxi_trips")
        schema = cursor.fetch_arrow_table()
        print(schema.to_pandas())

Python Client (PyArrow Flight SQL)

Installation

# pandas is required by the examples below that call .to_pandas()
pip install pyarrow pandas

Basic Usage

import pyarrow.flight as flight

# Connect to Flight SQL server
client = flight.FlightClient("grpc://localhost:50051")

# Authenticate. authenticate_basic_token() returns the complete
# (header name, header value) pair for the bearer token, so it is handed to
# FlightCallOptions as-is instead of being wrapped in another tuple.
auth_header = client.authenticate_basic_token("username", "password")
options = flight.FlightCallOptions(headers=[auth_header])

# Execute SQL query
# NOTE(review): Flight SQL commands are normally protobuf-encoded messages;
# confirm that the server accepts this JSON form of CommandStatementQuery.
sql_command = '{"CommandStatementQuery": {"query": "SELECT * FROM taxi_trips LIMIT 10"}}'
descriptor = flight.FlightDescriptor.for_command(sql_command.encode())

# Get flight info
flight_info = client.get_flight_info(descriptor, options)

# Fetch results from every endpoint listed in the flight info
for endpoint in flight_info.endpoints:
    reader = client.do_get(endpoint.ticket, options)
    table = reader.read_all()
    print(table.to_pandas())

Prepared Statements

# Prepared-statement flow using a pyarrow.flight client: build a command
# descriptor, prepare it, bind a parameter batch, execute, and read every
# returned endpoint.
import pyarrow.flight as flight
import json

client = flight.FlightClient("grpc://localhost:50051")

# Prepare statement
# NOTE(review): the command is serialized as JSON here; Flight SQL commands
# are normally protobuf-encoded — confirm the server accepts this form.
prepare_command = {
    "CommandPreparedStatementQuery": {
        "query": "SELECT * FROM taxi_trips WHERE passenger_count = ? LIMIT ?"
    }
}

descriptor = flight.FlightDescriptor.for_command(
    json.dumps(prepare_command).encode()
)
# NOTE(review): `prepare` does not appear in the documented
# pyarrow.flight.FlightClient API — verify this call (and the
# set_parameters/execute/close methods used below) against the installed
# pyarrow version before relying on this example.
prepared_statement = client.prepare(descriptor)

# Bind parameters and execute
import pyarrow as pa

# One single-row column per `?` placeholder, in placeholder order.
# NOTE(review): the "$1"/"$2" column names are presumably what the server
# expects for positional parameters — confirm.
parameters = pa.record_batch(
    [[2], [10]],
    names=["$1", "$2"]
)

prepared_statement.set_parameters(parameters)
flight_info = prepared_statement.execute()

# Fetch results
# No call options are passed to do_get here, so this example sends no
# authentication header.
for endpoint in flight_info.endpoints:
    reader = client.do_get(endpoint.ticket)
    table = reader.read_all()
    print(table.to_pandas())

prepared_statement.close()

Java Client (JDBC)

See the JDBC Connectivity page for Java examples using JDBC drivers.

Go Client

Installation

go get github.com/apache/arrow/go/v18/arrow/flight
go get github.com/apache/arrow/go/v18/arrow/flight/flightsql

Basic Usage

package main

import (
    "context"
    "fmt"

    "github.com/apache/arrow/go/v18/arrow/flight/flightsql"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    ctx := context.Background()

    conn, _ := grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    defer conn.Close()

    client := flightsql.NewClient(conn)

    // Execute query
    info, _ := client.Execute(ctx, "SELECT * FROM taxi_trips LIMIT 10")

    // Fetch results
    reader, _ := client.DoGet(ctx, info.Endpoint[0].Ticket)
    defer reader.Release()

    for reader.Next() {
        record := reader.Record()
        fmt.Printf("Rows: %d\n", record.NumRows())
    }
}

Prepared Statements

package main

import (
    "context"

    "github.com/apache/arrow/go/v18/arrow"
    "github.com/apache/arrow/go/v18/arrow/array"
    "github.com/apache/arrow/go/v18/arrow/flight/flightsql"
    "github.com/apache/arrow/go/v18/arrow/memory"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    ctx := context.Background()
    conn, _ := grpc.Dial(
        "localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    defer conn.Close()

    client := flightsql.NewClient(conn)
    mem := memory.NewGoAllocator()

    // Prepare statement
    prepared, _ := client.Prepare(
        ctx,
        "SELECT * FROM taxi_trips WHERE passenger_count = ? LIMIT ?",
    )
    defer prepared.Close(ctx)

    // Bind parameters
    builder := array.NewRecordBuilder(mem, prepared.ParameterSchema())
    defer builder.Release()

    builder.Field(0).(*array.Int64Builder).Append(2)
    builder.Field(1).(*array.Int64Builder).Append(10)

    params := builder.NewRecord()
    defer params.Release()

    prepared.SetParameters(params)

    // Execute
    info, _ := prepared.Execute(ctx)
    reader, _ := client.DoGet(ctx, info.Endpoint[0].Ticket)
    defer reader.Release()

    for reader.Next() {
        // Process results
    }
}

Rust Client

Dependencies

[dependencies]
arrow = "54"
# The `arrow_flight::sql` module is only compiled when the Flight SQL
# feature is enabled.
arrow-flight = { version = "54", features = ["flight-sql-experimental"] }
tonic = "0.12"
tokio = { version = "1", features = ["full"] }

Basic Usage

use arrow_flight::{
    sql::client::FlightSqlServiceClient,
    Ticket,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = FlightSqlServiceClient::new(
        "http://localhost:50051"
    ).await?;

    // Execute query
    let mut stmt = client.execute(
        "SELECT * FROM taxi_trips LIMIT 10".to_string(),
        None,
    ).await?;

    let flight_info = stmt.into_inner();

    // Fetch results
    for endpoint in flight_info.endpoint {
        let mut stream = client.do_get(endpoint.ticket.unwrap()).await?;

        while let Some(batch) = stream.next().await? {
            println!("Received {} rows", batch.num_rows());
        }
    }

    Ok(())
}

Flight SQL Commands

GetCatalogs

List available catalogs:
from adbc_driver_flightsql import dbapi

# The connection and cursor are left open here; the GetSchemas, GetTables
# and GetTableTypes snippets that follow reuse `cursor`.
conn = dbapi.connect(uri="grpc://localhost:50051")
cursor = conn.cursor()

# NOTE(review): this runs an ordinary SQL query against information_schema;
# confirm it is meant to stand in for the Flight SQL GetCatalogs command.
cursor.execute("SELECT * FROM information_schema.catalogs")
print(cursor.fetch_arrow_table())

GetSchemas

List schemas in a catalog:
# Reuses the `cursor` opened in the GetCatalogs example.
cursor.execute("SELECT * FROM information_schema.schemata")
print(cursor.fetch_arrow_table())

GetTables

List tables in a schema:
# Reuses the `cursor` opened in the GetCatalogs example.
cursor.execute("SHOW TABLES")
print(cursor.fetch_arrow_table())

GetTableTypes

Get supported table types:
# Reuses the `cursor` opened in the GetCatalogs example.
cursor.execute("SELECT * FROM information_schema.table_types")
print(cursor.fetch_arrow_table())

Performance Best Practices

  1. Use prepared statements for repeated queries
  2. Stream large results instead of fetching all at once
  3. Reuse connections across multiple queries
  4. Batch parameter bindings when executing prepared statements multiple times
  5. Enable compression for network-constrained environments

Error Handling

from adbc_driver_flightsql import dbapi

conn = dbapi.connect(uri="grpc://localhost:50051")
cursor = conn.cursor()

# The DB-API exception classes are exposed by the dbapi module. Per PEP 249,
# ProgrammingError is a subclass of DatabaseError, so it has to be caught
# first; otherwise its handler can never run.
try:
    cursor.execute("INVALID SQL QUERY")
    result = cursor.fetch_arrow_table()
except dbapi.ProgrammingError as e:
    print(f"Programming error: {e}")
except dbapi.DatabaseError as e:
    print(f"Database error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    # Always release the cursor and connection, whether or not the query
    # succeeded.
    cursor.close()
    conn.close()

Testing Your Connection

Test script to verify Flight SQL connectivity:
from adbc_driver_flightsql import dbapi
import sys

def test_connection(host="localhost", port=50051):
    """Check that a Flight SQL endpoint accepts a connection and a query.

    Args:
        host: Hostname of the Flight SQL server.
        port: gRPC port of the Flight SQL server.

    Returns:
        True if ``SELECT 1 AS test`` runs successfully, False if connecting
        or querying raises any exception (the error is printed).
    """
    uri = f"grpc://{host}:{port}"
    try:
        # Context managers release the cursor and the connection even when
        # the query fails part-way through.
        with dbapi.connect(uri=uri) as conn, conn.cursor() as cursor:
            # Basic query
            cursor.execute("SELECT 1 AS test")
            result = cursor.fetch_arrow_table()
            print("✓ Connection successful")
            print(f"✓ Query result: {result.to_pydict()}")
        return True
    except Exception as e:
        print(f"✗ Connection failed: {e}")
        return False

if __name__ == "__main__":
    # Exit status 0 when the connectivity check passes, 1 otherwise.
    exit_code = 0 if test_connection() else 1
    sys.exit(exit_code)

Related Documentation

  • Arrow Flight API - Base Arrow Flight protocol
  • ADBC - Arrow Database Connectivity standard
  • JDBC - Java database connectivity
  • ODBC - Open Database Connectivity