Introduction

Jobs are the fundamental unit of execution on the Eventual platform. A job is a procedure composed of daft (our multimodal query engine) operations, and the platform automatically handles scaling, retries, and fault tolerance, so you can focus on your business logic instead of distributed systems complexity.

Basic Job Structure

Every Eventual job follows a simple pattern:
from ev import Env, Job

# Create an environment
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create a job instance
job = Job("my_job", env)

# Define the main function
@job.main()
def my_function():
    # Your business logic here
    import daft
    df = daft.from_pydict({"message": ["Hello, Eventual!"]})
    df.show()
    return 0
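
Saved to a file (for example my_job.py, a name chosen here for illustration), the job can then be run with the CLI, covered in more detail below:
ev run ./my_job.py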

Your First Job

Let’s create a simple job that processes some data:
from ev import Env, Job

# Create environment with daft
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create job
job = Job("data_processor", env)

@job.main()
def main():
    """
    A job that processes data using daft.
    """
    import daft
    
    # Create a simple dataset using daft
    df = daft.from_pydict({
        "message": ["Hello Eventual", "Processing data", "With daft"],
        "id": [1, 2, 3]
    })
    
    # Process the data with daft operations
    df = df.with_column(
        "processed_message",
        df["message"].str.upper()
    )
    
    # Show the results
    df.show()
    print(f"Processed {df.count_rows()} messages")
    return 0

Running Your Job

There are two ways to run your job:

1. Using the CLI

ev run ./hello_job.py

2. Using Python (Programmatic)

You can also define jobs programmatically:
from ev import Env, Job

def create_hello_job():
    # Create environment
    env = Env("3.11")
    
    # Create job
    job = Job("hello_job", env)
    
    @job.main()
    def main():
        message = "Hello, World! Welcome to Eventual."
        print(message)
        return 0
    
    return job

# Create the job
hello_job = create_hello_job()

Job Configuration

Jobs can be configured with environment variables and dependencies:
from ev import Env, Job

# Create environment with configuration
env = Env("3.11").pip_install([
    "daft==0.5.9",
    "requests==2.31.0"
])

# Set environment variables
env.environ["LOG_LEVEL"] = "INFO"
env.environ["API_BASE_URL"] = "https://api.example.com"

# Include local files
env.include(["utils.py", "config/"])

# Create job
job = Job("configured_job", env)

@job.main()
def main():
    import os
    import daft
    import requests
    
    # Access environment variables
    log_level = os.environ.get("LOG_LEVEL", "DEBUG")
    api_url = os.environ.get("API_BASE_URL")
    
    print(f"Running with log level: {log_level}")
    print(f"API URL: {api_url}")
    
    # Process some data
    df = daft.from_pydict({
        "status": ["active", "inactive", "pending"],
        "count": [10, 5, 3]
    })
    
    df.show()
    return 0
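
The files pulled in with env.include can then be used from inside the job. A minimal sketch, assuming a hypothetical utils.py helper sits next to the job file and assuming included files are importable from the job's working directory:
# utils.py (hypothetical helper shipped alongside the job via env.include)
def format_status(status: str, count: int) -> str:
    """Return a one-line, human-readable summary for a status row."""
    return f"{status}: {count} item(s)"

# Inside the job's main function:
import daft
import utils  # assumes included files land next to the job script

df = daft.from_pydict({
    "status": ["active", "inactive", "pending"],
    "count": [10, 5, 3]
})

# Materialize the small result and format each row with the helper
rows = df.to_pydict()
for status, count in zip(rows["status"], rows["count"]):
    print(utils.format_status(status, count))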

Monitoring Your Job

Once your job is running, you can monitor its progress:
ev jobs list

Error Handling

Jobs automatically handle common errors, but you can add custom error handling:
from ev import Env, Job

# Create environment
env = Env("3.11").pip_install(["daft==0.5.9"])

job = Job("robust_job", env)

@job.main()
def main():
    """A job with error handling."""
    import daft
    
    try:
        # Your main logic
        df = daft.from_pydict({
            "data": ["item1", "item2", "item3"],
            "value": [1, 2, 3]
        })
        
        # Process data
        df = df.with_column("processed", df["data"].str.upper())
        df.show()
        
        print("Processing completed successfully")
        return 0
        
    except Exception as e:
        print(f"Error during processing: {e}")
        return 1
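
For transient failures such as flaky network calls, a simple retry loop inside the job body is often enough before giving up and returning a non-zero exit code. A minimal sketch using only the standard library (the retry counts and the fetch_data callable are illustrative, not part of the platform API):
import time

def call_with_retries(fetch_data, max_attempts=3, backoff_seconds=2.0):
    """Call fetch_data(), retrying with a fixed backoff on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch_data()
        except Exception as e:
            if attempt == max_attempts:
                raise  # let the surrounding try/except turn this into a non-zero return code
            print(f"Attempt {attempt} failed ({e}); retrying in {backoff_seconds}s")
            time.sleep(backoff_seconds)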

Best Practices

- Pin dependency versions in your environment (for example, daft==0.5.9) so job runs are reproducible.
- Import heavy dependencies such as daft inside the job's main function, as the examples above do.
- Return 0 on success and a non-zero value on failure, as in the error-handling example.
- Keep configuration such as API URLs and log levels in environment variables rather than hardcoding them in the job body.

Common Patterns

Data Processing Job

from ev import Env, Job

# Create environment with daft
env = Env("3.11").pip_install(["daft==0.5.9"])

job = Job("csv_processor", env)

@job.main()
def main():
    """Process CSV data with daft."""
    import daft
    
    # Read data
    df = daft.read_csv("input/data.csv")
    
    # Process
    df = df.where(df["status"] == "active")
    df = df.with_column("processed_at", daft.lit("2024-01-01"))
    
    # Write results
    df.write_parquet("output/processed_data.parquet")
    
    print(f"Processed {df.count_rows()} rows")
    return 0
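
The same pattern extends to aggregations before writing results. A minimal sketch, assuming the input CSV also has a numeric value column (the column names here are illustrative):
import daft
from daft import col

df = daft.read_csv("input/data.csv")

# Summarize rows per status before writing
summary = (
    df.groupby("status")
      .agg(
          col("value").sum().alias("total_value"),
          col("value").count().alias("row_count")
      )
)

summary.write_parquet("output/summary.parquet")
print(f"Wrote {summary.count_rows()} summary rows")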

API Integration Job

from ev import Env, Job

# Create environment with dependencies
env = Env("3.11").pip_install([
    "daft==0.5.9",
    "requests==2.31.0"
])

env.environ["API_KEY"] = "your-api-key"

job = Job("api_fetcher", env)

@job.main()
def main():
    """Fetch data from API and process it."""
    import requests
    import daft
    import os
    
    # Fetch data
    api_key = os.environ["API_KEY"]
    response = requests.get(
        "https://api.example.com/data", 
        headers={"Authorization": f"Bearer {api_key}"}
    )
    response.raise_for_status()
    
    data = response.json()
    
    # Process with daft
    df = daft.from_pylist(data)
    processed_df = df.where(df["valid"] == True)
    
    processed_df.show()
    
    print(f"Processed {processed_df.count_rows()} valid items out of {df.count_rows()} total")
    return 0
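
If the endpoint is paginated, you can collect all pages before handing the records to daft. A minimal sketch, assuming the API accepts a page query parameter and returns an empty list once the pages are exhausted (both assumptions about this example API):
import requests

def fetch_all_pages(base_url, api_key, max_pages=10):
    """Fetch up to max_pages pages of records, stopping at the first empty page."""
    headers = {"Authorization": f"Bearer {api_key}"}
    records = []
    for page in range(1, max_pages + 1):
        response = requests.get(base_url, headers=headers, params={"page": page})
        response.raise_for_status()
        batch = response.json()
        if not batch:  # assume an empty list means there are no more pages
            break
        records.extend(batch)
    return records

# Then, inside main: df = daft.from_pylist(fetch_all_pages("https://api.example.com/data", api_key))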

Next Steps

Now that you can create and run jobs, you're ready to explore more advanced features.
Ready to process real data? Check out our image processing example to see how to handle multimodal data at scale.