Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/spiceai/spiceai/llms.txt

Use this file to discover all available pages before exploring further.

Arrow Flight is a high-performance protocol built on gRPC and Apache Arrow for efficient data transfer. Spice implements the Arrow Flight protocol for zero-copy streaming of query results.

Overview

Arrow Flight provides:
  • Zero-copy data transfer using Apache Arrow columnar format
  • High throughput with parallel data streams
  • Network efficiency with binary serialization
  • gRPC-based transport with HTTP/2 multiplexing

Connection

Default Endpoint

grpc://localhost:50051

Default Ports

| Service      | Port  | Description                |
|--------------|-------|----------------------------|
| HTTP API     | 8090  | SQL queries, health checks |
| Arrow Flight | 50051 | Flight and Flight SQL      |
| Metrics      | 9090  | Prometheus metrics         |

Authentication

Spice supports Basic Authentication and Bearer Token authentication for Arrow Flight.

Basic Authentication

Provide username and password during handshake:
import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

# Authenticate
bearer_token = client.authenticate_basic_token("username", "password")

Bearer Token

Use API keys as bearer tokens:
import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")
options = flight.FlightCallOptions(
    headers=[(b"authorization", b"Bearer your-api-key")]
)

Session Tokens

After successful authentication, Spice returns a session ID that can be reused:
# Session ID from handshake response can be used for subsequent requests
options = flight.FlightCallOptions(
    headers=[(b"authorization", f"Bearer {session_id}".encode())]
)

Python Client

Installation

pip install pyarrow

Basic Usage

import pyarrow.flight as flight
import pyarrow as pa

# Connect to Spice Flight server
client = flight.FlightClient("grpc://localhost:50051")

# Execute query using Ticket
ticket = flight.Ticket(b'{"sql": "SELECT * FROM taxi_trips LIMIT 10"}')
flight_reader = client.do_get(ticket)

# Read results as Arrow Table
table = flight_reader.read_all()
print(table.to_pandas())

Streaming Results

import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

# Stream large result sets
ticket = flight.Ticket(b'{"sql": "SELECT * FROM large_table"}')
flight_reader = client.do_get(ticket)

# Process batches as they arrive
for batch in flight_reader:
    # batch.data is a RecordBatch
    print(f"Received batch with {batch.data.num_rows} rows")
    # Process batch...

Schema Retrieval

import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

# Get schema without fetching data
descriptor = flight.FlightDescriptor.for_command(
    b'{"sql": "SELECT * FROM taxi_trips"}'
)
schema_result = client.get_schema(descriptor)
schema = schema_result.schema

print(f"Schema: {schema}")

Writing Data

Write data to Spice using do_put:
import pyarrow as pa
import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

# Create Arrow table
data = pa.table({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "value": [100, 200, 300]
})

# Write to Spice
descriptor = flight.FlightDescriptor.for_path("my_dataset")
writer, reader = client.do_put(descriptor, data.schema)
writer.write_table(data)
writer.close()

# Read acknowledgment
for chunk in reader:
    print(f"Write acknowledged: {chunk}")

JavaScript/TypeScript Client

Installation

npm install apache-arrow @apache-arrow/ts

Basic Usage

import { FlightClient } from '@apache-arrow/flight';
import { tableFromIPC } from 'apache-arrow';

const client = new FlightClient('grpc://localhost:50051');

// Execute query
const ticket = {
  sql: 'SELECT * FROM taxi_trips LIMIT 10'
};

const stream = await client.doGet(JSON.stringify(ticket));
const table = await tableFromIPC(stream);

console.log(table.toString());

Rust Client

Dependencies

[dependencies]
arrow = "54"
arrow-flight = "54"
tokio = { version = "1", features = ["full"] }

Basic Usage

use arrow_flight::{
    flight_service_client::FlightServiceClient,
    Ticket,
};
use arrow::ipc::reader::StreamReader;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to Spice
    let mut client = FlightServiceClient::connect(
        "http://localhost:50051"
    ).await?;

    // Execute query
    let ticket = Ticket {
        ticket: r#"{"sql": "SELECT * FROM taxi_trips LIMIT 10"}"#
            .as_bytes()
            .to_vec(),
    };

    let mut stream = client.do_get(ticket).await?.into_inner();

    // Read Arrow batches
    while let Some(flight_data) = stream.message().await? {
        println!("Received batch with {} bytes", flight_data.data_body.len());
    }

    Ok(())
}

