Implementing Message Queuing with Golang, Redis®, and MySQL on Linux

Message queuing represents a core concept in microservice architecture. It enables the seamless transmission of data across various applications for further operations. Typically, end-users submit data to your web app, which is then added to a queue managed by a message broker such as Redis® Server. Afterward, one or more worker processes retrieve and handle each queued task.

Message Queuing vs Pub/Sub Model

Unlike the publish/subscribe (pub/sub) pattern, where multiple consumers can receive a single message, message queuing ensures that each task is handled by only one worker and is removed from the queue upon completion. This mechanism guarantees that jobs are not duplicated, even when processed concurrently by multiple workers.

Use Cases and Benefits

Common applications of message queuing include payment processing systems, order handling workflows, inter-server communications, and more. Since this approach often employs in-memory databases like Redis®, it offers exceptional performance in environments that demand high availability, scalability, and real-time responsiveness—thus improving the overall user experience.

Project Overview

This tutorial walks you through setting up message queuing for a payment processing system. It uses Golang for application logic, Redis® for message handling, and MySQL 8 for persistent storage, all hosted on a Linux server.

Prerequisites

Before proceeding, ensure you have access to the following components:

  • A Linux server
  • A non-root user with sudo privileges
  • A running MySQL database
  • An active Redis® server
  • The Go programming language installed

Step 1: Set Up the MySQL Database and User

In this payment application, an HTTP endpoint receives payment requests and adds them to a Redis® queue using RPush. A backend process then continuously listens with BLPop to retrieve and store the payment records in a MySQL database.

Start by connecting to your server through SSH and accessing MySQL as the root user.

Enter the root password when prompted. Next, run the following SQL commands to create a database named web_payments and a user account web_payments_user. Be sure to replace EXAMPLE_PASSWORD with a strong password.

mysql> CREATE DATABASE web_payments;
        CREATE USER 'web_payments_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
        GRANT ALL PRIVILEGES ON web_payments.* TO 'web_payments_user'@'localhost';
        FLUSH PRIVILEGES;

Switch to the newly created database:

Now create a payments table, which the Redis® worker will populate with incoming payment data.

mysql> CREATE TABLE payments (
            payment_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            payment_date DATETIME,
            first_name VARCHAR(50),
            last_name VARCHAR(50),
            payment_mode VARCHAR(255),
            payment_ref_no VARCHAR (255),
            amount DECIMAL(17,4)
        ) ENGINE = InnoDB;

Exit the MySQL session once finished:

Step 2: Create the Project Directory Structure

To keep your files organized and avoid conflicts with the system, structure your application as follows:

payment_gateway
    --queue
        --main.go
    --worker
        --main.go

Create the root project directory inside your home folder:

Navigate to the newly created folder:

Create the subdirectories for your queuing logic and worker logic:

$ mkdir queue
$ mkdir worker

Your directory layout is now complete. You are ready to proceed by adding the Golang source files to their respective folders.

Step 3: Build the Message Queuing Script

In this step, you’ll create a script that monitors for incoming payment data and sends it into a Redis® queue.

Start by navigating to the ~/payment_gateway/queue directory:

$ cd ~/payment_gateway/queue

Create and open the main.go file using nano:

Paste the following Go code into the main.go file:

package main

import (
    "net/http"
    "github.com/go-redis/redis"
    "context"
    "bytes"
    "fmt"
)

func main() {
    http.HandleFunc("/payments", paymentsHandler)
    http.ListenAndServe(":8080", nil)
}

func paymentsHandler(w http.ResponseWriter, req *http.Request) {

    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
        Password: "",
        DB: 0,
    })

    ctx := context.TODO()
    buf := new(bytes.Buffer)

    // Implement validation for req.Body in production to sanitize input

    buf.ReadFrom(req.Body)
    paymentDetails := buf.String()

    err := redisClient.RPush(ctx, "payments", paymentDetails).Err()

    if err != nil {
        fmt.Fprintf(w, err.Error() + "\r\n")
    } else {
        fmt.Fprintf(w, "Payment details accepted successfully\r\n")
    }
}

This script listens on /payments at port 8080, then hands the incoming request to paymentsHandler(...). The handler sets up a connection to Redis® at port 6379 and queues the payment payload using the RPush function under the payments key.

Step 4: Build the Worker Script

This section covers the creation of a message worker that leverages the Redis® BLPOP command to retrieve, handle, and remove payment entries from the queue.

Navigate to the ~/payment_gateway/worker directory:

$ cd ~/payment_gateway/worker

Create a new main.go file:

Add the following content, replacing EXAMPLE_PASSWORD with your actual MySQL password for web_payments_user:

package main

import (
    "github.com/go-redis/redis"
    _ "github.com/go-sql-driver/mysql"
    "database/sql"
    "encoding/json"
    "context"
    "fmt"
    "strings"
    "strconv"
    "time"
)

