"""Load local documents, split them into chunks and sync them to Supabase.

The module exposes helpers that read ``.txt`` / ``.pdf`` / ``.docx`` / ``.csv``
files, split them into chunks, derive per-chunk ids and metadata, and a thin
``SupabaseVectorStore`` subclass that inserts, deletes and updates rows keyed
by the ``source`` entry of each row's metadata.
"""
import glob
import os

from dotenv import load_dotenv

# Environment variables are loaded before the LangChain / Supabase imports,
# exactly as the original script did.
load_dotenv("../.env")

from langchain_community.document_loaders import CSVLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import Chroma
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from supabase.client import Client, create_client

document_table = "documents"

# Number of rows requested per page when listing the documents table.
_PAGE_SIZE = 1000

# Separators for .docx files: a white (U+25CB) or black (U+25CF) circle bullet,
# optional whitespace, then "第…條". Raw strings keep ``\s`` a regex escape
# instead of an invalid string escape.
# NOTE(review): presumably these are article headings of legal text - confirm.
_DOCX_SEPARATORS = [r"○\s*第.*?條", r"●\s*第.*?條"]

# (suffix, loader factory) pairs, checked in order with ``str.endswith``.
_LOADER_FACTORIES = (
    (".txt", lambda file_path: TextLoader(file_path, encoding="utf-8")),
    (".pdf", PyPDFLoader),
    (".docx", Docx2txtLoader),
)


def _resolve_client(client=None):
    """Return ``client`` or fall back to the module-level ``supabase`` global.

    The global only exists when this file is executed as a script, so a clear
    error is raised when neither is available.
    """
    if client is not None:
        return client
    fallback = globals().get("supabase")
    if fallback is None:
        raise RuntimeError(
            "No Supabase client available: pass `client=` or define a "
            "module-level `supabase` client first."
        )
    return fallback


def get_data_list(data_list=None, path=None, extension=None, update=False,
                  client=None):
    """Return the file paths that should be (re)indexed.

    Args:
        data_list: Explicit list of file paths. When empty or ``None`` the
            files matching ``*.{extension}`` inside ``path`` are used instead.
        path: Directory that is globbed when ``data_list`` is not given.
        extension: File extension (without the dot) used for the glob.
        update: When true, every candidate file is returned. When false,
            files whose base name is already stored in the documents table
            are filtered out.
        client: Optional Supabase client used for the existence check;
            defaults to the module-level ``supabase`` global.

    Returns:
        A new list of file paths.
    """
    files = data_list or glob.glob(os.path.join(path, f"*.{extension}"))
    if update:
        return list(files)

    # A set makes the per-file membership test O(1).
    already_indexed = set(check_existed_data(_resolve_client(client)))
    return [
        file_path
        for file_path in files
        if os.path.basename(file_path) not in already_indexed
    ]


def _load_csv_documents(path):
    """Load every ``*.csv`` file found directly inside ``path``."""
    documents = []
    for file_path in glob.glob(os.path.join(path, "*.csv")):
        print(file_path)
        documents.extend(CSVLoader(file_path, encoding="utf-8").load())
    return documents


def _make_loader(file_path):
    """Return a loader for ``file_path`` or ``None`` if the suffix is unknown."""
    for suffix, factory in _LOADER_FACTORIES:
        if file_path.endswith(suffix):
            return factory(file_path)
    return None


def _make_splitter(file_path):
    """Return the text splitter matching the file type of ``file_path``."""
    if file_path.endswith(".docx"):
        return RecursiveCharacterTextSplitter(
            is_separator_regex=True,
            separators=_DOCX_SEPARATORS,
            chunk_size=300,
            chunk_overlap=0,
        )
    return RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=500, chunk_overlap=0
    )


