Message Queuing mit Golang, Redis® und MySQL unter Linux einrichten

Message Queuing ist ein zentrales Prinzip in der Microservice-Architektur. Es ermöglicht den reibungslosen Datentransfer zwischen verschiedenen Anwendungen zur weiteren Verarbeitung. Üblicherweise senden Endnutzer Daten an eine Webanwendung, die diese dann in eine Warteschlange einfügt – gesteuert durch einen Message Broker wie Redis® Server. Anschließend übernehmen ein oder mehrere Worker-Prozesse die Bearbeitung der einzelnen Aufgaben aus dieser Queue.

Message Queuing im Vergleich zum Pub/Sub-Modell

Im Gegensatz zum Publish/Subscribe-Modell (pub/sub), bei dem mehrere Konsumenten eine Nachricht erhalten können, wird beim Message Queuing sichergestellt, dass jede Aufgabe nur einmal von genau einem Worker verarbeitet wird. Nach der Bearbeitung wird sie aus der Warteschlange gelöscht. Dieses Verfahren verhindert doppelte Verarbeitung selbst bei gleichzeitiger Ausführung durch mehrere Prozesse.

Anwendungsfälle und Vorteile

Message Queuing wird häufig bei Zahlungsabwicklungen, Bestellprozessen, Server-Kommunikation und ähnlichen Szenarien eingesetzt. Durch die Nutzung speicherbasierter Datenbanken wie Redis® bietet diese Methode hohe Verfügbarkeit, Skalierbarkeit und Reaktionsfähigkeit – ideal für Systeme mit hohen Performance-Anforderungen und Echtzeitverarbeitung.

Projektüberblick

In dieser Anleitung richten Sie eine Zahlungsanwendung mit Message Queuing ein. Dabei kommen Golang für die Anwendungslogik, Redis® für die Nachrichtenverarbeitung und MySQL 8 für die dauerhafte Speicherung zum Einsatz – alles auf einem Linux-Server.

Voraussetzungen

Für die Umsetzung benötigen Sie folgende Komponenten:

  • Einen Linux-Server
  • Einen nicht-root Nutzer mit sudo-Rechten
  • Eine laufende MySQL-Datenbank
  • Einen aktiven Redis®-Server
  • Die Go-Programmiersprache (Golang)

Schritt 1: MySQL-Datenbank und Benutzer einrichten

In dieser Zahlungsanwendung empfängt ein HTTP-Endpunkt Zahlungsdaten und fügt sie per RPush in eine Redis®-Queue ein. Ein Backend-Skript überwacht per BLPop diese Queue, liest die Daten aus und speichert sie in der MySQL-Datenbank.

Verbinden Sie sich zunächst per SSH mit Ihrem Server und melden Sie sich als root-Benutzer bei MySQL an:

Geben Sie das Root-Passwort ein. Führen Sie anschließend folgende SQL-Befehle aus, um eine Datenbank web_payments sowie einen Nutzer web_payments_user anzulegen. Ersetzen Sie EXAMPLE_PASSWORD mit einem sicheren Passwort:

mysql> CREATE DATABASE web_payments;
        CREATE USER 'web_payments_user'@'localhost' IDENTIFIED WITH mysql_native_password BY 'EXAMPLE_PASSWORD';
        GRANT ALL PRIVILEGES ON web_payments.* TO 'web_payments_user'@'localhost';
        FLUSH PRIVILEGES;

Wechseln Sie zur neuen Datenbank:

Erstellen Sie nun die Tabelle payments, die später vom Redis®-Worker mit Zahlungsdaten befüllt wird:

mysql> CREATE TABLE payments (
            payment_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
            payment_date DATETIME,
            first_name VARCHAR(50),
            last_name VARCHAR(50),
            payment_mode VARCHAR(255),
            payment_ref_no VARCHAR (255),
            amount DECIMAL(17,4)
        ) ENGINE = InnoDB;

Beenden Sie die MySQL-Sitzung:

Schritt 2: Projektverzeichnisstruktur erstellen

Zur besseren Organisation und um Konflikte mit anderen Systemdateien zu vermeiden, strukturieren Sie Ihre Anwendung wie folgt:

payment_gateway
    --queue
        --main.go
    --worker
        --main.go

Erstellen Sie das Hauptverzeichnis im Home-Verzeichnis des Nutzers:

Wechseln Sie in das neu erstellte Verzeichnis:

Erstellen Sie die Unterordner für die Queue- und Worker-Logik:

$ mkdir queue
$ mkdir worker

