practicuscore
Practicus AI SDK v24.8.2
Overview
Welcome to the Practicus AI SDK, a Python library that allows you to interact with Practicus AI Regions, manage Workers, deploy and manage machine learning Models and Apps, and orchestrate distributed jobs.
This SDK provides an intuitive Pythonic interface for common operations, such as:
- Connecting to Practicus AI Regions and managing their resources.
- Creating and working with Workers and Workspaces.
- Building and deploying ML models and GenAI apps, and managing data connections.
- Running distributed workloads (Spark, Dask, Torch) seamlessly.
- Accessing notebooks, experiments tracking, and more.
Key Functionality
Below are some of the helper classes, interfaces, and functions for interacting with the Practicus AI platform.
Helper Classes
These static wrapper classes provide convenient entry points and abstractions for common operations, allowing you to work with Practicus AI resources efficiently:
- regions: Manage and interact with Practicus AI Regions, handle login, workers, etc.
- models: Deploy and manage ML models, view model prefixes and versions.
- apps: Deploy and manage GenAI-focused apps and APIs and their versions.
- workflows: Deploy and manage workflows (e.g., Airflow DAGs).
- connections: Manage and create data source connections (e.g., S3, SQL).
- distributed: Work with distributed job clusters and frameworks.
- auth: Authentication helpers for logging in and out of regions.
- engines: Interact with data processing engines like Spark.
- experiments: Manage experiment tracking services, such as MLFlow.
- notebooks: Access Jupyter notebooks running on workers.
- quality: Code quality (linting, formatting) utilities.
Core Classes
These core classes represent the primary entities you'll interact with the most:
- Region: Represents a Practicus AI Region (control plane) where you can manage workers, models, apps, and connections.
- Worker: Represents a compute resource (pod) in the Region where you can run tasks, load data, open notebooks, and more.
- Process: Represents an OS-level process running on a Worker that can load and manipulate data, run transformations, execute code snippets, build models, and perform predictions, all directly on the Worker.
Sample Usage
import practicuscore as prt
# Connect to the default region
region = prt.get_default_region()
# Create a worker
worker = region.create_worker()
# Deploy a model
dynamic_url, version_url, meta_url = region.deploy_model(
deployment_key="my-model-service", prefix="my-prefix", model_name="my-model"
)
# Run a task on a worker
_, success = prt.run_task("analysis.py", files_path="project_code/")
print("Task successful:", success)
Alias Functions
In addition to using the Region object or its helper regions class directly, the SDK provides alias functions as shortcuts that map directly to commonly used methods in the selected (or default) region. This allows you to perform actions without explicitly fetching or referencing a Region object first:
- get_default_region(): Retrieves the default Practicus AI region previously configured.
- create_worker(): Creates a new worker (pod) in the default or current region.
- current_region(): Returns the currently selected region, if any.
- get_local_worker(): Returns the worker representing the current environment if the code is running inside one.
- get_or_create_worker(): Retrieves an existing worker or creates a new one if none is suitable.
- running_on_a_worker(): Checks if the current code is executing on a Practicus AI worker.
- get_region(): Retrieves a specific region by key (username@host_dns).
- get_region_list(): Returns a list of all configured and accessible regions.
- region_factory(): Creates a Region instance from a given configuration.
- set_default_region(): Changes which region is considered the default.
- create_workspace(): Creates a new Practicus AI Workspace (an interactive development environment).
- run_task(): Runs a given Python or shell script task on a worker in the default or specified region.
These aliases simplify calls to region-dependent functions, making code more concise and direct. For example:
import practicuscore as prt
# Instead of:
region = prt.get_default_region()
worker = region.create_worker()
# You can do:
worker = prt.create_worker()
Practicus AI Documentation
For help on getting started with the Practicus AI platform and tutorials, please visit: Practicus AI Documentation
Provides a simplified, high-level interface for managing and interacting with Practicus AI Regions.
A Region in Practicus AI represents a logical control plane environment, typically isolated via Kubernetes namespaces, where resources such as workers, workspaces, model hosting, applications, and tasks are managed. The regions class offers convenient static methods to authenticate, select, and operate on these remote execution environments without requiring direct instantiation of Region objects.
By default, operations on regions occur against the currently active (default) region, but the regions class allows selecting other regions, logging in or out of different regions, and performing tasks like launching workers or running remote scripts. It abstracts away the complexities of region-specific APIs and credentials.
Key Concepts:
- Region: A Practicus AI control plane endpoint. Multiple regions can be configured and managed.
- Worker: A Kubernetes pod in a region that executes code, runs notebooks, tasks, and distributed jobs, and builds models, apps, and more.
Common Usage:
import practicuscore as prt
# Login to a specific region
region = prt.regions.login(
url="https://my-practicus-region.example.com",
email="user@example.com",
password="mypassword"
)
# Set the default region (if multiple are configured)
prt.regions.set_default_region("user@my-practicus-region.example.com")
Checks if the current code is executing inside a Practicus AI Worker.
This is useful for conditional logic that depends on whether the code runs locally or on a remote Practicus AI-managed environment.
Returns
True if this code is running inside a Practicus AI Worker pod; False otherwise.
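Example (a minimal sketch using the running_on_a_worker() alias listed earlier in this document):
import practicuscore as prt
if prt.running_on_a_worker():
    print("Running inside a Practicus AI Worker pod.")
else:
    print("Running locally, outside a Worker.")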
Authenticates the user to a Practicus AI region and returns a Region
instance.
The login process uses the provided credentials (password or tokens) to obtain a refresh token and
access token. Once authenticated, the region and associated credentials are stored so they can be
reused in subsequent sessions, unless save_config
is set to False.
Parameters:
- url (str): The base URL of the Practicus AI region, e.g. "https://my-region.example.com".
- email (str): The user's email associated with the Practicus AI account.
- password (str | None): The user's password. Optional if refresh_token or access_token are provided.
- refresh_token (str | None): Existing refresh token; can be used instead of password-based login.
- save_password (bool): If True, saves the entered password securely for future sessions.
- access_token (str | None): An existing valid access token to bypass credential-based login.
- save_config (bool): If True, persists the configuration so that subsequent sessions do not require login.
Returns
A Region object representing the authenticated Practicus AI environment.
Example:
region = prt.regions.login(
url="https://my-practicus-region.example.com",
email="user@example.com",
password="mypassword",
save_password=True
)
Logs out of one or all Practicus AI regions, removing stored credentials from local configuration.
Parameters:
- region_key (str | None): The key of the region to log out from. If None and all_regions is False, no action will be taken.
- all_regions (bool): If True (default), logs out from all known regions. If False, you must specify region_key.
Returns
None
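Example (a sketch; the same operation is also exposed as prt.auth.logout later in this document):
# Log out of all configured regions
prt.regions.logout(all_regions=True)
# Or log out of a single region by key
prt.regions.logout(region_key="user@my-practicus-region.example.com", all_regions=False)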
Retrieves a specific Practicus AI region based on the provided region key or returns the default region.
Parameters:
- region_key (str | None): A region identifier in either username@region_address or region_address format. If not provided, the default region is returned.
Returns:
A Region object.
Example:
# If multiple regions are available:
region = prt.regions.get_region("alice@my-practicus-region.example.com")
# If none provided, defaults to last used or configured default:
default_region = prt.regions.get_region()
Retrieves the default Practicus AI region. The default region is typically the one last logged-in to or explicitly set using set_default_region.
Returns
The default Region instance.
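Example (using the get_default_region() alias shown earlier):
region = prt.get_default_region()
print("Default region:", region)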
Returns the currently active Practicus AI region. If the code is running inside a worker, this is the region associated with that worker. Otherwise, this returns the default configured region.
Returns
A Region object for the current environment.
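Example (using the current_region() alias listed earlier):
region = prt.current_region()
print("Active region:", region)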
Creates or retrieves a Region
instance based on a provided worker configuration or returns the
current/default region if none is provided.
This is useful in contexts where you may have a serialized or external configuration that specifies a region.
Parameters:
- worker_config (WorkerConfig | str | dict | None): A configuration object or JSON path/dict that may contain region connection details. If None, returns the current region.
Returns
A Region instance determined by the provided configuration or the current region.
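Example (a sketch assuming a worker configuration JSON file that includes region connection details; the file name is illustrative):
region = prt.region_factory(worker_config="worker_config.json")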
Sets the default Practicus AI region. Subsequent operations that do not explicitly specify a region will use this default.
Parameters:
- region_key (str): The region identifier, e.g. username@my-region.example.com or just my-region.example.com if only one user is associated with it.
Returns
True if the region is successfully set as default; False otherwise.
Creates a new remote Practicus AI Worker in the current or specified region.
A worker is a computational pod that can run code, host Jupyter notebooks, build models, and more.
By default, it uses the current region unless worker_config
points to another region.
Parameters:
- worker_config (WorkerConfig | str | dict | None): Optional configuration for the worker. Accepts a JSON path, a dict, or a WorkerConfig object. If None, uses the default configuration.
- wait_until_ready (bool | None): If True, the method waits until the worker is fully provisioned and ready.
Returns
A Worker instance representing the newly created remote pod.
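Example (a minimal sketch; WorkerConfig() with no arguments uses the defaults, as in other examples in this document):
worker_config = prt.WorkerConfig()
worker = prt.create_worker(worker_config=worker_config, wait_until_ready=True)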
Creates a new Practicus AI Workspace (a special type of Worker) in the selected region.
A workspace is a worker configured for interactive development and includes Practicus AI Studio, office tools and more.
Parameters:
- worker_config (WorkerConfig | str | dict | None): Configuration for the workspace. Accepts a JSON path, a dict, or a WorkerConfig object. If None, uses the default configuration.
Returns
A Worker instance configured as a workspace.
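Example (using the create_workspace() alias with the default configuration):
workspace = prt.create_workspace()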
Attempts to retrieve an existing worker (if it matches the provided configuration) or creates one if not found.
This is useful for idempotent deployments where you do not want to create duplicates if the worker already exists.
Parameters:
- worker_config (WorkerConfig | str | dict | None): The configuration to check against existing workers. If not provided, defaults are used.
Returns
A Worker instance, either existing or newly created.
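Example (a sketch of an idempotent pattern using the get_or_create_worker() alias):
worker_config = prt.WorkerConfig()
worker = prt.get_or_create_worker(worker_config=worker_config)
# Calling this again with the same configuration should reuse the existing worker.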
Runs a specified script (Python or shell) as a "task" on a newly created remote worker.
Common uses include running batch jobs, scheduled tasks, or CI/CD pipeline steps in a controlled environment.
Parameters:
- file_name (str): The path to the script to run (e.g. "run_analysis.py" or "deploy.sh").
- files_path (str | None): The directory containing all necessary files to upload. If None, uses current directory.
- worker_config (WorkerConfig | str | dict | None): Configuration for the worker to run this task.
- terminate_on_completion (bool): If True, the worker is terminated after the task finishes.
- capture_task_output (bool): If True, captures and logs stdout/stderr from the task's execution.
- python_venv_name (str | None): Name of the Python virtual environment on the worker to use.
- max_files_to_upload (int): Maximum number of files to upload from files_path.
Returns
A tuple of (Worker, bool) where Worker is the worker used or created for this task, and bool indicates if the task succeeded.
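Example (a sketch combining the parameters above; the script and folder names are illustrative):
worker, success = prt.run_task(
    file_name="run_analysis.py",
    files_path="project_code/",
    terminate_on_completion=True,
    capture_task_output=True,
)
print("Task succeeded:", success)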
Automatically logs into the Practicus AI region when running within a known Practicus AI environment (e.g., a worker or workspace) that already has embedded credentials. Useful for automations and internal scenarios where explicit login is not required.
Returns
The default Region instance after auto-login.
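Example (the same behavior is exposed as auth.auto_login(), shown later in this document):
# Inside a worker or workspace that already has embedded credentials
region = prt.auth.auto_login()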
Changes the current user's password in the default Practicus AI region.
Parameters:
- old_password (str): The current password.
- new_password (str): The new desired password.
Retrieves a fresh access token for the currently active region using the stored refresh token.
Returns
The new access token as a string.
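Example (the prt.auth.get_access_token() helper shown later in this document retrieves this token):
access_token = prt.auth.get_access_token()
print("Got access token:", bool(access_token))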
Retrieves both a refresh token and an updated access token for the current region session.
Returns
A tuple (refresh_token, access_token).
Retrieves the username and password credentials for a specified workspace instance.
The username is typically derived from the user's email address. The password is a managed credential provided by the region.
Parameters:
- instance_id (str): The identifier of the workspace. View available instances via region.worker_list.
Returns
A tuple (username, password) for the selected workspace.
Provides a high-level interface for managing machine learning models within Practicus AI.
The models
class allows you to:
- Deploy models to a Practicus AI region for serving predictions and model metadata.
- Generate and manage model configurations, including model signatures and metadata.
- Acquire short-lived session tokens for model APIs (e.g., for prediction or uploading a new model version).
- Package and unpackage model artifacts using a standardized zip archive, simplifying model deployment flows.
Key Concepts:
- Model Deployment: Uploading a model and its associated files (artifacts, weights, metadata) to a server accessible via a stable API endpoint.
- Model Config & Signature: Capturing and storing metadata about the model’s input schema, output schema, problem type, and version information. This information is crucial for reproducibility, explainability, and consistent model version management.
Common Usage:
import practicuscore as prt
# Deploy a model to the current region
prt.models.deploy(
deployment_key="my_model_service",
prefix="mymodel",
model_name="model_v1",
)
Deploys a model to the currently selected Practicus AI region.
This operation uploads model artifacts and registers them under the specified model name and prefix,
making them accessible via stable API endpoints. If multiple versions of a model exist, the prefix
can be used to route requests to the latest version dynamically.
Parameters:
- deployment_key (str): The key identifying the model deployment service, provided by your Practicus AI admin.
- prefix (str): A prefix used to group and identify model versions under a common name.
- model_name (str): The name of this specific model version.
- model_dir (str | None): The directory containing model files. If None, uses the current working directory.
Returns:
- tuple[str, str, str]: A tuple containing:
- API URL for dynamic version routing.
- API URL for the uploaded version.
- API URL to read metadata of the model.
Example:
api_url_dynamic, api_url_version, api_url_metadata = prt.models.deploy(
deployment_key="my_model_service",
prefix="mymodel",
model_name="model_v1",
)
Retrieves a short-lived session token to interact with a model's API—either for making predictions (inference) or uploading a new model version.
Parameters:
- api_url (str): The base URL of the model API, e.g. "https://practicus.company.com/models/my-model/".
- for_upload (bool): If True, requests a token with permissions suitable for model uploads.
- retry (int): Number of retry attempts if the token request fails.
Returns:
- str | None: A token string if successful, or None if unsuccessful.
Example:
token = prt.models.get_session_token("https://practicus.company.com/models/mymodel/", for_upload=True)
if token:
print("Got a session token for model uploads!")
else:
print("Failed to retrieve a session token.")
Loads a ModelConfig
object from a dictionary or a JSON string/file.
Parameters:
- model_config (dict | str):
- If dict, directly interprets it as model configuration.
- If str, treats it as either a JSON string or a path to a JSON file, and parses it.
Returns:
- ModelConfig | None: A ModelConfig instance if successful, or None if parsing fails.
Example:
model_conf = prt.models.load_config("model_config.json")
print("Loaded model config:", model_conf)
Infers the MLflow data type for a given DataFrame column. MLflow requires explicit schema types for model signatures, and this method maps Pandas dtypes to MLflow data types.
Parameters:
- df (pd.DataFrame): The DataFrame containing the data.
- col_name (str): The column name to infer the type for.
Returns:
- DataType: An MLflow DataType object (e.g., DataType.double, DataType.string).
Example:
mlflow_dtype = prt.models.get_mlflow_data_type(df, "age")
print("Inferred MLflow data type:", mlflow_dtype)
Generates an MLflow ModelSignature object from a given DataFrame and optional target column.
A model signature captures the input and output schema for a model. The target column (if provided) is treated as the output schema, and all other columns are treated as inputs.
Parameters:
- df (pd.DataFrame): The DataFrame representing the model’s training or inference data.
- target (str | None): The column name representing the model's target variable, if any.
Returns:
- ModelSignature: An MLflow ModelSignature object describing the model’s input and output schema.
Example:
signature = prt.models.get_model_signature(df, target="label")
print("Model signature:", signature)
Generates a JSON representation of the model signature from a DataFrame and optional target.
Parameters:
- df (pd.DataFrame): The data to infer schema from.
- target (str | None): The target column for output schema, if applicable.
Returns:
- str: A JSON string representing the model signature.
Example:
signature_json = prt.models.get_model_signature_json(df, target="label")
print(signature_json)
Creates a ModelConfig
instance from a DataFrame and metadata about the model.
This method generates a model signature and bundles it with additional information like model name, problem type, version, and performance score.
Parameters:
- df (pd.DataFrame): DataFrame from which to infer the model’s input/output schema.
- target (str | None): The target column for output schema, if any.
- model_name (str | None): A name for the model.
- problem_type (str | None): The type of problem (e.g., "classification", "regression").
- version_name (str | None): A version identifier for the model.
- final_model (str | None): A reference or path to the final model artifact.
- score (float | None): A performance metric value (e.g., accuracy or RMSE).
Returns:
- ModelConfig: The created ModelConfig object.
Example:
model_conf = prt.models.create_model_config(
df, target="label", model_name="my_classifier", problem_type="classification", score=0.95
)
print(model_conf)
Packages selected files into a model.zip archive for model deployment.
By default, Practicus AI may automatically upload certain files (like environment configuration). If you include these files in model.zip, it may cause ambiguity; hence, certain known files are skipped.
Parameters:
- files_to_add (list[str]): A list of filenames (within model_dir) to include in the zip file.
- model_dir (str | None): The directory containing the files. If None, defaults to the current directory.
Example:
prt.models.zip(["trained_model.pkl", "requirements.txt"], model_dir="path/to/model/files")
Extracts files from model.zip
into the specified directory.
Parameters:
- model_dir (str | None): The directory where model.zip is located and where files will be extracted. If None, defaults to the current directory.
Example:
prt.models.unzip(model_dir="path/to/model/files")
# All files from model.zip are now extracted into model_dir.
Usage docs: https://docs.pydantic.dev/2.9/concepts/models/
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__ Signature of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
Provides a high-level interface for managing the lifecycle of Streamlit-based apps within Practicus AI.
The apps
class allows you to:
- Deploy interactive applications (e.g., Streamlit apps) to a Practicus AI region.
- Obtain short-lived session tokens for interacting with the app’s APIs.
- Test the application locally within a Practicus AI Worker environment.
- Integrate and call internal API functions directly for debugging or local execution.
- Manage multiple versions of an app (deploy new versions, delete old versions).
Key Concepts:
- App Deployment: Uploading an app (including UI code, API code, and configuration files) to a Practicus AI region so it can be accessed via a stable URL.
- App Prefix & Name: Identifiers that allow you to maintain multiple versions of an application and route traffic appropriately.
- Local Testing: Running the app locally on a Practicus AI Worker for development and debugging before final deployment.
Common Usage:
import practicuscore as prt
# Deploy an app to the current region
ui_url, api_url = prt.apps.deploy(
deployment_setting_key="my_app_service",
prefix="myapp",
app_name="app_v1",
visible_name="My Application",
description="A demo GenAI Visual App",
icon="fa:rocket"
)
print("App deployed. UI:", ui_url, "API:", api_url)
# Get a session token for interacting with the app’s API
token = prt.apps.get_session_token(api_url=api_url)
print("Session token:", token)
# Test the app inside a Practicus AI Worker environment
prt.apps.test_app(app_dir="path/to/app/files")
# Delete an old version of the app
prt.apps.delete_version(version="app_v1", prefix="myapp")
Secures the current Streamlit page by enforcing user authentication and permissions.
Parameters:
- page_title (str | None): The title to display on the browser tab. If None, uses default.
- layout ('Layout'): The page layout, e.g., "centered" or "wide".
- initial_sidebar_state ('InitialSideBarState'): The initial state of the sidebar, e.g., "auto", "expanded", "collapsed".
- menu_items (MenuItems | None): Optional custom menu items for the Streamlit menu.
- disable_authentication (bool): If True, disables authentication checks (useful in development mode).
- must_be_admin (bool): If True, requires the user to have admin privileges to access this page.
Behavior:
- In production mode, checks the Practicus AI session to ensure the user is authenticated.
- If must_be_admin is True, verifies that the authenticated user is an admin.
- Optionally configures page layout and sidebar.
Example:
prt.apps.secure_page(page_title="Admin Dashboard", must_be_admin=True)
Retrieves the current user's unique identifier.
Returns:
- int | None: The user’s ID if available, None otherwise.
Behavior:
- In development mode, returns the user_id from the global context.
- In production mode, retrieves the user_id from the Streamlit session state if prt_session is present.
Example:
user_id = prt.apps.get_user_id()
if user_id:
st.write(f"User ID: {user_id}")
else:
st.write("No user logged in.")
Retrieves the current user's username (the part of their email before '@', or a configured username).
Returns:
- str: The username as a string. Returns an empty string if not found.
Behavior:
- In development mode, returns the username from global context.
- In production mode, returns the username from the Streamlit session state if prt_session is available.
Example:
username = prt.apps.get_username()
st.write(f"Hello, {username}")
Retrieves the current user's email address.
Returns:
- str: The user’s email if available, or an empty string if not found.
Behavior:
- In development mode, returns the user_email from global context.
- In production mode, returns the email from the Streamlit session state if prt_session is available.
Example:
email = prt.apps.get_user_email()
st.write(f"Your email: {email}")
Retrieves the list of groups to which the current user belongs.
Parameters:
- reload (bool): If True, forces a reload of the user groups, ignoring any cached values.
Returns:
- list[str]: A list of group names the user is part of.
Behavior:
- In development mode, returns the developer’s predefined groups.
- In production mode, retrieves groups from prt_session or queries the Practicus AI backend if reload is True.
Example:
groups = prt.apps.get_user_groups()
st.write(f"You belong to the following groups: {', '.join(groups)}")
Checks if the current logged-in user is an admin.
Returns:
- bool: True if the user is an admin, False otherwise.
Example:
if prt.apps.user_is_admin():
st.write("Welcome, Admin!")
else:
st.write("You do not have admin privileges.")
Retrieves the ID of the current app.
Returns:
- int | None: The app ID if available, otherwise None.
Example:
app_id = prt.apps.get_app_id()
st.write(f"Current App ID: {app_id}")
Retrieves the prefix of the current app.
Returns:
- str: The app prefix.
Example:
prefix = prt.apps.get_app_prefix()
st.write(f"App Prefix: {prefix}")
Retrieves the name of the current app.
Returns:
- str: The app name.
Example:
name = prt.apps.get_app_name()
st.write(f"App Name: {name}")
Retrieves the version of the current app.
Returns:
- str: The app version.
Example:
version = prt.apps.get_app_version()
st.write(f"App Version: {version}")
Indicates whether the app is running in development mode or in production (AppHost) mode.
Returns:
- bool: True if running in development mode; False if running in production mode.
Example:
if prt.apps.development_mode():
st.write("Running in development mode.")
else:
st.write("Running in production mode.")
Deploys an application to the currently selected Practicus AI region.
This operation packages the app's UI code, configuration, and any supporting files, and uploads them to the Practicus AI environment. Once deployed, the app and its API are accessible through stable URLs.
Parameters:
- deployment_setting_key (str): The key identifying the application deployment service (set by your admin).
- prefix (str): A prefix used to group and identify the app and its versions.
- app_name (str): A unique name for this specific version of the app.
- app_dir (str | None): The directory containing the app files. If None, uses the current working directory.
- visible_name (str | None): The user-facing name displayed in the Practicus AI interface.
- description (str | None): A short description of the app’s purpose or functionality.
- icon (str | None): A Font Awesome icon name (e.g., "fa:rocket") to represent the app.
Returns:
- tuple[str, str]: A tuple containing:
- The UI URL of the deployed app.
- The API URL of the deployed app.
Example:
ui_url, api_url = prt.apps.deploy(
deployment_setting_key="my_streamlit_service",
prefix="myapp",
app_name="app_v1",
app_dir="app_folder",
visible_name="MyApp",
description="A sample Streamlit application",
icon="fa:rocket"
)
Obtains a short-lived session token for interacting with the application's API.
Use this token to call the app’s API endpoints, or to upload new versions of the app (if for_upload
is True).
Parameters:
- api_url (str | None): The app’s API URL. E.g. "https://practicus.company.com/apps/my-app/api/v5/"
- app_id (int | None): The app’s unique identifier. If provided, it can be used instead of api_url.
- for_upload (bool): If True, requests a token with permissions to upload new versions (admin privileges may be required).
Returns:
- str: A session token string.
Example:
token = prt.apps.get_session_token(api_url="https://practicus.company.com/apps/my-app/api/v5/")
print("Session token:", token)
Tests the application by running it locally inside a Practicus AI Worker environment.
This method launches the Streamlit app so you can interact with it through the Practicus AI Studio environment (e.g., via a port-forwarded URL). Useful for iterative development and debugging before deploying to a production environment.
Parameters:
- app_dir (str | None): The directory containing the app files. If None, uses the current working directory.
- kill_existing_app (bool): If True, terminates any previously running test app instances on this worker before launching the new one.
Example:
prt.apps.test_app(app_dir="my_app_dir")
# Open the provided URL in Practicus AI Studio to interact with the test app.
Calls the run()
function of a provided API module, optionally with admin privileges.
Useful for local testing and debugging of API logic without deploying.
Parameters:
- api_module (ModuleType): The Python module representing the API. Must have a run() function.
- payload: The payload to pass to the run() function. Should match the expected pydantic model if defined.
- is_admin (bool): If True, simulates calling the API as an admin user.
Returns:
The result returned by the run()
function of the API module.
Example:
from my_app_apis import my_api_module
result = prt.apps.call_api(my_api_module, payload={"key": "value"})
print("API call result:", result)
Deletes an application from the current region.
Either app_id or both prefix and app_name should be provided to identify the app.
Parameters:
- app_id (int | None): The unique identifier of the app. If provided, overrides prefix and app_name.
- prefix (str | None): The app prefix to identify which group of apps it belongs to.
- app_name (str | None): The name of the app to delete.
Example:
# Delete by app_id
prt.apps.delete(app_id=12345)
# Or delete by prefix and app name
prt.apps.delete(prefix="myapp", app_name="app_v1")
Deletes a specific version of an application from the current region.
Parameters:
- version (str): The version identifier of the app to delete.
- app_id (int | None): The app’s unique identifier. If provided, prefix and app_name are ignored.
- prefix (str | None): The prefix identifying the app group, if app_id is not provided.
- app_name (str | None): The specific app name, if app_id is not provided.
Example:
# Delete version using app_id
prt.apps.delete_version(version="app_v1", app_id=12345)
# Or delete version using prefix and app_name
prt.apps.delete_version(version="app_v1", prefix="myapp", app_name="my_application")
Provides a simplified interface for managing workflows—such as Airflow DAGs—in Practicus AI.
The workflows
class enables you to deploy, generate, and test workflows that define and execute
complex data pipelines or ML processes. By abstracting away the underlying integration with Airflow
and other systems, it allows you to focus on authoring DAGs, tasks, and configurations without
delving into low-level deployment details.
Key Concepts:
- Workflow: A directed acyclic graph (DAG) representing a set of tasks and their dependencies, orchestrated by a workflow engine like Airflow.
- Airflow Integration: Practicus AI supports generating and deploying Airflow DAGs to the configured region, enabling fully managed and scheduled execution of your tasks.
Common Usage:
import practicuscore as prt
# Deploy an Airflow DAG to the current region
prt.workflows.deploy(service_key="my_airflow_service", dag_key="my_data_pipeline", files_path="my_dag_files/")
# Generate DAG and task files from a high-level workflow description
prt.workflows.generate_files(
dag_key="my_data_pipeline",
dag_flow="extract_data >> transform_data >> load_data",
files_path="my_dag_files/",
default_worker_config=prt.WorkerConfig(),
schedule_interval="@daily"
)
# Test tasks locally by creating temporary workers and running each task script
success_workers, failed_workers = prt.workflows.test_tasks(
dag_flow="extract_data >> transform_data >> load_data",
files_path="my_dag_files/",
default_worker_config=prt.WorkerConfig(),
terminate_on_success=True
)
Deploys a workflow (Airflow DAG) to the currently selected Practicus AI region.
Parameters:
- service_key (str): The key identifying the workflow orchestration service (e.g., Airflow instance) defined by your Practicus AI administrator.
- dag_key (str): A unique identifier (name) for the DAG being deployed.
- files_path (str | None): The directory containing the DAG file and associated task files. If None, defaults to the current working directory.
- max_files_to_upload (int): The maximum number of files to upload along with the DAG for its tasks.
Behavior: This method packages the specified DAG files, task scripts, and configurations, and uploads them to the Airflow service running in the current region, making the DAG available for scheduling and execution.
Example:
prt.workflows.deploy(
service_key="my_airflow_service",
dag_key="my_pipeline",
)
Retrieves a dictionary of Airflow DAG parameters that can be used to configure Practicus AI workers and services within the Airflow tasks. These parameters often include environment details, service endpoints, and default worker configurations.
Returns:
- dict: A dictionary of parameters suitable for inclusion in Airflow DAG and task configurations.
Example:
params = prt.workflows.get_airflow_params()
# Use params in your DAG definition to provide runtime configuration
Executes an Airflow task inside a DAG at runtime.
This method is primarily intended for internal use within the Airflow DAGs generated by Practicus AI. It receives runtime parameters from Airflow and runs the associated task. Users typically do not call this method directly.
Parameters:
- kwargs: Dynamic parameters provided by the Airflow scheduler and DAG configuration.
Returns: None
Note: Refer to Practicus AI's Airflow integration documentation or generated DAG code for usage details.
Infers the DAG key and the username (as derived from a predefined folder structure) from a given DAG file path.
Parameters:
- dag_file_path (str): The full path to the DAG file. For convenience, you can pass __file__ when calling from inside the DAG file.
Returns:
- tuple[str, str]: A tuple (dag_key, username) extracted from the directory structure.
Example:
dag_key, username = prt.workflows.get_dag_info(__file__)
print(dag_key, username)
Generates Airflow DAG and task files from a given workflow definition (DAG flow).
This utility helps bootstrap the filesystem layout for your workflow code, including:
- DAG file (Python code)
- Task scripts (Python files, optionally shell scripts)
- Worker configuration files
Parameters:
- dag_key (str): A unique identifier for the DAG.
- dag_flow (str): A string representing the workflow tasks and their dependencies (e.g. "extract_data >> transform_data >> load_data").
- files_path (str | None): The directory where DAG and task files will be generated. If None, defaults to the current working directory.
- default_worker_config (WorkerConfig | None): The default worker configuration applied to all tasks.
- custom_worker_configs (list[tuple[str, WorkerConfig]] | None): A list of tuples mapping specific task names to custom worker configurations.
- save_credentials (bool): If True, saves the current user's credentials in the default worker config file.
- overwrite_existing (bool): If True, overwrites existing files.
- schedule_interval (str | None): The scheduling interval for the DAG, e.g. "@daily", "0 2 * * THU".
- start_date (datetime | None): The start date for the DAG scheduling. Defaults to the current datetime.
- retries (int): The number of retries allowed for tasks in this DAG.
- dag_template (str): The template filename for the DAG file.
- task_template (str): The template filename for individual task files.
Example:
prt.workflows.generate_files(
dag_key="etl_pipeline",
dag_flow="extract >> transform >> load",
default_worker_config=prt.WorkerConfig(),
schedule_interval="@daily",
start_date=datetime(2024, 1, 1),
retries=2
)
Tests tasks defined by a given DAG flow or a task list by running them in temporary workers. This allows local validation of code before deploying the full workflow.
Parameters:
- dag_flow (str | None): A DAG flow string (e.g. "task1 >> task2") defining task dependencies.
- task_list (list[str] | None): A list of task names to test. Provide either dag_flow or task_list, not both.
- files_path (str | None): The directory containing the task files. If None, uses the current directory.
- default_worker_config (WorkerConfig | None): The base worker configuration for running the tasks.
- custom_worker_configs (list[tuple[str, WorkerConfig]] | None): Per-task overrides for the worker configuration.
- terminate_on_success (bool): If True, terminates the worker after a successful task run.
- terminate_on_failed (bool): If True, terminates the worker if the task fails.
Returns:
- tuple[list[Worker], list[Worker]]: A tuple containing a list of workers that ran successful tasks and a list of workers that ran failed tasks (if they were not terminated).
Example:
success_workers, failed_workers = prt.workflows.test_tasks(
dag_flow="extract >> transform >> load",
default_worker_config=prt.WorkerConfig(),
terminate_on_success=True,
terminate_on_failed=False
)
Provides a high-level interface for managing various data and service connections within Practicus AI.
The connections
class includes:
- Type aliases for different connection configuration types (e.g., S3, SQL databases, Snowflake).
- Functions to create, update, retrieve, and list all configured connections.
- A utility to upload files to S3 storage.
By leveraging these methods, you can easily integrate and interact with multiple data sources and external systems in your Practicus AI environment, all through a unified interface.
Key Concepts:
- Connection Configuration (ConnConf): A structured way to store credentials, endpoints, and parameters necessary to connect to databases, storage services, and other external data systems.
- Managed Connections: Centralized management of all defined connections, allowing for easy retrieval and updates through code, without needing to hard-code credentials repeatedly.
Common Usage:
import practicuscore as prt
# Get all configured connections
all_connections = prt.connections.get_all()
# Retrieve a specific connection by name
my_db_conn = prt.connections.get("my_database")
# Create a new S3 connection configuration
s3_config = prt.connections.S3ConnConf(
endpoint_url="...",
aws_access_key_id="...",
)
prt.connections.create(name="my_s3_conn", conn_conf=s3_config)
# Update an existing connection
updated_config = s3_config.copy(update={"bucket": "my-new-bucket"})
prt.connections.update(name="my_s3_conn", conn_conf=updated_config)
# Upload a file to S3 using the newly created connection
prt.connections.upload_to_s3("my_s3_conn", local_path="data.csv", key="data/data.csv")
Uploads a file to S3 using the specified upload configuration.
This method uses the configuration details in UploadS3Conf (including credentials, target S3 bucket, and key) to upload a local file or in-memory data to S3.
Parameters:
- upload_conf (UploadS3Conf): A configuration object containing:
- Access and secret keys or reference to a connection.
- Bucket name.
- Local file path or data to upload.
- S3 key (object path) under which the file will be stored.
- Other optional parameters like overwrite behavior.
Returns: None. Raises an exception if the upload fails.
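Example (a sketch reusing the UploadS3Conf fields documented below; the attribute path prt.connections.UploadS3Conf and the bucket/paths are assumptions for illustration):
upload_conf = prt.connections.UploadS3Conf(
    bucket="my-bucket",
    prefix="data/backups",
    folder_path="local_data/",
)
prt.connections.upload_to_s3(upload_conf)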
Retrieves a list of all configured connections in the currently active Practicus AI region.
This method returns a list of Connection
objects, each representing a previously created
or registered connection within the current region's environment. Connections can include
various data sources like S3, databases, or custom endpoints.
Returns:
- PrtList[Connection]: A read-only list of Connection objects.
Example:
import practicuscore as prt
all_conns = prt.connections.get_all()
for conn in all_conns:
print(conn.name, conn.conn_type)
Retrieves a specific connection configuration by its UUID or name.
If you know the unique identifier (UUID) or a user-friendly name of the connection,
this method returns the corresponding Connection
object. If no connection matches
the provided identifier, returns None.
Parameters:
- uuid_or_name (str): The UUID or name of the desired connection.
Returns:
- Connection | None: The matching Connection object, or None if not found.
Example:
import practicuscore as prt
my_conn = prt.connections.get("my_database")
if my_conn:
print("Found connection:", my_conn.name, my_conn.conn_type)
else:
print("Connection not found.")
Creates a new connection in the currently active Practicus AI region.
By providing a name and a ConnConf
object representing the configuration details,
this method registers a new connection that can later be retrieved and used.
The optional tree_path
parameter can organize connections into a logical hierarchy.
Parameters:
- name (str): The name of the new connection. Must be unique within the region.
- conn_conf (ConnConf): The configuration object specifying credentials, endpoints, and properties for the connection type (e.g., S3, MySQL, Snowflake).
- tree_path (str | None): Optional hierarchical path to store the connection under (e.g., "teamA/projects").
- can_write (bool): If True, indicates that this connection can be used for write operations.
Returns:
- str: The UUID of the newly created connection.
Example:
import practicuscore as prt
s3_conf = prt.connections.S3ConnConf(
...
)
conn_uuid = prt.connections.create(name="my_s3_conn", conn_conf=s3_conf, can_write=True)
print("Created connection with UUID:", conn_uuid)
Updates an existing connection with new configuration details.
Use this method to modify an existing connection if credentials, endpoints, or other parameters need to be changed. For example, changing the S3 bucket, database host, or default credentials.
Parameters:
- conn_uuid (str): The UUID of the connection to update.
- name (str): The new name for the connection (can be the same as before or a new one).
- conn_conf (ConnConf): The updated connection configuration object.
- tree_path (str | None): Optional hierarchical path for organizing the connection.
- can_write (bool): If True, indicates this connection can be used for write operations.
Returns: None. Raises an exception if the update fails or if the connection does not exist.
Example:
import practicuscore as prt
# Retrieve UUID from get or create methods
updated_s3_conf = prt.connections.S3ConnConf(
...
)
prt.connections.update(conn_uuid="existing-uuid", name="my_updated_s3_conn", conn_conf=updated_s3_conf)
print("Connection updated successfully.")
Configuration for uploading files to an S3-compatible storage.
This class defines the parameters needed to upload files from a local source to a specified S3 bucket and prefix. It can also handle cutting part of the source path to simplify the uploaded key structure.
Usage Example:
s3_conf = UploadS3Conf(
bucket="my-bucket",
prefix="data/backups",
folder_path="local_data/",
aws_access_key_id="AKIA...",
aws_secret_access_key="secret...",
aws_region="us-east-1"
)
Factory class to generate connection configuration classes from a dictionary, JSON, or an instance of the subject class.
Base connection configuration class
AWS S3 or compatible object storage (e.g. Minio, CEPH, Google Cloud Storage) connection configuration
SQLite connection configuration
MySQL connection configuration
PostgreSQL connection configuration
AWS Redshift connection configuration
Snowflake connection configuration
Microsoft SQL Server connection configuration
Oracle DB or DWH connection configuration
Hive connection configuration
Cloudera connection configuration
AWS Athena connection configuration
ElasticSearch connection configuration
OpenSearch connection configuration
Trino connection configuration
Dremio connection configuration
SAP Hana connection configuration
Teradata connection configuration
IBM DB2 connection configuration
AWS DynamoDB connection configuration
CockroachDB connection configuration
Custom sqlalchemy compatible connection configuration
Provides a simplified interface for managing distributed compute jobs within Practicus AI.
The distributed
class offers convenient methods and property aliases to inspect and interact with
distributed workloads, such as multi-worker or multi-GPU training jobs. It leverages underlying
DistJobHelper
utilities and configurations to streamline setup and monitoring.
Key Concepts:
- Distributed Jobs: A set of workers orchestrated together to run a parallel or distributed task, such as large-scale training, hyperparameter tuning, or data processing.
- Coordinator and Agents: In distributed tasks, a coordinator (rank 0) manages synchronization and coordination, while agent workers (rank ≥ 1) carry out the bulk of the workload.
Common Usage:
import practicuscore as prt
# Check if running in a distributed cluster
if prt.distributed.running_on_a_cluster():
job_id = prt.distributed.get_job_id()
gpu_count = prt.distributed.get_gpu_count()
print(f"Running in distributed mode. Job ID: {job_id}, GPUs: {gpu_count}")
if prt.distributed.is_coordinator():
print("This worker is the coordinator. Managing task distribution...")
else:
print("This worker is an agent, focusing on processing tasks.")
else:
print("This worker is not part of a distributed job.")
The job type, e.g. python, spark, deepspeed, etc., expressed using the DistJobType enum.
Waits until the distributed job directory and configuration files appear, indicating that all executors have started (but not necessarily running).
Parameters:
- job_dir (str | None): The job directory. If None, expects the code is running on the cluster.
- job_id (str | None): The job ID. If None, expects the code is running on the cluster.
- timeout (int): Maximum number of seconds to wait before raising a TimeoutError.
- sleep_between_refresh (int): How often (in seconds) to check for job start.
- log_state (bool): If True, logs waiting status messages.
Example:
DistJobHelper.wait_for_start(job_dir="/path/to/job_dir", job_id="12345")
Waits until all executors of a distributed job are in the running
state.
Parameters:
- job_dir (str | None): The job directory. If None, expects the code is running on the cluster.
- job_id (str | None): The job ID. If None, expects the code is running on the cluster.
- timeout (int): Maximum number of seconds to wait before raising a TimeoutError.
- sleep_between_refresh (int): How often (in seconds) to check for executors running.
Example:
DistJobHelper.wait_for_running(job_dir="/path/to/job_dir", job_id="12345")
Opens a live view of the distributed job’s state inside a Jupyter environment, refreshing periodically.
Parameters:
- job_dir (str | None): Job directory or None if running inside the cluster.
- job_id (str | None): Job ID or None if running inside the cluster.
- max_rows (int): Maximum number of executor entries to display.
- sleep_between_refresh (int): How often (in seconds) to refresh the view.
Example:
DistJobHelper.live_view(job_dir="/path/to/job_dir", job_id="12345")
Displays executor worker logs for a given distributed job executor (identified by rank).
Parameters:
- job_dir (str | None): Job directory or None if running inside the cluster.
- job_id (str | None): Job ID or None if running inside the cluster.
- rank (int | None): The executor rank. If None, uses the current worker’s rank.
Example:
DistJobHelper.view_log(job_dir="/path/to/job_dir", job_id="12345", rank=0)
Acquires a client (or session) for interactive clusters like Dask or Spark.
Parameters:
- job_dir (str | None): Job directory or None if running on the coordinator.
- job_id (str | None): Job ID or None if running on the coordinator.
- conn_conf (ConnConf | None): Optional connection configuration (e.g., credentials for S3 in Spark).
- config_dict (dict | None): Additional configuration parameters.
Returns:
- Client/session object appropriate for the job type (e.g., Dask client, Spark session).
Example:
client = DistJobHelper.get_client(job_dir="/path", job_id="12345")
# client could be a Dask client or a Spark session depending on the job type.
Opens the management dashboard for interactive clusters (Spark, Dask) in a browser or returns the URL.
Parameters:
- job_dir (str | None): Job directory or None if running on the job cluster.
- job_id (str | None): Job ID or None if running on the job cluster.
- get_url_only (bool): If True, returns the dashboard URL without opening it.
Returns:
- str: The dashboard URL.
Example:
dashboard_url = DistJobHelper.open_dashboard(job_dir="/path", job_id="12345", get_url_only=True)
print("Dashboard URL:", dashboard_url)
Retrieves the active Job ID for this worker if it is part of a distributed job.
Returns:
- int | None: The Job ID if this worker is part of a distributed job, None otherwise.
Example:
job_id = prt.distributed.get_job_id()
if job_id is not None:
print(f"This worker belongs to distributed job ID: {job_id}")
else:
print("Not part of a distributed job.")
Retrieves the rank of this worker within the distributed job.
The rank is an integer that distinguishes different workers in a job. Typically:
- Rank 0: Coordinator (master) worker.
- Rank ≥1: Agent (worker) nodes.
Returns:
- int | None: The rank of this worker if part of a distributed job, None otherwise.
Example:
rank = prt.distributed.get_job_rank()
if rank == 0:
print("This is the coordinator node.")
elif rank is not None:
print(f"This is an agent node with rank: {rank}")
else:
print("Not part of a distributed job.")
Retrieves the number of GPUs available on this worker.
Returns:
- int: The number of GPUs detected on this worker.
Example:
gpu_count = prt.distributed.get_gpu_count()
print(f"Available GPUs on this worker: {gpu_count}")
Checks if the current code is running on a worker that is part of a distributed job or cluster.
Returns:
- bool: True if running as part of a distributed job, False otherwise.
Example:
if prt.distributed.running_on_a_cluster():
print("Running in distributed mode.")
else:
print("Running in a standalone worker.")
Checks if this worker is the coordinator (rank 0) of a distributed job.
If the worker is running a distributed job and its rank is 0, or if auto-distributed configuration is detected without a specific rank, it is considered the coordinator.
Returns:
- bool: True if this worker is the coordinator, False otherwise.
Example:
if prt.distributed.is_coordinator():
print("I am the coordinator of this distributed job.")
Checks if this worker is an agent (rank ≥ 1) in a distributed job.
Agents typically carry out workloads assigned by the coordinator.
Returns:
- bool: True if this worker is an agent in a distributed job, False otherwise.
Example:
if prt.distributed.is_agent():
print("I am an agent worker node.")
Validates that the specified job directory is accessible to this worker.
In distributed settings, job-related files or configurations may be stored in a shared directory. This method ensures that the directory is present and accessible.
Parameters:
- job_dir (str): The user-friendly job directory path. It will be translated to the actual directory path by DistJobHelper.
Example:
try:
prt.distributed.validate_job_dir("/my/job/dir")
print("Job directory is valid.")
except FileNotFoundError:
print("Invalid job directory.")
Provides a high-level, convenience interface for Practicus AI authentication and authorization tasks.
The auth
class exposes commonly used authentication-related methods—such as logging in, logging out,
changing passwords, and retrieving tokens—without requiring direct interaction with the underlying regions
class or Region
objects. This allows developers to quickly manage authentication workflows in a more
streamlined manner.
Typical Operations Include:
- Logging into a Practicus AI region with auth.login(...).
- Logging out from one or all regions using auth.logout(...).
- Automatically authenticating within known Practicus AI environments using auth.auto_login().
- Changing user passwords and retrieving workspace credentials or tokens.
Example:
import practicuscore as prt
# Log in to a Practicus AI region
prt.auth.login(
url="https://my-practicus-region.example.com",
email="user@example.com",
password="mypassword"
)
# Retrieve an access token
access_token = prt.auth.get_access_token()
# Change the default region
prt.auth.set_default_region("user@my-practicus-region.example.com")
# Logout from all known regions
prt.auth.logout(all_regions=True)
Sets the default Practicus AI region. Subsequent operations that do not explicitly specify a region will use this default.
Parameters:
- region_key (str): The region identifier, e.g. username@my-region.example.com or just my-region.example.com if only one user is associated with it.
Returns
True if the region is successfully set as default; False otherwise.
Authenticates the user to a Practicus AI region and returns a Region
instance.
The login process uses the provided credentials (password or tokens) to obtain a refresh token and
access token. Once authenticated, the region and associated credentials are stored so they can be
reused in subsequent sessions, unless save_config
is set to False.
Parameters:
- url (str): The base URL of the Practicus AI region, e.g. "https://my-region.example.com".
- email (str): The user's email associated with the Practicus AI account.
- password (str | None): The user's password. Optional if refresh_token or access_token are provided.
- refresh_token (str | None): Existing refresh token; can be used instead of password-based login.
- save_password (bool): If True, saves the entered password securely for future sessions.
- access_token (str | None): An existing valid access token to bypass credential-based login.
- save_config (bool): If True, persists the configuration so that subsequent sessions do not require login.
Returns
A Region object representing the authenticated Practicus AI environment.
Example:
region = prt.regions.login(
url="https://my-practicus-region.example.com",
email="user@example.com",
password="mypassword",
save_password=True
)
Logs out of one or all Practicus AI regions, removing stored credentials from local configuration.
Parameters:
- region_key (str | None): The key of the region to log out from. If None and all_regions is False, no action will be taken.
- all_regions (bool): If True (default), logs out from all known regions. If False, you must specify region_key.
Returns
None
Automatically logs into the Practicus AI region when running within a known Practicus AI environment (e.g., a worker or workspace) that already has embedded credentials. Useful for automations and internal scenarios where explicit login is not required.
Returns
The default Region instance after auto-login.
Changes the current user's password in the default Practicus AI region.
Parameters:
- old_password (str): The current password.
- new_password (str): The new desired password.
Retrieves the username and password credentials for a specified workspace instance.
The username is typically derived from the user's email address. The password is a managed credential provided by the region.
Parameters:
- instance_id (str): The identifier of the workspace. View available instances via region.worker_list.
Returns
A tuple (username, password) for the selected workspace.
Provides high-level access to data processing engines like Apache Spark within Practicus AI.
The engines
class acts as a simplified interface for working with various data processing engines configured
in the Practicus AI environment. This typically involves retrieving or creating Spark sessions that run
on Practicus AI Workers. By abstracting away configuration details, it allows developers to focus on their
data pipelines and analysis tasks.
Key Concepts:
- Spark Session: A handle to an Apache Spark cluster environment. You can use it to read and write data, run transformations, and perform distributed computations.
- Connection Config: A dictionary or JSON configuration that specifies how the Spark session should connect to data sources (e.g., S3, databases).
Common Usage:
import practicuscore as prt
# Get or create a Spark session on a Practicus AI Worker
spark = prt.engines.get_spark_session()
# Perform Spark operations
df = spark.read.csv("s3://mybucket/mydata.csv", header=True)
df.show()
# Delete the Spark session when done
prt.engines.delete_spark_session()
Important: Spark sessions require that your code is running on a Practicus AI Worker. Attempting to run Spark-related operations outside of a Worker environment will result in an error.
Retrieves or creates a Spark session for distributed data processing within a Practicus AI Worker.
A Spark session can be used to connect to various data sources, run SQL queries, and perform large-scale transformations or machine learning tasks. If a session already exists, it will be reused; otherwise, a new one will be created based on the provided configuration parameters.
Parameters:
- conn_conf (dict | str | None): Optional. A configuration object specifying data connection details.
- extra_spark_conf (dict | None): Optional. A dictionary of additional Spark configuration options, e.g. {"spark.executor.memory": "4g", "spark.driver.cores": "2"}.
Returns:
- Optional['SparkSession']: A SparkSession object if successful, or None if Spark cannot be initialized.
Example:
spark = prt.engines.get_spark_session(
extra_spark_conf={"spark.executor.instances": "5"}
)
df = spark.read.csv("s3://my-data-bucket/data.csv", header=True)
df.show()
Terminates the currently active Spark session created by Practicus AI.
This method releases the resources associated with the Spark session, including executors and any cached data. It is good practice to delete the Spark session when you have finished your data processing tasks, especially if you anticipate creating a new session later.
Returns: None
Example:
# After finishing all Spark tasks
prt.engines.delete_spark_session()
Provides a simplified interface for managing machine learning experiments in Practicus AI.
The experiments class helps you integrate with various experiment tracking services supported by
Practicus AI (e.g., MLFlow) to log parameters, metrics, and artifacts of your machine learning models.
By centralizing the configuration process, it allows you to easily initialize and switch between
experiment backends without deep knowledge of their internal configurations.
Key Concepts:
- Experiment Tracking: The practice of recording experiment parameters, metrics, artifacts (models, plots, data snapshots), and metadata in a versioned and searchable manner.
- MLFlow: A popular open-source platform for managing the ML lifecycle. Practicus AI can configure MLFlow servers to track your experiments.
Common Usage:
import practicuscore as prt
# Configure the experiment tracking to use a particular MLFlow service
prt.experiments.configure(service_name="my-mlflow-service", service_key="abc123", experiment_name="MyExperiment")
# After configuration, your ML code can log metrics and parameters via MLFlow APIs
import mlflow
with mlflow.start_run():
mlflow.log_param("learning_rate", 0.01)
mlflow.log_metric("accuracy", 0.95)
Configures the machine learning experiment tracking service for the current environment.
This method sets up integration with a configured MLFlow tracking server in Practicus AI. After configuration, you can use the MLFlow Python API to log parameters, metrics, and artifacts of your experiments.
Parameters:
- service_name (str | None): The name of the Practicus AI experiment service (e.g., an MLFlow deployment) to connect to. If None, uses a default or previously configured service if available.
- service_key (str | None): A key or token to authenticate and retrieve the appropriate experiment service details. This might be provided by Practicus AI’s console or your platform administrator.
- experiment_name (str | None): An optional name of the experiment. If provided, this will attempt to set or start a new experiment run under the given name. If not provided, the service will default to a predefined experiment context or wait for you to specify one later.
Returns: None
Behavior and Error Handling:
- If configuration is successful and service_name references an external MLFlow service, a log message confirms the configuration.
- If configuration fails, an error message is logged. Common reasons for failure include:
  - Invalid service_name or service_key.
  - Network or credential issues accessing the external tracking server.
- Exceptions, if any, are logged with stack traces for debugging.
Example:
# Configure with a known MLFlow service and start a new experiment
prt.experiments.configure(
service_name="my-mlflow-service",
service_key="myservicekey123",
experiment_name="MyModelExperiment"
)
# Now you can use MLFlow's Python SDK for tracking:
import mlflow
mlflow.log_param("batch_size", 64)
mlflow.log_metric("loss", 0.4)
Provides a simplified interface for executing, configuring, and managing Jupyter notebooks programmatically within Practicus AI.
The notebooks class allows you to:
- Configure default output directories for successful and failed notebook runs.
- Execute Jupyter notebooks with specified parameters, capturing outputs and handling failures.
- Keep track of execution history, including which notebooks succeeded or failed.
- Validate the execution environment (e.g., correct Python virtual environment).
Key Concepts:
- Notebook Execution: Running a .ipynb file programmatically (via papermill) and capturing the executed output.
- Output Management: Storing successful and failed notebook outputs in designated directories, optionally with timestamps for versioning or organization.
- History Tracking: Keeping a record of all notebooks executed, and validating that all completed successfully if desired.
Common Usage:
import practicuscore as prt
# Configure default directories and reset history
prt.notebooks.configure(
default_output_folder="~/notebook_outputs/success",
default_failed_output_folder="~/notebook_outputs/failed",
add_time_stamp_to_output=True,
reset_history=True
)
# Execute a notebook
prt.notebooks.execute_notebook(
input_path="~/my_notebook.ipynb",
parameters={"param1": 42},
raise_on_failure=True
)
# View execution history
prt.notebooks.view_history()
# Validate all executed notebooks
prt.notebooks.validate_history()
Configures default output directories and history handling for notebook execution.
Parameters:
- default_output_folder (str | None): The base directory where successful notebook outputs will be stored. Supports ~ for the home directory. If add_time_stamp_to_output is True, a timestamped subfolder is created.
- default_failed_output_folder (str | None): The base directory where failed notebook outputs will be stored. Supports ~ for the home directory. If add_time_stamp_to_output is True, a timestamped subfolder is created.
- add_time_stamp_to_output (bool): If True, appends a timestamp-based folder structure (YYYY-MM-DD/HH-MM-SS) inside the specified output directories.
- reset_history (bool): If True, clears the record of previously executed notebooks, resetting successful_notebooks and failed_notebooks.
Example:
prt.notebooks.configure(
default_output_folder="~/outputs/success",
default_failed_output_folder="~/outputs/failed",
add_time_stamp_to_output=True,
reset_history=True
)
Clears the record of previously executed notebooks, resetting both successful and failed lists.
Example:
prt.notebooks.reset_history()
print("Execution history has been reset.")
Logs the summary of executed notebooks: how many succeeded and how many failed.
Behavior:
- If there are failed notebooks, logs a warning with the count.
- If there are successful notebooks, logs their count.
- If no notebooks have been run, logs that no history is available.
Example:
prt.notebooks.view_history()
Validates the execution history by checking if any notebooks failed.
Behavior:
- If any notebooks failed, raises a ValidationFailedError.
- If all executed notebooks were successful, logs a success message.
- If no notebooks have been run, logs that no runs are in the history.
Example:
prt.notebooks.validate_history()
Retrieves the name of the current Python virtual environment.
Returns:
- str: The current virtual environment name. If using system Python, returns "practicus".
Example:
current_venv = prt.notebooks.get_venv_name()
print(f"Running in venv: {current_venv}")
Validates that the current environment matches the expected virtual environment name.
Parameters:
- venv_name (str): The expected virtual environment name.
Behavior:
- If the current environment does not match venv_name, raises an EnvironmentError.
- In a Practicus AI Worker, suggests using the correct image or kernel if a mismatch is found.
Example:
prt.notebooks.validate_venv("practicus_genai")
Executes a Jupyter notebook using papermill, optionally applying parameters.
Parameters:
- input_path (str): The path to the input .ipynb notebook file. ~ will be expanded to the home directory. If the .ipynb suffix is missing, it is appended.
- parameters (dict | None): A dictionary of parameters to pass to the notebook.
- output_path (str | None): The path to save the executed notebook. If None, uses default_output_folder or modifies input_path to create an _output.ipynb file.
- failed_output_path (str | None): Custom directory to store failed notebook outputs. If None, uses default_failed_output_folder if configured.
- raise_on_failure (bool): If True, raises an exception if the notebook fails.
- **kwargs: Additional arguments passed to papermill.execute_notebook().
Behavior:
- Executes the notebook with papermill.
- If successful, logs success and records the notebook in successful_notebooks.
- If it fails, logs an error, moves the output to the failed directory if available, and records the notebook in failed_notebooks.
Example:
prt.notebooks.execute_notebook(
input_path="~/analysis.ipynb",
parameters={"input_data": "dataset.csv"},
raise_on_failure=True
)
Provides convenience methods to check and format code quality using the ruff linting and formatting tool.
The quality class leverages the ruff command-line tool to:
- Lint code, identifying stylistic, formatting, and potential bug issues.
- Automatically fix certain issues to maintain a consistent code style.
By default, if no paths are specified, it operates on the current working directory.
Key Concepts:
- Linting (Check): Scans code for issues and reports them without changing files.
- Formatting: Attempts to auto-fix and format code to adhere to specified guidelines.
Common Usage:
import practicuscore as prt
# Check code quality in the current directory
success = prt.quality.check()
# Format code in a specific directory
prt.quality.format(paths=["src/"])
Checks code quality and linting issues using ruff.
Parameters:
- paths (list[str] | None): Paths to lint. If None, defaults to the current directory.
- config_path (str | None): Optional ruff config file path.
- fix (bool): If True, attempts to fix linting issues automatically (default: False).
- select (list[str] | None): Specific linting rules (error codes) to focus on.
- ignore (list[str] | None): Specific linting rules (error codes) to ignore.
Returns:
- bool: True if no issues are found (or all issues are successfully fixed); False otherwise.
Example:
# Check the current directory without fixing issues
success = quality.check()
# Check and fix issues in 'src' directory
success = quality.check(paths=["src/"], fix=True)
Formats code to improve code style and consistency.
Unlike check, the format command attempts to rewrite code to conform to linting rules.
It does not fix all issues but can apply automatic formatting improvements.
Parameters:
- paths (list[str] | None): Paths to format. If None, defaults to the current directory.
- config_path (str | None): Optional ruff config file path.
- select (list[str] | None): Specific formatting-related rules to enforce.
- ignore (list[str] | None): Specific formatting-related rules to ignore.
Example:
# Automatically format code in 'src' directory
quality.format(paths=["src/"])
Represents a Practicus AI "Region," which is a logical control plane for managing resources such as workers (compute pods), models, applications, workflows, and data connections.
A Region corresponds to an isolated environment, often running on Kubernetes. Each Region holds configurations, credentials, and references to available services and data sources.
By interacting with a Region, you can:
- Launch and manage Workers or Workspaces for running code, notebooks, or hosting models.
- Deploy and manage ML models, view model prefixes and versions.
- Deploy and manage Streamlit-based applications.
- Manage data connections (e.g., S3, SQL databases, Snowflake).
- Work with add-ons and external services integrated into the Practicus AI ecosystem.
Key Concepts:
- Workers and Workspaces: Kubernetes pods that run code, handle notebooks, or host services.
- Model and App Deployments: ML models and Streamlit apps deployed as services accessible by stable URLs.
- Connections: Configurations to external data sources (S3, databases, etc.).
- Regions & Isolation: Each Region is an isolated namespace, ensuring that resources and configurations do not conflict across environments.
Common Usage:
import practicuscore as prt
# Retrieve the default region
region = prt.get_default_region()
# Launch a worker
worker = region.create_worker()
# List current workers
for w in region.worker_list:
print(w.name, w.instance_id)
# Deploy a model or app
region.deploy_model(deployment_key="my_model_service", prefix="mymodel", model_name="model_v1")
region.deploy_app(deployment_setting_key="my_app_service", prefix="myapp", app_name="app_v1")
# Manage data connections
conn = region.get_connection("my_database")
print(conn)
# Terminate all workers
region.terminate_all_workers()
Initializes a new Region instance.
Parameters:
- url (str): The base URL of the Practicus AI Region.
- email (str | None): The user's email associated with this Region.
- username (str | None): The username derived from the email or assigned for this Region.
- refresh_token (str | None): A refresh token for authenticating calls to the Region.
- access_token (str | None): An optional pre-obtained access token.
Logger configuration for all region-related activity.
The username derived from the user's email or explicitly set for this Region session.
The refresh token used for authenticating requests in this Region session.
Retrieves the host DNS of the region by stripping protocol from the URL.
Returns:
- str: The host DNS (e.g., "my-region.example.com").
Example:
print(region.host_dns)
A unique key identifying this region in the form "username@host_dns".
Returns:
- str: The unique region key.
Example:
print(region.key) # e.g. "alice@my-region.example.com"
Checks whether this region is configured as the default region.
Returns:
- bool: True if this is the default region, False otherwise.
Example:
if region.is_default:
print("This is the default region.")
Retrieves a list of all Workers currently running in this region.
Workers are refreshed periodically. If the cache is stale, this property automatically reloads the worker list from the region.
Returns:
- PrtList[Worker]: A read-only list of Workers.
Example:
for worker in region.worker_list:
print(worker.name, worker.service_type)
Forces a reload of the worker list from the region, optionally filtering by service type (e.g., "cloud_worker", "workspace").
Parameters:
- service_type (str | None): The service type to filter workers by. If None, returns all workers.
Example:
region.reload_worker_list(service_type="cloud_worker")
Retrieves the default worker size for launching new workers if none is specified.
Returns:
- str: The default worker size name.
Example:
default_size = region.get_default_worker_size()
print("Default worker size:", default_size)
Retrieves the default container image for a given service type.
Parameters:
- service_type (str): The service type (e.g., "cloud_worker", "workspace").
Returns:
- str: The URL of the default worker image.
Example:
default_image = region.get_default_worker_image()
print("Default worker image:", default_image)
Creates a new Practicus AI Worker in this region.
If no worker_config is provided, defaults (default image and size) are used.
The worker can be a simple "cloud_worker" or a more complex distributed job.
Parameters:
- worker_config (WorkerConfig | str | dict | None): The worker configuration. Can be a WorkerConfig object, a path to a JSON file, a dict, or None.
- wait_until_ready (bool | None): If True, waits until the worker is fully ready.
Returns:
- Worker: The newly created Worker instance.
Example:
worker = region.create_worker()
print("Created worker:", worker.name)
Creates a new Practicus AI Workspace.
A Workspace is a specialized worker configured for interactive development (e.g., Jupyter notebooks).
Parameters:
- worker_config (WorkerConfig | str | dict | None): The workspace configuration. If None, default settings are used.
Returns:
- Worker: The newly created workspace worker.
Example:
workspace = region.create_workspace()
print("Created workspace:", workspace.name)
Retrieves the Worker instance representing the current environment if the code is running inside a worker.
Returns:
- Worker: The local worker.
Raises:
- SystemError: If not running inside a worker.
Example:
local_w = region.get_local_worker()
print("Local worker:", local_w.name)
Gets an existing worker or creates a new one if none suitable exists.
If worker_config is not provided and no suitable running worker is found, it creates a new one.
Parameters:
- worker_config (WorkerConfig | str | dict | None): The worker configuration.
- service_type (str): The service type (e.g. "cloud_worker").
Returns:
- Worker: An existing or newly created worker.
Example:
worker = region.get_or_create_worker()
print("Worker:", worker.name)
Retrieves a list of configured model prefixes available in this region.
Model prefixes group related models, allowing you to organize and manage multiple versions and deployments under a logical namespace.
Returns:
- PrtList[ModelPrefix]: A read-only list of model prefixes.
Example:
for prefix in region.model_prefix_list:
print(prefix.prefix_name)
Retrieves a list of all models known to this region.
Each ModelMeta object contains metadata about a model, such as its name, versions, and associated prefix.
Returns:
- PrtList[ModelMeta]: A read-only list of models.
Example:
for model in region.model_list:
print(model.model_name, model.latest_version)
Retrieves a list of all applications known to this region.
Each AppMeta object contains metadata about the app, such as its prefix, name, versions, and deployment info.
Returns:
- PrtList[AppMeta]: A read-only list of apps.
Example:
for app in region.app_list:
print(app.app_name, app.visible_name)
Retrieves a list of app prefixes available in this region.
App prefixes group related applications, providing a logical namespace for organizing multiple apps.
Returns:
- PrtList[AppPrefix]: A read-only list of app prefixes.
Example:
for prefix in region.app_prefix_list:
print(prefix.prefix)
Forces a reload of the model prefix list from the region.
If prefixes are stale or changed, calling this method ensures you have the latest data.
Example:
region.reload_model_prefix_list()
Forces a reload of the model list from the region.
Ensures that newly created or updated models are reflected locally.
Example:
region.reload_model_list()
Forces a reload of the app list from the region.
Ensures that newly deployed or updated apps appear locally.
Example:
region.reload_app_list()
Forces a reload of the app prefix list from the region.
Example:
region.reload_app_prefix_list()
Retrieves a list of all model deployments currently defined in this region.
A ModelDeployment represents a deployed model service, including routing and scaling details.
Returns:
- PrtList[ModelDeployment]: A read-only list of model deployments.
Example:
for deployment in region.model_deployment_list:
print(deployment.model_name, deployment.deployment_key)
Retrieves a list of all application deployment settings defined in this region.
AppDeploymentSetting provides configurations that influence how apps are deployed (e.g., environment variables, scaling settings).
Returns:
- PrtList[AppDeploymentSetting]: A read-only list of app deployment settings.
Example:
for setting in region.app_deployment_setting_list:
print(setting.deployment_setting_key, setting.description)
Forces a reload of the model deployment list from the region.
Ensures newly created or removed deployments are reflected locally.
Example:
region.reload_model_deployment_list()
Forces a reload of the app deployment setting list from the region.
Example:
region.reload_app_deployment_setting_list()
Deploys an ML model to this region.
Parameters:
- deployment_key (str): The deployment key defined by admin.
- prefix (str): The model prefix.
- model_name (str): The model name.
- model_dir (str | None): Directory containing model files. If None, current directory is used.
Returns:
- (str, str, str): (Dynamic version API URL, Specific version API URL, Metadata API URL).
Example:
dynamic_url, version_url, meta_url = region.deploy_model("my_model_service", "mymodel", "model_v1")
Deploys an application to this region.
Parameters:
- deployment_setting_key (str): The deployment setting key defined by admin.
- prefix (str): The app prefix.
- app_name (str): The app name.
- app_dir (str | None): The directory with app files. If None, current directory is used.
- visible_name (str | None): User-facing name of the app.
- description (str | None): Short description of the app.
- icon (str | None): Font Awesome icon name.
Returns:
- (str, str): (UI URL, API URL)
Example:
ui_url, api_url = region.deploy_app("my_app_service", "myapp", "app_v1", app_dir="app_files/")
Deletes an entire application (all versions) from this region.
Parameters:
- app_id (int | None): The app ID. If not provided, prefix and app_name must be given.
- prefix (str | None): The app prefix if app_id is not known.
- app_name (str | None): The app name if app_id is not known.
Example:
region.delete_app(prefix="myapp", app_name="app_v1")
Deletes a specific version of an app from this region.
Parameters:
- version (int | str): The version to delete.
- app_id (int | None): The app ID. If not provided, prefix and app_name must be given.
- prefix (str | None): The app prefix.
- app_name (str | None): The app name.
Example:
region.delete_app_version(version=2, prefix="myapp", app_name="app_v1")
Deploys a workflow (e.g., an Airflow DAG) to this region.
Parameters:
- service_key (str): The workflow service key defined by admin (e.g., Airflow).
- dag_key (str): The DAG key (must be a valid Python module name).
- files_path (str | None): The directory with workflow files. Defaults to current directory.
- max_files_to_upload (int): Maximum files to upload.
Example:
region.deploy_workflow(service_key="airflow_service", dag_key="my_pipeline", files_path="workflow_files/")
Logs out from this region by removing stored credentials.
Example:
region.logout()
print("Logged out from region.")
Sets this region as the default region.
Example:
region.set_default()
print("Default region is now:", region.key)
Runs a remote task (script) on a newly created or existing worker.
The code and dependencies are uploaded to the worker, and the script is executed.
If terminate_on_completion is True, the worker is terminated after the task finishes.
Parameters:
- file_name (str): The Python (.py) or shell (.sh) script to run.
- files_path (str | None): Directory containing files to upload. Defaults to current directory.
- worker_config (WorkerConfig | str | dict | None): Configuration for the remote worker.
- terminate_on_completion (bool): If True, terminates the worker after completion.
- capture_task_output (bool): If True, captures and logs the script's stdout and stderr.
- python_venv_name (str | None): Name of a Python virtual environment to use on the worker.
- max_files_to_upload (int): Max number of files to upload.
Returns:
- (Worker | None, bool): A tuple of (worker, success_boolean).
Example:
worker, success = region.run_task("analysis.py", files_path="project_code/")
if success:
print("Task ran successfully!")
Retrieves a short-lived session token for interacting with a model's API.
Parameters:
- api_url (str): The model API URL.
- for_upload (bool): If True, requests a token with permissions for model uploads.
- retry (int): Number of times to retry obtaining a token.
Returns:
- str | None: The session token, or None if unable to fetch.
Example:
token = region.get_model_api_session_token("https://my-region.com/mymodel/", for_upload=True)
if token:
print("Got token for model API upload.")
Retrieves a short-lived session token for interacting with an app's API.
Parameters:
- api_url (str | None): The app API URL.
- app_id (int | None): The app's unique identifier.
- for_upload (bool): If True, requests a token for uploading a new app version.
- retry (int): Number of times to retry obtaining a token.
Returns:
- str | None: The session token, or None if unable to fetch.
Example:
token = region.get_app_api_session_token(app_id=1234, for_upload=False)
if token:
print("Got token for app API.")
Retrieves a list of add-ons (3rd party services) integrated into this region.
Add-ons might represent external analytics tools, monitoring services, etc., accessible through the region.
Returns:
- PrtList[AddOn]: A read-only list of add-ons.
Example:
for addon in region.addon_list:
print(addon.key, addon.name)
Forces a reload of the add-on list from the region.
Ensures that any newly added or updated add-ons are reflected locally.
Example:
region.reload_addon_list()
Retrieves an AddOn (external service) by its key.
Parameters:
- key (str): The unique identifier of the add-on.
Returns:
- AddOn | None: The matching add-on, or None if not found.
Example:
addon = region.get_addon("my_addon_key")
if addon:
print("Add-on name:", addon.name)
else:
print("Add-on not found.")
Opens the specified add-on in a browser or a suitable interface if supported.
Parameters:
- key (str): The unique identifier of the add-on to open.
Example:
region.open_addon("my_addon_key")
Removes a worker from the region's internal cache without terminating it.
This is typically used internally. To fully terminate a worker, use terminate_worker().
Parameters:
- worker_name (str): The name of the worker to remove from the cache.
Example:
region.remove_worker_from_cache("Worker-123")
Retrieves a data connection by UUID or name.
Names are not guaranteed unique, so it's recommended to use UUIDs in production.
Parameters:
- uuid_or_name (str): The connection's UUID or name.
Returns:
- Connection | None: The matching connection, or None if not found.
Example:
conn = region.get_connection("my_database")
if conn:
print("Found connection:", conn.name)
Terminates one or all workers in this region.
Parameters:
- worker_name_or_num (str | int | None): The worker name or numeric suffix (e.g. "Worker-123" or "123").
- all_workers (bool): If True, terminates all workers in the region.
- instance_id (str | None): The instance_id of a worker, if not using name/num.
- stop_reason (str): A reason for stopping, used for logging and audit.
Returns:
- bool: True if at least one worker was terminated, False otherwise.
Example:
region.terminate_worker(worker_name_or_num="Worker-123")
region.terminate_worker(all_workers=True)
Retrieves the group memberships of the currently logged-in user.
Returns:
- list[str]: A list of group names.
Example:
groups = region.get_groups()
print("User groups:", groups)
Terminates all workers in this region.
Returns:
- bool: True if any workers were terminated, False if none were found.
Example:
region.terminate_all_workers()
Retrieves a sorted list of all data connections defined in this region.
Connections can point to databases, S3 buckets, or other external data sources.
Returns:
- PrtList[Connection]: A read-only list of connections sorted by name.
Example:
for conn in region.connection_list:
print(conn.name, conn.uuid)
Forces a reload of the data connection list from the region.
Ensures newly created or updated connections appear locally.
Example:
region.reload_connection_list()
Creates a new data connection in this region.
A data connection stores credentials and parameters for accessing external data sources, such as databases or S3 buckets.
Parameters:
- name (str): A unique name for the new connection.
- conn_conf (ConnConf): The connection configuration object describing the data source.
- tree_path (str | None): An optional hierarchical path for organizing the connection, e.g. "teamA/projects".
- can_write (bool): If True, this connection can be used for write operations.
Returns:
- str: The UUID of the newly created connection.
Example:
from practicuscore import S3ConnConf, connections
s3_conf = S3ConnConf(access_key="AKIA...", secret_key="secret...", bucket="my-bucket")
uuid = region.create_connection(name="my_s3_conn", conn_conf=s3_conf, can_write=True)
print("Created connection with UUID:", uuid)
Updates an existing data connection's configuration.
This method allows changing connection details (e.g., credentials, endpoints) or reorganizing how connections are stored (via tree_path).
Parameters:
- conn_uuid (str): The UUID of the existing connection to update.
- name (str): The new name for the connection. You can keep the old name or provide a new one.
- conn_conf (ConnConf): The updated connection configuration.
- tree_path (str | None): An optional hierarchical path for reorganizing the connection.
- can_write (bool): If True, the updated connection can be used for write operations.
Example:
# Assume we previously retrieved or created a connection with UUID "abcd1234"
updated_s3_conf = S3ConnConf(access_key="AKIA...", secret_key="new_secret", bucket="my-new-bucket")
region.update_connection(conn_uuid="abcd1234", name="my_updated_s3_conn", conn_conf=updated_s3_conf)
print("Connection updated successfully.")
Retrieves a list of worker sizes available in this region.
Worker sizes define the compute resources (CPU, memory) allocated to a worker.
Returns:
- PrtList[WorkerSize]: A read-only list of worker sizes.
Example:
for size in region.worker_size_list:
print(size.name, size.default)
Forces a reload of the worker size list from the region.
Ensures that any newly added or updated worker sizes are reflected locally.
Example:
region.reload_worker_size_list()
Retrieves a list of available worker images in this region.
Worker images define the software environment (base Docker image) used by workers or workspaces.
Returns:
- PrtList[WorkerImage]: A read-only list of worker images, sorted by priority (order).
Example:
for img in region.worker_image_list:
print(img.url, img.service_type)
Forces a reload of the worker image list from the region.
Ensures that any newly added or updated worker images are reflected locally.
Example:
region.reload_worker_image_list()
Retrieves an access token for the region using the stored refresh token.
Returns:
- str: The access token.
Example:
token = region.get_access_token()
print("Access token:", token)
Retrieves a refreshed access token and refresh token from the region.
Returns:
- (str, str): (refresh_token, access_token)
Example:
refresh_t, access_t = region.get_refresh_and_access_token()
Retrieves the login credentials for a specified workspace.
Parameters:
- instance_id (str): The instance ID of the workspace.
Returns:
- (str, str): (username, password)
Example:
user, pwd = region.get_workspace_credentials("instance1234")
print("Workspace creds:", user, pwd)
Re-creates a model deployment, deleting and recreating it. This is an admin operation often used in testing.
Parameters:
- model_deployment_key (str): The key of the model deployment to recreate.
Example:
region.recreate_model_deployment("my_model_deployment_key")
Changes the currently logged-in user's password.
Parameters:
- old_password (str): The current password.
- new_password (str): The new password, which must meet complexity requirements.
Example:
region.change_password(old_password="oldpass", new_password="NewP@ssw0rd!")
print("Password changed successfully.")
Represents a compute resource (pod) running in a Practicus AI Region.
A Worker can host notebooks, run scripts, process data, perform distributed jobs, training,
and serve as the foundation for various workloads.
Workers are managed by the Region and are typically created through methods like region.create_worker() or region.get_local_worker().
Example:
# Creating a worker through the region:
worker = region.create_worker()
# Running a task on the worker:
_, success = worker.region.run_task("analysis.py", files_path="project_code/")
print("Task successful:", success)
# Opening a notebook on the worker:
notebook_url = worker.open_notebook(get_url_only=True)
print("Notebook URL:", notebook_url)
Once you are done with the Worker, you can terminate it to free resources:
worker.terminate()
Logger configuration for all worker-related activity.
The type of service this worker provides (e.g., "cloud_worker", "workspace").
Returns:
- str | None: The worker's service type.
The unique instance identifier of this worker.
Returns:
- str | None: The worker's instance ID, or None if not assigned yet.
A system-generated server (Kubernetes pod) name for internal use.
Returns:
- str | None: The server name based on the worker's instance ID.
The internal Kubernetes service address name used within the cluster.
Returns:
- str | None: The cluster service address name, or None if not assigned.
The fully qualified domain name (FQDN) of the worker's service within the Kubernetes cluster.
Returns:
- str | None: The cluster-local FQDN of the worker's service.
A parsed image identifier derived from the worker's image.
Returns:
- str | None: The simplified image ID extracted from the image string.
The timestamp when this worker was created.
Returns:
- datetime | None: The creation timestamp, or None if not available.
The friendly name of this worker.
Returns:
- str | None: The worker's name (e.g., "Worker-123").
The worker size, representing the compute resources allocated (e.g., "Medium").
Returns:
- str | None: The worker size name.
The total memory allocated to this worker, in gigabytes.
Returns:
- float | None: The total memory in GB.
The number of CPU cores allocated to this worker.
Returns:
- float | None: The count of CPU cores.
The number of GPU cores allocated to this worker.
Returns:
- float | None: The GPU core count.
The current status/state of this worker (e.g., "Running", "Terminated").
Returns:
- str | None: The worker's status.
The job ID, if this worker is part of a distributed job cluster.
Returns:
- str | None: The distributed job ID, or None if not applicable.
Indicates whether this worker is in the process of shutting down or already terminated.
Returns:
- bool: True if the worker is terminating or terminated, False otherwise.
Associates the Worker with a specific underlying CloudNode.
Note: This method is intended for internal use and is typically called when the Worker is created.
Parameters:
- node (CloudNode): The underlying cloud node representing this worker's infrastructure.
Creates a Worker instance representing the local environment if the code is running on a Practicus AI Worker.
Note: This is intended for internal use.
Parameters:
- k8s_region (K8sRegion): The Kubernetes region configuration.
- region (Region): The parent Region instance.
Returns:
- Worker: A Worker object representing the local worker environment.
Terminates the worker, releasing its resources.
Parameters:
- stop_reason (str): A short reason for termination (used for auditing).
Example:
worker.terminate()
Loads data into a new Process running on this worker.
This method sets up a data processing environment (like Pandas or Spark) on the worker to interact with data from a given connection configuration.
Parameters:
- conn_conf (dict | str | ConnConf): The connection configuration for the data source.
- engine (str): The data processing engine, e.g. "AUTO", "PANDAS", "SPARK".
Returns:
- Process: A Process object representing the data loading session.
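For illustration, a minimal sketch assuming s3_conn_conf is a previously prepared connection configuration (e.g., an S3ConnConf instance or an equivalent dict):
# Start a Process on the worker that loads the data with the default engine
with worker.load(s3_conn_conf, engine="AUTO") as proc:
    df = proc.get_df_copy()  # pull a pandas copy of the loaded data
    print(df.head())
# The Process is killed automatically when the 'with' block exits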
Uploads files from the local environment to the worker's filesystem.
Parameters:
- source (str): The local file or directory path to upload.
- destination (str | None): The target directory on the worker. Defaults to a predefined upload folder.
- max_files_to_upload (int): Maximum number of files to upload (to prevent huge accidental uploads).
Returns:
- str: The directory on the worker where the files were uploaded.
Example:
upload_path = worker.upload_files("data/")
print("Files uploaded to:", upload_path)
Downloads files from the worker's filesystem to the local environment.
Parameters:
- source (str): The file or directory path on the worker to download.
- destination (str): The local directory to store the downloaded files.
Example:
worker.download_files("results/output.csv", "local_results/")
Runs a Python (.py) or shell (.sh) script on the worker, optionally capturing output.
Parameters:
- task_file_path (str): The file path on the worker to the script to run.
- capture_task_output (bool): If True, captures stdout and stderr of the script.
- python_venv_name (str | None): Name of a Python virtual environment on the worker.
Returns:
- str: A unique task UUID identifying the running task.
Example:
task_uuid = worker.run_task("/path/to/script.py", capture_task_output=True)
Monitors a running task identified by a task UUID until it completes or fails.
Prints logs periodically and returns True if the task completes successfully, False otherwise.
Parameters:
- task_uuid (str): The UUID of the task to check.
Returns:
- bool: True if the task succeeded, False if it failed.
Example:
success = worker.check_task(task_uuid)
print("Task succeeded:", success)
Retrieves a read-only list of Process objects created on this worker.
Returns:
- PrtList[Process]: A list of active processes on this worker.
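A short sketch, assuming the property is exposed as worker.proc_list (the name is inferred from the worker_list naming convention used elsewhere in this document):
# List processes currently tracked on this worker
for proc in worker.proc_list:  # assumed property name
    print(proc)  # Process provides a CSV-style string summary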
Removes a process from the worker's internal cache without killing it.
Note: This is intended for internal use. To fully terminate a process, use kill_proc().
Parameters:
- proc_id (int): The ID of the process to remove from the cache.
Terminates a specific process running on the worker.
Parameters:
- proc_id (int): The ID of the process to kill.
Example:
worker.kill_proc(proc_id=123)
Terminates all processes running on this worker.
Example:
worker.kill_all_procs()
Opens the Jupyter notebook environment running on this worker in a browser (or returns its URL).
Parameters:
- dark_mode (bool): If True, the notebook UI is displayed in dark mode.
- get_url_only (bool): If True, returns the notebook login URL without opening a browser.
Returns:
- str: The login URL for the notebook.
Example:
url = worker.open_notebook(get_url_only=True)
print("Notebook URL:", url)
Opens Visual Studio Code (VS Code) environment running on this worker in a browser (or returns its URL and password).
Parameters:
- dark_mode (bool): If True, the VS Code UI is displayed in dark mode.
- get_url_only (bool): If True, returns the VS Code URL and token without opening a browser.
Returns:
- (str, str): The VS Code URL and the authentication token.
Example:
url, token = worker.open_vscode(get_url_only=True)
print("VS Code URL:", url)
print("Token:", token)
Opens a workspace environment (e.g., a specialized development UI) running on this worker.
Parameters:
- get_url_only (bool): If True, returns the workspace URL without opening a browser.
Returns:
- str: The workspace URL.
Example:
url = worker.open_workspace(get_url_only=True)
print("Workspace URL:", url)
Retrieves login credentials (username, password) for this workspace.
Returns:
- (str, str): (username, password) for the workspace login.
Example:
user, pwd = worker.get_workspace_credentials()
print("Workspace user:", user)
Sends a ping request to the worker to measure responsiveness.
Parameters:
- raise_on_error (bool): If True, raise an exception if the worker cannot be reached.
Returns:
- float: The ping time in seconds, or 0.0 if failed and
raise_on_error=False
.
Example:
latency = worker.ping()
print("Latency:", latency, "seconds")
Waits until the worker is fully ready to serve requests.
Parameters:
- timeout (int): Time (in seconds) before raising a TimeoutError if the worker isn't ready.
Returns:
- float: The final ping time once the worker is ready.
Example:
wait_time = worker.wait_until_ready(timeout=120)
print("Worker ready, ping:", wait_time, "seconds")
Retrieves the latest portion of the worker's logs.
Parameters:
- log_size_mb (int): The size of logs (in MB) to retrieve, from the bottom of the log file.
Returns:
- str | None: The logs as a string, or None if no logs are found.
Example:
logs = worker.get_logs(log_size_mb=2)
print("Worker logs:", logs)
Prints the latest portion of the worker's logs to stdout.
For reading logs into a variable, use get_logs().
Parameters:
- log_size_mb (int): The size of logs (in MB) to print.
Example:
worker.view_logs()
Represents a process running on a Practicus AI Worker. A Process corresponds to an actual OS-level process
in the worker’s environment. It can load and manipulate data, run transformations, execute code snippets,
and integrate with machine learning model building and prediction tasks.
Key Capabilities:
- Data Loading & Processing: Load data from various sources (Data Lakes, S3, databases, local files) into the worker’s environment, and perform transformations such as filtering, sorting, grouping, and type changes.
- Running Code & Snippets: Execute custom Python code blocks, SQL queries, and pre-defined snippets directly on the worker.
- Model Operations: Build, register, and search ML models. Integrate model predictions into your data pipeline.
- Workflow Integration: Run recorded steps, save and restore workflows, and apply transformations in sequence.
- Lifecycle Management: Automatically kills itself when going out of scope if used as a context manager, and provides methods to terminate, wait until current operations are complete, and manage logs.
Example:
with worker.load(s3_connection) as proc:
# Perform data transformations
proc.filter("price > 100")
proc.sort_column(["price"])
# Retrieve updated DataFrame
df = proc.get_df_copy()
print(df.head())
# Process is automatically killed at the end of the 'with' block
After the process finishes, you can also run model predictions or save the processed data to another destination.
Initializes a new Process associated with a given Worker.
Parameters:
- node (CloudNode): The underlying cloud node (e.g., Kubernetes pod) representing the worker.
- worker (Worker): The parent Worker instance that this process runs on.
Manages the internal data (worksheet) and steps executed by this process.
Tracks issues or asynchronous operation results that occur during process execution.
Returns a CSV header line representing the columns used in the __str__ method.
Returns:
- str: CSV-formatted header line for process attributes.
The internal process ID assigned by Practicus AI.
Returns:
- int: The process ID, or -1 if not available.
The underlying OS-level PID of the process running on the worker.
Returns:
- int: The OS PID, or -1 if not available.
The connection configuration used by this process to load data.
Returns:
- ConnConf | None: The connection configuration, or None if not set.
A friendly, human-readable description of the current connection configuration.
Returns:
- str: The connection's long description, or an empty string if unavailable.
The name of the worker hosting this process.
Returns:
- str: The worker name, or an empty string if unavailable.
Executes a single transformation or action (a Step) on the process.
Parameters:
- step (Step): The step to run, representing a data transformation or action.
- record_step (bool): If True, records the step in the worksheet history.
Returns the internal data engine representation enum.
Deletes one or more recorded steps from the internal worksheet by their step numbers.
Saves the current worksheet, including steps and data sampling settings, to a specified file path.
Retrieves the activity log of all operations performed in the worksheet.
Displays the recorded activity logs and issues from the worker.
Parameters:
- raise_error_on_issues (bool): If True, raises an error if any issues are found.
Displays any captured asynchronous operation issues from the worker.
Parameters:
- raise_error_on_issues (bool): If True, raises an error if issues are present.
Checks if the local worker control plane is active.
Returns the internal Worker representation object.
See the documentation of Region.get_model_api_session_token.
Finds a file (e.g., file.json or some/sub/path/file.py) by searching up the caller stack.
Prints the first few rows of the DataFrame currently managed by this process.
Note: The client SDK might have limited or no data depending on previous steps and sampling configurations.
Loads data into the process from the specified connection and engine configuration.
Parameters:
- conn_conf (dict | str | ConnConf): Connection information, can be a JSON/dict or a ConnConf instance.
- engine (str): The data processing engine, e.g. "AUTO", "PANDAS", "SPARK".
Saves the current data state to another destination, defined by the given connection configuration.
Parameters:
- connection (dict | str): The destination connection configuration.
- timeout_min (int): The maximum number of minutes to wait for completion.
Renames a single column from one name to another.
Renames multiple columns at once using a dictionary mapping old names to new names.
Changes the data type of a specified column.
Filters the DataFrame rows based on a provided conditional expression.
Converts a categorical column into multiple one-hot encoded columns.
Maps categorical values of a specified column into numeric or string representations.
Splits the values of a column into multiple parts using a specified delimiter.
Handles missing values in the specified columns using a given technique or a custom value.
Sorts the DataFrame by specified columns in ascending or descending order.
Groups the DataFrame by specified columns and applies aggregation functions.
Performs time-based sampling or aggregation of the data using a date column and specified frequency.
Replaces occurrences of an old value with a new value in a specified column.
Creates a new column by evaluating a formula expression involving existing columns.
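To illustrate a few of these steps together, a short sketch that reuses the methods shown earlier in this document (filter, sort_column, get_df_copy); other step methods follow the same pattern, but their exact signatures are not shown above:
with worker.load(conn_conf) as proc:
    proc.filter("price > 100")       # keep only rows matching the expression
    proc.sort_column(["price"])      # sort by the price column
    df = proc.get_df_copy()          # wait for the steps to finish and fetch a pandas copy
    print(df.head())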
Runs a custom Python function's code on the worker.
Parameters:
- custom_function: A Python function object whose source code is executed remotely.
Runs a predefined snippet (a .py file) from the caller's directory stack.
Parameters:
- snippet_name (str): The snippet's name (without the .py extension).
- **kwargs: Parameters passed to the snippet's code.
Executes a custom SQL query against the current data, treating it as a table with a given name.
Builds a machine learning model on the worker using the provided configuration.
Parameters:
- model_config (dict | str): The model configuration as a dictionary or JSON string.
- timeout_min (int): Maximum minutes to wait for model build completion.
Returns:
- ModelConfig | None: The finalized model configuration if successful, else None.
Registers the last built AI model, making it available in MLFlow or related model registries.
Searches for models that match a given text query using MLFlow.
Parameters:
- model_text (str): The search string or query.
Returns:
- ModelSearchResults | None: The search results, or None if none found.
Runs predictions against a deployed model API endpoint.
Parameters:
- api_url (str): The model API URL.
- api_token (str | None): The access token for the model API. If None, attempts to get one automatically.
- column_names (list[str] | None): The columns to include as input features.
- new_column_name (str | None): The name for the new prediction column.
- ground_truth_col (str | None): If provided, compares predictions against a ground truth column.
- model_id (int | None): Specific model ID to query if multiple versions exist.
- batch_size (int | None): Batch size for prediction requests.
- compression_algo (str | None): Optional compression for data transfer.
Makes predictions using an offline model available locally (e.g., MLflow model URI).
Parameters:
- column_names (list[str] | None): Input feature columns.
- new_column_name (str | None): Column name for predictions.
- future_horizon (int | None): For forecasting, how many steps into the future to predict.
- mlflow_model_uri (str | None): URI for the MLflow model.
- model_conf_path (str | None): Path to a model configuration file.
- model_conf (str | None): Model configuration in JSON/dict form.
- problem_type (str | None): Type of problem (e.g., classification, regression, forecasting).
Joins the current DataFrame with another dataset from a specified connection.
Parameters:
- conn_conf (str | dict): The connection configuration of the data source to join.
- left_key_col_name (str): The key column in the current DataFrame.
- right_key_col_name (str): The key column in the target dataset.
- right_ws_name (str | None): Optional name for the right worksheet.
- join_technique (str): The join type ("Left", "Right", "Inner", "Outer").
- suffix_for_overlap (str): Suffix added to overlapping columns.
- summary_column (bool): If True, adds a summary column of join results.
Waits until all operations and steps have finished executing on the worker, or until the given timeout elapses.
Parameters:
- timeout_min (int): Maximum number of minutes to wait.
Retrieves a copy of the DataFrame currently held by the process after ensuring all operations are done.
Parameters:
- timeout_min (int): Maximum number of minutes to wait until done.
Returns:
- pd.DataFrame: A pandas DataFrame representing the processed data.
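For illustration (assuming proc is an existing Process with data already loaded):
# Block until pending steps finish (up to 10 minutes), then fetch the data
df = proc.get_df_copy(timeout_min=10)
print(df.shape)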
Retrieves a logger instance associated with a given Log enumeration value.
Logging is critical for monitoring the behavior of the SDK and diagnosing issues. The Practicus AI SDK uses named loggers for different components, allowing you to configure their levels and outputs independently.
Parameters:
- log (Log): The Log enum value representing the desired logger.
Returns:
- logging.Logger: A configured logger instance for the specified log component.
Example:
from practicuscore import get_logger, Log
logger = get_logger(Log.SDK)
logger.info("This is an informational message from the SDK logger.")
Enumeration of loggers used throughout the Practicus AI SDK, each corresponding to a specific subsystem or feature.
Adjusts the logging level globally or for specific modules within the Practicus AI SDK and its dependencies.
By default, logs may be set to a certain level. This function allows you to raise or lower the verbosity of logs depending on your debugging needs. For instance, setting the log level to DEBUG can help troubleshoot complex issues by providing more detailed output, while INFO or WARNING might be sufficient for normal operations.
Parameters:
- log_level (str | None): The global logging level to apply. Accepts standard Python logging levels (e.g., "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"). If None, does not change the global level.
- modules_log_level (str): A comma-separated list of module-specific logging levels. For example, "practicus:DEBUG" sets the logging level for the "practicus" module to DEBUG, "urllib3:INFO" sets the "urllib3" module to INFO, and "*:DEBUG" sets DEBUG level for all modules. Multiple rules can be combined by separating them with commas, e.g. "practicus:DEBUG,urllib3:INFO".
Returns:
- None
Example:
from practicuscore import set_logging_level
# Set the global logging level to INFO
set_logging_level(log_level="INFO")
# Set DEBUG level for practicus-related logs and INFO for urllib3
set_logging_level(modules_log_level="practicus:DEBUG,urllib3:INFO")
Defines a worker configuration for launching Practicus AI Workers.
Usage Example:
worker_config = WorkerConfig(
worker_image="practicus",
worker_size="Small",
)
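The resulting configuration can then be passed when creating a worker, for example:
# Launch a worker in the current region using the configuration above
worker = region.create_worker(worker_config=worker_config, wait_until_ready=True)
print("Worker ready:", worker.name)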
The container image to be used for this worker.
If you provide a simple image name like practicus-gpu-torch, it will be expanded to a full image name
(e.g. ghcr.io/practicusai/practicus-gpu-torch) with a default version. You can also specify a fully qualified
image such as my-container-repo/my-container:some-version.
Note: Custom container images must be based on a Practicus AI-compatible base image.
The worker size indicating the CPU, RAM, and GPU resources allocated to the worker.
Example: "Small", "Medium", "Large".
The type of service this worker represents, typically "cloud_worker" or "workspace". If omitted, defaults to "cloud_worker".
The network protocol to use for this worker. Valid values are "http" or "https". If omitted, the worker will choose a suitable default.
Configuration for distributed jobs (e.g., Spark, Dask, Torch).
If provided, it defines the parameters and ports for running a distributed cluster.
An optional startup script (shell commands) to be run when the worker starts. This should be a small script encoded as plain text.
The log level for the worker process itself. Examples: "DEBUG", "INFO", "WARNING", "ERROR". If omitted, defaults to a region-level or system default.
A module-specific log level configuration, if you want certain modules to log at different levels.
Set this to "True" if you need to bypass SSL certificate verification. Generally not recommended unless working with trusted but self-signed certs.
Policy for pulling the container image. Valid values: "Always", "IfNotPresent", "Never". If omitted, a suitable default is used.
Indicates if the worker should be run in an interactive mode, e.g. allowing shell access or interactive sessions.
An optional service URL. For special use-cases where the worker might need to connect to a particular endpoint (e.g., a custom model host), you can specify it here.
An optional user email associated with this worker's configuration, if needed for authentication or logging.
An optional refresh token for authentication against certain services.
If provided, the worker might use it to obtain a fresh access token automatically.
Configuration for the model, should be a dictionary conforming to pydantic's ConfigDict.
Validate a pydantic model instance.
Args:
- obj: The object to validate.
- strict: Whether to enforce types strictly.
- from_attributes: Whether to extract data from object attributes.
- context: Additional context to pass to the validator.
Raises: ValidationError: If the object could not be validated.
Returns: The validated model instance.
Validates and creates a WorkerConfig object from a JSON string.
Parameters:
- json_data: The JSON-encoded string, bytes, or bytearray containing the worker config data.
Returns:
- WorkerConfig: A validated WorkerConfig instance.
Raises:
- ValueError: If the JSON is invalid or required fields are missing.
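Assuming this is the standard Pydantic v2 classmethod (model_validate_json), a minimal sketch:
json_data = '{"worker_image": "practicus", "worker_size": "Small"}'
# Parse and validate the JSON payload into a WorkerConfig instance
worker_config = WorkerConfig.model_validate_json(json_data)  # assumed Pydantic v2 classmethod
print(worker_config.worker_size)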
Usage docs: https://docs.pydantic.dev/2.9/concepts/serialization/#modelmodel_dump
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
Args:
- mode: The mode in which to_python should run. If mode is 'json', the output will only contain JSON serializable types. If mode is 'python', the output may contain non-JSON-serializable Python objects.
- include: A set of fields to include in the output.
- exclude: A set of fields to exclude from the output.
- context: Additional context to pass to the serializer.
- by_alias: Whether to use the field's alias in the dictionary key if defined.
- exclude_unset: Whether to exclude fields that have not been explicitly set.
- exclude_defaults: Whether to exclude fields that are set to their default value.
- exclude_none: Whether to exclude fields that have a value of None.
- round_trip: If True, dumped values should be valid as input for non-idempotent types such as Json[T].
- warnings: How to handle serialization errors. False/"none" ignores them, True/"warn" logs errors, "error" raises a PydanticSerializationError.
- serialize_as_any: Whether to serialize fields with duck-typing serialization behavior.
Returns: A dictionary representation of the model.
Serializes the WorkerConfig to a JSON string, including any additional parameters.
Parameters:
- indent (int | None): Indentation for pretty-printing JSON. Defaults to 4 if not set.
- **kwargs: Additional arguments passed to model_dump.
Returns:
- str: A JSON-encoded string of the WorkerConfig.
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
Represents the type of distributed job or cluster environment you want to create and manage.
Different distributed frameworks have varying requirements and behaviors. By specifying a DistJobType, you inform the Practicus AI platform how to set up, start, and manage the underlying distributed environment.
Types:
- python: A generic Python-based distributed job.
- torch: A PyTorch-based distributed training job.
- deepspeed: A DeepSpeed-based distributed training job.
- fairscale: A FairScale-based distributed training job.
- horovod: A Horovod-based distributed training job.
- spark: A Spark-based distributed job or interactive cluster.
- dask: A Dask-based distributed job or interactive cluster.
- custom: A user-defined distributed job type with a custom adaptor.
Configuration for distributed jobs in Practicus AI.
A distributed job involves multiple worker nodes cooperating to run a large-scale task, such as Spark, Dask, or Torch-based training jobs. This configuration defines how the cluster is formed, how many workers, memory, and CPU resources to allocate, as well as additional parameters like job directories, Python files, and termination conditions.
Usage Example:
dist_conf = DistJobConfig(
    job_type=DistJobType.deepspeed,
    job_dir="/path/to/job_dir",
    worker_count=10,
    py_file="job.py"
)
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Directory containing the job code and related files. Required for all job types except auto-distributed Spark and Dask jobs.
If True and job_type is Spark, the cluster is managed automatically (auto-scaling, etc.). Currently only supported for Spark.
(alias) Sets initial_count and max_count to the same value, resulting in a fixed cluster size. Use worker_count if you are not auto-scaling.
Set the initial number of workers. If not using worker_count, you must specify both initial_count and max_count.
Set the maximum number of workers. If not using worker_count, must be set along with initial_count.
The coordinator (master) port. If left empty, a suitable default is used based on job_type.
List of extra ports for worker communication. Leave empty to use defaults. Most job types do not need these.
Specifies a custom Python class (adaptor) extending job handling logic. Must refer to a class accessible at runtime.
If True, terminates all workers after job completion. Set to False to keep the cluster alive for further exploration, experiments, or debugging.
If True, captures and logs stdout/stderr of job scripts (e.g. .py, .sh). Disable this if the scripts already log their output, to avoid duplicate entries.
If True, enables service mesh sidecars for encrypted traffic between workers. Disabled by default for performance.
Time in seconds to wait for the cluster to fully start before timing out.
The name of a Python virtual environment (under ~/.venv/) to use. Leave empty for the default venv.
If True, places logs and artifacts in the run directory. Leave unset for the default behavior.
Interval in seconds for measuring system and GPU usage if measure_utilization is True.
If True, the coordinator also acts as a worker (the default when unset). If False, the coordinator does not run tasks, freeing its resources.
Number of processes/executors per worker node. For Spark, this is the executor count per node; for Dask, the worker count.
Number of threads per executor/process. In Spark, corresponds to executor cores; in Dask, --nthreads per worker.
Memory limit per executor/process in GB. For Spark, maps to executor/driver memory; for Dask, --memory-limit.
(Read-only) A list of executor definitions, set by the system after cluster creation.
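The sketch below shows an auto-scaling style configuration using initial_count and max_count instead of worker_count. The import path is assumed, and whether an additional flag is required to enable auto-distribution for Spark is not shown here.
from practicuscore import DistJobConfig, DistJobType  # assumed import path

spark_job = DistJobConfig(
    job_type=DistJobType.spark,
    job_dir="/path/to/spark_job",
    py_file="etl.py",
    initial_count=2,   # start small
    max_count=8,       # allow scaling up to 8 workers
)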
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.
This replaces Model.__fields__ from Pydantic V1.
A specialized list that can be toggled as read-only. It also provides utilities for converting its items into CSV, pandas DataFrames, and JSON formats.
Key Features:
- Inherits all behavior from the built-in list, but adds a read-only flag.
- If read_only is set to True, any attempt to modify the list will raise a ValueError.
- Supports easy data exporting to CSV (via __str__), pandas DataFrame (via to_pandas), and JSON (via to_json).
- Can convert its contents into a list of dictionaries (via to_dict_list), making it easier to manipulate or serialize.
Check if the list is currently read-only.
Returns
True if read-only, False otherwise.
Convert the list to a pandas DataFrame by reading the CSV representation.
Returns
A pandas.DataFrame containing the list's data.
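A hypothetical usage sketch: the listing attribute name (worker_list) below is an assumption, but any API that returns this read-only list type can be exported the same way.
import practicuscore as prt

region = prt.get_default_region()
workers = region.worker_list        # hypothetical attribute returning the read-only list

df = workers.to_pandas()            # DataFrame built from the CSV representation
records = workers.to_dict_list()    # plain list of dicts, easy to serialize
print(df.head(), len(records))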
Retrieves the default Practicus AI region. The default region is typically the one last logged-in to or explicitly set using set_default_region.
Returns
The default Region instance.
Creates a new remote Practicus AI Worker in the current or specified region.
A worker is a computational pod that can run code, host Jupyter notebooks, build models etc.
By default, it uses the current region unless worker_config points to another region.
Parameters:
- worker_config (WorkerConfig | str | dict | None): Optional configuration for the worker. Accepts a JSON path, a dict, or a WorkerConfig object. If None, uses the default configuration.
- wait_until_ready (bool | None): If True, the method waits until the worker is fully provisioned and ready.
Returns
A Worker instance representing the newly created remote pod.
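For example (the JSON file path below is an illustrative placeholder):
import practicuscore as prt

# worker_config may be a WorkerConfig object, a dict, or a path to a JSON file.
worker = prt.create_worker(
    worker_config="worker_config.json",  # illustrative path
    wait_until_ready=True,
)
print("Worker ready:", worker)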
Returns the currently active Practicus AI region. If the code is running inside a worker, this is the region associated with that worker. Otherwise, this returns the default configured region.
Returns
A Region object for the current environment.
Attempts to retrieve an existing worker (if it matches the provided configuration) or creates one if not found.
This is useful for idempotent deployments where you do not want to create duplicates if the worker already exists.
Parameters:
- worker_config (WorkerConfig | str | dict | None): The configuration to check against existing workers. If not provided, defaults are used.
Returns
A Worker instance, either existing or newly created.
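For example, re-running the snippet below reuses a matching worker instead of provisioning a duplicate:
import practicuscore as prt

# With no configuration provided, the defaults are used to match or create a worker.
worker = prt.get_or_create_worker()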
Checks if the current code is executing inside a Practicus AI Worker.
This is useful for conditional logic that depends on whether the code runs locally or on a remote Practicus AI-managed environment.
Returns
True if this code is running inside a Practicus AI Worker pod; False otherwise.
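A typical conditional pattern:
import practicuscore as prt

if prt.running_on_a_worker():
    # Already inside a Practicus AI Worker pod: reuse the local environment.
    worker = prt.get_local_worker()
else:
    # Running elsewhere (e.g. a laptop): provision a remote worker instead.
    worker = prt.create_worker()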
Retrieves a specific Practicus AI region based on the provided region key or returns the default region.
Parameters:
- region_key (str | None): A region identifier in either username@region_address or region_address format. If not provided, the default region is returned.
Returns:
A Region object.
Example:
# If multiple regions are available:
region = prt.regions.get_region("alice@my-practicus-region.example.com")
# If none provided, defaults to last used or configured default:
default_region = prt.regions.get_region()
Creates or retrieves a Region instance based on a provided worker configuration, or returns the current/default region if none is provided.
This is useful in contexts where you may have a serialized or external configuration that specifies a region.
Parameters:
- worker_config (WorkerConfig | str | dict | None): A configuration object or JSON path/dict that may contain region connection details. If None, returns the current region.
Returns
A Region instance determined by the provided configuration or the current region.
Sets the default Practicus AI region. Subsequent operations that do not explicitly specify a region will use this default.
Parameters:
- region_key (str): The region identifier, e.g. username@my-region.example.com or just my-region.example.com if only one user is associated with it.
Returns
True if the region is successfully set as default; False otherwise.
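A sketch, assuming the function is exposed on the regions helper; the region key below is the same illustrative value used in the get_region example above.
import practicuscore as prt

if prt.regions.set_default_region("alice@my-practicus-region.example.com"):
    region = prt.get_default_region()
    print("Default region:", region)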
Creates a new Practicus AI Workspace (a special type of Worker) in the selected region.
A workspace is a worker configured for interactive development and includes Practicus AI Studio, office tools and more.
Parameters:
- worker_config (WorkerConfig | str | dict | None): Configuration for the workspace. Accepts a JSON path, a dict, or a WorkerConfig object. If None, uses the default configuration.
Returns
A Worker instance configured as a workspace.
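A sketch, assuming the workspace helper is exposed on the regions helper class; with no worker_config, the default workspace configuration is used.
import practicuscore as prt

workspace = prt.regions.create_workspace()  # assumed helper name and location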
Runs a specified script (Python or shell) as a "task" on a newly created remote worker.
Common uses include running batch jobs, scheduled tasks, or CI/CD pipeline steps in a controlled environment.
Parameters:
- file_name (str): The path to the script to run (e.g. "run_analysis.py" or "deploy.sh").
- files_path (str | None): The directory containing all necessary files to upload. If None, uses current directory.
- worker_config (WorkerConfig | str | dict | None): Configuration for the worker to run this task.
- terminate_on_completion (bool): If True, the worker is terminated after the task finishes.
- capture_task_output (bool): If True, captures and logs stdout/stderr from the task's execution.
- python_venv_name (str | None): Name of the Python virtual environment on the worker to use.
- max_files_to_upload (int): Maximum number of files to upload from files_path.
Returns
A tuple of (Worker, bool) where Worker is the worker used or created for this task, and bool indicates if the task succeeded.
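For example, to run a batch script, keep the worker alive afterwards for debugging, and capture its output (the file and directory names below are illustrative):
import practicuscore as prt

worker, success = prt.run_task(
    "nightly_batch.py",             # illustrative script name
    files_path="batch_jobs/",       # illustrative directory to upload
    terminate_on_completion=False,  # keep the worker for inspection
    capture_task_output=True,
)
if success:
    print("Task finished successfully on", worker)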