Deploying a Coreflux MQTT Broker with Managed Databases for IoT Data Processing
MQTT brokers connect IoT devices and applications through a publish-subscribe messaging model, which makes them a key component of modern IoT infrastructure. Coreflux is a low-code MQTT broker that extends this foundation with real-time data processing and transformation features, enabling direct integration with managed databases such as MongoDB, PostgreSQL, MySQL, and OpenSearch without the need to build custom integration logic.
What You’ll Learn
This guide explains how to deploy a complete IoT data pipeline, from provisioning a managed database cluster and a Coreflux MQTT broker in a cloud environment, to configuring secure private networking, creating data transformation models with Coreflux’s Language of Things (LoT), and automatically saving processed IoT data in the database you choose. By the end, you will have a production-ready environment for real-time messaging and persistent IoT data storage.
Key Takeaways
Before starting the deployment process, here are the main outcomes of this guide:
- Deploy a managed database cluster such as PostgreSQL, MongoDB, MySQL, or OpenSearch for scalable IoT data storage.
- Install the Coreflux MQTT broker on a cloud virtual machine by using either a marketplace image or Docker.
- Create secure private networking so the MQTT broker and database can communicate without public exposure.
- Build real-time data pipelines with Coreflux’s Language of Things (LoT) for low-code IoT automation.
- Transform and store IoT data automatically from MQTT topics into database tables, collections, or indexes.
- Validate end-to-end data flow from simulated sensors through transformation models into long-term database storage.
This guide establishes a production-ready starting point for IoT applications that require real-time messaging, durable storage, and advanced features such as search, analytics, or relational queries.
What You Will Build
By the end of this automation guide, you will have deployed:
- A managed database cluster such as PostgreSQL, MongoDB, MySQL, or OpenSearch for scalable storage
- A cloud virtual machine running the Coreflux MQTT broker
- A Virtual Private Cloud (VPC) network for secure IoT communication
- Real-time data simulation by using the LoT Notebook extension
- Low-code data transformation models and database integration routes
- A complete data integration and transformation pipeline for IoT automation
Coreflux and Cloud Platform Integration
Coreflux delivers a lightweight MQTT broker and data pipeline tools through the Language of Things programming language for efficient IoT communication in cloud environments.
What Is MQTT?
MQTT, which stands for Message Queuing Telemetry Transport, is a lightweight publish-subscribe network protocol that is widely used in IoT ecosystems. It was designed for constrained devices and for networks with limited bandwidth, high latency, or unreliable connectivity. Because of this, MQTT supports efficient real-time messaging even in environments where bandwidth is restricted.
About Coreflux
Coreflux provides a lightweight MQTT broker that supports efficient real-time communication between IoT devices and applications, while also offering real-time data transformation features tailored to each specific scenario. Built for scalability and reliability, Coreflux is suited to environments where low latency and high throughput are essential.
Coreflux manages message routing and data flow between devices, whether the deployment is a small IoT project or a large industrial monitoring platform.
With Coreflux in a cloud environment, you gain:
- Data Processing: Centralized data processing where the data is stored, enabling real-time handling close to the source of truth.
- Data Integration: Straightforward integration with managed database services such as PostgreSQL, MongoDB, MySQL, or OpenSearch, creating one consistent ecosystem for data storage and processing.
- Scalability: The ability to manage increasing numbers of devices and rising data volumes without sacrificing performance.
- Reliability: Stable and dependable messaging across all connected devices.
Prerequisites
Before beginning this MQTT broker deployment guide, you will need:
- A cloud account with billing enabled
- A general understanding of MQTT concepts and IoT architecture
- Visual Studio Code for the LoT Notebook extension
Estimated time: Completing this guide usually takes around 30 to 45 minutes, depending on how long database provisioning takes, which is typically between 1 and 5 minutes per database cluster.
Step 1 — Creating the Network Infrastructure for IoT Automation
Creating a VPC Network for Secure MQTT Communication
Start by creating a Virtual Private Cloud (VPC) so your IoT services and MQTT broker can communicate securely without relying on public access.
- Sign in to your cloud control panel.
- Open the networking section and navigate to VPC.
- Select the option to create a new VPC network.
Configure your VPC for IoT automation with settings such as:
- Name: coreflux-integrations-vpc or another suitable VPC name
- Datacenter region: Choose Frankfurt or your preferred region
- IP Range: Use the default range or customize it if required
- Description: Add a meaningful description for the MQTT broker and database network
Then create the VPC network.
The VPC provides isolated networking for all IoT resources, ensuring secure communication between the Coreflux MQTT broker and managed databases.
Step 2 — Setting Up a Managed Database for Scalable Storage
Select one of the following database options according to your IoT application’s needs:
- PostgreSQL: Best for structured data that requires relational queries, ACID compliance, and complex relationships
- MySQL: Well suited to structured workloads and transactional queries with strong consistency and broad tooling support
- MongoDB: Ideal for flexible document storage when schemas vary and development needs to move quickly
- OpenSearch: A strong choice for advanced search, analytics, log analysis, and time-series visualization
Setting Up a Managed PostgreSQL Database
Managed PostgreSQL is a strong option when IoT workloads require relational schemas, strong consistency, and advanced SQL analysis, along with managed backups, monitoring, and maintenance.
- In the cloud control panel, go to the databases section.
- Choose the option to create a database cluster.
Configure the PostgreSQL cluster for IoT automation:
- Database engine: PostgreSQL
- Version: The latest stable release
- Datacenter region: Frankfurt or the same region as the VPC
- VPC Network: Select the VPC you created earlier
- Database cluster name: postgresql-coreflux-test
- Project: Select the destination project
Choose a plan based on your workload:
- For development: A basic plan with 1 GB RAM
- For production: A general purpose plan or higher for scalable storage
Create the database cluster.
The managed database setup usually takes 1 to 5 minutes. After completion, you are redirected to the database overview page, where connection details and administrative controls are available.
Configuring PostgreSQL Database Access for MQTT Broker Integration
You will usually see a getting started workflow that displays connection information and allows you to configure inbound access rules. It is recommended to restrict access to your IP address and private network only.
- Open the getting started setup for the PostgreSQL database
- Optional: Restrict inbound connections
- Add your local computer’s IP for management access
- The virtual machine will typically be allowed through private networking
You will typically see two connection options: Public Network and VPC Network. The public endpoint is intended for external tools such as DBeaver, while the private VPC endpoint is the one Coreflux should use.
Record the provided connection details for both public and VPC access, which are usually different:
- Host: The database hostname
- User: The default administrative user
- Password: An automatically generated secure password
- Database: The authentication database name
Creating a PostgreSQL Application Database and User (Optional)
For better structure and stronger security, create a dedicated database and user for the IoT automation application. This can be done through DBeaver or the command line, but many cloud platforms also offer an easier graphical interface.
- Open the Users & Databases tab in the managed database cluster
- Create a user:
- Username: coreflux-broker-client
- Password: Auto-generated
- Create a database:
- Database name: coreflux-broker-data
Note: You may need to adjust user permissions so the account can create tables and insert or query data. In PostgreSQL, grant the required privileges with:
GRANT CREATE, INSERT, SELECT ON DATABASE coreflux-broker-data TO coreflux-broker-client;
For MySQL, use:
GRANT CREATE, INSERT, SELECT ON coreflux-broker-data.* TO 'coreflux-broker-client'@'%';
Setting Up a Managed MySQL Database
Managed MySQL is a suitable option for structured, transactional IoT data when you want familiar SQL syntax, broad ecosystem support, and a managed service that handles backups, updates, and monitoring.
- In the cloud control panel, go to the databases section.
- Choose the option to create a database cluster.
Configure the MySQL cluster for IoT automation:
- Database engine: MySQL
- Version: The latest stable release
- Datacenter region: Frankfurt or the same region as your VPC
- VPC Network: Select the VPC you created
- Database cluster name: mysql-coreflux-test
- Project: Select the intended project
Choose a plan based on your IoT requirements:
- For development: A basic plan with 1 GB RAM
- For production: A general purpose plan or higher for scalable storage
Create the database cluster.
The managed database setup generally takes between 1 and 5 minutes. After that, the overview page displays your connection details and management options.
Configuring MySQL Database Access for MQTT Broker Integration
You will typically be shown a getting started workflow that displays the connection details and lets you define inbound access rules. It is recommended to restrict access to your own IP address and private network only.
- Open the getting started configuration for the MySQL database
- Optional: Restrict inbound connections
- Add your local IP address for management access
- The virtual machine should be permitted over private networking
You will generally see two connection choices: Public Network and VPC Network. Public access is intended for external tools such as DBeaver, while the VPC endpoint is what Coreflux should use.
Record the connection details for both public and VPC access, since they differ:
- Host: The database hostname
- User: The default administrative user
- Password: An automatically generated secure password
- Database: The authentication database name
Creating a MySQL Application Database and User (Optional)
For better security and organization, create a dedicated application database and user. This can be done through DBeaver or the command line, but many platforms also provide a simple interface for it.
- Open the Users & Databases tab in the managed database cluster
- Create a user:
- Username: coreflux-broker-client
- Password: Auto-generated
- Create a database:
- Database name: coreflux-broker-data
Setting Up a Managed MongoDB Database
Managed MongoDB is a good match for flexible or changing IoT payloads because it allows you to store different sensor documents without forcing a rigid schema, while the platform manages replication, backups, and monitoring.
Creating a Managed MongoDB Cluster
- In the cloud control panel, open the databases section.
- Select the option to create a database cluster.
Configure the MongoDB cluster for IoT automation:
- Database engine: MongoDB
- Version: The latest stable release
- Datacenter region: Frankfurt or the same region as your VPC
- VPC Network: Select the VPC you created
- Database cluster name: mongodb-coreflux-test
- Project: Select the target project
Choose your plan based on workload requirements:
- For development: A basic plan with 1 GB RAM
- For production: A general purpose plan or higher for scalable storage
Create the database cluster.
The managed database setup process usually takes 1 to 5 minutes. After it is ready, you are redirected to the overview page with connection information and management options.
Configuring MongoDB Database Access for MQTT Broker Integration
A getting started workflow will generally display the connection details and allow you to define inbound access restrictions. Limiting access to your IP address and private network is recommended.
- Open the getting started configuration for the MongoDB database
- Optional: Restrict inbound connections
- Add your local computer’s IP for management access
- The virtual machine should be permitted automatically through private networking
For connection details, you will usually see two options: Public Network and VPC Network. Public access is used for external tools such as MongoDB Compass, while Coreflux should use the private VPC connection.
Record the connection details for both public and private access:
- Host: The database hostname
- User: The default administrative user
- Password: An automatically generated secure password
- Database: The authentication database name
Creating a MongoDB Application Database and User (Optional)
For better structure and stronger security, create a dedicated application user and database. This can be done through MongoDB Compass or the command line, though many cloud platforms provide a simpler interface.
- Open the Users & Databases tab in the managed database cluster
- Create a user:
- Username: coreflux-broker-client
- Password: Auto-generated
- Create a database:
- Database name: coreflux-broker-data
Setting Up a Managed OpenSearch Database
Managed OpenSearch is intended for search, log analytics, and time-series dashboards built on top of high-volume IoT data, while the service takes care of cluster health, scaling, and index storage.
Create a Managed OpenSearch Cluster
- In the cloud control panel, open the databases section.
- Select the option to create a database cluster.
Configure the OpenSearch cluster for IoT automation:
- Database engine: OpenSearch
- Version: The latest stable release
- Datacenter region: Frankfurt or the same region as the VPC
- VPC Network: Select the VPC you created earlier
- Database cluster name: opensearch-coreflux-test
- Project: Select the target project
Choose a plan according to your IoT requirements:
- For development: A basic plan with 1 GB RAM
- For production: A general purpose plan or higher for scalable storage
Create the database cluster.
The managed database creation process generally takes 1 to 5 minutes. Once completed, the overview page displays connection details and management options.
Configuring OpenSearch Database Access for MQTT Broker Integration
You will usually be guided through a getting started workflow where connection details are shown and inbound access rules can be configured. It is recommended to restrict access to your own IP address and private network only.
- Open the getting started configuration for the OpenSearch database
- Optional: Restrict inbound connections
- Add your local IP address for management access
- The virtual machine should be permitted through private networking
For connection details, you will usually see two endpoints: Public Network and VPC Network. Public access is used for external tools, while the Coreflux service should use the VPC endpoint. You will also be given the URL and parameters for OpenSearch Dashboards access.
Record the connection information for both public and VPC access:
- Host: The database hostname
- User: The default administrative user
- Password: An automatically generated secure password
- Database: The authentication database name
Step 3 — Deploying the Coreflux MQTT Broker on a Cloud Virtual Machine
Creating the Cloud Virtual Machine
- Navigate to the virtual machine or compute instance section in your cloud control panel.
- Select the option to create a new virtual machine.
Configure the instance for MQTT broker deployment:
- Choose Region: Frankfurt or the same region as the managed database
- VPC Network: Select coreflux-integrations-vpc
- Choose an image: Open the marketplace tab
- Search for Coreflux and choose the Coreflux marketplace image
Choose the instance size according to the workload:
- For development: A basic plan with 2 GB memory
- For production: A basic or general purpose plan with 4 GB RAM or more for scalable performance
Choose the authentication method:
- SSH Key: Recommended for stronger security
- You can create a key locally with
ssh-keygen
- You can create a key locally with
- Password: An alternative option
Finalize the details:
- Hostname: coreflux-test-broker
- Project: Select your project
- Tags: Add relevant tags for organization
Create the virtual machine.
Open the instance home page and wait until deployment is complete.
Alternative — Installing the Coreflux MQTT Broker with Docker
Instead of using the Coreflux marketplace image, you can select a Docker-ready marketplace image and deploy Coreflux manually.
Once the instance is running, connect through SSH using the chosen authentication method or through the web console available on the instance page:
ssh root@your-droplet-ip
Run the Coreflux MQTT broker in Docker:
docker run -d \
--name coreflux \
-p 1883:1883 \
-p 1884:1884 \
-p 5000:5000 \
-p 443:443 \
coreflux/coreflux-mqtt-broker-t:1.6.3
This Docker command:
- Starts the container in detached mode with
-d - Assigns the name
corefluxto the container - Publishes the required ports for MQTT and the web interface
- Uses the current Coreflux image version specified in the command
Verify that the MQTT broker is running:
docker ps
You should see a running container.
Validating the MQTT Broker Deployment
You can test access to the MQTT broker with a client such as MQTT Explorer, regardless of whether you deployed it through a marketplace image or Docker.
Step 4 — Configuring Firewall Rules for Secure IoT Communication (Optional)
For production IoT automation deployments, configure firewall rules so that access remains restricted.
- Open the networking section and navigate to firewalls.
- Create a new firewall.
Configure inbound rules for MQTT broker security:
- SSH: Port 22 from your IP address
- MQTT: Port 1883 from approved IoT application sources
- MQTT with TLS: Port 1884 for secure MQTT over TLS
- WebSocket: Port 5000 for MQTT over WebSocket
- WebSocket with TLS: Port 443 for MQTT over WebSocket secured by TLS
Apply the firewall to the virtual machine.
Production tip: Limit access to port 1883 to specific source IP addresses or VPC ranges and prefer port 1884 for external device connections. If additional security layers are required, use private networking and controlled access paths such as VPNs or bastion hosts.
Step 5 — Setting Up IoT Data Integration with Coreflux’s Language of Things
Installing the LoT Notebook Extension
The LoT Notebook extension for Visual Studio Code provides an integrated low-code development environment for MQTT broker programming and IoT automation.
- Open Visual Studio Code.
- Go to Extensions with
Ctrl+Shift+X. - Search for LoT Notebooks.
- Install the LoT VSCode Notebooks Extension by Coreflux.
Connecting to Your MQTT Broker
Configure the connection to the Coreflux MQTT broker with the default credentials when prompted in the top bar or by using the MQTT button in the lower bar on the left:
- User: root
- Password: coreflux
If the connection is successful, the status bar in the lower-left corner will show the MQTT connectivity state for the broker.
Step 6 — Creating Data in the MQTT Broker Through Actions
For this scenario, the goal is to build an integration that takes raw data through a transformation pipeline and stores it in a database. Because the demonstration is not connected to physical MQTT devices, LoT can be used to simulate device data through an Action.
In LoT, an Action is executable logic triggered by events such as time intervals, topic updates, or explicit calls from other actions or system components. Actions make it possible to interact dynamically with MQTT topics, internal variables, and payloads, which supports more advanced IoT automation workflows.
This means an Action can generate data in selected topics at a defined time interval, and that data can then be used by the rest of the pipeline described below.
Generating Simulated IoT Data
Create an Action that generates simulated sensor data by using the low-code LoT interface:
DEFINE ACTION RANDOMIZEMachineData
ON EVERY 10 SECONDS DO
PUBLISH TOPIC "raw_data/machine1" WITH RANDOM BETWEEN 0 AND 10
PUBLISH TOPIC "raw_data/station2" WITH RANDOM BETWEEN 0 AND 60
The notebook also includes an Action that uses an increasing counter to simulate data as an alternative to the random generator shown above.
When you run this Action, it will:
- Deploy automatically to the MQTT broker
- Generate simulated IoT sensor values every 10 seconds
- Publish real-time data to specific MQTT topics
- Show synchronization status in the LoT Notebook interface
This status indicates whether the code in the LoT Notebook differs from what is currently running in the broker, or whether it has not yet been deployed at all.
Step 7 — Creating Data Transformation Models for Real-Time Processing
Defining Data Models with Language of Things
In Coreflux, Models are used to transform, aggregate, and calculate values from input MQTT topics, then publish the results to new topics. They form the basis for building the Unified Namespace, or UNS, of the system and can be applied across several data sources.
This means a Model defines how raw IoT data should be structured and transformed, whether for one device or many devices at once by using the wildcard +. A model also acts as the key schema for storing data in the managed database at scale.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
This low-code model:
- Uses the wildcard
+so the same logic applies automatically to all machines - Converts energy values to watt-hours by multiplying by 1000
- Defines production status based on energy thresholds
- Tracks production counts and stoppage events
- Adds timestamps to each real-time data point
- Extracts the machine identifier from the topic structure
- Publishes structured data to the
Simulator/Machine/Datatopics by replacing the wildcard with each matching source topic
Because the Action generated two simulated sensors or machines, the Model automatically applies the structure to both of them, producing both a JSON object and separate individual topics.
Step 8 — Setting Up Database Integration for Scalable Storage
Use the database integration section that matches the database you selected in Step 2.
PostgreSQL Integration
In this section, you will configure processed IoT data to be stored in a managed PostgreSQL database.
To save processed IoT data in PostgreSQL, define a Route in Coreflux. Routes describe how data moves from the MQTT broker to the PostgreSQL cluster by using a simple low-code configuration:
DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
ADD SQL_CONFIG
WITH SERVER "db-postgresql.db.onmyserver.com"
WITH PORT 25060
WITH DATABASE "defaultdb"
WITH USERNAME "doadmin"
WITH PASSWORD "AVNS_pass"
WITH USE_SSL TRUE
WITH TRUST_SERVER_CERTIFICATE FALSE
Replace these placeholders with your actual PostgreSQL connection details and run the Route from the LoT Notebook. Important: Use the private VPC connection details instead of the public endpoint for stronger security and lower latency. The VPC hostname and port are usually different from the public connection string, so check both in the database cluster connection details.
Updating the Model for PostgreSQL Database Storage
Extend the LoT model so it uses the database route for scalable storage by adding the following lines at the end of the model:
STORE IN "PostgreSQL_Log"
WITH TABLE "MachineProductionData"
Also add a parameter based on the topic so that each row in the managed database includes a unique identifier.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "PostgreSQL_Log"
WITH TABLE "MachineProductionData"
After deploying this updated model, all new data should automatically be stored in the database whenever it changes.
MySQL Integration
MySQL is one of the most widely used relational database systems, which makes it a strong option for storing and analyzing IoT data at scale. In this section, the Coreflux MQTT broker is connected to a managed MySQL database so real-time device data can be stored securely and reliably for reporting, analytics, and application integration.
To enable the integration, define a Route in Coreflux’s LoT that tells the system where and how to send processed data. Use the following low-code structure and replace the placeholders with your own connection information:
DEFINE ROUTE MySQL_Log WITH TYPE MYSQL
ADD SQL_CONFIG
WITH SERVER "db-mysql.db.onmyserver.com"
WITH PORT 25060
WITH DATABASE "defaultdb"
WITH USERNAME "doadmin"
WITH PASSWORD "AVNS_pass"
WITH USE_SSL TRUE
WITH TRUST_SERVER_CERTIFICATE FALSE
Replace these values with your actual MySQL connection details and run the Route from the LoT Notebook. Important: Use the private VPC endpoint instead of the public one for improved security and reduced latency. If connection issues occur, verify the setting for TRUST_SERVER_CERTIFICATE, because some MySQL versions require TRUE while others work correctly with FALSE.
Updating the Model for MySQL Database Storage
Modify the LoT model so it uses the MySQL route for scalable storage by adding this to the end of the model:
STORE IN "MySQL_Log"
WITH TABLE "MachineProductionData"
Also add a topic-based parameter so each record in the database has a unique identifier.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "MySQL_Log"
WITH TABLE "MachineProductionData"
After deploying this updated model, all incoming data should be written automatically to the database whenever new updates arrive.
MongoDB Integration
MongoDB is a NoSQL database that is especially useful for storing and querying IoT data with flexible schemas. In this section, the Coreflux MQTT broker is connected to a managed MongoDB database so real-time device data can be stored securely and reliably for reporting, analytics, and integration with other systems.
To enable this integration, define a Route in Coreflux’s LoT that specifies where and how the processed data should be sent. Use the following low-code format and replace the placeholders with your own connection details:
DEFINE ROUTE mongo_route WITH TYPE MONGODB
ADD MONGODB_CONFIG
WITH CONNECTION_STRING "mongodb+srv://:@/?tls=true&authSource=admin&replicaSet="
WITH DATABASE "admin"
Replace the placeholders with your MongoDB connection details and run the Route from the LoT Notebook. Important: Use the VPC connection string format when it is available. The connection string should include tls=true and authSource=admin.
Updating the Model for MongoDB Database Storage
Modify the LoT model so it uses the MongoDB route for scalable storage by adding this to the end of the model:
STORE IN "mongo_route"
WITH TABLE "MachineProductionData"
Also add a parameter from the topic so that every document in the managed database includes a unique identifier.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "mongo_route"
WITH TABLE "MachineProductionData"
After this updated model is deployed, all new data should automatically be stored in the database when it changes.
OpenSearch Integration
OpenSearch is a distributed search and analytics engine designed for large-scale processing and real-time analysis of data. In this section, the Coreflux MQTT broker is connected to a managed OpenSearch database so real-time device data can be stored reliably for reporting, analytics, and integration with other systems.
To enable this integration, define a Route in Coreflux’s LoT that tells the platform where and how the processed data should be sent. Use the following low-code structure and replace the placeholders with your own values:
DEFINE ROUTE OpenSearch_log WITH TYPE OPENSEARCH
ADD OPENSEARCH_CONFIG
WITH BASE_URL "https://my-opensearch-cluster:9200"
WITH USERNAME "myuser"
WITH PASSWORD "mypassword"
WITH USE_SSL TRUE
WITH IGNORE_CERT_ERRORS FALSE
Replace these values with your own OpenSearch connection details and run the Route in the LoT Notebook. Important: Use the private VPC base URL instead of the public endpoint when available. The base URL format is usually https://your-cluster-hostname:9200. For access to OpenSearch Dashboards, use the separate dashboards URL provided with the cluster details.
Updating the Model for OpenSearch Database Storage
Modify the LoT model so it uses the OpenSearch route for scalable storage by adding the following lines at the end of the model:
STORE IN "OpenSearch_Log"
WITH TABLE "MachineProductionData"
Also add a topic-based parameter so that each entry in the managed database includes a unique identifier.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "OpenSearch_Log"
WITH TABLE "MachineProductionData"
After deploying this updated model, all data should be stored automatically in the database when new updates arrive.
Step 9 — Verifying the Complete IoT Automation Pipeline
Monitoring Real-Time Data Flow
- MQTT Explorer: Use an MQTT client to confirm that real-time data is being published
- Database Client: Connect to verify that the data is being stored, for example with DBeaver for PostgreSQL, MongoDB Compass for MongoDB, or OpenSearch Dashboards for OpenSearch
Verifying PostgreSQL Storage
Connect to the managed PostgreSQL database through DBeaver to validate scalable storage:
- Use the connection string provided by the database service
- Open the
coreflux-broker-datadatabase or the name you assigned to it - Inspect the
MachineProductionDatatable for stored records
As shown earlier, all of the data also remains available in the MQTT broker for additional integrations and downstream use cases.
Verifying MongoDB Storage
Connect to the managed MongoDB database through MongoDB Compass to validate scalable storage:
- Use the connection string provided by the database service
- Open the
coreflux-broker-datadatabase or the name you assigned - Inspect the
MachineProductionDatacollection for stored documents
You should see real-time data documents with a structure similar to this:
{
"_id": {
"$oid": "68626dc3e8385cbe9a1666c3"
},
"energy": 36,
"energy_wh": 36000,
"production_status": "active",
"production_count": 31,
"stoppage": 0,
"maintenance_alert": false,
"timestamp": "2025-06-30 10:58:11",
"device_name": "station2"
}
As shown earlier, all of the data also remains available in the MQTT broker for additional integrations and downstream processes.
Verifying MySQL Storage
Connect to the managed MySQL database with DBeaver to validate scalable storage:
- Use the connection string provided by the database service
- Open the
coreflux-broker-datadatabase or the database name you selected - Inspect the
MachineProductionDatatable for stored records
As with the other integrations, the same data is also still available in the MQTT broker for further use and downstream integrations.
Verifying OpenSearch Storage
Open OpenSearch Dashboards with the provided URL and credentials:
- Open the menu and select Index Management
- Choose the Indexes option and check whether your table name appears in the list
- Return to the main page and open Discover from the menu
- Create an index pattern by following the provided steps
- Return to the Discover page and confirm that your data is visible
As shown earlier, all of the data is also available in the MQTT broker for other integrations and additional use cases.
Step 10 — Expanding Your Use Case and Integrations
Testing LoT Capabilities
- Publish Sample Data: Use MQTT Explorer to publish sample payloads to the Coreflux broker. Try different payload structures and experiment with multiple Models and Actions to observe how the data is transformed and stored in the selected database.
- Data Validation: Check that the data stored in the database matches the payloads that were published. Use your database client, such as DBeaver for PostgreSQL, MongoDB Compass for MongoDB, or OpenSearch Dashboards for OpenSearch, to confirm accuracy and consistency. Compare timestamps, transformed fields, and data types to validate the real-time data pipeline.
- Real-Time Monitoring: Create a continuous live data stream by using another MQTT source, such as a simple MQTT-enabled sensor. Observe how Coreflux and the database manage incoming IoT streams and evaluate response times for retrieval and query operations.
Building Analytics and Visualizations
- Create Dashboards: Integrate visualization tools such as Grafana to build dashboards that display IoT data from both live MQTT topics and historical database queries. Monitor metrics such as device uptime, sensor readings, production counts, or maintenance alerts. For real-time dashboards, subscribe directly to MQTT topics. For long-term trends and aggregated views, query the database.
- Trend Analysis: Use the capabilities of your selected database to analyze trends over time:
- PostgreSQL: Use SQL queries for complex relational analysis
- MongoDB: Use the aggregation framework for document-oriented analysis
- OpenSearch: Use built-in search and analytics features for full-text and time-series analysis
- Multi-Database Integration: Explore the use of multiple managed databases at the same time, for example MongoDB for unstructured data, PostgreSQL for relational data, MySQL for structured query workloads, or OpenSearch for advanced analytics and search. Coreflux routes can send data to several destinations at once.
Optimizing and Scaling Your IoT Infrastructure
- Load Testing: Simulate heavy traffic by publishing large numbers of messages simultaneously through the LoT Notebook or automated scripts. Monitor how the Coreflux MQTT broker and database cluster respond to the load and identify bottlenecks in the data pipeline.
- Scaling: Cloud platforms typically provide both vertical and horizontal scaling options. Increase virtual machine resources such as CPU, RAM, or storage as IoT data volume grows. Scale the managed database cluster to handle larger datasets and configure alerts so you are notified before resource limits are reached.
Frequently Asked Questions
1. How do I integrate a Coreflux MQTT broker with a managed database?
You integrate a Coreflux MQTT broker with a managed database by defining a Route in LoT that points to the target service, such as PostgreSQL, MySQL, MongoDB, or OpenSearch. Each Route includes the correct connection parameters, such as the server or connection string, port, database name, username, password, and SSL or TLS settings, and then writes MQTT message payloads into tables, collections, or indexes automatically. After the Route is defined, attach it to a Model by using the STORE IN directive so each processed message is saved in the selected database.
2. Can I save MQTT data directly to databases without writing custom integration code?
Yes. Coreflux is built as a low-code integration layer, so there is no need to write application logic or separate ETL jobs just to persist the data. For each database type, configure an LoT route such as PostgreSQL_Log, MySQL_Log, mongo_route, or OpenSearch_Log, and then extend the model with STORE IN "<route_name>" WITH TABLE "MachineProductionData". Coreflux handles connection pooling, retries, and error management so the focus remains on data modeling and transformation instead of repetitive integration code.
3. Which managed database should I choose for MQTT IoT data storage?
The best managed database depends on the structure of your data, the kinds of queries you need, and your analytics goals. Use the comparison below as a guide:
| Database | Best For | Example Use Cases |
|---|---|---|
| PostgreSQL | Strong consistency, relational schemas, complex SQL queries | Industrial sensor networks, transactional events, analytics |
| MySQL | Relational data, structured queries, broad compatibility | Inventory systems, production metrics, traditional business records |
| MongoDB | Flexible, evolving schemas and document storage | Connected devices with variable payloads, IoT telemetry with changing structures |
| OpenSearch | Full-text search, analytics, dashboards, and log indexing | Time-series analytics, monitoring, event logs, IoT search and visualization |
Tip: Tip: Several managed databases can be connected in parallel by setting up separate Coreflux routes. This allows the same MQTT stream to send structured IoT data to PostgreSQL or MySQL, logs and metrics to OpenSearch, and unstructured or schemaless data to MongoDB.
4. How does this architecture support both real-time and historical analytics?
Coreflux keeps processed values available on MQTT topics for live consumption, dashboards, or additional data pipelines, while Routes store the same modeled data in databases for historical queries. In practice, this means topics can be subscribed to for immediate reactions such as alerts or control loops, while PostgreSQL, MySQL, MongoDB, or OpenSearch can be queried for trends, aggregations, and long-term analysis. This dual-path design reflects a common architecture in IoT systems, where the broker handles live messaging and the database provides durable storage and analysis.
5. How secure is the connection between Coreflux and managed databases?
When deployed in a cloud environment, private VPC networking can be used to keep communication between the Coreflux MQTT broker and the databases internal. This isolates resources from direct public internet exposure, while managed databases usually support TLS-encrypted connections. In addition, dedicated database users with limited permissions can be created for the Coreflux application, which supports the principle of least privilege.
6. Is this setup suitable for production IoT deployments?
Yes. This architecture reflects patterns commonly used in production MQTT and database integrations, where a broker sits in front of device traffic and a managed database layer provides durability and analytics. Managed databases typically provide automated backups, high availability, and monitoring, while the Coreflux MQTT broker can scale horizontally to support high message throughput. In production, firewall rules, strong credentials, TLS for MQTT and database access, and correctly sized virtual machines and database clusters should also be part of the deployment.
7. Can I run the MQTT broker without public internet access or in hybrid environments?
Yes. MQTT brokers are frequently deployed in private networks or edge environments, and MQTT can function without public internet access as long as clients can reach the broker. In a cloud setup, Coreflux and the databases can remain inside a VPC, with only the minimum required services exposed, such as a VPN, bastion host, or tightly restricted firewall rules. Selected topics can also be synchronized with other brokers or cloud regions when hybrid or multi-site architectures are required.
8. Are there limitations or trade-offs when using MQTT with databases for IoT data?
MQTT is optimized for lightweight, event-driven messaging, while databases are optimized for storage and query workloads. Persisting every raw message may become costly or unnecessarily noisy, so it is best to model data carefully by aggregating metrics, filtering topics, or downsampling data where appropriate. Extremely low-power devices or highly constrained networks may also struggle with persistent connections or TLS overhead, in which case QoS settings, batching, and retention policies may need adjustment. When models and routes are designed with these trade-offs in mind, MQTT combined with managed databases works well for most IoT scenarios.
9. How do I choose between PostgreSQL, MySQL, MongoDB, and OpenSearch for an IoT project?
The right managed database depends on the structure of your IoT data, your scalability needs, and how you plan to query device information. The following table summarizes the strengths of each option:
| Database | Best When… | Typical Use Cases | Key Strengths |
|---|---|---|---|
| PostgreSQL | You need complex relational queries, strong consistency, and transactional integrity with ACID support. | Industrial sensor networks, correlation of device data with production systems, analytics across joined datasets | Relational schemas, advanced SQL, consistency |
| MySQL | Your workloads are structured and require broad tooling support and compatibility. | Inventory tracking, traditional business applications, production metrics | Simpler relational needs, broad support |
| MongoDB | Your device payloads evolve frequently, or fast prototyping with document-based storage is important. | IoT telemetry with varying formats, rapid development, semi-structured data | Flexible schemas, simple scaling, fast prototyping |
| OpenSearch | You need to search, analyze, or visualize large amounts of IoT data such as logs, events, or time-series streams. | Searching sensor data, log analytics, visualization, keyword and time-based queries | Search, full-text analysis, analytics, fast aggregation |
Conclusion
Integrating the Coreflux MQTT broker with managed database services such as PostgreSQL, MongoDB, MySQL, or OpenSearch creates a complete environment for real-time IoT data processing and storage. By following this guide, you build an automation pipeline that collects, processes, and stores IoT data through low-code development practices.
With Coreflux’s architecture and the storage capabilities of the selected database, large volumes of real-time data can be handled efficiently and queried for meaningful insights. Whether the goal is monitoring industrial systems, tracking environmental sensors, or managing smart city infrastructure, this setup supports data-driven decisions by combining live MQTT topics with historical database queries.


