## Basic Job Definition

### Creating an Environment
```python
from ev import Env

# Create a Python environment with a specific interpreter version
env = Env("3.11")
```
### Defining Jobs
```python
from ev import Env, Job

# Create environment
env = Env("3.11").pip_install(["daft==0.5.9"])

# Create job
job = Job("data_processing", env)

@job.main()
def main():
    import daft

    # Your job logic here
    df = daft.from_pydict({"name": ["Alice", "Bob"], "age": [25, 30]})
    df.show()
    return 0
```
## Advanced Patterns

### Jobs with File Dependencies
```python
from ev import Env, Job

# Environment with local file dependencies
env = Env("3.11").pip_install(["daft==0.5.9"])
env.include([
    "utils/data_processor.py",
    "config/settings.json",
    "data/sample.parquet"
])

job = Job("file_processing", env)

@job.main()
def main():
    import daft
    import json
    from utils.data_processor import process_data

    # Load configuration
    with open("config/settings.json") as f:
        config = json.load(f)

    # Process data file
    df = daft.read_parquet("data/sample.parquet")
    result = process_data(df, config)

    print(f"Processed {len(result)} rows")
    return 0
```
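The `utils/data_processor.py` module above is project code that you ship with the job via `env.include()`. Purely as an illustration, a minimal sketch of what such a module could contain is shown below; the `process_data` signature matches how it is called above, but the `"min_age"` config key and the `"age"` column are made-up assumptions, not part of the ev API.

```python
# utils/data_processor.py -- hypothetical helper shipped via env.include()
import daft


def process_data(df: daft.DataFrame, config: dict) -> daft.DataFrame:
    # "min_age" is an assumed key in config/settings.json
    min_age = config.get("min_age", 0)

    # Keep rows that pass the configured threshold, then deduplicate
    cleaned = df.where(df["age"] >= min_age).distinct()

    # Materialize the result before handing it back to the caller
    return cleaned.collect()
```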
### Jobs with Environment Variables
```python
from ev import Env, Job
import os

# Environment with configuration
env = Env("3.11").pip_install(["daft==0.5.9", "requests==2.31.0"])
env.environ["API_BASE_URL"] = "https://api.example.com"
env.environ["API_KEY"] = os.environ.get("API_KEY", "default-key")
env.environ["BATCH_SIZE"] = "100"
env.environ["RETRY_COUNT"] = "3"

job = Job("api_processor", env)

@job.main()
def main():
    import os
    import requests
    import daft

    # Read configuration from environment
    api_url = os.environ["API_BASE_URL"]
    api_key = os.environ["API_KEY"]
    batch_size = int(os.environ["BATCH_SIZE"])
    retry_count = int(os.environ["RETRY_COUNT"])

    # Use configuration in job logic
    headers = {"Authorization": f"Bearer {api_key}"}
    all_data = []

    for i in range(0, 1000, batch_size):
        response = requests.get(
            f"{api_url}/data",
            headers=headers,
            params={"offset": i, "limit": batch_size},
        )
        if response.status_code == 200:
            data = response.json()
            all_data.extend(data)
            print(f"Processed batch {i // batch_size + 1}: {len(data)} items")
        else:
            print(f"Error in batch {i // batch_size + 1}: {response.status_code}")

    # Convert to a daft DataFrame for further processing
    if all_data:
        df = daft.from_pylist(all_data)
        df.show(5)  # Show first 5 rows

    return 0
```
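Because a missing environment variable only surfaces as a `KeyError` at run time, it can help to check required variables up front and fail with a clearer message. A small sketch of that pattern using only the standard library (the variable names are the ones configured above; `require_env` is a hypothetical helper, not an ev function):

```python
import os


def require_env(*names: str) -> dict[str, str]:
    """Return the requested environment variables, failing fast if any are missing."""
    missing = [name for name in names if name not in os.environ]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in names}


# Usage inside the job's main():
# config = require_env("API_BASE_URL", "API_KEY", "BATCH_SIZE", "RETRY_COUNT")
# batch_size = int(config["BATCH_SIZE"])
```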
## Real-World Examples

### Data Pipeline Job
```python
from ev import Env, Job

# Create environment for data processing
env = Env("3.11").pip_install([
    "daft==0.5.9",
    "boto3==1.34.0",
    "pyarrow>=15.0.0"
])
env.environ["AWS_DEFAULT_REGION"] = "us-west-2"
env.include(["transforms/", "schemas/"])

job = Job("etl_pipeline", env)

@job.main()
def main():
    import daft
    from transforms.cleaner import clean_data
    from schemas.output import OUTPUT_SCHEMA

    # Extract: read from S3
    df = daft.read_parquet("s3://data-lake/raw/events/*.parquet")

    # Transform: clean and process data
    df_cleaned = clean_data(df)

    # Add computed columns
    df_processed = df_cleaned.with_columns({
        "event_date": df_cleaned["timestamp"].dt.date(),
        "amount_with_tax": df_cleaned["amount"] * 1.1,
    })

    # Filter recent data
    df_filtered = df_processed.where(
        df_processed["event_date"] >= daft.lit("2024-01-01").cast(daft.DataType.date())
    )

    # Load: write to the output location, partitioned by date
    df_filtered.write_parquet(
        "s3://data-lake/processed/events/",
        partition_cols=["event_date"]
    )

    print(f"Processed {df_filtered.count_rows()} records")
    return 0
```
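The `clean_data` helper imported from `transforms/cleaner.py` is project code shipped with the job via `env.include(["transforms/", ...])`. As a point of reference only, here is a hypothetical sketch of what it might do; the null checks and deduplication below are assumptions, not a prescribed implementation.

```python
# transforms/cleaner.py -- hypothetical cleaning step for the ETL example
import daft


def clean_data(df: daft.DataFrame) -> daft.DataFrame:
    # Drop rows where the columns the pipeline relies on are missing
    df = df.where(df["timestamp"].not_null() & df["amount"].not_null())

    # Remove exact duplicate events
    return df.distinct()
```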
### API Integration Job
```python
from ev import Env, Job

# Environment for API integration
env = Env("3.11").pip_install([
    "requests==2.31.0",
    "daft==0.5.9",
    "tenacity==8.2.0"  # For retry logic
])
env.environ["API_BASE_URL"] = "https://api.data-source.com"
env.environ["API_TIMEOUT"] = "30"
env.environ["MAX_RETRIES"] = "3"

job = Job("api_data_sync", env)

@job.main()
def main():
    import requests
    import daft
    import os
    from tenacity import retry, stop_after_attempt, wait_exponential

    base_url = os.environ["API_BASE_URL"]
    timeout = int(os.environ["API_TIMEOUT"])
    max_retries = int(os.environ["MAX_RETRIES"])

    @retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def fetch_data(endpoint):
        response = requests.get(f"{base_url}/{endpoint}", timeout=timeout)
        response.raise_for_status()
        return response.json()

    try:
        # Fetch data from multiple endpoints
        users = fetch_data("users")
        orders = fetch_data("orders")
        products = fetch_data("products")

        # Convert to daft DataFrames
        users_df = daft.from_pylist(users)
        orders_df = daft.from_pylist(orders)
        products_df = daft.from_pylist(products)

        # Process and merge data
        merged_df = orders_df.join(users_df, on="user_id", how="left")
        merged_df = merged_df.join(products_df, on="product_id", how="left")

        # Save processed data
        merged_df.write_parquet("output/processed_data.parquet")

        print(f"Successfully processed {merged_df.count_rows()} records")
        return 0
    except Exception as e:
        print(f"Job failed: {e}")
        return 1
```
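One refinement worth considering: as written, `@retry` retries on any exception, including the `HTTPError` raised by `raise_for_status()` for 4xx responses that will never succeed. tenacity's `retry_if_exception_type` can restrict retries to transient network failures. A hedged sketch, reusing the base URL and timeout from the example above:

```python
import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    # Only retry transient network failures; a 4xx/5xx raised by
    # raise_for_status() propagates immediately instead of being retried.
    retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout)),
)
def fetch_data(endpoint: str):
    response = requests.get(f"https://api.data-source.com/{endpoint}", timeout=30)
    response.raise_for_status()
    return response.json()
```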
## Job Organization and Best Practices

### Modular Job Design
```python
from ev import Env, Job

def create_data_processing_env():
    """Create a reusable environment for data processing jobs."""
    env = Env("3.11").pip_install([
        "daft==0.5.9",
        "pyarrow>=15.0.0"
    ])
    env.environ["DAFT_RUNNER"] = "py"
    env.include(["src/", "config/"])
    return env

# Reusable data processing job
data_env = create_data_processing_env()
data_job = Job("data_processor", data_env)

@data_job.main()
def process_data():
    import daft
    from src.transforms import apply_business_rules
    from config.settings import OUTPUT_PATH

    # Load data
    df = daft.read_parquet("input/*.parquet")

    # Apply transforms
    df_processed = apply_business_rules(df)

    # Save results
    df_processed.write_parquet(OUTPUT_PATH)
    print(f"Processed {df_processed.count_rows()} records")
    return 0
```
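As in the earlier examples, `src/transforms.py` and `config/settings.py` are your own project files picked up by `env.include(["src/", "config/"])`. A hypothetical sketch of the two modules is shown below; the column names and the business rule itself are illustrative assumptions.

```python
# src/transforms.py -- hypothetical business rules for the modular job
import daft


def apply_business_rules(df: daft.DataFrame) -> daft.DataFrame:
    # Example rule: keep only completed records with a positive amount
    return df.where((df["status"] == "completed") & (df["amount"] > 0))


# config/settings.py -- hypothetical output location
OUTPUT_PATH = "output/processed/"
```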
## Best Practices

### Environment Management
```python
from ev import Env, Job

# Use specific package versions for reproducibility
def create_stable_env():
    """Create an environment with pinned dependencies."""
    return Env("3.11").pip_install([
        "daft==0.5.9",           # Exact version
        "pyarrow==15.0.0",       # Exact version
        "requests>=2.31.0,<3.0"  # Compatible range
    ])

# Separate environments for different use cases
data_env = Env("3.11").pip_install([
    "daft==0.5.9",
    "pyarrow>=15.0.0"
])

ml_env = Env("3.11").pip_install([
    "daft==0.5.9",
    "scikit-learn==1.3.0",
    "numpy==1.24.3"
])

# Environment with secure defaults
secure_env = Env("3.11").pip_install([
    "daft==0.5.9",
    "cryptography==41.0.0",
    "certifi>=2023.1.1"
])
secure_env.environ["PYTHONHTTPSVERIFY"] = "1"
secure_env.environ["SSL_CERT_FILE"] = "/etc/ssl/certs/ca-certificates.crt"
```
### Error Handling and Logging
```python
from ev import Env, Job

env = Env("3.11").pip_install([
    "daft==0.5.9",
    "structlog==23.1.0"  # Structured logging
])
env.include(["src/", "config/"])

robust_job = Job("robust_processor", env)

@robust_job.main()
def robust_main():
    import logging
    import structlog
    import daft
    from src.validators import validate_input, validate_output
    from src.utils import cleanup_temp_files

    # Configure structured logging; the stdlib logger must allow INFO-level
    # records, otherwise filter_by_level drops the info messages below
    logging.basicConfig(format="%(message)s", level=logging.INFO)
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    logger = structlog.get_logger()

    try:
        logger.info("Job starting", job_name="robust_processor")

        # Input validation
        input_path = "data/input.parquet"
        if not validate_input(input_path):
            raise ValueError(f"Input validation failed for {input_path}")

        # Main processing with progress logging
        logger.info("Loading data", file=input_path)
        df = daft.read_parquet(input_path)

        input_rows = df.count_rows()
        logger.info("Processing data", rows=input_rows, columns=len(df.column_names))

        # Process data: drop rows containing nulls
        df_processed = df.drop_null()

        # Output validation
        if not validate_output(df_processed):
            raise ValueError("Output validation failed")

        # Save results
        output_path = "data/output.parquet"
        df_processed.write_parquet(output_path)

        logger.info(
            "Job completed successfully",
            input_rows=input_rows,
            output_rows=df_processed.count_rows(),
            output_file=output_path
        )
        return 0
    except Exception as e:
        logger.error("Job failed", error=str(e), error_type=type(e).__name__)
        return 1
    finally:
        # Always clean up
        cleanup_temp_files()
        logger.info("Cleanup completed")
```
## Next Steps
- **CLI Usage**: Learn the command-line interface for interactive use
- **Examples**: See complete job examples in action
- **Core Concepts**: Understand environments, jobs, and execution models
- **Best Practices**: Advanced patterns and optimization techniques
The programmatic interface is ideal for building automation, CI/CD pipelines, and complex workflows that require fine-grained control over job execution and environment configuration.
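For instance, because environments and jobs are plain Python objects, an automation script can assemble a family of jobs from configuration data using only the constructors shown in this guide. The sketch below is a pattern illustration, not verified SDK usage: the dataset names, paths, and job-naming scheme are made up, and whether ev supports building jobs inside a factory function like this is an assumption.

```python
from ev import Env, Job

# Hypothetical: one ingestion job per dataset, configured via environment variables
DATASETS = ["events", "users", "orders"]


def build_ingest_job(dataset: str) -> Job:
    env = Env("3.11").pip_install(["daft==0.5.9"])
    env.environ["DATASET"] = dataset  # same env-var pattern as the examples above

    job = Job(f"ingest_{dataset}", env)

    @job.main()
    def main():
        import os

        import daft

        name = os.environ["DATASET"]
        df = daft.read_parquet(f"s3://data-lake/raw/{name}/*.parquet")
        df.write_parquet(f"s3://data-lake/processed/{name}/")
        return 0

    return job


jobs = [build_ingest_job(name) for name in DATASETS]
```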