def _load_and_split(file_list, csv_dir=None):
    """Load and split ``file_list``, then append the CSV rows of ``csv_dir``.

    Files with an unsupported suffix are reported and skipped. CSV documents
    are appended unsplit, after all other chunks.
    """
    chunks = []
    for file_path in file_list:
        loader = _make_loader(file_path)
        if loader is None:
            print(f"Unsupported file extension: {file_path}")
            continue
        splitter = _make_splitter(file_path)
        chunks.extend(splitter.split_documents(loader.load()))

    # CSV discovery needs a directory; without one there is nothing to glob.
    if csv_dir is not None:
        chunks.extend(_load_csv_documents(csv_dir))
    return chunks


def read_and_split_files(data_list=None, path=None, extension=None,
                         update=False, client=None):
    """Select the files to index and return their chunks.

    The arguments are forwarded to :func:`get_data_list`. Every ``*.csv`` file
    inside ``path`` is additionally loaded (one document per loader result)
    when ``path`` is given.

    Returns:
        A list of LangChain documents (chunks followed by CSV documents).
    """
    files = get_data_list(
        data_list=data_list,
        path=path,
        extension=extension,
        update=update,
        client=client,
    )
    return _load_and_split(files, csv_dir=path)


def _chunk_numbers(docs):
    """Return the 1-based running chunk number of each document.

    Chunks are counted per ``source`` and, when the metadata has a ``page``
    entry, per ``source`` and ``page``.
    """
    seen = {}
    numbers = []
    for doc in docs:
        metadata = doc.metadata
        key = metadata["source"]
        if "page" in metadata:
            key = f"{key}_{metadata['page']}"
        seen[key] = seen.get(key, 0) + 1
        numbers.append(seen[key])
    return numbers


def create_ids(docs):
    """Build a readable id for every document.

    Ids look like ``"<name> | page <p> | chunk <n>"`` when the metadata has a
    ``page`` entry and ``"<name> | chunk <n>"`` otherwise, where ``<name>`` is
    the source file name without its final extension.

    Args:
        docs: Documents whose ``metadata`` contains at least ``source``.

    Returns:
        A list of id strings, aligned with ``docs``.
    """
    document_ids = []
    for doc, chunk in zip(docs, _chunk_numbers(docs)):
        metadata = doc.metadata
        # splitext keeps dots inside the name ("a.b.pdf" -> "a.b"), which
        # avoids id collisions between files that share a first segment.
        file_name = os.path.splitext(os.path.basename(metadata["source"]))[0]
        if "page" in metadata:
            document_ids.append(
                f"{file_name} | page {metadata['page']} | chunk {chunk}"
            )
        else:
            document_ids.append(f"{file_name} | chunk {chunk}")
    return document_ids


def get_document(data_list=None, path=None, extension=None, update=False,
                 client=None):
    """Read, split and describe the documents that need indexing.

    Each document's metadata is updated in place: ``source`` becomes the base
    file name, and ``chunk`` and ``extension`` entries are added.

    Args:
        data_list, path, extension, update, client: Forwarded to
            :func:`read_and_split_files`.

    Returns:
        A tuple ``(document_ids, documents, document_metadatas)`` where
        ``documents`` holds each chunk's text prefixed with its source file
        name (without extension).
    """
    docs = read_and_split_files(
        data_list=data_list,
        path=path,
        extension=extension,
        update=update,
        client=client,
    )
    # Ids and chunk numbers are derived from the original source paths, so
    # they must be computed before ``source`` is shortened below.
    document_ids = create_ids(docs)
    chunk_numbers = _chunk_numbers(docs)

    documents = []
    document_metadatas = []
    for doc, chunk_number in zip(docs, chunk_numbers):
        source = os.path.basename(doc.metadata["source"])
        doc.metadata["source"] = source
        doc.metadata["chunk"] = chunk_number
        doc.metadata["extension"] = source.split(".")[-1]
        document_metadatas.append(doc.metadata)
        documents.append(os.path.splitext(source)[0] + doc.page_content)
    return document_ids, documents, document_metadatas


