How it Works

This pipeline uses several Pathway connectors to read data from local-drive, Google Drive, and Microsoft SharePoint sources. The connectors poll these sources with low latency and track modifications, so whenever a tracked file changes, the corresponding change is reflected in the internal collections. The contents are read into a single Pathway Table as binary objects.
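
For instance, the filesystem connector can watch a directory continuously. Here is a minimal sketch (the ./data folder is illustrative; the streaming mode is what enables the low-latency change tracking):

import pathway as pw

# Watch ./data continuously: rows appear, update, or disappear in the
# resulting table as files are added, modified, or deleted on disk.
documents = pw.io.fs.read(
    "./data",
    format="binary",     # read raw file contents as binary objects
    with_metadata=True,  # keep file path, modification time, etc.
    mode="streaming",    # poll for changes instead of reading once
)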

Next, those binary objects are parsed with the "unstructured" library and split into chunks. The pipeline then embeds the resulting chunks using the OpenAI API.

Finally, the embeddings are indexed using Pathway's machine-learning capabilities. You can then query the resulting index with simple HTTP requests to the endpoints mentioned above.
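
For example, once the server is running, you can query it from Python. A minimal sketch, assuming the server listens on localhost:8000 and exposes the default /v1/pw_ai_answer endpoint of BaseRAGQuestionAnswerer:

import requests

# Ask the RAG pipeline a question over HTTP.
response = requests.post(
    "http://localhost:8000/v1/pw_ai_answer",
    json={"prompt": "What do the indexed documents say about revenue?"},
)
print(response.json())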

Understanding your RAG pipeline

This folder contains several files:

  • app.py, the application code using Pathway and written in Python;

  • config.yaml, the file containing configuration stubs for the data sources, the OpenAI LLM model, and the web server. Customize it if you want to change the LLM model, use the Google Drive data source, or change the filesystem directories to be indexed (see the sketch after this list);

  • requirements.txt, the dependencies for your pipeline. It can be passed to pip install -r ... to install everything that is needed to launch the pipeline locally;

  • Dockerfile, the Docker configuration for running the pipeline in a container;

  • .env, a short environment variables configuration file where the OpenAI key must be stored;

  • data/, a folder with exemplary files that can be used for the test runs.
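
The shape of config.yaml follows from the keys that app.py reads (sources, llm_config, and host_config). A minimal sketch; the values are illustrative:

# config.yaml
sources:
  - kind: local
    config:
      path: ./data        # directory to index
llm_config:
  model: gpt-3.5-turbo    # any OpenAI chat model name
host_config:
  host: 0.0.0.0
  port: 8000

The .env file only needs your OpenAI key, which load_dotenv() picks up at startup:

# .env
OPENAI_API_KEY=<your-openai-api-key>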

Let's understand your application code in app.py

Your app.py file follows a sequence of steps. Before looking at the code, here is an overview:

  1. Set Up Your License Key: You ensure you have the necessary access to Pathway features.

  2. Configure Logging: Set up logging to monitor what’s happening in your application.

  3. Load Environment Variables: Manage sensitive data securely.

  4. Define Data Sources Function: Handle data from various sources seamlessly.

  5. Main Function with Click: Use command-line interaction to control your pipeline.

  6. Initialize Embedder: Convert text to embeddings for further processing.

  7. Initialize Chat Model: Set up your language model for generating responses.

  8. Set Up Vector Store: Manage and retrieve document embeddings efficiently.

  9. Set Up RAG Application: Combine retrieval and generation for effective question answering.

  10. Build and Run Server: Start your server to handle real-time requests.

app.py file
import logging
import sys

import click
import pathway as pw
import yaml
from dotenv import load_dotenv
from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
from pathway.xpacks.llm import embedders, llms, parsers, splitters
from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
from pathway.xpacks.llm.vector_store import VectorStoreServer

# Step 1: Set Up Your License Key
# To access advanced features, get your free license key from https://pathway.com/features.
# Paste it below. For the community version, comment out the line below.
pw.set_license_key("demo-license-key-with-telemetry")

# Step 2: Configure Logging
# This sets up logging to capture and display log messages. It's helpful for debugging.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Step 3: Load Environment Variables
# Load variables from a .env file. This is useful for managing sensitive data like API keys.
load_dotenv()

# Step 4: Define Data Sources Function
# This function reads data from various sources like local files, Google Drive, or SharePoint.
def data_sources(source_configs) -> list[pw.Table]:
    sources = []
    for source_config in source_configs:
        if source_config["kind"] == "local":
            # Read local files.
            source = pw.io.fs.read(
                **source_config["config"],
                format="binary",
                with_metadata=True,
            )
            sources.append(source)
        elif source_config["kind"] == "gdrive":
            # Read files from Google Drive.
            source = pw.io.gdrive.read(
                **source_config["config"],
                with_metadata=True,
            )
            sources.append(source)
        elif source_config["kind"] == "sharepoint":
            try:
                import pathway.xpacks.connectors.sharepoint as io_sp
                # Read files from SharePoint.
                source = io_sp.read(**source_config["config"], with_metadata=True)
                sources.append(source)
            except ImportError:
                print(
                    "The Pathway Sharepoint connector is part of the commercial offering, "
                    "please contact us for a commercial license."
                )
                sys.exit(1)
    return sources

# Step 5: Define Main Function
# This function orchestrates the entire RAG pipeline using Click for command-line interaction.
@click.command()
@click.option("--config_file", default="config.yaml", help="Config file to be used.")
def run(config_file: str = "config.yaml"):
    # Load configuration settings from a YAML file.
    with open(config_file) as config_f:
        configuration = yaml.safe_load(config_f)

    # Extract the GPT model name from the configuration.
    GPT_MODEL = configuration["llm_config"]["model"]

    # Step 6: Initialize Embedder
    # The embedder converts text into embeddings, which are numerical representations of the text.
    embedder = embedders.OpenAIEmbedder(
        model="text-embedding-ada-002",
        cache_strategy=DiskCache(),
    )

    # Step 7: Initialize Chat Model
    # Set up the language model for chat with retry strategy and cache.
    chat = llms.OpenAIChat(
        model=GPT_MODEL,
        retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
        cache_strategy=DiskCache(),
        temperature=0.05,
    )

    # Extract host and port configuration for the server.
    host_config = configuration["host_config"]
    host, port = host_config["host"], host_config["port"]

    # Step 8: Set Up Vector Store
    # The vector store server manages document embeddings and retrieval.
    doc_store = VectorStoreServer(
        *data_sources(configuration["sources"]),
        embedder=embedder,
        splitter=splitters.TokenCountSplitter(max_tokens=400),
        parser=parsers.ParseUnstructured(),
    )

    # Step 9: Set Up RAG Application
    # Combine retrieval and generation for question answering.
    rag_app = BaseRAGQuestionAnswerer(llm=chat, indexer=doc_store)

    # Step 10: Build and Run Server
    # Start the server to handle incoming requests.
    rag_app.build_server(host=host, port=port)
    rag_app.run_server(with_cache=True, terminate_on_error=False)

# If this script is executed directly, run the main function.
if __name__ == "__main__":
    run()

# Congratulations!
# You've set up the essential components of your RAG pipeline using Pathway.
# This setup allows you to read data from multiple sources, process it into embeddings,
# and use a GPT model to answer questions based on retrieved information.

Possible Modifications

  • Change Input Folders: Update paths to new data folders.

  • Modify LLM: Switch to a different language model (see the sketch after this list).

  • Change Embedder: Use an alternative embedder from the embedders module.

  • Update Index: Configure a different indexing method.

  • Host and Port: Adjust the host and port settings for different environments.

  • Run Options: Enable or disable caching and specify a new cache folder.
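
As an example of the LLM and embedder modifications above, you could swap in a local embedder and a non-OpenAI chat model. A sketch, assuming the sentence-transformers package is installed; both model names are illustrative:

from pathway.xpacks.llm import embedders, llms

# A local embedder as an alternative to OpenAIEmbedder.
embedder = embedders.SentenceTransformerEmbedder(model="intfloat/e5-large-v2")

# A non-OpenAI chat model served through LiteLLM.
chat = llms.LiteLLMChat(model="gemini/gemini-pro", temperature=0.05)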

You can also create new components of your own by extending the pw.UDF class and implementing the `__wrapped__` function, as sketched below.
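
A minimal sketch of such a component; the class and its behavior are purely illustrative:

import pathway as pw

class ShoutingPostprocessor(pw.UDF):
    """A toy component: __wrapped__ is called once per input value."""
    def __wrapped__(self, text: str) -> str:
        return text.upper()

An instance of this class can then be applied to a Pathway column expression like any built-in UDF.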

Conclusion

You've now seen how to set up a powerful RAG pipeline whose knowledge is always up to date. We've only scratched the surface; there's more to explore:

  • Re-ranking: Prioritize the most relevant results for your specific query.

  • Knowledge Graphs: Leverage relationships between entities to improve understanding.

  • Hybrid Indexing: Combine different indexing strategies for optimal retrieval.

  • Adaptive Reranking: Iteratively enlarge the context for optimal accuracy; see our next tutorial on adaptive RAG.

Stay tuned for future examples exploring these RAG techniques with Pathway!

Enjoy building your RAG project! If you have any questions or need further assistance, feel free to reach out to the Pathway team or check with your peers from the bootcamp cohort.

What if you want to use a Multimodal LLM like GPT-4o

That's a great idea. Multimodal LLMs like GPT-4o excel at parsing images, charts, and tables, which can significantly improve accuracy even for text-based use cases.

For example, imagine you're building a RAG project with Google Drive as a data source, and the Drive folder contains several financial documents with charts and tables. Below is an interesting example where you'll see how Pathway parsed tables as images and used GPT-4o to produce a much more accurate response.
