# Customizing Task Parameters
This example demonstrates how to customize and pass parameters to a Practicus AI worker. You can do so by defining environment variables in the `WorkerConfig` object.
## Why Environment Variables?
Environment variables are often the easiest way to inject small pieces of configuration or parameters into a script. Practicus AI automatically sets these environment variables in the worker's environment when your script runs.
## Basic Example
In this simple example, we show how to set environment variables in `WorkerConfig` and then access them in a Python script.
```python
# task.py
import os

print("First Param:", os.environ["MY_FIRST_PARAM"])
print("Second Param:", os.environ["MY_SECOND_PARAM"])
```
## Defining `WorkerConfig` with Environment Variables
Below, we create a `WorkerConfig` object. Notice how we specify environment variables in the `env_variables` dictionary. Practicus AI will ensure these variables are set when `task.py` is executed.
```python
import practicuscore as prt

worker_config = prt.WorkerConfig(
    worker_image="practicus",  # The base container image
    worker_size="X-Small",  # Size configuration
    env_variables={"MY_FIRST_PARAM": "VALUE1", "MY_SECOND_PARAM": 123},
)

worker, success = prt.run_task(
    file_name="task.py",  # The name of the script to run
    worker_config=worker_config,
)

print("Task finished with status:", success)
```
When this code runs, it will print out the values of `MY_FIRST_PARAM` and `MY_SECOND_PARAM` from within `task.py`.
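Assuming the values are exposed as plain strings in the worker's environment, the task output should look roughly like:

```text
First Param: VALUE1
Second Param: 123
```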
## Airflow Integration
To integrate with Practicus AI Airflow, you can use the same approach. Airflow tasks can inject environment variables by writing out a `WorkerConfig` JSON file that Practicus AI can pick up.
For example:

1. Define your `worker_config` in Python.
2. Serialize it to JSON.
3. Store it in a file named after your task (e.g., `task_worker.json` if your script is `task.py`).
### Example: Writing `task_worker.json`
```python
worker_config_json = worker_config.model_dump_json(exclude_none=True)

with open("task_worker.json", "wt") as f:
    f.write(worker_config_json)

print("Generated worker_config JSON:\n", worker_config_json)
```
An example `task_worker.json` might look like:
```json
{
  "worker_image": "practicus",
  "worker_size": "X-Small",
  "additional_params": "eyJlbnZfdmFyaWFibGVzIjogeyJNWV9GSVJTVF9QQVJBTSI6ICJWQUxVRTEiLCAiTVlfU0VDT05EX1BBUkFNIjogMTIzfX0="
}
```
Note that `additional_params` is base64-encoded data containing:
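```json
{"env_variables": {"MY_FIRST_PARAM": "VALUE1", "MY_SECOND_PARAM": 123}}
```

i.e., the `env_variables` dictionary we defined earlier.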
## Summary
- **Defining environment variables:** Use `env_variables` within `WorkerConfig` to inject parameters.
- **Accessing parameters:** In your Python script (e.g., `task.py`), read them from `os.environ`.
- **Airflow:** Write out the `worker_config` to a JSON file. Practicus AI automatically picks that up.
By following these steps, you can effectively pass custom parameters and configurations to your Practicus AI tasks, making your data pipelines more dynamic and flexible.
## Supplementary Files
### task_runnner_app/apis/sample_callback.py
```python
# Sample callback APIs, we will use them for our tests.
from pydantic import BaseModel

import practicuscore as prt
from starlette.requests import Request


class TaskRunnerCallbackRequest(BaseModel):
    task_name: str
    """Model Deployment Task"""
    task_id: str
    """Unique task ID"""
    success: bool
    """True if the task completed successfully, False otherwise"""
    logs: str
    """Any info or error message"""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "task_name": "deploy_model",
                    "task_id": "123e4567-e89b-12d3-a456-426614174000",
                    "success": True,
                    "logs": "Model deployed successfully",
                },
                {
                    "task_name": "cleanup_task",
                    "task_id": "123e4567-e89b-12d3-a456-426614174001",
                    "success": False,
                    "logs": "Error occurred during cleanup",
                },
            ]
        },
    }


class TaskRunnerCallbackResponse(BaseModel):
    success: bool
    """True or False"""
    message: str
    """Any info or error message"""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {"success": True, "message": "Successfully finished"},
                {"success": False, "message": "Error occurred"},
            ]
        },
    }


api_spec = prt.APISpec(
    disable_authentication=True,
)


@prt.apps.api(path="/sample-task-runner-callback", spec=api_spec)
async def task_runner_callback(payload: TaskRunnerCallbackRequest, **kwargs) -> TaskRunnerCallbackResponse:
    if payload.success:
        prt.logger.info(f"Task with name : {payload.task_name} task_id : {payload.task_id} finished successfully")
    else:
        prt.logger.error(
            f"Task with name : {payload.task_name} finished with error\npayload : {payload.model_dump_json(indent=2)}"
        )
    return TaskRunnerCallbackResponse(success=True, message="ok")


@prt.apps.api(path="/sample-task-runner-callback2", spec=api_spec)
async def task_runner_callback2(request: Request, **kwargs) -> TaskRunnerCallbackResponse:
    prt.logger.info(f"request: {request}")
    body: dict = await request.json()
    task_name = body.get("task_name")
    task_id = body.get("task_id")
    success = body.get("success")
    logs = body.get("logs")
    if success:
        prt.logger.info(f"Task with name : {task_name} task_id : {task_id} finished successfully")
    else:
        prt.logger.error(f"Task with name : {task_name} task_id : {task_id} finished with error\nlogs : {logs}")
    return TaskRunnerCallbackResponse(success=True, message="ok")
```
### task_runnner_app/apis/task_runner.py
```python
import os
import threading

import requests
from pydantic import BaseModel

import practicuscore as prt


class TaskRunnerRequest(BaseModel):
    task_id: str
    """Unique task id (1222345, 23erd-rt56-tty67-34er5 ...)"""
    task_file_name: str
    """deployer_task.py, task1.sh ... (*.py, *.sh files)"""
    worker_config: prt.WorkerConfig
    """WorkerConfig with or without a GitConfig"""
    callback_url: str | None = None
    """Callback URL to be called after the task finishes"""
    callback_url_token: str | None = None
    """Optional callback URL security (bearer) token"""
    terminate_on_completion: bool = True
    """Determines whether the task runner should terminate on completion"""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "task_id": "23erd-rt56",
                    "task_file_name": "deployer_task.py",
                    "worker_config": {
                        "worker_size": "small",
                        "personal_secrets": ["git_secret"],
                        "git_configs": [
                            {"remote_url": "https://github.com/example/repo.git", "secret_name": "git_secret"}
                        ],
                    },
                    "callback_url": "https://example.com/callback",
                    "terminate_on_completion": True,
                }
            ]
        },
    }

class TaskRunnerResponse(BaseModel):
    success: bool
    """True or False"""
    message: str
    """Any info or error message"""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {"success": True, "message": "Successfully finished"},
                {"success": False, "message": "Error occurred"},
            ]
        },
    }
@prt.apps.api(path="/task-runner")
async def task_runner(payload: TaskRunnerRequest, **kwargs) -> TaskRunnerResponse:
"""This API starts task async and returns started message"""
prt.logger.info("Started running task...")
thread: threading.Thread | None = None
try:
thread = threading.Thread(target=run_task, args=[payload])
thread.start()
return TaskRunnerResponse(success=True, message="Started Task successfully")
except Exception as ex:
prt.logger.error("Error on task", exc_info=True)
return TaskRunnerResponse(success=False, message=f"An Error occurred while running task. Error: {str(ex)}")

def run_task(request: TaskRunnerRequest):
    assert request.task_file_name, "No task_file_name located"
    assert request.worker_config, "No worker_config located"

    success = None
    worker = None
    worker_logs = ""
    try:
        prt.logger.info(
            f"Starting task: {request.task_id} ({request.task_file_name}) "
            f"terminate_on_completion: {request.terminate_on_completion}"
        )
        tasks_path = get_tasks_path()
        prt.logger.info(f"Tasks path to upload: {tasks_path}")
        worker, success = prt.run_task(
            file_name=request.task_file_name,
            files_path=tasks_path,
            worker_config=request.worker_config,
            terminate_on_completion=False,
        )
        worker_logs = worker.get_logs(log_size_mb=5)
    except Exception as ex:
        message = f"Finished task_name: {request.task_file_name} with error"
        prt.logger.error(message, exc_info=True)
        raise ex
    finally:
        try:
            if success is not None:
                if success:
                    prt.logger.info("Finished successfully")
                else:
                    prt.logger.error(
                        f"Finished task with error. Check logs for details. task_name: {request.task_file_name}"
                    )
            if request.callback_url:
                trigger_callback(request=request, success=success, worker_logs=worker_logs)
            else:
                prt.logger.debug(f"Task: {request.task_file_name} does not have a callback url.")
        finally:
            if worker is not None and request.terminate_on_completion:
                worker.terminate()

def trigger_callback(request: TaskRunnerRequest, success: bool, worker_logs: str):
    assert request.callback_url, "No callback_url located"

    headers = {"content-type": "application/json"}
    if request.callback_url_token:
        prt.logger.debug(f"Task: '{request.task_file_name}' includes a security token, including as a Bearer token.")
        headers["authorization"] = f"Bearer {request.callback_url_token}"
    else:
        prt.logger.debug(f"Task: {request.task_file_name} does not include a security token.")

    callback_payload = {
        "task_name": request.task_file_name,
        "task_id": request.task_id,
        "success": success,
        "logs": worker_logs,
    }

    import json

    json_data = json.dumps(callback_payload)

    prt.logger.info(f"Sending to callback url: '{request.callback_url}' success: '{success}'")
    # Send the pre-serialized JSON as the request body (content-type is set above).
    resp = requests.post(request.callback_url, data=json_data, headers=headers)
    if resp.ok:
        prt.logger.info(f"Response text: {resp.text}")
    else:
        prt.logger.error(f"Response has errors: resp.status_code: {resp.status_code} - resp.text: {resp.text}")

def get_tasks_path() -> str:
    """
    Returns the path to the 'tasks' directory.

    Steps:
    1. Retrieve the current execution path using get_execution_path().
    2. Get the parent directory of the execution path.
    3. Construct the path to the 'tasks' directory.
    """
    # Get the execution path.
    exec_path = get_execution_path()
    prt.logger.debug(f"Exec path: {exec_path}")

    # Determine the parent directory of the execution path.
    parent_dir = os.path.dirname(exec_path)
    prt.logger.debug(f"parent_dir: {parent_dir}")

    # Construct the path for the "tasks" directory at the same level.
    path = os.path.join(parent_dir, "tasks")
    prt.logger.debug(f"path: {path}")
    return path


def get_execution_path() -> str:
    """
    Returns the execution path of the current script.

    If the __file__ variable is available, it returns the directory of the script.
    Otherwise, it falls back to the current working directory.
    """
    try:
        # __file__ is defined when the script is run from a file.
        path = os.path.dirname(os.path.abspath(__file__))
        prt.logger.debug(f"path from file path: {path}")
        return path
    except NameError:
        # __file__ is not defined in an interactive shell; use the current working directory.
        path = os.getcwd()
        prt.logger.debug(f"path from os.getcwd(): {path}")
        return path

# async def main():
#     worker_config = prt.WorkerConfig(
#         worker_size="X-Small",
#         # personal_secrets=[git_secret_name],
#         # git_configs=[git_config],
#     )
#     payload = TaskRunnerRequest(
#         task_file_name="deployer_task.py",
#         task_id="task-1",
#         worker_config=worker_config,
#         callback_url="https://dev.practicus.io/apps/task-runner-app/api/task-runner-callback",
#         terminate_on_completion=True,
#     )
#     await task_runner(payload)


# if __name__ == "__main__":
#     import asyncio
#
#     asyncio.run(main())


# if __name__ == "__main__":
#     worker_config = prt.WorkerConfig(worker_size="X-Small")
#     payload = TaskRunnerRequest(
#         task_file_name="deployer_task.py",
#         task_id="task-1",
#         worker_config=worker_config,
#         callback_url="https://dev.practicus.io/apps/task-runner-app/api/task-runner-callback",
#         terminate_on_completion=True,
#     )
#     trigger_callback(request=payload, success=True, worker_logs="logs logs ....")
```
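For reference, a client (for example, an Airflow task or a notebook) could start a task through this API roughly as follows. This is a minimal sketch: the app URL and token are placeholders, and the payload mirrors the commented-out example above:

```python
import requests

# Placeholder values: replace with your deployed app's URL and a valid access token.
TASK_RUNNER_URL = "https://example.practicus.io/apps/task-runner-app/api/task-runner"
TOKEN = "***"

payload = {
    "task_id": "task-1",
    "task_file_name": "deployer_task.py",
    "worker_config": {"worker_size": "X-Small"},
    "callback_url": "https://example.practicus.io/apps/task-runner-app/api/sample-task-runner-callback",
    "terminate_on_completion": True,
}

resp = requests.post(
    TASK_RUNNER_URL,
    json=payload,
    headers={"authorization": f"Bearer {TOKEN}"},
)
print(resp.status_code, resp.text)
```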
### task_runnner_app/tasks/deployer_task.py
```python
# Sample Task that the Task Runner App will execute
import practicuscore as prt
from pathlib import Path


def any_file_exists(dir_path: str) -> bool:
    """
    Check if there is at least one file anywhere under the given directory.

    :param dir_path: Path to the directory to be searched.
    :return: True if any file is found, False otherwise.
    """
    base = Path(dir_path)
    return any(p.is_file() for p in base.rglob("*"))


def main():
    prt.logger.info("Starting model deployment task")

    import os

    assert "DEPLOYMENT_KEY" in os.environ and len(os.environ["DEPLOYMENT_KEY"]) > 0, "DEPLOYMENT_KEY is not provided"
    assert "PREFIX" in os.environ and len(os.environ["PREFIX"]) > 0, "PREFIX is not provided"
    assert "MODEL_NAME" in os.environ and len(os.environ["MODEL_NAME"]) > 0, "MODEL_NAME is not provided"
    assert "MODEL_DIR" in os.environ and len(os.environ["MODEL_DIR"]) > 0, "MODEL_DIR is not provided"

    deployment_key = os.environ["DEPLOYMENT_KEY"]  # "depl"
    prefix = os.environ["PREFIX"]  # models
    model_name = os.environ["MODEL_NAME"]  # sample-model-ro
    model_dir = os.environ["MODEL_DIR"]  # "/home/ubuntu/my/projects/models-repo/sample-model"

    if not any_file_exists(model_dir):
        msg = "Model directory is empty or does not exist. Check logs for details."
        prt.logger.error(msg)
        raise RuntimeError(msg)
    else:
        api_url, api_version_url, api_meta_url = prt.models.deploy(
            deployment_key=deployment_key, prefix=prefix, model_name=model_name, model_dir=model_dir
        )
        prt.logger.info(f"Finished model deployment task successfully. api_url : {api_version_url}")


if __name__ == "__main__":
    main()
```
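Tying this back to customizing task parameters: the values this task asserts (`DEPLOYMENT_KEY`, `PREFIX`, `MODEL_NAME`, `MODEL_DIR`) can be supplied through `env_variables`, exactly as in the basic example. A minimal sketch with placeholder values:

```python
import practicuscore as prt

# Placeholder values; use your own deployment key, prefix, model name, and model directory.
worker_config = prt.WorkerConfig(
    worker_size="X-Small",
    env_variables={
        "DEPLOYMENT_KEY": "depl",
        "PREFIX": "models",
        "MODEL_NAME": "sample-model",
        "MODEL_DIR": "/home/ubuntu/my/projects/models-repo/sample-model",
    },
)

worker, success = prt.run_task(
    file_name="deployer_task.py",
    worker_config=worker_config,
)
print("Deployment task finished with status:", success)
```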