Die Projektstruktur ist nun angelegt – Sie können jetzt mit dem Schreiben des Golang-Codes beginnen und diesen in den jeweiligen Verzeichnissen speichern.

Schritt 3: Skript für das Message Queuing erstellen

In diesem Abschnitt entwickelst du ein Skript, das eingehende Zahlungsdaten erfasst und sie in eine Redis®-Warteschlange überträgt.

Wechsle zuerst in das Verzeichnis ~/payment_gateway/queue:

$ cd ~/payment_gateway/queue  

Erstelle und öffne die Datei main.go mit nano:

Füge den folgenden Go-Code in die Datei main.go ein:

package main

import (
    "net/http"
    "github.com/go-redis/redis"
    "context"
    "bytes"
    "fmt"
)

func main() {
    http.HandleFunc("/payments", paymentsHandler)
    http.ListenAndServe(":8080", nil)
}

func paymentsHandler(w http.ResponseWriter, req *http.Request) {

    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
        Password: "",
        DB: 0,
    })

    ctx := context.TODO()
    buf := new(bytes.Buffer)

    // Validierung des req.Body für den Produktiveinsatz einbauen

    buf.ReadFrom(req.Body)
    paymentDetails := buf.String()

    err := redisClient.RPush(ctx, "payments", paymentDetails).Err()

    if err != nil {
        fmt.Fprintf(w, err.Error() + "\r\n")
    } else {
        fmt.Fprintf(w, "Zahlungsdetails erfolgreich empfangen\r\n")
    }
}

Dieses Skript lauscht auf Anfragen an der URL /payments über Port 8080. Es leitet die eingehenden Daten an die Funktion paymentsHandler(...) weiter, die eine Verbindung zu Redis® auf Port 6379 herstellt und die Zahlungsinformationen mit RPush in der Warteschlange payments speichert.

Schritt 4: Worker-Skript erstellen

Im folgenden Schritt erstellst du ein Worker-Skript, das mithilfe des Redis®-Befehls BLPOP Zahlungsdaten aus der Queue abruft, verarbeitet und entfernt.

Wechsle in das Verzeichnis ~/payment_gateway/worker:

$ cd ~/payment_gateway/worker  

Erstelle eine neue Datei main.go:

Trage folgenden Go-Code ein. Ersetze dabei EXAMPLE_PASSWORD durch das tatsächliche Passwort des MySQL-Nutzers web_payments_user:

package main

import (
    "github.com/go-redis/redis"
    _ "github.com/go-sql-driver/mysql"
    "database/sql"
    "encoding/json"
    "context"
    "fmt"
    "strings"
    "strconv"
    "time"
)

func main() {

    ctx := context.TODO()

    redisClient := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
        Password: "",
        DB: 0,
    })

    for {
        result, err := redisClient.BLPop(ctx, 0 * time.Second, "payments").Result()

        if err != nil {
            fmt.Println(err.Error())
        } else {

            params := map[string]interface{}{}

            err := json.NewDecoder(strings.NewReader(string(result[1]))).Decode(¶ms)

            if err != nil {
                fmt.Println(err.Error())
            } else {

                paymentId, err := savePayment(params)

                if err != nil {
                    fmt.Println(err.Error())
                } else {
                    fmt.Println("Zahlung # " + strconv.FormatInt(paymentId, 10) + " erfolgreich verarbeitet.\r\n")
                }
            }
        }
    }
}

func savePayment(params map[string]interface{}) (int64, error) {

    db, err := sql.Open("mysql", "web_payments_user:EXAMPLE_PASSWORD@tcp(127.0.0.1:3306)/web_payments")

    if err != nil {
        return 0, err
    }

    defer db.Close()

    queryString := `insert into payments (
                        payment_date,
                        first_name,
                        last_name,
                        payment_mode,
                        payment_ref_no,
                        amount
                    ) values (
                        ?,
                        ?,
                        ?,
                        ?,
                        ?,
                        ?
                    )`

    stmt, err := db.Prepare(queryString)

    if err != nil {
        return 0, err
    }

    defer stmt.Close()

    paymentDate := time.Now().Format("2006-01-02 15:04:05")
    firstName := params["first_name"]
    lastName := params["last_name"]
    paymentMode := params["payment_mode"]
    paymentRefNo := params["payment_ref_no"]
    amount := params["amount"]

    res, err := stmt.Exec(paymentDate, firstName, lastName, paymentMode, paymentRefNo, amount)

    if err != nil {
        return 0, err
    }

    paymentId, err := res.LastInsertId()

    if err != nil {
        return 0, err
    }

    return paymentId, nil
}

