123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- from dotenv import load_dotenv
- load_dotenv("../.env")
- from langchain_openai import OpenAIEmbeddings
- from langchain_community.vectorstores import Chroma
- from langchain_community.document_loaders import TextLoader
- from langchain_text_splitters import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
- from langchain_community.document_loaders import PyPDFLoader
- from langchain_community.document_loaders import Docx2txtLoader
- from langchain.document_loaders import CSVLoader
- import os
- import glob
- from langchain_community.vectorstores import SupabaseVectorStore
- from langchain_openai import OpenAIEmbeddings
- from supabase.client import Client, create_client
# Second dotenv load, using python-dotenv's default lookup, in addition to the
# explicit "../.env" loaded at the top of the file.
load_dotenv()

# Supabase connection settings. os.environ.get returns None when a variable is
# unset, and that None is passed straight to create_client below.
supabase_url = os.environ.get("SUPABASE_URL")
supabase_key = os.environ.get("SUPABASE_KEY")

# Table name read by check_existed_data() at call time. The __main__ block
# rebinds it to "documents" when this file is run as a script.
document_table = "documents2"

# Module-level client, created at import time; get_data_list() hands it to
# check_existed_data().
supabase: Client = create_client(supabase_url, supabase_key)
def get_data_list(data_list=None, path=None, extension=None, update=False):
    """Return the file paths that should be (re)indexed.

    Args:
        data_list: Explicit file paths. When empty or None, candidates are
            discovered with glob as ``<path>/*.<extension>``.
        path: Directory searched when ``data_list`` is empty or None.
        extension: File extension (without the dot) used for discovery.
        update: When True every candidate is returned, so files that are
            already stored can be re-indexed. When False, files whose base
            name is already recorded in the store are left out.

    Returns:
        A new list of file paths (never the caller's own list object). The
        list is empty when there is nothing to search or nothing new.
    """
    if data_list:
        files = list(data_list)
    elif path is not None:
        files = glob.glob(os.path.join(path, f"*.{extension}"))
    else:
        # Neither explicit files nor a directory to search: nothing to index.
        # (os.path.join(None, ...) would raise TypeError here.)
        files = []

    # Skip the store round trip when it cannot change the answer.
    if update or not files:
        return files

    # Set lookup keeps the filter linear in the number of candidate files.
    already_stored = set(check_existed_data(supabase))
    return [
        file_path
        for file_path in files
        if os.path.basename(file_path) not in already_stored
    ]
def read_and_split_files(data_list=None, path=None, extension=None, update=False):
    """Load the selected files and split them into chunk documents.

    The files to process are chosen by ``get_data_list`` (same arguments).
    Each file is loaded with a loader picked from its extension and then
    split with a strategy picked from its name/extension:

    * ``.docx`` files listed in ``article_files``: split on article headings
      (a circle bullet followed by "第...條").
    * ``new_information.docx``: split on ``###``.
    * ``.md``: split on markdown headers (levels 1-3).
    * everything else: token-based recursive splitting.

    Args:
        data_list: Explicit file paths, forwarded to ``get_data_list``.
        path: Directory to search, forwarded to ``get_data_list``.
        extension: Extension to search for, forwarded to ``get_data_list``.
        update: Forwarded to ``get_data_list``.

    Returns:
        A list with the chunks of every supported file, in file order.
        Files with an unsupported extension are reported and skipped.
    """
    # Built once per call instead of once per file; a frozenset gives O(1)
    # membership tests on the base name.
    article_files = frozenset([
        '低碳產品獎勵辦法.docx', '公私場所固定污染源空氣污染物排放量申報管理辦法.docx', '氣候變遷因應法.docx', '氣候變遷因應法施行細則.docx',
        '淘汰老舊機車換購電動機車溫室氣體減量獎勵辦法訂定總說明及逐條說明.docx', '溫室氣體抵換專案管理辦法.docx', '溫室氣體排放源符合效能標準獎勵辦法.docx',
        '溫室氣體排放量增量抵換管理辦法.docx', '溫室氣體排放量增量抵換管理辦法訂定總說明及逐條說明.docx', '溫室氣體排放量盤查登錄及查驗管理辦法修正條文.docx',
        '溫室氣體排放量盤查登錄管理辦法(溫室氣體排放量盤查登錄及查驗管理辦法修正條文前身).docx', '溫室氣體自願減量專案管理辦法.docx',
        '溫室氣體自願減量專案管理辦法中華民國112年10月12日訂定總說明及逐條說明.docx', '溫室氣體認證機構及查驗機構管理辦法.docx',
        '溫室氣體階段管制目標及管制方式作業準則.docx', '碳足跡產品類別規則訂定、引用及修訂指引.docx',
        '老舊汽車汰舊換新溫室氣體減量獎勵辦法中華民國112年1月11日訂定總說明及逐條說明.docx',
    ])
    # The backslash before "s" is escaped explicitly: "\s" inside a normal
    # string literal is an invalid escape sequence. The resulting pattern
    # text is unchanged.
    article_separators = ['\u25cb\\s*第.*?條', '\u25cf\\s*第.*?條']
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
    ]

    def read_csv(path):
        """Load every ``*.csv`` in ``path``; return None when there is none.

        Not called at the moment; kept so CSV ingestion can be re-enabled.
        """
        extension = "csv"
        files = glob.glob(os.path.join(path, f"*.{extension}"))
        if not files:
            return None
        documents = []
        for file_path in files:
            print(file_path)
            documents.extend(CSVLoader(file_path, encoding="utf-8").load())
        return documents

    def load_and_split(file_list):
        """Load each file in ``file_list`` and return all resulting chunks."""
        chunks = []
        for file in file_list:
            # Pick the loader from the extension.
            if file.endswith((".txt", ".md")):
                loader = TextLoader(file, encoding='utf-8')
            elif file.endswith(".pdf"):
                loader = PyPDFLoader(file)
            elif file.endswith(".docx"):
                loader = Docx2txtLoader(file)
            else:
                print(f"Unsupported file extension: {file}")
                continue
            docs = loader.load()

            # Pick the splitting strategy.
            base_name = os.path.basename(file)
            print(base_name)
            if file.endswith(".docx") and base_name in article_files:
                text_splitter = RecursiveCharacterTextSplitter(
                    is_separator_regex=True,
                    separators=article_separators,
                    chunk_size=500,
                    chunk_overlap=0,
                )
                splits = text_splitter.split_documents(docs)
            elif base_name == "new_information.docx":
                print(file)
                text_splitter = RecursiveCharacterTextSplitter(
                    is_separator_regex=True,
                    separators=['###'],
                    chunk_size=500,
                    chunk_overlap=0,
                )
                splits = text_splitter.split_documents(docs)
            elif file.endswith(".md"):
                markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
                # Only the text of the first loaded document is handed to the
                # splitter, so the loader's metadata is not passed along.
                # NOTE(review): presumably why get_document falls back to
                # "supplement.md" when 'source' is missing - confirm.
                splits = markdown_splitter.split_text(docs[0].page_content)
            else:
                text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=500, chunk_overlap=0)
                splits = text_splitter.split_documents(docs)
            chunks.extend(splits)
        return chunks

    selected_files = get_data_list(data_list=data_list, path=path, extension=extension, update=update)
    return load_and_split(selected_files)