func main() {

    ctx := context.TODO()

    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
        Password: "",
        DB: 0,
    })

    for {
        result, err := redisClient.BLPop(ctx, 0 * time.Second, "payments").Result()

        if err != nil {
            fmt.Println(err.Error())
        } else {

            params := map[string]interface{}{}

            err := json.NewDecoder(strings.NewReader(string(result[1]))).Decode(¶ms)

            if err != nil {
                fmt.Println(err.Error())
            } else {

                paymentId, err := savePayment(params)

                if err != nil {
                    fmt.Println(err.Error())
                } else {
                    fmt.Println("Payment # " + strconv.FormatInt(paymentId, 10) + " processed successfully.\r\n")
                }
            }
        }
    }
}

func savePayment(params map[string]interface{}) (int64, error) {

    db, err := sql.Open("mysql", "web_payments_user:EXAMPLE_PASSWORD@tcp(127.0.0.1:3306)/web_payments")

    if err != nil {
        return 0, err
    }

    defer db.Close()

    queryString := `insert into payments (
                        payment_date,
                        first_name,
                        last_name,
                        payment_mode,
                        payment_ref_no,
                        amount
                    ) values (
                        ?,
                        ?,
                        ?,
                        ?,
                        ?,
                        ?
                    )`

    stmt, err := db.Prepare(queryString)

    if err != nil {
        return 0, err
    }

    defer stmt.Close()

    paymentDate := time.Now().Format("2006-01-02 15:04:05")
    firstName := params["first_name"]
    lastName := params["last_name"]
    paymentMode := params["payment_mode"]
    paymentRefNo := params["payment_ref_no"]
    amount := params["amount"]

    res, err := stmt.Exec(paymentDate, firstName, lastName, paymentMode, paymentRefNo, amount)

    if err != nil {
        return 0, err
    }

    paymentId, err := res.LastInsertId()

    if err != nil {
        return 0, err
    }

    return paymentId, nil
}

The worker connects to Redis® and continuously uses BLPop to fetch and remove data from the payments queue. Upon successfully retrieving a payment, the savePayment(...) function sends the parsed data into the MySQL database. The function then returns the paymentId for verification.

Step 5: Test the Golang and Redis® Message Queue

Now that the application setup is complete, it’s time to run tests to verify that everything functions correctly.

Begin by downloading the necessary Go packages for MySQL and Redis®:

$ go get github.com/go-sql-driver/mysql
$ go get github.com/go-redis/redis

Move into the queue directory and run the Redis® queuing service. This launches the Go web server to receive payment requests via port 8080:

$ cd ~/payment_gateway/queue
$ go run ./

In a second terminal, connect to your server via SSH again and start the worker process:

$ cd ~/payment_gateway/worker
$ go run ./

The application is now actively monitoring for incoming payment entries.

Open a third terminal and use curl to send test payment data to the application one entry at a time:

$ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "JOHN", "last_name": "DOE", "payment_mode": "CASH", "payment_ref_no": "-", "amount" : 5000.25}'
$ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "MARY", "last_name": "SMITH", "payment_mode": "CHEQUE", "payment_ref_no": "985", "amount" : 985.65}'
$ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "ANN", "last_name": "JACOBS", "payment_mode": "PAYPAL", "payment_ref_no": "ABC-XYZ", "amount" : 15.25}'

Each command should return the message:

Payment details accepted successfully

Meanwhile, the worker terminal should display output confirming successful processing:

Payment # 1 processed successfully.
Payment # 2 processed successfully.
Payment # 3 processed successfully.

To confirm the transactions are stored in the database, use the third terminal to log into MySQL:

After entering your root password, switch to the relevant database:

Run the following query to view the inserted payments:

mysql> SELECT
           payment_id,
           payment_date,
           first_name,
           last_name,
           payment_mode,
           payment_ref_no,
           amount
       FROM payments;

If successful, you’ll see a result like this:

+------------+---------------------+------------+-----------+--------------+----------------+-----------+
| payment_id | payment_date        | first_name | last_name | payment_mode | payment_ref_no | amount    |
+------------+---------------------+------------+-----------+--------------+----------------+-----------+
|          1 | 2021-12-01 09:48:32 | JOHN       | DOE       | CASH         | -              | 5000.2500 |
|          2 | 2021-12-01 09:48:42 | MARY       | SMITH     | CHEQUE       | 985            |  985.6500 |
|          3 | 2021-12-01 09:48:55 | ANN        | JACOBS    | PAYPAL       | ABC-XYZ        |   15.2500 |
+------------+---------------------+------------+-----------+--------------+----------------+-----------+
3 rows in set (0.00 sec)

This confirms that the application has correctly dequeued and processed the payments.

Conclusion

Throughout this guide, you’ve successfully built a message queuing system using Golang, Redis®, and MySQL 8 on a Linux server. By leveraging Redis® commands like RPush and BLPop, you’ve decoupled the receipt and processing of payments, which enhances the application’s scalability and dependability.

Source: vultr.com

Create a Free Account

Register now and get access to our Cloud Services.

Posts you might be interested in: