This guide helps you diagnose and fix common issues when using the ev SDK with daft (our multimodal query engine).

Installation Issues

Package Installation Fails

Problem: pip install ev-sdk fails with permission errors
Solutions:
# Option 1: Install for current user only
pip install --user ev-sdk

# Option 2: Use virtual environment (recommended)
python -m venv ev-env
source ev-env/bin/activate  # On Windows: ev-env\Scripts\activate
pip install ev-sdk

# Option 3: Use sudo (not recommended)
sudo pip install ev-sdk
Problem: SSL certificate verification fails during installation
Solutions:
# Temporary fix (not recommended for production)
pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org ev-sdk

# Better fix: Update certificates
# On macOS
/Applications/Python\ 3.x/Install\ Certificates.command

# On Linux
sudo apt-get update && sudo apt-get install ca-certificates
Problem: ev command not found after installation
Solutions:
# Check if ev is in PATH
which ev

# If not found, check Python scripts directory
python -m pip show ev-sdk

# Use full path or add to PATH
export PATH=$PATH:~/.local/bin

# Or use python module
python -m ev --help
Problem: Package conflicts with daft or other dependencies
Solutions:
# Create fresh environment
python -m venv fresh-env
source fresh-env/bin/activate

# Install ev-sdk first, then other packages
pip install ev-sdk
pip install torch torchvision  # Add other deps as needed

# Check for conflicts
pip check

Authentication Issues

Configuration Problems

Problem: Authentication fails or token expired
Diagnosis:
# Check authentication status
ev auth status

# Check current configuration
ev config show
Solutions:
# Re-authenticate via browser
ev auth login

# Verify authentication worked
ev auth status
Problem: Commands fail due to wrong space selection
Diagnosis:
# Check current space
ev spaces current

# List available spaces
ev spaces list
Solutions:
# Switch to correct space
ev spaces use <correct-space-name>

# Or specify space in command
EV_SPACE=<space-name> ev run ./job.py

Job Execution Issues

Job Submission Problems

Problem: Job submission fails immediately
Diagnosis:
# Check job syntax
python -m py_compile job.py

# Dry run (if available)
ev run --dry-run ./job.py

# Check job logs
ev jobs logs <job-id>
Common Causes:
  • Syntax errors in job file
  • Missing @job.main decorator
  • Invalid parameter types or missing type hints (see the typed-parameter sketch after the solution below)
  • Import errors
Solutions:
# Ensure proper job structure
from ev import Env, Job

# Create environment
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create job
job = Job("my_function", env)

@job.main()
def main():
    return 0
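One of the common causes above is missing type hints on job parameters. The streaming example later in this guide passes typed arguments to the entrypoint; the sketch below follows that pattern. The job name, parameters, and how arguments are supplied at submission time are illustrative assumptions, so check them against your ev SDK version.
from ev import Env, Job

env = Env("3.11").pip_install(["daft==0.5.9"])
job = Job("typed_params", env)

@job.main()
def main(input_path: str, batch_size: int = 100):
    # Type hints on the entrypoint parameters let argument problems
    # surface before the job runs (assumed behavior; check your SDK docs)
    import daft

    df = daft.read_parquet(input_path)
    df.show(batch_size)
    return 0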
Problem: Job configuration and execution issues
Common Issues:
  • Incorrect environment setup
  • Missing dependencies
  • Configuration errors
Solutions:
# Correct job patterns
from ev import Env, Job

# Environment with proper dependencies
env = Env("3.11").pip_install([
    "daft==0.5.9",
    "requests==2.31.0",
    "numpy>=1.21.0"
])

# Environment variables for configuration
env.environ["BATCH_SIZE"] = "100"
env.environ["API_KEY"] = "your-api-key"

# Create job
job = Job("data_processor", env)

@job.main()
def main():
    import daft
    import os

    batch_size = int(os.environ.get("BATCH_SIZE", "10"))

    # Your processing logic
    df = daft.from_pydict({"data": [1, 2, 3]})
    df.show()

    return 0
CLI Usage:
# Simple job execution
ev run ./job.py
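For longer-running jobs, a typical submit-then-monitor flow combines commands shown elsewhere in this guide (exact flags and output vary by ev CLI version):
# Submit the job and note the job id it reports
ev run ./job.py

# Then check progress and output for that job
ev jobs status <job-id>
ev jobs logs <job-id>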

Runtime Errors

Problem: Job fails with import errors
Diagnosis:
# Check job logs for import errors
ev jobs logs <job-id>
Solutions:
# Ensure dependencies are in environment
from ev import Env, Job

env = Env("3.11").pip_install([
    "daft==0.5.9",
    "torch==2.0.0",
    "torchvision==0.15.0",
    "numpy==1.21.0",
    "pillow==9.0.0"
])

job = Job("ml_job", env)

@job.main()
def main():
    # Import inside function for distributed execution
    import torch
    import torchvision
    import daft

    # Your job logic here
    df = daft.from_pydict({"status": ["success"]})
    df.show()
    return 0
Problem: Job fails with out-of-memory errors
Diagnosis:
# Check job resource usage
ev jobs status <job-id>
ev jobs logs <job-id>
Solutions:
# Use daft's lazy evaluation effectively
from ev import Env, Job

env = Env("3.11").pip_install(["daft==0.5.9"])
job = Job("memory_efficient", env)

@job.main()
def main():
    import daft

    # Load data lazily (no immediate materialization)
    df = daft.read_parquet("s3://input/data.parquet")

    # Chain operations without materializing
    df = df.where(df["status"] == "active")
    df = df.select("id", "name", "value")  # Only needed columns

    # Process in streaming fashion
    df.write_parquet("s3://output/processed.parquet")

    print("Processing completed efficiently")
    return 0
Problem: Job submission or monitoring fails with async errors
Common Issues:
  • Not using await with async functions
  • Running async code in sync context
  • Event loop already running errors
Solutions:
# Correct patterns for ev SDK
import daft
from ev import Env, Job

def run_job_example():
    """Proper job execution with ev SDK."""
    # Create environment and job
    env = Env("3.11").pip_install(["daft==0.5.9"])
    job = Job("my-job", env)

    @job.main()
    def process_data():
        df = daft.read_parquet("s3://bucket/data.parquet")
        result = df.where(df["status"] == "active")
        return {"processed_rows": result.count_rows()}

    # Run with: ev run ./job.py
    return process_data

# For testing locally
def test_job():
    """Test job logic locally before deployment."""
    # Get the decorated entrypoint from run_job_example and call it directly
    process_data = run_job_example()
    result = process_data()
    print(f"Local test result: {result}")
    return result
Note: The ev SDK uses Env and Job classes directly with the @job.main() decorator pattern for job execution.

daft-Specific Issues

Data Processing Errors

Problem: Expected results don't appear, or it's unclear when operations execute
Understanding Lazy Evaluation:
# daft uses lazy evaluation - operations build a query plan
df = daft.read_parquet("s3://data/file.parquet")
df = df.where(df["status"] == "active")  # No execution yet
df = df.with_column("processed", daft.lit(True))  # Still no execution

# Only .collect() triggers execution
result = df.collect()  # Now it executes
Common Mistakes:
# ❌ Wrong: Trying to get length before collect()
df = daft.read_parquet("s3://path/file.parquet")
# len(df)  # This won't work - df is lazy

# ✅ Correct: Use count_rows() or collect first
df = daft.read_parquet("s3://path/file.parquet")
row_count = df.count_rows()  # Efficient count
# OR
materialized = df.collect()
row_count = len(materialized)  # After materialization
Problem: Column not found or incorrect column operations
Solutions:
# Use column references for filtering
df = df.where(df["status"] == "active")  # ✅ Correct

# Check column names first
print(df.schema())  # See all columns and types

# Handle missing columns gracefully
if "optional_col" in df.column_names:
    df = df.with_column("new_col", df["optional_col"] * 2)
else:
    df = df.with_column("new_col", daft.lit(0))

# String operations
df = df.with_column(
    "upper_name",
    df["name"].str.upper()
)

# Null handling
df = df.where(df["important_field"].not_null())
Problem: Issues with images, URLs, or complex data types
Image Processing Issues:
# Common image processing problems and solutions

# ❌ Problem: Images fail to download
df = df.with_column(
    "image",
    df["image_url"].url.download()  # May fail on bad URLs
)

# ✅ Solution: Handle errors gracefully
df = df.with_column(
    "image",
    df["image_url"].url.download(on_error="null")
)

# ❌ Problem: Image decode failures
df = df.with_column(
    "decoded",
    df["image"].image.decode()  # May fail on corrupted images
)

# ✅ Solution: Handle decode errors
df = df.with_column(
    "decoded",
    df["image"].image.decode(on_error="null", mode=daft.ImageMode.RGB)
)

# Filter out failed operations
df = df.drop_null("decoded")
URL and Network Issues:
# Handle flaky downloads: return null on failure instead of erroring out.
# Retry and timeout behavior can be tuned via the io_config parameter
# (see the daft IOConfig docs for your version).
df = df.with_column(
    "content",
    df["url"].url.download(on_error="null")
)

# Check for download failures
successful_downloads = df.where(df["content"].not_null())
failed_downloads = df.where(df["content"].is_null())

print(f"Success: {successful_downloads.count_rows()}")
print(f"Failed: {failed_downloads.count_rows()}")
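If a second attempt is worth it, you can re-run the download over just the failed rows. This is a sketch; whether a retry helps depends on why the first attempt failed (bad URLs will keep failing, transient network errors may not):
# Retry only the rows whose first download returned null
retried = failed_downloads.with_column(
    "content",
    failed_downloads["url"].url.download(on_error="null")
)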
Problem: Custom functions fail in distributed execution
Common UDF Issues:
# ❌ Problem: UDF without proper type hints
def bad_udf(x):
    return x * 2

# ✅ Solution: Use Python functions with daft
def good_transform(df):
    return df.with_column("doubled", df["value"] * 2)

# ✅ For complex operations, prefer built-in daft expression functions
df = df.with_column("length", df["text"].str.length())
df = df.with_column("upper", df["text"].str.upper())

# ✅ For custom logic, compose expression methods in a helper function
def process_text(text_expr):
    # Custom processing built from daft expression operations
    return text_expr.str.replace("old", "new")

df = df.with_column("processed", process_text(df["text"]))

Performance Issues

Problem: Jobs take too long to load data
Diagnosis:
# Check job logs for timing information
ev jobs logs <job-id>
Solutions:
# Optimize data loading with daft

# ✅ Select only the columns you need early; daft pushes the
#    projection down into the scan
df = daft.read_parquet("s3://data/*.parquet").select(
    "id", "name", "value", "timestamp"
)

# ✅ Apply filters early so they can be pushed down as well
df = daft.read_parquet("s3://data/*.parquet") \
    .where(daft.col("date") >= "2024-01-01") \
    .where(daft.col("status") == "active")

# ✅ Use appropriate file formats
# Parquet (columnar, compressed, fast)
df = daft.read_parquet("s3://data/*.parquet")

# Delta Lake (ACID transactions, schema evolution)
df = daft.read_deltalake("s3://delta-table/")

# ❌ Avoid CSV for large datasets
# df = daft.read_csv("s3://data/*.csv")  # Generally slower
Problem: High memory usage or OOM errors
Solutions:
# Optimize memory usage patterns
import daft
from datetime import datetime
from ev import Env, Job

env = Env("3.11").pip_install(["daft==0.5.9"])
job = Job("streaming_job", env)

# ✅ Use streaming operations
@job.main()
def streaming_job(input_path: str, output_path: str):
    # Read, process, and write in streaming fashion
    df = daft.read_parquet(input_path)

    # Chain operations (stays lazy)
    df = df.where(df["status"] == "active")
    df = df.with_column("processed_at", daft.lit(datetime.now()))

    # Write directly (streaming, no full materialization)
    df.write_parquet(output_path)

    return {"status": "completed"}

# ✅ Process in controlled batches
def batch_processing(df, batch_size=10000):
    total_rows = df.count_rows()

    for offset in range(0, total_rows, batch_size):
        batch = df.slice(offset, batch_size).collect()
        yield process_batch(batch)

# ✅ Avoid unnecessary materialization
# Bad: Multiple collects
df1 = daft.read_parquet(path1).collect()  # Materializes
df2 = daft.read_parquet(path2).collect()  # Materializes
combined = combine_dataframes(df1, df2)

# Good: Keep lazy until necessary
df1 = daft.read_parquet(path1)  # Lazy
df2 = daft.read_parquet(path2)  # Lazy
combined = df1.union(df2)       # Still lazy
result = combined.collect()     # Single materialization

Environment Issues

Dependency Management

Problem: Dependency conflicts between packages
Modern Dependency Patterns:
# ✅ Current recommended pattern
from ev import Job, Env

env = Env().pip_install([
    "torch==2.0.0",
    "torchvision==0.15.0",
    "numpy==1.21.0",
    "pillow==9.0.0"
    # Note: daft is included with ev-sdk
])

# ❌ Avoid: Don't pin daft separately
# env = Env().pip_install([
#     "daft[all]==0.5.9",  # Conflicts with ev-sdk version
#     "torch==2.0.0"
# ])
Problem: GPU packages not working in job environment
Solutions:
# GPU-enabled environment
env = Env().pip_install([
    "torch==2.0.0+cu118",  # CUDA-enabled PyTorch
    "torchvision==0.15.0+cu118",
    "transformers==4.30.0"
])

job = Job("gpu_job", env)

# Verify GPU availability in job
@job.main()
def gpu_job():
    import torch

    if torch.cuda.is_available():
        device = "cuda"
        print(f"GPU available: {torch.cuda.get_device_name(0)}")
    else:
        device = "cpu"
        print("GPU not available, using CPU")

    return {"device": device}

Debugging Techniques

Comprehensive Logging

import logging
import time
from datetime import datetime

import daft
from ev import Env, Job

env = Env("3.11").pip_install(["daft==0.5.9"])
job = Job("debug_daft_job", env)

@job.main()
def debug_daft_job(input_path: str):
    # Setup detailed logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info(f"Job started at {datetime.now()}")
    logger.info(f"Input path: {input_path}")

    start_time = time.time()

    try:
        # Log data loading
        logger.info("Loading data with daft...")
        load_start = time.time()
        df = daft.read_parquet(input_path)

        # Log schema information
        logger.info(f"Schema: {df.schema()}")
        logger.info(f"Data loaded in {time.time() - load_start:.2f}s (lazy)")

        # Log row count (triggers execution)
        count_start = time.time()
        row_count = df.count_rows()
        logger.info(f"Row count: {row_count} (computed in {time.time() - count_start:.2f}s)")

        # Log processing steps
        logger.info("Applying filters...")
        filter_start = time.time()
        df = df.where(df["status"] == "active")
        active_count = df.count_rows()
        logger.info(f"Active rows: {active_count} (filtered in {time.time() - filter_start:.2f}s)")

        # Log memory usage if available
        try:
            import psutil
            memory_mb = psutil.Process().memory_info().rss / 1024 / 1024
            logger.info(f"Memory usage: {memory_mb:.1f} MB")
        except ImportError:
            pass

        total_time = time.time() - start_time
        logger.info(f"Job completed successfully in {total_time:.2f}s")

        return {
            "success": True,
            "total_rows": row_count,
            "active_rows": active_count,
            "processing_time": total_time
        }

    except Exception as e:
        logger.error(f"Job failed after {time.time() - start_time:.2f}s: {e}")
        logger.error(f"Error type: {type(e).__name__}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        raise

Local Testing Strategies

# test_daft_locally.py
def test_daft_pipeline():
    """Test daft operations with small local data."""
    import daft

    # Create test data
    test_data = {
        "id": [1, 2, 3, 4, 5],
        "status": ["active", "inactive", "active", "pending", "active"],
        "value": [10.5, 20.0, 30.5, 40.0, 50.5],
        "timestamp": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"]
    }

    df = daft.from_pydict(test_data)

    # Test the same operations as in your job
    result_df = df.where(df["status"] == "active")
    result_df = result_df.with_column("processed", daft.lit(True))

    # Collect and inspect
    result = result_df.collect()
    print(f"Test result: {result.to_pydict()}")

    # Verify expected behavior
    assert len(result) == 3  # Should have 3 active records
    assert all(result.to_pydict()["processed"])  # All should be marked processed

    print("Local test passed!")

if __name__ == "__main__":
    test_daft_pipeline()

Getting Help

Diagnostic Information

When reporting issues, include:
# ev SDK version
pip show ev-sdk

# Python version
python --version

# Operating system
uname -a  # Linux/macOS
# systeminfo | findstr /B /C:"OS Name" /C:"OS Version"  # Windows

# daft version (included with ev-sdk)
python -c "import daft; print(daft.__version__)"

# Environment packages
pip freeze

Error Patterns to Include

When seeking help, provide:
  1. Complete error message including stack trace
  2. Minimal reproducible example with sample data (see the sketch below)
  3. Environment configuration (packages, versions)
  4. Job parameters and input data characteristics
  5. Expected vs actual behavior
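A minimal reproducible example can usually be built from a few rows of inline data rather than your production inputs. The sketch below (column names and values are placeholders) mirrors the local testing pattern shown earlier:
# repro.py -- smallest script that still shows the problem
import daft

df = daft.from_pydict({
    "id": [1, 2, 3],
    "status": ["active", "inactive", "active"],
})

# Keep only the operations needed to trigger the issue
df = df.where(df["status"] == "active")
print(df.collect().to_pydict())  # Include actual vs. expected output in your report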

Community Resources

  • Documentation: https://docs.daft.ai
  • GitHub Issues: Report bugs and feature requests
  • Community Forum: Ask questions and share solutions
  • Example Gallery: Browse working examples and patterns
For complex multimodal processing issues or performance optimization questions, consider sharing your specific use case and data characteristics to get more targeted assistance.