Jobs are the fundamental unit of execution on the Eventual platform. A job is a procedure composed of daft (our multimodal query engine) operations, and the platform handles scaling, retries, and fault tolerance for you, so you can focus on your business logic instead of distributed systems complexity. Here's the basic anatomy of a job:
```python
from ev import Env, Job

# Create an environment
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create a job instance
job = Job("my_job", env)

# Define the main function
@job.main()
def my_function():
    # Your business logic here
    import daft

    df = daft.from_pydict({"message": ["Hello, Eventual!"]})
    df.show()
    return 0
```
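An `Env` pins the Python version and the packages to install, a `Job` binds a name to that environment, and the function decorated with `@job.main()` is the entrypoint that runs when the job executes.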
Let’s create a simple job that processes some data:
```python
from ev import Env, Job

# Create environment with daft
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create job
job = Job("data_processor", env)

@job.main()
def main():
    """A job that processes data using daft."""
    import daft

    # Create a simple dataset using daft
    df = daft.from_pydict({
        "message": ["Hello Eventual", "Processing data", "With daft"],
        "id": [1, 2, 3]
    })

    # Process the data with daft operations
    df = df.with_column(
        "processed_message",
        df["message"].str.upper()
    )

    # Show the results
    df.show()
    print(f"Processed {df.count_rows()} messages")
    return 0
```
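The same pattern extends to jobs that need extra dependencies or configuration. The next example installs `requests` alongside daft, passes an API key in through an environment variable, fetches records from an API, and keeps only the valid ones with a daft filter: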
```python
from ev import Env, Job

# Create environment with dependencies
env = Env("3.11").pip_install([
    "daft==0.5.9",
    "requests==2.31.0"
])
env.environ["API_KEY"] = "your-api-key"

job = Job("api_fetcher", env)

@job.main()
def main():
    """Fetch data from an API and process it."""
    import requests
    import daft
    import os

    # Fetch data
    api_key = os.environ["API_KEY"]
    response = requests.get(
        "https://api.example.com/data",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    response.raise_for_status()
    data = response.json()

    # Process with daft
    df = daft.from_pylist(data)
    processed_df = df.where(df["valid"] == True)

    processed_df.show()
    print(f"Processed {processed_df.count_rows()} valid items out of {df.count_rows()} total")
    return 0
```
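Because the body of a job is ordinary Python plus daft, you can prototype the daft portion locally before wrapping it in a job. Below is a minimal sketch of the same filtering step, with made-up records standing in for the API response (the `valid` field simply mirrors the example above):

```python
import daft

# Made-up records standing in for the API response in the job above
data = [
    {"id": 1, "valid": True},
    {"id": 2, "valid": False},
    {"id": 3, "valid": True},
]

df = daft.from_pylist(data)

# The same filter the job applies: keep only rows marked valid
processed_df = df.where(df["valid"] == True)

processed_df.show()
print(f"Kept {processed_df.count_rows()} of {df.count_rows()} rows")
```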