import os
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import umap
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from sklearn.mixture import GaussianMixture

# Load the OpenAI credentials from a local .env file.
load_dotenv()

# Machine-specific paths for Poppler and Tesseract (PDF/OCR tooling used by some
# document loaders); adjust these for your own environment.
os.environ["PATH"] += os.pathsep + "C:/Users/lzl/anaconda3/Lib/site-packages/poppler-24.02.0/Library/bin"
os.environ["PATH"] += os.pathsep + r"C:\Program Files\Tesseract-OCR\tessdata"
os.environ["PATH"] += os.pathsep + r"C:\Program Files\Tesseract-OCR"
os.environ["TESSDATA_PREFIX"] = r"C:\Program Files\Tesseract-OCR\tessdata"

# Embedding model and summarization LLM used throughout the pipeline.
embd = OpenAIEmbeddings()
model = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")

RANDOM_SEED = 224  # Fixed seed for reproducibility
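# The .env file read by load_dotenv() above is expected to supply the OpenAI
# credentials used by OpenAIEmbeddings and ChatOpenAI, e.g. (illustrative):
#
#   OPENAI_API_KEY=sk-...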

### --- Code from citations referenced above (added comments and docstrings) --- ###


def global_cluster_embeddings(
    embeddings: np.ndarray,
    dim: int,
    n_neighbors: Optional[int] = None,
    metric: str = "cosine",
) -> np.ndarray:
    """
    Perform global dimensionality reduction on the embeddings using UMAP.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for the reduced space.
    - n_neighbors: Optional; the number of neighbors to consider for each point.
                   If not provided, it defaults to the square root of the number of embeddings.
    - metric: The distance metric to use for UMAP.

    Returns:
    - A numpy array of the embeddings reduced to the specified dimensionality.
    """
    if n_neighbors is None:
        n_neighbors = int((len(embeddings) - 1) ** 0.5)
    return umap.UMAP(
        n_neighbors=n_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)
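# Illustrative usage (not from the original source): reduce a toy batch of
# OpenAI-sized embeddings down to 2 dimensions. With n_neighbors unset, it
# defaults to roughly sqrt(len(embeddings)).
#
#   toy_embeddings = np.random.rand(200, 1536)
#   reduced = global_cluster_embeddings(toy_embeddings, dim=2)  # shape (200, 2)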

def local_cluster_embeddings(
    embeddings: np.ndarray, dim: int, num_neighbors: int = 10, metric: str = "cosine"
) -> np.ndarray:
    """
    Perform local dimensionality reduction on the embeddings using UMAP, typically after global clustering.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for the reduced space.
    - num_neighbors: The number of neighbors to consider for each point.
    - metric: The distance metric to use for UMAP.

    Returns:
    - A numpy array of the embeddings reduced to the specified dimensionality.
    """
    return umap.UMAP(
        n_neighbors=num_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)

def get_optimal_clusters(
    embeddings: np.ndarray, max_clusters: int = 50, random_state: int = RANDOM_SEED
) -> int:
    """
    Determine the optimal number of clusters using the Bayesian Information Criterion (BIC)
    with a Gaussian Mixture Model.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - max_clusters: The maximum number of clusters to consider.
    - random_state: Seed for reproducibility.

    Returns:
    - An integer representing the optimal number of clusters found.
    """
    max_clusters = min(max_clusters, len(embeddings))
    n_clusters = np.arange(1, max_clusters)
    bics = []
    for n in n_clusters:
        gm = GaussianMixture(n_components=n, random_state=random_state)
        gm.fit(embeddings)
        bics.append(gm.bic(embeddings))
    return n_clusters[np.argmin(bics)]
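# Illustrative usage (not from the original source): with two well-separated
# blobs, the BIC curve typically bottoms out at two mixture components.
#
#   points = np.vstack([np.random.rand(30, 5), np.random.rand(30, 5) + 5.0])
#   get_optimal_clusters(points, max_clusters=10)  # usually 2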

def GMM_cluster(embeddings: np.ndarray, threshold: float, random_state: int = 0):
    """
    Cluster embeddings using a Gaussian Mixture Model (GMM) based on a probability threshold.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - threshold: The probability threshold for assigning an embedding to a cluster.
    - random_state: Seed for reproducibility.

    Returns:
    - A tuple containing the cluster labels and the number of clusters determined.
    """
    n_clusters = get_optimal_clusters(embeddings)
    gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
    gm.fit(embeddings)
    probs = gm.predict_proba(embeddings)
    labels = [np.where(prob > threshold)[0] for prob in probs]
    return labels, n_clusters
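# Illustrative usage (not from the original source): GMM_cluster returns soft
# assignments, so labels[i] is the array of every component whose posterior
# probability for embedding i exceeds the threshold (possibly more than one).
#
#   labels, n_clusters = GMM_cluster(points, threshold=0.1)
#   labels[0]  # e.g. array([0]) or array([0, 2]) for an ambiguous point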

def perform_clustering(
    embeddings: np.ndarray,
    dim: int,
    threshold: float,
) -> List[np.ndarray]:
    """
    Perform clustering on the embeddings by first reducing their dimensionality globally,
    then clustering using a Gaussian Mixture Model, and finally performing local clustering
    within each global cluster.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for UMAP reduction.
    - threshold: The probability threshold for assigning an embedding to a cluster in GMM.

    Returns:
    - A list of numpy arrays, where each array contains the cluster IDs for each embedding.
    """
    if len(embeddings) <= dim + 1:
        # Avoid clustering when there's insufficient data
        return [np.array([0]) for _ in range(len(embeddings))]

    # Global dimensionality reduction
    reduced_embeddings_global = global_cluster_embeddings(embeddings, dim)
    # Global clustering
    global_clusters, n_global_clusters = GMM_cluster(
        reduced_embeddings_global, threshold
    )

    all_local_clusters = [np.array([]) for _ in range(len(embeddings))]
    total_clusters = 0

    # Iterate through each global cluster to perform local clustering
    for i in range(n_global_clusters):
        # Extract embeddings belonging to the current global cluster
        global_cluster_embeddings_ = embeddings[
            np.array([i in gc for gc in global_clusters])
        ]

        if len(global_cluster_embeddings_) == 0:
            continue
        if len(global_cluster_embeddings_) <= dim + 1:
            # Handle small clusters with direct assignment
            local_clusters = [np.array([0]) for _ in global_cluster_embeddings_]
            n_local_clusters = 1
        else:
            # Local dimensionality reduction and clustering
            reduced_embeddings_local = local_cluster_embeddings(
                global_cluster_embeddings_, dim
            )
            local_clusters, n_local_clusters = GMM_cluster(
                reduced_embeddings_local, threshold
            )

        # Assign local cluster IDs, adjusting for total clusters already processed
        for j in range(n_local_clusters):
            local_cluster_embeddings_ = global_cluster_embeddings_[
                np.array([j in lc for lc in local_clusters])
            ]
            indices = np.where(
                (embeddings == local_cluster_embeddings_[:, None]).all(-1)
            )[1]
            for idx in indices:
                all_local_clusters[idx] = np.append(
                    all_local_clusters[idx], j + total_clusters
                )

        total_clusters += n_local_clusters

    return all_local_clusters
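# Illustrative usage (not from the original source): run the full global -> local
# pipeline on toy data. Each entry of the result is an array of local cluster IDs
# (unique across the whole run) assigned to that embedding; inputs with fewer than
# dim + 2 rows are short-circuited into a single cluster.
#
#   clusters = perform_clustering(toy_embeddings, dim=10, threshold=0.1)
#   clusters[0]  # e.g. array([3.]) -> embedding 0 landed in cluster 3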

### --- Our code below --- ###


def embed(texts):
    """
    Generate embeddings for a list of text documents.

    This function assumes the existence of an `embd` object with a method `embed_documents`
    that takes a list of texts and returns their embeddings.

    Parameters:
    - texts: List[str], a list of text documents to be embedded.

    Returns:
    - numpy.ndarray: An array of embeddings for the given text documents.
    """
    text_embeddings = embd.embed_documents(texts)
    text_embeddings_np = np.array(text_embeddings)
    return text_embeddings_np

def embed_cluster_texts(texts):
    """
    Embeds a list of texts and clusters them, returning a DataFrame with texts,
    their embeddings, and cluster labels.

    This function combines embedding generation and clustering into a single step.
    It assumes the existence of a previously defined `perform_clustering` function
    that performs clustering on the embeddings.

    Parameters:
    - texts: List[str], a list of text documents to be processed.

    Returns:
    - pandas.DataFrame: A DataFrame containing the original texts, their embeddings,
      and the assigned cluster labels.
    """
    text_embeddings_np = embed(texts)  # Generate embeddings
    cluster_labels = perform_clustering(
        text_embeddings_np, 10, 0.1
    )  # Perform clustering on the embeddings
    df = pd.DataFrame()  # Initialize a DataFrame to store the results
    df["text"] = texts  # Store original texts
    df["embd"] = list(text_embeddings_np)  # Store embeddings as a list in the DataFrame
    df["cluster"] = cluster_labels  # Store cluster labels
    return df
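# Illustrative usage (not from the original source; requires OpenAI access): the
# returned frame has one row per input text, with columns 'text', 'embd', and
# 'cluster' (an array of soft cluster assignments).
#
#   df = embed_cluster_texts(["LCEL composes runnables.", "Retrievers fetch documents."])
#   df.columns  # Index(['text', 'embd', 'cluster'], dtype='object')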

def fmt_txt(df: pd.DataFrame) -> str:
    """
    Formats the text documents in a DataFrame into a single string.

    Parameters:
    - df: DataFrame containing the 'text' column with text documents to format.

    Returns:
    - A single string where all text documents are joined by a specific delimiter.
    """
    unique_txt = df["text"].tolist()
    return "--- --- \n --- --- ".join(unique_txt)

def embed_cluster_summarize_texts(
    texts: List[str], level: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Embeds, clusters, and summarizes a list of texts. This function first generates
    embeddings for the texts, clusters them based on similarity, expands the cluster
    assignments for easier processing, and then summarizes the content within each cluster.

    Parameters:
    - texts: A list of text documents to be processed.
    - level: The current recursion level, recorded alongside each cluster summary.

    Returns:
    - Tuple containing two DataFrames:
      1. The first DataFrame (`df_clusters`) includes the original texts, their embeddings,
         and cluster assignments.
      2. The second DataFrame (`df_summary`) contains summaries for each cluster, the
         specified level, and the cluster identifiers.
    """
    # Embed and cluster the texts, resulting in a DataFrame with 'text', 'embd', and 'cluster' columns
    df_clusters = embed_cluster_texts(texts)

    # Prepare to expand the DataFrame for easier manipulation of clusters
    expanded_list = []

    # Expand DataFrame entries to document-cluster pairings for straightforward processing
    for index, row in df_clusters.iterrows():
        for cluster in row["cluster"]:
            expanded_list.append(
                {"text": row["text"], "embd": row["embd"], "cluster": cluster}
            )

    # Create a new DataFrame from the expanded list
    expanded_df = pd.DataFrame(expanded_list)

    # Retrieve unique cluster identifiers for processing
    all_clusters = expanded_df["cluster"].unique()

    print(f"--Generated {len(all_clusters)} clusters--")

    # Summarization prompt
    template = """Here is a subset of the LangChain Expression Language docs.

    LangChain Expression Language provides a way to compose chains in LangChain.

    Give a detailed summary of the documentation provided.

    Documentation:
    {context}
    """
    prompt = ChatPromptTemplate.from_template(template)
    chain = prompt | model | StrOutputParser()

    # Format text within each cluster for summarization
    summaries = []
    for i in all_clusters:
        df_cluster = expanded_df[expanded_df["cluster"] == i]
        formatted_txt = fmt_txt(df_cluster)
        summaries.append(chain.invoke({"context": formatted_txt}))

    # Create a DataFrame to store summaries with their corresponding cluster and level
    df_summary = pd.DataFrame(
        {
            "summaries": summaries,
            "level": [level] * len(summaries),
            "cluster": list(all_clusters),
        }
    )

    return df_clusters, df_summary
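# Illustrative usage (not from the original source; requires OpenAI access): one
# round of embed -> cluster -> summarize over any list of document strings.
# `df_clusters` keeps the input texts, `df_summary` holds one LLM summary per cluster.
#
#   df_clusters, df_summary = embed_cluster_summarize_texts(leaf_texts, level=1)
#   df_summary.columns  # Index(['summaries', 'level', 'cluster'], dtype='object')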

def recursive_embed_cluster_summarize(
    texts: List[str], level: int = 1, n_levels: int = 3
) -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]:
    """
    Recursively embeds, clusters, and summarizes texts up to a specified level or until
    the number of unique clusters becomes 1, storing the results at each level.

    Parameters:
    - texts: List[str], texts to be processed.
    - level: int, current recursion level (starts at 1).
    - n_levels: int, maximum depth of recursion.

    Returns:
    - Dict[int, Tuple[pd.DataFrame, pd.DataFrame]], a dictionary where keys are the recursion
      levels and values are tuples containing the clusters DataFrame and summaries DataFrame
      at that level.
    """
    results = {}  # Dictionary to store results at each level

    # Perform embedding, clustering, and summarization for the current level
    df_clusters, df_summary = embed_cluster_summarize_texts(texts, level)

    # Store the results of the current level
    results[level] = (df_clusters, df_summary)

    # Determine if further recursion is possible and meaningful
    unique_clusters = df_summary["cluster"].nunique()
    if level < n_levels and unique_clusters > 1:
        # Use summaries as the input texts for the next level of recursion
        new_texts = df_summary["summaries"].tolist()
        next_level_results = recursive_embed_cluster_summarize(
            new_texts, level + 1, n_levels
        )

        # Merge the results from the next level into the current results dictionary
        results.update(next_level_results)

    return results
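# Illustrative usage (not from the original source; requires OpenAI access): build
# up to three levels of the summary tree; recursion stops early once a level
# collapses to a single cluster.
#
#   results = recursive_embed_cluster_summarize(leaf_texts, level=1, n_levels=3)
#   results[1][1]["summaries"]  # the level-1 cluster summaries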

def create_retriever(path="Documents", extension="txt"):
    """
    Build the retriever: load documents, recursively cluster and summarize them,
    then index both the leaf texts and the summaries from every level in a single
    Chroma vectorstore.

    Parameters:
    - path: Directory containing the source documents.
    - extension: File extension of the documents to load.

    Returns:
    - A retriever over the combined leaf texts and cluster summaries.
    """

    def preprocessing(path, extension):
        from langchain_community.document_loaders import DirectoryLoader

        loader = DirectoryLoader(path, glob=f"**/*.{extension}", show_progress=True)
        docs = loader.load()
        docs_texts = [d.page_content for d in docs]
        return docs_texts

    # Build tree
    leaf_texts = preprocessing(path, extension)
    results = recursive_embed_cluster_summarize(leaf_texts, level=1, n_levels=3)

    # Initialize all_texts with leaf_texts
    all_texts = leaf_texts.copy()

    # Iterate through the results to extract summaries from each level and add them to all_texts
    for level in sorted(results.keys()):
        # Extract summaries from the current level's DataFrame
        summaries = results[level][1]["summaries"].tolist()
        # Extend all_texts with the summaries from the current level
        all_texts.extend(summaries)

    # Now, use all_texts to build the vectorstore with Chroma, reusing the same
    # embedding model defined above
    vectorstore = Chroma.from_texts(texts=all_texts, embedding=embd)
    retriever = vectorstore.as_retriever()
    return retriever
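# Illustrative usage (not from the original source): plug the retriever into a
# minimal LCEL RAG chain. The question string below is a hypothetical example.
#
#   from langchain_core.runnables import RunnablePassthrough
#
#   retriever = create_retriever(path="Documents", extension="txt")
#   rag_prompt = ChatPromptTemplate.from_template(
#       "Answer the question using only the following context:\n"
#       "{context}\n\nQuestion: {question}"
#   )
#   rag_chain = (
#       {"context": retriever, "question": RunnablePassthrough()}
#       | rag_prompt
#       | model
#       | StrOutputParser()
#   )
#   rag_chain.invoke("How do I compose two runnables with LCEL?")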