Use the ev SDK to define and run jobs programmatically
from ev import Env
# Create an environment pinned to a specific Python version
env = Env("3.11")

from ev import Env, Job
# Create environment
env = Env("3.11").pip_install(["daft==0.5.9"])
# Create job
job = Job("data_processing", env)
@job.main()
def main():
    import daft

    # Your job logic here
    df = daft.from_pydict({"name": ["Alice", "Bob"], "age": [25, 30]})
    df.show()
    return 0

from ev import Env, Job
# Environment with local file dependencies
env = Env("3.11").pip_install(["daft==0.5.9"])
env.include([
"utils/data_processor.py",
"config/settings.json",
"data/sample.parquet"
])
job = Job("file_processing", env)
@job.main()
def main():
    import daft
    import json
    from utils.data_processor import process_data

    # Load configuration shipped alongside the job
    with open("config/settings.json") as f:
        config = json.load(f)

    # Process the included data file
    df = daft.read_parquet("data/sample.parquet")
    result = process_data(df, config)
    print(f"Processed {result.count_rows()} rows")
    return 0
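
The utils/data_processor.py module above is your own code shipped with env.include(), not part of the ev SDK. A minimal sketch of what it might contain, assuming process_data takes a daft DataFrame plus the loaded settings dict and returns a filtered DataFrame (the "min_age" key and "age" column are hypothetical):

# utils/data_processor.py (hypothetical sketch)
def process_data(df, config):
    """Filter rows using a threshold taken from config/settings.json."""
    min_age = config.get("min_age", 0)  # assumed settings key
    return df.where(df["age"] >= min_age)
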
from ev import Env, Job
import os
# Environment with configuration
env = Env("3.11").pip_install(["daft==0.5.9", "requests==2.31.0"])
env.environ["API_BASE_URL"] = "https://api.example.com"
env.environ["API_KEY"] = os.environ.get("API_KEY", "default-key")
env.environ["BATCH_SIZE"] = "100"
env.environ["RETRY_COUNT"] = "3"
job = Job("api_processor", env)
@job.main()
def main():
    import os
    import requests
    import daft

    # Read configuration from environment variables
    api_url = os.environ["API_BASE_URL"]
    api_key = os.environ["API_KEY"]
    batch_size = int(os.environ["BATCH_SIZE"])
    retry_count = int(os.environ["RETRY_COUNT"])  # unused here; see the retry example below

    # Use the configuration in the job logic
    headers = {"Authorization": f"Bearer {api_key}"}
    all_data = []
    for i in range(0, 1000, batch_size):
        response = requests.get(
            f"{api_url}/data",
            headers=headers,
            params={"offset": i, "limit": batch_size}
        )
        if response.status_code == 200:
            data = response.json()
            all_data.extend(data)
            print(f"Processed batch {i // batch_size + 1}: {len(data)} items")
        else:
            print(f"Error in batch {i // batch_size + 1}: {response.status_code}")

    # Convert to a daft DataFrame for further processing
    if all_data:
        df = daft.from_pylist(all_data)
        df.show(5)  # Show the first 5 rows

    return 0
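
The loop above always issues ten fixed-size requests. If the hypothetical /data endpoint can run out of records early, a drop-in variant of that loop (reusing api_url, headers, batch_size, and all_data from main() above) stops at the first empty page:

    offset = 0
    while True:
        response = requests.get(
            f"{api_url}/data",
            headers=headers,
            params={"offset": offset, "limit": batch_size}
        )
        response.raise_for_status()
        data = response.json()
        if not data:
            break  # no more pages
        all_data.extend(data)
        offset += batch_size
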
from ev import Env, Job
# Create environment for data processing
env = Env("3.11").pip_install([
"daft==0.5.9",
"boto3==1.34.0",
"pyarrow>=15.0.0"
])
env.environ["AWS_DEFAULT_REGION"] = "us-west-2"
env.include(["transforms/", "schemas/"])
job = Job("etl_pipeline", env)
@job.main()
def main():
    import daft
    from transforms.cleaner import clean_data
    from schemas.output import OUTPUT_SCHEMA

    # Extract: read raw events from S3
    df = daft.read_parquet("s3://data-lake/raw/events/*.parquet")

    # Transform: clean and process the data
    df_cleaned = clean_data(df)

    # Add computed columns
    df_processed = df_cleaned.with_columns({
        "event_date": df_cleaned["timestamp"].dt.date(),
        "amount_with_tax": df_cleaned["amount"] * 1.1,
    })

    # Keep only recent data
    df_filtered = df_processed.where(
        df_processed["event_date"] >= daft.lit("2024-01-01").cast(daft.DataType.date())
    )

    # Load: write to the output location, partitioned by date
    df_filtered.write_parquet(
        "s3://data-lake/processed/events/",
        partition_cols=["event_date"]
    )
    print(f"Processed {df_filtered.count_rows()} records")
    return 0
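
transforms/cleaner.py and schemas/output.py are project files shipped via env.include(["transforms/", "schemas/"]), not SDK modules. A sketch of what they might contain, assuming the raw events carry "user_id", "timestamp", and "amount" columns (names and rules are illustrative):

# transforms/cleaner.py (hypothetical sketch)
def clean_data(df):
    """Drop duplicate events and rows with non-positive amounts."""
    df = df.where(df["amount"] > 0)
    return df.distinct()

# schemas/output.py (hypothetical sketch)
# Expected output columns and their logical types, e.g. for downstream validation.
OUTPUT_SCHEMA = {
    "user_id": "int64",
    "timestamp": "timestamp",
    "event_date": "date",
    "amount": "float64",
    "amount_with_tax": "float64",
}
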
from ev import Env, Job
# Environment for API integration
env = Env("3.11").pip_install([
"requests==2.31.0",
"daft==0.5.9",
"tenacity==8.2.0" # For retry logic
])
env.environ["API_BASE_URL"] = "https://api.data-source.com"
env.environ["API_TIMEOUT"] = "30"
env.environ["MAX_RETRIES"] = "3"
job = Job("api_data_sync", env)
@job.main()
def main():
    import requests
    import daft
    import os
    from tenacity import retry, stop_after_attempt, wait_exponential

    base_url = os.environ["API_BASE_URL"]
    timeout = int(os.environ["API_TIMEOUT"])
    max_retries = int(os.environ["MAX_RETRIES"])

    @retry(
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def fetch_data(endpoint):
        response = requests.get(f"{base_url}/{endpoint}", timeout=timeout)
        response.raise_for_status()
        return response.json()

    try:
        # Fetch data from multiple endpoints
        users = fetch_data("users")
        orders = fetch_data("orders")
        products = fetch_data("products")

        # Convert to daft DataFrames
        users_df = daft.from_pylist(users)
        orders_df = daft.from_pylist(orders)
        products_df = daft.from_pylist(products)

        # Join orders with users and products
        merged_df = orders_df.join(users_df, on="user_id", how="left")
        merged_df = merged_df.join(products_df, on="product_id", how="left")

        # Save the processed data
        merged_df.write_parquet("output/processed_data.parquet")
        print(f"Successfully processed {merged_df.count_rows()} records")
        return 0
    except Exception as e:
        print(f"Job failed: {e}")
        return 1
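
As written, the @retry decorator retries fetch_data on any exception, including HTTP errors raised by raise_for_status(). If you only want to retry transient network failures and let 4xx/5xx responses fail fast, tenacity's retry_if_exception_type can narrow the condition; a sketch of a variant fetch_data (reusing base_url, max_retries, and timeout from main() above):

    from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

    @retry(
        retry=retry_if_exception_type((requests.ConnectionError, requests.Timeout)),
        stop=stop_after_attempt(max_retries),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def fetch_data(endpoint):
        response = requests.get(f"{base_url}/{endpoint}", timeout=timeout)
        response.raise_for_status()  # HTTPError (e.g. 404) is not retried here
        return response.json()
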
from ev import Env, Job
def create_data_processing_env():
"""Create reusable environment for data processing jobs."""
env = Env("3.11").pip_install([
"daft==0.5.9",
"pyarrow>=15.0.0"
])
env.environ["DAFT_RUNNER"] = "py"
env.include(["src/", "config/"])
return env
# Reusable data processing job
data_env = create_data_processing_env()
data_job = Job("data_processor", data_env)
@data_job.main()
def process_data():
    import daft
    from src.transforms import apply_business_rules
    from config.settings import OUTPUT_PATH

    # Load data
    df = daft.read_parquet("input/*.parquet")

    # Apply transforms
    df_processed = apply_business_rules(df)

    # Save results
    df_processed.write_parquet(OUTPUT_PATH)
    print(f"Processed {df_processed.count_rows()} records")
    return 0
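
src/transforms.py and config/settings.py are shipped through env.include(["src/", "config/"]) and are your own modules. A minimal sketch, with a made-up business rule and output path:

# config/settings.py (hypothetical sketch)
OUTPUT_PATH = "output/processed/"

# src/transforms.py (hypothetical sketch)
def apply_business_rules(df):
    """Example rule: keep only active records and add a cents-denominated amount."""
    df = df.where(df["status"] == "active")  # assumed "status" column
    return df.with_columns({"amount_cents": df["amount"] * 100})
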
from ev import Env, Job
# Use specific package versions for reproducibility
def create_stable_env():
"""Create environment with pinned dependencies."""
return Env("3.11").pip_install([
"daft==0.5.9", # Exact version
"pyarrow==15.0.0", # Exact version
"requests>=2.31.0,<3.0" # Compatible range
])
# Separate environments for different use cases
data_env = Env("3.11").pip_install([
"daft==0.5.9",
"pyarrow>=15.0.0"
])
ml_env = Env("3.11").pip_install([
"daft==0.5.9",
"scikit-learn==1.3.0",
"numpy==1.24.3"
])
# Environment with secure defaults
secure_env = Env("3.11").pip_install([
"daft==0.5.9",
"cryptography==41.0.0",
"certifi>=2023.1.1"
])
secure_env.environ["PYTHONHTTPSVERIFY"] = "1"
secure_env.environ["SSL_CERT_FILE"] = "/etc/ssl/certs/ca-certificates.crt"
from ev import Env, Job
env = Env("3.11").pip_install([
"daft==0.5.9",
"structlog==23.1.0" # Structured logging
])
env.include(["src/", "config/"])
robust_job = Job("robust_processor", env)
@robust_job.main()
def robust_main():
    import logging
    import structlog
    import daft
    from src.validators import validate_input, validate_output
    from src.utils import cleanup_temp_files

    # Configure stdlib logging so filter_by_level lets INFO records through,
    # then configure structlog to emit JSON
    logging.basicConfig(level=logging.INFO)
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    logger = structlog.get_logger()

    try:
        logger.info("Job starting", job_name="robust_processor")

        # Input validation
        input_path = "data/input.parquet"
        if not validate_input(input_path):
            raise ValueError(f"Input validation failed for {input_path}")

        # Main processing with progress logging
        logger.info("Loading data", file=input_path)
        df = daft.read_parquet(input_path)
        input_rows = df.count_rows()
        logger.info("Processing data", rows=input_rows, columns=len(df.column_names))

        # Drop rows containing nulls
        df_processed = df.drop_null()

        # Output validation
        if not validate_output(df_processed):
            raise ValueError("Output validation failed")

        # Save results
        output_path = "data/output.parquet"
        df_processed.write_parquet(output_path)
        logger.info(
            "Job completed successfully",
            input_rows=input_rows,
            output_rows=df_processed.count_rows(),
            output_file=output_path
        )
        return 0
    except Exception as e:
        logger.error("Job failed", error=str(e), error_type=type(e).__name__)
        return 1
    finally:
        # Always clean up, even on failure
        cleanup_temp_files()
        logger.info("Cleanup completed")