Workflows with the Practicus AI Airflow Add-on
Practicus AI integrates seamlessly with Airflow to orchestrate workflows. By leveraging Airflow as an add-on, you can:
- Define complex directed acyclic graphs (DAGs) to manage task order and parallelism.
- Schedule tasks to run at specific times or intervals.
- Use Airflow's UI and ecosystem for monitoring and managing workflows.
For more details on Airflow concepts, see the official Airflow documentation.
Creating Tasks, DAGs, and Supporting Files
When building workflows, start by designing your tasks as independently executable units. Each task runs on its own Practicus AI Worker. You can group related actions into a single task for simplicity. Practicus AI provides utilities to generate starter files and DAG definitions.
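For example, a task script can be a plain, self-contained Python file. The sketch below is a hypothetical `my_1st_task.py` written for illustration only; the starter files generated by Practicus AI may look different.

```python
# my_1st_task.py -- a minimal, hypothetical task script.
# Each task runs as a standalone script on its own Practicus AI Worker,
# so it should be independently executable: do its work and raise an
# exception (or exit non-zero) on failure so the workflow marks it as failed.


def main():
    print("my_1st_task: starting")
    # ... your task logic here (e.g., fetch data, transform it, train a model) ...
    print("my_1st_task: finished successfully")


if __name__ == "__main__":
    main()
```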
Example DAG flow: `my_1st_task >> [my_2nd_task, my_3rd_task] >> my_4th_task`

This means:

- `my_1st_task` runs first.
- On success, `my_2nd_task` and `my_3rd_task` run in parallel.
- After both complete, `my_4th_task` runs.
import practicuscore as prt

# Define a DAG flow with 4 tasks, where two run in parallel.
dag_flow = "my_1st_task >> [my_2nd_task, my_3rd_task] >> my_4th_task"

# Default worker configuration
default_worker_config = prt.WorkerConfig(
    worker_image="practicus",
    worker_size="X-Small",
)

# Custom configuration for the second task
my_2nd_task_worker_config = prt.WorkerConfig(
    worker_image="practicus-genai",
    worker_size="Small",
)

custom_worker_configs = [
    ('my_2nd_task', my_2nd_task_worker_config),
]

dag_key = "my_workflow"
schedule_interval = None  # Set a cron string or '@daily' as needed
retries = 0  # 0 for dev/test, increase for production

prt.workflows.generate_files(
    dag_key=dag_key,
    dag_flow=dag_flow,
    files_path=None,  # Current dir
    default_worker_config=default_worker_config,
    custom_worker_configs=custom_worker_configs,
    save_credentials=True,
    overwrite_existing=False,
    schedule_interval=schedule_interval,
    retries=retries,
)
Understanding the Generated Files
- Task Python Scripts (e.g., `my_1st_task.py`): contain the logic for each task; each runs in its own isolated Worker.
- `default_worker.json`: stores the default worker configuration (image, size, credentials). Credentials can be set globally by an admin or passed at runtime.
- `my_2nd_task_worker.json`: overrides the worker config for `my_2nd_task`.
- `my_workflow_dag.py`: the Airflow DAG file that ties the tasks together (an illustrative sketch of the dependency wiring follows this list).
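For orientation, the standalone Airflow sketch below shows only how the flow string `my_1st_task >> [my_2nd_task, my_3rd_task] >> my_4th_task` maps onto Airflow's dependency operators. It is not the file `generate_files()` produces; the generated `my_workflow_dag.py` additionally runs each task on a Practicus AI Worker (via `run_airflow_task`), and the operator choice, `dag_id`, and defaults here are illustrative assumptions.

```python
# Illustration only (not the generated my_workflow_dag.py): how the dag_flow
# string translates into Airflow task dependencies. Assumes Airflow 2.x.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # placeholder operators for illustration

with DAG(
    dag_id="my_workflow_illustration",
    schedule_interval=None,  # or a cron string / "@daily", as in generate_files()
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={"retries": 0},
) as dag:
    my_1st_task = EmptyOperator(task_id="my_1st_task")
    my_2nd_task = EmptyOperator(task_id="my_2nd_task")
    my_3rd_task = EmptyOperator(task_id="my_3rd_task")
    my_4th_task = EmptyOperator(task_id="my_4th_task")

    # Same ordering as the dag_flow string:
    # my_1st_task, then my_2nd_task / my_3rd_task in parallel, then my_4th_task.
    my_1st_task >> [my_2nd_task, my_3rd_task] >> my_4th_task
```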
Test Your Tasks Before Deploying
It's wise to test tasks locally or via Practicus AI before deploying to Airflow. You can intentionally insert errors in your task code to verify error handling.
For more details on tasks, see the tasks sample notebook.
successful_task_workers, failed_task_workers = prt.workflows.test_tasks(
    dag_flow=dag_flow,
    task_list=None,  # Test all tasks in the DAG
    files_path=None,  # Current dir
    default_worker_config=default_worker_config,
    custom_worker_configs=custom_worker_configs,
    terminate_on_success=True,  # Automatically terminate successful tasks
    terminate_on_failed=False,  # Keep failed tasks alive for debugging
)

# Investigate successful or failed tasks
for worker in successful_task_workers:
    # If you had terminate_on_success=False, you could open the notebook to review logs.
    print(f"Opening notebook on successful task worker: {worker.name}")
    worker.open_notebook()

for worker in failed_task_workers:
    print(f"Opening notebook on failed task worker: {worker.name}")
    worker.open_notebook()

# After analysis, terminate remaining workers
for worker in successful_task_workers:
    print(f"Terminating successful task worker: {worker.name}")
    worker.terminate()

for worker in failed_task_workers:
    print(f"Terminating failed task worker: {worker.name}")
    worker.terminate()
(Optional) Locating an Airflow Service Add-on
If you don't know your Airflow service key, you can list available add-ons and identify it. For instructions on add-ons and their usage, see the Practicus AI documentation.
import practicuscore as prt
region = prt.get_default_region()
addons_df = region.addon_list.to_pandas()
print("Add-on services accessible to you:")
display(addons_df)
airflow_services_df = addons_df[addons_df["service_type"] == "Airflow"]
print("Airflow services you can access:")
display(airflow_services_df)
if airflow_services_df.empty:
    raise RuntimeError("No Airflow service access. Contact your admin.")
service_key = airflow_services_df.iloc[0]["key"]
service_url = airflow_services_df.iloc[0]["url"]
print("Selected Airflow Service:")
print(f"- Service Key: {service_key}")
print(f"- Service URL: {service_url}")
# Alternatively, you can set service_key manually if you know it:
# service_key = 'my-airflow-service-key'
Deploying the Workflow to Airflow
Once your tasks, DAG, and configurations are ready and tested, deploy them to Airflow. This pushes your code and configuration to the underlying version control system used by the Airflow service, making the workflow visible and runnable via the Airflow UI.
prt.workflows.deploy(
    service_key=service_key,
    dag_key=dag_key,
    files_path=None,  # Current directory
)
Additional Notes and Customizations
- Running Shell Scripts: Tasks don't have to be Python files; `.sh` scripts also work.
- Manual Worker Config Files: Instead of passing parameters to `generate_files()` or `deploy()`, you can manually manage the `.json` worker config files.
- Credential Management: For security, consider storing credentials globally at the Airflow environment level. Avoid embedding sensitive info in local files.
- Multi-Region Deployments: You can create workflows that run tasks in different regions. Just ensure the worker config `.json` files point to the correct `service_url`, `email`, and `refresh_key` (a hedged example of such a file follows this list).
- Customizing the DAG: Edit the generated DAG file to change default parameters, logging settings, or to add custom logic. For complex scenarios (e.g., different logging strategies), you can customize the `run_airflow_task` calls as shown in the example snippet.
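As a rough illustration of the manual and multi-region notes above, the snippet below writes a worker config file by hand. The flat schema is an assumption: the field names are only those mentioned in this guide (`worker_image`, `worker_size`, `service_url`, `email`, `refresh_key`), the file name reuses the naming pattern of the generated configs, and the URL, email, and refresh key values are placeholders. Verify the actual schema against a file produced by `generate_files()`.

```python
# Hypothetical example: hand-writing a worker config .json for a task that
# should run in another region. Field names follow the ones mentioned above;
# the real schema may differ from this sketch.
import json

other_region_worker = {
    "worker_image": "practicus",
    "worker_size": "Small",
    "service_url": "https://practicus.other-region.example.com",  # placeholder region URL
    "email": "workflow-user@example.com",                          # placeholder account
    "refresh_key": "<refresh token for the other region>",         # placeholder credential
}

with open("my_3rd_task_worker.json", "w") as f:
    json.dump(other_region_worker, f, indent=2)
```

Per the credential-management note above, prefer globally stored credentials over embedding a refresh key in local files like this for production use.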