Das Worker-Skript baut eine Verbindung zu Redis® auf und nutzt den Befehl BLPOP, um kontinuierlich Zahlungsdaten aus der Warteschlange payments zu entnehmen. Diese werden anschließend in der MySQL-Datenbank gespeichert. Die Funktion savePayment(...) liefert bei erfolgreicher Eintragung die ID der Zahlung zurück.

Schritt 5: Die Message Queue mit Golang und Redis® testen

Nachdem die Anwendung vollständig eingerichtet wurde, ist es Zeit, die Funktionsfähigkeit durch Tests zu überprüfen.

Installiere zunächst die benötigten Go-Pakete für MySQL und Redis®:

$ go get github.com/go-sql-driver/mysql  
$ go get github.com/go-redis/redis  

Wechsle in das Verzeichnis für die Queue und starte den Redis®-Queue-Service. Dadurch wird der integrierte Go-Webserver aktiviert, der Anfragen über Port 8080 entgegennimmt:

$ cd ~/payment_gateway/queue  
$ go run ./  

Öffne ein zweites Terminal, stelle erneut per SSH die Verbindung zum Server her und starte den Worker-Prozess:

$ cd ~/payment_gateway/worker  
$ go run ./  

Die Anwendung wartet nun aktiv auf eingehende Zahlungsanfragen.

Nutze ein drittes Terminal und sende mit curl nacheinander Testdaten an die Anwendung:

$ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "JOHN", "last_name": "DOE", "payment_mode": "CASH", "payment_ref_no": "-", "amount" : 5000.25}'  
$ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "MARY", "last_name": "SMITH", "payment_mode": "CHEQUE", "payment_ref_no": "985", "amount" : 985.65}'  
$ curl -i -X POST localhost:8080/payments -H "Content-Type: application/json" -d '{"first_name": "ANN", "last_name": "JACOBS", "payment_mode": "PAYPAL", "payment_ref_no": "ABC-XYZ", "amount" : 15.25}'  

Jede Anfrage sollte mit folgender Nachricht bestätigt werden:

Zahlungsdetails erfolgreich empfangen

Im Terminal des Workers sollte die folgende Ausgabe erscheinen, die die erfolgreiche Verarbeitung bestätigt:

Payment # 1 processed successfully.  
Payment # 2 processed successfully.  
Payment # 3 processed successfully.  

Um zu überprüfen, ob die Daten tatsächlich in der Datenbank gespeichert wurden, logge dich über das dritte Terminal in den MySQL-Server ein:

Gib das Passwort ein und wechsle zur Datenbank:

Führe folgenden SQL-Befehl aus, um alle Einträge in der Tabelle payments anzuzeigen:

mysql> SELECT  
           payment_id,  
           payment_date,  
           first_name,  
           last_name,  
           payment_mode,  
           payment_ref_no,  
           amount  
       FROM payments;  

Die Ausgabe sollte wie folgt aussehen:

+------------+---------------------+------------+-----------+--------------+----------------+-----------+  
| payment_id | payment_date        | first_name | last_name | payment_mode | payment_ref_no | amount    |  
+------------+---------------------+------------+-----------+--------------+----------------+-----------+  
|          1 | 2021-12-01 09:48:32 | JOHN       | DOE       | CASH         | -              | 5000.2500 |  
|          2 | 2021-12-01 09:48:42 | MARY       | SMITH     | CHEQUE       | 985            |  985.6500 |  
|          3 | 2021-12-01 09:48:55 | ANN        | JACOBS    | PAYPAL       | ABC-XYZ        |   15.2500 |  
+------------+---------------------+------------+-----------+--------------+----------------+-----------+  
3 rows in set (0.00 sec)  

Damit ist bestätigt, dass die Anwendung die Daten korrekt aus der Warteschlange gelesen und verarbeitet hat.

Fazit

In dieser Anleitung hast du erfolgreich ein Message-Queue-System mit Golang, Redis® und MySQL 8 unter Linux implementiert. Durch den Einsatz der Redis®-Befehle RPush und BLPop wurde die Entkopplung von Datenerfassung und Verarbeitung erreicht – ein wichtiger Schritt zur Skalierung und Stabilität moderner Anwendungen.

Quelle: vultr.com

Jetzt 200€ Guthaben sichern

Registrieren Sie sich jetzt in unserer ccloud³ und erhalten Sie 200€ Startguthaben für Ihr Projekt.

Das könnte Sie auch interessieren: