templates: Add neo4j semantic layer with ollama template (#17192)

A template with JSON-based agent using Mixtral via Ollama.

---------

Co-authored-by: Erick Friis <erick@langchain.dev>
This commit is contained in:
Tomaz Bratanic
2024-02-07 21:50:54 +01:00
committed by GitHub
parent f87acf0340
commit ecf8042a10
14 changed files with 2513 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
from neo4j_semantic_ollama.agent import agent_executor
__all__ = ["agent_executor"]

View File

@@ -0,0 +1,110 @@
import os
from typing import List, Tuple
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad import format_log_to_messages
from langchain.agents.output_parsers import (
ReActJsonSingleInputOutputParser,
)
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.pydantic_v1 import BaseModel, Field
from langchain.schema import AIMessage, HumanMessage
from langchain.tools.render import render_text_description_and_args
from langchain_community.chat_models import ChatOllama
from neo4j_semantic_ollama.information_tool import InformationTool
from neo4j_semantic_ollama.memory_tool import MemoryTool
from neo4j_semantic_ollama.recommendation_tool import RecommenderTool
from neo4j_semantic_ollama.smalltalk_tool import SmalltalkTool
llm = ChatOllama(
model="mixtral",
temperature=0,
base_url=os.environ["OLLAMA_BASE_URL"],
streaming=True,
)
chat_model_with_stop = llm.bind(stop=["\nObservation"])
tools = [InformationTool(), RecommenderTool(), MemoryTool(), SmalltalkTool()]
# Inspiration taken from hub.pull("hwchase17/react-json")
system_message = f"""Answer the following questions as best you can.
You can answer directly if the user is greeting you or similar.
Otherise, you have access to the following tools:
{render_text_description_and_args(tools).replace('{', '{{').replace('}', '}}')}
The way you use the tools is by specifying a json blob.
Specifically, this json should have a `action` key (with the name of the tool to use)
and a `action_input` key (with the input to the tool going here).
The only values that should be in the "action" field are: {[t.name for t in tools]}
The $JSON_BLOB should only contain a SINGLE action,
do NOT return a list of multiple actions.
Here is an example of a valid $JSON_BLOB:
```
{{{{
"action": $TOOL_NAME,
"action_input": $INPUT
}}}}
```
The $JSON_BLOB must always be enclosed with triple backticks!
ALWAYS use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action:```
$JSON_BLOB
```
Observation: the result of the action...
(this Thought/Action/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin! Reminder to always use the exact characters `Final Answer` when responding.'
"""
prompt = ChatPromptTemplate.from_messages(
[
(
"user",
system_message,
),
MessagesPlaceholder(variable_name="chat_history"),
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
]
)
def _format_chat_history(chat_history: List[Tuple[str, str]]):
buffer = []
for human, ai in chat_history:
buffer.append(HumanMessage(content=human))
buffer.append(AIMessage(content=ai))
return buffer
agent = (
{
"input": lambda x: x["input"],
"agent_scratchpad": lambda x: format_log_to_messages(x["intermediate_steps"]),
"chat_history": lambda x: _format_chat_history(x["chat_history"])
if x.get("chat_history")
else [],
}
| prompt
| chat_model_with_stop
| ReActJsonSingleInputOutputParser()
)
# Add typing for input
class AgentInput(BaseModel):
input: str
chat_history: List[Tuple[str, str]] = Field(
..., extra={"widget": {"type": "chat", "input": "input", "output": "output"}}
)
agent_executor = AgentExecutor(agent=agent, tools=tools).with_types(
input_type=AgentInput
)

View File

@@ -0,0 +1,74 @@
from typing import Optional, Type
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
# Import things that are needed generically
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import BaseTool
from neo4j_semantic_ollama.utils import get_candidates, graph
description_query = """
MATCH (m:Movie|Person)
WHERE m.title = $candidate OR m.name = $candidate
MATCH (m)-[r:ACTED_IN|DIRECTED|HAS_GENRE]-(t)
WITH m, type(r) as type, collect(coalesce(t.name, t.title)) as names
WITH m, type+": "+reduce(s="", n IN names | s + n + ", ") as types
WITH m, collect(types) as contexts
WITH m, "type:" + labels(m)[0] + "\ntitle: "+ coalesce(m.title, m.name)
+ "\nyear: "+coalesce(m.released,"") +"\n" +
reduce(s="", c in contexts | s + substring(c, 0, size(c)-2) +"\n") as context
RETURN context LIMIT 1
"""
def get_information(entity: str, type: str) -> str:
candidates = get_candidates(entity, type)
if not candidates:
return "No information was found about the movie or person in the database"
elif len(candidates) > 1:
newline = "\n"
return (
"Need additional information, which of these "
f"did you mean: {newline + newline.join(str(d) for d in candidates)}"
)
data = graph.query(
description_query, params={"candidate": candidates[0]["candidate"]}
)
return data[0]["context"]
class InformationInput(BaseModel):
entity: str = Field(description="movie or a person mentioned in the question")
entity_type: str = Field(
description="type of the entity. Available options are 'movie' or 'person'"
)
class InformationTool(BaseTool):
name = "Information"
description = (
"useful for when you need to answer questions about various actors or movies"
)
args_schema: Type[BaseModel] = InformationInput
def _run(
self,
entity: str,
entity_type: str,
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
"""Use the tool."""
return get_information(entity, entity_type)
async def _arun(
self,
entity: str,
entity_type: str,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the tool asynchronously."""
return get_information(entity, entity_type)

View File

@@ -0,0 +1,73 @@
from typing import Optional, Type
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
# Import things that are needed generically
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import BaseTool
from neo4j_semantic_ollama.utils import get_candidates, get_user_id, graph
store_rating_query = """
MERGE (u:User {userId:$user_id})
WITH u
UNWIND $candidates as row
MATCH (m:Movie {title: row.candidate})
MERGE (u)-[r:RATED]->(m)
SET r.rating = toFloat($rating)
RETURN distinct
'Create a final answer saying that preference has been stored' AS response
"""
def store_movie_rating(movie: str, rating: int):
user_id = get_user_id()
candidates = get_candidates(movie, "movie")
if not candidates:
return "This movie is not in our database"
response = graph.query(
store_rating_query,
params={"user_id": user_id, "candidates": candidates, "rating": rating},
)
try:
return response[0]["response"]
except Exception as e:
print(e)
return "Something went wrong"
class MemoryInput(BaseModel):
movie: str = Field(description="movie the user liked")
rating: int = Field(
description=(
"Rating from 1 to 5, where one represents heavy dislike "
"and 5 represent the user loved the movie"
)
)
class MemoryTool(BaseTool):
name = "Memory"
description = "useful for memorizing which movies the user liked"
args_schema: Type[BaseModel] = MemoryInput
def _run(
self,
movie: str,
rating: int,
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
"""Use the tool."""
return store_movie_rating(movie, rating)
async def _arun(
self,
movie: str,
rating: int,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the tool asynchronously."""
return store_movie_rating(movie, rating)

View File

@@ -0,0 +1,164 @@
from typing import Optional, Type
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import BaseTool
from neo4j_semantic_ollama.utils import get_candidates, get_user_id, graph
recommendation_query_db_history = """
MERGE (u:User {userId:$user_id})
WITH u
// get recommendation candidates
OPTIONAL MATCH (u)-[r1:RATED]->()<-[r2:RATED]-()-[r3:RATED]->(recommendation)
WHERE r1.rating > 3.5 AND r2.rating > 3.5 AND r3.rating > 3.5
AND NOT EXISTS {(u)-[:RATED]->(recommendation)}
// rank and limit recommendations
WITH u, recommendation, count(*) AS count
ORDER BY count DESC LIMIT 3
RETURN 'title:' + recommendation.title + '\nactors:' +
apoc.text.join([(recommendation)<-[:ACTED_IN]-(a) | a.name], ',') +
'\ngenre:' + apoc.text.join([(recommendation)-[:IN_GENRE]->(a) | a.name], ',')
AS movie
"""
recommendation_query_genre = """
MATCH (m:Movie)-[:IN_GENRE]->(g:Genre {name:$genre})
// filter out already seen movies by the user
WHERE NOT EXISTS {
(m)<-[:RATED]-(:User {userId:$user_id})
}
// rank and limit recommendations
WITH m AS recommendation
ORDER BY recommendation.imdbRating DESC LIMIT 3
RETURN 'title:' + recommendation.title + '\nactors:' +
apoc.text.join([(recommendation)<-[:ACTED_IN]-(a) | a.name], ',') +
'\ngenre:' + apoc.text.join([(recommendation)-[:IN_GENRE]->(a) | a.name], ',')
AS movie
"""
def recommendation_query_movie(genre: bool) -> str:
return f"""
MATCH (m1:Movie)<-[r1:RATED]-()-[r2:RATED]->(m2:Movie)
WHERE r1.rating > 3.5 AND r2.rating > 3.5 and m1.title IN $movieTitles
// filter out already seen movies by the user
AND NOT EXISTS {{
(m2)<-[:RATED]-(:User {{userId:$user_id}})
}}
{'AND EXISTS {(m2)-[:IN_GENRE]->(:Genre {name:$genre})}' if genre else ''}
// rank and limit recommendations
WITH m2 AS recommendation, count(*) AS count
ORDER BY count DESC LIMIT 3
RETURN 'title:' + recommendation.title + '\nactors:' +
apoc.text.join([(recommendation)<-[:ACTED_IN]-(a) | a.name], ',') +
'\ngenre:' + apoc.text.join([(recommendation)-[:IN_GENRE]->(a) | a.name], ',')
AS movie
"""
nl = "\n"
def recommend_movie(movie: Optional[str] = None, genre: Optional[str] = None) -> str:
"""
Recommends movies based on user's history and preference
for a specific movie and/or genre.
Returns:
str: A string containing a list of recommended movies, or an error message.
"""
user_id = get_user_id()
params = {"user_id": user_id, "genre": genre}
if not movie and not genre:
# Try to recommend a movie based on the information in the db
response = graph.query(recommendation_query_db_history, params)
try:
return (
'Recommended movies are: '
f'{f"###Movie {nl}".join([el["movie"] for el in response])}'
)
except Exception:
return "Can you tell us about some of the movies you liked?"
if not movie and genre:
# Recommend top voted movies in the genre the user haven't seen before
response = graph.query(recommendation_query_genre, params)
try:
return (
'Recommended movies are: '
f'{f"###Movie {nl}".join([el["movie"] for el in response])}'
)
except Exception:
return "Something went wrong"
candidates = get_candidates(movie, "movie")
if not candidates:
return "The movie you mentioned wasn't found in the database"
params["movieTitles"] = [el["candidate"] for el in candidates]
query = recommendation_query_movie(bool(genre))
response = graph.query(query, params)
try:
return (
'Recommended movies are: '
f'{f"###Movie {nl}".join([el["movie"] for el in response])}'
)
except Exception:
return "Something went wrong"
all_genres = [
"Action",
"Adventure",
"Animation",
"Children",
"Comedy",
"Crime",
"Documentary",
"Drama",
"Fantasy",
"Film-Noir",
"Horror",
"IMAX",
"Musical",
"Mystery",
"Romance",
"Sci-Fi",
"Thriller",
"War",
"Western",
]
class RecommenderInput(BaseModel):
movie: Optional[str] = Field(description="movie used for recommendation")
genre: Optional[str] = Field(
description=(
"genre used for recommendation. Available options are:" f"{all_genres}"
)
)
class RecommenderTool(BaseTool):
name = "Recommender"
description = "useful for when you need to recommend a movie"
args_schema: Type[BaseModel] = RecommenderInput
def _run(
self,
movie: Optional[str] = None,
genre: Optional[str] = None,
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
"""Use the tool."""
return recommend_movie(movie, genre)
async def _arun(
self,
movie: Optional[str] = None,
genre: Optional[str] = None,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the tool asynchronously."""
return recommend_movie(movie, genre)

View File

@@ -0,0 +1,39 @@
from typing import Optional, Type
from langchain.callbacks.manager import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
from langchain.pydantic_v1 import BaseModel, Field
from langchain.tools import BaseTool
response = (
"Create a final answer that says if they "
"have any questions about movies or actors"
)
class SmalltalkInput(BaseModel):
query: Optional[str] = Field(description="user query")
class SmalltalkTool(BaseTool):
name = "Smalltalk"
description = "useful for when user greets you or wants to smalltalk"
args_schema: Type[BaseModel] = SmalltalkInput
def _run(
self,
query: Optional[str] = None,
run_manager: Optional[CallbackManagerForToolRun] = None,
) -> str:
"""Use the tool."""
return response
async def _arun(
self,
query: Optional[str] = None,
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
) -> str:
"""Use the tool asynchronously."""
return response

View File

@@ -0,0 +1,84 @@
from typing import Dict, List
from langchain_community.graphs import Neo4jGraph
graph = Neo4jGraph()
def get_user_id() -> int:
"""
Placeholder for a function that would normally retrieve
a user's ID
"""
return 1
def remove_lucene_chars(text: str) -> str:
"""Remove Lucene special characters"""
special_chars = [
"+",
"-",
"&",
"|",
"!",
"(",
")",
"{",
"}",
"[",
"]",
"^",
'"',
"~",
"*",
"?",
":",
"\\",
]
for char in special_chars:
if char in text:
text = text.replace(char, " ")
return text.strip()
def generate_full_text_query(input: str) -> str:
"""
Generate a full-text search query for a given input string.
This function constructs a query string suitable for a full-text search.
It processes the input string by splitting it into words and appending a
similarity threshold (~0.8) to each word, then combines them using the AND
operator. Useful for mapping movies and people from user questions
to database values, and allows for some misspelings.
"""
full_text_query = ""
words = [el for el in remove_lucene_chars(input).split() if el]
for word in words[:-1]:
full_text_query += f" {word}~0.8 AND"
full_text_query += f" {words[-1]}~0.8"
return full_text_query.strip()
candidate_query = """
CALL db.index.fulltext.queryNodes($index, $fulltextQuery, {limit: $limit})
YIELD node
RETURN coalesce(node.name, node.title) AS candidate,
[el in labels(node) WHERE el IN ['Person', 'Movie'] | el][0] AS label
"""
def get_candidates(input: str, type: str, limit: int = 3) -> List[Dict[str, str]]:
"""
Retrieve a list of candidate entities from database based on the input string.
This function queries the Neo4j database using a full-text search. It takes the
input string, generates a full-text query, and executes this query against the
specified index in the database. The function returns a list of candidates
matching the query, with each candidate being a dictionary containing their name
(or title) and label (either 'Person' or 'Movie').
"""
ft_query = generate_full_text_query(input)
candidates = graph.query(
candidate_query, {"fulltextQuery": ft_query, "index": type, "limit": limit}
)
return candidates