Problem: pip install ev-sdk fails with permission errors
Solutions:
```bash
# Option 1: Install for current user only
pip install --user ev-sdk

# Option 2: Use virtual environment (recommended)
python -m venv ev-env
source ev-env/bin/activate  # On Windows: ev-env\Scripts\activate
pip install ev-sdk

# Option 3: Use sudo (not recommended)
sudo pip install ev-sdk
```
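If the install appears to succeed but the package misbehaves, a quick import check shows which environment it actually landed in. A minimal sketch (the `ev` module name matches the imports used later in this guide):

```python
# Sanity check: confirm ev-sdk is importable and locate its install path.
import ev  # Provided by the ev-sdk package

print(ev.__file__)  # The path reveals the active environment (venv vs. system)
```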
SSL Certificate Error
Problem: SSL certificate verification fails during installation
Solutions:
```bash
# Temporary fix (not recommended for production)
pip install --trusted-host pypi.org --trusted-host files.pythonhosted.org ev-sdk

# Better fix: Update certificates
# On macOS
/Applications/Python\ 3.x/Install\ Certificates.command
# On Linux
sudo apt-get update && sudo apt-get install ca-certificates
```
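To see which CA bundle your Python actually uses (and therefore which of the fixes above applies), the standard-library `ssl` module can report it. A minimal sketch:

```python
# Show where this Python looks for CA certificates; empty or stale
# paths here explain SSL verification failures during pip installs.
import ssl

print(ssl.get_default_verify_paths())
```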
Command Not Found
Problem: ev command not found after installation
Solutions:
```bash
# Check if ev is in PATH
which ev

# If not found, check Python scripts directory
python -m pip show ev-sdk

# Use full path or add to PATH
export PATH=$PATH:~/.local/bin

# Or run as a Python module
python -m ev --help
```
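The scripts directory that must be on PATH differs by platform and install type. A minimal sketch using the standard-library `sysconfig` module to locate it:

```python
# Print the directory where pip places console scripts such as `ev`.
# For --user installs this is typically ~/.local/bin on Linux/macOS.
import sysconfig

print(sysconfig.get_path("scripts"))
```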
Dependency Conflicts
Problem: Package conflicts with daft or other dependencies
Solutions:
```bash
# Create fresh environment
python -m venv fresh-env
source fresh-env/bin/activate

# Install ev-sdk first, then other packages
pip install ev-sdk
pip install torch torchvision  # Add other deps as needed

# Check for conflicts
pip check
```
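To confirm exactly which versions ended up in the environment, the standard-library `importlib.metadata` works without any extra tooling. A minimal sketch (the package names are the PyPI names used in this guide):

```python
# Report installed versions of key packages, or flag missing ones.
from importlib.metadata import PackageNotFoundError, version

for pkg in ["ev-sdk", "daft", "torch"]:
    try:
        print(f"{pkg}: {version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")
```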
Import Errors
Problem: Job fails with import errors at runtime
Diagnosis:

```bash
# Check job logs for import errors
ev jobs logs <job-id>
```
Solutions:
```python
# Ensure dependencies are in environment
from ev import Env, Job

env = Env("3.11").pip_install([
    "daft==0.5.9",
    "torch==2.0.0",
    "torchvision==0.15.0",
    "numpy==1.21.0",
    "pillow==9.0.0",
])

job = Job("ml_job", env)

@job.main()
def main():
    # Import inside function for distributed execution
    import torch
    import torchvision
    import daft

    # Your job logic here
    df = daft.from_pydict({"status": ["success"]})
    df.show()
    return 0
```
Memory Errors
Problem: Job fails with out-of-memory errors
Diagnosis:
```bash
# Check job resource usage
ev jobs status <job-id>
ev jobs logs <job-id>
```
Solutions:
```python
# Use daft's lazy evaluation effectively
from ev import Env, Job

env = Env("3.11").pip_install(["daft==0.5.9"])
job = Job("memory_efficient", env)

@job.main()
def main():
    import daft

    # Load data lazily (no immediate materialization)
    df = daft.read_parquet("s3://input/data.parquet")

    # Chain operations without materializing
    df = df.where(df["status"] == "active")
    df = df.select("id", "name", "value")  # Only needed columns

    # Process in streaming fashion
    df.write_parquet("s3://output/processed.parquet")

    print("Processing completed efficiently")
    return 0
```
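Before committing to the full dataset, it can help to validate the pipeline on a small slice; daft's `limit()` keeps the sample lazy as well. A minimal sketch reusing the input path from the example above:

```python
# Validate logic and gauge per-row memory cost on a small sample first.
import daft

df = daft.read_parquet("s3://input/data.parquet")
sample = df.limit(1000)  # Still lazy; at most 1000 rows materialize
sample = sample.where(sample["status"] == "active")
sample.show()  # Materializes only the preview
```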
Async/Await Errors
Problem: Job submission or monitoring fails with async errors
Common Issues:
- Not using await with async functions
- Running async code in a sync context
- "Event loop already running" errors
Solutions:
```python
# Correct patterns for the ev SDK
import daft
from ev import Env, Job

def run_job_example():
    """Proper job execution with the ev SDK."""
    # Create environment and job
    env = Env("3.11").pip_install(["daft==0.5.9"])
    job = Job("my-job", env)

    @job.main()
    def process_data():
        df = daft.read_parquet("s3://bucket/data.parquet")
        result = df.where(df["status"] == "active")
        return {"processed_rows": result.count_rows()}

    # Run with: ev run ./job.py
    return process_data

# For testing locally
def test_job():
    """Test job logic locally before deployment."""
    # Retrieve the job function and call it directly
    process_data = run_job_example()
    result = process_data()
    print(f"Local test result: {result}")
    return result
```
Note: The ev SDK uses Env and Job classes directly with the @job.main() decorator pattern for job execution.
Lazy Evaluation Confusion
Problem: Expected results do not appear, or it is unclear when operations execute
Understanding Lazy Evaluation:
```python
# daft uses lazy evaluation - operations build a query plan
df = daft.read_parquet("s3://data/file.parquet")
df = df.where(df["status"] == "active")  # No execution yet
df = df.with_column("processed", daft.lit(True))  # Still no execution

# Only .collect() triggers execution
result = df.collect()  # Now it executes
```
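When it is unclear what a lazy pipeline will do, you can print the query plan without executing anything, using daft's `explain()`. A minimal sketch:

```python
# Inspect the query plan built up by lazy operations.
# Nothing executes here; explain() only prints the plan.
import daft

df = daft.read_parquet("s3://data/file.parquet")
df = df.where(df["status"] == "active")
df.explain()  # Prints the plan; no data is read
```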
Common Mistakes:
```python
# ❌ Wrong: Trying to get length before collect()
df = daft.read_parquet("s3://path/file.parquet")
# len(df)  # This won't work - df is lazy

# ✅ Correct: Use count_rows() or collect() first
df = daft.read_parquet("s3://path/file.parquet")
row_count = df.count_rows()  # Efficient count
# OR
materialized = df.collect()
row_count = len(materialized)  # After materialization
```
Column Reference Errors
Problem: Column not found or incorrect column operations
Solutions:
```python
# Use column references for filtering
df = df.where(df["status"] == "active")  # ✅ Correct

# Check column names first
print(df.schema())  # See all columns and types

# Handle missing columns gracefully
if "optional_col" in df.column_names:
    df = df.with_column("new_col", df["optional_col"] * 2)
else:
    df = df.with_column("new_col", daft.lit(0))

# String operations
df = df.with_column(
    "upper_name",
    df["name"].str.upper()
)

# Null handling
df = df.where(df["important_field"].is_not_null())
```
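Type mismatches are a related source of column errors; an explicit cast makes the intended type visible to the engine. A minimal sketch with toy data:

```python
# Cast a float column to int64 before integer comparisons,
# avoiding type-mismatch errors in downstream expressions.
import daft

df = daft.from_pydict({"value": [1.5, 2.0, 3.5]})
df = df.with_column("value_int", df["value"].cast(daft.DataType.int64()))
df.show()
```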
Multimodal Data Issues
Problem: Issues with images, URLs, or complex data types
Image Processing Issues:
```python
# Common image processing problems and solutions

# ❌ Problem: Images fail to download
df = df.with_column(
    "image",
    df["image_url"].url.download()  # May fail on bad URLs
)

# ✅ Solution: Handle errors gracefully
df = df.with_column(
    "image",
    df["image_url"].url.download(on_error="null")
)

# ❌ Problem: Image decode failures
df = df.with_column(
    "decoded",
    df["image"].image.decode()  # May fail on corrupted images
)

# ✅ Solution: Handle decode errors
df = df.with_column(
    "decoded",
    df["image"].image.decode(on_error="null", mode=daft.ImageMode.RGB)
)

# Filter out failed operations
df = df.drop_null("decoded")
```
URL and Network Issues:
```python
# Handle network timeouts and retries
df = df.with_column(
    "content",
    df["url"].url.download(
        timeout_ms=30000,  # 30 second timeout
        retry_count=3,
        on_error="null"
    )
)

# Check for download failures
successful_downloads = df.where(df["content"].is_not_null())
failed_downloads = df.where(df["content"].is_null())

print(f"Success: {successful_downloads.count_rows()}")
print(f"Failed: {failed_downloads.count_rows()}")
```
UDF (User-Defined Function) Errors
Problem: Custom functions fail in distributed execution
Common UDF Issues:
```python
# ❌ Problem: UDF without proper type hints
def bad_udf(x):
    return x * 2

# ✅ Solution: Use Python functions with daft expressions
def good_transform(df):
    return df.with_column("doubled", df["value"] * 2)

# ✅ For complex operations, use built-in daft functions
df = df.with_column("length", df["text"].str.length())
df = df.with_column("upper", df["text"].str.upper())

# ✅ For custom logic, compose expression helpers with proper handling
def process_text(text_expr):
    # Custom processing logic built from daft expressions
    return text_expr.str.replace("old", "new")

df = df.with_column("processed", process_text(df["text"]))
```
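If you truly need row-level custom logic that built-ins cannot express, daft provides a `@daft.udf` decorator where the return dtype is declared up front. A minimal sketch (the `normalize` function and its sample data are illustrative):

```python
import daft

# Declaring return_dtype gives daft the type information it needs
# to plan distributed execution.
@daft.udf(return_dtype=daft.DataType.string())
def normalize(text: daft.Series):
    # UDFs receive whole batches; process them as Python lists.
    return [t.strip().lower() if t is not None else None for t in text.to_pylist()]

df = daft.from_pydict({"text": ["  Hello ", "WORLD", None]})
df = df.with_column("clean", normalize(df["text"]))
df.show()
```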
Testing Locally
Before submitting a job, verify your daft logic locally with small test data:

```python
# test_daft_locally.py
def test_daft_pipeline():
    """Test daft operations with small local data."""
    import daft

    # Create test data
    test_data = {
        "id": [1, 2, 3, 4, 5],
        "status": ["active", "inactive", "active", "pending", "active"],
        "value": [10.5, 20.0, 30.5, 40.0, 50.5],
        "timestamp": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04", "2024-01-05"],
    }
    df = daft.from_pydict(test_data)

    # Test the same operations as in your job
    result_df = df.where(df["status"] == "active")
    result_df = result_df.with_column("processed", daft.lit(True))

    # Collect and inspect
    result = result_df.collect()
    print(f"Test result: {result.to_pydict()}")

    # Verify expected behavior
    assert len(result) == 3  # Should have 3 active records
    assert all(result.to_pydict()["processed"])  # All should be marked processed
    print("Local test passed!")

if __name__ == "__main__":
    test_daft_pipeline()
```
- Community Forum: Ask questions and share solutions
- Example Gallery: Browse working examples and patterns
For complex multimodal processing issues or performance optimization questions, consider sharing your specific use case and data characteristics to get more targeted assistance.