Build

# Parameters: Replace with your actual deployment key and app prefix
# These identify where the app should be deployed within your Practicus AI environment.
app_deployment_key = None
app_prefix = "apps"
assert app_deployment_key, "Please select a deployment key"
import practicuscore as prt

# Analyze the current directory for Practicus AI App components (APIs, MQ consumers, UI, etc.)
# This should output the list of detected API endpoints.
prt.apps.analyze()
# --- Deployment ---
app_name = "agentic-ai-test-retention"
visible_name = "Agentic AI Test Retention"
description = "Test Application for Agentic AI Example."
icon = "fa-robot"

print(f"Deploying app '{app_name}'...")

app_url, api_url = prt.apps.deploy(
    deployment_setting_key=app_deployment_key,
    prefix=app_prefix,
    app_name=app_name,
    app_dir=None,  # Uses current directory
    visible_name=visible_name,
    description=description,
    icon=icon,
)

print("App deployed successfully!")
print(f"  API Base URL: {api_url}")

# The OpenAPI documentation is usually available at /docs (Swagger UI) or /redoc (ReDoc) off the API base URL
print(f"  OpenAPI Docs (ReDoc): {api_url}redoc/")

# Store the api_url for later use when creating tools
assert api_url, "Deployment failed to return an API URL."
tool_endpoint_paths = [
    "aggregate-retention-insight-report/",
    "analyze-complaint-topics/",
    "analyze-sentiment-trends/",
    "detect-negative-sentiment-clusters/",
    "generate-customer-summary/",
    "generate-retention-plan/",
    "generate-retention-risk-scores/",
    "load-customer-interaction-data/",
    "summarize-customer-retention-case/",
    "visualize-customer-journey-map/",
]


# Construct full URLs
tool_endpoint_urls = [api_url + path for path in tool_endpoint_paths]

print("Will attempt to create tools for the following API endpoints:")
for url in tool_endpoint_urls:
    print(f" - {url}")

# Tip: If you pass a partial API URL, e.g. 'apps/agentic-ai-test/api/v1/generate-receipt/',
#  the base URL of your current Practicus AI region, e.g. 'https://practicus.my-company.com/',
#  will be prepended to form the final URL.
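# Purely illustrative sketch of that expansion (the base URL below is hypothetical;
# langchain_practicus resolves the real one from your current Practicus AI region):
from urllib.parse import urljoin

print(urljoin("https://practicus.my-company.com/", "apps/agentic-ai-test/api/v1/generate-receipt/"))
# -> https://practicus.my-company.com/apps/agentic-ai-test/api/v1/generate-receipt/
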
import os
from langchain_openai import ChatOpenAI  # Or your preferred ChatModel
from langgraph.prebuilt import create_react_agent
from langchain_practicus import APITool

# Ensure practicuscore is imported for enums
import practicuscore as prt


def validate_api_spec(api_tool: APITool, strict=False) -> bool:
    """Checks the APISpec of a fetched tool against our rules."""

    # APITool fetches the spec from OpenAPI (Swagger) during initialization
    spec = api_tool.spec

    if not spec:
        # API definition in the source file might be missing the 'spec' object
        warning_msg = f"API '{api_tool.url}' does not have APISpec metadata defined in its OpenAPI spec."
        if strict:
            raise ValueError(f"{warning_msg} Validation is strict.")
        else:
            prt.logger.warning(f"{warning_msg} Allowing since validation is not strict.")
            return True  # Allow if not strict

    # --- Apply Rules based on fetched spec ---

    # Rule 1: Check Risk Profile
    if spec.risk_profile and spec.risk_profile == prt.APIRiskProfile.High:
        err = f"API '{api_tool.url}' has a risk profile defined as '{spec.risk_profile}'."
        if strict:
            err += " Stopping flow since validation is strict."
            raise ValueError(err)
        else:
            # Even if not strict, we might choose to block High risk tools
            prt.logger.warning(f"{err} Blocking High Risk API even though validation is not strict.")
            return False  # Block high risk

    # Rule 2: Check Human Gating for non-read-only APIs
    # (Example: Enforce human gating for safety on modifying APIs)
    if not spec.read_only and not spec.human_gated:
        err = f"API '{api_tool.url}' modifies data (read_only=False) but is not marked as human_gated."
        if strict:
            err += " Stopping flow since validation is strict."
            raise ValueError(err)
        else:
            prt.logger.warning(f"{err} Allowing non-gated modifying API since validation is not strict.")
            # In this non-strict case, we allow it, but a stricter policy might return False here.
            return True

    # Add more complex rules here if needed...
    # E.g., check custom_attributes, scope, etc.

    # If no rules were violated (or violations were allowed because not strict)
    prt.logger.info(f"API '{api_tool.url}' passed validation (strict={strict}). Spec: {spec}")
    return True


# --- Create Tools (optionally applying validation) ---
def get_tools(endpoint_urls: list[str], validate=True):
    _tools = []
    strict_validation = False  # Set to True to enforce stricter rules
    additional_instructions = "Add Yo! after all of your final responses."  # Example instruction

    print(f"\nCreating and validating tools (strict={strict_validation})...")
    for tool_endpoint_url in endpoint_urls:
        print(f"\nProcessing tool for API: {tool_endpoint_url}")
        try:
            api_tool = APITool(
                url=tool_endpoint_url,
                additional_instructions=additional_instructions,
                # token=..., # Uses current user credentials by default, set to override
                # include_resp_schema=True # Response schema (if exists) is not included by default
            )

            # Explain the tool (optional, useful for debugging)
            # api_tool.explain(print_on_screen=True)

            # Validate based on fetched APISpec
            if not validate or validate_api_spec(api_tool=api_tool, strict=strict_validation):
                print(
                    f"--> Adding tool: {api_tool.name} ({api_tool.url}) {'' if validate else ' - skipping validation'}"
                )
                _tools.append(api_tool)
            else:
                print(f"--> Skipping tool {api_tool.name} due to validation rules.")
        except Exception as e:
            # Catch potential errors during APITool creation (e.g., API not found, spec parsing error)
            print(f"ERROR: Failed to create or validate tool for {tool_endpoint_url}: {e}")
            if strict_validation:
                raise  # Re-raise if strict
            else:
                print("--> Skipping tool due to error (not strict).")

    return _tools


tools = get_tools(tool_endpoint_urls)
print(f"\nAgent will have access to {len(tools)} tools: {[t.name for t in tools]}")
# For this exercise we will skip validating APIs

tools = get_tools(tool_endpoint_urls, validate=False)
print(f"\nAgent will have access to {len(tools)} tools: {[t.name for t in tools]}")
# View tool explanation

for tool in tools:
    tool.explain()
openaikey, age = prt.vault.get_secret("openaikey")
os.environ["OPENAI_API_KEY"] = openaikey

assert os.environ["OPENAI_API_KEY"], "OpenAI key is not defined"
# --- Agent Initialization ---
llm = ChatOpenAI(model="gpt-4o", temperature=0)
# Create a ReAct agent using LangGraph
graph = create_react_agent(llm, tools=tools)
print("Agent initialized.")


# Helper function to print the agent's stream output nicely
def pretty_print_stream_chunk(chunk):
    print("--- Agent Step ---")
    for node, updates in chunk.items():
        print(f"Node: {node}")
        if isinstance(updates, dict) and "messages" in updates:
            # Print the latest message added by the node
            latest_message = updates["messages"][-1]
            latest_message.pretty_print()
        else:
            # Print other kinds of updates
            print(f"  Update: {updates}")
            if isinstance(updates, Exception):
                print("  ⚠️ Exception Detected in Agent Execution!")
    print("--------------------\n")
query = """
Identify clusters of customers who are expressing negative sentiment in their recent feedback, broken down by date and channel. I want to detect any spikes or trends that might indicate dissatisfaction periods.
"""


inputs = {"messages": [("user", query)]}

if graph:
    print(f"\nInvoking agent with query: '{query}'")
    print("Streaming agent execution steps:\n")

    # Configuration for the stream, e.g., setting user/thread IDs
    # config = {"configurable": {"user_id": "doc-user-1", "thread_id": "doc-thread-1"}}
    config = {}
    # Use astream to get intermediate steps
    async for chunk in graph.astream(inputs, config=config):
        pretty_print_stream_chunk(chunk)

    print("\nAgent execution finished.")

    # Optional: Get the final state if needed
    # final_state = await graph.ainvoke(inputs, config=config)
    # print("\nFinal Agent State:", final_state)

else:
    print("\nAgent execution skipped because the agent graph was not initialized.")
# Cleanup
prt.apps.delete(prefix=app_prefix, app_name=app_name)

API Test

from apis.analyze_sales_trends import AnalyzeSalesTrendsRequest, AnalyzeSalesTrendsResponse, SalesRecord
from datetime import datetime
import practicuscore as prt

sales_data = [
    SalesRecord(date=datetime(2025, 1, 1), product="Laptop", region="Asia", units_sold=100, revenue=100000.0),
    SalesRecord(date=datetime(2025, 1, 2), product="Laptop", region="Asia", units_sold=120, revenue=120000.0),
    SalesRecord(date=datetime(2025, 1, 3), product="Tablet", region="Europe", units_sold=80, revenue=32000.0),
]

payload = AnalyzeSalesTrendsRequest(
    sales_data=sales_data, start_date=datetime(2025, 1, 1), end_date=datetime(2025, 1, 3)
)

response: AnalyzeSalesTrendsResponse = prt.apps.test_api("/analyze-sales-trends", payload)

print(response)
from apis.sentiment_test_slogan import SentimentTestSloganRequest, SentimentTestSloganResponse
from pydantic import BaseModel
import practicuscore as prt

# Prepare payload
slogans = [
    "Power Up Your Productivity!",
    "Nothing beats the classics.",
    "Innovation in Every Click.",
    "Your Tech, Your Edge.",
    "Be Bold. Be Better.",
]
payload = SentimentTestSloganRequest(slogans=slogans)

# Type check (optional)
print(isinstance(payload, BaseModel))  # Should print True

# Local test via Practicus
response: SentimentTestSloganResponse = prt.apps.test_api("/sentiment-test-slogan", payload)

# Output the results
for result in response.results:
    print(f"Slogan: {result.slogan}")
    print(f"Sentiment: {result.sentiment}")
    print(f"Comment: {result.comment}")
    print("-----")
from apis.top_products_insight import TopProductsInsightRequest, TopProductsInsightResponse, SalesRecord
from pydantic import BaseModel
from datetime import datetime
import practicuscore as prt

# Sample sales data
sales_data = [
    SalesRecord(date=datetime(2025, 1, 1), product="Laptop", region="Asia", units_sold=120, revenue=120000.0),
    SalesRecord(date=datetime(2025, 1, 2), product="Laptop", region="Asia", units_sold=100, revenue=100000.0),
    SalesRecord(date=datetime(2025, 1, 3), product="Tablet", region="Europe", units_sold=80, revenue=32000.0),
    SalesRecord(date=datetime(2025, 1, 4), product="Tablet", region="Europe", units_sold=70, revenue=28000.0),
    SalesRecord(date=datetime(2025, 1, 5), product="Monitor", region="North America", units_sold=95, revenue=47500.0),
]

# Test payload
payload = TopProductsInsightRequest(sales_data=sales_data, top_n=3)

# API test
try:
    response: TopProductsInsightResponse = prt.apps.test_api("/top-products-insight", payload)

    # Print the results
    for product in response.products:
        print(f"Product: {product.product}")
        print(f"Total Units Sold: {product.total_units_sold}")
        print(f"Top Region: {product.top_region}")
        print(f"Insight: {product.insight}")
        print("-----")

except Exception as e:
    prt.logger.error(f"[test-top-products-insight] Exception: {e}")
    raise
from practicuscore import apps
from apis.predict_growth_opportunities import PredictGrowthOpportunitiesRequest

payload = PredictGrowthOpportunitiesRequest(
    top_products=[{"product": "Tablet", "total_units_sold": 305, "top_region": "Asia"}],
    regional_drops=[{"region": "Europe", "product": "Tablet", "drop_percentage": 100.0}],
    trend_summary={"trend_direction": "increasing", "peak_day": "2025-01-10"},
)

response = apps.test_api("/predict-growth-opportunities", payload)
print(response)
from apis.predict_growth_opportunities import run, PredictGrowthOpportunitiesRequest

payload = PredictGrowthOpportunitiesRequest(
    top_products=[{"product": "Tablet", "total_units_sold": 305, "top_region": "Asia"}],
    regional_drops=[{"region": "Europe", "product": "Tablet", "drop_percentage": 100.0}],
    trend_summary={"trend_direction": "increasing", "peak_day": "2025-01-10"},
)

await run(payload)

Supplementary Files

apis/aggregate_retention_insight_report.py

# apis/aggregate_retention_insight_report.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List, Dict


class CustomerRetentionCase(BaseModel):
    customer_id: str
    """ID of the customer evaluated for retention."""

    churn_risk: str
    """Detected churn risk level: low, medium, or high."""

    complaint_topic: str
    """Most prominent complaint theme, extracted via feedback analysis."""

    retention_action: str
    """Suggested key retention action from the plan."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C088",
                    "churn_risk": "high",
                    "complaint_topic": "billing errors",
                    "retention_action": "offer billing correction and compensation"
                }
            ]
        }
    }


class AggregateRetentionInsightRequest(BaseModel):
    retention_cases: List[CustomerRetentionCase]
    """Retention-level insights for each customer including root cause and remedy."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "retention_cases": [
                        {
                            "customer_id": "C088",
                            "churn_risk": "high",
                            "complaint_topic": "billing errors",
                            "retention_action": "offer billing correction and compensation"
                        }
                    ]
                }
            ]
        }
    }


class InsightSummary(BaseModel):
    key_findings: List[str]
    """Executive summary of key findings from churn analysis."""

    dominant_issues: Dict[str, int]
    """Complaint topic frequency distribution."""

    churn_risk_distribution: Dict[str, int]
    """Count of customers by churn risk level."""

    strategic_recommendation: str
    """Final LLM-generated strategic recommendation summary."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "key_findings": [
                        "Most churn risk is observed in mid-tier accounts",
                        "Support-related issues dominate complaint patterns"
                    ],
                    "dominant_issues": {
                        "support delays": 12,
                        "billing errors": 9,
                        "missing features": 5
                    },
                    "churn_risk_distribution": {
                        "high": 14,
                        "medium": 22,
                        "low": 64
                    },
                    "strategic_recommendation": "Invest in proactive support, improve billing clarity, and introduce churn detection triggers for mid-risk customers."
                }
            ]
        }
    }


class AggregateRetentionInsightResponse(BaseModel):
    summary: InsightSummary
    """Top-level synthesized insight report for decision makers."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "summary": {
                        "key_findings": [
                            "Most churn risk is observed in mid-tier accounts",
                            "Support-related issues dominate complaint patterns"
                        ],
                        "dominant_issues": {
                            "support delays": 12,
                            "billing errors": 9,
                            "missing features": 5
                        },
                        "churn_risk_distribution": {
                            "high": 14,
                            "medium": 22,
                            "low": 64
                        },
                        "strategic_recommendation": "Invest in proactive support, improve billing clarity, and introduce churn detection triggers for mid-risk customers."
                    }
                }
            ]
        }
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    interactive=True,
    risk_profile=prt.APIRiskProfile.Low
)


@prt.api("/aggregate-retention-insight-report", spec=api_spec)
async def run(payload: AggregateRetentionInsightRequest, **kwargs) -> AggregateRetentionInsightResponse:
    """
    Aggregates individual customer churn analyses into a strategic retention report.
    """
    risk_counts = {"low": 0, "medium": 0, "high": 0}
    issue_counts = {}

    for case in payload.retention_cases:
        risk_counts[case.churn_risk] = risk_counts.get(case.churn_risk, 0) + 1  # tolerate unexpected risk labels
        issue_counts[case.complaint_topic] = issue_counts.get(case.complaint_topic, 0) + 1

    key_findings = [
        f"{risk_counts['high']} customers are at high churn risk.",
        f"Top issues include: {', '.join(sorted(issue_counts, key=issue_counts.get, reverse=True)[:3])}."
    ]

    recommendation = (
        "Focus on proactive outreach for high-risk clients, resolve the top 2 complaint topics,"
        " and monitor medium-risk customers with automated engagement campaigns."
    )

    return AggregateRetentionInsightResponse(
        summary=InsightSummary(
            key_findings=key_findings,
            dominant_issues=issue_counts,
            churn_risk_distribution=risk_counts,
            strategic_recommendation=recommendation
        )
    )
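
A quick local sanity check for this endpoint, following the prt.apps.test_api pattern from the API Test section (a sketch; it assumes you run it from the app root so the apis package is importable):

import practicuscore as prt
from apis.aggregate_retention_insight_report import AggregateRetentionInsightRequest, CustomerRetentionCase

payload = AggregateRetentionInsightRequest(
    retention_cases=[
        CustomerRetentionCase(
            customer_id="C088",
            churn_risk="high",
            complaint_topic="billing errors",
            retention_action="offer billing correction and compensation",
        )
    ]
)
print(prt.apps.test_api("/aggregate-retention-insight-report", payload))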

apis/analyze_complaint_topics.py

# apis/analyze_complaint_topics.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List
from collections import Counter
import re


class Complaint(BaseModel):
    customer_id: str
    """Customer's unique identifier."""

    complaint_text: str
    """Free-form text describing a customer's complaint."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {"customer_id": "C001", "complaint_text": "Service was too slow and no one responded to my emails."},
                {"customer_id": "C002", "complaint_text": "Billing is incorrect for the last two months!"}
            ]
        }
    }


class ComplaintTopic(BaseModel):
    topic: str
    """Identified complaint topic keyword or phrase."""

    count: int
    """Number of times the topic was detected."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [{"topic": "billing", "count": 5}, {"topic": "support", "count": 3}]
        }
    }


class AnalyzeComplaintTopicsRequest(BaseModel):
    complaints: List[Complaint]
    """List of raw customer complaints."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "complaints": [
                        {"customer_id": "C001", "complaint_text": "Your mobile app keeps crashing."},
                        {"customer_id": "C002", "complaint_text": "The delivery was delayed twice."}
                    ]
                }
            ]
        }
    }


class AnalyzeComplaintTopicsResponse(BaseModel):
    topics: List[ComplaintTopic]
    """Most frequent complaint topics extracted from the text corpus."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "topics": [
                        {"topic": "app crash", "count": 4},
                        {"topic": "delivery delay", "count": 3}
                    ]
                }
            ]
        }
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    risk_profile=prt.APIRiskProfile.Low
)


@prt.api("/analyze-complaint-topics", spec=api_spec)
async def run(payload: AnalyzeComplaintTopicsRequest, **kwargs) -> AnalyzeComplaintTopicsResponse:
    """
    Extracts the most common complaint topics from free-text customer complaints.
    """

    topic_keywords = ["billing", "delivery", "support", "login", "app", "refund", "delay", "payment", "email", "cancel"]
    topic_counter = Counter()

    for complaint in payload.complaints:
        text = complaint.complaint_text.lower()
        for keyword in topic_keywords:
            if re.search(rf"\b{re.escape(keyword)}\b", text):
                topic_counter[keyword] += 1

    topics = [ComplaintTopic(topic=k, count=v) for k, v in topic_counter.most_common()]

    return AnalyzeComplaintTopicsResponse(topics=topics)
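
A minimal local test (a sketch, same prt.apps.test_api pattern as above). Note the \b word boundaries in the regex: "billing" only matches as a whole word, so e.g. "rebilling" would not count:

import practicuscore as prt
from apis.analyze_complaint_topics import AnalyzeComplaintTopicsRequest, Complaint

payload = AnalyzeComplaintTopicsRequest(
    complaints=[
        Complaint(customer_id="C001", complaint_text="Billing is wrong and the refund never arrived."),
        Complaint(customer_id="C002", complaint_text="Support ignored my billing question."),
    ]
)
# Expected counts with the keyword list above: billing=2, refund=1, support=1
print(prt.apps.test_api("/analyze-complaint-topics", payload))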

apis/analyze_sentiment_trends.py

# apis/analyze_sentiment_trends.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List
import pandas as pd


class CustomerInteraction(BaseModel):
    customer_id: str
    """Unique identifier for the customer."""

    interaction_date: str
    """Date of the customer interaction."""

    channel: str
    """Communication channel (e.g. Email, Phone, Chat)."""

    sentiment_score: float
    """Sentiment score of the interaction (-1 to 1)."""

    issue_type: str
    """Category of the customer's issue or concern."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C12345",
                    "interaction_date": "2024-05-10",
                    "channel": "Email",
                    "sentiment_score": -0.45,
                    "issue_type": "Billing"
                }
            ]
        },
    }


class AnalyzeSentimentTrendsRequest(BaseModel):
    interactions: List[CustomerInteraction]
    """List of customer interaction records."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "interactions": [
                        {
                            "customer_id": "C12345",
                            "interaction_date": "2024-05-10",
                            "channel": "Email",
                            "sentiment_score": -0.45,
                            "issue_type": "Billing"
                        }
                    ]
                }
            ]
        },
    }


class SentimentTrend(BaseModel):
    period: str
    """Time period label (e.g., '2024-05')."""

    average_sentiment: float
    """Average sentiment score for that period."""


class AnalyzeSentimentTrendsResponse(BaseModel):
    trends: List[SentimentTrend]
    """List of sentiment trend entries across time."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "trends": [
                        {"period": "2024-05", "average_sentiment": -0.21}
                    ]
                }
            ]
        },
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    risk_profile=prt.APIRiskProfile.Low,
)


@prt.api("/analyze-sentiment-trends", spec=api_spec)
async def run(payload: AnalyzeSentimentTrendsRequest, **kwargs) -> AnalyzeSentimentTrendsResponse:
    """
    Analyze sentiment trends over time by grouping customer interactions monthly
    and computing the average sentiment score per period.
    """
    df = pd.DataFrame([i.model_dump() for i in payload.interactions])
    df["interaction_date"] = pd.to_datetime(df["interaction_date"])
    df["period"] = df["interaction_date"].dt.to_period("M").astype(str)

    trend_df = df.groupby("period")["sentiment_score"].mean().reset_index()
    trend_df.columns = ["period", "average_sentiment"]

    trends = [
        SentimentTrend(period=row["period"], average_sentiment=row["average_sentiment"])
        for _, row in trend_df.iterrows()
    ]

    return AnalyzeSentimentTrendsResponse(trends=trends)
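
For example, two May 2024 interactions with scores -0.45 and 0.05 land in the same "2024-05" period and average to about -0.2 (a sketch using the local test pattern from above):

import practicuscore as prt
from apis.analyze_sentiment_trends import AnalyzeSentimentTrendsRequest, CustomerInteraction

payload = AnalyzeSentimentTrendsRequest(
    interactions=[
        CustomerInteraction(customer_id="C1", interaction_date="2024-05-10",
                            channel="Email", sentiment_score=-0.45, issue_type="Billing"),
        CustomerInteraction(customer_id="C2", interaction_date="2024-05-20",
                            channel="Chat", sentiment_score=0.05, issue_type="Support"),
    ]
)
# Expected: a single "2024-05" trend with average_sentiment ≈ -0.2
print(prt.apps.test_api("/analyze-sentiment-trends", payload))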

apis/detect_negative_sentiment_clusters.py

# apis/detect_negative_sentiment_clusters.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List, Optional
import pandas as pd


class CustomerInteraction(BaseModel):
    customer_id: str
    """Unique identifier for the customer."""

    interaction_date: str
    """Date of the customer interaction."""

    channel: str
    """Communication channel used (e.g., Phone, Email, Chat)."""

    sentiment_score: Optional[float] = None
    """Sentiment score of the message (-1 = negative, 1 = positive)."""

    issue_type: str
    """Type or category of the customer issue (e.g., 'Delivery', 'Billing')."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C12345",
                    "interaction_date": "2024-05-10",
                    "channel": "Chat",
                    "sentiment_score": -0.9,
                    "issue_type": "Technical"
                }
            ]
        },
    }


class DetectNegativeClustersRequest(BaseModel):
    interactions: List[CustomerInteraction]
    """List of past customer interaction records."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "interactions": [
                        {
                            "customer_id": "C12345",
                            "interaction_date": "2024-05-10",
                            "channel": "Chat",
                            "sentiment_score": -0.9,
                            "issue_type": "Technical"
                        }
                    ]
                }
            ]
        },
    }


class SentimentCluster(BaseModel):
    time_window: str
    """Time window (e.g., day or week) in which a negative sentiment spike occurred."""

    channel: str
    """The communication channel where the spike was observed."""

    avg_sentiment: float
    """Average sentiment score in this cluster."""

    interaction_count: int
    """Number of negative interactions detected."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "time_window": "2024-05-10",
                    "channel": "Chat",
                    "avg_sentiment": -0.85,
                    "interaction_count": 17
                }
            ]
        },
    }


class DetectNegativeClustersResponse(BaseModel):
    clusters: List[SentimentCluster]
    """List of negative sentiment clusters grouped by time and channel."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "clusters": [
                        {
                            "time_window": "2024-05-10",
                            "channel": "Chat",
                            "avg_sentiment": -0.85,
                            "interaction_count": 17
                        }
                    ]
                }
            ]
        },
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    risk_profile=prt.APIRiskProfile.Medium,
)


@prt.api("/detect-negative-sentiment-clusters", spec=api_spec)
async def run(payload: DetectNegativeClustersRequest, **kwargs) -> DetectNegativeClustersResponse:
    """
    Detects clusters of interactions with highly negative sentiment
    by grouping by day and communication channel.
    This helps identify periods of customer dissatisfaction spikes.
    """
    df = pd.DataFrame([i.model_dump() for i in payload.interactions])

    df["interaction_date"] = pd.to_datetime(df["interaction_date"])
    df["date"] = df["interaction_date"].dt.date

    # Filter out rows with null sentiment_score
    df = df[df["sentiment_score"].notnull()]

    # Filter for strongly negative interactions
    negative_df = df[df["sentiment_score"] < -0.6]

    grouped = negative_df.groupby(["date", "channel"]).agg(
        avg_sentiment=("sentiment_score", "mean"),
        interaction_count=("sentiment_score", "count")
    ).reset_index()

    clusters = [
        SentimentCluster(
            time_window=str(row["date"]),
            channel=row["channel"],
            avg_sentiment=round(row["avg_sentiment"], 2),
            interaction_count=row["interaction_count"]
        )
        for _, row in grouped.iterrows()
    ]

    return DetectNegativeClustersResponse(clusters=clusters)
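
Only interactions below the -0.6 threshold form clusters. In the sketch below (same local test pattern as above), the -0.9 and -0.7 Chat interactions on 2024-05-10 group into one cluster averaging -0.8, while the -0.3 Email record is filtered out:

import practicuscore as prt
from apis.detect_negative_sentiment_clusters import DetectNegativeClustersRequest, CustomerInteraction

payload = DetectNegativeClustersRequest(
    interactions=[
        CustomerInteraction(customer_id="C1", interaction_date="2024-05-10",
                            channel="Chat", sentiment_score=-0.9, issue_type="Technical"),
        CustomerInteraction(customer_id="C2", interaction_date="2024-05-10",
                            channel="Chat", sentiment_score=-0.7, issue_type="Billing"),
        CustomerInteraction(customer_id="C3", interaction_date="2024-05-10",
                            channel="Email", sentiment_score=-0.3, issue_type="Delivery"),
    ]
)
# Expected: one cluster (2024-05-10, Chat) with avg_sentiment=-0.8, interaction_count=2
print(prt.apps.test_api("/detect-negative-sentiment-clusters", payload))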

apis/generate_customer_summary.py

# apis/generate_customer_summary.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List, Optional


class CustomerSignal(BaseModel):
    customer_id: str
    """Unique identifier for the customer."""

    loyalty_score: Optional[float]
    """Predicted loyalty score based on engagement and sentiment."""

    churn_risk: Optional[str]
    """Level of churn risk: low, medium, or high."""

    dominant_complaint_topic: Optional[str]
    """Most frequently detected complaint topic, if any."""

    feedback_sentiment: Optional[str]
    """Overall sentiment from feedback data (e.g., positive, neutral, negative)."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C021",
                    "loyalty_score": 0.82,
                    "churn_risk": "low",
                    "dominant_complaint_topic": "billing",
                    "feedback_sentiment": "positive"
                }
            ]
        }
    }


class GenerateCustomerSummaryRequest(BaseModel):
    signals: List[CustomerSignal]
    """Customer-level prediction outputs and behavioral insights."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "signals": [
                        {
                            "customer_id": "C021",
                            "loyalty_score": 0.82,
                            "churn_risk": "low",
                            "dominant_complaint_topic": "billing",
                            "feedback_sentiment": "positive"
                        }
                    ]
                }
            ]
        }
    }


class CustomerSummary(BaseModel):
    customer_id: str
    """The customer the summary is about."""

    summary: str
    """Plain English summary of customer's status and risk profile."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C021",
                    "summary": "Customer C021 has loyalty score of 0.82, low churn risk, positive recent feedback, notable complaints around billing."
                }
            ]
        }
    }


class GenerateCustomerSummaryResponse(BaseModel):
    summaries: List[CustomerSummary]
    """List of summaries for each customer."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "summaries": [
                        {
                            "customer_id": "C021",
                            "summary": "Customer C021 has loyalty score of 0.82, low churn risk, positive recent feedback, notable complaints around billing."
                        }
                    ]
                }
            ]
        }
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    risk_profile=prt.APIRiskProfile.Medium,
    interactive=True
)


@prt.api("/generate-customer-summary", spec=api_spec)
async def run(payload: GenerateCustomerSummaryRequest, **kwargs) -> GenerateCustomerSummaryResponse:
    """
    Generates plain English summaries of individual customers based on risk and engagement data.
    """

    summaries = []

    for signal in payload.signals:
        parts = []

        if signal.loyalty_score is not None:
            parts.append(f"loyalty score of {signal.loyalty_score:.2f}")
        if signal.churn_risk:
            parts.append(f"{signal.churn_risk} churn risk")
        if signal.feedback_sentiment:
            parts.append(f"{signal.feedback_sentiment} recent feedback")
        if signal.dominant_complaint_topic:
            parts.append(f"notable complaints around {signal.dominant_complaint_topic}")

        summary = f"Customer {signal.customer_id} has " + ", ".join(parts) + "."
        summaries.append(CustomerSummary(customer_id=signal.customer_id, summary=summary))

    return GenerateCustomerSummaryResponse(summaries=summaries)

apis/generate_retention_plan.py

# generate_retention_plan.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List, Optional


class RetentionTarget(BaseModel):
    customer_id: str
    """ID of the customer who is at churn risk."""

    churn_risk: str
    """Level of churn risk: low, medium, or high."""

    loyalty_score: Optional[float]
    """Predicted loyalty score from the model."""

    complaint_topic: Optional[str]
    """Dominant complaint topic extracted from feedbacks."""

    sentiment: Optional[str]
    """Overall customer sentiment, if known."""

    model_config = {
        "use_attribute_docstrings": True
    }


class RetentionPlan(BaseModel):
    customer_id: str
    """ID of the customer for whom the plan is generated."""

    plan_summary: str
    """Natural language recommendation plan to retain the customer."""

    model_config = {
        "use_attribute_docstrings": True
    }


class GenerateRetentionPlanResponse(BaseModel):
    plans: List[RetentionPlan]
    """Generated action plans to reduce churn likelihood per customer."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "plans": [
                        {
                            "customer_id": "C033",
                            "plan_summary": "Customer C033 exhibits high churn risk due to support delays and negative sentiment. Recommend assigning a dedicated account manager, offering priority support, and a goodwill credit to rebuild trust."
                        }
                    ]
                }
            ]
        }
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    interactive=True,
    risk_profile=prt.APIRiskProfile.Medium
)


@prt.api("/generate-retention-plan", spec=api_spec)
async def run(targets: List[RetentionTarget], **kwargs) -> GenerateRetentionPlanResponse:
    """
    Creates LLM-generated retention plans based on churn risk and behavior signals.
    """

    plans = []

    for target in targets:
        recs = []

        if target.churn_risk == "high":
            recs.append("assign a dedicated account manager")
        if target.complaint_topic:
            recs.append(f"resolve recent issues around {target.complaint_topic}")
        if target.sentiment == "negative":
            recs.append("provide a goodwill gesture such as a discount or credit")
        if target.loyalty_score is not None and target.loyalty_score < 0.5:
            recs.append("initiate re-engagement campaign with personalized offers")

        plan = f"Customer {target.customer_id} exhibits {target.churn_risk} churn risk"
        if target.complaint_topic or target.sentiment:
            plan += f" possibly due to {target.complaint_topic or target.sentiment}"
        if recs:
            plan += ". Recommended actions: " + ", ".join(recs) + "."
        else:
            plan += ". No specific action required at this time."

        plans.append(RetentionPlan(customer_id=target.customer_id, plan_summary=plan))

    return GenerateRetentionPlanResponse(plans=plans)
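
Unlike the other endpoints, this handler takes a list of RetentionTarget objects directly rather than a wrapper request model, so a local call looks like this (a sketch; mirrors the direct `await run(...)` pattern from the API Test section):

from apis.generate_retention_plan import run, RetentionTarget

targets = [
    RetentionTarget(customer_id="C033", churn_risk="high", loyalty_score=0.3,
                    complaint_topic="support delays", sentiment="negative"),
]
response = await run(targets)
print(response.plans[0].plan_summary)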

apis/generate_retention_risk_scores.py

# apis/generate_retention_risk_scores.py

import practicuscore as prt
from pydantic import BaseModel
from typing import List
import pandas as pd


class CustomerInteraction(BaseModel):
    customer_id: str
    """Unique identifier for the customer."""

    interaction_date: str
    """Date of the last interaction."""

    avg_sentiment: float
    """Average sentiment score across interactions."""

    total_complaints: int
    """Total number of complaints filed by the customer."""

    avg_response_time_minutes: float
    """Average response time from support team in minutes."""

    issue_resolution_rate: float
    """Percentage of resolved issues (0 to 1)."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C12345",
                    "interaction_date": "2024-05-10",
                    "avg_sentiment": -0.4,
                    "total_complaints": 3,
                    "avg_response_time_minutes": 75.0,
                    "issue_resolution_rate": 0.6
                }
            ]
        },
    }


class GenerateRiskScoresRequest(BaseModel):
    interactions: List[CustomerInteraction]
    """List of summarized interaction data per customer."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "interactions": [
                        {
                            "customer_id": "C12345",
                            "interaction_date": "2024-05-10",
                            "avg_sentiment": -0.4,
                            "total_complaints": 3,
                            "avg_response_time_minutes": 75.0,
                            "issue_resolution_rate": 0.6
                        }
                    ]
                }
            ]
        },
    }


class RiskScore(BaseModel):
    customer_id: str
    """Customer ID for which the risk is calculated."""

    risk_score: float
    """Churn risk score between 0 (low) and 1 (high)."""

    risk_level: str
    """Categorical level: Low, Medium, High."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C12345",
                    "risk_score": 0.78,
                    "risk_level": "High"
                }
            ]
        },
    }


class GenerateRiskScoresResponse(BaseModel):
    risk_scores: List[RiskScore]
    """List of churn risk scores per customer."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "risk_scores": [
                        {
                            "customer_id": "C12345",
                            "risk_score": 0.78,
                            "risk_level": "High"
                        }
                    ]
                }
            ]
        },
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    interactive=True,
    risk_profile=prt.APIRiskProfile.Low,
)


@prt.api("/generate-retention-risk-scores", spec=api_spec)
async def run(payload: GenerateRiskScoresRequest, **kwargs) -> GenerateRiskScoresResponse:
    """
    Calculates a churn risk score for each customer based on interaction sentiment,
    complaints, response time, and resolution rate.
    """
    df = pd.DataFrame([i.model_dump() for i in payload.interactions])

    # Normalize all fields between 0-1
    df["sentiment_score"] = (1 - (df["avg_sentiment"] + 1) / 2)  # more negative = higher risk
    df["complaint_score"] = df["total_complaints"] / df["total_complaints"].max()
    df["response_time_score"] = df["avg_response_time_minutes"] / df["avg_response_time_minutes"].max()
    df["resolution_score"] = 1 - df["issue_resolution_rate"]

    df["risk_score"] = (
        0.3 * df["sentiment_score"] +
        0.2 * df["complaint_score"] +
        0.2 * df["response_time_score"] +
        0.3 * df["resolution_score"]
    ).clip(0, 1)

    def categorize(score):
        if score >= 0.7:
            return "High"
        elif score >= 0.4:
            return "Medium"
        else:
            return "Low"

    results = [
        RiskScore(
            customer_id=row["customer_id"],
            risk_score=round(row["risk_score"], 2),
            risk_level=categorize(row["risk_score"])
        )
        for _, row in df.iterrows()
    ]

    return GenerateRiskScoresResponse(risk_scores=results)
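
Working through the weights for the example record above: sentiment_score = 1 - (-0.4 + 1)/2 = 0.7, the complaint and response-time scores are both 1.0 (single-row maxima), and resolution_score = 1 - 0.6 = 0.4, so risk_score = 0.3*0.7 + 0.2*1.0 + 0.2*1.0 + 0.3*0.4 = 0.73, which categorizes as "High". A local sanity check (a sketch, same test pattern as above):

import practicuscore as prt
from apis.generate_retention_risk_scores import GenerateRiskScoresRequest, CustomerInteraction

payload = GenerateRiskScoresRequest(
    interactions=[
        CustomerInteraction(customer_id="C12345", interaction_date="2024-05-10",
                            avg_sentiment=-0.4, total_complaints=3,
                            avg_response_time_minutes=75.0, issue_resolution_rate=0.6),
    ]
)
# Expected: risk_score=0.73, risk_level="High"
print(prt.apps.test_api("/generate-retention-risk-scores", payload))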

apis/load_customer_interaction_data.py

# apis/load_customer_interaction_data.py

import practicuscore as prt
import pandas as pd
from pydantic import BaseModel
from typing import List, Optional
import os


class CustomerInteraction(BaseModel):
    customer_id: str
    """Unique identifier for the customer."""

    interaction_date: str
    """Date of the customer interaction (format: YYYY-MM-DD)."""

    channel: str
    """Communication channel used during the interaction (e.g. Email, Phone, Chat)."""

    sentiment_score: Optional[float] = None
    """Sentiment score of the interaction, ranging from -1 (very negative) to 1 (very positive)."""

    issue_type: Optional[str] = None
    """Category or type of issue raised by the customer."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "customer_id": "C12345",
                    "interaction_date": "2024-05-10",
                    "channel": "Email",
                    "sentiment_score": -0.45,
                    "issue_type": "Billing"
                }
            ]
        },
    }


class LoadCustomerInteractionDataResponse(BaseModel):
    interactions: List[CustomerInteraction]
    """List of customer interaction records loaded from the source CSV file."""

    total_customers: int
    """Total number of unique customers found in the data."""

    total_interactions: int
    """Total number of interaction records in the dataset."""

    channels_detected: List[str]
    """List of unique interaction channels used by customers."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "interactions": [
                        {
                            "customer_id": "C12345",
                            "interaction_date": "2024-05-10",
                            "channel": "Email",
                            "sentiment_score": -0.45,
                            "issue_type": "Billing"
                        }
                    ],
                    "total_customers": 1,
                    "total_interactions": 1,
                    "channels_detected": ["Email"]
                }
            ]
        },
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    risk_profile=prt.APIRiskProfile.Medium,
    interactive=False,
)


@prt.api("/load-customer-interaction-data", spec=api_spec)
async def run(**kwargs) -> LoadCustomerInteractionDataResponse:
    """Load customer interaction records from a fixed CSV path and return structured data for analysis."""
    file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../customer_interactions.csv"))
    df = pd.read_csv(file_path)

    interactions = [
        CustomerInteraction(
            customer_id=row.get("customer_id", f"UNKNOWN_{i}"),
            interaction_date=row.get("interaction_date", "1970-01-01"),
            channel=row.get("channel", "Unknown"),
            sentiment_score=row.get("sentiment_score", None),
            issue_type=row.get("issue_type", "Uncategorized")
        )
        for i, row in df.iterrows()
    ]

    return LoadCustomerInteractionDataResponse(
        interactions=interactions,
        total_customers=df["customer_id"].nunique(),
        total_interactions=len(df),
        channels_detected=df["channel"].dropna().unique().tolist()
    )

apis/summarize_customer_retention_case.py

# apis/summarize_customer_retention_case.py
import practicuscore as prt
from pydantic import BaseModel


class SummarizeCustomerRetentionCaseRequest(BaseModel):
    customer_id: str
    """Unique identifier of the customer."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {"examples": [{"customer_id": "C105"}]},
    }


class SummarizeCustomerRetentionCaseResponse(BaseModel):
    summary: str
    """High-level summary of the customer's retention case, including risks and suggested actions."""

    model_config = {
        "use_attribute_docstrings": True,
        "json_schema_extra": {
            "examples": [
                {
                    "summary": "Customer C105 has shown declining engagement and expressed dissatisfaction. Retention plan recommended."
                }
            ]
        },
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    read_only=True,
    risk_profile=prt.APIRiskProfile.Low,
)


@prt.api("/summarize-customer-retention-case", spec=api_spec)
async def run(payload: SummarizeCustomerRetentionCaseRequest, **kwargs) -> SummarizeCustomerRetentionCaseResponse:
    """
    Summarizes the current retention case for a given customer, including behavioral patterns, sentiment signals,
    and recommended actions if relevant.
    """
    return SummarizeCustomerRetentionCaseResponse(
        summary=f"Customer {payload.customer_id} has shown declining engagement and expressed dissatisfaction. Retention plan recommended."
    )

apis/visualize_customer_journey_map.py

# apis/visualize_customer_journey_map.py

import practicuscore as prt
from pydantic import BaseModel, Field
from typing import List, Optional


class InteractionPoint(BaseModel):
    date: str = Field(..., description="Date of the customer interaction.")
    channel: str = Field(..., description="Communication channel used (e.g., email, phone, chat).")
    sentiment_score: float = Field(..., description="Sentiment score for the interaction, from -1 (negative) to 1 (positive).")
    action_taken: Optional[str] = Field(None, description="Optional description of any action taken by the company.")

class JourneyMap(BaseModel):
    customer_id: str = Field(..., description="Unique identifier of the customer.")
    timeline: List[InteractionPoint] = Field(..., description="Chronological list of interaction points for this customer.")
    risk_flag: Optional[str] = Field(None, description="Optional churn or escalation flag for this customer.")
    recommendation: Optional[str] = Field(None, description="Summary recommendation based on the journey.")

class VisualizeCustomerJourneyRequest(BaseModel):
    customer_id: str = Field(..., description="ID of the customer whose journey should be visualized.")

    model_config = {
        "json_schema_extra": {
            "example": {
                "customer_id": "C457"
            }
        }
    }

class VisualizeCustomerJourneyResponse(BaseModel):
    journey_map: JourneyMap = Field(..., description="Customer journey map with annotated interactions.")

    model_config = {
        "json_schema_extra": {
            "example": {
                "journey_map": {
                    "customer_id": "C457",
                    "timeline": [
                        {
                            "date": "2024-01-10",
                            "channel": "email",
                            "sentiment_score": -0.4,
                            "action_taken": "Apology email sent"
                        },
                        {
                            "date": "2024-01-15",
                            "channel": "phone",
                            "sentiment_score": 0.1,
                            "action_taken": "Escalated to supervisor"
                        },
                        {
                            "date": "2024-01-20",
                            "channel": "chat",
                            "sentiment_score": 0.7,
                            "action_taken": "Issue resolved"
                        }
                    ],
                    "risk_flag": "Escalation",
                    "recommendation": "Maintain proactive follow-up for next 30 days."
                }
            }
        }
    }


api_spec = prt.APISpec(
    execution_target=prt.APIExecutionTarget.DirectExecution,
    risk_profile=prt.APIRiskProfile.Medium,
    interactive=True
)


@prt.api("/visualize-customer-journey-map", spec=api_spec)
async def run(payload: VisualizeCustomerJourneyRequest, **kwargs) -> VisualizeCustomerJourneyResponse:
    """
    Create a structured journey map for a customer by compiling their chronological
    interaction data, sentiment trends, and key actions. This map can be used for
    visualization in dashboards or reports.

    Args:
        payload (VisualizeCustomerJourneyRequest): Contains the customer_id.

    Returns:
        VisualizeCustomerJourneyResponse: Timeline of interactions and recommendations.
    """
    # Dummy timeline for demo (in real case, fetch from DB or context)
    timeline = [
        InteractionPoint(
            date="2024-01-10",
            channel="email",
            sentiment_score=-0.4,
            action_taken="Apology email sent"
        ),
        InteractionPoint(
            date="2024-01-15",
            channel="phone",
            sentiment_score=0.1,
            action_taken="Escalated to supervisor"
        ),
        InteractionPoint(
            date="2024-01-20",
            channel="chat",
            sentiment_score=0.7,
            action_taken="Issue resolved"
        )
    ]

    return VisualizeCustomerJourneyResponse(
        journey_map=JourneyMap(
            customer_id=payload.customer_id,
            timeline=timeline,
            risk_flag="Escalation",
            recommendation="Maintain proactive follow-up for next 30 days."
        )
    )
