practicuscore

Practicus AI SDK v24.8.2

Overview

Welcome to the Practicus AI SDK, a Python library that allows you to interact with Practicus AI Regions, manage Workers, deploy and manage machine learning Models and Apps, and orchestrate distributed jobs.

This SDK provides an intuitive Pythonic interface for common operations, such as:

  • Connecting to Practicus AI Regions and managing their resources.
  • Creating and working with Workers and Workspaces.
  • Building and deploying ML models and GenAI apps, and managing data connections.
  • Running distributed workloads (Spark, Dask, Torch) seamlessly.
  • Accessing notebooks, experiment tracking, and more.

Key Functionality

Below are some of the helper classes, interfaces, and functions for interacting with the Practicus AI platform.

Helper Classes

These static wrapper classes provide convenient entrypoints and abstractions for common operations, allowing you to work with Practicus AI resources efficiently:

  • regions: Manage and interact with Practicus AI Regions, handle login, workers, etc.
  • models: Deploy and manage ML models, view model prefixes and versions.
  • apps: Deploy and manage GenAI-focused apps and APIs, along with their versions.
  • workflows: Deploy and manage workflows (e.g., Airflow DAGs).
  • connections: Manage and create data source connections (e.g., S3, SQL).
  • distributed: Work with distributed job clusters and frameworks.
  • auth: Authentication helpers for logging in and out of regions.
  • engines: Interact with data processing engines like Spark.
  • experiments: Manage experiment tracking services, such as MLFlow.
  • notebooks: Access Jupyter notebooks running on workers.
  • quality: Code quality (linting, formatting) utilities.

Core Classes

These core classes represent the primary entities you'll interact with the most:

  • Region: Represents a Practicus AI Region (control plane) where you can manage workers, models, apps, and connections.
  • Worker: Represents a compute resource (pod) in the Region where you can run tasks, load data, open notebooks, and more.
  • Process: Represents an OS-level process running on a Worker that can load and manipulate data, run transformations, execute code snippets, build models, and perform predictions—all directly on the Worker.

Sample Usage

import practicuscore as prt

# Connect to the default region
region = prt.get_default_region()

# Create a worker
worker = region.create_worker()

# Deploy a model
dynamic_url, version_url, meta_url = region.deploy_model(
    deployment_key="my-model-service", prefix="my-prefix", model_name="my-model"
)

# Run a task on a worker
_, success = prt.run_task("analysis.py", files_path="project_code/")
print("Task successful:", success)

Alias Functions

In addition to using the Region class or its regions helper class directly, the SDK provides alias functions: shortcuts that map directly to commonly used methods on the selected (or default) region. This allows you to perform actions without explicitly fetching or referencing a Region object first.

These aliases simplify calls to region-dependent functions, making code more concise and direct. For example:

import practicuscore as prt

# Instead of:
region = prt.get_default_region()
worker = region.create_worker()

# You can do:
worker = prt.create_worker()

Practicus AI Documentation

For help on getting started with the Practicus AI platform and tutorials, please visit: Practicus AI Documentation

class regions:

Provides a simplified, high-level interface for managing and interacting with Practicus AI Regions.

A Region in Practicus AI represents a logical control plane environment—typically isolated via Kubernetes namespaces—where resources such as workers, workspaces, model hosting, applications and tasks are managed. The regions class offers convenient static methods to authenticate, select, and operate on these remote execution environments without requiring direct instantiation of Region objects.

By default, operations on regions occur against the currently active (default) region, but the regions class allows selecting other regions, logging in or out of different regions, and performing tasks like launching workers or running remote scripts. It abstracts away the complexities of region-specific APIs and credentials.

Key Concepts:

  • Region: A Practicus AI control plane endpoint. Multiple regions can be configured and managed.
  • Worker: A Kubernetes pod in a region that executes code and runs notebooks, tasks, distributed jobs, model builds, apps, and more.

Common Usage:

import practicuscore as prt

# Login to a specific region
region = prt.regions.login(
    url="https://my-practicus-region.example.com",
    email="user@example.com",
    password="mypassword"
)

# Set the default region (if multiple are configured)
prt.regions.set_default_region("user@my-practicus-region.example.com")
@staticmethod
def running_on_a_worker() -> bool:

Checks if the current code is executing inside a Practicus AI Worker.

This is useful for conditional logic that depends on whether the code runs locally or on a remote Practicus AI-managed environment.

Returns

True if this code is running inside a Practicus AI Worker pod; False otherwise.
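
Example (a minimal sketch of the typical conditional use; assumes you are already logged in to a region):

import practicuscore as prt

if prt.regions.running_on_a_worker():
    # Already inside a Practicus AI Worker pod; reuse the local worker
    worker = prt.regions.get_local_worker()
else:
    # Running elsewhere (e.g., a laptop); create a remote worker instead
    worker = prt.regions.create_worker()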

@staticmethod
def login( url: str, email: str, password: str | None = None, refresh_token: str | None = None, save_password: bool = False, access_token: str | None = None, save_config: bool = True, *args, **kwargs) -> Region:

Authenticates the user to a Practicus AI region and returns a Region instance.

The login process uses the provided credentials (password or tokens) to obtain a refresh token and access token. Once authenticated, the region and associated credentials are stored so they can be reused in subsequent sessions, unless save_config is set to False.

Parameters:

  • url (str): The base URL of the Practicus AI region, e.g. "https://my-region.example.com".
  • email (str): The user's email associated with the Practicus AI account.
  • password (str | None): The user's password. Optional if refresh_token or access_token are provided.
  • refresh_token (str | None): Existing refresh token; can be used instead of password-based login.
  • save_password (bool): If True, saves the entered password securely for future sessions.
  • access_token (str | None): An existing valid access token to bypass credential-based login.
  • save_config (bool): If True, persists the configuration so that subsequent sessions do not require login.
Returns

A Region object representing the authenticated Practicus AI environment.

Example:

region = prt.regions.login(
    url="https://my-practicus-region.example.com",
    email="user@example.com",
    password="mypassword",
    save_password=True
)
@staticmethod
def logout( region_key: str | None = None, all_regions: bool = True, *args, **kwargs):

Logs out of one or all Practicus AI regions, removing stored credentials from local configuration.

Parameters:

  • region_key (str | None): The key of the region to log out from. If None and all_regions is False, no action is taken.
  • all_regions (bool): If True (default), logs out from all known regions. If False, you must specify region_key.
Returns

None
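
Example (illustrative usage; the region key is a placeholder):

import practicuscore as prt

# Log out from all known regions and remove stored credentials
prt.regions.logout(all_regions=True)

# Or log out from a single region only
prt.regions.logout(region_key="user@my-practicus-region.example.com", all_regions=False)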

@staticmethod
def get_region_list() -> PrtList[Region]:

Retrieves a list of all configured and logged-in Practicus AI regions.

Each entry in the returned list is a Region instance containing connection and authentication info.

Returns

A read-only PrtList of Region objects.
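
Example (a short sketch that iterates over the configured regions):

import practicuscore as prt

for region in prt.regions.get_region_list():
    # Each entry is a Region object with connection and authentication info
    print(region)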

@staticmethod
def get_region( region_key: str | None = None, *args, **kwargs) -> Region:

Retrieves a specific Practicus AI region based on the provided region key or returns the default region.

Parameters:

  • region_key (str | None): A region identifier in either username@region_address or region_address format. If not provided, the default region is returned.

Returns: A Region object.

Example:

# If multiple regions are available:
region = prt.regions.get_region("alice@my-practicus-region.example.com")

# If none provided, defaults to last used or configured default:
default_region = prt.regions.get_region()
@staticmethod
def get_default_region(*args, **kwargs) -> Region:

Retrieves the default Practicus AI region. The default region is typically the one last logged-in to or explicitly set using set_default_region.

Returns

The default Region instance.

@staticmethod
def current_region(*args, **kwargs) -> Region:

Returns the currently active Practicus AI region. If the code is running inside a worker, this is the region associated with that worker. Otherwise, this returns the default configured region.

Returns

A Region object for the current environment.

@staticmethod
def region_factory( worker_config: WorkerConfig | str | dict | None = None) -> Region:

Creates or retrieves a Region instance based on a provided worker configuration or returns the current/default region if none is provided.

This is useful in contexts where you may have a serialized or external configuration that specifies a region.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): A configuration object or JSON path/dict that may contain region connection details. If None, returns the current region.
Returns

A Region instance determined by the provided configuration or the current region.
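
Example (a sketch of typical usage; the configuration file name is a placeholder):

import practicuscore as prt

# With no configuration, this simply returns the current (or default) region
region = prt.regions.region_factory()

# Or derive the region from a serialized worker configuration, e.g. a JSON file
region = prt.regions.region_factory("worker_config.json")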

@staticmethod
def set_default_region(region_key: str, *args, **kwargs) -> bool:

Sets the default Practicus AI region. Subsequent operations that do not explicitly specify a region will use this default.

Parameters:

  • region_key (str): The region identifier, e.g. username@my-region.example.com or just my-region.example.com if only one user is associated with it.
Returns

True if the region is successfully set as default; False otherwise.
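
Example (the region key is a placeholder):

import practicuscore as prt

if prt.regions.set_default_region("alice@my-practicus-region.example.com"):
    print("Default region updated.")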

@staticmethod
def get_local_worker(*args, **kwargs) -> Worker:

Retrieves the Worker instance representing the current environment if the code is running inside a worker pod.

Returns: A Worker instance associated with the local environment.

Note: If not running inside a worker, this will raise an error.
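
Example (a short sketch that guards the call so it only runs on a worker):

import practicuscore as prt

if prt.regions.running_on_a_worker():
    local_worker = prt.regions.get_local_worker()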

@staticmethod
def create_worker( worker_config: WorkerConfig | str | dict | None = None, wait_until_ready: bool | None = None, **kwargs) -> Worker:

Creates a new remote Practicus AI Worker in the current or specified region.

A worker is a computational pod that can run code, host Jupyter notebooks, build models etc. By default, it uses the current region unless worker_config points to another region.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): Optional configuration for the worker. Accepts a JSON path, a dict, or a WorkerConfig object. If None, uses the default configuration.
  • wait_until_ready (bool | None): If True, the method waits until the worker is fully provisioned and ready.
Returns

A Worker instance representing the newly created remote pod.
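
Example (a minimal sketch; the default worker configuration is assumed since WorkerConfig fields are not covered here):

import practicuscore as prt

# Create a worker with the default configuration and wait until it is ready
worker = prt.regions.create_worker(wait_until_ready=True)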

@staticmethod
def create_workspace( worker_config: WorkerConfig | str | dict | None = None, **kwargs) -> Worker:

Creates a new Practicus AI Workspace (a special type of Worker) in the selected region.

A workspace is a worker configured for interactive development and includes Practicus AI Studio, office tools and more.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): Configuration for the workspace. Accepts a JSON path, a dict, or WorkerConfig. If None, uses the default configuration.
Returns

A Worker instance configured as a workspace.
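
Example (a minimal sketch using the default configuration):

import practicuscore as prt

# Create an interactive workspace (a special type of Worker) in the current region
workspace = prt.regions.create_workspace()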

@staticmethod
def get_or_create_worker( worker_config: WorkerConfig | str | dict | None = None, **kwargs) -> Worker:

Attempts to retrieve an existing worker (if it matches the provided configuration) or creates one if not found.

This is useful for idempotent deployments where you do not want to create duplicates if the worker already exists.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): The configuration to check against existing workers. If not provided, defaults are used.
Returns

A Worker instance, either existing or newly created.
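
Example (an idempotent sketch, assuming the default worker configuration):

import practicuscore as prt

# Reuses a matching worker if one exists; otherwise creates a new one
worker = prt.regions.get_or_create_worker()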

@staticmethod
def run_task( file_name: str, files_path: str | None = None, worker_config: WorkerConfig | str | dict | None = None, terminate_on_completion: bool = True, capture_task_output: bool = True, python_venv_name: str | None = None, max_files_to_upload: int = 250, *args, **kwargs) -> tuple[Worker | None, bool]:

Runs a specified script (Python or shell) as a "task" on a newly created remote worker.

Common uses include running batch jobs, scheduled tasks, or CI/CD pipeline steps in a controlled environment.

Parameters:

  • file_name (str): The path to the script to run (e.g. "run_analysis.py" or "deploy.sh").
  • files_path (str | None): The directory containing all necessary files to upload. If None, uses current directory.
  • worker_config (WorkerConfig | str | dict | None): Configuration for the worker to run this task.
  • terminate_on_completion (bool): If True, the worker is terminated after the task finishes.
  • capture_task_output (bool): If True, captures and logs stdout/stderr from the task's execution.
  • python_venv_name (str | None): Name of the Python virtual environment on the worker to use.
  • max_files_to_upload (int): Maximum number of files to upload from files_path.
Returns

A tuple of (Worker, bool) where Worker is the worker used or created for this task, and bool indicates if the task succeeded.
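
Example (a slightly fuller sketch than the overview example; the script and directory names are placeholders):

import practicuscore as prt

worker, success = prt.regions.run_task(
    file_name="run_analysis.py",
    files_path="project_code/",
    terminate_on_completion=True,
    capture_task_output=True,
)
print("Task succeeded:", success)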

@staticmethod
def auto_login() -> Region:

Automatically logs into the Practicus AI region when running within a known Practicus AI environment (e.g., a worker or workspace) that already has embedded credentials. Useful for automations and internal scenarios where explicit login is not required.

Returns

The default Region instance after auto-login.
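
Example (a minimal sketch for automation code that already runs inside a Practicus AI environment):

import practicuscore as prt

# No explicit credentials are needed inside a worker or workspace
region = prt.regions.auto_login()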

@staticmethod
def change_password(old_password: str, new_password: str):

Changes the current user's password in the default Practicus AI region.

Parameters:

  • old_password (str): The current password.
  • new_password (str): The new desired password.
@staticmethod
def get_access_token() -> str:

Retrieves a fresh access token for the currently active region using the stored refresh token.

Returns

The new access token as a string.

@staticmethod
def get_refresh_and_access_token() -> tuple[str, str]:

Retrieves both a refresh token and an updated access token for the current region session.

Returns

A tuple (refresh_token, access_token).

@staticmethod
def get_workspace_credentials(instance_id: str) -> tuple[str, str]:

Retrieves the username and password credentials for a specified workspace instance.

The username is typically derived from the user's email address. The password is a managed credential provided by the region.

Parameters:

  • instance_id (str): The identifier of the workspace. View available instances via region.worker_list.
Returns

A tuple (username, password) for the selected workspace.
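
Example (a sketch; the instance id below is a placeholder you would obtain by inspecting region.worker_list):

import practicuscore as prt

region = prt.regions.get_default_region()
instance_id = "my-workspace-instance-id"  # placeholder; see region.worker_list
username, password = prt.regions.get_workspace_credentials(instance_id)
print("Workspace username:", username)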

class models:

Provides a high-level interface for managing machine learning models within Practicus AI.

The models class allows you to:

  • Deploy models to a Practicus AI region for serving predictions and model metadata.
  • Generate and manage model configurations, including model signatures and metadata.
  • Acquire short-lived session tokens for model APIs (e.g., for prediction or uploading a new model version).
  • Package and unpackage model artifacts using a standardized zip archive, simplifying model deployment flows.

Key Concepts:

  • Model Deployment: Uploading a model and its associated files (artifacts, weights, metadata) to a server accessible via a stable API endpoint.
  • Model Config & Signature: Capturing and storing metadata about the model’s input schema, output schema, problem type, and version information. This information is crucial for reproducibility, explainability, and consistent model version management.

Common Usage:

import practicuscore as prt

# Deploy a model to the current region
prt.models.deploy(
    deployment_key="my_model_service",
    prefix="mymodel",
    model_name="model_v1",
)
@staticmethod
def deploy( deployment_key: str, prefix: str, model_name: str, model_dir: str | None = None, *args, **kwargs) -> tuple[str, str, str]:

Deploys a model to the currently selected Practicus AI region.

This operation uploads model artifacts and registers them under the specified model name and prefix, making them accessible via stable API endpoints. If multiple versions of a model exist, the prefix can be used to route requests to the latest version dynamically.

Parameters:

  • deployment_key (str): The key identifying the model deployment service, provided by your Practicus AI admin.
  • prefix (str): A prefix used to group and identify model versions under a common name.
  • model_name (str): The name of this specific model version.
  • model_dir (str | None): The directory containing model files. If None, uses the current working directory.

Returns:

  • tuple[str, str, str]: A tuple containing:
    1. API URL for dynamic version routing.
    2. API URL for the uploaded version.
    3. API URL to read metadata of the model.

Example:

api_url_dynamic, api_url_version, api_url_metadata = prt.models.deploy(
    deployment_key="my_model_service",
    prefix="mymodel",
    model_name="model_v1",
)
@staticmethod
def get_session_token( api_url: str, for_upload: bool = False, retry=3, *args, **kwargs) -> str | None:

Retrieves a short-lived session token to interact with a model's API—either for making predictions (inference) or uploading a new model version.

Parameters:

  • api_url (str): The base URL of the model API, e.g. "https://practicus.company.com/models/my-model/".
  • for_upload (bool): If True, requests a token with permissions suitable for model uploads.
  • retry (int): Number of retry attempts if the token request fails.

Returns:

  • str | None: A token string if successful, or None if unsuccessful.

Example:

token = prt.models.get_session_token("https://practicus.company.com/models/mymodel/", for_upload=True)
if token:
    print("Got a session token for model uploads!")
else:
    print("Failed to retrieve a session token.")
@staticmethod
def load_config( model_config: dict | str, *args, **kwargs) -> practicuscore.api_def.ModelConfig | None:

Loads a ModelConfig object from a dictionary or a JSON string/file.

Parameters:

  • model_config (dict | str):
    • If dict, directly interprets it as model configuration.
    • If str, treats it as either a JSON string or a path to a JSON file, and parses it.

Returns:

  • ModelConfig | None: A ModelConfig instance if successful, or None if parsing fails.

Example:

model_conf = prt.models.load_config("model_config.json")
print("Loaded model config:", model_conf)
@staticmethod
def get_mlflow_data_type( df: pandas.core.frame.DataFrame, col_name: str, *args, **kwargs) -> mlflow.types.schema.DataType:

Infers the MLflow data type for a given DataFrame column. MLflow requires explicit schema types for model signatures, and this method maps Pandas dtypes to MLflow data types.

Parameters:

  • df (pd.DataFrame): The DataFrame containing the data.
  • col_name (str): The column name to infer the type for.

Returns:

  • DataType: An MLflow DataType object (e.g., DataType.double, DataType.string).

Example:

mlflow_dtype = prt.models.get_mlflow_data_type(df, "age")
print("Inferred MLflow data type:", mlflow_dtype)
@staticmethod
def get_model_signature( df: pandas.core.frame.DataFrame, target: str | None = None, *args, **kwargs) -> mlflow.models.signature.ModelSignature:

Generates an MLflow ModelSignature object from a given DataFrame and an optional target column.

A model signature captures the input and output schemas of a model. The target column (if provided) is treated as the output schema, and all other columns are treated as inputs.

Parameters:

  • df (pd.DataFrame): The DataFrame representing the model’s training or inference data.
  • target (str | None): The column name representing the model's target variable, if any.

Returns:

  • ModelSignature: An MLflow ModelSignature object describing the model’s input and output schema.

Example:

signature = prt.models.get_model_signature(df, target="label")
print("Model signature:", signature)
@staticmethod
def get_model_signature_json( df: pandas.core.frame.DataFrame, target: str | None = None, *args, **kwargs) -> str:

Generates a JSON representation of the model signature from a DataFrame and optional target.

Parameters:

  • df (pd.DataFrame): The data to infer schema from.
  • target (str | None): The target column for output schema, if applicable.

Returns:

  • str: A JSON string representing the model signature.

Example:

signature_json = prt.models.get_model_signature_json(df, target="label")
print(signature_json)
@staticmethod
def create_model_config( df: pandas.core.frame.DataFrame, target: str | None = None, model_name: str | None = None, problem_type: str | None = None, version_name: str | None = None, final_model: str | None = None, score: float | None = None, *args, **kwargs) -> practicuscore.api_def.ModelConfig:

Creates a ModelConfig instance from a DataFrame and metadata about the model.

This method generates a model signature and bundles it with additional information like model name, problem type, version, and performance score.

Parameters:

  • df (pd.DataFrame): DataFrame from which to infer the model’s input/output schema.
  • target (str | None): The target column for output schema, if any.
  • model_name (str | None): A name for the model.
  • problem_type (str | None): The type of problem (e.g., "classification", "regression").
  • version_name (str | None): A version identifier for the model.
  • final_model (str | None): A reference or path to the final model artifact.
  • score (float | None): A performance metric value (e.g., accuracy or RMSE).

Returns:

  • ModelConfig: A ModelConfig instance containing the generated model signature and the provided metadata.

Example:

model_conf = prt.models.create_model_config(
    df, target="label", model_name="my_classifier", problem_type="classification", score=0.95
)
print(model_conf)
@staticmethod
def zip(files_to_add: list[str], model_dir: str | None = None):

Packages selected files into a model.zip archive for model deployment.

By default, Practicus AI may automatically upload certain files (like environment configuration). If you include these files in model.zip, it may cause ambiguity. Hence, certain known files are skipped.

Parameters:

  • files_to_add (list[str]): A list of filenames (within model_dir) to include in the zip file.
  • model_dir (str | None): The directory containing the files. If None, defaults to the current directory.

Example:

prt.models.zip(["trained_model.pkl", "requirements.txt"], model_dir="path/to/model/files")
@staticmethod
def unzip(model_dir: str | None = None):

Extracts files from model.zip into the specified directory.

Parameters:

  • model_dir (str | None): The directory where model.zip is located and where files will be extracted. If None, defaults to the current directory.

Example:

prt.models.unzip(model_dir="path/to/model/files")
# All files from model.zip are now extracted into model_dir.
class models.ModelConfig(practicuscore.api_base.PrtBaseModel):

A Pydantic model (see https://docs.pydantic.dev/2.9/concepts/models/) that holds a model's configuration metadata, such as its name, version, problem type, signature (input/output schema), and performance score, as produced by create_model_config and loaded by load_config.

class apps:

Provides a high-level interface for managing the lifecycle of Streamlit-based apps within Practicus AI.

The apps class allows you to:

  • Deploy interactive applications (e.g., Streamlit apps) to a Practicus AI region.
  • Obtain short-lived session tokens for interacting with the app’s APIs.
  • Test the application locally within a Practicus AI Worker environment.
  • Integrate and call internal API functions directly for debugging or local execution.
  • Manage multiple versions of an app (deploy new versions, delete old versions).

Key Concepts:

  • App Deployment: Uploading an app (including UI code, API code, and configuration files) to a Practicus AI region so it can be accessed via a stable URL.
  • App Prefix & Name: Identifiers that allow you to maintain multiple versions of an application and route traffic appropriately.
  • Local Testing: Running the app locally on a Practicus AI Worker for development and debugging before final deployment.

Common Usage:

import practicuscore as prt

# Deploy an app to the current region
ui_url, api_url = prt.apps.deploy(
    deployment_setting_key="my_app_service",
    prefix="myapp",
    app_name="app_v1",
    visible_name="My Application",
    description="A demo GenAI Visual App",
    icon="fa:rocket"
)
print("App deployed. UI:", ui_url, "API:", api_url)

# Get a session token for interacting with the app’s API
token = prt.apps.get_session_token(api_url=api_url)
print("Session token:", token)

# Test the app inside a Practicus AI Worker environment
prt.apps.test_app(app_dir="path/to/app/files")

# Delete an old version of the app
prt.apps.delete_version(version="app_v1", prefix="myapp")
@staticmethod
def secure_page( page_title: str | None = None, layout: Literal['centered', 'wide'] = 'centered', initial_sidebar_state: Literal['auto', 'expanded', 'collapsed'] = 'auto', menu_items: Optional[Mapping[Literal['Get help', 'Get Help', 'get help', 'Report a bug', 'report a bug', 'About', 'about'], Optional[str]]] = None, disable_authentication: bool = False, must_be_admin: bool = False):

Secures the current Streamlit page by enforcing user authentication and permissions.

Parameters:

  • page_title (str | None): The title to display on the browser tab. If None, uses default.
  • layout (Literal['centered', 'wide']): The page layout, e.g., "centered" or "wide".
  • initial_sidebar_state (Literal['auto', 'expanded', 'collapsed']): The initial state of the sidebar, e.g., "auto", "expanded", or "collapsed".
  • menu_items (MenuItems | None): Optional custom menu items for the Streamlit menu.
  • disable_authentication (bool): If True, disables authentication checks (useful in development mode).
  • must_be_admin (bool): If True, requires the user to have admin privileges to access this page.

Behavior:

  • In production mode, checks the Practicus AI session to ensure the user is authenticated.
  • If must_be_admin is True, verifies that the authenticated user is an admin.
  • Optionally configures page layout and sidebar.

Example:

prt.apps.secure_page(page_title="Admin Dashboard", must_be_admin=True)
@staticmethod
def get_user_id() -> int | None:

Retrieves the current user's unique identifier.

Returns:

  • int | None: The user’s ID if available, None otherwise.

Behavior:

  • In development mode, returns the user_id from the global context.
  • In production mode, retrieves the user_id from the Streamlit session state if prt_session is present.

Example:

user_id = prt.apps.get_user_id()
if user_id:
    st.write(f"User ID: {user_id}")
else:
    st.write("No user logged in.")
@staticmethod
def get_username() -> str:

Retrieves the current user's username (the part of their email before '@', or a configured username).

Returns:

  • str: The username as a string. Returns an empty string if not found.

Behavior:

  • In development mode, returns the username from global context.
  • In production mode, returns the username from the Streamlit session state if prt_session is available.

Example:

username = prt.apps.get_username()
st.write(f"Hello, {username}")
@staticmethod
def get_user_email() -> str:

Retrieves the current user's email address.

Returns:

  • str: The user’s email if available, or an empty string if not found.

Behavior:

  • In development mode, returns the user_email from global context.
  • In production mode, returns the email from the Streamlit session state if prt_session is available.

Example:

email = prt.apps.get_user_email()
st.write(f"Your email: {email}")
@staticmethod
def get_user_groups(reload=False) -> list[str]:

Retrieves the list of groups to which the current user belongs.

Parameters:

  • reload (bool): If True, forces a reload of the user groups, ignoring any cached values.

Returns:

  • list[str]: A list of group names the user is part of.

Behavior:

  • In development mode, returns the developer’s predefined groups.
  • In production mode, retrieves groups from prt_session or queries the Practicus AI backend if reload is True.

Example:

groups = prt.apps.get_user_groups()
st.write(f"You belong to the following groups: {', '.join(groups)}")
@staticmethod
def user_is_admin() -> bool:

Checks if the current logged-in user is an admin.

Returns:

  • bool: True if the user is an admin, False otherwise.

Example:

if prt.apps.user_is_admin():
    st.write("Welcome, Admin!")
else:
    st.write("You do not have admin privileges.")
@staticmethod
def get_app_id() -> int | None:

Retrieves the ID of the current app.

Returns:

  • int | None: The app ID if available, otherwise None.

Example:

app_id = prt.apps.get_app_id()
st.write(f"Current App ID: {app_id}")
@staticmethod
def get_app_prefix() -> str:

Retrieves the prefix of the current app.

Returns:

  • str: The app prefix.

Example:

prefix = prt.apps.get_app_prefix()
st.write(f"App Prefix: {prefix}")
@staticmethod
def get_app_name() -> str:

Retrieves the name of the current app.

Returns:

  • str: The app name.

Example:

name = prt.apps.get_app_name()
st.write(f"App Name: {name}")
@staticmethod
def get_app_version() -> str:

Retrieves the version of the current app.

Returns:

  • str: The app version.

Example:

version = prt.apps.get_app_version()
st.write(f"App Version: {version}")
@staticmethod
def development_mode() -> bool:

Indicates whether the app is running in development mode or in production (AppHost) mode.

Returns:

  • bool: True if running in development mode; False if running in production mode.

Example:

if prt.apps.development_mode():
    st.write("Running in development mode.")
else:
    st.write("Running in production mode.")
@staticmethod
def deploy( deployment_setting_key: str, prefix: str, app_name: str, app_dir: str | None = None, visible_name: str | None = None, description: str | None = None, icon: str | None = None, *args, **kwargs) -> tuple[str, str]:

Deploys an application to the currently selected Practicus AI region.

This operation packages the app's UI code, configuration, and any supporting files, and uploads them to the Practicus AI environment. Once deployed, the app and its API are accessible through stable URLs.

Parameters:

  • deployment_setting_key (str): The key identifying the application deployment service (set by your admin).
  • prefix (str): A prefix used to group and identify the app and its versions.
  • app_name (str): A unique name for this specific version of the app.
  • app_dir (str | None): The directory containing the app files. If None, uses the current working directory.
  • visible_name (str | None): The user-facing name displayed in the Practicus AI interface.
  • description (str | None): A short description of the app’s purpose or functionality.
  • icon (str | None): A Font Awesome icon name (e.g., "fa:rocket") to represent the app.

Returns:

  • tuple[str, str]: A tuple containing:
    1. The UI URL of the deployed app.
    2. The API URL of the deployed app.

Example:

ui_url, api_url = prt.apps.deploy(
    deployment_setting_key="my_streamlit_service",
    prefix="myapp",
    app_name="app_v1",
    app_dir="app_folder",
    visible_name="MyApp",
    description="A sample Streamlit application",
    icon="fa:rocket"
)
@staticmethod
def get_session_token( api_url: str | None = None, app_id: int | None = None, for_upload: bool = False, *args, **kwargs) -> str:

Obtains a short-lived session token for interacting with the application's API.

Use this token to call the app’s API endpoints, or to upload new versions of the app (if for_upload is True).

Parameters:

  • api_url (str | None): The app’s API URL. E.g. "https://practicus.company.com/apps/my-app/api/v5/"
  • app_id (int | None): The app’s unique identifier. If provided, it can be used instead of api_url.
  • for_upload (bool): If True, requests a token with permissions to upload new versions (admin privileges may be required).

Returns:

  • str: A session token string.

Example:

token = prt.apps.get_session_token(api_url="https://practicus.company.com/apps/my-app/api/v5/")
print("Session token:", token)
@staticmethod
def test_app(app_dir: str | None = None, kill_existing_app: bool = True):

Tests the application by running it locally inside a Practicus AI Worker environment.

This method launches the Streamlit app so you can interact with it through the Practicus AI Studio environment (e.g., via a port-forwarded URL). Useful for iterative development and debugging before deploying to a production environment.

Parameters:

  • app_dir (str | None): The directory containing the app files. If None, uses the current working directory.
  • kill_existing_app (bool): If True, terminates any previously running test app instances on this worker before launching the new one.

Example:

prt.apps.test_app(app_dir="my_app_dir")
# Open the provided URL in Practicus AI Studio to interact with the test app.
@staticmethod
def call_api(api_module: ModuleType, payload, is_admin: bool = False):

Calls the run() function of a provided API module, optionally with admin privileges. Useful for local testing and debugging of API logic without deploying.

Parameters:

  • api_module (ModuleType): The Python module representing the API. Must have a run() function.
  • payload: The payload to pass to the run() function. Should match the expected pydantic model if defined.
  • is_admin (bool): If True, simulates calling the API as an admin user.

Returns: The result returned by the run() function of the API module.

Example:

from my_app_apis import my_api_module
result = prt.apps.call_api(my_api_module, payload={"key": "value"})
print("API call result:", result)
@staticmethod
def delete( app_id: int | None = None, prefix: str | None = None, app_name: str | None = None):

Deletes an application from the current region.

Either app_id or both prefix and app_name should be provided to identify the app.

Parameters:

  • app_id (int | None): The unique identifier of the app. If provided, overrides prefix and app_name.
  • prefix (str | None): The app prefix to identify which group of apps it belongs to.
  • app_name (str | None): The name of the app to delete.

Example:

# Delete by app_id
prt.apps.delete(app_id=12345)

# Or delete by prefix and app name
prt.apps.delete(prefix="myapp", app_name="app_v1")
@staticmethod
def delete_version( version: str, app_id: int | None = None, prefix: str | None = None, app_name: str | None = None):

Deletes a specific version of an application from the current region.

Parameters:

  • version (str): The version identifier of the app to delete.
  • app_id (int | None): The app’s unique identifier. If provided, prefix and app_name are ignored.
  • prefix (str | None): The prefix identifying the app group, if app_id is not provided.
  • app_name (str | None): The specific app name, if app_id is not provided.

Example:

# Delete version using app_id
prt.apps.delete_version(version="app_v1", app_id=12345)

# Or delete version using prefix and app_name
prt.apps.delete_version(version="app_v1", prefix="myapp", app_name="my_application")
class workflows:

Provides a simplified interface for managing workflows—such as Airflow DAGs—in Practicus AI.

The workflows class enables you to deploy, generate, and test workflows that define and execute complex data pipelines or ML processes. By abstracting away the underlying integration with Airflow and other systems, it allows you to focus on authoring DAGs, tasks, and configurations without delving into low-level deployment details.

Key Concepts:

  • Workflow: A directed acyclic graph (DAG) representing a set of tasks and their dependencies, orchestrated by a workflow engine like Airflow.
  • Airflow Integration: Practicus AI supports generating and deploying Airflow DAGs to the configured region, enabling fully managed and scheduled execution of your tasks.

Common Usage:

import practicuscore as prt

# Deploy an Airflow DAG to the current region
prt.workflows.deploy(service_key="my_airflow_service", dag_key="my_data_pipeline", files_path="my_dag_files/")

# Generate DAG and task files from a high-level workflow description
prt.workflows.generate_files(
    dag_key="my_data_pipeline",
    dag_flow="extract_data >> transform_data >> load_data",
    files_path="my_dag_files/",
    default_worker_config=prt.WorkerConfig(),
    schedule_interval="@daily"
)

# Test tasks locally by creating temporary workers and running each task script
success_workers, failed_workers = prt.workflows.test_tasks(
    dag_flow="extract_data >> transform_data >> load_data",
    files_path="my_dag_files/",
    default_worker_config=prt.WorkerConfig(),
    terminate_on_success=True
)
@staticmethod
def deploy( service_key: str, dag_key: str, files_path: str | None = None, max_files_to_upload: int = 250):

Deploys a workflow (Airflow DAG) to the currently selected Practicus AI region.

Parameters:

  • service_key (str): The key identifying the workflow orchestration service (e.g., Airflow instance) defined by your Practicus AI administrator.
  • dag_key (str): A unique identifier (name) for the DAG being deployed.
  • files_path (str | None): The directory containing the DAG file and associated task files. If None, defaults to the current working directory.
  • max_files_to_upload (int): The maximum number of files to upload along with the DAG for its tasks.

Behavior: This method packages the specified DAG files, task scripts, and configurations, and uploads them to the Airflow service running in the current region, making the DAG available for scheduling and execution.

Example:

prt.workflows.deploy(
    service_key="my_airflow_service",
    dag_key="my_pipeline",
)
@staticmethod
def get_airflow_params() -> dict:

Retrieves a dictionary of Airflow DAG parameters that can be used to configure Practicus AI workers and services within the Airflow tasks. These parameters often include environment details, service endpoints, and default worker configurations.

Returns:

  • dict: A dictionary of parameters suitable for inclusion in Airflow DAG and task configurations.

Example:

params = prt.workflows.get_airflow_params()
# Use params in your DAG definition to provide runtime configuration
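
For a more concrete (though illustrative) sketch of wiring these parameters into an Airflow DAG definition; the DAG id, schedule, and layout below are assumptions rather than the exact template Practicus AI generates:

from datetime import datetime

from airflow import DAG

import practicuscore as prt

airflow_params = prt.workflows.get_airflow_params()

with DAG(
    dag_id="my_pipeline",          # placeholder DAG id
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    params=airflow_params,         # expose Practicus AI runtime configuration to tasks
) as dag:
    ...  # define tasks here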
@staticmethod
def run_airflow_task(**kwargs):

Executes an Airflow task inside a DAG at runtime.

This method is primarily intended for internal use within the Airflow DAGs generated by Practicus AI. It receives runtime parameters from Airflow and runs the associated task. Users typically do not call this method directly.

Parameters:

  • kwargs: Dynamic parameters provided by the Airflow scheduler and DAG configuration.

Returns: None

Note: Refer to Practicus AI's Airflow integration documentation or generated DAG code for usage details.

@staticmethod
def get_dag_info(dag_file_path: str) -> tuple[str, str]:

Infers the DAG key and the username (as derived from a predefined folder structure) from a given DAG file path.

Parameters:

  • dag_file_path (str): The full path to the DAG file. For convenience, you can pass __file__ when calling from inside the DAG file.

Returns:

  • tuple[str, str]: A tuple (dag_key, username) extracted from the directory structure.

Example:

dag_key, username = prt.workflows.get_dag_info(__file__)
print(dag_key, username)
@staticmethod
def generate_files( dag_key: str, dag_flow: str, files_path: str | None = None, default_worker_config: WorkerConfig | None = None, custom_worker_configs: list[tuple[str, WorkerConfig]] | None = None, save_credentials: bool = True, overwrite_existing: bool = False, schedule_interval: str | None = None, start_date: datetime.datetime | None = None, retries: int = 0, dag_template: str = 'airflow_dag.j2.py', task_template: str = 'python_task.j2.py'):

Generates Airflow DAG and task files from a given workflow definition (DAG flow).

This utility helps bootstrap the filesystem layout for your workflow code, including:

  • DAG file (Python code)
  • Task scripts (Python files, optionally shell scripts)
  • Worker configuration files

Parameters:

  • dag_key (str): A unique identifier for the DAG.
  • dag_flow (str): A string representing the workflow tasks and their dependencies (e.g. "extract_data >> transform_data >> load_data").
  • files_path (str | None): The directory where DAG and task files will be generated. If None, defaults to the current working directory.
  • default_worker_config (WorkerConfig | None): The default worker configuration applied to all tasks.
  • custom_worker_configs (list[tuple[str, WorkerConfig]] | None): A list of tuples mapping specific task names to custom worker configurations.
  • save_credentials (bool): If True, saves the current user's credentials in the default worker config file.
  • overwrite_existing (bool): If True, overwrites existing files.
  • schedule_interval (str | None): The scheduling interval for the DAG, e.g. "@daily", "0 2 * * THU".
  • start_date (datetime | None): The start date for the DAG scheduling. Defaults to the current datetime.
  • retries (int): The number of retries allowed for tasks in this DAG.
  • dag_template (str): The template filename for the DAG file.
  • task_template (str): The template filename for individual task files.

Example:

from datetime import datetime

prt.workflows.generate_files(
    dag_key="etl_pipeline",
    dag_flow="extract >> transform >> load",
    default_worker_config=prt.WorkerConfig(),
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    retries=2,
)
@staticmethod
def test_tasks( dag_flow: str | None = None, task_list: list[str] | None = None, files_path: str | None = None, default_worker_config: WorkerConfig | None = None, custom_worker_configs: list[tuple[str, WorkerConfig]] | None = None, terminate_on_success: bool = True, terminate_on_failed: bool = False) -> tuple[list[Worker], list[Worker]]:

Tests tasks defined by a given DAG flow or a task list by running them in temporary workers. This allows local validation of code before deploying the full workflow.

Parameters:

  • dag_flow (str | None): A DAG flow string (e.g. "task1 >> task2") defining task dependencies.
  • task_list (list[str] | None): A list of task names to test. Provide either dag_flow or task_list, not both.
  • files_path (str | None): The directory containing the task files. If None, uses the current directory.
  • default_worker_config (WorkerConfig | None): The base worker configuration for running the tasks.
  • custom_worker_configs (list[tuple[str, WorkerConfig]] | None): Per-task overrides for the worker configuration.
  • terminate_on_success (bool): If True, terminates the worker after a successful task run.
  • terminate_on_failed (bool): If True, terminates the worker if the task fails.

Returns:

  • tuple[list[Worker], list[Worker]]: A tuple containing a list of workers that ran successful tasks and a list of workers that ran failed tasks (if they were not terminated).

Example:

success_workers, failed_workers = prt.workflows.test_tasks(
    dag_flow="extract >> transform >> load",
    default_worker_config=prt.WorkerConfig(),
    terminate_on_success=True,
    terminate_on_failed=False
)
class connections:

Provides a high-level interface for managing various data and service connections within Practicus AI.

The connections class includes:

  • Type aliases for different connection configuration types (e.g., S3, SQL databases, Snowflake).
  • Functions to create, update, retrieve, and list all configured connections.
  • A utility to upload files to S3 storage.

By leveraging these methods, you can easily integrate and interact with multiple data sources and external systems in your Practicus AI environment, all through a unified interface.

Key Concepts:

  • Connection Configuration (ConnConf): A structured way to store credentials, endpoints, and parameters necessary to connect to databases, storage services, and other external data systems.
  • Managed Connections: Centralized management of all defined connections, allowing for easy retrieval and updates through code, without needing to hard-code credentials repeatedly.

Common Usage:

import practicuscore as prt

# Get all configured connections
all_connections = prt.connections.get_all()

# Retrieve a specific connection by name
my_db_conn = prt.connections.get("my_database")

# Create a new S3 connection configuration
s3_config = prt.connections.S3ConnConf(
    endpoint_url="...",
    aws_access_key_id="...",
)
conn_uuid = prt.connections.create(name="my_s3_conn", conn_conf=s3_config)

# Update an existing connection (the connection UUID is required)
updated_config = s3_config.copy(update={"bucket": "my-new-bucket"})
prt.connections.update(conn_uuid=conn_uuid, name="my_s3_conn", conn_conf=updated_config)

# Upload files to S3 (see connections.UploadS3Conf for all available fields)
upload_conf = prt.connections.UploadS3Conf(bucket="my-bucket", prefix="data", folder_path="local_data/")
prt.connections.upload_to_s3(upload_conf)
@staticmethod
def upload_to_s3(upload_conf: practicuscore.api_base.UploadS3Conf):

Uploads a file to S3 using the specified upload configuration.

This method uses the configuration details in UploadS3Conf—including credentials, target S3 bucket, and key—to upload a local file or in-memory data to S3.

Parameters:

  • upload_conf (UploadS3Conf): A configuration object containing:
    • Access and secret keys or reference to a connection.
    • Bucket name.
    • Local file path or data to upload.
    • S3 key (object path) under which the file will be stored.
    • Other optional parameters like overwrite behavior.

Returns: None. Raises an exception if the upload fails.
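
Example (a sketch built from the fields shown in the connections.UploadS3Conf usage example below; bucket names, paths, and credentials are placeholders):

import practicuscore as prt

upload_conf = prt.connections.UploadS3Conf(
    bucket="my-bucket",
    prefix="data/backups",
    folder_path="local_data/",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret...",
    aws_region="us-east-1",
)
prt.connections.upload_to_s3(upload_conf)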

@staticmethod
def get_all() -> PrtList[practicuscore.conn_helper.Connection]:

Retrieves a list of all configured connections in the currently active Practicus AI region.

This method returns a list of Connection objects, each representing a previously created or registered connection within the current region's environment. Connections can include various data sources like S3, databases, or custom endpoints.

Returns:

  • PrtList[Connection]: A read-only list of Connection objects.

Example:

import practicuscore as prt

all_conns = prt.connections.get_all()
for conn in all_conns:
    print(conn.name, conn.conn_type)
@staticmethod
def get(uuid_or_name: str) -> practicuscore.conn_helper.Connection | None:

Retrieves a specific connection configuration by its UUID or name.

If you know the unique identifier (UUID) or a user-friendly name of the connection, this method returns the corresponding Connection object. If no connection matches the provided identifier, returns None.

Parameters:

  • uuid_or_name (str): The UUID or name of the desired connection.

Returns:

  • Connection | None: The matching Connection object, or None if not found.

Example:

import practicuscore as prt

my_conn = prt.connections.get("my_database")
if my_conn:
    print("Found connection:", my_conn.name, my_conn.conn_type)
else:
    print("Connection not found.")
@staticmethod
def create( name: str, conn_conf: practicuscore.api_base.ConnConf, tree_path: str | None = None, can_write=True) -> str:

Creates a new connection in the currently active Practicus AI region.

By providing a name and a ConnConf object representing the configuration details, this method registers a new connection that can later be retrieved and used. The optional tree_path parameter can organize connections into a logical hierarchy.

Parameters:

  • name (str): The name of the new connection. Must be unique within the region.
  • conn_conf (ConnConf): The configuration object specifying credentials, endpoints, and properties for the connection type (e.g., S3, MySQL, Snowflake).
  • tree_path (str | None): Optional hierarchical path to store the connection under (e.g., "teamA/projects").
  • can_write (bool): If True, indicates that this connection can be used for write operations.

Returns:

  • str: The UUID of the newly created connection.

Example:

import practicuscore as prt

s3_conf = prt.connections.S3ConnConf(
    ...
)
conn_uuid = prt.connections.create(name="my_s3_conn", conn_conf=s3_conf, can_write=True)
print("Created connection with UUID:", conn_uuid)
@staticmethod
def update( conn_uuid: str, name: str, conn_conf: practicuscore.api_base.ConnConf, tree_path: str | None = None, can_write=True):

Updates an existing connection with new configuration details.

Use this method to modify an existing connection if credentials, endpoints, or other parameters need to be changed. For example, changing the S3 bucket, database host, or default credentials.

Parameters:

  • conn_uuid (str): The UUID of the connection to update.
  • name (str): The new name for the connection (can be the same as before or a new one).
  • conn_conf (ConnConf): The updated connection configuration object.
  • tree_path (str | None): Optional hierarchical path for organizing the connection.
  • can_write (bool): If True, indicates this connection can be used for write operations.

Returns: None. Raises an exception if the update fails or if the connection does not exist.

Example:

import practicuscore as prt

# Retrieve UUID from get or create methods
updated_s3_conf = prt.connections.S3ConnConf(
    ...
)
prt.connections.update(conn_uuid="existing-uuid", name="my_updated_s3_conn", conn_conf=updated_s3_conf)
print("Connection updated successfully.")
class connections.UploadS3Conf(pydantic.main.BaseModel):

Configuration for uploading files to an S3-compatible storage.

This class defines the parameters needed to upload files from a local source to a specified S3 bucket and prefix. It can also handle cutting part of the source path to simplify the uploaded key structure.

Usage Example:

s3_conf = UploadS3Conf(
    bucket="my-bucket",
    prefix="data/backups",
    folder_path="local_data/",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret...",
    aws_region="us-east-1"
)
class connections.ConnConfFactory:

Factory class that generates connection configuration objects from a dictionary, JSON, or an instance of the subject class.

class connections.ConnConf(practicuscore.api_base.PrtBaseModel):

Base connection configuration class

class connections.S3ConnConf(practicuscore.api_base.ConnConf):

AWS S3 or compatible object storage (e.g. Minio, CEPH, Google Cloud Storage) connection configuration

class connections.SqLiteConnConf(practicuscore.api_base.RelationalConnConf):

SQLite connection configuration

class connections.MYSQLConnConf(practicuscore.api_base.RelationalConnConf):

MySQL connection configuration

class connections.PostgreSQLConnConf(practicuscore.api_base.RelationalConnConf):

PostgreSQL connection configuration

class connections.RedshiftConnConf(practicuscore.api_base.RelationalConnConf):

AWS Redshift connection configuration

class connections.SnowflakeConnConf(practicuscore.api_base.RelationalConnConf):

Snowflake connection configuration

class connections.MSSQLConnConf(practicuscore.api_base.RelationalConnConf):

Microsoft SQL Server connection configuration

class connections.OracleConnConf(practicuscore.api_base.RelationalConnConf):

Oracle DB or DWH connection configuration

class connections.HiveConnConf(practicuscore.api_base.RelationalConnConf):

Hive connection configuration

class connections.ClouderaConnConf(practicuscore.api_base.RelationalConnConf):

Cloudera connection configuration

class connections.AthenaConnConf(practicuscore.api_base.RelationalConnConf):

AWS Athena connection configuration

class connections.ElasticSearchConnConf(practicuscore.api_base.RelationalConnConf):

ElasticSearch connection configuration

class connections.OpenSearchConnConf(practicuscore.api_base.RelationalConnConf):

OpenSearch connection configuration

class connections.TrinoConnConf(practicuscore.api_base.RelationalConnConf):

Trino connection configuration

class connections.DremioConnConf(practicuscore.api_base.RelationalConnConf):

Dremio connection configuration

class connections.HanaConnConf(practicuscore.api_base.RelationalConnConf):

SAP Hana connection configuration

class connections.TeradataConnConf(practicuscore.api_base.RelationalConnConf):

Teradata connection configuration

class connections.Db2ConnConf(practicuscore.api_base.RelationalConnConf):

IBM DB2 connection configuration

class connections.DynamoDBConnConf(practicuscore.api_base.RelationalConnConf):

AWS DynamoDB connection configuration

class connections.CockroachDBConnConf(practicuscore.api_base.RelationalConnConf):

CockroachDB connection configuration

class connections.CustomDBConnConf(practicuscore.api_base.RelationalConnConf):

Custom sqlalchemy compatible connection configuration

class distributed:

Provides a simplified interface for managing distributed compute jobs within Practicus AI.

The distributed class offers convenient methods and property aliases to inspect and interact with distributed workloads, such as multi-worker or multi-GPU training jobs. It leverages underlying DistJobHelper utilities and configurations to streamline setup and monitoring.

Key Concepts:

  • Distributed Jobs: A set of workers orchestrated together to run a parallel or distributed task, such as large-scale training, hyperparameter tuning, or data processing.
  • Coordinator and Agents: In distributed tasks, a coordinator (rank 0) manages synchronization and coordination, while agent workers (rank ≥ 1) carry out the bulk of the workload.

Common Usage:

import practicuscore as prt

# Check if running in a distributed cluster
if prt.distributed.running_on_a_cluster():
    job_id = prt.distributed.get_job_id()
    gpu_count = prt.distributed.get_gpu_count()
    print(f"Running in distributed mode. Job ID: {job_id}, GPUs: {gpu_count}")

    if prt.distributed.is_coordinator():
        print("This worker is the coordinator. Managing task distribution...")
    else:
        print("This worker is an agent, focusing on processing tasks.")
else:
    print("This worker is not part of a distributed job.")
JobType = <enum 'DistJobType'>

Alias for the DistJobType enum, which defines the job type (e.g., python, spark, deepspeed).

JobConfig = <class 'DistJobConfig'>

Alias for the DistJobConfig Pydantic class, which defines the job configuration.

@staticmethod
def wait_for_start( job_dir: str | None = None, job_id: str | None = None, timeout: int = 120, sleep_between_refresh: int = 3, log_state=True):

Waits until the distributed job directory and configuration files appear, indicating that all executors have started (but are not necessarily running yet).

Parameters:

  • job_dir (str | None): The job directory. If None, the code is assumed to be running on the cluster.
  • job_id (str | None): The job ID. If None, the code is assumed to be running on the cluster.
  • timeout (int): Maximum number of seconds to wait before raising a TimeoutError.
  • sleep_between_refresh (int): How often (in seconds) to check for job start.
  • log_state (bool): If True, logs waiting status messages.

Example:

prt.distributed.wait_for_start(job_dir="/path/to/job_dir", job_id="12345")
@staticmethod
def wait_for_running( job_dir: str | None = None, job_id: str | None = None, timeout: int = 120, sleep_between_refresh: int = 3):

Waits until all executors of a distributed job are in the running state.

Parameters:

  • job_dir (str | None): The job directory. If None, the code is assumed to be running on the cluster.
  • job_id (str | None): The job ID. If None, the code is assumed to be running on the cluster.
  • timeout (int): Maximum number of seconds to wait before raising a TimeoutError.
  • sleep_between_refresh (int): How often (in seconds) to check for executors running.

Example:

prt.distributed.wait_for_running(job_dir="/path/to/job_dir", job_id="12345")
@staticmethod
def live_view( job_dir: str | None = None, job_id: str | None = None, max_rows: int = 10, sleep_between_refresh: int = 3):

Opens a live view of the distributed job’s state inside a Jupyter environment, refreshing periodically.

Parameters:

  • job_dir (str | None): Job directory or None if running inside the cluster.
  • job_id (str | None): Job ID or None if running inside the cluster.
  • max_rows (int): Maximum number of executor entries to display.
  • sleep_between_refresh (int): How often (in seconds) to refresh the view.

Example:

prt.distributed.live_view(job_dir="/path/to/job_dir", job_id="12345")
@staticmethod
def view_log( job_dir: str | None = None, job_id: str | None = None, rank: int | None = None):

Displays executor worker logs for a given distributed job executor (identified by rank).

Parameters:

  • job_dir (str | None): Job directory or None if running inside the cluster.
  • job_id (str | None): Job ID or None if running inside the cluster.
  • rank (int | None): The executor rank. If None, uses the current worker’s rank.

Example:

prt.distributed.view_log(job_dir="/path/to/job_dir", job_id="12345", rank=0)
@staticmethod
def get_client( job_dir: str | None = None, job_id: str | None = None, conn_conf: practicuscore.api_base.ConnConf | None = None, config_dict: dict | None = None):

Acquires a client (or session) for interactive clusters like Dask or Spark.

Parameters:

  • job_dir (str | None): Job directory or None if running on the coordinator.
  • job_id (str | None): Job ID or None if running on the coordinator.
  • conn_conf (ConnConf | None): Optional connection configuration (e.g., credentials for S3 in Spark).
  • config_dict (dict | None): Additional configuration parameters.

Returns:

  • Client/session object appropriate for the job type (e.g., Dask client, Spark session).

Example:

client = prt.distributed.get_client(job_dir="/path", job_id="12345")
# client could be a Dask client or a Spark session depending on the job type.
@staticmethod
def open_dashboard( job_dir: str | None = None, job_id: str | None = None, get_url_only=False) -> str:

Opens the management dashboard for interactive clusters (Spark, Dask) in a browser or returns the URL.

Parameters:

  • job_dir (str | None): Job directory or None if running on the job cluster.
  • job_id (str | None): Job ID or None if running on the job cluster.
  • get_url_only (bool): If True, returns the dashboard URL without opening it.

Returns:

  • str: The dashboard URL.

Example:

dashboard_url = prt.distributed.open_dashboard(job_dir="/path", job_id="12345", get_url_only=True)
print("Dashboard URL:", dashboard_url)
@staticmethod
def get_job_id() -> int | None:

Retrieves the active Job ID for this worker if it is part of a distributed job.

Returns:

  • int | None: The Job ID if this worker is part of a distributed job, None otherwise.

Example:

job_id = prt.distributed.get_job_id()
if job_id is not None:
    print(f"This worker belongs to distributed job ID: {job_id}")
else:
    print("Not part of a distributed job.")
@staticmethod
def get_job_rank() -> int | None:

Retrieves the rank of this worker within the distributed job.

The rank is an integer that distinguishes different workers in a job. Typically:

  • Rank 0: Coordinator (master) worker.
  • Rank ≥1: Agent (worker) nodes.

Returns:

  • int | None: The rank of this worker if part of a distributed job, None otherwise.

Example:

rank = prt.distributed.get_job_rank()
if rank == 0:
    print("This is the coordinator node.")
elif rank is not None:
    print(f"This is an agent node with rank: {rank}")
else:
    print("Not part of a distributed job.")
@staticmethod
def get_gpu_count() -> int:

Retrieves the number of GPUs available on this worker.

Returns:

  • int: The number of GPUs detected on this worker.

Example:

gpu_count = prt.distributed.get_gpu_count()
print(f"Available GPUs on this worker: {gpu_count}")
@staticmethod
def running_on_a_cluster() -> bool:

Checks if the current code is running on a worker that is part of a distributed job or cluster.

Returns:

  • bool: True if running as part of a distributed job, False otherwise.

Example:

if prt.distributed.running_on_a_cluster():
    print("Running in distributed mode.")
else:
    print("Running in a standalone worker.")
@staticmethod
def is_coordinator() -> bool:

Checks if this worker is the coordinator (rank 0) of a distributed job.

If the worker is running a distributed job and its rank is 0, or if auto-distributed configuration is detected without a specific rank, it is considered the coordinator.

Returns:

  • bool: True if this worker is the coordinator, False otherwise.

Example:

if prt.distributed.is_coordinator():
    print("I am the coordinator of this distributed job.")
@staticmethod
def is_agent() -> bool:

Checks if this worker is an agent (rank ≥ 1) in a distributed job.

Agents typically carry out workloads assigned by the coordinator.

Returns:

  • bool: True if this worker is an agent in a distributed job, False otherwise.

Example:

if prt.distributed.is_agent():
    print("I am an agent worker node.")
@staticmethod
def validate_job_dir(job_dir: str):

Validates that the specified job directory is accessible to this worker.

In distributed settings, job-related files or configurations may be stored in a shared directory. This method ensures that the directory is present and accessible.

Parameters:

  • job_dir (str): The user-friendly job directory path. It will be translated to the actual directory path by DistJobHelper.

Example:

try:
    prt.distributed.validate_job_dir("/my/job/dir")
    print("Job directory is valid.")
except FileNotFoundError:
    print("Invalid job directory.")
class auth:

Provides a high-level, convenience interface for Practicus AI authentication and authorization tasks.

The auth class exposes commonly used authentication-related methods—such as logging in, logging out, changing passwords, and retrieving tokens—without requiring direct interaction with the underlying regions class or Region objects. This allows developers to quickly manage authentication workflows in a more streamlined manner.

Typical Operations Include:

  • Logging into a Practicus AI region with auth.login(...).
  • Logging out from one or all regions using auth.logout(...).
  • Automatically authenticating within known Practicus AI environments using auth.auto_login().
  • Changing user passwords and retrieving workspace credentials or tokens.

Example:

import practicuscore as prt

# Log in to a Practicus AI region
prt.auth.login(
    url="https://my-practicus-region.example.com",
    email="user@example.com",
    password="mypassword"
)

# Retrieve an access token
access_token = prt.auth.get_access_token()

# Change the default region
prt.auth.set_default_region("user@my-practicus-region.example.com")

# Logout from all known regions
prt.auth.logout(all_regions=True)
@staticmethod
def set_default_region(region_key: str, *args, **kwargs) -> bool:

Sets the default Practicus AI region. Subsequent operations that do not explicitly specify a region will use this default.

Parameters:

  • region_key (str): The region identifier, e.g. username@my-region.example.com or just my-region.example.com if only one user is associated with it.
Returns

True if the region is successfully set as default; False otherwise.

@staticmethod
def login( url: str, email: str, password: str | None = None, refresh_token: str | None = None, save_password: bool = False, access_token: str | None = None, save_config: bool = True, *args, **kwargs) -> Region:

Authenticates the user to a Practicus AI region and returns a Region instance.

The login process uses the provided credentials (password or tokens) to obtain a refresh token and access token. Once authenticated, the region and associated credentials are stored so they can be reused in subsequent sessions, unless save_config is set to False.

Parameters:

  • url (str): The base URL of the Practicus AI region, e.g. "https://my-region.example.com".
  • email (str): The user's email associated with the Practicus AI account.
  • password (str | None): The user's password. Optional if refresh_token or access_token are provided.
  • refresh_token (str | None): Existing refresh token; can be used instead of password-based login.
  • save_password (bool): If True, saves the entered password securely for future sessions.
  • access_token (str | None): An existing valid access token to bypass credential-based login.
  • save_config (bool): If True, persists the configuration so that subsequent sessions do not require login.
Returns

A Region object representing the authenticated Practicus AI environment.

Example:

region = prt.auth.login(
    url="https://my-practicus-region.example.com",
    email="user@example.com",
    password="mypassword",
    save_password=True
)
@staticmethod
def logout( region_key: str | None = None, all_regions: bool = True, *args, **kwargs):

Logs out of one or all Practicus AI regions, removing stored credentials from local configuration.

Parameters:

  • region_key (str | None): The key of the region to logout from. If None and all_regions is False, no action will be taken.
  • all_regions (bool): If True (default), logs out from all known regions. If False, you must specify region_key.
Returns

None
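
Example (a minimal sketch using the documented parameters; the region key is illustrative):

# Log out from a single region
prt.auth.logout(region_key="alice@my-region.example.com", all_regions=False)

# Log out from all known regions
prt.auth.logout(all_regions=True)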

@staticmethod
def auto_login() -> Region:

Automatically logs into the Practicus AI region when running within a known Practicus AI environment (e.g., a worker or workspace) that already has embedded credentials. Useful for automations and internal scenarios where explicit login is not required.

Returns

The default Region instance after auto-login.
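
Example (only meaningful when running inside a Practicus AI Worker or Workspace):

region = prt.auth.auto_login()
print("Auto-logged into region:", region.key)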

@staticmethod
def change_password(old_password: str, new_password: str):

Changes the current user's password in the default Practicus AI region.

Parameters:

  • old_password (str): The current password.
  • new_password (str): The new desired password.
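
Example (the passwords below are placeholders):

prt.auth.change_password(old_password="my-old-password", new_password="my-new-password")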
@staticmethod
def get_workspace_credentials(instance_id: str) -> tuple[str, str]:

Retrieves the username and password credentials for a specified workspace instance.

The username is typically derived from the user's email address. The password is a managed credential provided by the region.

Parameters:

  • instance_id (str): The identifier of the workspace. View available instances via region.worker_list.
Returns

A tuple (username, password) for the selected workspace.
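
Example (the instance ID below is illustrative; see region.worker_list for actual instances):

username, password = prt.auth.get_workspace_credentials("instance1234")
print("Workspace user:", username)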

@staticmethod
def get_access_token() -> str:

Retrieves a fresh access token for the currently active region using the stored refresh token.

Returns

The new access token as a string.

@staticmethod
def get_refresh_and_access_token() -> tuple[str, str]:

Retrieves both a refresh token and an updated access token for the current region session.

Returns

A tuple (refresh_token, access_token).
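
Example (unpacking the returned tuple):

refresh_token, access_token = prt.auth.get_refresh_and_access_token()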

class engines:

Provides high-level access to data processing engines like Apache Spark within Practicus AI.

The engines class acts as a simplified interface for working with various data processing engines configured in the Practicus AI environment. This typically involves retrieving or creating Spark sessions that run on Practicus AI Workers. By abstracting away configuration details, it allows developers to focus on their data pipelines and analysis tasks.

Key Concepts:

  • Spark Session: A handle to an Apache Spark cluster environment. You can use it to read and write data, run transformations, and perform distributed computations.
  • Connection Config: A dictionary or JSON configuration that specifies how the Spark session should connect to data sources (e.g., S3, databases).

Common Usage:

import practicuscore as prt

# Get or create a Spark session on a Practicus AI Worker
spark = prt.engines.get_spark_session()

# Perform Spark operations
df = spark.read.csv("s3://mybucket/mydata.csv", header=True)
df.show()

# Delete the Spark session when done
prt.engines.delete_spark_session()

Important: Spark sessions require that your code is running on a Practicus AI Worker. Attempting to run Spark-related operations outside of a Worker environment will result in an error.

@staticmethod
def get_spark_session( conn_conf: dict | str | None = None, extra_spark_conf: dict | None = None, *args, **kwargs) -> Optional[pyspark.sql.session.SparkSession]:

Retrieves or creates a Spark session for distributed data processing within a Practicus AI Worker.

A Spark session can be used to connect to various data sources, run SQL queries, and perform large-scale transformations or machine learning tasks. If a session already exists, it will be reused; otherwise, a new one will be created based on the provided configuration parameters.

Parameters:

  • conn_conf (dict | str | None): Optional. A configuration object specifying data connection details.
  • extra_spark_conf (dict | None): Optional. A dictionary of additional Spark configuration options. For example: {"spark.executor.memory": "4g", "spark.driver.cores": "2"}

Returns:

  • Optional['SparkSession']: A SparkSession object if successful, or None if Spark cannot be initialized.

Example:

spark = prt.engines.get_spark_session(
    extra_spark_conf={"spark.executor.instances": "5"}
)
df = spark.read.csv("s3://my-data-bucket/data.csv", header=True)
df.show()
@staticmethod
def delete_spark_session(*args, **kwargs):

Terminates the currently active Spark session created by Practicus AI.

This method releases the resources associated with the Spark session, including executors and any cached data. It is good practice to delete the Spark session when you have finished your data processing tasks, especially if you anticipate creating a new session later.

Returns: None

Example:

# After finishing all Spark tasks
prt.engines.delete_spark_session()
class experiments:

Provides a simplified interface for managing machine learning experiments in Practicus AI.

The experiments class helps you integrate with various experiment tracking services supported by Practicus AI (e.g., MLFlow) to log parameters, metrics, and artifacts of your machine learning models. By centralizing the configuration process, it allows you to easily initialize and switch between experiment backends without deep knowledge of their internal configurations.

Key Concepts:

  • Experiment Tracking: The practice of recording experiment parameters, metrics, artifacts (models, plots, data snapshots), and metadata in a versioned and searchable manner.
  • MLFlow: A popular open-source platform for managing the ML lifecycle. Practicus AI can configure MLFlow servers to track your experiments.

Common Usage:

import practicuscore as prt

# Configure the experiment tracking to use a particular MLFlow service
prt.experiments.configure(service_name="my-mlflow-service", service_key="abc123", experiment_name="MyExperiment")

# After configuration, your ML code can log metrics and parameters via MLFlow APIs
import mlflow

with mlflow.start_run():
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_metric("accuracy", 0.95)
@staticmethod
def configure( service_name: str | None = None, service_key: str | None = None, experiment_name: str | None = None):

Configures the machine learning experiment tracking service for the current environment.

This method sets up integration with a configured MLFlow tracking server in Practicus AI. After configuration, you can use the MLFlow Python API to log parameters, metrics, and artifacts of your experiments.

Parameters:

  • service_name (str | None): The name of the Practicus AI experiment service (e.g., an MLFlow deployment) to connect to. If None, uses a default or previously configured service if available.
  • service_key (str | None): A key or token to authenticate and retrieve the appropriate experiment service details. This might be provided by Practicus AI’s console or your platform administrator.
  • experiment_name (str | None): An optional name of the experiment. If provided, this will attempt to set or start a new experiment run under the given name. If not provided, the service will default to a predefined experiment context or wait for you to specify one later.

Returns: None

Behavior and Error Handling:

  • If configuration is successful and service_name references an external MLFlow service, a log message confirms the configuration.
  • If configuration fails, an error message is logged. Common reasons for failure include:
    • Invalid service_name or service_key.
    • Network or credential issues accessing the external tracking server.
  • Exceptions, if any, are logged with stack traces for debugging.

Example:

# Configure with a known MLFlow service and start a new experiment
prt.experiments.configure(
    service_name="my-mlflow-service",
    service_key="myservicekey123",
    experiment_name="MyModelExperiment"
)

# Now you can use MLFlow's Python SDK for tracking:
import mlflow
mlflow.log_param("batch_size", 64)
mlflow.log_metric("loss", 0.4)
class notebooks:

Provides a simplified interface for executing, configuring, and managing Jupyter notebooks programmatically within Practicus AI.

The notebooks class allows you to:

  • Configure default output directories for successful and failed notebook runs.
  • Execute Jupyter notebooks with specified parameters, capturing outputs and handling failures.
  • Keep track of execution history, including which notebooks succeeded or failed.
  • Validate the execution environment (e.g., correct Python virtual environment).

Key Concepts:

  • Notebook Execution: Running a .ipynb file programmatically (via papermill) and capturing the executed output.
  • Output Management: Storing successful and failed notebook outputs in designated directories, optionally with timestamps for versioning or organization.
  • History Tracking: Keeping a record of all notebooks executed, and validating that all completed successfully if desired.

Common Usage:

import practicuscore as prt

# Configure default directories and reset history
prt.notebooks.configure(
    default_output_folder="~/notebook_outputs/success",
    default_failed_output_folder="~/notebook_outputs/failed",
    add_time_stamp_to_output=True,
    reset_history=True
)

# Execute a notebook
prt.notebooks.execute_notebook(
    input_path="~/my_notebook.ipynb",
    parameters={"param1": 42},
    raise_on_failure=True
)

# View execution history
prt.notebooks.view_history()

# Validate all executed notebooks
prt.notebooks.validate_history()
default_output_folder: str | None = None
default_failed_output_folder: str | None = None
successful_notebooks: list[str] = []
failed_notebooks: list[str] = []
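
These attributes can be inspected directly, e.g. to summarize a batch of runs:

print("Succeeded:", len(prt.notebooks.successful_notebooks))
print("Failed:", len(prt.notebooks.failed_notebooks))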
@staticmethod
def configure( default_output_folder: str | None = None, default_failed_output_folder: str | None = None, add_time_stamp_to_output=True, reset_history=True):

Configures default output directories and history handling for notebook execution.

Parameters:

  • default_output_folder (str | None): The base directory where successful notebook outputs will be stored. Supports ~ for the home directory. If add_time_stamp_to_output is True, a timestamped subfolder is created.
  • default_failed_output_folder (str | None): The base directory where failed notebook outputs will be stored. Supports ~ for the home directory. If add_time_stamp_to_output is True, a timestamped subfolder is created.
  • add_time_stamp_to_output (bool): If True, appends a timestamp-based folder structure (YYYY-MM-DD/HH-MM-SS) inside the specified output directories.
  • reset_history (bool): If True, clears the record of previously executed notebooks, resetting successful_notebooks and failed_notebooks.

Example:

prt.notebooks.configure(
    default_output_folder="~/outputs/success",
    default_failed_output_folder="~/outputs/failed",
    add_time_stamp_to_output=True,
    reset_history=True
)
@staticmethod
def reset_history():

Clears the record of previously executed notebooks, resetting both successful and failed lists.

Example:

prt.notebooks.reset_history()
print("Execution history has been reset.")
@staticmethod
def view_history():

Logs the summary of executed notebooks: how many succeeded and how many failed.

Behavior:

  • If there are failed notebooks, logs a warning with the count.
  • If there are successful notebooks, logs their count.
  • If no notebooks have been run, logs that no history is available.

Example:

prt.notebooks.view_history()
@staticmethod
def validate_history():

Validates the execution history by checking if any notebooks failed.

Behavior:

  • If any notebooks failed, raises a ValidationFailedError.
  • If all executed notebooks were successful, logs a success message.
  • If no notebooks have been run, logs that no runs are in the history.

Example:

prt.notebooks.validate_history()
@staticmethod
def get_venv_name():

Retrieves the name of the current Python virtual environment.

Returns:

  • str: The current virtual environment name. If using system Python, returns "practicus".

Example:

current_venv = prt.notebooks.get_venv_name()
print(f"Running in venv: {current_venv}")
@staticmethod
def validate_venv(venv_name: str):

Validates that the current environment matches the expected virtual environment name.

Parameters:

  • venv_name (str): The expected virtual environment name.

Behavior:

  • If the current environment does not match venv_name, raises an EnvironmentError.
  • When running in a Practicus AI Worker, the error suggests using the correct image or kernel if a mismatch is found.

Example:

prt.notebooks.validate_venv("practicus_genai")
@staticmethod
def execute_notebook( input_path, parameters: dict | None = None, output_path=None, failed_output_path=None, raise_on_failure: bool = False, **kwargs):

Executes a Jupyter notebook using papermill, optionally applying parameters.

Parameters:

  • input_path (str): The path to the input .ipynb notebook file. ~ is expanded to the home directory; if the .ipynb suffix is missing, it is appended.
  • parameters (dict | None): A dictionary of parameters to pass to the notebook.
  • output_path (str | None): The path to save the executed notebook. If None, uses default_output_folder or modifies input_path to create an _output.ipynb.
  • failed_output_path (str | None): Custom directory to store failed notebook outputs. If None, uses default_failed_output_folder if configured.
  • raise_on_failure (bool): If True, raises an exception if the notebook fails.
  • **kwargs: Additional arguments passed to papermill.execute_notebook().

Behavior:

  • Executes the notebook with papermill.
  • If successful, logs success and records the notebook in successful_notebooks.
  • If it fails, logs an error, moves the output to the failed directory if available, and records the notebook in failed_notebooks.

Example:

prt.notebooks.execute_notebook(
    input_path="~/analysis.ipynb",
    parameters={"input_data": "dataset.csv"},
    raise_on_failure=True
)
class quality:

Provides convenience methods to check and format code quality using the ruff linting and formatting tool.

The quality class leverages the ruff command-line tool to:

  • Lint code, identifying stylistic, formatting, and potential bug issues.
  • Automatically fix certain issues to maintain a consistent code style.

By default, if no paths are specified, it operates on the current working directory.

Key Concepts:

  • Linting (Check): Scans code for issues and reports them without changing files.
  • Formatting: Attempts to auto-fix and format code to adhere to specified guidelines.

Common Usage:

import practicuscore as prt

# Check code quality in the current directory
success = prt.quality.check()

# Format code in a specific directory
prt.quality.format(paths=["src/"])
@staticmethod
def check( paths: list[str] | None = None, config_path: str | None = None, fix: bool = False, select: list[str] | None = None, ignore: list[str] | None = None) -> bool:

Checks code quality and linting issues using ruff.

Parameters:

  • paths (list[str] | None): Paths to lint. If None, defaults to the current directory.
  • config_path (str | None): Optional ruff config file path.
  • fix (bool): If True, attempts to fix linting issues automatically (default: False).
  • select (list[str] | None): Specific linting rules (error codes) to focus on.
  • ignore (list[str] | None): Specific linting rules (error codes) to ignore.

Returns:

  • bool: True if no issues (or successfully fixed) found; False otherwise.

Example:

# Check the current directory without fixing issues
success = prt.quality.check()

# Check and fix issues in 'src' directory
success = prt.quality.check(paths=["src/"], fix=True)
@staticmethod
def format( paths: list[str] | None = None, config_path: str | None = None, select: list[str] | None = None, ignore: list[str] | None = None):

Formats code to improve code style and consistency.

Unlike check, the format command attempts to rewrite code to conform to linting rules. It does not fix all issues but can apply automatic formatting improvements.

Parameters:

  • paths (list[str] | None): Paths to format. If None, defaults to the current directory.
  • config_path (str | None): Optional ruff config file path.
  • select (list[str] | None): Specific formatting-related rules to enforce.
  • ignore (list[str] | None): Specific formatting-related rules to ignore.

Example:

# Automatically format code in 'src' directory
prt.quality.format(paths=["src/"])
class Region:

Represents a Practicus AI "Region," which is a logical control plane for managing resources such as workers (compute pods), models, applications, workflows, and data connections.

A Region corresponds to an isolated environment often running on Kubernetes. Each Region holds configurations, credentials, and references to available services and data sources. By interacting with a Region, you can:

  • Launch and manage Workers or Workspaces for running code, notebooks, or hosting models.
  • Deploy and manage ML models, view model prefixes and versions.
  • Deploy and manage Streamlit-based applications.
  • Manage data connections (e.g., S3, SQL databases, Snowflake).
  • Work with add-ons and external services integrated into the Practicus AI ecosystem.

Key Concepts:

  • Workers and Workspaces: Kubernetes pods that run code, handle notebooks, or host services.
  • Model and App Deployments: ML models and Streamlit apps deployed as services accessible by stable URLs.
  • Connections: Configurations to external data sources (S3, databases, etc.).
  • Regions & Isolation: Each Region is an isolated namespace, ensuring that resources and configurations do not conflict across environments.

Common Usage:

import practicuscore as prt

# Retrieve the default region
region = prt.get_default_region()

# Launch a worker
worker = region.create_worker()

# List current workers
for w in region.worker_list:
    print(w.name, w.instance_id)

# Deploy a model or app
region.deploy_model(deployment_key="my_model_service", prefix="mymodel", model_name="model_v1")
region.deploy_app(deployment_setting_key="my_app_service", prefix="myapp", app_name="app_v1")

# Manage data connections
conn = region.get_connection("my_database")
print(conn)

# Terminate all workers
region.terminate_all_workers()
Region( url: str, email: str | None = None, username: str | None = None, refresh_token: str | None = None, access_token: str | None = None)

Initializes a new Region instance.

Parameters:

  • url (str): The base URL of the Practicus AI Region.
  • email (str | None): The user's email associated with this Region.
  • username (str | None): The username derived from the email or assigned for this Region.
  • refresh_token (str | None): A refresh token for authenticating calls to the Region.
  • access_token (str | None): An optional pre-obtained access token.
logger = <Logger practicus.core.region (DEBUG)>

Logger configuration for all region related activity

url: str

The base URL of the Practicus AI Region.

email: str | None

The email address of the user associated with this Region session, if known.

username: str | None

The username derived from the user's email or explicitly set for this Region session.

refresh_token: str | None

The refresh token used for authenticating requests in this Region session.

def get_csv_header(self) -> str:
host_dns: str

Retrieves the host DNS of the region by stripping the protocol from the URL.

Returns:

  • str: The host DNS (e.g., "my-region.example.com").

Example:

print(region.host_dns)
key: str

A unique key identifying this region in the form "username@host_dns".

Returns:

  • str: The unique region key.

Example:

print(region.key)  # e.g. "alice@my-region.example.com"
is_default: bool

Checks whether this region is configured as the default region.

Returns:

  • bool: True if this is the default region, False otherwise.

Example:

if region.is_default:
    print("This is the default region.")
worker_list: PrtList[Worker]

Retrieves a list of all Workers currently running in this region.

Workers are refreshed periodically. If the cache is stale, this property automatically reloads the worker list from the region.

Returns:

  • PrtList[Worker]: A read-only list of Workers.

Example:

for worker in region.worker_list:
    print(worker.name, worker.service_type)
def reload_worker_list(self, service_type: str | None = None):

Forces a reload of the worker list from the region, optionally filtering by service type (e.g., "cloud_worker", "workspace").

Parameters:

  • service_type (str | None): The service type to filter workers by. If None, returns all workers.

Example:

region.reload_worker_list(service_type="cloud_worker")
def get_default_worker_size(self) -> str:

Retrieves the default worker size for launching new workers if none is specified.

Returns:

  • str: The default worker size name.

Example:

default_size = region.get_default_worker_size()
print("Default worker size:", default_size)
def get_default_worker_image(self, service_type: str = 'cloud_worker') -> str:

Retrieves the default container image for a given service type.

Parameters:

  • service_type (str): The service type (e.g., "cloud_worker", "workspace").

Returns:

  • str: The URL of the default worker image.

Example:

default_image = region.get_default_worker_image()
print("Default worker image:", default_image)
def create_worker( self, worker_config: WorkerConfig | str | dict | None = None, wait_until_ready: bool | None = None, **kwargs) -> Worker:

Creates a new Practicus AI Worker in this region.

If no worker_config is provided, defaults (default image and size) are used. The worker can be a simple "cloud_worker" or part of a more complex distributed job.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): The worker configuration. Can be a WorkerConfig object, a path to a JSON file, a dict, or None.
  • wait_until_ready (bool | None): If True, waits until the worker is fully ready.

Returns:

  • Worker: The newly created Worker instance.

Example:

worker = region.create_worker()
print("Created worker:", worker.name)
def create_workspace( self, worker_config: WorkerConfig | str | dict | None = None, **kwargs) -> Worker:

Creates a new Practicus AI Workspace.

A Workspace is a specialized worker configured for interactive development (e.g., Jupyter notebooks).

Parameters:

  • worker_config (WorkerConfig | str | dict | None): The workspace configuration. If None, default settings are used.

Returns:

  • Worker: The newly created workspace worker.

Example:

workspace = region.create_workspace()
print("Created workspace:", workspace.name)
def get_local_worker(self, *args, **kwargs) -> Worker:

Retrieves the Worker instance representing the current environment if the code is running inside a worker.

Returns:

  • Worker: The local worker.

Raises:

  • SystemError: If not running inside a worker.

Example:

local_w = region.get_local_worker()
print("Local worker:", local_w.name)
def get_or_create_worker( self, worker_config: WorkerConfig | str | dict | None = None, service_type: str = 'cloud_worker', *args, **kwargs) -> Worker:

Gets an existing worker or creates a new one if none suitable exists.

If worker_config is not provided and no suitable running worker is found, it creates a new one.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): The worker configuration.
  • service_type (str): The service type (e.g. "cloud_worker").

Returns:

  • Worker: An existing or newly created worker.

Example:

worker = region.get_or_create_worker()
print("Worker:", worker.name)
model_prefix_list: PrtList[practicuscore.api_k8s.ModelPrefix]

Retrieves a list of configured model prefixes available in this region.

Model prefixes group related models, allowing you to organize and manage multiple versions and deployments under a logical namespace.

Returns:

  • PrtList[ModelPrefix]: A read-only list of model prefixes.

Example:

for prefix in region.model_prefix_list:
    print(prefix.prefix_name)
model_list: PrtList[practicuscore.api_k8s.ModelMeta]

Retrieves a list of all models known to this region.

Each ModelMeta object contains metadata about a model, such as its name, versions, and associated prefix.

Returns:

  • PrtList[ModelMeta]: A read-only list of models.

Example:

for model in region.model_list:
    print(model.model_name, model.latest_version)
app_list: PrtList[practicuscore.api_k8s.AppMeta]

Retrieves a list of all applications known to this region.

Each AppMeta object contains metadata about the app, such as its prefix, name, versions, and deployment info.

Returns:

  • PrtList[AppMeta]: A read-only list of apps.

Example:

for app in region.app_list:
    print(app.app_name, app.visible_name)
app_prefix_list: PrtList[practicuscore.api_k8s.AppPrefix]

Retrieves a list of app prefixes available in this region.

App prefixes group related applications, providing a logical namespace for organizing multiple apps.

Returns:

  • PrtList[AppPrefix]: A read-only list of app prefixes.

Example:

for prefix in region.app_prefix_list:
    print(prefix.prefix)
def reload_model_prefix_list(self):

Forces a reload of the model prefix list from the region.

If prefixes are stale or changed, calling this method ensures you have the latest data.

Example:

region.reload_model_prefix_list()
def reload_model_list(self):

Forces a reload of the model list from the region.

Ensures that newly created or updated models are reflected locally.

Example:

region.reload_model_list()
def reload_app_list(self):

Forces a reload of the app list from the region.

Ensures that newly deployed or updated apps appear locally.

Example:

region.reload_app_list()
def reload_app_prefix_list(self):

Forces a reload of the app prefix list from the region.

Example:

region.reload_app_prefix_list()
model_deployment_list: PrtList[practicuscore.api_k8s.ModelDeployment]

Retrieves a list of all model deployments currently defined in this region.

A ModelDeployment represents a deployed model service, including routing and scaling details.

Returns:

  • PrtList[ModelDeployment]: A read-only list of model deployments.

Example:

for deployment in region.model_deployment_list:
    print(deployment.model_name, deployment.deployment_key)
app_deployment_setting_list: PrtList[practicuscore.api_k8s.AppDeploymentSetting]

Retrieves a list of all application deployment settings defined in this region.

AppDeploymentSetting provides configurations that influence how apps are deployed (e.g., environment variables, scaling settings).

Returns:

  • PrtList[AppDeploymentSetting]: A read-only list of app deployment settings.

Example:

for setting in region.app_deployment_setting_list:
    print(setting.deployment_setting_key, setting.description)
def reload_model_deployment_list(self):

Forces a reload of the model deployment list from the region.

Ensures newly created or removed deployments are reflected locally.

Example:

region.reload_model_deployment_list()
def reload_app_deployment_setting_list(self):

Forces a reload of the app deployment setting list from the region.

Example:

region.reload_app_deployment_setting_list()
def deploy_model( self, deployment_key: str, prefix: str, model_name: str, model_dir: str | None = None) -> tuple[str, str, str]:

Deploys an ML model to this region.

Parameters:

  • deployment_key (str): The deployment key defined by admin.
  • prefix (str): The model prefix.
  • model_name (str): The model name.
  • model_dir (str | None): Directory containing model files. If None, current directory is used.

Returns:

  • (str, str, str): (Dynamic version API URL, Specific version API URL, Metadata API URL).

Example:

dynamic_url, version_url, meta_url = region.deploy_model("my_model_service", "mymodel", "model_v1")
def deploy_app( self, deployment_setting_key: str, prefix: str, app_name: str, app_dir: str | None = None, visible_name: str | None = None, description: str | None = None, icon: str | None = None) -> tuple[str, str]:

Deploys an application to this region.

Parameters:

  • deployment_setting_key (str): The deployment setting key defined by admin.
  • prefix (str): The app prefix.
  • app_name (str): The app name.
  • app_dir (str | None): The directory with app files. If None, current directory is used.
  • visible_name (str | None): User-facing name of the app.
  • description (str | None): Short description of the app.
  • icon (str | None): Font Awesome icon name.

Returns:

  • (str, str): (UI URL, API URL)

Example:

ui_url, api_url = region.deploy_app("my_app_service", "myapp", "app_v1", app_dir="app_files/")
def delete_app( self, app_id: int | None = None, prefix: str | None = None, app_name: str | None = None):

Deletes an entire application (all versions) from this region.

Parameters:

  • app_id (int | None): The app ID. If not provided, prefix and app_name must be given.
  • prefix (str | None): The app prefix if app_id is not known.
  • app_name (str | None): The app name if app_id is not known.

Example:

region.delete_app(prefix="myapp", app_name="app_v1")
def delete_app_version( self, version: int | str, app_id: int | None = None, prefix: str | None = None, app_name: str | None = None):

Deletes a specific version of an app from this region.

Parameters:

  • version (int | str): The version to delete.
  • app_id (int | None): The app ID. If not provided, prefix and app_name must be given.
  • prefix (str | None): The app prefix.
  • app_name (str | None): The app name.

Example:

region.delete_app_version(version=2, prefix="myapp", app_name="app_v1")
def deploy_workflow( self, service_key: str, dag_key: str, files_path: str | None = None, max_files_to_upload: int = 250, *args, **kwargs):

Deploys a workflow (e.g., an Airflow DAG) to this region.

Parameters:

  • service_key (str): The workflow service key defined by admin (e.g., Airflow).
  • dag_key (str): The DAG key (must be a valid Python module name).
  • files_path (str | None): The directory with workflow files. Defaults to current directory.
  • max_files_to_upload (int): Maximum files to upload.

Example:

region.deploy_workflow(service_key="airflow_service", dag_key="my_pipeline", files_path="workflow_files/")
def logout(self):

Logs out from this region by removing stored credentials.

Example:

region.logout()
print("Logged out from region.")
def set_default(self):

Sets this region as the default region.

Example:

region.set_default()
print("Default region is now:", region.key)
def run_task( self, file_name: str, files_path: str | None = None, worker_config: WorkerConfig | str | dict | None = None, terminate_on_completion: bool = True, capture_task_output: bool = True, python_venv_name: str | None = None, max_files_to_upload: int = 250, *args, **kwargs) -> tuple[Worker | None, bool]:

Runs a remote task (script) on a newly created or existing worker.

The code and dependencies are uploaded to the worker, and the script is executed. If terminate_on_completion is True, the worker is terminated after the task finishes.

Parameters:

  • file_name (str): The Python (.py) or shell (.sh) script to run.
  • files_path (str | None): Directory containing files to upload. Defaults to current directory.
  • worker_config (WorkerConfig | str | dict | None): Configuration for the remote worker.
  • terminate_on_completion (bool): If True, terminates the worker after completion.
  • capture_task_output (bool): If True, captures and logs the script's stdout and stderr.
  • python_venv_name (str | None): Name of a Python virtual environment to use on the worker.
  • max_files_to_upload (int): Max number of files to upload.

Returns:

  • (Worker | None, bool): A tuple of (worker, success_boolean).

Example:

worker, success = region.run_task("analysis.py", files_path="project_code/")
if success:
    print("Task ran successfully!")
def get_model_api_session_token(self, api_url: str, for_upload=False, retry=3) -> str | None:

Retrieves a short-lived session token for interacting with a model's API.

Parameters:

  • api_url (str): The model API URL.
  • for_upload (bool): If True, requests a token with permissions for model uploads.
  • retry (int): Number of times to retry obtaining a token.

Returns:

  • str | None: The session token, or None if unable to fetch.

Example:

token = region.get_model_api_session_token("https://my-region.com/mymodel/", for_upload=True)
if token:
    print("Got token for model API upload.")
def get_app_api_session_token( self, api_url: str | None = None, app_id: int | None = None, for_upload=False, retry=3) -> str | None:

Retrieves a short-lived session token for interacting with an app's API.

Parameters:

  • api_url (str | None): The app API URL.
  • app_id (int | None): The app's unique identifier.
  • for_upload (bool): If True, requests a token for uploading a new app version.
  • retry (int): Number of times to retry obtaining a token.

Returns:

  • str | None: The session token, or None if unable to fetch.

Example:

token = region.get_app_api_session_token(app_id=1234, for_upload=False)
if token:
    print("Got token for app API.")
addon_list: PrtList[practicuscore.addon_helper.AddOn]

Retrieves a list of add-ons (3rd party services) integrated into this region.

Add-ons might represent external analytics tools, monitoring services, and other integrations accessible through the region.

Returns:

  • PrtList[AddOn]: A read-only list of add-ons.

Example:

for addon in region.addon_list:
    print(addon.key, addon.name)
def reload_addon_list(self):

Forces a reload of the add-on list from the region.

Ensures that any newly added or updated add-ons are reflected locally.

Example:

region.reload_addon_list()
def get_addon(self, key: str) -> practicuscore.addon_helper.AddOn | None:

Retrieves an AddOn (external service) by its key.

Parameters:

  • key (str): The unique identifier of the add-on.

Returns:

  • AddOn | None: The matching add-on, or None if not found.

Example:

addon = region.get_addon("my_addon_key")
if addon:
    print("Add-on name:", addon.name)
else:
    print("Add-on not found.")
def open_addon(self, key: str):

Opens the specified add-on in a browser or a suitable interface if supported.

Parameters:

  • key (str): The unique identifier of the add-on to open.

Example:

region.open_addon("my_addon_key")
def get_worker( self, worker_name_or_num: str | int) -> Worker | None:

Retrieves a worker in this region by its name or numeric suffix.

Parameters:

  • worker_name_or_num (str | int): The worker name or numeric suffix (e.g. "Worker-123" or "123").

Returns:

  • Worker | None: The matching worker, or None if not found.

Example:

worker = region.get_worker("Worker-123")
if worker:
    print("Found worker:", worker.name)
def get_connection(self, uuid_or_name: str) -> practicuscore.conn_helper.Connection | None:

Retrieves a data connection by UUID or name.

Names are not guaranteed unique, so it's recommended to use UUIDs in production.

Parameters:

  • uuid_or_name (str): The connection's UUID or name.

Returns:

  • Connection | None: The matching connection, or None if not found.

Example:

conn = region.get_connection("my_database")
if conn:
    print("Found connection:", conn.name)
def remove_worker_from_cache(self, worker_name: str):

Removes a worker from the region's internal cache without terminating it.

This is typically used internally. To fully terminate a worker, use terminate_worker().

Parameters:

  • worker_name (str): The name of the worker to remove from the cache.

Example:

region.remove_worker_from_cache("Worker-123")
def terminate_worker( self, worker_name_or_num: str | int | None = None, all_workers: bool = False, instance_id: str | None = None, stop_reason: str = 'sdk requested') -> bool:

Terminates one or all workers in this region.

Parameters:

  • worker_name_or_num (str | int | None): The worker name or numeric suffix (e.g. "Worker-123" or "123").
  • all_workers (bool): If True, terminates all workers in the region.
  • instance_id (str | None): The instance_id of a worker, if not using name/num.
  • stop_reason (str): A reason for stopping, used for logging and audit.

Returns:

  • bool: True if at least one worker was terminated, False otherwise.

Example:

region.terminate_worker(worker_name_or_num="Worker-123")
region.terminate_worker(all_workers=True)
def get_groups(self) -> list[str]:

Retrieves the group memberships of the currently logged-in user.

Returns:

  • list[str]: A list of group names.

Example:

groups = region.get_groups()
print("User groups:", groups)
def terminate_all_workers(self) -> bool:

Terminates all workers in this region.

Returns:

  • bool: True if any workers were terminated, False if none were found.

Example:

region.terminate_all_workers()
connection_list: PrtList[practicuscore.conn_helper.Connection]

Retrieves a sorted list of all data connections defined in this region.

Connections can point to databases, S3 buckets, or other external data sources.

Returns:

  • PrtList[Connection]: A read-only list of connections sorted by name.

Example:

for conn in region.connection_list:
    print(conn.name, conn.uuid)
def reload_connection_list(self):

Forces a reload of the data connection list from the region.

Ensures newly created or updated connections appear locally.

Example:

region.reload_connection_list()
def create_connection( self, name: str, conn_conf: practicuscore.api_base.ConnConf, tree_path: str | None = None, can_write=True) -> str:

Creates a new data connection in this region.

A data connection stores credentials and parameters for accessing external data sources, such as databases or S3 buckets.

Parameters:

  • name (str): A unique name for the new connection.
  • conn_conf (ConnConf): The connection configuration object describing the data source.
  • tree_path (str | None): An optional hierarchical path for organizing the connection, e.g. "teamA/projects".
  • can_write (bool): If True, this connection can be used for write operations.

Returns:

  • str: The UUID of the newly created connection.

Example:

from practicuscore import S3ConnConf

s3_conf = S3ConnConf(access_key="AKIA...", secret_key="secret...", bucket="my-bucket")
uuid = region.create_connection(name="my_s3_conn", conn_conf=s3_conf, can_write=True)
print("Created connection with UUID:", uuid)
def update_connection( self, conn_uuid: str, name: str, conn_conf: practicuscore.api_base.ConnConf, tree_path: str | None = None, can_write=True):

Updates an existing data connection's configuration.

This method allows changing connection details (e.g., credentials, endpoints) or reorganizing how connections are stored (via tree_path).

Parameters:

  • conn_uuid (str): The UUID of the existing connection to update.
  • name (str): The new name for the connection. You can keep the old name or provide a new one.
  • conn_conf (ConnConf): The updated connection configuration.
  • tree_path (str | None): An optional hierarchical path for reorganizing the connection.
  • can_write (bool): If True, the updated connection can be used for write operations.

Example:

# Assume we previously retrieved or created a connection with UUID "abcd1234"
updated_s3_conf = S3ConnConf(access_key="AKIA...", secret_key="new_secret", bucket="my-new-bucket")
region.update_connection(conn_uuid="abcd1234", name="my_updated_s3_conn", conn_conf=updated_s3_conf)
print("Connection updated successfully.")
worker_size_list: PrtList[practicuscore.worker_manager.WorkerSize]

Retrieves a list of worker sizes available in this region.

Worker sizes define the compute resources (CPU, memory) allocated to a worker.

Returns:

  • PrtList[WorkerSize]: A read-only list of worker sizes.

Example:

for size in region.worker_size_list:
    print(size.name, size.default)
def reload_worker_size_list(self):

Forces a reload of the worker size list from the region.

Ensures that any newly added or updated worker sizes are reflected locally.

Example:

region.reload_worker_size_list()
worker_image_list: PrtList[practicuscore.worker_manager.WorkerImage]

Retrieves a list of available worker images in this region.

Worker images define the software environment (base Docker image) used by workers or workspaces.

Returns:

  • PrtList[WorkerImage]: A read-only list of worker images, sorted by priority (order).

Example:

for img in region.worker_image_list:
    print(img.url, img.service_type)
def reload_worker_image_list(self):

Forces a reload of the worker image list from the region.

Ensures that any newly added or updated worker images are reflected locally.

Example:

region.reload_worker_image_list()
def get_access_token(self) -> str:

Retrieves an access token for the region using the stored refresh token.

Returns:

  • str: The access token.

Example:

token = region.get_access_token()
print("Access token:", token)
def get_refresh_and_access_token(self) -> tuple[str, str]:

Retrieves a refreshed access token and refresh token from the region.

Returns:

  • (str, str): (refresh_token, access_token)

Example:

refresh_t, access_t = region.get_refresh_and_access_token()
def get_workspace_credentials(self, instance_id: str) -> tuple[str, str]:

Retrieves the login credentials for a specified workspace.

Parameters:

  • instance_id (str): The instance ID of the workspace.

Returns:

  • (str, str): (username, password)

Example:

user, pwd = region.get_workspace_credentials("instance1234")
print("Workspace creds:", user, pwd)
def recreate_model_deployment(self, model_deployment_key: str):

Re-creates a model deployment by deleting it and then creating it again. This is an admin operation, often used in testing.

Parameters:

  • model_deployment_key (str): The key of the model deployment to recreate.

Example:

region.recreate_model_deployment("my_model_deployment_key")
def change_password(self, old_password: str, new_password: str):

Changes the currently logged-in user's password.

Parameters:

  • old_password (str): The current password.
  • new_password (str): The new password, which must meet complexity requirements.

Example:

region.change_password(old_password="oldpass", new_password="NewP@ssw0rd!")
print("Password changed successfully.")
class Worker:

Represents a compute resource (pod) running in a Practicus AI Region.

A Worker can host notebooks, run scripts, process data, take part in distributed jobs and training, and serve as the foundation for various workloads. Workers are managed by the Region and are typically created through methods like region.create_worker() or region.get_local_worker().

Example:

# Creating a worker through the region:
worker = region.create_worker()

# Running a task on the worker:
_, success = worker.region.run_task("analysis.py", files_path="project_code/")
print("Task successful:", success)

# Opening a notebook on the worker:
notebook_url = worker.open_notebook(get_url_only=True)
print("Notebook URL:", notebook_url)

Once you are done with the Worker, you can terminate it to free resources:

worker.terminate()
Worker(region: Region)

Initializes a new Worker instance associated with a given Region.

Normally, you don't directly instantiate Worker. Instead, you obtain Worker instances by calling methods like region.create_worker() or region.get_local_worker().

Parameters:

  • region (Region): The region this worker belongs to.
logger = <Logger practicus.core.worker (DEBUG)>

Logger configuration for all worker related activity

region: Region

The parent (owner) Region of this worker

create_model_response: Optional[practicuscore.api_def.CreateModelResponse]

For internal use

async_op_issue_list: list[practicuscore.core_def.OPResult]

For internal use

async_op_issue_history_list: list[practicuscore.core_def.OPResult]

For internal use

def get_csv_header(self) -> str:
service_type: str | None

The type of service this worker provides (e.g., "cloud_worker", "workspace").

Returns:

  • str | None: The worker's service type.
instance_id: str | None

The unique instance identifier of this worker.

Returns:

  • str | None: The worker's instance ID, or None if not assigned yet.
server_name: str | None

A system-generated server (Kubernetes pod) name for internal use.

Returns:

  • str | None: The server name based on the worker's instance ID.
server_addr: str | None

The internal Kubernetes service address name used within the cluster.

Returns:

  • str | None: The cluster service address name, or None if not assigned.
server_addr_fqdn: str | None

The fully qualified domain name (FQDN) of the worker's service within the Kubernetes cluster.

Returns:

  • str | None: The cluster-local FQDN of the worker's service.
image: str | None

The container image used by this worker.

Returns:

  • str | None: The image string.
image_id: str | None

A parsed image identifier derived from the worker's image.

Returns:

  • str | None: The simplified image ID extracted from the image string.
creation_time: datetime.datetime | None

The timestamp when this worker was created.

Returns:

  • datetime | None: The creation timestamp, or None if not available.
name: str | None

The friendly name of this worker.

Returns:

  • str | None: The worker's name (e.g., "Worker-123").
size: str | None

The worker size, representing the compute resources allocated (e.g., "Medium").

Returns:

  • str | None: The worker size name.
total_mem_gb: float | None

The total memory allocated to this worker, in gigabytes.

Returns:

  • float | None: The total memory in GB.
cpu: float | None

The number of CPU cores allocated to this worker.

Returns:

  • float | None: The count of CPU cores.
gpu: float | None

The number of GPU cores allocated to this worker.

Returns:

  • float | None: The GPU core count.
status: str | None

The current status/state of this worker (e.g., "Running", "Terminated").

Returns:

  • str | None: The worker's status.
job_id: str | None

The job ID, if this worker is part of a distributed job cluster.

Returns:

  • str | None: The distributed job ID, or None if not applicable.
terminating: bool

Indicates whether this worker is in the process of shutting down or already terminated.

Returns:

  • bool: True if the worker is terminating or terminated, False otherwise.
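
Example (reading a few of the properties above for every worker in a region):

for w in region.worker_list:
    print(w.name, w.size, w.status, w.creation_time)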
def assign_node(self, node: practicuscore.cloud_def.CloudNode):

Associates the Worker with a specific underlying CloudNode.

Note: This method is intended for internal use and is typically called when the Worker is created.

Parameters:

  • node (CloudNode): The underlying cloud node representing this worker's infrastructure.
@staticmethod
def local_worker_factory( k8s_region: practicuscore.cloud_def.K8sRegion, region: Region) -> Worker:

Creates a Worker instance representing the local environment if the code is running on a Practicus AI Worker.

Note: This is intended for internal use.

Parameters:

  • k8s_region (K8sRegion): The Kubernetes region configuration.
  • region (Region): The parent Region instance.

Returns:

  • Worker: A Worker object representing the local worker environment.
def terminate(self, stop_reason: str = 'sdk requested'):

Terminates the worker, releasing its resources.

Parameters:

  • stop_reason (str): A short reason for termination (used for auditing).

Example:

worker.terminate()
def load( self, conn_conf: dict | str | practicuscore.api_base.ConnConf, engine='AUTO') -> Process:

Loads data into a new Process running on this worker.

This method sets up a data processing environment (like Pandas or Spark) on the worker to interact with data from a given connection configuration.

Parameters:

  • conn_conf (dict | str | ConnConf): The connection configuration for the data source.
  • engine (str): The data processing engine, e.g. "AUTO", "PANDAS", "SPARK".

Returns:

  • Process: A Process object representing the data loading session.
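
Example (a minimal sketch; s3_connection stands for an already prepared connection configuration, as used in the Process example below):

proc = worker.load(s3_connection, engine="PANDAS")
df = proc.get_df_copy()
print(df.head())
proc.kill()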
def upload_files( self, source: str, destination: str | None = None, max_files_to_upload: int = 1000) -> str:

Uploads files from the local environment to the worker's filesystem.

Parameters:

  • source (str): The local file or directory path to upload.
  • destination (str | None): The target directory on the worker. Defaults to a predefined upload folder.
  • max_files_to_upload (int): Maximum number of files to upload (to prevent huge accidental uploads).

Returns:

  • str: The directory on the worker where the files were uploaded.

Example:

upload_path = worker.upload_files("data/")
print("Files uploaded to:", upload_path)
def download_files(self, source: str, destination: str):

Downloads files from the worker's filesystem to the local environment.

Parameters:

  • source (str): The file or directory path on the worker to download.
  • destination (str): The local directory to store the downloaded files.

Example:

worker.download_files("results/output.csv", "local_results/")
def run_task( self, task_file_path: str, capture_task_output: bool = True, python_venv_name: str | None = None) -> str:

Runs a Python (.py) or shell (.sh) script on the worker, optionally capturing output.

Parameters:

  • task_file_path (str): The file path on the worker to the script to run.
  • capture_task_output (bool): If True, captures stdout and stderr of the script.
  • python_venv_name (str | None): Name of a Python virtual environment on the worker.

Returns:

  • str: A unique task UUID identifying the running task.

Example:

task_uuid = worker.run_task("/path/to/script.py", capture_task_output=True)
def check_task(self, task_uuid: str) -> bool:

Monitors a running task identified by a task UUID until it completes or fails.

Prints logs periodically and returns True if the task completes successfully, False otherwise.

Parameters:

  • task_uuid (str): The UUID of the task to check.

Returns:

  • bool: True if the task succeeded, False if it failed.

Example:

success = worker.check_task(task_uuid)
print("Task succeeded:", success)
proc_list: PrtList[Process]

Retrieves a read-only list of Process objects created on this worker.

Returns:

  • PrtList[Process]: A list of active processes on this worker.
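
Example (illustrative; assumes one or more processes have already been started on this worker):

for proc in worker.proc_list:
    print(proc.proc_id, proc.conn_info)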
def remove_proc_from_cache(self, proc_id: int):

Removes a process from the worker's internal cache without killing it.

Note: This is intended for internal use. To fully terminate a process, use kill_proc().

Parameters:

  • proc_id (int): The ID of the process to remove from the cache.
def kill_proc(self, proc_id: int):

Terminates a specific process running on the worker.

Parameters:

  • proc_id (int): The ID of the process to kill.

Example:

worker.kill_proc(proc_id=123)
def kill_all_procs(self):

Terminates all processes running on this worker.

Example:

worker.kill_all_procs()
def open_notebook(self, dark_mode: bool = True, get_url_only: bool = False) -> str:

Opens the Jupyter notebook environment running on this worker in a browser (or returns its URL).

Parameters:

  • dark_mode (bool): If True, the notebook UI is displayed in dark mode.
  • get_url_only (bool): If True, returns the notebook login URL without opening a browser.

Returns:

  • str: The login URL for the notebook.

Example:

url = worker.open_notebook(get_url_only=True)
print("Notebook URL:", url)
def open_vscode( self, dark_mode: bool = True, get_url_only: bool = False) -> tuple[str, str]:

Opens the Visual Studio Code (VS Code) environment running on this worker in a browser (or returns its URL and authentication token).

Parameters:

  • dark_mode (bool): If True, the VS Code UI is displayed in dark mode.
  • get_url_only (bool): If True, returns the VS Code URL and token without opening a browser.

Returns:

  • (str, str): The VS Code URL and the authentication token.

Example:

url, token = worker.open_vscode(get_url_only=True)
print("VS Code URL:", url)
print("Token:", token)
def open_workspace(self, get_url_only: bool = False) -> str:

Opens a workspace environment (e.g., a specialized development UI) running on this worker.

Parameters:

  • get_url_only (bool): If True, returns the workspace URL without opening a browser.

Returns:

  • str: The workspace URL.

Example:

url = worker.open_workspace(get_url_only=True)
print("Workspace URL:", url)
def get_workspace_credentials(self) -> tuple[str, str]:

Retrieves login credentials (username, password) for this workspace.

Returns:

  • (str, str): (username, password) for the workspace login.

Example:

user, pwd = worker.get_workspace_credentials()
print("Workspace user:", user)
def ping(self, raise_on_error=True) -> float:

Sends a ping request to the worker to measure responsiveness.

Parameters:

  • raise_on_error (bool): If True, raise an exception if the worker cannot be reached.

Returns:

  • float: The ping time in seconds, or 0.0 if failed and raise_on_error=False.

Example:

latency = worker.ping()
print("Latency:", latency, "seconds")
def wait_until_ready(self, timeout=60) -> float:

Waits until the worker is fully ready to serve requests.

Parameters:

  • timeout (int): Time (in seconds) before raising a TimeoutError if the worker isn't ready.

Returns:

  • float: The final ping time once the worker is ready.

Example:

wait_time = worker.wait_until_ready(timeout=120)
print("Worker ready, ping:", wait_time, "seconds")
def get_logs(self, log_size_mb: int = 1) -> str | None:

Retrieves the latest portion of the worker's logs.

Parameters:

  • log_size_mb (int): The size of logs (in MB) to retrieve, from the bottom of the log file.

Returns:

  • str | None: The logs as a string, or None if no logs are found.

Example:

logs = worker.get_logs(log_size_mb=2)
print("Worker logs:", logs)
def view_logs(self, log_size_mb: int = 1):

Prints the latest portion of the worker's logs to stdout.

For reading logs into a variable, use get_logs().

Parameters:

  • log_size_mb (int): The size of logs (in MB) to print.

Example:

worker.view_logs()
class Process:

Represents a process running on a Practicus AI Worker. A Process corresponds to an actual OS-level process in the worker’s environment. It can load and manipulate data, run transformations, execute code snippets, and integrate with machine learning model building and prediction tasks.

Key Capabilities:

  • Data Loading & Processing: Load data from various sources (Data Lakes, S3, databases, local files) into the worker’s environment, and perform transformations such as filtering, sorting, grouping, and type changes.
  • Running Code & Snippets: Execute custom Python code blocks, SQL queries, and pre-defined snippets directly on the worker.
  • Model Operations: Build, register, and search ML models. Integrate model predictions into your data pipeline.
  • Workflow Integration: Run recorded steps, save and restore workflows, and apply transformations in sequence.
  • Lifecycle Management: When used as a context manager, the process is automatically killed when the 'with' block exits; it also provides methods to terminate, wait until current operations complete, and manage logs.

Example:

with worker.load(s3_connection) as proc:
    # Perform data transformations
    proc.filter("price > 100")
    proc.sort_column(["price"])
    # Retrieve updated DataFrame
    df = proc.get_df_copy()
    print(df.head())
# Process is automatically killed at the end of the 'with' block

After the process finishes, you can also run model predictions or save the processed data to another destination.

Process( node: practicuscore.cloud_def.CloudNode, worker: Worker)

Initializes a new Process associated with a given Worker.

Parameters:

  • node (CloudNode): The underlying cloud node (e.g., Kubernetes pod) representing the worker.
  • worker (Worker): The parent Worker instance that this process runs on.
logger = <Logger practicus.core.process (DEBUG)>

Logger specific to the Process subsystem (practicus.core.process).

worker: Worker

The parent Worker instance for this process.

ws: practicuscore.ws.Worksheet | None

Manages the internal data (worksheet) and steps executed by this process.

wsm

Manages the internal data (worksheet) state managed by this process.

async_op_issue_list: list[practicuscore.core_def.OPResult]

Tracks issues or asynchronous operation results that occur during process execution.

alive: bool

Indicates if the process is currently alive (running) on the worker.

def get_csv_header(self) -> str:

Returns a CSV header line representing the columns used in the __str__ method.

Returns:

  • str: CSV-formatted header line for process attributes.
proc_id: int

The internal process ID assigned by Practicus AI.

Returns:

  • int: The process ID, or -1 if not available.
os_pid: int

The underlying OS-level PID of the process running on the worker.

Returns:

  • int: The OS PID, or -1 if not available.
conn_conf: practicuscore.api_base.ConnConf | None

The connection configuration used by this process to load data.

Returns:

  • ConnConf | None: The connection configuration, or None if not set.
conn_info: str

A friendly, human-readable description of the current connection configuration.

Returns:

  • str: The connection's long description, or an empty string if unavailable.
worker_name: str

The name of the worker hosting this process.

Returns:

  • str: The worker name, or an empty string if unavailable.
def run_step(self, step: practicuscore.steps.Step, record_step=True):

Executes a single transformation or action (a Step) on the process.

Parameters:

  • step (Step): The step to run, representing a data transformation or action.
  • record_step (bool): If True, records the step in the worksheet history.
@staticmethod
def get_eng(df) -> practicuscore.core_def.PRTEng:

Returns the internal data engine representation enum (PRTEng) for the given DataFrame.

def show_recorded_steps(self):

Lists all recorded steps currently stored in the internal worksheet.

def delete_recorded_steps(self, *args):

Deletes one or more recorded steps from the internal worksheet by their step numbers.

def reset_recorded_steps(self):

Clears all recorded steps from the internal worksheet.

def run_recorded_steps(self):

Executes all previously recorded steps in sequence.
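
Example (a minimal sketch; assumes steps such as filters have already been executed and recorded on this process):

proc.show_recorded_steps()
proc.run_recorded_steps()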

def save_ws( self, file_path: str, sampling_method: str | None = 'TOP', sample_size: int | None = 1000):

Saves the current worksheet, including steps and data sampling settings, to a specified file path.

def get_ws_activity_log(self) -> str:

Retrieves the activity log of all operations performed in the worksheet.

def show_logs(self, raise_error_on_issues=True):

Displays the recorded activity logs and issues from the worker.

Parameters:

  • raise_error_on_issues (bool): If True, raises an error if any issues are found.
def show_issues(self, raise_error_on_issues=True):

Displays any captured asynchronous operation issues from the worker.

Parameters:

  • raise_error_on_issues (bool): If True, raises an error if issues are present.
@staticmethod
def is_local_practicus_svc_active() -> bool:

Checks whether the local worker control plane service is active.

my_k8s_worker: practicuscore.cloud_def.K8sCloudNode

Returns the internal Kubernetes worker node (K8sCloudNode) representation object.

my_k8s_region: practicuscore.cloud_def.K8sRegion

Returns the internal Kubernetes region (K8sRegion) representation object.

def get_model_api_session_token(self, api_url: str, for_upload=False, retry=3) -> str | None:

View documentation of Region.get_model_api_session_token

@staticmethod
def get_file_path_in_caller_stack(file_name_or_sub_path: str | None) -> str:

Finds a file (e.g., file.json or some/sub/path/file.py) by searching up the caller stack.

def show_head(self):

Prints the first few rows of the DataFrame currently managed by this process.

Note: The client SDK might have limited or no data depending on previous steps and sampling configurations.

def load( self, conn_conf: dict | str | practicuscore.api_base.ConnConf, engine='AUTO'):

Loads data into the process from the specified connection and engine configuration.

Parameters:

  • conn_conf (dict | str | ConnConf): Connection information, can be a JSON/dict or a ConnConf instance.
  • engine (str): The data processing engine, e.g. "AUTO", "PANDAS", "SPARK".
def save(self, connection: dict | str, timeout_min=600):

Saves the current data state to another destination, defined by the given connection configuration.

Parameters:

  • connection (dict | str): The destination connection configuration.
  • timeout_min (int): The maximum number of minutes to wait for completion.
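
Example (a minimal sketch; target_connection is a placeholder for a destination connection configuration such as an object storage bucket or a database table):

proc.save(target_connection, timeout_min=60)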
def delete_columns(self, column_list: list[str]):

Deletes the column(s) specified in the provided list.

def rename_column(self, from_column_name: str, to_column_name: str):

Renames a single column from one name to another.

def rename_columns(self, columns_dict: dict):

Renames multiple columns at once using a dictionary mapping old names to new names.
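
Example (illustrative column names):

proc.rename_column("Price", "price")
proc.rename_columns({"Cust_ID": "customer_id", "Qty": "quantity"})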

def change_column_type(self, column_name: str, column_type: str):

Changes the data type of a specified column.

def filter(self, filter_expression: str):

Filters the DataFrame rows based on a provided conditional expression.

def one_hot(self, column_name: str, column_prefix: str):

Converts a categorical column into multiple one-hot encoded columns.

def categorical_map(self, column_name: str, column_suffix: str):

Maps categorical values of a specified column into numeric or string representations.

def split_column(self, column_name: str, split_using: str):

Splits the values of a column into multiple parts using a specified delimiter.

def handle_missing( self, technique: str, custom_value: str, column_list: list[str] | None = None):

Handles missing values in the specified columns using a given technique or a custom value.

def sort_column(self, column_list: list[str], ascending: list[bool] | None = None):

Sorts the DataFrame by specified columns in ascending or descending order.

def group_by_column(self, columns: list[str], aggregation: dict):

Groups the DataFrame by specified columns and applies aggregation functions.

def time_sample( self, date_column: str, summary_column: str, summary_method: str, frequency: str):

Performs time-based sampling or aggregation of the data using a date column and specified frequency.

def update_values(self, column_name: str, old_value: str, new_value: str):

Replaces occurrences of an old value with a new value in a specified column.
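
Example (illustrative values):

proc.update_values("status", old_value="N/A", new_value="unknown")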

def run_formula(self, new_column_name: str, formula_expression: str):

Creates a new column by evaluating a formula expression involving existing columns.

def run_custom_code(self, custom_function):

Runs a custom Python function's code on the worker.

Parameters:

  • custom_function: A Python function object whose source code is executed remotely.
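
Example (a sketch only; whether the function receives and returns the current DataFrame is an assumption to verify for your SDK version):

def add_flag(df):
    # Runs remotely on the worker
    df["flag"] = 1
    return df

proc.run_custom_code(add_flag)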
def run_snippet(self, snippet_name: str, **kwargs):

Runs a predefined snippet (a .py file) from the caller's directory stack.

Parameters:

  • snippet_name (str): The snippet's name (without .py extension).
  • **kwargs: Parameters passed to the snippet's code.
def run_custom_sql(self, custom_sql: str, sql_table_name: str):

Executes a custom SQL query against the current data, treating it as a table with a given name.
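
Example (illustrative; the table name is only used to reference the current data inside the SQL statement):

proc.run_custom_sql("SELECT * FROM sales WHERE price > 100", sql_table_name="sales")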

def build_model( self, model_config: dict | str, timeout_min=300) -> practicuscore.api_def.ModelConfig | None:

Builds a machine learning model on the worker using the provided configuration.

Parameters:

  • model_config (dict | str): The model configuration as a dictionary or JSON string.
  • timeout_min (int): Maximum minutes to wait for model build completion.

Returns:

  • ModelConfig | None: The finalized model configuration if successful, else None.
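
Example (a sketch only; model_config_json stands for a dictionary or JSON string conforming to ModelConfig, whose fields are not shown here):

model_conf = proc.build_model(model_config_json, timeout_min=120)
if model_conf is not None:
    proc.register_model()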
def register_model(self):

Registers the last built AI model, making it available in MLflow or related model registries.

def find_model(self, model_text: str) -> practicuscore.api_def.ModelSearchResults | None:

Searches for models that match a given text query using MLflow.

Parameters:

  • model_text (str): The search string or query.

Returns:

  • ModelSearchResults | None: The search results, or None if none found.
def predict( self, api_url: str, api_token: str | None = None, column_names: list[str] | None = None, new_column_name: str | None = None, ground_truth_col: str | None = None, model_id: int | None = None, batch_size: int | None = None, compression_algo: str | None = None):

Runs predictions against a deployed model API endpoint.

Parameters:

  • api_url (str): The model API URL.
  • api_token (str | None): The access token for the model API. If None, attempts to get one automatically.
  • column_names (list[str] | None): The columns to include as input features.
  • new_column_name (str | None): The name for the new prediction column.
  • ground_truth_col (str | None): If provided, compares predictions against a ground truth column.
  • model_id (int | None): Specific model ID to query if multiple versions exist.
  • batch_size (int | None): Batch size for prediction requests.
  • compression_algo (str | None): Optional compression for data transfer.
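
Example (a minimal sketch; the API URL and column names are placeholders):

proc.predict(
    api_url="https://my-region.example.com/models/my-prefix/my-model/",
    column_names=["age", "income"],
    new_column_name="prediction",
)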
def predict_with_offline_model( self, column_names: list[str] | None = None, new_column_name: str | None = None, future_horizon: int | None = None, mlflow_model_uri: str | None = None, model_conf_path: str | None = None, model_conf: str | None = None, problem_type: str | None = None):

Makes predictions using an offline model available locally (e.g., MLflow model URI).

Parameters:

  • column_names (list[str] | None): Input feature columns.
  • new_column_name (str | None): Column name for predictions.
  • future_horizon (int | None): For forecasting, how many steps into the future to predict.
  • mlflow_model_uri (str | None): URI for the MLflow model.
  • model_conf_path (str | None): Path to a model configuration file.
  • model_conf (str | None): Model configuration in JSON/dict form.
  • problem_type (str | None): Type of problem (e.g., classification, regression, forecasting).
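
Example (illustrative; the model URI follows MLflow's runs:/ convention and the column names are placeholders):

proc.predict_with_offline_model(
    column_names=["age", "income"],
    new_column_name="prediction",
    mlflow_model_uri="runs:/<run_id>/model",
)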
def join( self, conn_conf: str | dict, left_key_col_name: str, right_key_col_name: str, right_ws_name=None, join_technique='Left', suffix_for_overlap='_right', summary_column=False):

Joins the current DataFrame with another dataset from a specified connection.

Parameters:

  • conn_conf (str | dict): The connection configuration of the data source to join.
  • left_key_col_name (str): The key column in the current DataFrame.
  • right_key_col_name (str): The key column in the target dataset.
  • right_ws_name (str | None): Optional name for the right worksheet.
  • join_technique (str): The join type ("Left", "Right", "Inner", "Outer").
  • suffix_for_overlap (str): Suffix added to overlapping columns.
  • summary_column (bool): If True, adds a summary column of join results.
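
Example (a minimal sketch; customers_connection stands for the connection configuration of the dataset to join with, and the key column names are placeholders):

proc.join(
    customers_connection,
    left_key_col_name="customer_id",
    right_key_col_name="id",
    join_technique="Left",
)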
def wait_until_done(self, timeout_min=600):

Waits until all operations and steps have finished executing on the worker, or until the given timeout elapses.

Parameters:

  • timeout_min (int): Maximum number of minutes to wait.
def get_df_copy(self, timeout_min=600) -> pandas.core.frame.DataFrame:

Retrieves a copy of the DataFrame currently held by the process after ensuring all operations are done.

Parameters:

  • timeout_min (int): Maximum number of minutes to wait until done.

Returns:

  • pd.DataFrame: A pandas DataFrame representing the processed data.
def kill(self, kill_async=False):

Terminates the process on the worker, freeing resources.

Parameters:

  • kill_async (bool): If True, attempts an asynchronous kill without waiting for confirmation.

Note: Once killed, the process cannot be reused and a new one must be started for further operations.

def get_logger(log: Log):

Retrieves a logger instance associated with a given Log enumeration value.

Logging is critical for monitoring the behavior of the SDK and diagnosing issues. The Practicus AI SDK uses named loggers for different components, allowing you to configure their levels and outputs independently.

Parameters:

  • log (Log): The Log enum value representing the desired logger.

Returns:

  • logging.Logger: A configured logger instance for the specified log component.

Example:

from practicuscore import get_logger, Log

logger = get_logger(Log.SDK)
logger.info("This is an informational message from the SDK logger.")
class Log(enum.Enum):

Enumeration of loggers used throughout the Practicus AI SDK, each corresponding to a specific subsystem or feature.

CORE = <Log.CORE: 'practicus.core'>
SDK = <Log.SDK: 'practicus.core.sdk'>
REGION = <Log.REGION: 'practicus.core.region'>
WORKER = <Log.WORKER: 'practicus.core.worker'>
PROCESS = <Log.PROCESS: 'practicus.core.process'>
CLI = <Log.CLI: 'practicus.core.cli'>
WEB_SOCKET = <Log.WEB_SOCKET: 'practicus.core.websocket'>
ENGINES = <Log.ENGINES: 'practicus.core.engines'>
AIRFLOW = <Log.AIRFLOW: 'practicus.core.airflow'>
APPS = <Log.APPS: 'practicus.core.apps'>
GEN_AI = <Log.GEN_AI: 'practicus.core.gen_ai'>
TEST = <Log.TEST: 'practicus.core.test'>
NOTEBOOKS = <Log.NOTEBOOKS: 'practicus.core.notebooks'>
DISTRIBUTED = <Log.DISTRIBUTED: 'practicus.core.distributed'>
def set_logging_level(log_level: str | None = None, modules_log_level: str = ''):

Adjusts the logging level globally or for specific modules within the Practicus AI SDK and its dependencies.

By default, logs may be set to a certain level. This function allows you to raise or lower the verbosity of logs depending on your debugging needs. For instance, setting the log level to DEBUG can help troubleshoot complex issues by providing more detailed output, while INFO or WARNING might be sufficient for normal operations.

Parameters:

  • log_level (str | None): The global logging level to apply. Accepts standard Python logging levels (e.g., "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"). If None, does not change the global level.
  • modules_log_level (str): A comma-separated list of module-specific logging levels. For example:
    • "practicus:DEBUG" sets the logging level for the "practicus" module to DEBUG.
    • "urllib3:INFO" sets the "urllib3" module to INFO.
    • "*:DEBUG" sets DEBUG level for all modules. Multiple rules can be combined by separating them with commas, e.g. "practicus:DEBUG,urllib3:INFO".

Returns:

  • None

Example:

from practicuscore import set_logging_level

# Set the global logging level to INFO
set_logging_level(log_level="INFO")

# Set DEBUG level for practicus-related logs and INFO for urllib3
set_logging_level(modules_log_level="practicus:DEBUG,urllib3:INFO")
class WorkerConfig(pydantic.main.BaseModel):

Defines a worker configuration for launching Practicus AI Workers.

Usage Example:

worker_config = WorkerConfig(
    worker_image="practicus",
    worker_size="Small",
)
worker_image: str | None

The container image to be used for this worker.

If you provide a simple image name like practicus-gpu-torch, it will be expanded to a full image name (e.g. ghcr.io/practicusai/practicus-gpu-torch) with a default version. You can also specify a fully qualified image such as my-container-repo/my-container:some-version.

Note: Custom container images must be based on a Practicus AI-compatible base image.

worker_size: str | None

The worker size indicating the CPU, RAM, and GPU resources allocated to the worker.

Example: "Small", "Medium", "Large".

service_type: str | None

The type of service this worker represents, typically "cloud_worker" or "workspace". If omitted, defaults to "cloud_worker".

network_protocol: str | None

The network protocol to use for this worker. Valid values are "http" or "https". If omitted, the worker will choose a suitable default.

distributed_config: DistJobConfig | None

Configuration for distributed jobs (e.g., Spark, Dask, Torch).

If provided, it defines the parameters and ports for running a distributed cluster.

startup_script: str | None

An optional startup script (shell commands) to be run when the worker starts. This should be a small script encoded as plain text.

log_level: str | None

The log level for the worker process itself. Examples: "DEBUG", "INFO", "WARNING", "ERROR". If omitted, defaults to a region-level or system default.

modules_log_level: str | None

A module-specific log level configuration, if you want certain modules to log at different levels.

bypass_ssl_verification: bool | None

Set this to True if you need to bypass SSL certificate verification. Generally not recommended unless you are working with trusted but self-signed certificates.

image_pull_policy: str | None

Policy for pulling the container image. Valid values: "Always", "IfNotPresent", "Never". If omitted, a suitable default is used.

interactive: bool | None

Indicates if the worker should be run in an interactive mode, e.g. allowing shell access or interactive sessions.

service_url: str | None

An optional service URL. For special use-cases where the worker might need to connect to a particular endpoint (e.g., a custom model host), you can specify it here.

email: str | None

An optional user email associated with this worker's configuration, if needed for authentication or logging.

refresh_token: str | None

An optional refresh token for authentication against certain services.

If provided, the worker might use it to obtain a fresh access token automatically.
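
A slightly fuller configuration sketch (values are illustrative; distributed jobs are configured with DistJobConfig, documented below):

worker_config = WorkerConfig(
    worker_image="practicus-gpu-torch",
    worker_size="Large",
    startup_script="pip install my-package",
    log_level="DEBUG",
)
worker = prt.create_worker(worker_config)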

@field_validator('service_type')
def validate_service_type(cls, value) -> str | None:
@field_validator('network_protocol')
def validate_network_protocol(cls, value) -> str | None:
@field_validator('image_pull_policy')
def validate_image_pull_policy(cls, value) -> str | None:
@model_validator(mode='before')
def validate_model(cls, values):
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

@classmethod
def model_validate(cls, data, **kwargs):

Validate a pydantic model instance.

Parameters:

  • obj: The object to validate.
  • strict: Whether to enforce types strictly.
  • from_attributes: Whether to extract data from object attributes.
  • context: Additional context to pass to the validator.

Raises:

  • ValidationError: If the object could not be validated.

Returns:

  • The validated model instance.

@classmethod
def model_validate_json(cls, json_data: str | bytes | bytearray, **kwargs):

Validates and creates a WorkerConfig object from a JSON string.

Parameters:

  • json_data: The JSON-encoded string, bytes, or bytearray containing the worker config data.

Returns:

  • WorkerConfig: A validated WorkerConfig instance.

Raises:

  • ValueError: If the JSON is invalid or required fields are missing.
def model_dump(self, **kwargs):

Usage docs: https://docs.pydantic.dev/2.9/concepts/serialization/#modelmodel_dump

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Parameters:

  • mode: The mode in which to_python should run. If mode is 'json', the output will only contain JSON-serializable types. If mode is 'python', the output may contain non-JSON-serializable Python objects.
  • include: A set of fields to include in the output.
  • exclude: A set of fields to exclude from the output.
  • context: Additional context to pass to the serializer.
  • by_alias: Whether to use the field's alias in the dictionary key if defined.
  • exclude_unset: Whether to exclude fields that have not been explicitly set.
  • exclude_defaults: Whether to exclude fields that are set to their default value.
  • exclude_none: Whether to exclude fields that have a value of None.
  • round_trip: If True, dumped values should be valid as input for non-idempotent types such as Json[T].
  • warnings: How to handle serialization errors. False/"none" ignores them, True/"warn" logs errors, "error" raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
  • serialize_as_any: Whether to serialize fields with duck-typing serialization behavior.

Returns:

  • A dictionary representation of the model.

def model_dump_json(self, indent: int | None = None, **kwargs):

Serializes the WorkerConfig to a JSON string, including any additional parameters.

Parameters:

  • indent (int | None): Indentation for pretty-printing JSON. Defaults to 4 if not set.
  • **kwargs: Additional arguments passed to model_dump.

Returns:

  • str: A JSON-encoded string of the WorkerConfig.
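
Example (a minimal round-trip sketch using model_validate_json, documented above):

json_str = worker_config.model_dump_json(indent=2)
restored = WorkerConfig.model_validate_json(json_str)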
model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'worker_image': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'worker_size': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'service_type': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'network_protocol': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'distributed_config': FieldInfo(annotation=Union[DistJobConfig, NoneType], required=False, default=None, exclude=True), 'startup_script': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, exclude=True), 'log_level': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, exclude=True), 'modules_log_level': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, exclude=True), 'bypass_ssl_verification': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None, exclude=True), 'image_pull_policy': FieldInfo(annotation=Union[str, NoneType], required=False, default=None, exclude=True), 'interactive': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None, exclude=True), 'service_url': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'email': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'refresh_token': FieldInfo(annotation=Union[str, NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

class DistJobType(builtins.str, enum.Enum):

Represents the type of distributed job or cluster environment you want to create and manage.

Different distributed frameworks have varying requirements and behaviors. By specifying a DistJobType, you inform the Practicus AI platform how to set up, start, and manage the underlying distributed environment.

Types:

  • python: A generic Python-based distributed job.
  • torch: A PyTorch-based distributed training job.
  • deepspeed: A DeepSpeed-based distributed training job.
  • fairscale: A FairScale-based distributed training job.
  • horovod: A Horovod-based distributed training job.
  • spark: A Spark-based distributed job or interactive cluster.
  • dask: A Dask-based distributed job or interactive cluster.
  • custom: A user-defined distributed job type with a custom adaptor.
python = <DistJobType.python: 'python'>
torch = <DistJobType.torch: 'torch'>
deepspeed = <DistJobType.deepspeed: 'deepspeed'>
fairscale = <DistJobType.fairscale: 'fairscale'>
horovod = <DistJobType.horovod: 'horovod'>
spark = <DistJobType.spark: 'spark'>
dask = <DistJobType.dask: 'dask'>
custom = <DistJobType.custom: 'custom'>
@classmethod
def from_value(cls, value: str | enum.Enum) -> DistJobType:
class DistJobConfig(pydantic.main.BaseModel):

Configuration for distributed jobs in Practicus AI.

A distributed job involves multiple worker nodes cooperating to run a large-scale task, such as Spark, Dask, or Torch-based training jobs. This configuration defines how the cluster is formed, how many workers, memory, and CPU resources to allocate, as well as additional parameters like job directories, Python files, and termination conditions.

Usage Example:

dist_conf = DistJobConfig(
    job_type=DistJobType.deepspeed,
    job_dir="/path/to/job_dir",
    worker_count=10,
    py_file="job.py"
)
DistJobConfig(**data)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

job_type: DistJobType

Specifies the type of distributed job (e.g., Spark, Dask, Torch, Python).

job_dir: str | None

Directory containing job code and related files. This must be provided for all job types except auto-distributed Spark and Dask jobs.

auto_distributed: bool | None

If True and job_type is Spark, the cluster is managed automatically (auto-scaling, etc.). Currently only supported for Spark.

worker_count: int | None

(alias) Sets initial_count and max_count to the same value, resulting in a fixed cluster size. Use worker_count if you are not auto-scaling.

initial_count: int | None

Set the initial number of workers. If not using worker_count, you must specify both initial_count and max_count.

max_count: int | None

Set the maximum number of workers. If not using worker_count, must be set along with initial_count.

coordinator_port: int | None

The coordinator (master) port. If left empty, a suitable default is used based on job_type.

additional_ports: list[int] | None

List of extra ports for worker communication. Leave empty to use defaults. Most job types do not need these.

custom_adaptor: str | None

Specifies a custom Python class (adaptor) extending job handling logic. Must refer to a class accessible at runtime.

terminate_on_completion: bool | None

If True, terminates all workers after job completion. Set to False to keep the cluster alive for further exploration, experiments, or debugging.

capture_script_output: bool | None

If True, captures and logs stdout/stderr of job scripts (e.g. .py, .sh). Disable if already logging to avoid duplicates.

service_mesh_sidecar: bool | None

By default disabled for performance. If True, enables service mesh sidecars for encrypted traffic between workers.

job_start_timeout_seconds: int | None

Time in seconds to wait for the cluster to fully start before timing out.

retries: int | None

Number of retries if the job fails, useful for transient failures.

sleep_between_retries: int | None

Seconds to wait between retries.

py_file: str | None

The Python file to run. If empty, defaults may apply (e.g. job.py or train.py).

py_venv_name: str | None

The name of a Python virtual environment (under ~/.venv/) to use. Leave empty for the default venv.

log_in_run_dir: bool | None

If True, places logs and artifacts in the run directory. Leave empty for defaults.

measure_utilization: bool | None

If True, measures system and GPU utilization periodically.

measure_utilization_interval: int | None

Interval in seconds for measuring system and GPU usage if measure_utilization is True.

coordinator_is_worker: bool | None

If True, coordinator also acts as a worker. Default True if unset. If False, coordinator doesn't run tasks, freeing resources.

processes: int | None

Number of processes/executors per worker node. For Spark, this is the executor count per node; for Dask, the worker count.

threads: int | None

Number of threads per executor/process. In Spark, corresponds to executor cores; in Dask, --nthreads per worker.

memory_gb: int | None

Memory limit per executor/process in GB. For Spark, maps to executor/driver memory; for Dask, --memory-limit.

executors: list[practicuscore.api_base.DistJobExecutor] | None

(Read-only) A list of executor definitions, set by the system after cluster creation.
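
A sketch combining this configuration with a WorkerConfig (field values are illustrative):

dist_conf = DistJobConfig(
    job_type=DistJobType.spark,
    job_dir="/path/to/job_dir",
    worker_count=3,
    py_file="job.py",
    terminate_on_completion=True,
)
worker_config = WorkerConfig(
    worker_image="practicus",
    worker_size="Medium",
    distributed_config=dist_conf,
)
coordinator = prt.create_worker(worker_config)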

@model_validator(mode='before')
def validate_model(cls, values):
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, pydantic.fields.FieldInfo]] = {'job_type': FieldInfo(annotation=DistJobType, required=True), 'job_dir': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'auto_distributed': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'worker_count': FieldInfo(annotation=Union[int, NoneType], required=False, default=None, exclude=True), 'initial_count': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'max_count': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'coordinator_port': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'additional_ports': FieldInfo(annotation=Union[list[int], NoneType], required=False, default=None), 'custom_adaptor': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'terminate_on_completion': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'capture_script_output': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'service_mesh_sidecar': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'job_start_timeout_seconds': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'retries': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'sleep_between_retries': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'py_file': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'py_venv_name': FieldInfo(annotation=Union[str, NoneType], required=False, default=None), 'log_in_run_dir': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'measure_utilization': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'measure_utilization_interval': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'coordinator_is_worker': FieldInfo(annotation=Union[bool, NoneType], required=False, default=None), 'processes': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'threads': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'memory_gb': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'executors': FieldInfo(annotation=Union[list[DistJobExecutor], NoneType], required=False, default=None)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

model_computed_fields: ClassVar[Dict[str, pydantic.fields.ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

class PrtList(list[~T], typing.Generic[~T]):

A specialized list that can be toggled as read-only. It also provides utilities for converting its items into CSV, pandas DataFrames, and JSON formats.

Key Features:

  • Inherits all behavior from the built-in list, but adds a read-only flag.
  • If read_only is set to True, any attempt to modify the list will raise a ValueError.
  • Supports easy data exporting to CSV (via __str__), pandas DataFrame (via to_pandas), and JSON (via to_json).
  • Can convert its contents into a list of dictionaries (via to_dict_list), making it easier to manipulate or serialize.
read_only: bool

Check if the list is currently read-only.

Returns

True if read-only, False otherwise.

def to_pandas(self):

Convert the list to a pandas DataFrame by reading the CSV representation.

Returns

A pandas.DataFrame containing the list's data.

def to_json(self, prettify: bool = True) -> str:

Convert the list to a JSON string, using the CSV data as the source.

Parameters

  • prettify: If True, the JSON is formatted with indentation. If False, the JSON is compact.

Returns

A JSON string of the list's data.

def to_dict_list(self) -> list[dict]:

Convert the list to a list of dictionaries by parsing the CSV representation.

Returns

A list of dictionaries representing each row/item in the list.
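
Example (illustrative; uses the proc_list property documented on Worker above):

procs = worker.proc_list
df = procs.to_pandas()
records = procs.to_dict_list()
print(procs.to_json(prettify=True))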

@staticmethod
def get_default_region(*args, **kwargs) -> Region:

Retrieves the default Practicus AI region. The default region is typically the one last logged-in to or explicitly set using set_default_region.

Returns

The default Region instance.

@staticmethod
def create_worker( worker_config: WorkerConfig | str | dict | None = None, wait_until_ready: bool | None = None, **kwargs) -> Worker:

Creates a new remote Practicus AI Worker in the current or specified region.

A worker is a computational pod that can run code, host Jupyter notebooks, build models etc. By default, it uses the current region unless worker_config points to another region.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): Optional configuration for the worker. Accepts a JSON path, a dict, or a WorkerConfig object. If None, uses the default configuration.
  • wait_until_ready (bool | None): If True, the method waits until the worker is fully provisioned and ready.

Returns

A Worker instance representing the newly created remote pod.
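
Example (a minimal sketch; the worker size name must exist in your region):

worker_config = WorkerConfig(worker_size="Small")
worker = prt.regions.create_worker(worker_config, wait_until_ready=True)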

@staticmethod
def current_region(*args, **kwargs) -> Region:

Returns the currently active Practicus AI region. If the code is running inside a worker, this is the region associated with that worker. Otherwise, this returns the default configured region.

Returns

A Region object for the current environment.

@staticmethod
def get_local_worker(*args, **kwargs) -> Worker:

Retrieves the Worker instance representing the current environment if the code is running inside a worker pod.

Returns: A Worker instance associated with the local environment.

Note: If not running inside a worker, this will raise an error.

@staticmethod
def get_or_create_worker( worker_config: WorkerConfig | str | dict | None = None, **kwargs) -> Worker:

Attempts to retrieve an existing worker (if it matches the provided configuration) or creates one if not found.

This is useful for idempotent deployments where you do not want to create duplicates if the worker already exists.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): The configuration to check against existing workers. If not provided, defaults are used.

Returns

A Worker instance, either existing or newly created.
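
Example (a sketch; useful when re-running the same script should not create duplicate workers):

worker = prt.regions.get_or_create_worker(WorkerConfig(worker_size="Small"))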

@staticmethod
def running_on_a_worker() -> bool:

Checks if the current code is executing inside a Practicus AI Worker.

This is useful for conditional logic that depends on whether the code runs locally or on a remote Practicus AI-managed environment.

Returns

True if this code is running inside a Practicus AI Worker pod; False otherwise.
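
Example (a common conditional pattern, shown as a sketch):

if prt.regions.running_on_a_worker():
    worker = prt.regions.get_local_worker()
else:
    worker = prt.regions.create_worker()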

@staticmethod
def get_region( region_key: str | None = None, *args, **kwargs) -> Region:

Retrieves a specific Practicus AI region based on the provided region key or returns the default region.

Parameters:

  • region_key (str | None): A region identifier in either username@region_address or region_address format. If not provided, the default region is returned.

Returns: A Region object.

Example:

# If multiple regions are available:
region = prt.regions.get_region("alice@my-practicus-region.example.com")

# If none provided, defaults to last used or configured default:
default_region = prt.regions.get_region()
@staticmethod
def get_region_list() -> PrtList[Region]:

Retrieves a list of all configured and logged-in Practicus AI regions.

Each entry in the returned list is a Region instance containing connection and authentication info.

Returns

A read-only PrtList of Region objects.

@staticmethod
def region_factory( worker_config: WorkerConfig | str | dict | None = None) -> Region:

Creates or retrieves a Region instance based on a provided worker configuration or returns the current/default region if none is provided.

This is useful in contexts where you may have a serialized or external configuration that specifies a region.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): A configuration object or JSON path/dict that may contain region connection details. If None, returns the current region.

Returns

A Region instance determined by the provided configuration or the current region.

@staticmethod
def set_default_region(region_key: str, *args, **kwargs) -> bool:

Sets the default Practicus AI region. Subsequent operations that do not explicitly specify a region will use this default.

Parameters:

  • region_key (str): The region identifier, e.g. username@my-region.example.com or just my-region.example.com if only one user is associated with it.

Returns

True if the region is successfully set as default; False otherwise.
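
Example (the region key shown is a placeholder):

prt.regions.set_default_region("alice@my-practicus-region.example.com")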

@staticmethod
def create_workspace( worker_config: WorkerConfig | str | dict | None = None, **kwargs) -> Worker:

Creates a new Practicus AI Workspace (a special type of Worker) in the selected region.

A workspace is a worker configured for interactive development and includes Practicus AI Studio, office tools and more.

Parameters:

  • worker_config (WorkerConfig | str | dict | None): Configuration for the workspace. Accepts a JSON path, a dict, or WorkerConfig. If None, uses the default configuration.

Returns

A Worker instance configured as a workspace.
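
Example (a minimal sketch; uses the Worker methods documented above to access the workspace):

workspace = prt.regions.create_workspace()
url = workspace.open_workspace(get_url_only=True)
user, pwd = workspace.get_workspace_credentials()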

@staticmethod
def run_task( file_name: str, files_path: str | None = None, worker_config: WorkerConfig | str | dict | None = None, terminate_on_completion: bool = True, capture_task_output: bool = True, python_venv_name: str | None = None, max_files_to_upload: int = 250, *args, **kwargs) -> tuple[Worker | None, bool]:

Runs a specified script (Python or shell) as a "task" on a newly created remote worker.

Common uses include running batch jobs, scheduled tasks, or CI/CD pipeline steps in a controlled environment.

Parameters:

  • file_name (str): The path to the script to run (e.g. "run_analysis.py" or "deploy.sh").
  • files_path (str | None): The directory containing all necessary files to upload. If None, uses current directory.
  • worker_config (WorkerConfig | str | dict | None): Configuration for the worker to run this task.
  • terminate_on_completion (bool): If True, the worker is terminated after the task finishes.
  • capture_task_output (bool): If True, captures and logs stdout/stderr from the task's execution.
  • python_venv_name (str | None): Name of the Python virtual environment on the worker to use.
  • max_files_to_upload (int): Maximum number of files to upload from files_path.

Returns

A tuple of (Worker, bool) where Worker is the worker used or created for this task, and bool indicates if the task succeeded.
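
Example (a sketch focusing on a custom worker configuration; the file names are placeholders):

worker_config = WorkerConfig(worker_size="Medium")
worker, success = prt.regions.run_task(
    "train.py",
    files_path="training_code/",
    worker_config=worker_config,
    terminate_on_completion=True,
)
print("Task successful:", success)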