Preparation of Model File
The model.py file is a critical component when deploying a Large Language Model (LLM) in environments like Practicus AI. It contains the logic for initializing the model, making predictions, and cleaning up resources. This page walks through a model.py script for deploying an LLM one function at a time; the complete file is listed at the end.
Import Statements
sys: Used for interacting with the interpreter, including adding paths for Python to search for modules.
datetime: Facilitates recording timestamps, useful for performance monitoring.
Global Variables
generator: Holds the model instance. Initialized as None and later assigned the LLM object.
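answers: Accumulates responses from the model. Initialized as an empty string and appended with each prediction. Nothing in the script resets it, so it keeps growing across requests for the lifetime of the process.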
Initialization Function
async def init(model_meta=None, *args, **kwargs):
    global generator
    if generator is not None:
        print("generator exists, using")
        return

    print("generator is none, building")
    llama_cache = "/var/practicus/cache"
    if llama_cache not in sys.path:
        sys.path.insert(0, llama_cache)

    try:
        from llama import Llama
    except Exception as e:
        raise ModuleNotFoundError("llama library not found. Have you included it in the object storage cache?") from e

    try:
        generator = Llama.build(
            ckpt_dir=f"{llama_cache}/CodeLlama-7b-Instruct/",
            tokenizer_path=f"{llama_cache}/CodeLlama-7b-Instruct/tokenizer.model",
            max_seq_len=512,
            max_batch_size=4,
            model_parallel_size=1
        )
    except Exception as e:
        print(f"Failed to build generator: {e}")
        raise
This function prepares the environment for the model. It first checks whether generator already exists, to avoid loading the model twice. If not, it adds the cache directory to sys.path, imports the llama library from there, and builds the model with the specified parameters. A failed import is re-raised as a ModuleNotFoundError with a hint about the object storage cache; a failed build is logged and re-raised.
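The guard at the top of init can be tried without a GPU or the llama library. The following is a minimal sketch, assuming a hypothetical fake_build function in place of Llama.build and a hypothetical build_calls counter that exists only to make the effect visible.

```python
import asyncio

generator = None   # module-level handle, as in model.py
build_calls = 0    # hypothetical counter, only here to make the effect visible


def fake_build():
    """Stand-in for Llama.build(); returns a placeholder object."""
    global build_calls
    build_calls += 1
    return object()


async def init(model_meta=None, *args, **kwargs):
    global generator
    if generator is not None:
        return  # already built, nothing to do
    generator = fake_build()


async def main():
    await init()
    first = generator
    await init()  # second call hits the guard and returns early
    return first is generator


same_instance = asyncio.run(main())
print(same_instance, build_calls)  # True 1
```

Because predict awaits init on every request, this guard is what keeps the expensive build to one call per process.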
Cleanup Function
async def cleanup(model_meta=None, *args, **kwargs):
    print("Cleaning up memory")

    global generator
    generator = None

    from torch import cuda
    cuda.empty_cache()
This function frees resources once they are no longer needed. It drops the reference to the model by setting generator back to None, then calls torch's cuda.empty_cache() so that PyTorch releases the cached GPU memory it is no longer using. This keeps GPU memory available for other work.
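A slightly more defensive variant of the cache release is sketched below. It assumes you may also want to run the same code on machines without torch or without a GPU. The helper name release_gpu_cache is hypothetical and not part of the script on this page.

```python
import gc


def release_gpu_cache():
    """Run garbage collection, then empty the CUDA cache if torch and a GPU are present.

    Returns True if torch.cuda.empty_cache() was called, False otherwise.
    """
    gc.collect()  # collect unreachable Python objects first so their tensors can be freed
    try:
        import torch
    except ImportError:
        return False  # torch is not installed, nothing to release
    if not torch.cuda.is_available():
        return False  # CPU-only environment
    torch.cuda.empty_cache()
    return True
```

Calling gc.collect() before empty_cache() is a common precaution: the cache can only release memory whose tensors have already been freed.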
Prediction Wrapper Function
async def predict(http_request, model_meta=None, payload_dict=None, *args, **kwargs):
    await init(model_meta)

    import threading

    threads = []
    count = int(payload_dict["count"])
    thread_start = datetime.now()
    for _ in range(count):
        thread = threading.Thread(target=_predict)
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    print("Total finished in:", (datetime.now() - thread_start).total_seconds())

    return {
        "answer": f"Time:{(datetime.now() - thread_start).total_seconds()}\nanswers:{answers}"
    }
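This asynchronous function is the main entry point for prediction requests. It ensures the model is initialized, reads the number of threads from the count field of the payload, then starts that many threads and waits for all of them. Each thread calls _predict to generate a response. Because thread.join() is a blocking call, the coroutine does not yield to the event loop until every thread has finished. The total elapsed time is printed and returned together with the accumulated answers.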
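The fan-out pattern can be tried in isolation. The sketch below is a variation, not the script's own method: it assumes Python 3.9+ for asyncio.to_thread, protects the shared result list with a lock, and moves the blocking start/join loop off the event loop thread. The names worker, run_threads, results and results_lock are hypothetical.

```python
import asyncio
import threading

results = []
results_lock = threading.Lock()


def worker(index):
    """Hypothetical stand-in for _predict; records a fake answer."""
    answer = f"answer {index}"
    with results_lock:  # serialize writes to the shared list
        results.append(answer)


def run_threads(count):
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # blocks the calling thread until every worker is done


async def predict(payload_dict):
    results.clear()  # start each request with an empty result list
    count = int(payload_dict["count"])
    # Run the blocking start/join loop in a worker thread, not on the event loop
    await asyncio.to_thread(run_threads, count)
    return {"answer": sorted(results)}


response = asyncio.run(predict({"count": "3"}))
print(response)  # {'answer': ['answer 0', 'answer 1', 'answer 2']}
```

Clearing the list at the start of each request keeps one request's answers out of the next response, and the lock makes the concurrent appends explicit.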
Prediction Execution Function
def _predict(http_request=None, model_meta=None, payload_dict=None, *args, **kwargs):
    start = datetime.now()

    instructions = [[
        {"role": "system", "content": ""},
        {"role": "user", "content": "Capital of Turkey"}
    ]]

    results = generator.chat_completion(
        instructions,
        max_gen_len=None,
        temperature=0.2,
        top_p=0.95,
    )

    answer = ""
    for result in results:
        answer += f"{result['generation']['content']}\n"

    print("thread answer:", answer)
    total_time = (datetime.now() - start).total_seconds()
    print("thread answer in:", total_time)

    global answers
    answers += f"start:{start} end: {datetime.now()} time: {total_time} answer: {answer}\n"
_predict is called by each thread to perform the actual prediction. In this example the prompt is hard-coded and the payload is not used; the full listing below includes a commented-out variant that reads system_context and user_prompt from payload_dict. The function runs chat_completion, logs the response and execution time, and appends the result to the global answers string for final aggregation.
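To drive the prompt from the request instead of the hard-coded text, the instructions could be built by a small helper. This is a sketch, assuming the payload uses the system_context and user_prompt keys that appear in the commented-out lines of the full listing. The helper name build_instructions is hypothetical.

```python
def build_instructions(payload_dict=None):
    """Build the single-dialog list passed to chat_completion.

    Falls back to the hard-coded values from the example when a key is missing.
    """
    payload_dict = payload_dict or {}
    return [[
        {"role": "system", "content": payload_dict.get("system_context", "")},
        {"role": "user", "content": payload_dict.get("user_prompt", "Capital of Turkey")},
    ]]


print(build_instructions({"user_prompt": "Capital of France"}))
```

For the payload to reach _predict, predict would also have to pass it to each thread, for example with threading.Thread(target=_predict, kwargs={"payload_dict": payload_dict}).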
Summary
This model.py script outlines a pattern for deploying and interacting with an LLM. It illustrates loading a library from the cache directory at runtime, concurrent processing with threads, resource cleanup, and timing logs for performance monitoring. The setup can be adapted to other models and tailored to the requirements of different LLM deployments.
Supplementary Files
model.json
{
    "download_files_from": "cache/codellama-01/",
    "_comment": "you can also define download_files_to otherwise, /var/practicus/cache is used"
}
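The fallback described in the _comment can be pictured with a few lines of Python. This is only an illustration of the default, assuming download_files_to sits alongside download_files_from in the same file. It is not how the platform itself reads model.json, and DEFAULT_CACHE_DIR is a hypothetical name.

```python
import json

DEFAULT_CACHE_DIR = "/var/practicus/cache"  # default named in the _comment

# Example model.json content with the optional key set explicitly
model_json = """
{
    "download_files_from": "cache/codellama-01/",
    "download_files_to": "/var/practicus/cache"
}
"""

config = json.loads(model_json)
# Use the explicit target if present, otherwise the documented default
target_dir = config.get("download_files_to", DEFAULT_CACHE_DIR)
print(config["download_files_from"], "->", target_dir)
```

Note that model.py hard-codes llama_cache as /var/practicus/cache, so a different download_files_to value would need a matching change there.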
model.py
import sys
from datetime import datetime
generator = None
answers = ""

async def init(model_meta=None, *args, **kwargs):
    global generator
    if generator is not None:
        print("generator exists, using")
        return

    print("generator is none, building")

    # Assuming llama library is copied into cache dir, in addition to torch .pth files
    llama_cache = "/var/practicus/cache"
    if llama_cache not in sys.path:
        sys.path.insert(0, llama_cache)

    try:
        from llama import Llama
    except Exception as e:
        raise ModuleNotFoundError("llama library not found. Have you included it in the object storage cache?") from e

    try:
        generator = Llama.build(
            ckpt_dir=f"{llama_cache}/CodeLlama-7b-Instruct/",
            tokenizer_path=f"{llama_cache}/CodeLlama-7b-Instruct/tokenizer.model",
            max_seq_len=512,
            max_batch_size=4,
            model_parallel_size=1
        )
    except Exception as e:
        # Log the failure, then re-raise so the caller sees the original error
        print(f"Failed to build generator: {e}")
        raise

async def cleanup(model_meta=None, *args, **kwargs):
    print("Cleaning up memory")

    global generator
    generator = None

    from torch import cuda
    cuda.empty_cache()

def _predict(http_request=None, model_meta=None, payload_dict=None, *args, **kwargs):
    start = datetime.now()

    # instructions = [[
    #     {"role": "system", "content": payload_dict["system_context"]},
    #     {"role": "user", "content": payload_dict["user_prompt"]}
    # ]]

    instructions = [[
        {"role": "system", "content": ""},
        {"role": "user", "content": "Capital of Turkey"}
    ]]

    results = generator.chat_completion(
        instructions,
        max_gen_len=None,
        temperature=0.2,
        top_p=0.95,
    )

    answer = ""
    for result in results:
        answer += f"{result['generation']['content']}\n"

    print("thread answer:", answer)
    total_time = (datetime.now() - start).total_seconds()
    print("thread answer in:", total_time)

    global answers
    answers += f"start:{start} end: {datetime.now()} time: {total_time} answer: {answer}\n"

async def predict(http_request, model_meta=None, payload_dict=None, *args, **kwargs):
    await init(model_meta)

    import threading

    threads = []
    count = int(payload_dict["count"])
    thread_start = datetime.now()
    for _ in range(count):
        thread = threading.Thread(target=_predict)
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    print("Total finished in:", (datetime.now() - thread_start).total_seconds())

    return {
        "answer": f"Time:{(datetime.now() - thread_start).total_seconds()}\nanswers:{answers}"
    }