How to Build Real-Time Data Pipelines with Kafka and MongoDB
Modern applications increasingly need to move data between services as events happen rather than in periodic batches, and they need to do so efficiently. Pairing technologies such as Kafka and MongoDB is one way organizations enable applications to become more responsive, scalable, and real-time.
Why is this combination so effective? It closes a long-standing gap in integrated systems: the ability to stream millions of events per second while also supporting advanced querying and reliable long-term storage.
This pairing has been applied across a wide range of markets, including event-driven systems for financial transaction handling, IoT sensor collection, real-time activity monitoring, and adaptive inventory control.
Together, MongoDB and Kafka enable immediate, reactive, and persistent data architectures that depend on real-time processing with access to historical context.
Key Takeaways
- Kafka manages high-throughput event streaming and serves as the central event backbone, while MongoDB stores and queries data durably for both real-time and historical workloads.
- Using them together gives you real-time pipelines plus durable storage, allowing you to stream events while still running complex queries and analytics.
- Producers publish events to Kafka topics, consumers process the streams, and MongoDB collections persist the resulting data.
- This design is well suited to e-commerce order processing, AI agents, IoT ingestion, and any platform that needs live events together with long-term data retention.
- For production deployments, use Kafka’s idempotent producer, schema validation, and monitoring, along with managed Kafka and managed MongoDB services for easier scaling and operations.
Architectural Overview
The combined architecture of Kafka and MongoDB shows how modern systems deliver both real-time responsiveness and durable persistence. At the center of this model, Kafka acts as the nervous system of the data environment, capturing events as they happen, buffering them efficiently, and distributing them to different consumers that must respond immediately. Because Kafka writes events to a persistent log that can be replicated across brokers, a consumer that falls behind or restarts can resume from its last committed position instead of losing events.
Every part of the architecture plays a critical role in keeping the system responsive, reliable, and consistent.
1. Kafka Producers (Data Sources)
Producers can be compared to reporters in a fast-moving newsroom. They collect live streams of information such as transactions, sensor readings, user actions, and log entries, then send them to Kafka. These producers may be applications, backend services, or connected devices that continuously publish data. Their responsibility is to forward raw information to Kafka without needing to know who will consume it or how it will be used later, which keeps updates flowing steadily into the system.
2. Kafka Topics (Event Channels)
After data is produced, it is not sent to one fixed destination. Instead, it is written into Kafka topics, which behave like categorized channels or folders. Each topic groups similar event types together. One topic might contain payment activity, another user behavior, and another IoT telemetry. This organization allows different systems or teams to subscribe only to the events they need, much like viewers selecting different broadcast channels without interfering with each other.
3. Kafka Consumers (Processing Logic)
Consumers are the analysts and decision-makers in this architecture. They subscribe to relevant Kafka topics and handle incoming streams in real time. Depending on the use case, consumers may clean, transform, or enrich the data before acting on it or passing the results onward. They can be thought of as chefs who take raw ingredients from producers, apply a recipe through business logic, and produce a finished result that is meaningful and ready for use.
4. MongoDB Collections (Data Persistence)
Once data has been processed, the results are stored in MongoDB, which serves as a digital archive and knowledge base. Inside MongoDB, information is stored in collections, which can be viewed as organized shelves containing all processed and generated data. These collections provide durability and make it simple to retrieve historical records for trend analysis, reporting, or context-aware user experiences. MongoDB’s flexible document model makes it suitable for both structured and semi-structured data, complementing Kafka’s fluid event streams.
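The four roles above can be sketched as a toy, in-memory model. This is not Kafka or MongoDB client code; the class and method names (PipelineSketch, publish, poll, consumeAndStore) are hypothetical and exist only to show how producers, topics, consumer groups, and a collection relate to each other.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of producer -> topic -> consumer -> collection.
// It imitates the roles only; real Kafka and MongoDB clients work over the network.
final class PipelineSketch {
    // A "topic" is an append-only log of events.
    private final Map<String, List<String>> topics = new HashMap<>();
    // Each consumer group tracks its own read position (offset) per topic.
    private final Map<String, Integer> offsets = new HashMap<>();
    // A "collection" stands in for durable storage.
    final List<String> collection = new ArrayList<>();

    // Producer role: append an event without knowing who will read it.
    void publish(String topic, String event) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(event);
    }

    // Consumer role: return events this group has not seen yet and advance its offset.
    List<String> poll(String group, String topic) {
        List<String> log = topics.getOrDefault(topic, List.of());
        String key = group + "/" + topic;
        int from = offsets.getOrDefault(key, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(key, log.size());
        return batch;
    }

    // Processing plus persistence: transform each event, then store the result.
    int consumeAndStore(String group, String topic) {
        List<String> batch = poll(group, topic);
        for (String event : batch) {
            collection.add(event.trim().toUpperCase());
        }
        return batch.size();
    }
}
```

Because offsets are tracked per group, a second group (for example an audit service) can read the same topic from the beginning without affecting the first one, which is the decoupling the topic model is meant to provide.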
Figure: Data flow for the Kafka and MongoDB setup
In practice, this architectural combination enables organizations to benefit from the immediacy of streaming data while preserving the reliability of durable storage. Following best practices, high-throughput communication between Kafka and MongoDB can be implemented through Kafka connectors with fault tolerance built in, replication across ingestion and storage layers, and carefully planned schema evolution strategies that maintain consistency as data models change. Together, Kafka and MongoDB create an architecture that is reactive, resilient, and ready for modern enterprise data demands.
Key Use Cases of MongoDB and Apache Kafka for AI Agents and More
Connecting Kafka and MongoDB significantly expands what modern and intelligent applications can achieve beyond processing static datasets. The result is a responsive digital ecosystem driven by a constant stream of real-world events.
This combination empowers AI agents by turning them into real-time decision systems that consume live event streams while preserving state for continuous learning. These agents can answer user requests by drawing on both current and historical data stored in MongoDB, making them highly effective for autonomous customer support.
When a customer sends a message, the event is immediately published to Kafka. An AI agent consumes that event, makes a context-aware decision using current and historical information from MongoDB, and responds within seconds. Each interaction and decision is also recorded in MongoDB, providing both immediate context and long-term learning material that helps the agent improve over time.
AI agents can also power recommendation engines that react instantly to changing user behavior. Other examples include fraud detection systems that evaluate transaction patterns in motion and predictive maintenance platforms that analyze IoT sensor data to anticipate failures before they occur.
Beyond AI agents, the combination of Kafka and MongoDB supports a wide range of industry use cases where rapid response to streaming data and deep historical analysis are both required:
- Supply chain management: Intelligent event streams can coordinate logistics dynamically by rerouting deliveries according to weather updates and traffic conditions while referencing past shipment data.
- Healthcare: Providers can benefit from real-time patient monitoring combined with comprehensive long-term health record analysis for personalized care.
- Gaming platforms: Event streams can be used to adapt gameplay and promotional offers instantly while relying on persistent player profiles.
- Financial institutions: AI-driven systems can execute time-sensitive trades while keeping full trade histories for compliance requirements.
- Social media platforms: Streaming analytics can support content moderation and personalized recommendations based on both live trends and long-term user behavior.
In short, while AI agents represent one of the strongest examples of the power of Kafka and MongoDB, the broader ecosystem includes scalable and dependable solutions for event-driven architectures across many industries. This integration supports both immediate insights and deep retrospective analysis, allowing organizations to stay responsive now while building intelligence for the future.
Figure: AI agent use cases
Let’s look at a practical example of a real-time pipeline using Kafka and MongoDB in the context of an e-commerce order event.
For this example, Java is used as the programming language together with a managed MongoDB cluster hosted by a cloud provider, while the Kafka broker runs on cloud infrastructure.
Prerequisites
- Managed MongoDB cluster (cloud-based): A fully managed, scalable, and secure database environment. You will need the connection string and the CA certificate to establish secure access.
- Kafka broker setup (cloud-hosted): A Kafka broker running on a cloud server instance, ideally deployed through Docker containers for easier setup and management.
- Java development environment: Use Java 11 or later (Java 17 or later if you build on Spring Boot 3) together with build tools such as Maven or Gradle to manage dependencies.
- Required client libraries: The Kafka and MongoDB Java driver dependencies are org.apache.kafka:kafka-clients and either org.mongodb:mongodb-driver-sync or org.mongodb:mongodb-driver-reactivestreams.
Step-by-Step Installation and Setup
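This guide uses ZooKeeper for simplicity. In production, prefer Kafka’s KRaft mode, which removes the ZooKeeper dependency and has been production-ready since Kafka 3.3. Kafka 4.0 dropped ZooKeeper support entirely, so the ZooKeeper-based commands in this guide require a Kafka 3.x image (Confluent Platform 7.x).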
Managed MongoDB Setup
- Sign in to your cloud provider’s control panel.
- Create a new database cluster and select MongoDB.
- Choose the desired version and cluster size. Select a suitable plan for testing or production and assign a unique name to the cluster. After configuring the required options, create the database cluster.
- After provisioning is complete, open the settings for network access or trusted sources and allow connections from your local IP address and, if needed, from the IP address of your Kafka server instance.
- Next, open the overview section, locate the connection details, select the connection string, and copy the URI.
Note: Download the CA certificate provided by the platform dashboard, because managed database connections typically require TLS encryption.
Kafka Broker on a Cloud Server
Create a cloud server instance running Ubuntu or another Linux distribution of your choice.
Install Docker if it is not already present:
sudo apt update && sudo apt install docker.io
Start Kafka and Zookeeper containers on the server:
docker network create kafka-net
docker run -d --name zookeeper --network kafka-net -p 2181:2181 zookeeper
docker run -d --name kafka --network kafka-net -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<your_server_ip>:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:7.5.0
Create the order-events topic on the Kafka instance:
docker exec -it kafka kafka-topics --create --topic order-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
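Update the cloud firewall settings to permit inbound traffic on port 9092 from the machines that run your producer and consumer. The Kafka container reaches ZooKeeper over the kafka-net Docker network, so port 2181 only needs to be opened if something outside the server must talk to ZooKeeper directly.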
Java Project
Create a Spring Boot Maven project locally.
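Add the Spring for Apache Kafka and Spring Data MongoDB dependencies to your pom.xml; they pull in the Kafka client and the MongoDB driver transitively. The REST controller used later in this guide also needs spring-boot-starter-web:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!-- Required by the /orders REST controller -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>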
Use the managed MongoDB connection string to connect the consumer application to MongoDB, and configure the Kafka bootstrap servers with the IP address and port of your Kafka broker. Add the following to your Java project configuration, such as application.yml:
spring:
  kafka:
    bootstrap-servers: <server_ip_address>:9092
    consumer:
      group-id: order-consumer-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  data:
    mongodb:
      uri: mongodb+srv://<database_user>:<database_pass>@cluster.example.mongodb.net/<database_name>?retryWrites=true&w=majority
      auto-index-creation: true
order:
  topic: order-events
logging:
  level:
    org.springframework.data.mongodb: DEBUG
    com.mongodb: DEBUG
Core Implementation: Building a Real-Time Pipeline
Now let’s move into the core implementation of this setup and see how a managed MongoDB cluster and a Kafka broker hosted in the cloud can be used to build an e-commerce order flow, from a Kafka producer to a Kafka consumer, with events stored in MongoDB.
1. Producer: Capturing New Orders for Real-Time Processing
In an e-commerce system, every time a customer places an order, that event must be captured and immediately shared with other parts of the platform for processing, such as inventory updates, payment validation, and shipping workflows.
The Kafka producer handles this first step. When the REST API receives a new order through the /orders endpoint, the controller serializes the order details into a JSON message and the producer service publishes it to a Kafka topic called order-events.
This separates order intake from downstream processing, making the architecture asynchronous, scalable, and fault tolerant.
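import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    // Topic name is read from order.topic in application.yml
    @Value("${order.topic}")
    private String topic;
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Publishes the serialized order; send() is asynchronous and returns immediately
    public void sendOrderEvent(String orderJson) {
        kafkaTemplate.send(topic, orderJson);
    }
}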
2. Consumer: Processing and Persisting Orders Efficiently
Downstream services that need order information subscribe as Kafka consumers to the same topic.
The Kafka consumer listens to order-events and receives each message in real time. After receiving the event, it deserializes the JSON payload back into an Order object and stores it in the managed MongoDB database, making the order durable and queryable.
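The consumer also includes idempotency protection. Rather than querying for the order ID before inserting, it relies on the unique index on orderId: if the same order arrives again, MongoDB rejects the write, and the consumer catches the resulting DuplicateKeyException and skips the event. This avoids duplicates and preserves data integrity even when retries or duplicate events occur.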
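import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    @Autowired
    private MongoTemplate mongoTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "${order.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumeOrderEvent(String orderJson) throws Exception {
        // Deserialize the JSON payload published by the producer
        Order order = objectMapper.readValue(orderJson, Order.class);
        try {
            // The unique index on orderId makes a repeated insert fail instead of duplicating
            mongoTemplate.save(order);
            System.out.println("Order saved: " + order.getOrderId());
        } catch (DuplicateKeyException ex) {
            // Duplicate orderId; safely skip
            System.out.println("Duplicate Order skipped (DB constraint): " + order.getOrderId());
        }
    }
}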
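The duplicate handling described above can be illustrated without a database. The following is a minimal sketch in which a ConcurrentHashMap stands in for a collection with a unique index on orderId; the class name IdempotentOrderStore and its methods are hypothetical and are not part of the tutorial's codebase.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a collection with a unique index on orderId.
final class IdempotentOrderStore {
    private final Map<String, String> ordersById = new ConcurrentHashMap<>();

    // Returns true if the order was stored, false if that orderId already existed.
    // putIfAbsent is atomic, so two concurrent deliveries of the same order cannot both win.
    boolean insertOnce(String orderId, String orderJson) {
        return ordersById.putIfAbsent(orderId, orderJson) == null;
    }

    int size() {
        return ordersById.size();
    }
}
```

Letting the store reject the duplicate, rather than checking first and inserting second, avoids the race in which two consumers both see "not present" and both write.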
3. Defining the Schema Model
Next, define the model for the data that is published to Kafka and later persisted into MongoDB.
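import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection = "orders")
public class Order {
    @Id
    private ObjectId id;
    @Indexed(unique = true) // unique index used by the consumer to reject duplicate orders
    private String orderId;
    private String customerId;
    private double amount;
    // Getters expose the fields to Jackson; getOrderId() is also used by the consumer
    public String getOrderId() { return orderId; }
    public String getCustomerId() { return customerId; }
    public double getAmount() { return amount; }
}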
4. Exposing an API for Producing Order Events
Now expose an endpoint through the controller so new orders can be submitted and the data flow into Kafka and MongoDB can be tested.
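import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/orders")
public class OrderController {
    private final KafkaProducerService producerService;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public OrderController(KafkaProducerService producerService) {
        this.producerService = producerService;
    }

    @PostMapping
    public ResponseEntity<String> createOrder(@RequestBody Order order) {
        try {
            // Serialize the order and hand it to the producer
            String orderJson = objectMapper.writeValueAsString(order);
            producerService.sendOrderEvent(orderJson);
            return ResponseEntity.ok("Order event sent to Kafka");
        } catch (Exception e) {
            // Catches serialization errors and synchronous send failures only.
            // In case of failed order event, set up a dead letter queue for retry of failed events.
            return ResponseEntity.status(500).body("Failed to send order event");
        }
    }
}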
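The dead letter queue mentioned in the controller comment can be sketched as follows. This is an in-memory illustration only, with hypothetical names (DeadLetterSketch, process, deadLetters); in a real deployment the dead-letter destination would typically be a separate Kafka topic rather than a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Retries a handler a fixed number of times, then parks the event in a dead-letter list.
final class DeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>();

    // Returns true if the handler succeeded within maxAttempts, false if the event was parked.
    boolean process(String event, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(event);
                return true;
            } catch (RuntimeException ex) {
                // Fall through and retry; a real system would log and back off here.
            }
        }
        deadLetters.add(event); // With Kafka, this would be a publish to a dead-letter topic.
        return false;
    }
}
```

Spring for Apache Kafka ships building blocks for this pattern on the consumer side (DefaultErrorHandler combined with DeadLetterPublishingRecoverer), which may be preferable to hand-written retry loops.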
5. Testing the Pipeline Flow
After starting the application and sending a test order through the endpoint, the event will be published to the Kafka topic. The consumer service listening to that topic will then process the message and store it in the orders collection within the managed MongoDB cluster.
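One way to send the test order is Java's built-in HTTP client. The sketch below only builds the request; the base URL http://localhost:8080 (Spring Boot's default port) and the class name OrderRequestSketch are assumptions, so adjust them to your environment.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class OrderRequestSketch {
    // Builds (but does not send) a JSON POST to the /orders endpoint.
    static HttpRequest buildOrderRequest(String baseUrl, String orderJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/orders"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(orderJson))
                .build();
    }
}
```

To submit it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) with a body such as {"orderId":"1001","customerId":"JohnFedrick","amount":150.2}.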
You can verify the message in the Kafka topic on the server instance:
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic order-events --from-beginning
Example output:
{"id":1,"orderId":"1001","customerId":"JohnFedrick","amount":150.2}
The consumer also processes this message from the topic and writes it to the managed MongoDB cluster in real time.
This architecture links real-time event streaming with durable storage. It scales by allowing multiple consumer groups to process order events independently; to spread the work within a single group, the topic needs more than the one partition created earlier, because each partition is read by at most one consumer per group. By combining Kafka’s fault tolerance with MongoDB’s flexible document model, the e-commerce platform remains responsive and dependable. The producer-consumer pattern captures each order immediately and stores it reliably, while allowing the system to grow with additional processing requirements without creating tight coupling.
You can review the codebase on GitHub.
Advanced Topics and Best Practices
In production environments that combine Kafka and MongoDB for e-commerce or similar real-time systems, several important considerations help ensure data consistency, reliability, and efficient processing:
Ensuring Exactly-Once Processing
- Enable Kafka’s idempotent producer (enable.idempotence=true) to reduce the risk of duplicate messages at the Kafka layer.
- Use Kafka’s transactional APIs to bundle production and offset commits into a single atomic operation so messages are processed exactly once within Kafka. Writes to MongoDB fall outside a Kafka transaction, so keep them idempotent, for example through the unique index on orderId used in the consumer above. This is particularly important in financial systems and order-processing scenarios.
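The producer settings named above can be collected as plain client properties. This is a minimal sketch: the property keys are standard Kafka producer settings, while the class name ProducerConfigSketch and any transactional ID you pass in are illustrative assumptions.

```java
import java.util.Properties;

final class ProducerConfigSketch {
    // Builds producer properties for idempotent, transactional publishing.
    static Properties exactlyOnceProducerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Lets the broker discard retried duplicates from this producer.
        props.put("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        // Setting a transactional.id enables the transactional API; it should be stable per producer instance.
        props.put("transactional.id", transactionalId);
        return props;
    }
}
```

These properties would be passed to a KafkaProducer (or set under spring.kafka.producer.properties in a Spring Boot application) alongside the serializers already configured earlier.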
Implementing Schema Validation
- Use schema registries such as Confluent Schema Registry together with formats like Avro or JSON Schema to enforce message structures, prevent corruption, and support schema evolution.
- Consumers should validate incoming messages against defined schemas to reduce errors and increase system resilience.
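As a rough illustration of consumer-side validation, the sketch below checks an already-parsed event against a hand-written list of required fields. It is a stand-in for a real schema (Avro or JSON Schema via a registry), and the class name OrderEventValidator is hypothetical; the field names mirror the Order model used in this guide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class OrderEventValidator {
    // Required fields and their expected Java types (a hand-written stand-in for a real schema).
    private static final Map<String, Class<?>> REQUIRED = Map.of(
            "orderId", String.class,
            "customerId", String.class,
            "amount", Number.class);

    // Returns a list of problems; an empty list means the event passed validation.
    static List<String> validate(Map<String, Object> event) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, Class<?>> field : REQUIRED.entrySet()) {
            Object value = event.get(field.getKey());
            if (value == null) {
                problems.add("missing field: " + field.getKey());
            } else if (!field.getValue().isInstance(value)) {
                problems.add("wrong type for field: " + field.getKey());
            }
        }
        return problems;
    }
}
```

A consumer could call validate before persisting and route events with a non-empty problem list to a dead-letter topic instead of MongoDB.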
Distributed Tracing and Monitoring
- Use distributed tracing tools such as OpenTelemetry (the successor to the now-archived OpenTracing project) or Jaeger to follow message movement across Kafka, consumer services, and databases.
- Monitor Kafka metrics such as lag, throughput, and retries, along with MongoDB metrics such as connection count and query performance, through dashboards built with Prometheus and Grafana.
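Consumer lag, the first metric listed above, is the gap between the newest offset in each partition and the offset the consumer group has committed. The arithmetic can be sketched as follows; the class and method names are hypothetical, and in practice the two offset maps would come from Kafka's admin tooling or a metrics exporter.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerLagSketch {
    // Lag per partition = latest offset in the log minus the group's committed offset.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // A partition with no committed offset is treated as unread from offset 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    // Total lag across all partitions of a topic.
    static long totalLag(Map<Integer, Long> lagByPartition) {
        return lagByPartition.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

A steadily growing total is the usual signal that consumers cannot keep up and should be scaled out or optimized.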
Partitioning and Scaling Strategies
- Select partition keys carefully so load is distributed evenly while preserving ordering for related messages.
- Scale consumers horizontally to balance work across partitions and support higher throughput.
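The link between partition keys and ordering can be shown with a simplified partitioner. This sketch hashes the key bytes with Arrays.hashCode purely for illustration; Kafka's default partitioner uses a murmur2 hash, so the actual partition numbers will differ. The class name PartitionSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartitionSketch {
    // Maps a key to a partition. The same key always yields the same partition,
    // which is what preserves ordering for related messages.
    // Simplified: Kafka's default partitioner uses murmur2, not Arrays.hashCode.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }
}
```

Using a value such as the customer ID as the key keeps all of one customer's orders in sequence, while a key with few distinct values would concentrate traffic on a handful of partitions.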
Security and Compliance
- Encrypt data both in transit and at rest.
- Apply proper access control, authentication, and authorization, such as Kafka ACLs and MongoDB role-based permissions.
FAQs
1. What is the MongoDB Kafka Connector?
The MongoDB Connector for Apache Kafka is a connector verified by Confluent that can operate both as a sink and as a source. In sink mode, it writes data from Kafka topics into MongoDB. In source mode, it publishes changes from MongoDB, such as through change streams, into Kafka topics. This guide uses a custom Java consumer to write from Kafka to MongoDB, but a fully connector-based setup can also be implemented using the MongoDB Kafka Connector documentation.
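For comparison with the custom consumer, a sink connector is driven by configuration rather than code. The sketch below assembles a minimal set of sink properties as a Java map; the property keys and connector class come from the MongoDB Kafka Connector and Kafka Connect, while the database name shop and the class name MongoSinkConfigSketch are placeholders chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MongoSinkConfigSketch {
    // Minimal sink configuration that writes the order-events topic into an orders collection.
    static Map<String, String> sinkConfig(String connectionUri) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
        config.put("topics", "order-events");
        config.put("connection.uri", connectionUri);
        config.put("database", "shop"); // placeholder database name
        config.put("collection", "orders");
        // Plain string keys and schemaless JSON values, matching the events in this guide.
        config.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        config.put("value.converter.schemas.enable", "false");
        return config;
    }
}
```

Serialized as JSON, a map like this is what you would submit to a Kafka Connect worker's REST API to create the connector.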
2. How Do You Connect Kafka to a Database Such as MongoDB?
You can connect Kafka to MongoDB either by using the official MongoDB Kafka Connector as a sink or source, or by creating a consumer application that reads events from Kafka topics and writes them into MongoDB, as demonstrated in the e-commerce example in this tutorial. Kafka Connect supports both source connectors for database-to-Kafka flows and sink connectors for Kafka-to-database workflows.
3. Can Apache Kafka Replace a Database?
No. Kafka should be understood as an event streaming system, not as a direct replacement for databases such as MongoDB, MySQL, or Elasticsearch. Its role is to transport, retain, and replay event streams. Databases, on the other hand, are designed for persistent storage, indexing, and querying under their own consistency and durability models. In real-time architectures, Kafka and MongoDB usually complement each other: Kafka handles the stream and buffering layer, while MongoDB stores the data long term and makes it queryable.
4. Is Kafka Used for Real-Time Data Streaming?
Yes. Apache Kafka is widely used to build real-time data streaming pipelines and applications. It supports fault-tolerant, high-throughput event streaming and integrates well with processing tools such as Kafka Streams. When combined with MongoDB, it provides real-time event flow together with durable, queryable storage.
5. When Should I Use Kafka with MongoDB?
Use Kafka together with MongoDB when you need to ingest or process large volumes of event streams while also storing results for the long term, running complex queries, or supporting both real-time and historical analytics. Typical use cases include e-commerce order handling, change data capture, event-driven microservices, and AI agents that need live event streams plus persistent state.
Conclusion
The steady flow of Kafka’s event streams combined with the flexible memory of MongoDB’s document store creates a powerful foundation for many use cases, including e-commerce, as shown in the example above. Building your own pipeline is much like assembling a high-performance engine: once all components are connected properly, it can power demanding applications smoothly and efficiently. Of course, actual performance will always depend on hardware, network conditions, and configuration choices.
Now it is your turn to bring this architecture to life. Experiment by creating your own Kafka-MongoDB integrations, adapt them to your specific data flows, and explore how they can be extended further with next-generation AI agents and intelligent platforms that add autonomy and insight to your event streams.
You can also make use of active community resources, share your experiments, and learn from others exploring the same path. Mastering Kafka and MongoDB is an ongoing journey that continues to evolve and offers many exciting opportunities.


