Examples

Table of contents

  1. Basic Examples
    1. Simple Query Test
    2. JOIN Query Test
  2. Advanced Examples
    1. Working with Arrays
    2. Working with Maps (BigQuery/Athena/Trino/Redshift)
    3. Date and Time Operations
    4. Window Functions
  3. Database-Specific Examples
    1. Working with Structs (Athena/Trino/BigQuery)
    2. BigQuery-Specific Features
    3. Athena/Trino Array Operations
    4. Redshift JSON and Map Handling
  4. Testing Patterns
    1. Testing CTEs
    2. Testing Error Cases
    3. Dynamic Test Generation
  5. Best Practices
    1. 1. Organize Mock Tables
    2. 2. Reusable Test Data
    3. 3. Test Complex Business Logic
    4. 4. Document Expected Results

Basic Examples

Simple Query Test

Testing a basic SELECT query with filtering:

from dataclasses import dataclass
from sql_testing_library import sql_test, TestCase
from sql_testing_library.mock_table import BaseMockTable
from pydantic import BaseModel

@dataclass
class Employee:
    id: int
    name: str
    department: str
    salary: float
    active: bool

class EmployeesMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "company_db"

    def get_table_name(self) -> str:
        return "employees"

class ActiveEmployeeResult(BaseModel):
    name: str
    department: str

@sql_test(
    mock_tables=[
        EmployeesMockTable([
            Employee(1, "Alice", "Engineering", 100000, True),
            Employee(2, "Bob", "Sales", 80000, False),
            Employee(3, "Charlie", "Engineering", 95000, True),
            Employee(4, "Diana", "HR", 85000, True)
        ])
    ],
    result_class=ActiveEmployeeResult
)
def test_active_employees():
    return TestCase(
        query="""
            SELECT name, department
            FROM employees
            WHERE active = true
            ORDER BY name
        """,
        default_namespace="company_db"
    )

JOIN Query Test

Testing queries with multiple tables:

@dataclass
class Order:
    order_id: int
    user_id: int
    product_id: int
    quantity: int
    total: float

@dataclass
class Product:
    product_id: int
    name: str
    category: str
    price: float

class OrdersMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "sales_db"

    def get_table_name(self) -> str:
        return "orders"

class ProductsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "sales_db"

    def get_table_name(self) -> str:
        return "products"

class OrderDetailResult(BaseModel):
    order_id: int
    product_name: str
    quantity: int
    total: float

@sql_test(
    mock_tables=[
        OrdersMockTable([
            Order(1, 101, 1, 2, 39.98),
            Order(2, 102, 2, 1, 999.99),
            Order(3, 101, 3, 3, 89.97)
        ]),
        ProductsMockTable([
            Product(1, "Mouse", "Electronics", 19.99),
            Product(2, "Laptop", "Electronics", 999.99),
            Product(3, "Notebook", "Stationery", 29.99)
        ])
    ],
    result_class=OrderDetailResult
)
def test_order_details():
    return TestCase(
        query="""
            SELECT
                o.order_id,
                p.name as product_name,
                o.quantity,
                o.total
            FROM orders o
            JOIN products p ON o.product_id = p.product_id
            WHERE o.total > 50
            ORDER BY o.order_id
        """,
        default_namespace="sales_db"
    )

Advanced Examples

Working with Arrays

Testing queries with array data types (the example query uses BigQuery array syntax):

from typing import List
from decimal import Decimal

@dataclass
class Customer:
    customer_id: int
    name: str
    tags: List[str]
    purchase_amounts: List[Decimal]

class CustomersMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "analytics.customer_data"

    def get_table_name(self) -> str:
        return "customers"

class CustomerTagResult(BaseModel):
    name: str
    tag_count: int
    total_purchases: Decimal

@sql_test(
    mock_tables=[
        CustomersMockTable([
            Customer(
                1,
                "Alice",
                ["vip", "frequent", "email"],
                [Decimal("99.99"), Decimal("149.50"), Decimal("79.99")]
            ),
            Customer(
                2,
                "Bob",
                ["new"],
                [Decimal("29.99")]
            ),
            Customer(
                3,
                "Charlie",
                ["vip", "phone"],
                [Decimal("299.99"), Decimal("199.99")]
            )
        ])
    ],
    result_class=CustomerTagResult
)
def test_customer_analytics():
    return TestCase(
        query="""
            SELECT
                name,
                ARRAY_LENGTH(tags) as tag_count,
                (SELECT SUM(amount) FROM UNNEST(purchase_amounts) as amount) as total_purchases
            FROM customers
            WHERE 'vip' IN UNNEST(tags)
            ORDER BY name
        """,
        default_namespace="analytics.customer_data"
    )

Working with Maps (BigQuery/Athena/Trino/Redshift)

Map types are supported for BigQuery, Athena, Trino, and Redshift adapters:

from typing import Dict, Optional

@dataclass
class UserPreferences:
    user_id: int
    username: str
    settings: Dict[str, str]           # String to string map
    scores: Dict[str, int]             # String to integer map
    metadata: Optional[Dict[str, str]] # Optional map

class UserPreferencesMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "analytics"

    def get_table_name(self) -> str:
        return "user_preferences"

# Mock data with maps
mock_data = [
    UserPreferences(
        user_id=1,
        username="alice",
        settings={"theme": "dark", "notifications": "enabled", "language": "en"},
        scores={"level1": 100, "level2": 85, "level3": 92},
        metadata={"source": "mobile", "version": "2.1"}
    ),
    UserPreferences(
        user_id=2,
        username="bob",
        settings={"theme": "light", "notifications": "disabled"},
        scores={"level1": 95},
        metadata=None  # NULL map
    ),
    UserPreferences(
        user_id=3,
        username="charlie",
        settings={},  # Empty map
        scores={"level1": 88, "level2": 90},
        metadata={}
    )
]

# Query using map operations (Athena/Trino syntax)
test_case = TestCase(
    query="""
        SELECT
            username,
            settings['theme'] as theme_preference,
            settings['notifications'] as notifications_enabled,
            MAP_KEYS(scores) as completed_levels,
            MAP_VALUES(scores) as level_scores,
            CARDINALITY(settings) as settings_count,
            CASE
                WHEN metadata IS NULL THEN 'No metadata'
                WHEN CARDINALITY(metadata) = 0 THEN 'Empty metadata'
                ELSE metadata['source']
            END as data_source
        FROM user_preferences
        WHERE settings['theme'] IS NOT NULL
        ORDER BY username
    """,
    default_namespace="analytics"
)
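
To run this as a test, wrap the TestCase in @sql_test together with the mock data above (a sketch; result_class=dict avoids defining a result model, and the adapter choice is illustrative):

@sql_test(
    adapter_type="trino",  # settings['theme'], MAP_KEYS, and CARDINALITY are Athena/Trino syntax
    mock_tables=[UserPreferencesMockTable(mock_data)],
    result_class=dict
)
def test_user_preference_maps():
    return test_case  # the TestCase defined above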

# BigQuery-specific map query (stored as JSON)
bigquery_test_case = TestCase(
    query="""
        SELECT
            username,
            JSON_EXTRACT_SCALAR(settings, '$.theme') as theme_preference,
            JSON_EXTRACT_SCALAR(settings, '$.notifications') as notifications_enabled,
            JSON_EXTRACT_SCALAR(metadata, '$.source') as data_source,
            CASE
                WHEN metadata IS NULL THEN 'No metadata'
                WHEN metadata = '{}' THEN 'Empty metadata'
                ELSE 'Has metadata'
            END as metadata_status
        FROM user_preferences
        WHERE JSON_EXTRACT_SCALAR(settings, '$.theme') IS NOT NULL
        ORDER BY username
    """,
    default_namespace="my-project.analytics"
)

Date and Time Operations

Working with temporal data (FORMAT_DATE is BigQuery syntax):

from datetime import date, datetime

@dataclass
class Event:
    event_id: int
    event_name: str
    event_date: date
    created_at: datetime
    duration_hours: float

class EventsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "events_db"

    def get_table_name(self) -> str:
        return "events"

class EventSummaryResult(BaseModel):
    month: str
    event_count: int
    total_hours: float

@sql_test(
    mock_tables=[
        EventsMockTable([
            Event(1, "Conference", date(2024, 1, 15), datetime(2024, 1, 1, 10, 30), 8.0),
            Event(2, "Workshop", date(2024, 1, 20), datetime(2024, 1, 5, 14, 0), 4.0),
            Event(3, "Seminar", date(2024, 2, 10), datetime(2024, 2, 1, 9, 0), 3.0),
            Event(4, "Training", date(2024, 2, 25), datetime(2024, 2, 15, 13, 30), 6.0)
        ])
    ],
    result_class=EventSummaryResult
)
def test_event_summary():
    return TestCase(
        query="""
            SELECT
                FORMAT_DATE('%Y-%m', event_date) as month,
                COUNT(*) as event_count,
                SUM(duration_hours) as total_hours
            FROM events
            WHERE event_date >= '2024-01-01'
            GROUP BY month
            ORDER BY month
        """,
        default_namespace="events_db"
    )

Window Functions

Testing analytical queries with window functions:

@dataclass
class Sale:
    sale_id: int
    salesperson: str
    region: str
    amount: Decimal
    sale_date: date

class SalesMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "sales_analytics"

    def get_table_name(self) -> str:
        return "sales"

class SalesRankResult(BaseModel):
    salesperson: str
    region: str
    amount: Decimal
    region_rank: int
    total_rank: int

@sql_test(
    mock_tables=[
        SalesMockTable([
            Sale(1, "Alice", "North", Decimal("5000"), date(2024, 1, 1)),
            Sale(2, "Bob", "North", Decimal("7000"), date(2024, 1, 2)),
            Sale(3, "Charlie", "South", Decimal("6000"), date(2024, 1, 3)),
            Sale(4, "Diana", "South", Decimal("8000"), date(2024, 1, 4)),
            Sale(5, "Eve", "North", Decimal("4500"), date(2024, 1, 5))
        ])
    ],
    result_class=SalesRankResult
)
def test_sales_ranking():
    return TestCase(
        query="""
            SELECT
                salesperson,
                region,
                amount,
                RANK() OVER (PARTITION BY region ORDER BY amount DESC) as region_rank,
                RANK() OVER (ORDER BY amount DESC) as total_rank
            FROM sales
            WHERE sale_date >= '2024-01-01'
            ORDER BY total_rank
        """,
        default_namespace="sales_analytics"
    )

Database-Specific Examples

Working with Structs (Athena/Trino/BigQuery)

Struct types are supported for Athena, Trino, and BigQuery adapters using dataclasses or Pydantic models:

from dataclasses import dataclass
from typing import Optional
from decimal import Decimal

@dataclass
class Address:
    street: str
    city: str
    state: str
    zip_code: str

@dataclass
class Employee:
    employee_id: int
    name: str
    salary: Decimal
    address: Address  # Nested struct
    is_active: bool

class EmployeesMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "hr_db"

    def get_table_name(self) -> str:
        return "employees"

class EmployeeLocationResult(BaseModel):
    name: str
    city: str
    state: str
    salary: Decimal

# Mock data with structs
mock_data = [
    Employee(
        employee_id=1,
        name="Alice Johnson",
        salary=Decimal("120000.50"),
        address=Address(
            street="123 Main St",
            city="New York",
            state="NY",
            zip_code="10001"
        ),
        is_active=True
    ),
    Employee(
        employee_id=2,
        name="Bob Smith",
        salary=Decimal("95000.00"),
        address=Address(
            street="456 Oak Ave",
            city="San Francisco",
            state="CA",
            zip_code="94102"
        ),
        is_active=True
    ),
    Employee(
        employee_id=3,
        name="Charlie Brown",
        salary=Decimal("105000.75"),
        address=Address(
            street="789 Pine Rd",
            city="Seattle",
            state="WA",
            zip_code="98101"
        ),
        is_active=False
    )
]

@sql_test(
    adapter_type="athena",  # or "trino" or "bigquery"
    mock_tables=[EmployeesMockTable(mock_data)],
    result_class=EmployeeLocationResult
)
def test_employee_locations():
    return TestCase(
        query="""
            SELECT
                name,
                address.city as city,
                address.state as state,
                salary
            FROM employees
            WHERE is_active = true
                AND address.state IN ('NY', 'CA')
            ORDER BY name
        """,
        default_namespace="hr_db"
    )

# Test with WHERE clause on struct fields
@sql_test(
    adapter_type="trino",
    mock_tables=[EmployeesMockTable(mock_data)],
    result_class=dict
)
def test_employees_by_city():
    return TestCase(
        query="""
            SELECT
                name,
                address.street as street_address,
                address.city || ', ' || address.state || ' ' || address.zip_code as full_address
            FROM employees
            WHERE address.city = 'New York'
        """,
        default_namespace="hr_db"
    )

# Using Pydantic models for structs
from pydantic import BaseModel

class ContactInfo(BaseModel):
    email: str
    phone: str
    preferred_contact: str

class Customer(BaseModel):
    customer_id: int
    name: str
    contact: ContactInfo
    total_purchases: Decimal

# Query with nested struct access
test_case = TestCase(
    query="""
        SELECT
            name,
            contact.email as email,
            contact.preferred_contact as contact_method,
            total_purchases
        FROM customers
        WHERE contact.preferred_contact = 'email'
            AND total_purchases > 1000
        ORDER BY total_purchases DESC
    """,
    default_namespace="sales_db"
)
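
To execute this snippet, supply mock Customer rows through a mock table and wrap the query in @sql_test, as in the dataclass examples above (a sketch; the mock table name is illustrative):

class StructCustomersMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "sales_db"

    def get_table_name(self) -> str:
        return "customers"

@sql_test(
    adapter_type="trino",
    mock_tables=[StructCustomersMockTable([
        Customer(
            customer_id=1,
            name="Alice",
            contact=ContactInfo(email="alice@example.com", phone="555-0100", preferred_contact="email"),
            total_purchases=Decimal("2500.00")
        )
    ])],
    result_class=dict
)
def test_preferred_contact_customers():
    return test_case  # the TestCase defined above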

# Working with lists of structs
@dataclass
class OrderItem:
    product_id: int
    quantity: int
    unit_price: Decimal

@dataclass
class Order:
    order_id: int
    customer_id: int
    items: List[OrderItem]  # List of structs
    order_date: date

# Mock data with list of structs
orders_data = [
    Order(
        order_id=1,
        customer_id=100,
        items=[
            OrderItem(product_id=1, quantity=2, unit_price=Decimal("29.99")),
            OrderItem(product_id=2, quantity=1, unit_price=Decimal("49.99")),
            OrderItem(product_id=3, quantity=3, unit_price=Decimal("19.99"))
        ],
        order_date=date(2024, 1, 15)
    ),
    Order(
        order_id=2,
        customer_id=101,
        items=[
            OrderItem(product_id=2, quantity=5, unit_price=Decimal("49.99"))
        ],
        order_date=date(2024, 1, 16)
    )
]

# Query using list of structs
@sql_test(
    adapter_type="athena",  # or "trino" or "bigquery"
    mock_tables=[OrdersMockTable(orders_data)],
    result_class=dict
)
def test_order_items_analysis():
    return TestCase(
        query="""
            SELECT
                order_id,
                customer_id,
                CARDINALITY(items) as num_items,
                -- Access first item details
                items[1].product_id as first_product_id,
                items[1].quantity as first_product_qty,
                -- Calculate total using array operations
                REDUCE(
                    items,
                    0.0,
                    (total, item) -> total + (item.quantity * item.unit_price),
                    total -> total
                ) as order_total
            FROM orders
            WHERE CARDINALITY(items) > 0
            ORDER BY order_id
        """,
        default_namespace="test_db"
    )

BigQuery-Specific Features

BigQuery now supports struct types with dataclasses and Pydantic models:

# BigQuery struct support (NEW!)
@sql_test(
    adapter_type="bigquery",
    mock_tables=[EmployeesMockTable(mock_data)],  # Same mock data as above
    result_class=EmployeeLocationResult
)
def test_bigquery_struct_access():
    return TestCase(
        query="""
            SELECT
                name,
                address.city as city,
                address.state as state,
                salary
            FROM employees
            WHERE is_active = true
                AND address.state IN ('NY', 'CA')
            ORDER BY name
        """,
        default_namespace="my-project.my_dataset"
    )

# BigQuery with list of structs and array operations
@sql_test(
    adapter_type="bigquery",
    mock_tables=[OrdersMockTable(orders_data)],  # Same list of structs data
    result_class=dict
)
def test_bigquery_struct_arrays():
    return TestCase(
        query="""
            SELECT
                order_id,
                customer_id,
                ARRAY_LENGTH(items) as num_items,
                -- Access first item details (0-based indexing in BigQuery)
                items[OFFSET(0)].product_id as first_product_id,
                items[OFFSET(0)].quantity as first_product_qty,
                -- Calculate total using array aggregation
                (SELECT SUM(item.quantity * item.unit_price)
                 FROM UNNEST(items) as item) as order_total
            FROM orders
            WHERE ARRAY_LENGTH(items) > 0
            ORDER BY order_id
        """,
        default_namespace="my-project.my_dataset"
    )
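
The products_mock fixture used below is assumed; a sketch reusing the Product dataclass from the JOIN example, with the database name matching the BigQuery namespace:

class BigQueryProductsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "my-project.my_dataset"

    def get_table_name(self) -> str:
        return "products"

products_mock = BigQueryProductsMockTable([
    Product(1, "Mouse", "Electronics", 19.99),
    Product(2, "Laptop", "Electronics", 999.99)
])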

Using BigQuery SQL features in your queries:

# BigQuery adapter test
@sql_test(
    adapter_type="bigquery",
    mock_tables=[products_mock],
    result_class=dict
)
def test_bigquery_structs():
    return TestCase(
        query="""
            SELECT
                STRUCT(
                    product_id as id,
                    name,
                    ARRAY[
                        STRUCT('color' as attribute, 'red' as value),
                        STRUCT('size' as attribute, 'large' as value)
                    ] as attributes
                ) as product_info
            FROM products
            WHERE category = 'Electronics'
        """,
        default_namespace="my-project.my_dataset"
    )

Athena/Trino Array Operations

Using Presto/Trino SQL array functions:

# Athena adapter test
@sql_test(
    adapter_type="athena",
    mock_tables=[events_mock],
    result_class=dict
)
def test_athena_arrays():
    return TestCase(
        query="""
            SELECT
                event_name,
                CARDINALITY(attendees) as attendee_count,
                FILTER(attendees, x -> x LIKE '%@company.com') as company_attendees
            FROM events
            WHERE CONTAINS(attendees, 'alice@company.com')
        """,
        default_namespace="analytics_db"
    )
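
Here events_mock is assumed to provide an attendees column holding a list of email strings; a sketch following the same patterns as the earlier examples:

@dataclass
class MeetingEvent:
    event_name: str
    attendees: List[str]

class AnalyticsEventsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "analytics_db"

    def get_table_name(self) -> str:
        return "events"

events_mock = AnalyticsEventsMockTable([
    MeetingEvent("Sprint Review", ["alice@company.com", "dana@gmail.com"]),
    MeetingEvent("Offsite Planning", ["bob@company.com"])
])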

Redshift JSON and Map Handling

Working with JSON and map types in Redshift:

# Redshift adapter test with maps (using SUPER type)
@dataclass
class UserActivity:
    user_id: int
    username: str
    preferences: Dict[str, str]  # Stored as SUPER type
    activity_scores: Dict[str, int]

# Mock data with maps
mock_data = [
    UserActivity(
        user_id=1,
        username="alice",
        preferences={"theme": "dark", "language": "en"},
        activity_scores={"login": 100, "posts": 45}
    )
]
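
UserActivityMockTable follows the same pattern as the earlier mock tables (a sketch, assuming the analytics_db database used below):

class UserActivityMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "analytics_db"

    def get_table_name(self) -> str:
        return "user_activity"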

@sql_test(
    adapter_type="redshift",
    mock_tables=[UserActivityMockTable(mock_data)],
    result_class=dict
)
def test_redshift_maps():
    return TestCase(
        query="""
            SELECT
                username,
                preferences.theme as theme_pref,
                preferences.language as lang_pref,
                activity_scores.login as login_score
            FROM user_activity
            WHERE preferences.theme = 'dark'
        """,
        default_namespace="analytics_db"
    )
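
For the JSON example below, user_events_mock might be built like this (a sketch; event_data is assumed to be a JSON payload stored as a string):

@dataclass
class UserEvent:
    user_id: int
    event_data: str  # JSON payload stored as a string

class UserEventsMockTable(BaseMockTable):
    def get_database_name(self) -> str:
        return "events_db"

    def get_table_name(self) -> str:
        return "user_events"

user_events_mock = UserEventsMockTable([
    UserEvent(1, '{"action": "purchase", "timestamp": "2024-01-15 10:30:00"}'),
    UserEvent(2, '{"action": "view", "timestamp": "2024-01-15 11:00:00"}')
])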

# Traditional JSON handling in Redshift
@sql_test(
    adapter_type="redshift",
    mock_tables=[user_events_mock],
    result_class=dict
)
def test_redshift_json():
    return TestCase(
        query="""
            SELECT
                user_id,
                JSON_EXTRACT_PATH_TEXT(event_data, 'action') as action,
                JSON_EXTRACT_PATH_TEXT(event_data, 'timestamp')::timestamp as event_time
            FROM user_events
            WHERE JSON_EXTRACT_PATH_TEXT(event_data, 'action') = 'purchase'
        """,
        default_namespace="events_db"
    )

Testing Patterns

Testing CTEs

Breaking down complex queries:

@sql_test(
    mock_tables=[transactions_mock, customers_mock],
    result_class=CustomerMetricsResult
)
def test_customer_metrics():
    return TestCase(
        query="""
            WITH monthly_totals AS (
                SELECT
                    customer_id,
                    DATE_TRUNC('month', transaction_date) as month,
                    SUM(amount) as monthly_total
                FROM transactions
                GROUP BY customer_id, month
            ),
            customer_stats AS (
                SELECT
                    customer_id,
                    AVG(monthly_total) as avg_monthly_spend,
                    MAX(monthly_total) as max_monthly_spend,
                    COUNT(DISTINCT month) as active_months
                FROM monthly_totals
                GROUP BY customer_id
            )
            SELECT
                c.name,
                cs.avg_monthly_spend,
                cs.max_monthly_spend,
                cs.active_months
            FROM customer_stats cs
            JOIN customers c ON cs.customer_id = c.customer_id
            WHERE cs.active_months >= 3
            ORDER BY cs.avg_monthly_spend DESC
        """,
        default_namespace="analytics_db"
    )
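
The transactions_mock and customers_mock fixtures follow the usual mock-table pattern (database "analytics_db"); the row and result shapes implied by the query are roughly:

@dataclass
class Transaction:
    customer_id: int
    transaction_date: date
    amount: Decimal

class CustomerMetricsResult(BaseModel):
    name: str
    avg_monthly_spend: Decimal
    max_monthly_spend: Decimal
    active_months: int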

Testing Error Cases

Validating business logic:

@sql_test(
    mock_tables=[
        InventoryMockTable([
            {"product_id": 1, "quantity": 5, "min_stock": 10},
            {"product_id": 2, "quantity": 15, "min_stock": 10},
            {"product_id": 3, "quantity": 0, "min_stock": 5}
        ])
    ],
    result_class=LowStockAlert
)
def test_low_stock_alerts():
    return TestCase(
        query="""
            SELECT
                product_id,
                quantity,
                min_stock,
                CASE
                    WHEN quantity = 0 THEN 'OUT_OF_STOCK'
                    WHEN quantity < min_stock THEN 'LOW_STOCK'
                    ELSE 'NORMAL'
                END as stock_status
            FROM inventory
            WHERE quantity < min_stock
            ORDER BY quantity ASC
        """,
        default_namespace="inventory_db"
    )
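
LowStockAlert is a Pydantic model matching the selected columns, roughly:

class LowStockAlert(BaseModel):
    product_id: int
    quantity: int
    min_stock: int
    stock_status: str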

Dynamic Test Generation

Creating parameterized tests:

import pytest

test_cases = [
    ("North", 2),  # region, expected_count
    ("South", 1),
    ("East", 0)
]

@pytest.mark.parametrize("region,expected_count", test_cases)
def test_sales_by_region(region, expected_count):
    @sql_test(
        mock_tables=[
            SalesMockTable([
                Sale(1, "Alice", "North", 5000, date(2024, 1, 1)),
                Sale(2, "Bob", "North", 7000, date(2024, 1, 2)),
                Sale(3, "Charlie", "South", 6000, date(2024, 1, 3))
            ])
        ],
        result_class=dict
    )
    def _test():
        return TestCase(
            query=f"""
                SELECT COUNT(*) as count
                FROM sales
                WHERE region = '{region}'
            """,
            default_namespace="sales_db"
        )

    results = _test()
    assert results[0]['count'] == expected_count

Best Practices

1. Organize Mock Tables

Create a separate module for mock tables:

# tests/mocks/tables.py
from sql_testing_library.mock_table import BaseMockTable

class BaseCompanyMockTable(BaseMockTable):
    """Base class for all company mock tables"""
    def get_database_name(self) -> str:
        return "company_db.public"

class UsersMockTable(BaseCompanyMockTable):
    def get_table_name(self) -> str:
        return "users"

class OrdersMockTable(BaseCompanyMockTable):
    def get_table_name(self) -> str:
        return "orders"

2. Reusable Test Data

Create factory functions for test data:

# tests/factories.py
from datetime import date, timedelta
from decimal import Decimal

def create_test_orders(count: int, start_date: date):
    """Generate test orders with sequential dates"""
    return [
        Order(
            order_id=i,
            user_id=100 + (i % 5),
            amount=Decimal(str(50 + i * 10)),
            order_date=start_date + timedelta(days=i)
        )
        for i in range(1, count + 1)
    ]
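
Note that this factory assumes an Order shape with amount and order_date fields, unlike the Order dataclasses in earlier examples, e.g.:

@dataclass
class Order:
    order_id: int
    user_id: int
    amount: Decimal
    order_date: date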

3. Test Complex Business Logic

Focus on testing SQL logic, not just syntax:

@sql_test(
    mock_tables=[
        OrdersMockTable(create_test_orders(100, date(2024, 1, 1))),
        CustomersMockTable(create_test_customers())
    ],
    result_class=CustomerLTVResult
)
def test_customer_lifetime_value():
    """Test LTV calculation includes all business rules"""
    return TestCase(
        query=load_sql_file("queries/customer_ltv.sql"),
        default_namespace="analytics_db",
        description="Verify LTV calculation handles refunds, discounts, and loyalty tiers"
    )
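
load_sql_file is not part of the library; a minimal sketch, assuming query files are stored relative to the test module:

from pathlib import Path

def load_sql_file(relative_path: str) -> str:
    """Read a SQL query file relative to this test module."""
    return (Path(__file__).parent / relative_path).read_text()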

4. Document Expected Results

Make tests self-documenting:

def test_quarterly_revenue():
    """
    Test quarterly revenue aggregation.

    Expected results:
    - Q1 2024: $15,000 (3 orders)
    - Q2 2024: $22,000 (4 orders)
    - Excludes cancelled orders
    - Applies regional tax rates
    """
    # Test implementation...