While the CLI is great for interactive use, the ev SDK lets you define jobs programmatically, giving you finer control when building automation, workflows, and integrations.

Basic Job Definition

Creating an Environment

from ev import Env

# Create a Python environment with a specific version
env = Env("3.11")

Defining Jobs

from ev import Env, Job

# Create environment
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create job
job = Job("data_processing", env)

@job.main()
def main():
    import daft
    
    # Your job logic here
    df = daft.from_pydict({"name": ["Alice", "Bob"], "age": [25, 30]})
    df.show()
    return 0

Advanced Patterns

Jobs with File Dependencies

from ev import Env, Job

# Environment with local file dependencies
env = Env("3.11").pip_install(["daft==0.5.9"])

env.include([
    "utils/data_processor.py",
    "config/settings.json", 
    "data/sample.parquet"
])

job = Job("file_processing", env)

@job.main()
def main():
    import daft
    import json
    from utils.data_processor import process_data
    
    # Load configuration
    with open("config/settings.json") as f:
        config = json.load(f)
    
    # Process data file
    df = daft.read_parquet("data/sample.parquet")
    result = process_data(df, config)
    
    print(f"Processed {len(result)} rows")
    return 0
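
The included utils/data_processor.py referenced above might look something like the sketch below. The module contents, the "age" column, and the min_age config key are all hypothetical; the real helper is whatever you ship alongside the job.

# utils/data_processor.py (hypothetical contents of the included helper)
def process_data(df, config):
    """Apply a simple, config-driven filter and return the rows as a list of dicts."""
    min_age = config.get("min_age", 0)
    filtered = df.where(df["age"] >= min_age)
    return filtered.to_pylist()  # a plain list, so len(result) in main() works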

Jobs with Environment Variables

from ev import Env, Job
import os

# Environment with configuration
env = Env("3.11").pip_install(["daft==0.5.9", "requests==2.31.0"])

env.environ["API_BASE_URL"] = "https://api.example.com"
env.environ["API_KEY"] = os.environ.get("API_KEY", "default-key")
env.environ["BATCH_SIZE"] = "100"
env.environ["RETRY_COUNT"] = "3"

job = Job("api_processor", env)

@job.main()
def main():
    import os
    import requests
    import daft
    
    # Read configuration from environment
    api_url = os.environ["API_BASE_URL"]
    api_key = os.environ["API_KEY"]
    batch_size = int(os.environ["BATCH_SIZE"])
    retry_count = int(os.environ["RETRY_COUNT"])
    
    # Use configuration in job logic
    headers = {"Authorization": f"Bearer {api_key}"}
    
    all_data = []
    for i in range(0, 1000, batch_size):
        response = requests.get(
            f"{api_url}/data", 
            headers=headers,
            params={"offset": i, "limit": batch_size}
        )
        
        if response.status_code == 200:
            data = response.json()
            all_data.extend(data)
            print(f"Processed batch {i//batch_size + 1}: {len(data)} items")
        else:
            print(f"Error in batch {i//batch_size + 1}: {response.status_code}")
    
    # Convert to daft DataFrame for further processing
    if all_data:
        df = daft.from_pylist(all_data)
        df.show(5)  # Show first 5 rows
    
    return 0
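
The loop above assumes the endpoint exposes exactly 1,000 records. When the total is unknown, a common alternative is to keep paging until the API returns fewer items than the batch size. The sketch below reuses the same hypothetical /data endpoint and could replace the fixed-range loop inside main():

import requests

def fetch_all(api_url, headers, batch_size, timeout=30):
    """Page through the endpoint until a short (or empty) page signals the end."""
    all_data = []
    offset = 0
    while True:
        response = requests.get(
            f"{api_url}/data",
            headers=headers,
            params={"offset": offset, "limit": batch_size},
            timeout=timeout,
        )
        response.raise_for_status()
        page = response.json()
        all_data.extend(page)
        if len(page) < batch_size:
            break
        offset += batch_size
    return all_data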

Real-World Examples

Data Pipeline Job

from ev import Env, Job

# Create environment for data processing
env = Env("3.11").pip_install([
    "daft==0.5.9",
    "boto3==1.34.0",
    "pyarrow>=15.0.0"
])

env.environ["AWS_DEFAULT_REGION"] = "us-west-2"
env.include(["transforms/", "schemas/"])

job = Job("etl_pipeline", env)

@job.main()
def main():
    import daft
    from transforms.cleaner import clean_data
    from schemas.output import OUTPUT_SCHEMA
    
    # Extract: Read from S3
    df = daft.read_parquet("s3://data-lake/raw/events/*.parquet")
    
    # Transform: Clean and process data
    df_cleaned = clean_data(df)
    
    # Add computed columns
    df_processed = df_cleaned.with_columns({
        "event_date": df_cleaned["timestamp"].dt.date(),
        "amount_with_tax": df_cleaned["amount"] * 1.1,
    })
    
    # Filter recent data
    df_filtered = df_processed.where(
        df_processed["event_date"] >= daft.lit("2024-01-01").cast(daft.DataType.date())
    )
    
    # Load: Write to output location
    df_filtered.write_parquet(
        "s3://data-lake/processed/events/",
        partition_cols=["event_date"]
    )
    
    print(f"Processed {len(df_filtered)} records")
    return 0

API Integration Job

from ev import Env, Job

# Environment for API integration
env = Env("3.11").pip_install([
    "requests==2.31.0",
    "daft==0.5.9",
    "tenacity==8.2.0"  # For retry logic
])

env.environ["API_BASE_URL"] = "https://api.data-source.com"
env.environ["API_TIMEOUT"] = "30"
env.environ["MAX_RETRIES"] = "3"

job = Job("api_data_sync", env)

@job.main()
def main():
    import requests
    import daft
    import os
    from tenacity import retry, stop_after_attempt, wait_exponential
    
    base_url = os.environ["API_BASE_URL"]
    timeout = int(os.environ["API_TIMEOUT"])
    max_retries = int(os.environ["MAX_RETRIES"])
    
    @retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def fetch_data(endpoint):
        response = requests.get(f"{base_url}/{endpoint}", timeout=timeout)
        response.raise_for_status()
        return response.json()
    
    try:
        # Fetch data from multiple endpoints
        users = fetch_data("users")
        orders = fetch_data("orders")
        products = fetch_data("products")
        
        # Convert to daft DataFrames
        users_df = daft.from_pylist(users)
        orders_df = daft.from_pylist(orders)
        products_df = daft.from_pylist(products)
        
        # Process and merge data
        merged_df = orders_df.join(users_df, on='user_id', how='left')
        merged_df = merged_df.join(products_df, on='product_id', how='left')
        
        # Save processed data
        merged_df.write_parquet("output/processed_data.parquet")
        
        print(f"Successfully processed {len(merged_df)} records")
        return 0
        
    except Exception as e:
        print(f"Job failed: {e}")
        return 1

Job Organization and Best Practices

Modular Job Design

from ev import Env, Job

def create_data_processing_env():
    """Create reusable environment for data processing jobs."""
    env = Env("3.11").pip_install([
        "daft==0.5.9",
        "pyarrow>=15.0.0"
    ])
    
    env.environ["DAFT_RUNNER"] = "py"
    env.include(["src/", "config/"])
    
    return env

# Reusable data processing job
data_env = create_data_processing_env()
data_job = Job("data_processor", data_env)

@data_job.main()
def process_data():
    import daft
    from src.transforms import apply_business_rules
    from config.settings import OUTPUT_PATH
    
    # Load data
    df = daft.read_parquet("input/*.parquet")
    
    # Apply transforms
    df_processed = apply_business_rules(df)
    
    # Save results
    df_processed.write_parquet(OUTPUT_PATH)
    
    print(f"Processed {len(df_processed)} records")
    return 0
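
Because the environment comes from a factory function, other jobs can reuse it without repeating the dependency list. The sketch below adds a hypothetical validation job on top of the same environment; the output path and the emptiness check are placeholders:

# A second job that reuses the factory-built environment
validation_env = create_data_processing_env()
validation_job = Job("data_validator", validation_env)

@validation_job.main()
def validate_data():
    import daft

    # Hypothetical check: fail the job if the processed output is empty
    df = daft.read_parquet("output/*.parquet")
    rows = df.count_rows()
    print(f"Validated {rows} rows")
    return 0 if rows > 0 else 1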

Best Practices

Environment Management

from ev import Env, Job

# Use specific package versions for reproducibility
def create_stable_env():
    """Create environment with pinned dependencies."""
    return Env("3.11").pip_install([
        "daft==0.5.9",           # Exact version
        "pyarrow==15.0.0",       # Exact version  
        "requests>=2.31.0,<3.0"  # Compatible range
    ])

# Separate environments for different use cases
data_env = Env("3.11").pip_install([
    "daft==0.5.9",
    "pyarrow>=15.0.0"
])

ml_env = Env("3.11").pip_install([
    "daft==0.5.9",
    "scikit-learn==1.3.0", 
    "numpy==1.24.3"
])

# Environment with secure defaults
secure_env = Env("3.11").pip_install([
    "daft==0.5.9",
    "cryptography==41.0.0",
    "certifi>=2023.1.1"
])

secure_env.environ["PYTHONHTTPSVERIFY"] = "1"
secure_env.environ["SSL_CERT_FILE"] = "/etc/ssl/certs/ca-certificates.crt"

Error Handling and Logging

from ev import Env, Job

env = Env("3.11").pip_install([
    "daft==0.5.9",
    "structlog==23.1.0"  # Structured logging
])

env.include(["src/", "config/"])

robust_job = Job("robust_processor", env)

@robust_job.main()
def robust_main():
    import logging
    import structlog
    import daft
    from src.validators import validate_input, validate_output
    from src.utils import cleanup_temp_files
    
    # Route log records through the standard library at INFO level so
    # structlog.stdlib.filter_by_level does not drop informational messages
    logging.basicConfig(format="%(message)s", level=logging.INFO)

    # Configure structured logging
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    
    logger = structlog.get_logger()
    
    try:
        logger.info("Job starting", job_name="robust_processor")
        
        # Input validation
        input_path = "data/input.parquet"
        if not validate_input(input_path):
            raise ValueError(f"Input validation failed for {input_path}")
        
        # Main processing with progress logging
        logger.info("Loading data", file=input_path)
        df = daft.read_parquet(input_path)
        
        logger.info("Processing data", rows=len(df), columns=len(df.column_names))
        
        # Process data: drop rows containing null values
        df_processed = df.drop_null()
        
        # Output validation
        if not validate_output(df_processed):
            raise ValueError("Output validation failed")
        
        # Save results
        output_path = "data/output.parquet"
        df_processed.write_parquet(output_path)
        
        logger.info(
            "Job completed successfully", 
            input_rows=df.count_rows(),
            output_rows=df_processed.count_rows(),
            output_file=output_path
        )
        
        return 0
        
    except Exception as e:
        logger.error("Job failed", error=str(e), error_type=type(e).__name__)
        return 1
        
    finally:
        # Always cleanup
        cleanup_temp_files()
        logger.info("Cleanup completed")

Next Steps

The programmatic interface is ideal for building automation, CI/CD pipelines, and complex workflows that require fine-grained control over job execution and environment configuration.
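
As a sketch of that kind of automation, jobs can be generated in a loop using the same Env and Job APIs shown above. The dataset names and S3 paths here are hypothetical:

from ev import Env, Job

DATASETS = ["events", "orders", "users"]

def build_job(dataset: str) -> Job:
    """Create one processing job per dataset, each with its own pinned environment."""
    env = Env("3.11").pip_install(["daft==0.5.9"])
    env.environ["DATASET"] = dataset

    job = Job(f"{dataset}_processor", env)

    @job.main()
    def main():
        import os
        import daft

        name = os.environ["DATASET"]
        df = daft.read_parquet(f"s3://data-lake/raw/{name}/*.parquet")
        df.write_parquet(f"s3://data-lake/processed/{name}/")
        print(f"Finished processing {name}")
        return 0

    return job

jobs = [build_job(name) for name in DATASETS]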