def check_existed_data(supabase):
    """Return the distinct ``source`` values stored in the documents table.

    The table is read page by page until an empty page is returned, because a
    single select only returns as many rows as the API row limit allows.
    Rows without metadata or without a ``source`` entry are ignored.

    Args:
        supabase: Supabase client used to query ``document_table``.

    Returns:
        A list of unique source names (order unspecified).
    """
    sources = set()
    offset = 0
    while True:
        response = (
            supabase.table(document_table)
            .select("id, metadata")
            .order("id")  # stable ordering so pages do not overlap
            .range(offset, offset + _PAGE_SIZE - 1)
            .execute()
        )
        rows = response.data or []
        if not rows:
            break
        for row in rows:
            source = (row.get("metadata") or {}).get("source")
            if source is not None:
                sources.add(source)
        # Advance by what was actually returned, in case the server caps
        # pages below _PAGE_SIZE.
        offset += len(rows)
    return list(sources)


class GetVectorStore(SupabaseVectorStore):
    """``SupabaseVectorStore`` with file-oriented insert/delete/update helpers."""

    def __init__(self, embeddings, supabase, table_name):
        """Bind the store to ``table_name`` using the ``match_documents`` query."""
        super().__init__(
            embedding=embeddings,
            client=supabase,
            table_name=table_name,
            query_name="match_documents",
        )

    def insert(self, documents, document_metadatas):
        """Embed and store ``documents`` with their metadata.

        Does nothing when ``documents`` is empty.
        """
        if not documents:
            return
        self.add_texts(
            texts=documents,
            metadatas=document_metadatas,
        )

    def delete(self, file_list):
        """Delete every row whose ``metadata->>source`` is in ``file_list``.

        NOTE(review): this replaces the parent class's ``delete`` and takes
        file names, not row ids.
        """
        for file_name in file_list:
            (
                self._client.table(self.table_name)
                .delete()
                .eq("metadata->>source", file_name)
                .execute()
            )

    def update(self, documents, document_metadatas, update_existing_data=False):
        """Insert ``documents``, optionally replacing rows of the same sources.

        Args:
            documents: Chunk texts to store.
            document_metadatas: Metadata dicts aligned with ``documents``;
                each needs a ``source`` entry when ``update_existing_data``
                is true.
            update_existing_data: When true, rows belonging to the same
                sources are deleted before the new chunks are inserted.
        """
        if not document_metadatas:
            # No new data.
            return
        if update_existing_data:
            sources = {metadata["source"] for metadata in document_metadatas}
            self.delete(list(sources))
        self.insert(documents, document_metadatas)


if __name__ == "__main__":
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")
    if not supabase_url or not supabase_key:
        raise SystemExit(
            "SUPABASE_URL and SUPABASE_KEY must be set (see ../.env)."
        )
    document_table = "documents"
    supabase: Client = create_client(supabase_url, supabase_key)
    embeddings = OpenAIEmbeddings()

    # Get the vector store.
    vector_store = GetVectorStore(embeddings, supabase, document_table)

    # Candidate files for the (currently disabled) update workflow.
    path = "/home/mia/systex/Documents"
    extension = "pdf"
    file_list = glob.glob(os.path.join(path, f"*.{extension}"))

    # Update data (old + new / all new / all old):
    #   update = False
    #   document_ids, documents, document_metadatas = get_document(
    #       data_list=file_list, path=path, extension=extension, update=update)
    #   vector_store.update(documents, document_metadatas,
    #                       update_existing_data=update)
    #
    # Insert new data (all new):
    #   vector_store.insert(documents, document_metadatas)

    # Delete the stored rows of every .docx file found in ``path``.
    file_list = glob.glob(os.path.join(path, "*.docx"))
    file_list = [os.path.basename(file_path) for file_path in file_list]
    print(file_list)
    vector_store.delete(file_list)

    # Get a retriever:
    #   retriever = vector_store.as_retriever(search_kwargs={"k": 6})