Customizing Task Parameters
This example demonstrates how to customize and pass parameters to a Practicus AI worker. You can do so by defining environment variables in the WorkerConfig
object.
Why Environment Variables?
Environment variables are often the easiest way to inject small pieces of configuration or parameters into a script. Practicus AI automatically sets these environment variables in the worker's environment when your script runs.
Basic Example
In this simple example, we show how to set environment variables in WorkerConfig
and then access them in a Python script.
# task.py
import os

# Each parameter arrives as an environment variable; print them in order.
for label, env_name in (
    ("First Param:", "MY_FIRST_PARAM"),
    ("Second Param:", "MY_SECOND_PARAM"),
):
    print(label, os.environ[env_name])
Defining WorkerConfig
with Environment Variables
Below, we create a WorkerConfig
object. Notice how we specify environment variables in the env_variables
dictionary. Practicus AI will ensure these variables are set when task.py
is executed.
import practicuscore as prt

# Parameters handed to task.py; they are passed through WorkerConfig.env_variables.
task_env_variables = {"MY_FIRST_PARAM": "VALUE1", "MY_SECOND_PARAM": 123}

worker_config = prt.WorkerConfig(
    worker_image="practicus",  # The base container image
    worker_size="X-Small",  # Size configuration
    env_variables=task_env_variables,
)

# run_task hands back the worker together with a success flag.
worker, success = prt.run_task(
    file_name="task.py",  # The name of the script to run
    worker_config=worker_config,
)

print("Task finished with status:", success)
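When this code runs, it will print out the values of MY_FIRST_PARAM
and MY_SECOND_PARAM
from within task.py
. Note that environment variable values are always read back as strings, so MY_SECOND_PARAM arrives in task.py as "123" and must be converted (e.g., with int()) if a number is needed.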
Airflow Integration
To integrate with Practicus AI Airflow, you can utilize the same approach. Airflow tasks can inject environment variables by writing out a WorkerConfig
JSON file that Practicus AI can pick up.
For example:
1. Define your worker_config
in Python.
2. Serialize it to JSON.
3. Store it in a file named after your task (e.g., task_worker.json
if your script is task.py
).
Example: Writing task_worker.json
# Serialize the WorkerConfig, leaving out fields that are None.
worker_config_json = worker_config.model_dump_json(exclude_none=True)

# The file is named after the task script: task.py -> task_worker.json
with open("task_worker.json", "wt") as json_file:
    json_file.write(worker_config_json)

print("Generated worker_config JSON:\n", worker_config_json)
An example task_worker.json
might look like:
{
"worker_image": "practicus",
"worker_size": "X-Small",
"additional_params": "eyJlbnZfdmFyaWFibGVzIjogeyJNWV9GSVJTVF9QQVJBTSI6ICJWQUxVRTEiLCAiTVlfU0VDT05EX1BBUkFNIjogMTIzfX0="
}
Note that additional_params
is base64-encoded JSON. Decoded, it contains:
{"env_variables": {"MY_FIRST_PARAM": "VALUE1", "MY_SECOND_PARAM": 123}}
Summary
- Defining environment variables: Use `env_variables` within `WorkerConfig` to inject parameters.
- Accessing parameters: In your Python script (e.g., `task.py`), read them from `os.environ`.
- Airflow: Write out the `worker_config` to a JSON file named after your task (e.g., `task_worker.json`). Practicus AI automatically picks that up.
By following these steps, you can effectively pass custom parameters and configurations to your Practicus AI tasks, making your data pipelines more dynamic and flexible.
Supplementary Files
run_task_safe/default_worker.json
run_task_safe/run_task_safe_dag.py
# Airflow DAG run_task_safe_dag.py created using Practicus AI
import logging
from datetime import datetime
from airflow.decorators import dag, task
import practicuscore as prt
import os
# Constructing a Unique DAG ID
# ----------------------------
# We strongly recommend using a DAG ID format like:
# <dag_key>.<username>
# This approach ensures the username effectively serves as a namespace,
# preventing name collisions in Airflow.
# Let's locate dag_key and username. This runs in Airflow 'after' deployment.
# get_dag_info yields the (dag_key, username) pair for this DAG file.
dag_info = prt.workflows.get_dag_info(__file__)
dag_key, username = dag_info

# The username acts as a namespace suffix: <dag_key>.<username>
dag_id = f"{dag_key}.{username}"
def convert_local(d):
    """Return ``d`` converted to the Europe/Istanbul timezone.

    ``d`` must provide an ``in_timezone(name)`` method (presumably a
    pendulum-style datetime -- confirm against the Airflow version in use).
    The conversion is logged at DEBUG level first.
    """
    logging.debug(f"Converting {d}")
    localized = d.in_timezone("Europe/Istanbul")
    return localized
def _read_cloud_worker_config_from_file(worker_config_json_path: str) -> dict:
    """Load a Worker configuration dictionary from a JSON file (best effort).

    Args:
        worker_config_json_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content. An empty dict is returned when the file does
        not exist or when reading/parsing fails; failures are logged and never
        raised to the caller.
    """
    import json

    cloud_worker_conf_dict = {}  # type: ignore[var-annotated]
    try:
        if os.path.exists(worker_config_json_path):
            prt.logger.info(f"Reading Worker config from {worker_config_json_path}")
            # JSON is UTF-8 by specification; do not depend on the platform default encoding.
            with open(worker_config_json_path, "rt", encoding="utf-8") as f:
                cloud_worker_conf_dict = json.load(f)
        else:
            prt.logger.info(
                f"Worker configuration file {worker_config_json_path} not found. "
                f"Airflow DAG must provide settings via params passed from run DAG UI or global Airflow configuration."
            )
    except Exception:
        # Catch Exception, not a bare except, so KeyboardInterrupt/SystemExit still propagate.
        prt.logger.error(f"Could not parse Worker config from file {worker_config_json_path}", exc_info=True)
    return cloud_worker_conf_dict
def _get_worker_config_dict(**kwargs) -> dict:
    """Build the Worker configuration dict from ``default_worker.json`` in the DAG folder.

    Args:
        **kwargs: Airflow task context; ``kwargs["dag"]`` must be present and
            expose a ``folder`` attribute.

    Returns:
        A dict with every key of ``default_worker.json`` whose value is not
        ``None``, ``""`` or the string ``"None"``. Empty when the file is
        missing or could not be processed.

    Raises:
        KeyError: If ``kwargs`` has no ``"dag"`` entry.
    """
    dag = kwargs["dag"]
    dag_folder = dag.folder
    final_dict = {}
    worker_config_json_path = os.path.join(dag_folder, "default_worker.json")
    prt.logger.debug(f"worker_config_json_path : {worker_config_json_path}")
    if os.path.exists(worker_config_json_path):
        worker_dict_from_json_file = _read_cloud_worker_config_from_file(worker_config_json_path)
        try:
            for key, value in worker_dict_from_json_file.items():
                # Skip unset values, including the literal string "None".
                if value not in [None, "", "None"]:
                    prt.logger.info(
                        f"Updating Worker configuration key '{key}' using "
                        f"task specific worker configuration file: default_worker.json"
                    )
                    final_dict[key] = value
        except Exception:
            # Catch Exception, not a bare except, so KeyboardInterrupt/SystemExit still propagate.
            prt.logger.error(f"Could not parse param dictionary from {worker_config_json_path}", exc_info=True)
    return final_dict
def _cleanup(**kwargs):
    """Terminate Workers that have been 'Provisioning' for longer than the timeout.

    The region is resolved from the Worker configuration dict built for this
    DAG; every Worker in that region is logged, and those still in the
    'Provisioning' state after more than ``timeout_seconds`` are terminated.

    Args:
        **kwargs: Airflow task context, forwarded to ``_get_worker_config_dict``.
    """
    from datetime import datetime, timezone

    timeout_seconds = 59  # Threshold for considering a Worker as stuck

    region = prt.regions.region_factory(_get_worker_config_dict(**kwargs))
    prt.logger.info(f"Found region : {str(region)}")

    # Iterate through all Workers in the region
    for worker in region.worker_list:
        age = datetime.now(timezone.utc) - worker.creation_time
        time_since_creation = int(age.total_seconds())
        prt.logger.info(
            f"{worker.name} started {time_since_creation} seconds ago and is currently in '{worker.status}' state."
        )

        is_stuck = worker.status == "Provisioning" and time_since_creation > timeout_seconds
        if not is_stuck:
            continue

        prt.logger.warning(
            f"-> Terminating {worker.name} — stuck in 'Provisioning' for more than {timeout_seconds} seconds."
        )
        worker.terminate()
# Define other DAG properties like schedule, retries etc.
@dag(
    dag_id=dag_id,
    schedule_interval=None,
    start_date=datetime(2025, 5, 9, 7, 0),
    default_args={
        "owner": username,
        "retries": 0,
    },
    catchup=False,
    user_defined_macros={"local_tz": convert_local},
    max_active_runs=1,
    params=prt.workflows.get_airflow_params(),
)
def generate_dag():
    # Declares a single-task DAG whose task cleans up stuck Workers when it fails.
    #
    # The `task_id` must match the task file (e.g., `my_1st_task.py or my_1st_task.sh`)
    # located in the same folder as this DAG file.
    def run_with_dynamic_param(**kwargs):
        # Runs the Airflow task; on TaskError, tries to terminate stuck Workers
        # and then re-raises the original error so the task is still marked failed.
        from practicuscore.exceptions import TaskError

        try:
            return prt.workflows.run_airflow_task(**kwargs)
        except TaskError:
            try:
                _cleanup(**kwargs)
            except Exception:
                # A failing cleanup must not replace the TaskError that triggered it.
                prt.logger.error("Cleanup of stuck Workers failed after task error", exc_info=True)
            raise

    task1 = task(run_with_dynamic_param, task_id="task1")()

    # Define how your task will flow
    task1


generate_dag()
run_task_safe/task1.py
import practicuscore as prt
def main():
    """Log the completion message of task 1."""
    completion_message = "Running task 1 is completed"
    prt.logger.info(completion_message)


if __name__ == "__main__":
    main()
task_runnner_app/apis/sample_callback.py
# Sample callback APIs, we will use them for our tests.
from pydantic import BaseModel
import practicuscore as prt
from starlette.requests import Request
# Payload posted to the sample callback APIs once a task has finished.
# The string literal under each field is its description (use_attribute_docstrings).
class TaskRunnerCallbackRequest(BaseModel):
    task_name: str
    """Model Deployment Task"""
    task_id: str
    """Unique task ID"""
    success: bool
    """True if the task completed successfully, False otherwise"""
    logs: str
    """Any info or error message"""

    # The examples must validate against the model, so they carry the required
    # `logs` field (the model has no `message` field).
    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "task_name": "deploy_model",
                    "task_id": "123e4567-e89b-12d3-a456-426614174000",
                    "success": True,
                    "logs": "Model deployed successfully",
                },
                {
                    "task_name": "cleanup_task",
                    "task_id": "123e4567-e89b-12d3-a456-426614174001",
                    "success": False,
                    "logs": "Error occurred during cleanup",
                },
            ]
        },
    }
# Response returned by the sample callback APIs in this file.
# The string literal under each field is its description (use_attribute_docstrings).
class TaskRunnerCallbackResponse(BaseModel):
    success: bool
    """True or False"""
    message: str
    """Any info or error message"""

    # Pydantic model configuration; `examples` are added to the generated JSON schema.
    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {"success": True, "message": "Successfully finished"},
                {"success": False, "message": "Errored occurred"},
            ]
        },
    }
# Shared spec for the sample callback endpoints below: authentication is disabled.
# NOTE(review): acceptable for samples/tests; confirm before exposing these endpoints elsewhere.
api_spec = prt.APISpec(disable_authentication=True)
@prt.apps.api(path="/sample-task-runner-callback", spec=api_spec)
async def task_runner_callback(payload: TaskRunnerCallbackRequest, **kwargs) -> TaskRunnerCallbackResponse:
    """Sample callback that logs the reported task outcome.

    Args:
        payload: Validated callback body describing the finished task.

    Returns:
        Always ``TaskRunnerCallbackResponse(success=True, message="ok")``.
    """
    if payload.success:
        # Include the task id, matching the message logged by task_runner_callback2.
        prt.logger.info(
            f"Task with name : {payload.task_name} task_id : {payload.task_id} finished successfully"
        )
    else:
        prt.logger.error(
            f"Task with name : {payload.task_name} finished with error\npayload : {payload.model_dump_json(indent=2)}"
        )
    return TaskRunnerCallbackResponse(success=True, message="ok")
@prt.apps.api(path="/sample-task-runner-callback2", spec=api_spec)
async def task_runner_callback2(request: Request, **kwargs) -> TaskRunnerCallbackResponse:
    # Variant of the sample callback that reads the raw request body instead of
    # a validated model, then logs the reported outcome. Always answers "ok".
    prt.logger.info(f"request: {request}")

    received: dict = await request.json()
    task_name, task_id = received.get("task_name"), received.get("task_id")

    if received.get("success"):
        prt.logger.info(f"Task with name : {task_name} task_id : {task_id} finished successfully")
    else:
        logs = received.get("logs")
        prt.logger.error(f"Task with name : {task_name} task_id : {task_id} finished with error\nlogs : {logs}")

    return TaskRunnerCallbackResponse(success=True, message="ok")
task_runnner_app/apis/task_runner.py
import os
import threading
import requests
from pydantic import BaseModel
import practicuscore as prt
# Request body of the /task-runner API: which task file to run, on what Worker
# configuration, and how to report the result.
# The string literal under each field is its description (use_attribute_docstrings).
class TaskRunnerRequest(BaseModel):
    task_id: str
    """Unique task id (1222345, 23erd-rt56-tty67-34er5 ...)"""
    task_file_name: str
    """deployer_task.py, task1.sh... (*.py, *.sh files) """
    worker_config: prt.WorkerConfig
    """WorkerConfig with or without a GitConfig"""
    # Optional: when unset, run_task skips the callback.
    callback_url: str | None = None
    """Callback URL to be called after the task finishes"""
    # Optional: when set, trigger_callback sends it as an `authorization: Bearer` header.
    callback_url_token: str | None = None
    """Optional Callback URL security (bearer) token"""
    # When True, run_task terminates the Worker after the task has finished.
    terminate_on_completion: bool = True
    """Determines whether the task runner should terminate on completion"""

    # Pydantic model configuration; `examples` are added to the generated JSON schema.
    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "task_id": "23erd-rt56",
                    "task_file_name": "deployer_task.py",
                    "worker_config": {
                        "worker_size": "small",
                        "personal_secrets": ["git_secret"],
                        "git_configs": [
                            {"remote_url": "https://github.com/example/repo.git", "secret_name": "git_secret"}
                        ],
                    },
                    "callback_url": "https://example.com/callback",
                    "terminate_on_completion": True,
                }
            ]
        },
    }
# Response of the /task-runner API. It reports whether the task was started,
# not whether the task itself succeeded (the task runs in a background thread).
# The string literal under each field is its description (use_attribute_docstrings).
class TaskRunnerResponse(BaseModel):
    success: bool
    """True or False"""
    message: str
    """Any info or error message"""

    # Pydantic model configuration; `examples` are added to the generated JSON schema.
    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {"success": True, "message": "Successfully finished"},
                {"success": False, "message": "Errored occurred"},
            ]
        },
    }
@prt.apps.api(path="/task-runner")
async def task_runner(payload: TaskRunnerRequest, **kwargs) -> TaskRunnerResponse:
    """This API starts task async and returns started message"""
    prt.logger.info("Started running task...")
    try:
        # Hand the work to a background thread so the API can answer immediately.
        background_thread = threading.Thread(target=run_task, args=(payload,))
        background_thread.start()
        return TaskRunnerResponse(success=True, message="Started Task successfully")
    except Exception as ex:
        prt.logger.error("Error on task", exc_info=True)
        error_message = f"An Error occurred while running task. Error: {str(ex)}"
        return TaskRunnerResponse(success=False, message=error_message)
def run_task(request: TaskRunnerRequest):
    """Run the requested task on a Worker, report the outcome and clean up.

    This is the thread target started by ``task_runner``. It runs the task
    file from the ``tasks`` directory, collects the Worker logs, notifies
    ``request.callback_url`` (when set) and finally terminates the Worker if
    ``request.terminate_on_completion`` is true.

    A failure raised while starting or running the task is reported to the
    callback as ``success=False`` with the error text as logs, so the caller
    is not left waiting for a notification that never arrives.

    Args:
        request: The task runner request describing the task to run.

    Raises:
        AssertionError: If ``task_file_name`` or ``worker_config`` is empty.
        Exception: Any error raised while running the task is logged and re-raised.
    """
    assert request.task_file_name, "No task_file_name located"
    assert request.worker_config, "No worker_config located"
    success = None
    worker = None
    worker_logs = ""
    try:
        prt.logger.info(
            f"Starting task: {request.task_id} ({request.task_file_name}) terminate_on_completion: {request.terminate_on_completion}"
        )
        tasks_path = get_tasks_path()
        prt.logger.info(f"Tasks path to upload: {tasks_path}")
        # Keep the Worker alive here so its logs can be fetched below;
        # termination is handled in the ``finally`` clause.
        worker, success = prt.run_task(
            file_name=request.task_file_name,
            files_path=tasks_path,
            worker_config=request.worker_config,
            terminate_on_completion=False,
        )
        worker_logs = worker.get_logs(log_size_mb=5)
    except Exception as ex:
        message = f"Finished task_name: {request.task_file_name} with error"
        prt.logger.error(message, exc_info=True)
        if success is None:
            # The task never produced a result: report it as failed so the
            # callback below is still triggered.
            success = False
            worker_logs = f"{message}: {ex}"
        raise
    finally:
        try:
            if success is not None:
                if success:
                    prt.logger.info("Finished successfully")
                else:
                    prt.logger.error(
                        f"Finished task with error. Check logs for details. task_name: {request.task_file_name}"
                    )
                if request.callback_url:
                    trigger_callback(request=request, success=success, worker_logs=worker_logs)
                else:
                    prt.logger.debug(f"Task: {request.task_file_name} does not have a callback url.")
        finally:
            # Terminate even when logging or the callback failed.
            if worker is not None and request.terminate_on_completion:
                worker.terminate()
def trigger_callback(
    request: TaskRunnerRequest,
    success: bool,
    worker_logs: str,
    timeout_seconds: float = 30.0,
):
    """POST the task outcome to ``request.callback_url``.

    The body is a JSON object with the keys ``task_name``, ``task_id``,
    ``success`` and ``logs``. When ``request.callback_url_token`` is set it is
    sent as an ``authorization: Bearer`` header.

    Args:
        request: The task runner request holding the callback URL and token.
        success: Whether the task finished successfully.
        worker_logs: Logs (or error text) to forward to the callback.
        timeout_seconds: Maximum time to wait for the callback endpoint, so an
            unresponsive endpoint cannot block the caller indefinitely.

    Raises:
        ValueError: If ``request.callback_url`` is empty.
        requests.RequestException: If the HTTP request fails or times out.
    """
    if not request.callback_url:
        raise ValueError("No callback_url located")

    headers = {"content-type": "application/json"}
    if request.callback_url_token:
        prt.logger.debug(f"Task: '{request.task_file_name}' includes a security token, including as a Bearer token.")
        headers["authorization"] = f"Bearer {request.callback_url_token}"
    else:
        prt.logger.debug(f"Task: {request.task_file_name} does not include a security token.")

    callback_payload = {
        "task_name": request.task_file_name,
        "task_id": request.task_id,
        "success": success,
        "logs": worker_logs,
    }

    prt.logger.info(f"Sending to callback url: '{request.callback_url}' success: '{success}'")
    # Pass the dict itself: `json=` serializes it. Passing an already-dumped
    # string would send a JSON string literal instead of a JSON object.
    resp = requests.post(
        request.callback_url,
        json=callback_payload,
        headers=headers,
        timeout=timeout_seconds,
    )
    if resp.ok:
        prt.logger.info(f"Response text: {resp.text}")
    else:
        prt.logger.error(f"Response has errors: resp.status_code: {resp.status_code} - resp.text: {resp.text}")
def get_tasks_path() -> str:
    """
    Return the path of the 'tasks' directory.

    The directory is resolved as a sibling of the execution path: take
    get_execution_path(), step up to its parent directory and append 'tasks'.
    Each intermediate value is logged at debug level.
    """
    # Where this script runs from.
    execution_dir = get_execution_path()
    prt.logger.debug(f"Exec pah: {execution_dir}")

    # One level up from the execution path.
    app_root = os.path.dirname(execution_dir)
    prt.logger.debug(f"parent_dir: {app_root}")

    # 'tasks' lives at the same level as the execution directory.
    tasks_dir = os.path.join(app_root, "tasks")
    prt.logger.debug(f"path: {tasks_dir}")
    return tasks_dir
def get_execution_path() -> str:
    """
    Return the directory the current script executes from.

    Uses the directory of __file__ when that name is defined; otherwise
    (NameError) falls back to the current working directory.
    """
    try:
        # __file__ exists when the code is loaded from a file.
        script_file = os.path.abspath(__file__)
        script_dir = os.path.dirname(script_file)
        prt.logger.debug(f"path from file path: {script_dir}")
        return script_dir
    except NameError:
        # No __file__ (e.g. interactive shell): use the working directory instead.
        working_dir = os.getcwd()
        prt.logger.debug(f"path from os.getcwd(): {working_dir}")
        return working_dir
# async def main():
# worker_config = prt.WorkerConfig(
# worker_size="X-Small",
# # personal_secrets=[git_secret_name],
# # git_configs=[git_config],
# )
# payload = TaskRunnerRequest(
# task_file_name="deployer_task.py",
# task_id="task-1",
# worker_config=worker_config,
# callback_url="https://dev.practicus.io/apps/task-runner-app/api/task-runner-callback",
# terminate_on_completion=True,
# )
# await task_runner(payload)
# if __name__ == "__main__":
# import asyncio
# asyncio.run(main())
# if __name__ == "__main__":
# worker_config = prt.WorkerConfig(worker_size="X-Small")
# payload = TaskRunnerRequest(
# task_file_name="deployer_task.py",
# task_id="task-1",
# worker_config=worker_config,
# callback_url="https://dev.practicus.io/apps/task-runner-app/api/task-runner-callback",
# terminate_on_completion=True,
# )
# trigger_callback(request=payload, success=True, worker_logs="logs logs ....")
task_runnner_app/tasks/deployer_task.py
# Sample Task that the Task Runner App will execute
import practicuscore as prt
from pathlib import Path
def any_file_exists(dir_path: str) -> bool:
    """
    Tell whether the directory tree rooted at ``dir_path`` holds at least one file.

    :param dir_path: Directory to search recursively.
    :return: True as soon as a file is found, False when none is found.
    """
    root = Path(dir_path)
    for entry in root.rglob("*"):
        if entry.is_file():
            return True
    return False
def main():
    """Deploy a model using parameters supplied through environment variables.

    Required environment variables: DEPLOYMENT_KEY, PREFIX, MODEL_NAME and
    MODEL_DIR. Each must be present and non-empty.

    Raises:
        ValueError: If a required environment variable is missing or empty.
            Raised explicitly instead of using ``assert`` so the validation
            also runs under ``python -O``.
        RuntimeError: If MODEL_DIR contains no files or does not exist.
    """
    prt.logger.info("Starting model deployment task ")
    import os

    def _required_env(name: str) -> str:
        # Return the value of a mandatory environment variable or fail clearly.
        value = os.environ.get(name, "")
        if not value:
            raise ValueError(f"{name} is not provided")
        return value

    deployment_key = _required_env("DEPLOYMENT_KEY")  # "depl"
    prefix = _required_env("PREFIX")  # models
    model_name = _required_env("MODEL_NAME")  # sample-model-ro
    model_dir = _required_env("MODEL_DIR")  # "/home/ubuntu/my/projects/models-repo/sample-model"

    if not any_file_exists(model_dir):
        msg = "Model directory is empty or does not exist. Check logs for details."
        prt.logger.error(msg)
        raise RuntimeError(msg)

    api_url, api_version_url, api_meta_url = prt.models.deploy(
        deployment_key=deployment_key, prefix=prefix, model_name=model_name, model_dir=model_dir
    )
    # NOTE(review): the label says api_url but the value logged is api_version_url -- confirm which is intended.
    prt.logger.info(f"Finished model deployment task successfully. api_url : {api_version_url}")


if __name__ == "__main__":
    main()
Previous: Build | Next: Run Task Safe > Deploy