
How to Integrate Coreflux with DigitalOcean and OpenSearch: A Step-by-Step Guide

September 2024

Coreflux is a Strategic Partner of DigitalOcean. The integration of both companies' technologies is key to fast-forwarding the state of Industrial Digitalization around the globe.

João Schier is a Solutions Engineer at Coreflux. He prepared the following article for publication on DigitalOcean's website in September 2024. Here he is to tell you more:

Context

With the number of connected devices increasing, and today’s world being more data-driven than ever before, we have to be able to process billions of pieces of information at scale. An MQTT broker such as Coreflux, paired with a scalable cloud platform like DigitalOcean, can be the answer to this problem of processing and analyzing IoT (Internet of Things) data.

In this guide, you will learn how to connect an MQTT broker with a managed OpenSearch service on DigitalOcean. The end result is a seamless setup that allows for real-time data collection and storage, making it easier to monitor, analyze, and visualize your IoT data.

Partner Update: DigitalOcean + Coreflux

DigitalOcean is a cloud infrastructure provider focused on delivering simplified, scalable computing solutions for developers, startups, and small to medium-sized businesses. Known for its user-friendly interface and transparent pricing, DigitalOcean offers a range of services, including virtual servers (Droplets), managed databases, object storage, and Kubernetes-based container orchestration.

With data centers worldwide, the platform provides reliable access and performance, allowing businesses to deploy, manage, and scale applications efficiently. DigitalOcean's emphasis on ease of use and affordability helps users focus on development and growth rather than managing cloud infrastructure.

Coreflux offers a lightweight MQTT broker designed to facilitate efficient, real-time communication between IoT devices and applications. Built for scalability and reliability, Coreflux is tailored for environments where low latency and high throughput are critical. It supports the MQTT (Message Queuing Telemetry Transport) protocol, which is widely adopted in IoT ecosystems for its efficiency in bandwidth-constrained environments.

Whether you’re developing a small-scale IoT project or deploying a large-scale industrial monitoring system, Coreflux provides the reliable messaging backbone needed to ensure smooth data flow between devices.

Why Integrate Coreflux with DigitalOcean?

Integrating Coreflux with DigitalOcean’s managed OpenSearch service offers numerous benefits:

  • Scalability: As your IoT network grows with Coreflux, DigitalOcean’s Droplets (virtual servers) can be scaled to accommodate increased traffic and data processing needs.
  • Reliability: DigitalOcean offers a reliable and consistent cloud environment, ensuring that your Coreflux broker remains online and responsive across multiple regions.
  • Ease of Management: With DigitalOcean’s dashboard and API, managing your infrastructure becomes straightforward, and that includes deploying and monitoring the data flow in your Coreflux broker.
  • Cost-Effectiveness: DigitalOcean’s pricing is transparent and affordable, and Coreflux’s pricing follows the same principles, making it a great option for both startups and established businesses looking to manage their IoT infrastructure without breaking the bank.

Step-by-Step Integration Guide

Before diving into the integration, make sure you have the following:

  • DigitalOcean Account: If you don’t have one, sign up for an account at DigitalOcean.
  • Coreflux Broker Setup: Your Coreflux broker should be running and accessible. If it’s not set up yet, you can refer to the Coreflux Documentation to get started, or check the initial steps in this guide.
  • MQTT Explorer: This tool will be used to interact with the MQTT broker. You can download it from MQTT Explorer.
  • Python Environment: Ensure you have Python installed, along with the necessary libraries like paho-mqtt and opensearch-py.
  • Python Script: You’ll need the Python script that bridges the Coreflux MQTT broker with your DigitalOcean OpenSearch instance. This script is responsible for checking published MQTT messages, processing them, and storing them in OpenSearch.

New to Python? Here’s a basic outline of what the Python script above does:

  • Connects to Coreflux: The script uses paho-mqtt to connect to your Coreflux MQTT broker.
  • Subscribes to Topics: It listens for messages on specific topics that you define in the Python script.
  • Processes and Indexes Data: The script parses published JSON messages and attempts to index them into OpenSearch using opensearch-py.
  • Publishes Feedback: After processing, the script can publish feedback messages back to the MQTT broker, alerting you to errors or completed tasks.
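
To make the "process" and "feedback" steps in this outline more concrete, here is a minimal sketch using only the Python standard library. The function names, the list of required fields, and the shape of the feedback message are assumptions made for illustration (the field names are borrowed from the sample document used later in this guide); the actual script may handle these steps differently.

```python
import json
from typing import Optional

# Field names follow the sample document used later in this guide.
# Treating all of them as required is an assumption for illustration.
REQUIRED_FIELDS = ("timestamp", "machine_id", "temperature", "status", "error_code")


def parse_message(raw: bytes) -> Optional[dict]:
    """Decode an MQTT payload as a JSON object, or return None if it is unusable."""
    try:
        doc = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(doc, dict):
        return None  # valid JSON, but not an object we can index
    if any(field not in doc for field in REQUIRED_FIELDS):
        return None
    return doc


def build_feedback(ok: bool, detail: str) -> str:
    """Build a JSON feedback message (hypothetical format) to publish back to the broker."""
    return json.dumps({"status": "indexed" if ok else "error", "detail": detail})
```

A message handler could call parse_message on each incoming payload, index the result when it is not None, and publish the string returned by build_feedback either way.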

Step 1: Set Up Your Coreflux MQTT Broker

Ready to go? You can watch the Coreflux Tutorial on how to quickly start a Free Trial of the Online MQTT Broker, and follow along with this step-by-step guide:

  • Create a Coreflux Account:
    • Go to the Coreflux website and sign up for a free account if you don’t already have one.
    • After signing up, verify your email address to activate your account.
  • Start a Free Trial Broker:
    • Once logged in, navigate to mqtt.coreflux.org.
    • Click on Start Free Trial to create a new MQTT broker.
    • Choose a Data Center Region: Select a region that is geographically close to your IoT devices or the DigitalOcean data center where you plan to deploy OpenSearch for lower latency.
    • Confirm your choices to start the trial.
  • Receive Broker Credentials:
    • After creating your broker, Coreflux will send the credentials to access your broker (such as the broker URL, port, username, and password) to your registered email.
    • Keep these credentials safe as you’ll need them to configure your IoT devices and the Python script later.
  • Set Up MQTT Explorer:
    • Download and install MQTT Explorer if you haven’t already.
    • Open MQTT Explorer and configure it to connect to your Coreflux broker using the credentials you received:
    • Broker Address: Enter the broker URL.
    • Port: Use the port provided (typically 8883 for SSL connections).
    • Username/Password: Enter the credentials provided in your email.
    • Connect to the broker and try subscribing to a topic to ensure everything is working.
  • Test the Broker Connection:
    • Publish a test message using MQTT Explorer to one of the topics on your Coreflux broker.
    • Verify that the message is received and displayed correctly in the Explorer. This confirms that your broker is up and running.
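
If you prefer to run the same connection test from code, the sketch below publishes one message with paho-mqtt. It assumes paho-mqtt 2.x (which takes a callback API version when creating the client); the topic name test/hello and the payload contents are made up for this example, and the host, port, username, and password are the ones from your credentials email.

```python
import json
from datetime import datetime, timezone


def build_test_payload(machine_id: str = "MACHINE123") -> str:
    """Return a small JSON test message with a UTC timestamp."""
    return json.dumps({
        "machine_id": machine_id,
        "message": "hello from the test script",
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    })


def publish_test_message(host: str, port: int, username: str, password: str,
                         topic: str = "test/hello", use_tls: bool = True) -> None:
    """Connect to the broker, publish one message with QoS 1, and disconnect."""
    # Imported here so build_test_payload works without paho-mqtt installed.
    import paho.mqtt.client as mqtt

    # Assumes paho-mqtt 2.x; on 1.x, create the client with mqtt.Client() instead.
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.username_pw_set(username, password)
    if use_tls:
        client.tls_set()  # use the system's default CA certificates
    client.connect(host, port)
    client.loop_start()  # background network loop so the publish can complete
    info = client.publish(topic, build_test_payload(), qos=1)
    info.wait_for_publish()  # block until the broker acknowledges the message
    client.disconnect()
    client.loop_stop()
```

With MQTT Explorer subscribed to the same topic, calling publish_test_message with your broker details should make the message appear there.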

Step 2: Set Up Your OpenSearch Instance on DigitalOcean

Now that your Coreflux MQTT broker is set up and tested, it’s time to connect it to a managed OpenSearch instance on DigitalOcean. Here’s how:

  • Log in to DigitalOcean:
    • Head over to the DigitalOcean dashboard and log in with your credentials.
  • Create a New Database:
    • On the dashboard, click on Databases in the left-hand menu.
    • Select Create Database Cluster.
    • Choose OpenSearch from the list of available database types.
  • Configure Your OpenSearch Instance:
    • Select a Data Center Region: Choose a region that’s geographically close to your IoT devices or Coreflux broker for lower latency.
    • Choose Your Plan: Start with a basic plan suitable for your current needs. You can always scale up later as your data grows.
  • Create the Cluster:
    • Once configured, click Create Cluster and wait for DigitalOcean to provision your OpenSearch instance. This may take a few minutes.
  • Get Your Connection Details:
    • After the cluster is created, go to the Connection Details tab in your database cluster. Note down the host, port, username, and password details as you’ll need them to connect from your Python script.
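
As a quick check that these connection details work, here is a small sketch using opensearch-py. The helper names are invented for this example, and it assumes the cluster is reached over HTTPS with basic authentication; pass the host, port, username, and password exactly as shown in the Connection Details tab.

```python
def opensearch_kwargs(host: str, port: int, username: str, password: str) -> dict:
    """Collect the connection details into keyword arguments for the OpenSearch client."""
    return {
        "hosts": [{"host": host, "port": port}],
        "http_auth": (username, password),
        "use_ssl": True,       # assumption: the managed cluster is reached over HTTPS
        "verify_certs": True,  # reject servers with invalid certificates
    }


def check_connection(host: str, port: int, username: str, password: str) -> str:
    """Connect to the cluster and return the OpenSearch version it reports."""
    # Imported here so opensearch_kwargs works without opensearch-py installed.
    from opensearchpy import OpenSearch

    client = OpenSearch(**opensearch_kwargs(host, port, username, password))
    info = client.info()  # basic cluster information
    return info["version"]["number"]
```

If check_connection returns a version string, the host, port, and credentials are correct and the cluster is reachable from your machine.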

Step 3: Mapping the Index in Your OpenSearch Instance

Before you start indexing data from your Coreflux MQTT broker into OpenSearch, you need to define the mapping for your index. Mapping is essentially the schema for your index, specifying the data types for each field in your documents. This step is crucial for ensuring that the data is stored correctly and can be searched effectively.

Here’s how to create and map an index in your OpenSearch instance:

  • Access the OpenSearch Dashboard:
    • Log in to the OpenSearch dashboard using the connection details you obtained when setting up the OpenSearch instance.
    • Navigate to the “Index Management” section.
  • Create a New Index:
    • Click on Create Index to start the process.
    • Enter a name for your index (e.g., machine-production).
  • Define the Mapping:
    • Click on the Mappings tab during the index creation process.
    • Here, you will define the fields that your data will have. For example:
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date"
      },
      "machine_id": {
        "type": "keyword"
      },
      "temperature": {
        "type": "float"
      },
      "status": {
        "type": "keyword"
      },
      "error_code": {
        "type": "integer"
      }
    }
  }
}
  • In this example:
    • timestamp is stored as a date type, which is useful for time-based searches.
    • machine_id and status are stored as keyword types, which means they are not analyzed and are used for exact matches.
    • temperature is stored as a float type to accommodate decimal values.
    • error_code is stored as an integer type, suitable for numeric values without decimals.
  • Finalize the Index Creation:
    • After defining your mappings, review the settings, and click on Create Index.
    • OpenSearch will now create the index with the mappings you specified. This index is now ready to store and organize the data that will be published from your Coreflux MQTT broker.
  • Test the Mapping:
    • Use your Python script or directly use the OpenSearch API to index a test document and ensure that it matches the defined mapping.
    • Example of a test document:
{
  "timestamp": "2024-08-23T10:30:00Z",
  "machine_id": "MACHINE123",
  "temperature": 75.5,
  "status": "operational",
  "error_code": 0
}
  • Insert this document into the machine-production index (or the index of your choice) and verify that all fields are correctly stored and searchable.
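
The same steps can also be done from Python instead of the dashboard. The sketch below reuses the mapping and test document shown above; the local mapping_errors check is a simplified stand-in written for this example (OpenSearch's own validation is more lenient, for instance it also accepts epoch milliseconds for date fields), and create_index_and_insert expects an opensearchpy.OpenSearch client that you have already connected.

```python
from datetime import datetime

INDEX_NAME = "machine-production"

# Same mapping as the JSON example above.
INDEX_BODY = {
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "machine_id": {"type": "keyword"},
            "temperature": {"type": "float"},
            "status": {"type": "keyword"},
            "error_code": {"type": "integer"},
        }
    }
}


def _is_iso_date(value) -> bool:
    """True for ISO 8601 strings such as 2024-08-23T10:30:00Z."""
    if not isinstance(value, str):
        return False
    try:
        datetime.fromisoformat(value.replace("Z", "+00:00"))
        return True
    except ValueError:
        return False


# Simplified local checks per mapping type (an assumption for illustration).
CHECKS = {
    "date": _is_iso_date,
    "keyword": lambda v: isinstance(v, str),
    "float": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
}


def mapping_errors(doc: dict, body: dict = INDEX_BODY) -> list:
    """Return a list of fields that are missing or do not fit the mapping."""
    errors = []
    for field, spec in body["mappings"]["properties"].items():
        if field not in doc:
            errors.append(f"{field}: missing")
        elif not CHECKS[spec["type"]](doc[field]):
            errors.append(f"{field}: expected {spec['type']}")
    return errors


def create_index_and_insert(client, doc: dict) -> None:
    """Create the index if it does not exist yet, then index one document."""
    if not client.indices.exists(index=INDEX_NAME):
        client.indices.create(index=INDEX_NAME, body=INDEX_BODY)
    # refresh=True makes the document searchable immediately, which is handy for tests.
    client.index(index=INDEX_NAME, body=doc, refresh=True)
```

Running mapping_errors on the test document before indexing it gives a quick local sanity check; an empty list means every field is present and has a plausible type.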

Step 4: Integrate Coreflux with OpenSearch Using Python

With both Coreflux and OpenSearch set up, it’s time to link them together using a Python script. This script will connect to the Coreflux broker, process published messages, and store them in OpenSearch.

  • Set Up Your Environment Variables:
    • In the directory where your Python script is located, create a .env file.
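    • Add the following environment variables, replacing the placeholder values with your actual credentials. For MQTT_PORT, use the port from your Coreflux credentials email (1883 is the standard port for unencrypted MQTT and 8883 is the standard port for MQTT over TLS):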

MQTT_BROKER=your-coreflux-broker-url
MQTT_PORT=1883
MQTT_USERNAME=your-coreflux-username
MQTT_PASSWORD=your-coreflux-password
OPENSEARCH_HOST=your-opensearch-host
OPENSEARCH_USERNAME=your-opensearch-username
OPENSEARCH_PASSWORD=your-opensearch-password

  • Install Required Python Libraries:
    • Ensure you have the necessary Python libraries installed. You can install them using pip:

pip install paho-mqtt opensearch-py python-dotenv

  • Write or Configure the Python Script:
    • Use the Python script provided earlier, which connects to the Coreflux MQTT broker, listens for published messages in the topic Machine/Produce, and indexes them into OpenSearch.
    • Make sure the script correctly references the environment variables you set up.
  • Run the Script:  
    • Execute the Python script. It should connect to the Coreflux broker, subscribe to the desired topics, and start indexing published messages into your OpenSearch instance.
    • Monitor the output to ensure that messages are being processed and stored correctly.
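
For readers who want to see how the pieces fit together, here is a minimal sketch of such a bridge. It is not the exact script referenced in this guide: it assumes paho-mqtt 2.x, a hypothetical feedback topic Machine/Feedback, and an extra OPENSEARCH_PORT variable (defaulting to 25060 here as an assumption; use the port from your Connection Details tab). The environment variable names otherwise match the .env file above.

```python
import json
import os

TOPIC = "Machine/Produce"            # topic named in this guide
FEEDBACK_TOPIC = "Machine/Feedback"  # hypothetical topic for feedback messages
INDEX_NAME = "machine-production"


def load_config(env=os.environ) -> dict:
    """Read connection settings from environment variables."""
    return {
        "mqtt_host": env["MQTT_BROKER"],
        "mqtt_port": int(env.get("MQTT_PORT", "1883")),
        "mqtt_user": env["MQTT_USERNAME"],
        "mqtt_pass": env["MQTT_PASSWORD"],
        "os_host": env["OPENSEARCH_HOST"],
        # OPENSEARCH_PORT is an assumed extra variable, not part of the .env above.
        "os_port": int(env.get("OPENSEARCH_PORT", "25060")),
        "os_user": env["OPENSEARCH_USERNAME"],
        "os_pass": env["OPENSEARCH_PASSWORD"],
    }


def make_on_message(search_client):
    """Build a paho-mqtt on_message callback that indexes JSON payloads."""
    def on_message(client, userdata, msg):
        try:
            doc = json.loads(msg.payload)
            search_client.index(index=INDEX_NAME, body=doc)
            feedback = {"status": "indexed", "topic": msg.topic}
        except Exception as exc:  # report any failure instead of stopping the loop
            feedback = {"status": "error", "detail": str(exc)}
        client.publish(FEEDBACK_TOPIC, json.dumps(feedback))
    return on_message


def main() -> None:
    # Third-party imports live here so the helpers above can be used without them.
    import paho.mqtt.client as mqtt
    from dotenv import load_dotenv
    from opensearchpy import OpenSearch

    load_dotenv()  # load the .env file from the current directory
    cfg = load_config()
    search_client = OpenSearch(
        hosts=[{"host": cfg["os_host"], "port": cfg["os_port"]}],
        http_auth=(cfg["os_user"], cfg["os_pass"]),
        use_ssl=True,
        verify_certs=True,
    )

    def on_connect(client, userdata, flags, reason_code, properties):
        client.subscribe(TOPIC)  # subscribe again after every (re)connect

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # assumes paho-mqtt 2.x
    client.username_pw_set(cfg["mqtt_user"], cfg["mqtt_pass"])
    if cfg["mqtt_port"] == 8883:
        client.tls_set()  # TLS with the system's default CA certificates
    client.on_connect = on_connect
    client.on_message = make_on_message(search_client)
    client.connect(cfg["mqtt_host"], cfg["mqtt_port"])
    client.loop_forever()  # block and process messages until interrupted
```

Calling main() starts the bridge. Keeping the message handling in make_on_message, separate from the connection code, makes it possible to try the indexing logic with stand-in objects before pointing it at a real broker and cluster.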

Step 5: Have fun with your integration!  

  • Test Data Flow:
    • Publish Sample Data: Use MQTT Explorer to publish sample datasets to your Coreflux broker. Experiment with different payload structures to see how they are processed and indexed in OpenSearch.
    • Data Validation: Verify that the data in OpenSearch matches the payloads you published. Check for consistency and accuracy, ensuring your integration is working as expected.
    • Real-Time Monitoring: Set up a real-time feed using MQTT Explorer to publish messages continuously. Watch how OpenSearch handles incoming data streams and explore how quickly you can retrieve and analyze the data.
  • Build Visualizations:
    • Create Dashboards: Use OpenSearch’s dashboarding tools to create dynamic dashboards that visualize your IoT data. You could track metrics like device uptime, sensor readings, or user interactions.
    • Trend Analysis: Analyze trends over time by aggregating data in OpenSearch. Look for patterns, spikes, or anomalies in the data you’re collecting.
    • Geo-Visualizations: If your data includes geographic information, create maps that display data points based on location. This is especially useful for IoT devices spread across different regions.
  • Optimize and Scale:
    • Performance Tuning: Experiment with different broker and OpenSearch configurations to optimize performance. Adjust your Coreflux broker settings, such as connection limits or message retention policies, to improve efficiency. You can also learn about more advanced configurations for DigitalOcean Droplets.
    • Load Testing: Simulate high traffic by publishing a large number of messages simultaneously. Monitor how your Coreflux broker and OpenSearch instance handle the load, and identify any bottlenecks or areas for improvement.
    • Scaling: DigitalOcean offers scaling, allowing you to increase the resources (CPU, RAM, or storage) of your Droplets as your data needs grow. You can also set up alerts to notify you when resource limits are approaching.
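
To try the sample-data and trend-analysis ideas above, the sketch below generates synthetic payloads and builds an aggregation query. The machine IDs, value ranges, and the "fault" status are invented for this example; the field names follow the machine-production mapping from Step 3.

```python
import json
import random
from datetime import datetime, timedelta, timezone


def make_sample_payloads(count: int, seed: int = 0) -> list:
    """Generate `count` JSON payloads, one per minute, for synthetic machines."""
    rng = random.Random(seed)  # seeded so repeated runs produce the same data
    start = datetime(2024, 8, 23, 10, 30, tzinfo=timezone.utc)
    payloads = []
    for i in range(count):
        error_code = rng.choice([0, 0, 0, 1])  # roughly one fault in four messages
        payloads.append(json.dumps({
            "timestamp": (start + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "machine_id": f"MACHINE{rng.randint(1, 5):03d}",
            "temperature": round(rng.uniform(60.0, 90.0), 1),
            "status": "operational" if error_code == 0 else "fault",
            "error_code": error_code,
        }))
    return payloads


def hourly_temperature_query() -> dict:
    """Search body that averages temperature in one-hour buckets."""
    return {
        "size": 0,  # return only the aggregation, no individual documents
        "aggs": {
            "per_hour": {
                "date_histogram": {"field": "timestamp", "fixed_interval": "1h"},
                "aggs": {"avg_temp": {"avg": {"field": "temperature"}}},
            }
        },
    }
```

Each payload can be published to the Machine/Produce topic with a connected paho-mqtt client, for example client.publish("Machine/Produce", payload). Once the data is indexed, passing the query to client.search(index="machine-production", body=hourly_temperature_query()) on an opensearch-py client returns the hourly averages, which is a useful starting point for a dashboard panel.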

You can also read more about how to get started with OpenSearch on DigitalOcean.

Get a free trial of the Coreflux Online MQTT Broker or learn more with the Coreflux Docs and Tutorials.

João Henrique Schier, Solutions Engineer @ Coreflux | Fullstack Developer | C# | Python | Node.js | Javascript | React | MQTT | IoT

Coreflux is on a mission to #democratizeIIoT and empower each human being to extract the maximum potential from the technology they use in their industry.

Visit coreflux.org to power-up your IIoT project.  
