transfer python bindings code
2  gpt4all-bindings/python/gpt4all/__init__.py  Normal file
@@ -0,0 +1,2 @@
from .pyllmodel import LLModel  # noqa
from .gpt4all import GPT4All  # noqa
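The package root re-exports both classes, so the bindings can be imported directly from the package. A minimal import sketch (assuming the bindings are installed as the `gpt4all` package):

# Illustrative only, not part of the commit.
from gpt4all import GPT4All, LLModel  # both names are re-exported by __init__.py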
280  gpt4all-bindings/python/gpt4all/gpt4all.py  Normal file
@@ -0,0 +1,280 @@
"""
Python-only API for running all GPT4All models.
"""
import json
import os
from pathlib import Path
from typing import Dict, List

import requests
from tqdm import tqdm

from . import pyllmodel

# TODO: move to config
DEFAULT_MODEL_DIRECTORY = os.path.join(str(Path.home()), ".cache", "gpt4all").replace("\\", "\\\\")


class GPT4All():
    """Python API for retrieving and interacting with GPT4All models.

    Attributes:
        model: Pointer to underlying C model.
    """

    def __init__(self, model_name: str, model_path: str = None, model_type: str = None, allow_download=True):
        """
        Constructor

        Args:
            model_name: Name of GPT4All or custom model. Including ".bin" file extension is optional but encouraged.
            model_path: Path to directory containing model file or, if file does not exist, where to download model.
                Default is None, in which case models will be stored in `~/.cache/gpt4all/`.
            model_type: Model architecture to use - currently, only options are 'llama' or 'gptj'. Only required if model
                is custom. Note that these models still must be built from llama.cpp or GPTJ ggml architecture.
                Default is None.
            allow_download: Allow API to download models from gpt4all.io. Default is True.
        """
        self.model = None

        # Model type provided for when model is custom
        if model_type:
            self.model = GPT4All.get_model_from_type(model_type)
        # Else get model from gpt4all model filenames
        else:
            self.model = GPT4All.get_model_from_name(model_name)

        # Retrieve model and download if allowed
        model_dest = self.retrieve_model(model_name, model_path=model_path, allow_download=allow_download)
        self.model.load_model(model_dest)

    @staticmethod
    def list_models():
        """
        Fetch model list from https://gpt4all.io/models/models.json

        Returns:
            Model list in JSON format.
        """
        response = requests.get("https://gpt4all.io/models/models.json")
        model_json = json.loads(response.content)
        return model_json

    @staticmethod
    def retrieve_model(model_name: str, model_path: str = None, allow_download=True):
        """
        Find model file, and if it doesn't exist, download the model.

        Args:
            model_name: Name of model.
            model_path: Path to find model. Default is None in which case path is set to
                ~/.cache/gpt4all/.
            allow_download: Allow API to download model from gpt4all.io. Default is True.

        Returns:
            Model file destination.
        """
        model_filename = model_name
        if ".bin" not in model_filename:
            model_filename += ".bin"

        # Validate download directory
        if model_path is None:
            model_path = DEFAULT_MODEL_DIRECTORY
            if not os.path.exists(DEFAULT_MODEL_DIRECTORY):
                try:
                    os.makedirs(DEFAULT_MODEL_DIRECTORY)
                except OSError:
                    raise ValueError("Failed to create model download directory at ~/.cache/gpt4all/. \
                        Please specify model_path.")
        else:
            # Escape backslashes for Windows paths
            model_path = model_path.replace("\\", "\\\\")

        if os.path.exists(model_path):
            model_dest = os.path.join(model_path, model_filename).replace("\\", "\\\\")
            if os.path.exists(model_dest):
                print("Found model file.")
                return model_dest

            # If model file does not exist, download
            elif allow_download:
                # Make sure valid model filename before attempting download
                model_match = False
                for item in GPT4All.list_models():
                    if model_filename == item["filename"]:
                        model_match = True
                        break
                if not model_match:
                    raise ValueError(f"Model filename not in model list: {model_filename}")
                return GPT4All.download_model(model_filename, model_path)
            else:
                raise ValueError("Failed to retrieve model")
        else:
            raise ValueError("Invalid model directory")

    @staticmethod
    def download_model(model_filename, model_path):
        def get_download_url(model_filename):
            return f"https://gpt4all.io/models/{model_filename}"

        # Download model
        download_path = os.path.join(model_path, model_filename).replace("\\", "\\\\")
        download_url = get_download_url(model_filename)

        response = requests.get(download_url, stream=True)
        total_size_in_bytes = int(response.headers.get("content-length", 0))
        block_size = 1048576  # 1 MB
        progress_bar = tqdm(total=total_size_in_bytes, unit="iB", unit_scale=True)
        with open(download_path, "wb") as file:
            for data in response.iter_content(block_size):
                progress_bar.update(len(data))
                file.write(data)
        progress_bar.close()

        # Validate download was successful
        if total_size_in_bytes != 0 and progress_bar.n != total_size_in_bytes:
            raise RuntimeError(
                "An error occurred during download. Downloaded file may not work."
            )

        print("Model downloaded at: " + download_path)
        return download_path

    def generate(self, prompt: str, **generate_kwargs):
        """
        Convenience wrapper that runs generate on the underlying model without accessing the model object directly.
        """
        return self.model.generate(prompt, **generate_kwargs)

    def chat_completion(self,
                        messages: List[Dict],
                        default_prompt_header: bool = True,
                        default_prompt_footer: bool = True,
                        verbose: bool = True) -> dict:
        """
        Format list of message dictionaries into a prompt and call model
        generate on prompt. Returns a response dictionary with metadata and
        generated content.

        Args:
            messages: Each dictionary should have a "role" key
                with value of "system", "assistant", or "user" and a "content" key with a
                string value. Messages are organized such that "system" messages are at top of prompt,
                and "user" and "assistant" messages are displayed in order. Assistant messages get formatted as
                "### Response: {content}".
            default_prompt_header: If True (default), add default prompt header after any user-specified system messages and
                before user/assistant messages.
            default_prompt_footer: If True (default), add default footer at end of prompt.
            verbose: If True (default), print full prompt and generated response.

        Returns:
            Response dictionary with:
                "model": name of model.
                "usage": a dictionary with number of full prompt tokens, number of
                    generated tokens in response, and total tokens.
                "choices": List of message dictionaries where "content" is the generated response and "role" is set
                    as "assistant". Right now, only one choice is returned by the model.
        """
        full_prompt = self._build_prompt(messages,
                                         default_prompt_header=default_prompt_header,
                                         default_prompt_footer=default_prompt_footer)
        if verbose:
            print(full_prompt)

        response = self.model.generate(full_prompt)

        if verbose:
            print(response)

        response_dict = {
            "model": self.model.model_name,
            "usage": {"prompt_tokens": len(full_prompt),
                      "completion_tokens": len(response),
                      "total_tokens": len(full_prompt) + len(response)},
            "choices": [
                {
                    "message": {
                        "role": "assistant",
                        "content": response
                    }
                }
            ]
        }

        return response_dict

    @staticmethod
    def _build_prompt(messages: List[Dict],
                      default_prompt_header=True,
                      default_prompt_footer=False) -> str:
        full_prompt = ""

        for message in messages:
            if message["role"] == "system":
                system_message = message["content"] + "\n"
                full_prompt += system_message

        if default_prompt_header:
            full_prompt += """### Instruction:
            The prompt below is a question to answer, a task to complete, or a conversation
            to respond to; decide which and write an appropriate response.
            \n### Prompt: """

        for message in messages:
            if message["role"] == "user":
                user_message = "\n" + message["content"]
                full_prompt += user_message
            if message["role"] == "assistant":
                assistant_message = "\n### Response: " + message["content"]
                full_prompt += assistant_message

        if default_prompt_footer:
            full_prompt += "\n### Response:"

        return full_prompt

    @staticmethod
    def get_model_from_type(model_type: str) -> pyllmodel.LLModel:
        # This needs to be updated for each new model
        # TODO: Might be worth converting model_type to enum

        if model_type == "gptj":
            return pyllmodel.GPTJModel()
        elif model_type == "llama":
            return pyllmodel.LlamaModel()
        else:
            raise ValueError(f"No corresponding model for model_type: {model_type}")

    @staticmethod
    def get_model_from_name(model_name: str) -> pyllmodel.LLModel:
        # This needs to be updated for each new model

        # NOTE: We are doing this preprocessing a lot, maybe there's a better way to organize
        if ".bin" not in model_name:
            model_name += ".bin"

        GPTJ_MODELS = [
            "ggml-gpt4all-j-v1.3-groovy.bin",
            "ggml-gpt4all-j-v1.2-jazzy.bin",
            "ggml-gpt4all-j-v1.1-breezy.bin",
            "ggml-gpt4all-j.bin"
        ]

        LLAMA_MODELS = [
            "ggml-gpt4all-l13b-snoozy.bin",
            "ggml-vicuna-7b-1.1-q4_2.bin",
            "ggml-vicuna-13b-1.1-q4_2.bin",
            "ggml-wizardLM-7B.q4_2.bin",
            "ggml-stable-vicuna-13B.q4_2.bin"
        ]

        if model_name in GPTJ_MODELS:
            return pyllmodel.GPTJModel()
        elif model_name in LLAMA_MODELS:
            return pyllmodel.LlamaModel()
        else:
            err_msg = f"""No corresponding model for provided filename {model_name}.
            If this is a custom model, make sure to specify a valid model_type.
            """
            raise ValueError(err_msg)
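A minimal usage sketch for the high-level class above. This is illustrative only, not part of the commit; it assumes the compiled llmodel libraries are packaged and that downloading ggml-gpt4all-j-v1.3-groovy.bin (one of the filenames listed in get_model_from_name) from gpt4all.io is permitted:

# Illustrative example, not part of the commit; requires network access on first run.
from gpt4all import GPT4All

gpt4all = GPT4All("ggml-gpt4all-j-v1.3-groovy")  # resolves to a GPTJModel and downloads the .bin if missing

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Name three colors."}
]
response = gpt4all.chat_completion(messages, verbose=False)
print(response["choices"][0]["message"]["content"])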
241  gpt4all-bindings/python/gpt4all/pyllmodel.py  Normal file
@@ -0,0 +1,241 @@
from io import StringIO
import pkg_resources
import ctypes
import os
import platform
import re
import sys

# TODO: provide a config file to make this more robust
LLMODEL_PATH = os.path.join("llmodel_DO_NOT_MODIFY", "build")


def load_llmodel_library():
    system = platform.system()

    def get_c_shared_lib_extension():
        if system == "Darwin":
            return "dylib"
        elif system == "Linux":
            return "so"
        elif system == "Windows":
            return "dll"
        else:
            raise Exception("Operating System not supported")

    c_lib_ext = get_c_shared_lib_extension()

    llmodel_file = "libllmodel" + '.' + c_lib_ext
    llama_file = "libllama" + '.' + c_lib_ext
    llama_dir = str(pkg_resources.resource_filename('gpt4all', os.path.join(LLMODEL_PATH, llama_file)))
    llmodel_dir = str(pkg_resources.resource_filename('gpt4all', os.path.join(LLMODEL_PATH, llmodel_file)))

    # For Windows
    llama_dir = llama_dir.replace("\\", "\\\\")
    print(llama_dir)
    llmodel_dir = llmodel_dir.replace("\\", "\\\\")
    print(llmodel_dir)

    llama_lib = ctypes.CDLL(llama_dir, mode=ctypes.RTLD_GLOBAL)
    llmodel_lib = ctypes.CDLL(llmodel_dir)

    return llmodel_lib, llama_lib


llmodel, llama = load_llmodel_library()

# Define C function signatures using ctypes
llmodel.llmodel_gptj_create.restype = ctypes.c_void_p
llmodel.llmodel_gptj_destroy.argtypes = [ctypes.c_void_p]
llmodel.llmodel_llama_create.restype = ctypes.c_void_p
llmodel.llmodel_llama_destroy.argtypes = [ctypes.c_void_p]

llmodel.llmodel_loadModel.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
llmodel.llmodel_loadModel.restype = ctypes.c_bool
llmodel.llmodel_isModelLoaded.argtypes = [ctypes.c_void_p]
llmodel.llmodel_isModelLoaded.restype = ctypes.c_bool


class LLModelPromptContext(ctypes.Structure):
    _fields_ = [("logits", ctypes.POINTER(ctypes.c_float)),
                ("logits_size", ctypes.c_size_t),
                ("tokens", ctypes.POINTER(ctypes.c_int32)),
                ("tokens_size", ctypes.c_size_t),
                ("n_past", ctypes.c_int32),
                ("n_ctx", ctypes.c_int32),
                ("n_predict", ctypes.c_int32),
                ("top_k", ctypes.c_int32),
                ("top_p", ctypes.c_float),
                ("temp", ctypes.c_float),
                ("n_batch", ctypes.c_int32),
                ("repeat_penalty", ctypes.c_float),
                ("repeat_last_n", ctypes.c_int32),
                ("context_erase", ctypes.c_float)]


ResponseCallback = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_int32, ctypes.c_char_p)
RecalculateCallback = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_bool)

llmodel.llmodel_prompt.argtypes = [ctypes.c_void_p,
                                   ctypes.c_char_p,
                                   ResponseCallback,
                                   ResponseCallback,
                                   RecalculateCallback,
                                   ctypes.POINTER(LLModelPromptContext)]


class LLModel:
    """
    Base class and universal wrapper for GPT4All language models
    built around the llmodel C-API.

    Attributes
    ----------
    model: llmodel_model
        Ctype pointer to underlying model
    model_type : str
        Model architecture identifier
    """

    model_type: str = None

    def __init__(self):
        self.model = None
        self.model_name = None

    def __del__(self):
        pass

    def load_model(self, model_path: str) -> bool:
        """
        Load model from a file.

        Parameters
        ----------
        model_path : str
            Model filepath

        Returns
        -------
        True if model loaded successfully, False otherwise
        """
        llmodel.llmodel_loadModel(self.model, model_path.encode('utf-8'))
        filename = os.path.basename(model_path)
        self.model_name = os.path.splitext(filename)[0]

        if llmodel.llmodel_isModelLoaded(self.model):
            return True
        else:
            return False

    def generate(self,
                 prompt: str,
                 logits_size: int = 0,
                 tokens_size: int = 0,
                 n_past: int = 0,
                 n_ctx: int = 1024,
                 n_predict: int = 128,
                 top_k: int = 40,
                 top_p: float = .9,
                 temp: float = .1,
                 n_batch: int = 8,
                 repeat_penalty: float = 1.2,
                 repeat_last_n: int = 10,
                 context_erase: float = .5) -> str:
        """
        Generate response from model from a prompt.

        Parameters
        ----------
        prompt: str
            Question, task, or conversation for model to respond to
        logits_size, tokens_size, n_past, n_ctx, n_predict, top_k, top_p,
        temp, n_batch, repeat_penalty, repeat_last_n, context_erase: optional
            Sampling and context settings passed through to the C prompt context

        Returns
        -------
        Model response str
        """
        prompt = prompt.encode('utf-8')
        prompt = ctypes.c_char_p(prompt)

        # Change stdout to StringIO so we can collect response
        old_stdout = sys.stdout
        collect_response = StringIO()
        sys.stdout = collect_response

        context = LLModelPromptContext(
            logits_size=logits_size,
            tokens_size=tokens_size,
            n_past=n_past,
            n_ctx=n_ctx,
            n_predict=n_predict,
            top_k=top_k,
            top_p=top_p,
            temp=temp,
            n_batch=n_batch,
            repeat_penalty=repeat_penalty,
            repeat_last_n=repeat_last_n,
            context_erase=context_erase
        )

        llmodel.llmodel_prompt(self.model,
                               prompt,
                               ResponseCallback(self._prompt_callback),
                               ResponseCallback(self._response_callback),
                               RecalculateCallback(self._recalculate_callback),
                               context)

        response = collect_response.getvalue()
        sys.stdout = old_stdout

        # Remove the unnecessary new lines from response
        response = re.sub(r"\n(?!\n)", "", response).strip()

        return response

    # Empty prompt callback
    @staticmethod
    def _prompt_callback(token_id, response):
        return True

    # Response callback that prints each token so it can be collected from the redirected stdout
    @staticmethod
    def _response_callback(token_id, response):
        print(response.decode('utf-8'))
        return True

    # Empty recalculate callback
    @staticmethod
    def _recalculate_callback(is_recalculating):
        return is_recalculating


class GPTJModel(LLModel):

    model_type = "gptj"

    def __init__(self):
        super().__init__()
        self.model = llmodel.llmodel_gptj_create()

    def __del__(self):
        if self.model is not None:
            llmodel.llmodel_gptj_destroy(self.model)
        super().__del__()


class LlamaModel(LLModel):

    model_type = "llama"

    def __init__(self):
        super().__init__()
        self.model = llmodel.llmodel_llama_create()

    def __del__(self):
        if self.model is not None:
            llmodel.llmodel_llama_destroy(self.model)
        super().__del__()
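A low-level usage sketch for the ctypes wrapper above. Illustrative only, not part of the commit; the model path is a hypothetical placeholder and assumes a GPT-J ggml .bin file already exists on disk:

# Illustrative example, not part of the commit; the path below is a placeholder.
from gpt4all.pyllmodel import GPTJModel

model = GPTJModel()  # wraps llmodel_gptj_create()
if model.load_model("/path/to/ggml-gpt4all-j-v1.3-groovy.bin"):
    # generate() redirects stdout to capture the streamed tokens and returns them as one string
    print(model.generate("What is a language model?", n_predict=64, temp=0.1))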