from langchain.prompts import ChatPromptTemplate from langchain.load import dumps, loads from langchain_core.output_parsers import StrOutputParser from langchain_openai import ChatOpenAI from langchain_community.llms import Ollama from langchain_community.chat_models import ChatOllama from operator import itemgetter from langchain_core.runnables import RunnablePassthrough from langchain import hub from langchain.globals import set_llm_cache from langchain import PromptTemplate from langchain_core.runnables import ( RunnableBranch, RunnableLambda, RunnableParallel, RunnablePassthrough, ) from typing import Tuple, List, Optional from langchain_core.messages import AIMessage, HumanMessage from datasets import Dataset from ragas import evaluate from ragas.metrics import ( answer_relevancy, faithfulness, context_recall, context_precision, ) from typing import List import os from dotenv import load_dotenv load_dotenv('environment.env') ######################################################################################################################## ######################################################################################################################## from langchain.cache import SQLiteCache from langchain.cache import RedisSemanticCache from langchain_openai import OpenAIEmbeddings from langchain.globals import set_llm_cache ######################################################################################################################## import requests import openai openai_api_key = os.getenv("OPENAI_API_KEY") openai.api_key = openai_api_key URI = os.getenv("SUPABASE_URI") # 設置緩存,以減少對API的重複請求。使用Redis # set_llm_cache(SQLiteCache(database_path=".langchain.db")) # set_llm_cache(RedisSemanticCache(redis_url="redis://localhost:6380", embedding=OpenAIEmbeddings(openai_api_key=openai_api_key), score_threshold=0.0005)) # # TAIDE model on Ollama https://ollama.com/jcai/llama3-taide-lx-8b-chat-alpha1 # def interact_with_model(messages, api_url="http://localhost:11434/v1/chat/completions"): # print("Using model: TAIDE") # response = requests.post(api_url, json={"model": "jcai/llama3-taide-lx-8b-chat-alpha1:Q4_K_M", "messages": messages}) # return response.json()["choices"][0]["message"]["content"] # class CustomTAIDELLM(LLM): # api_url: str = "http://localhost:11434/v1/chat/completions" # def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: # messages = [{"role": "user", "content": prompt}] # response = requests.post(self.api_url, json={ # "model": "taide-local", # Use your local model name # "messages": messages # }) # return response.json()["choices"][0]["message"]["content"] # @property # def _llm_type(self) -> str: # return "custom_taide" # # Create an instance of the custom LLM # taide_llm = CustomTAIDELLM() # 生成多個不同版本的問題,進行檢索,並返回答案和參考文檔 def multi_query(question, retriever, chat_history): def multi_query_chain(): # Multi Query: Different Perspectives template = """You are an AI language model assistant. Your task is to generate three different versions of the given user question to retrieve relevant documents from a vector database. By generating multiple perspectives on the user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search. Provide these alternative questions separated by newlines. You must return original question also, which means that you return 1 original version + 3 different versions = 4 questions. Original question: {question}""" prompt_perspectives = ChatPromptTemplate.from_template(template) messages = [ {"role": "system", "content": template}, {"role": "user", "content": question}, ] # generate_queries = interact_with_model(messages).split("\n") llm = ChatOpenAI(model="gpt-4-1106-preview") # llm = ChatOllama(model="llama3", num_gpu=1, temperature=0) # llm = ChatOllama(model="gemma2", temperature=0) # llm = ChatOllama(model=model) generate_queries = ( prompt_perspectives | llm | StrOutputParser() | (lambda x: x.split("\n")) ) return generate_queries def get_unique_union(documents: List[list]): """ Unique union of retrieved docs """ # Flatten list of lists, and convert each Document to string flattened_docs = [dumps(doc) for sublist in documents for doc in sublist] # Get unique documents unique_docs = list(set(flattened_docs)) # Return return [loads(doc) for doc in unique_docs] _search_query = get_search_query() modified_question = _search_query.invoke({"question":question, "chat_history": chat_history}) print(modified_question) generate_queries = multi_query_chain() retrieval_chain = generate_queries | retriever.map() | get_unique_union docs = retrieval_chain.invoke({"question":modified_question}) answer = multi_query_rag_prompt(retrieval_chain, modified_question) return answer, docs # 根據檢索到的文檔和用戶問題生成最後答案 def multi_query_rag_prompt(retrieval_chain, question): # RAG template = """Answer the following question based on this context: {context} Question: {question} Output in user's language. If the question is in zh-tw, then the output will be in zh-tw. If the question is in English, then the output will be in English\n You should not mention anything about "根據提供的文件內容" or other similar terms. If you don't know the answer, just say that "很抱歉,目前我無法回答您的問題,請將您的詢問發送至 test@email.com 以便獲得更進一步的幫助,謝謝。I'm sorry I cannot answer your question. Please send your question to test@email.com for further assistance. Thank you." """ prompt = ChatPromptTemplate.from_template(template) context = retrieval_chain.invoke({"question": question}) # Ensure this returns the context # llm = ChatOpenAI(temperature=0) llm = ChatOpenAI(model="gpt-4-1106-preview") # llm = ChatOllama(model="llama3", num_gpu=1, temperature=0) # llm = ChatOllama(model="gemma2", temperature=0) final_rag_chain = ( {"context": retrieval_chain, "question": itemgetter("question")} | prompt | llm | StrOutputParser() ) messages = [ {"role": "system", "content": template}, {"role": "user", "content": question}, {"role": "assistant", "content": context} ] # answer = interact_with_model(messages) answer = final_rag_chain.invoke({"question":question}) answer = "" for text in final_rag_chain.stream({"question":question}): print(text, end="", flush=True) answer += text return answer ######################################################################################################################## # 將聊天紀錄個跟進問題轉化為獨立問題 def get_search_query(): # Condense a chat history and follow-up question into a standalone question # # _template = """Given the following conversation and a follow up question, # rephrase the follow up question to be a standalone question to help others understand the question without having to go back to the conversation transcript. # Generate standalone question in its original language. # Chat History: # {chat_history} # Follow Up Input: {question} # Hint: # * Refer to chat history and add the subject to the question # * Replace the pronouns in the question with the correct person or thing, please refer to chat history # Standalone question:""" # noqa: E501 _template = """Rewrite the following query by incorporating relevant context from the conversation history. The rewritten query should: - Preserve the core intent and meaning of the original query - Expand and clarify the query to make it more specific and informative for retrieving relevant context - Avoid introducing new topics or queries that deviate from the original query - DONT EVER ANSWER the Original query, but instead focus on rephrasing and expanding it into a new query - The rewritten query should be in its original language. Return ONLY the rewritten query text, without any additional formatting or explanations. Conversation History: {chat_history} Original query: [{question}] Rewritten query: """ CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template) def _format_chat_history(chat_history: List[Tuple[str, str]]) -> List: buffer = [] for human, ai in chat_history: buffer.append(HumanMessage(content=human)) buffer.append(AIMessage(content=ai)) return buffer _search_query = RunnableBranch( # If input includes chat_history, we condense it with the follow-up question ( RunnableLambda(lambda x: bool(x.get("chat_history"))).with_config( run_name="HasChatHistoryCheck" ), # Condense follow-up question and chat into a standalone_question RunnablePassthrough.assign( chat_history=lambda x: _format_chat_history(x["chat_history"]) ) | CONDENSE_QUESTION_PROMPT | ChatOpenAI() | StrOutputParser(), ), # Else, we have no chat history, so just pass through the question RunnableLambda(lambda x : x["question"]), ) return _search_query ######################################################################################################################## # 檢索文檔並生成答案 def naive_rag(question, retriever): #### RETRIEVAL and GENERATION #### # Prompt prompt = hub.pull("rlm/rag-prompt") # LLM llm = ChatOpenAI(model_name="gpt-3.5-turbo") # Post-processing def format_docs(docs): return "\n\n".join(doc.page_content for doc in docs) reference = retriever.get_relevant_documents(question) # Chain rag_chain = ( {"context": retriever | format_docs, "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) # Question answer = rag_chain.invoke(question) return answer, reference ################################################################################################ # 處理question-answer pairs的檢索和生成答案 def naive_rag_for_qapairs(question, retriever): #### RETRIEVAL and GENERATION #### # Prompt # prompt = hub.pull("rlm/rag-prompt") template = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. Following retrieved context is question-answer pairs of historical QA, Find the suitable answer from the qa pairs If you can not find the suitable answer, just return "False". Use three sentences maximum and Do not make up the answer. Output in user's language. If the question is in zh-tw, then the output will be in zh-tw. {context} Question: {question} """ prompt = PromptTemplate.from_template(template) # LLM llm = ChatOpenAI(model_name="gpt-4-0125-preview") # llm = ChatOllama(model="llama3", num_gpu=1, temperature=0) # llm = ChatOllama(model="gemma2", num_gpu=1, temperature=0) # Post-processing def format_docs(docs): return "\n\n".join(doc.page_content for doc in docs) reference = retriever.get_relevant_documents(question) # Chain rag_chain = ( {"context": retriever | format_docs, "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) # Question answer = rag_chain.invoke(question) return answer, reference ######################################################################################################################## def rag_score(question, ground_truth, answer, reference_docs): datasets = { "question": [question], # question: list[str] "answer": [answer], # answer: list[str] "contexts": [reference_docs], # contexts: list[list[str]] "ground_truths": [[ground_truth]] # ground_truth: list[list[str]] } evalsets = Dataset.from_dict(datasets) result = evaluate( evalsets, metrics=[ context_precision, faithfulness, answer_relevancy, context_recall, ], ) result_df = result.to_pandas() print(result_df.head()) result_df.to_csv('ragas_rag.csv') return result