def create_ids(docs):
    """Build a human-readable ID for every chunk in ``docs``.

    Chunks are numbered from 1 within each group, where a group is one
    ``source`` value plus, when present, one ``page`` value. The ID shows the
    file name up to its first dot:

    * with a page: ``"<file_name> | page <page> | chunk <n>"``
    * without one: ``"<file_name> | chunk <n>"``

    Chunks without a ``source`` entry are grouped and named ``"supplement"``.

    Note that the counters are keyed on the full ``source`` value while the ID
    only shows the base name, so equally named files from different
    directories are numbered independently but yield identical-looking IDs.

    Args:
        docs: Sequence of objects exposing a ``metadata`` mapping.

    Returns:
        A list of ID strings, one per element of ``docs``, in the same order.
    """
    chunk_counts = {}
    document_ids = []
    for doc in docs:
        metadata = doc.metadata
        if "source" in metadata:
            source = metadata["source"]
            file_name = os.path.basename(source).split('.')[0]
        else:
            source = file_name = "supplement"

        has_page = "page" in metadata
        key = f"{source}_{metadata['page']}" if has_page else f"{source}"
        chunk_counts[key] = chunk_counts.get(key, 0) + 1

        if has_page:
            doc_id = f"{file_name} | page {metadata['page']} | chunk {chunk_counts[key]}"
        else:
            doc_id = f"{file_name} | chunk {chunk_counts[key]}"
        document_ids.append(doc_id)
    return document_ids
def get_document(data_list=None, path=None, extension=None, update=False):
    """Load, split and annotate documents ready for insertion.

    Args:
        data_list: Explicit file paths, forwarded to ``read_and_split_files``.
        path: Directory to search, forwarded to ``read_and_split_files``.
        extension: Extension to search for, forwarded likewise.
        update: Forwarded likewise.

    Returns:
        A tuple ``(document_ids, documents, document_metadatas)`` of three
        parallel lists:

        * ``document_ids``: IDs produced by ``create_ids``.
        * ``documents``: each chunk's text prefixed with its source file name
          up to the first dot.
        * ``document_metadatas``: the chunks' own metadata dicts, updated in
          place with ``source`` (base name only), ``chunk`` (number parsed
          from the ID) and ``extension``.
    """
    docs = read_and_split_files(data_list=data_list, path=path, extension=extension, update=update)

    # IDs are built first on purpose: create_ids reads the original 'source'
    # value, which is replaced by its base name below.
    document_ids = create_ids(docs)

    documents = []
    document_metadatas = []
    for doc, doc_id in zip(docs, document_ids):
        metadata = doc.metadata
        if 'source' in metadata:
            source = os.path.basename(metadata['source'])
        else:
            source = "supplement.md"
        metadata['source'] = source
        # The chunk number is the trailing integer of the generated ID.
        metadata['chunk'] = int(doc_id.split("chunk ")[-1])
        metadata['extension'] = source.split(".")[-1]
        document_metadatas.append(metadata)
        documents.append(source.split(".")[0] + doc.page_content)
    return document_ids, documents, document_metadatas
