Skip to main content
Article
prefectworkflow-orchestrationpythondata-engineeringmlopsschedulingobservabilityetl

Orchestrate Python Functions with Prefect in 15 Minutes

Turn any Python function into an observable, resilient workflow with Prefect. Use simple decorators to define flows and tasks, then visualize execution, logs, and status in the local Prefect UI. Perfect for data pipelines and ML models.

beginner15 min4 steps
The play
  1. Install Prefect
    Install the Prefect library using pip. This gives you the core engine for defining workflows and a command-line interface (CLI) for observing them.
  2. Define a Flow and Tasks
    Import `flow` and `task` from Prefect. Use the `@task` decorator for individual units of work and `@flow` to orchestrate them. This structure makes your code modular and observable.
  3. Run the Flow
    To execute your workflow, simply call the flow function. Prefect automatically captures its execution details. You can run it directly from a Python script.
  4. Visualize in the Prefect UI
    Start the local Prefect UI server to visualize your workflow. Run your Python script in another terminal. Open http://127.0.0.1:4200 in your browser to see a dashboard with your flow runs, task states, and logs.
Starter code
import httpx
from prefect import flow, task
from typing import List

# Save this code as 'my_prefect_flow.py'

@task(retries=2, retry_delay_seconds=5)
def fetch_pokemon_data(pokemon_id: int) -> dict:
    """Task to fetch data for a single Pokémon from the PokéAPI."""
    url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_id}"
    with httpx.Client() as client:
        response = client.get(url)
        response.raise_for_status()
        return response.json()

@task
def get_pokemon_names(pokemon_list: List[dict]) -> List[str]:
    """Task to extract just the names from the Pokémon data."""
    return [p['name'] for p in pokemon_list]

@flow(log_prints=True)
def pokemon_pipeline(pokemon_ids: List[int] = [1, 4, 7]):
    """A flow that fetches data for multiple Pokémon and prints their names."""
    print(f"Fetching data for Pokémon IDs: {pokemon_ids}")
    
    # Prefect runs these task calls concurrently by default
    pokemon_data = [fetch_pokemon_data.submit(pid) for pid in pokemon_ids]
    
    # .result() waits for the submitted tasks to complete
    pokemon_names = get_pokemon_names([p.result() for p in pokemon_data])
    
    print(f"Successfully fetched Pokémon: {pokemon_names}")

if __name__ == "__main__":
    # To run this flow:
    # 1. In one terminal, run: prefect server start
    # 2. In another terminal, run: python my_prefect_flow.py
    # 3. Open http://127.0.0.1:4200 in your browser to see the results.
    pokemon_pipeline()
Orchestrate Python Functions with Prefect in 15 Minutes — Action Pack