Go Client

Installation

go get github.com/apache/arrow/go/v18/arrow
go get github.com/apache/arrow/go/v18/arrow/flight

Basic Usage

package main

import (
    "context"
    "fmt"

    "github.com/apache/arrow/go/v18/arrow/flight"
    "google.golang.org/grpc"
)

func main() {
    conn, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
    client := flight.NewFlightClient(conn)

    ctx := context.Background()

    // Execute query
    ticket := &flight.Ticket{
        Ticket: []byte(`{"sql": "SELECT * FROM taxi_trips LIMIT 10"}`),
    }

    stream, _ := client.DoGet(ctx, ticket)

    // Read results
    reader, _ := flight.NewRecordReader(stream)
    defer reader.Release()

    for reader.Next() {
        rec := reader.Record()
        fmt.Printf("Received batch with %d rows\n", rec.NumRows())
    }
}

Java Client

Dependencies (Maven)

<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>flight-core</artifactId>
    <version>18.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory-netty</artifactId>
    <version>18.0.0</version>
</dependency>

Basic Usage

import org.apache.arrow.flight.*;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;

public class SpiceFlightClient {
    public static void main(String[] args) throws Exception {
        RootAllocator allocator = new RootAllocator();
        Location location = Location.forGrpcInsecure("localhost", 50051);
        FlightClient client = FlightClient.builder(allocator, location).build();

        // Execute query
        String sql = "{\"sql\": \"SELECT * FROM taxi_trips LIMIT 10\"}";
        Ticket ticket = new Ticket(sql.getBytes());
        FlightStream stream = client.getStream(ticket);

        // Read results
        while (stream.next()) {
            VectorSchemaRoot root = stream.getRoot();
            System.out.println("Rows: " + root.getRowCount());
        }

        stream.close();
        client.close();
        allocator.close();
    }
}

Performance Optimization

Parallel Streams

Arrow Flight supports parallel data streams for large result sets:
import pyarrow.flight as flight
from concurrent.futures import ThreadPoolExecutor

client = flight.FlightClient("grpc://localhost:50051")

# Get flight info with multiple endpoints
descriptor = flight.FlightDescriptor.for_command(
    b'{"sql": "SELECT * FROM large_table"}'
)
flight_info = client.get_flight_info(descriptor)

# Parallel fetch from multiple endpoints
def fetch_endpoint(endpoint):
    return client.do_get(endpoint.ticket).read_all()

with ThreadPoolExecutor() as executor:
    tables = list(executor.map(fetch_endpoint, flight_info.endpoints))
    
# Concatenate results
import pyarrow as pa
full_table = pa.concat_tables(tables)

Compression

Enable compression for network efficiency:
import pyarrow.flight as flight

client = flight.FlightClient(
    "grpc://localhost:50051",
    middleware=[flight.CompressionMiddleware()]
)

Rate Limiting

Spice enforces rate limits on write operations (DoPut) to prevent abuse:
  • Default limit: 100 requests per minute
  • Configurable via runtime settings

Error Handling

import pyarrow.flight as flight

client = flight.FlightClient("grpc://localhost:50051")

try:
    ticket = flight.Ticket(b'{"sql": "INVALID SQL"}')
    reader = client.do_get(ticket)
    table = reader.read_all()
except flight.FlightUnavailableError as e:
    print(f"Service unavailable: {e}")
except flight.FlightInternalError as e:
    print(f"Internal error: {e}")
except flight.FlightInvalidRequestError as e:
    print(f"Invalid request: {e}")

Best Practices

  1. Reuse connections - Create a single client and reuse it
  2. Stream large results - Use streaming instead of read_all() for large datasets
  3. Use session tokens - Authenticate once, reuse the session
  4. Enable compression - For network-constrained environments
  5. Handle errors gracefully - Implement retry logic for transient failures