def check_existed_data(supabase):
    """Return the distinct ``metadata.source`` values stored in the table.

    The table is read page by page. Supabase limits how many rows a single
    response may contain (1,000 by default), so a single un-paginated select
    would miss sources once the table grows past that limit, and files that
    are already indexed would be treated as new.

    Args:
        supabase: Supabase client used to query ``document_table``.

    Returns:
        A sorted list of unique source names. Rows without a
        ``metadata.source`` value are ignored.
    """
    page_size = 1000
    sources = set()
    offset = 0
    while True:
        response = (
            supabase.table(document_table)
            .select("id, metadata")
            .order("id")  # stable ordering so pages neither overlap nor skip rows
            .range(offset, offset + page_size - 1)
            .execute()
        )
        rows = response.data or []
        if not rows:
            break
        for row in rows:
            source = (row.get('metadata') or {}).get('source')
            if source is not None:
                sources.add(source)
        # Advance by what was actually returned: the server-side row cap may
        # be smaller than page_size.
        offset += len(rows)
    return sorted(sources)
class GetVectorStore(SupabaseVectorStore):
    """SupabaseVectorStore with file-oriented insert/delete/update helpers."""

    def __init__(self, embeddings, supabase, table_name):
        """Bind the store to ``table_name`` using the ``match_documents`` query."""
        super().__init__(
            embedding=embeddings,
            client=supabase,
            table_name=table_name,
            query_name="match_documents",
        )

    def insert(self, documents, document_metadatas):
        """Add ``documents`` together with their metadata via ``add_texts``."""
        self.add_texts(texts=documents, metadatas=document_metadatas)

    def delete(self, file_list):
        """Delete every row whose ``metadata->>source`` equals a listed file's base name."""
        for source_name in map(os.path.basename, file_list):
            query = self._client.table(self.table_name).delete()
            query.eq('metadata->>source', source_name).execute()

    def update(self, documents, document_metadatas, update_existing_data=False):
        """Insert the given chunks, optionally replacing rows of the same sources.

        Does nothing when ``document_metadatas`` is empty. When
        ``update_existing_data`` is true, rows belonging to the sources named
        in the metadata are deleted before the insert.
        """
        if document_metadatas:
            if update_existing_data:
                sources = list({meta['source'] for meta in document_metadatas})
                self.delete(sources)
            self.insert(documents, document_metadatas)
if __name__ == "__main__":
    # Script entry point. The assignments below rebind the module-level names
    # (supabase_url, supabase_key, document_table, supabase), so helpers that
    # read those globals, such as check_existed_data(), use the "documents"
    # table when the file is run directly.
    load_dotenv("../.env")
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")
    document_table = "documents"
    supabase: Client = create_client(supabase_url, supabase_key)
    embeddings = OpenAIEmbeddings()

    # ------------------------------------------------------------------
    # Vector store bound to the table chosen above.
    vector_store = GetVectorStore(embeddings, supabase, document_table)

    # ------------------------------------------------------------------
    # Select the files to process: every "*.pdf" directly inside `path`.
    path = "/home/mia/systex/Documents"
    extension = "pdf"
    # Alternatives for building file_list:
    # file_list = [os.path.join(path, name) for name in ["some_document.pdf"]]
    # file_list = glob.glob(os.path.join(path, "*"))
    file_list =glob.glob(os.path.join(path, f"*.{extension}"))
    # print(file_list)

    # ------------------------------------------------------------------
    # Update the store (update=False: only files not stored yet are added;
    # update=True: rows of the listed files are deleted and re-inserted).
    # update = False
    # document_ids, documents, document_metadatas = get_document(data_list=file_list, path=path, extension=extension, update=update)
    # vector_store.update(documents, document_metadatas, update_existing_data=update)

    # ------------------------------------------------------------------
    # Insert without checking for or removing existing rows.
    # vector_store.insert(documents, document_metadatas)

    # ------------------------------------------------------------------
    # Delete all rows that belong to the listed files.
    # file_list = ["some_document.pdf"]
    # file_list = glob.glob(os.path.join(path, f"*.docx"))
    # file_list = [os.path.basename(file_path) for file_path in file_list]
    # print(file_list)
    # vector_store.delete(file_list)

    # ------------------------------------------------------------------
    # Get a retriever over the store.
    # retriever = vector_store.as_retriever(search_kwargs={"k": 6})
|