Kafka-MongoDB-LLMInspect Pipeline Setup Guide
Welcome to your Kafka-MongoDB-LLMInspect pipeline setup guide! This guide will help you deploy the pipeline, configure environment variables, register the Kafka connector, update `schema_mapping.json` according to your custom schema, and verify that everything is running smoothly.
Provided Files
- `docker-compose.yml`: Defines and runs your Kafka, Zookeeper, and Consumer services.
- `Dockerfile.kafka`: Builds a customized Kafka image.
- `schema_mapping.json`: Maps your MongoDB document structure to the fields to be processed.
- `env.example`: An example file listing all required environment variables (detailed below).
Prerequisites
Before you begin, ensure you have the following installed and configured:
🐳 Docker & Docker Compose
- Install Docker and Docker Compose to run the services.
- Verify installation:
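Quick checks, assuming a recent Docker with the Compose v2 plugin:

```bash
docker --version
docker compose version   # older installs: docker-compose --version
```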
MongoDB Instance (Replica Set Required)
- A MongoDB database is required (external or containerized).
- MongoDB must be configured as a replica set for the Kafka MongoDB Source Connector to function properly.
- Your MongoDB collection must contain at least a user ID (some unique way to identify the user) and the prompt itself.
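If your MongoDB is not already a replica set, a minimal single-node setup looks roughly like this (a sketch only; `rs0` is an arbitrary name, and the exact steps depend on your deployment):

```bash
# Start mongod with a replica set name
mongod --replSet rs0 --dbpath /data/db

# In another shell: initiate the single-node replica set
mongosh --eval 'rs.initiate()'

# Verify the node has become PRIMARY
mongosh --eval 'rs.status().members[0].stateStr'
```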
Keycloak Authentication Server
- A Keycloak server must be running.
- You need to create a client in Keycloak to authenticate API requests. (Instructions below)
🔧 Docker Compose Configuration
- Update the `extra_hosts` value in the `consumer` section of your `docker-compose.yml` file if it is not present (see the snippet below).
- Replace `localhost` with the Common Name (CN) from your LLMInspect deployment's TLS certificate so that Docker maps that hostname to the host gateway.
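A sketch of the relevant fragment; `llminspect.example.com` is a placeholder for your certificate's CN:

```yaml
services:
  consumer:
    extra_hosts:
      # Replace with the CN from your LLMInspect TLS certificate
      - "llminspect.example.com:host-gateway"
```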
🔧 Environment Variables Configuration
- In your `.env` file, update the following variables:
  - `OPENAI_REVERSE_PROXY`: Replace the placeholder with the Common Name (CN) from your LLMInspect deployment's TLS certificate.
  - `KEYCLOAK_HOST`: Replace the placeholder with the Common Name (CN) from your LLMInspect deployment's TLS certificate.
Example `.env` File (Copy & Rename to `.env`)
```env
# ==========================
# LLMInspect Configuration
# ==========================
# Optional: If you're using a reverse proxy for OpenAI requests, specify its URL
OPENAI_REVERSE_PROXY=https://URL_OF_LLMInspect_DEPLOYMENT/v1

# ==========================
# Kafka Configuration
# ==========================
# Kafka broker address (use the container name inside Docker)
KAFKA_BROKERS=kafka:9092
# Kafka topic where MongoDB data will be published
KAFKA_TOPIC=mongo.prompts.kafka-test.prompts
# Port for Kafka Connect (used to register and manage connectors)
KAFKA_CONNECT_PORT=8083
# External port to access Kafka from outside the Docker network
KAFKA_EXTERNAL_PORT=29092
# Zookeeper port for Kafka cluster management
ZOOKEEPER_PORT=2181

# ==========================
# MongoDB Configuration
# ==========================
# Connection URI for MongoDB (use the container name if running inside Docker)
MONGO_URI=mongodb://mongodb-container:27017
# Database name in MongoDB to stream data from
MONGO_DATABASE=kafka-test
# Collection name in MongoDB that holds the documents to be streamed
MONGO_COLLECTION=prompts

# ==========================
# Schema Mapping
# ==========================
# Path to the schema mapping file used by the consumer service
SCHEMA_MAPPING_FILE=schema_mapping.json

# ==========================
# Keycloak Configuration
# ==========================
KEYCLOAK_HOST=localhost
KEYCLOAK_PORT=4116
KEYCLOAK_REALM=InspectChat
KEYCLOAK_CLIENT_ID=llminspect-connect
KEYCLOAK_CLIENT_SECRET=Ey4Zib9  # Replace with the actual client secret
```
Setting Up the Keycloak Client
To authenticate API requests, you must create a client in Keycloak. Follow these steps:
1️⃣ Access Keycloak Admin Console
- Open your browser and navigate to `http://<KEYCLOAK_HOST>:<KEYCLOAK_PORT>` (replace `<KEYCLOAK_HOST>` and `<KEYCLOAK_PORT>` with the values from your `.env` file).
- Log in using your Keycloak admin username and password.
2️⃣ Create a New Client
- In the Keycloak Admin Console, go to: Realm Settings → Clients → Create.
- Enter the following values:
  - Client ID: `llminspect-connect`
  - Client Type: Confidential
  - Root URL: `http://localhost:4116` (adjust if needed)
  - Valid Redirect URIs: `*`
- Click Save.
3️⃣ Configure Client Settings
- In the Client Settings, ensure the "Service Account Roles" option is enabled. This allows the client to use service account roles for authentication.
4️⃣ Configure Client Credentials
- In the Client Settings, go to the "Credentials" tab.
- Copy the Client Secret and replace the `KEYCLOAK_CLIENT_SECRET` value in your `.env` file.
✅ Keycloak client is now configured! The Kafka pipeline will use this client for authentication.
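To sanity-check the client, you can request a token with the OAuth2 client-credentials grant. This is a quick test, assuming a modern Keycloak where realms live under `/realms` (older versions prefix the path with `/auth`):

```bash
curl -X POST "http://<KEYCLOAK_HOST>:<KEYCLOAK_PORT>/realms/InspectChat/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=llminspect-connect" \
  -d "client_secret=<KEYCLOAK_CLIENT_SECRET>"
# A JSON response containing "access_token" confirms the client works
```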
What is `schema_mapping.json`?
The `schema_mapping.json` file defines how the Kafka Consumer extracts the `user_id` and `prompt` from incoming MongoDB documents. Since database structures vary, this mapping ensures the correct fields are used dynamically.
🔧 How to Configure `schema_mapping.json`
1️⃣ Identify the fields in your MongoDB documents that store `user_id` and `prompt`.
2️⃣ List all possible paths where these fields might exist.
3️⃣ Update `schema_mapping.json` accordingly.
Example:
If your MongoDB documents store `user_id` and `prompt` at the top level, list those paths directly; if the fields live inside nested objects, list the dotted paths instead. Sketches of both cases follow.
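The concrete examples did not survive formatting, so the following are illustrative sketches only: they assume `schema_mapping.json` maps each logical field (`user_id`, `prompt`) to a list of candidate document paths. Check the file shipped with the pipeline for the exact shape.

A flat document:

```json
{ "_id": "...", "user_id": "u-123", "prompt": "Summarize this report" }
```

could map as:

```json
{ "user_id": ["user_id"], "prompt": ["prompt"] }
```

A nested document:

```json
{ "meta": { "user": { "id": "u-456" } }, "message": { "text": "Translate this sentence" } }
```

could map as:

```json
{ "user_id": ["meta.user.id"], "prompt": ["message.text"] }
```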
Final Step
Ensure the paths in `schema_mapping.json` match your MongoDB schema before running the pipeline!
How to Use This Setup
1️⃣ Prepare Your Environment
- Step 1: Copy `env.example` to `.env` (see the commands below).
- Step 2: Edit `.env` and replace the placeholder values (the LLMInspect URL, `KEYCLOAK_CLIENT_SECRET`, etc.).
- Step 3: Modify `schema_mapping.json` to match your MongoDB document structure if necessary.
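Assuming a Unix shell:

```bash
# Copy the example file, then open it for editing
cp env.example .env
nano .env   # or any editor
```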
2️⃣ Build and Launch Containers
- Run the first command below to start all services.
- Verify services are running with the second command.
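A sketch, assuming Docker Compose v2 (older installs use the `docker-compose` binary):

```bash
# Build images and start all services in the background
docker compose up -d --build

# Confirm the kafka, zookeeper, and consumer containers are up
docker compose ps
```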
3️⃣ Register the MongoDB Kafka Connector
- Use the following cURL command to register the MongoDB Kafka Source Connector:
```bash
sudo curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mongo-source",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "connection.uri": "YOUR_MONGODB_URI",
      "database": "YOUR_DB_NAME",
      "collection": "YOUR_COLLECTION_NAME",
      "publish.full.document.only": "true",
      "topic.prefix": "mongo.prompts",
      "copy.existing": "true",
      "copy.existing.max.threads": "4",
      "copy.existing.batch.size": "1000"
    }
  }'
```
- Verify the connector status with the command below. ✅ If the response shows "RUNNING", the connector is successfully streaming MongoDB data to Kafka!
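One way to check it, using the Kafka Connect REST API:

```bash
curl -s http://localhost:8083/connectors/mongo-source/status
# Expect "state": "RUNNING" for both the connector and its tasks
```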
4️⃣ Verify the Pipeline Execution
Check Kafka Messages
Use this command to consume messages from Kafka:
```bash
sudo docker exec -it <kafka_container> kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic mongo.prompts.kafka-test.prompts \
  --from-beginning
```
Check Consumer Logs
Verify if the consumer is picking up messages:
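For example, assuming the Compose service is named `consumer` as in the provided `docker-compose.yml`:

```bash
docker compose logs -f consumer
```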
✅ If the logs show OpenAI API requests, messages are being processed correctly.

5️⃣ Stopping and Restarting Services
- Stop all containers:
- Restart all services:
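Both operations, assuming Compose v2:

```bash
# Stop and remove the containers (named volumes are preserved)
docker compose down

# Start everything again in the background
docker compose up -d
```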
Troubleshooting
🔹 Problem: MongoDB Connector Fails to Start
Symptoms: The connector does not enter the "RUNNING" state.
Solutions (diagnostic commands sketched below):
- Check Kafka Connect logs.
- Verify MongoDB connectivity.
- Check Docker networking.
- Ensure MongoDB has a replica set initialized.
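A few diagnostics to try. These assume Kafka Connect runs inside the customized `kafka` service, that `mongosh` is installed, and that MongoDB is reachable from your shell at the URI shown (adjust host/port otherwise):

```bash
# Kafka Connect logs
docker compose logs kafka | grep -iE "error|exception"

# MongoDB connectivity: a ping should return { ok: 1 }
mongosh "mongodb://mongodb-container:27017" --eval 'db.runCommand({ ping: 1 })'

# Docker networking: the connector must be able to resolve the MongoDB host
docker network ls
docker network inspect <network_name>

# Replica set health: rs.status().ok should be 1
mongosh "mongodb://mongodb-container:27017" --eval 'rs.status().ok'
```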
🔹 Problem: No Messages in Kafka
Symptoms: Kafka topic `mongo.prompts.kafka-test.prompts` has no messages.
Solutions:
- List Kafka topics (see the command below) and confirm the topic exists.
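For example, from inside the Kafka container (same `<kafka_container>` placeholder as above):

```bash
sudo docker exec -it <kafka_container> kafka-topics \
  --bootstrap-server kafka:9092 \
  --list
```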
🔹 Problem: Consumer Not Processing Messages
Symptoms: Consumer logs do not show OpenAI API requests.
Solutions (commands sketched below):
- Check the consumer logs.
- Restart the consumer.
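Assuming the Compose service is named `consumer`:

```bash
# Inspect the most recent consumer output
docker compose logs --tail=100 consumer

# Restart only the consumer service
docker compose restart consumer
```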
✅ Summary
1️⃣ Files Provided: `docker-compose.yml`, `Dockerfile.kafka`, `schema_mapping.json`, `env.example`
2️⃣ Prerequisites: Docker & Docker Compose, a MongoDB instance (replica set required), a Keycloak server
3️⃣ Environment Variables: Explained in the `.env` file
4️⃣ Setup Flow:
- Update `.env` and `schema_mapping.json`
- Run `docker compose up -d`
- Register the Kafka Connector
- Verify Kafka messages and consumer logs
5️⃣ Troubleshooting: Steps for resolving connector, Kafka, and consumer issues