
πŸš€ Kafka-MongoDB-LLMInspect Pipeline Setup Guide

Welcome to your Kafka-MongoDB-LLMInspect pipeline setup guide! This guide will help you deploy the pipeline, configure environment variables, register the Kafka connector, update schema_mapping.json to match your custom schema, and verify that everything is running smoothly.


πŸ“ Provided Files

  • docker-compose.yml – Defines and runs your Kafka, Zookeeper, and Consumer services.
  • Dockerfile.kafka – Builds a customized Kafka image.
  • schema_mapping.json – Defines which fields of your MongoDB documents are extracted for processing.
  • env.example – An example file listing all required environment variables (detailed below).


πŸ“Œ Prerequisites

Before you begin, ensure you have the following installed and configured:

🐳 Docker & Docker Compose

  • Install Docker and Docker Compose to run the services.
  • Verify installation:
    docker -v && docker compose version
    

πŸ—„οΈ MongoDB Instance (Replica Set Required)

  • A MongoDB database is required (external or containerized).
  • MongoDB must be configured as a replica set for the Kafka MongoDB Source Connector to function properly (a quick check is sketched below).
  • Your collection must contain at least a user ID (any field that uniquely identifies the user) and the prompt text itself.
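
  As a minimal sketch, assuming mongosh is installed and MongoDB listens on the default port (adjust the URI for your deployment):

    # Check whether a replica set is already initialized
    mongosh "mongodb://localhost:27017" --eval "rs.status()"

    # If not, initialize a minimal single-node replica set
    mongosh "mongodb://localhost:27017" --eval "rs.initiate()"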

πŸ”‘ Keycloak Authentication Server

  • A Keycloak server must be running.
  • You need to create a client in Keycloak to authenticate API requests. (Instructions below)

πŸ”§ Docker Compose Configuration

  • Add the following extra_hosts entry to the consumer section of your docker-compose.yml file if it is not already present:
    extra_hosts:
      - "localhost:host-gateway"
    
  • Replace localhost with the Common Name (CN) from your llminspect deployment’s TLS certificate so that Docker maps that hostname to the host-gateway, as in the sketch below.
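
  For example, a minimal sketch of the consumer service (the service name consumer and the CN llminspect.example.com are placeholders; use the names from your own compose file and certificate):

    services:
      consumer:
        # ... existing configuration ...
        extra_hosts:
          - "llminspect.example.com:host-gateway"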

πŸ”§ Environment Variables Configuration

  • In your .env file, update the following variables:
    • OPENAI_REVERSE_PROXY: Replace the placeholder with the Common Name (CN) from your llminspect deployment’s TLS certificate.
    • KEYCLOAK_HOST: Replace the placeholder with the Common Name (CN) from your llminspect deployment’s TLS certificate.

πŸ“‹ Example .env File (Copy & Rename to .env)

# ==========================
# πŸ”§ LLMInspect Configuration
# ==========================

# URL of the LLMInspect deployment that proxies OpenAI requests (the hostname must match the TLS certificate CN)
OPENAI_REVERSE_PROXY=https://URL_OF_LLMInspect_DEPLOYMENT/v1

# ==========================
# πŸ”Œ Kafka Configuration
# ==========================

# Kafka broker address (Use the container name inside Docker)
KAFKA_BROKERS=kafka:9092

# Kafka topic where MongoDB data will be published
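# The source connector names topics as <topic.prefix>.<database>.<collection>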
KAFKA_TOPIC=mongo.prompts.kafka-test.prompts

# Port for Kafka Connect (Used to register and manage connectors)
KAFKA_CONNECT_PORT=8083

# External port to access Kafka from outside the Docker network
KAFKA_EXTERNAL_PORT=29092

# Zookeeper port for Kafka cluster management
ZOOKEEPER_PORT=2181

# ==========================
# πŸ—„οΈ MongoDB Configuration
# ==========================

# Connection URI for MongoDB (Use container name if running inside Docker)
MONGO_URI=mongodb://mongodb-container:27017

# Database name in MongoDB to stream data from
MONGO_DATABASE=kafka-test

# Collection name in MongoDB that holds the documents to be streamed
MONGO_COLLECTION=prompts

# ==========================
# πŸ” Schema Mapping
# ==========================

# Path to the schema mapping file used by the consumer service
SCHEMA_MAPPING_FILE=schema_mapping.json

# ==========================
# πŸ”Ή Keycloak Configuration
# ==========================
KEYCLOAK_HOST=localhost
KEYCLOAK_PORT=4116
KEYCLOAK_REALM=InspectChat
KEYCLOAK_CLIENT_ID=llminspect-connect
KEYCLOAK_CLIENT_SECRET=Ey4Zib9  # Replace with actual client secret

πŸ”‘ Setting Up Keycloak Client

To authenticate API requests, you must create a client in Keycloak. Follow these steps:

1️⃣ Access Keycloak Admin Console

  • Open your browser and navigate to:

    http://<KEYCLOAK_HOST>:<KEYCLOAK_PORT>/admin
    
    (Replace <KEYCLOAK_HOST> and <KEYCLOAK_PORT> with the correct values from your .env file.)

  • Log in using your Keycloak admin username and password.

2️⃣ Create a New Client

  • In the Keycloak Admin Console, select your realm, then go to: Clients → Create client

  • Enter the following values:

  • Client ID: llminspect-connect
  • Client Type: Confidential
  • Root URL: http://localhost:4116 (Adjust if needed)
  • Valid Redirect URIs: *

  • Click Save.

3️⃣ Configure Client Settings

  • In the Client Settings, ensure the "Service Accounts" option is enabled. This allows the client to authenticate with the client-credentials grant using service account roles.

4️⃣ Configure Client Credentials

  • In the Client Settings, go to the "Credentials" tab.
  • Copy the Client Secret and replace the value in your .env file:
    KEYCLOAK_CLIENT_SECRET=your_client_secret_here
    

βœ… Keycloak client is now configured! The Kafka pipeline will use this client for authentication.
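
To confirm the client works, you can request a token with the client-credentials grant (a sketch; on legacy Keycloak versions the token endpoint is prefixed with /auth):

    curl -X POST "http://<KEYCLOAK_HOST>:<KEYCLOAK_PORT>/realms/<KEYCLOAK_REALM>/protocol/openid-connect/token" \
         -d "grant_type=client_credentials" \
         -d "client_id=llminspect-connect" \
         -d "client_secret=<your_client_secret>"

A JSON response containing an access_token confirms the setup.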


πŸ“„ What is schema_mapping.json?

The schema_mapping.json file defines how the Kafka Consumer extracts the user_id and prompt from incoming MongoDB documents. Since database structures vary, this mapping ensures the correct fields are used dynamically.


πŸ”§ How to Configure schema_mapping.json

1️⃣ Identify the fields in your MongoDB documents that store user_id and prompt.
2️⃣ List all possible paths where these fields might exist.
3️⃣ Update schema_mapping.json accordingly.


πŸ“Œ Example:

If your MongoDB documents look like this:

{
    "user_id": "12345",
    "query": "Tell me a joke!"
}

Set schema_mapping.json like this:

{
    "user_id_path": ["user_id"],
    "prompt_path": ["query"]
}

If your documents have a nested structure:

{
    "metadata": {
        "user": {
            "id": "67890"
        }
    },
    "data": {
        "text": "What is the capital of France?"
    }
}

Then configure:

{
    "user_id_path": ["metadata.user.id"],
    "prompt_path": ["data.text"]
}
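
To sanity-check a path before running the pipeline, you can test it against a sample document with jq, which uses the same dot notation as the mapping file (a sketch; assumes jq is installed):

    echo '{"metadata": {"user": {"id": "67890"}}, "data": {"text": "What is the capital of France?"}}' | jq '.metadata.user.id'
    # "67890"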


πŸš€ Final Step

πŸ“Œ Ensure the paths in schema_mapping.json match your MongoDB schema before running the pipeline!

πŸƒ How to Use This Setup

1️⃣ Prepare Your Environment

  • Step 1: Copy env.example to .env
    cp env.example .env
    
  • Step 2: Edit .env and replace the placeholder values (OPENAI_REVERSE_PROXY, KEYCLOAK_HOST, KEYCLOAK_CLIENT_SECRET, etc.).
  • Step 3: Modify schema_mapping.json to match your MongoDB document structure if necessary.

2️⃣ Build and Launch Containers

  • Run the following command to start all services:
    sudo docker compose up --build -d
    
  • Verify services are running:
    sudo docker ps
    

3️⃣ Register the MongoDB Kafka Connector

  • Use the following cURL command to register the MongoDB Kafka Source Connector:
    sudo curl -X POST http://localhost:8083/connectors \
         -H "Content-Type: application/json" \
         -d '{
             "name": "mongo-source",
             "config": {
                 "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
                 "connection.uri": ""YOUR_MONGODB_URI"",
                 "database": ""YOUR_DB_NAME"",
                 "collection": ""YOUR_COLLECTION_NAME"",
                 "publish.full.document.only": "true",
                 "topic.prefix": "mongo.prompts",
                 "copy.existing": "true",
                 "copy.existing.max.threads": "4",
                 "copy.existing.batch.size": "1000"
             }
         }'
    
  • Verify the connector status:
    sudo curl -X GET http://localhost:8083/connectors/mongo-source/status
    
    βœ… If the response shows "RUNNING", the connector is successfully streaming MongoDB data to Kafka! πŸŽ‰
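
    A healthy status response looks roughly like this (a sketch; worker IDs will differ):

        {
          "name": "mongo-source",
          "connector": { "state": "RUNNING", "worker_id": "kafka-connect:8083" },
          "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083" } ],
          "type": "source"
        }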

4️⃣ Verify the Pipeline Execution

πŸ›  Check Kafka Messages

Use this command to consume messages from Kafka:

sudo docker exec -it <kafka_container> kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic mongo.prompts.kafka-test.prompts \
    --from-beginning
βœ… If you see MongoDB documents, the data is correctly streaming to Kafka!

πŸ›  Check Consumer Logs

Verify if the consumer is picking up messages:

sudo docker logs <consumer_container> --tail 50
βœ… If logs show OpenAI API requests, then messages are processed correctly.


5️⃣ Stopping and Restarting Services

  • Stop all containers:
    sudo docker compose down
    
  • Restart all services:
    sudo docker compose up -d
    

πŸ” Troubleshooting

πŸ”Ή Problem: MongoDB Connector Fails to Start

Symptoms: The connector does not enter the "RUNNING" state.

Solutions:

  • Check Kafka Connect logs:
    sudo docker logs <kafka_connect_container> --tail 50
  • Verify MongoDB connectivity:
    sudo docker exec -it <mongodb_container> mongosh --eval "db.runCommand({ping: 1})"
  • Check Docker networking:
    sudo docker network inspect kafka-net
  • Ensure MongoDB has a replica set initialized (required; see Prerequisites).


πŸ”Ή Problem: No Messages in Kafka

Symptoms: Kafka topic mongo.prompts.kafka-test.prompts has no messages.

Solutions:

  • List Kafka topics:
    sudo docker exec -it <kafka_container> kafka-topics --bootstrap-server kafka:9092 --list
  • Verify the Kafka connector status:
    sudo curl -X GET http://localhost:8083/connectors/mongo-source/status
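
If the connector is in a FAILED state, you can remove it and register it again (the Kafka Connect REST API supports DELETE):

    sudo curl -X DELETE http://localhost:8083/connectors/mongo-source

Then re-run the registration command from step 3️⃣ above.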


πŸ”Ή Problem: Consumer Not Processing Messages

Symptoms: Consumer logs do not show OpenAI API requests.

Solutions:

  • Check consumer logs:
    sudo docker logs <consumer_container> --tail 50
  • Restart the consumer:
    sudo docker restart <consumer_container>
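
If the consumer is running but idle, you can inspect its offsets and lag with the consumer-groups tool (a sketch; <consumer_group> is a placeholder for the group id your consumer uses):

    sudo docker exec -it <kafka_container> kafka-consumer-groups \
        --bootstrap-server kafka:9092 \
        --describe --group <consumer_group>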


βœ… Summary

1️⃣ Files Provided: docker-compose.yml, Dockerfile.kafka, schema_mapping.json, env.example
2️⃣ Prerequisites: Docker, a MongoDB instance (replica set), a Keycloak server
3️⃣ Environment Variables: explained in the .env file
4️⃣ Setup Flow:
  • Update .env and schema_mapping.json
  • Run docker compose up -d
  • Register the Kafka connector
  • Verify Kafka messages and consumer logs
5️⃣ Troubleshooting: steps for resolving connector, Kafka, and consumer issues