Schnellere agentenbasierte LLM-Workflows mit asynchronen Python-Aufrufen erstellen

Große Sprachmodelle können im produktiven Einsatz anspruchsvoll sein, da sie ungenaue Antworten, uneinheitliches Verhalten oder spürbare Verzögerungen verursachen können. Je leistungsfähiger Modelle werden, desto naheliegender erscheint es, jedem Prompt mehr Kontext und ausführlichere Anweisungen mitzugeben und davon auszugehen, dass das Modell alles in einem einzigen Schritt korrekt verarbeitet. Wird jedoch ein einzelner System-Prompt an ein vergleichsweise großes Modell gesendet, kann die Antwortzeit häufig zwischen 3 und 15 Sekunden liegen. Für zeitkritische Agenten, etwa sprachbasierte Telefonassistenten, ist eine solche Pause zu lang.

Ein schnellerer Ansatz besteht darin, die Gesamtaufgabe in mehrere kleinere Teilaufgaben aufzuteilen und diese im LLM-Workflow parallel ausführen zu lassen. Dadurch können kleinere Modelle einzelne Aufgaben leichter zuverlässig bearbeiten. Gleichzeitig lässt sich die Latenz deutlich reduzieren und in vielen Fällen können auch die Betriebskosten sinken.

Es gibt verschiedene Möglichkeiten, agentenbasierte Workflows parallel auszuführen. In vielen agentischen Systemen ist jedoch der eigentliche Aufruf des LLM der zentrale Engpass. Dieser Engpass lässt sich verringern, indem mehrere LLM-API-Anfragen gleichzeitig mit den Python-Bibliotheken asyncio und aiohttp gesendet werden.

In diesem Tutorial erstellst du mit Python agentenbasierte Workflows, die mehrere LLM-Anfragen asynchron versenden. Dadurch kannst du schnellere und genauere LLM-Anwendungen entwickeln, die auf einem virtuellen Server, GPU-Server oder in einer serverlosen Umgebung als API-Endpunkt bereitgestellt werden können.

Du erstellst einen Kundenservice-Workflow für das Telefonsystem einer Immobilienagentur. Der Workflow beantwortet Fragen zu Immobilienangeboten, plant Termine und verbindet Anrufer mit menschlichen Ansprechpartnern. Dafür sendet er mehrere asynchrone Anfragen an unterschiedliche LLM-Endpunkte, reduziert die Antwortzeit und bietet mehr Flexibilität bei der Auswahl des passenden Modells für jede Aufgabe.

Wichtige Erkenntnisse

  • Asynchrone Anfragen an kleine, schnelle Modelle können dabei helfen, effektive agentenbasierte Workflows mit geringer Latenz für Anwendungen zu erstellen, bei denen kurze Antwortzeiten entscheidend sind.
  • Multi-Modell-Workflows ermöglichen es, jeden Prompt gezielt auf ein bestimmtes Modell abzustimmen. Dadurch können der gesamte Rechenaufwand sowie die Kosten für selbst gehostete LLM-Bereitstellungen und externe API-Nutzung reduziert werden.

Voraussetzungen

Für dieses Tutorial benötigst du:

  • Eine lokale Entwicklungsumgebung für Python 3.
  • Zugriff auf eine LLM-API. In diesem Tutorial wird das Modell Mistral-Small-3.2-24B verwendet, das mit vLLM auf einem GPU-Server bereitgestellt wird. Du kannst einen GPU-basierten Server mit vLLM einrichten, indem du einer passenden Anleitung für den Betrieb von vLLM in deiner bevorzugten GPU-Umgebung folgst.
  • Grundkenntnisse der Python-Syntax async und await. Diese sind optional, aber hilfreich.

Schritt 1 — Umgebung einrichten

Bereite zunächst deine Umgebung vor und installiere die erforderlichen Abhängigkeiten. In diesem Tutorial werden Anfragen an ein Mistral-Small-3.2-24B-Modell gesendet, das auf einem H200-GPU-Server bereitgestellt wurde. Du kannst entweder Anfragen an dein eigenes bereitgestelltes Modell senden oder eine externe LLM-API verwenden. Dieses Tutorial sendet nur Anfragen an ein einzelnes LLM. Bei Bedarf kannst du jedoch für jeden Prompt unterschiedliche Modelle von verschiedenen Anbietern verwenden.

Erstelle eine virtuelle Umgebung und installiere die benötigte Abhängigkeit:

python3 -m venv venv
source venv/bin/activate
pip install aiohttp

Da die Modellanfragen asynchron ausgeführt werden, musst du asynchrone Funktionen definieren, die diese Aufrufe verwalten können. Dafür verwendest du die Python-Bibliotheken asyncio und aiohttp. Wenn du eine andere LLM-API nutzt, installiere bei Bedarf die Client-Bibliothek des jeweiligen Anbieters, um mit dem Endpunkt zu kommunizieren.

Erstelle eine neue Datei mit dem Namen agentic_workflows.py. Füge den folgenden Code ein, um die erforderlichen Abhängigkeiten zu importieren und den API-Endpunkt zu definieren.

Ersetze your_server_ip durch die IP-Adresse deiner GPU-Server-Bereitstellung oder definiere den Client für die von dir verwendete LLM-API:

agentic_workflows.py

import asyncio
import aiohttp

# URL deines vLLM-Servers
VLLM_SERVER_URL = "http://your_server_ip:8000/v1/chat/completions"

Schritt 2 — Asynchrone Aufruflogik schreiben

Die asynchrone Aufruflogik sollte eine Liste von Dictionaries entgegennehmen. Jedes Dictionary enthält die Modell-ID und den Prompt, der an das LLM gesendet werden soll. Die Funktion soll alle Anfragen abschicken, ohne auf die einzelne Antwort einer Anfrage zu warten, bevor die nächste gestartet wird. Jeder Prompt enthält den Gesprächsverlauf und den System-Prompt für genau diesen Aufruf. Zusätzlich fügst du eine synchrone Funktion call_models hinzu, die in dein Hauptskript importiert und dort genutzt werden kann.

Füge die folgenden asynchronen Funktionen zu agentic_workflows.py hinzu:

agentic_workflows.py

async def _call_single_model(call_spec):
    """Führt einen asynchronen Aufruf an den GPU-Server mit einem bestimmten Modell und Nachrichten aus"""
    model_id = call_spec["model_id"]
    messages = call_spec["messages"]
    
    payload = {
        "model": model_id,
        "messages": messages,
        "max_tokens": 100, 
        "temperature": 0.1 
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.post(VLLM_SERVER_URL, json=payload) as response:
            result = await response.json()
            
            message = result["choices"][0]["message"]["content"]
            return message

async def _call_models_async(call_list):
    """Ruft mehrere Modelle asynchron auf und gibt die Antworten in derselben Reihenfolge zurück"""
    # Tasks für alle Modellaufrufe erstellen
    tasks = [_call_single_model(call_spec) for call_spec in call_list]
    
    # Alle Tasks parallel ausführen und Antworten geordnet zurückgeben
    responses = await asyncio.gather(*tasks)
    return responses

def call_models(call_list):
    return asyncio.run(_call_models_async(call_list))

Die Methode aiohttp.ClientSession() erstellt eine asynchrone Client-Sitzung, die POST-Anfragen an den API-Endpunkt sendet und HTTP-Antworten verarbeitet, ohne den restlichen Programmablauf zu blockieren. So können mehrere LLM-API-Aufrufe gleichzeitig statt nacheinander ausgeführt werden.

Die Funktion _call_models_async() sammelt die übergebenen Anfragedetails und sendet alle API-Aufrufe, ohne vor dem Start des nächsten Aufrufs auf eine Antwort zu warten. Sobald die Antworten eintreffen, werden sie in einer Liste in derselben Reihenfolge gespeichert, in der die Anfragen gesendet wurden. Die Funktion call_models() ist synchron und erleichtert dadurch den Aufruf des asynchronen Workflows aus anderem Code heraus.

Schritt 3 — Prompts schreiben

Nachdem die grundlegende Logik für asynchrone API-Anfragen vorbereitet ist, musst du die Prompts definieren. Dieser Workflow verwendet drei separate System-Prompts. Ein Prompt verarbeitet Preisfragen, ein weiterer behandelt Terminwünsche, und ein dritter erkennt andere angebotsbezogene Fragen, die an einen menschlichen Ansprechpartner weitergeleitet werden sollen. Du kannst später weitere Prompts ergänzen, auch solche mit stärkerer autonomer Entscheidungslogik. Für dieses Beispiel reichen diese drei Prompts aus.

agentic_workflows.py

SYSTEM_PROMPT_CONFIGURATIONS = {
    "pricing_prompt": {
        "model_id": "mistralai/Mistral-Small-3.2-24B-Instruct-2506", 
        "prompt": "You are a customer service agent. Determine if the user's most recent request is asking about the price of a listing. If they are asking about the price of a listing AND if they have included the listing_id, return only the listing_id of the item they are asking about in the following format: 'listing_id: XXXXXX'. \nIf they are asking about the pricing of a listing AND did NOT mention the specific listing_id number, ask them for the listing id number. If they are requesting something other than the price of a listing: return only the word 'false'."
    },
    "scheduling_prompt": {
        "model_id": "mistralai/Mistral-Small-3.2-24B-Instruct-2506", 
        "prompt": "You are a customer service agent. Determine if the user's most recent request is asking to schedule a call with a real estate agent. If they are trying to schedule a call, return the date and time they would like to schedule the call in the following format: 'date: YYYY-MM-DD, time: HH:MM'. If they are trying to schedule a call but they did not mention the specific date and time they are available, ask them what day and time they are available. Do not provide any additional information, such as the agent's availability. If they are not asking to schedule a call, return only the word 'false'." 
    }, 
    "listing_prompt": {
        "model_id": "mistralai/Mistral-Small-3.2-24B-Instruct-2506", 
        "prompt": "You are a customer service agent. Determine if the user is asking a question about a listing. If they are asking a question about a listing, return only the word 'true'. Otherwise, return only the word 'false'."
    }
}

Diese System-Prompt-Konfigurationen enthalten den Prompt-Namen für die spätere Referenzierung, die Modell-ID des Modells, an das der Prompt gesendet werden soll, sowie den Prompt-Text, der mit dem Gesprächsverlauf kombiniert wird.

Schritt 4 — Benutzereingaben durch die Modelle verarbeiten

Als Nächstes erstellst du eine Funktion, die den Gesprächsverlauf des Nutzers entgegennimmt, jeden System-Prompt zu diesem Verlauf hinzufügt und die asynchronen Anfragen mit der zuvor erstellten Funktion call_models() an das jeweils passende Modell sendet. Dieses Beispiel geht davon aus, dass der Gesprächsverlauf im Frontend im selben Format gespeichert wird, das auch von der API akzeptiert wird, und zusammen mit der neuesten Nutzereingabe übermittelt wird.

Füge die folgende Funktion zu agentic_workflows.py hinzu:

agentic_workflows.py

def run_agentic_workflow(conversation_history):
    model_calls_list = []
    prompt_names = []  # Keep track of prompt order for response mapping
    
    for prompt_name, config in SYSTEM_PROMPT_CONFIGURATIONS.items():
        model_id = config["model_id"]
        system_prompt = config["prompt"]
        
        # Construct the full message prompt: system prompt + conversation history
        full_messages = [{"role": "system", "content": system_prompt}] + conversation_history
        
        model_calls_list.append({
            "model_id": model_id,
            "messages": full_messages
        })
        prompt_names.append(prompt_name) 
    
    
    prompt_responses = call_models(model_calls_list)

Diese Funktion erstellt für jeden System-Prompt eine Liste von Dictionaries. Jedes Dictionary enthält die Modell-ID und den vollständigen Nachrichtensatz, bestehend aus System-Prompt und Gesprächsverlauf. Anschließend ruft sie mit der Funktion call_models() jedes Modell für jeden System-Prompt asynchron auf.

Nachdem die Modelle aufgerufen wurden, werden die Antworten in der Variable prompt_responses gespeichert. Die Reihenfolge entspricht dabei der Reihenfolge, in der die Anfragen gesendet wurden. Danach müssen diese Antworten konkreten Variablen zugeordnet werden.

Erweitere die Funktion run_agentic_workflow wie folgt:

agentic_workflows.py

    # Map responses to their respective prompts
    pricing_response = prompt_responses[prompt_names.index("pricing_prompt")]
    scheduling_response = prompt_responses[prompt_names.index("scheduling_prompt")]
    listing_response = prompt_responses[prompt_names.index("listing_prompt")]

Schritt 5 — Workflow-Logik definieren

Zum Schluss ergänzt du diese Funktion um eine Logikschicht. Diese Ebene prüft die Antworten und entscheidet, welcher nächste Schritt ausgeführt werden soll. Sie kann einen Angebotspreis in einer Datenbank nachschlagen, den Anruf an einen menschlichen Ansprechpartner weiterleiten, einen Termin mit einem Immobilienberater planen oder dem Nutzer eine Rückfrage stellen, wenn weitere Informationen benötigt werden.

Füge der Funktion run_agentic_workflow in agentic_workflows.py weiterhin folgenden Code hinzu:

agentic_workflows.py

    # Route 1: Handle pricing inquiries
    if pricing_response.lower() != "false":
        if pricing_response.startswith("listing_id:"):
            # Extract listing ID from response
            listing_id = pricing_response.split("listing_id: ")[1].strip()
            
            # Simulate database/API lookup
            # You can add database querying logic here to look for a target listing ID. For our example, we will use a simple Python dictionary
            example_price_database = {
                "123456": "$350,000",
                "654321": "$450,000", 
                "112233": "$550,000"
            }
            
            found_price = example_price_database.get(listing_id)
            if found_price:
                final_response = f"The price for listing {listing_id} is {found_price}."
            else:
                final_response = "We are unable to find that listing ID in our records. Are you sure you have the correct listing ID?"
        else:
            final_response = pricing_response  # Response asking for listing ID
    
    # Route 2: Handle scheduling requests
    elif scheduling_response.lower() != "false":
        if scheduling_response.startswith("date:"):
            final_response = f"Perfect! I've scheduled a call for you on that date and time. A sales representative will reach out to you at that time."
            # In production: Add logic to actually book the appointment, and consider customizing the message to confirm the date and time selected.
        else:
            final_response = scheduling_response  # Response asking for specific time
    
    # Route 3: Handle general listing questions  
    elif listing_response.lower() != "false":
        final_response = "Please hold while I transfer you to a specialist for further assistance."
        # In production: Add logic to transfer chat to human representative
        # You could alternatively add logic to access listing details and answer the user's specific question

    else:
        final_response = "I apologize, I'm not sure how I can help with that. Let me transfer you to a human representative who can better assist you."
        # In production: Add logic to transfer chat to human representative
    
    return final_response

Schritt 6 — Workflow testen

Nachdem der Code für einen einfachen agentenbasierten Workflow fertiggestellt ist, kannst du testen, wie er drei verschiedene Kategorien von Kundenfragen verarbeitet. Zunächst testest du ein Gespräch über den Preis eines Immobilienangebots. In diesem Beispiel nennt der Nutzer in der ersten Anfrage keine Angebots-ID, daher stellt das Modell eine Rückfrage. Anschließend übermittelt der Nutzer die ID, und das Modell gibt den passenden Preis zurück.

Erstelle eine neue Datei mit dem Namen test_workflow.py und füge den folgenden Testcode ein:

test_workflow.py

from agentic_workflows import run_agentic_workflow

conversation_history = [
    {"role": "user", "content": "Hi, can you tell me the price of one of your listings?"},
]
final_response = run_agentic_workflow(conversation_history)
print(f"Response: {final_response}")

Führe das Testskript aus:

Du siehst eine Ausgabe, die ungefähr so aussieht:

Ausgabe

Response: Could you please provide the listing_id of the item you're asking about?

Teste nun ein vollständiges Gespräch, bei dem der Nutzer die Angebots-ID angibt:

test_workflow.py

conversation_history = [
    {"role": "user", "content": "Hi, can you tell me the price of one of your listings?"},
    {"role": "assistant", "content": "Could you please provide the listing_id of the item you're asking about?"},
    {"role": "user", "content": "Yes, the listing ID is 123456"},
]
final_response = run_agentic_workflow(conversation_history)
print(f"Response: {final_response}")

Führe das Skript erneut aus:

Du erhältst die Preisantwort:

Ausgabe

Response: The price for listing 123456 is $350,000.

Fazit

In diesem Tutorial hast du einen parallelen agentenbasierten Workflow von Grund auf mit Python erstellt. Du hast asynchrone Funktionen geschrieben, die mehrere LLM-Anfragen gleichzeitig versenden, mehrere System-Prompts für unterschiedliche Aufgaben konfiguriert und eine Routing-Logik ergänzt, um Nutzeranfragen zu verarbeiten.

Damit dieses Framework produktionsreif wird, musst du es testen, Prompts überarbeiten, Modelle austauschen und erneut mit unterschiedlichen Gesprächsverläufen testen, bis Genauigkeit und Geschwindigkeit deinen Anforderungen entsprechen. Bei dieser Art von Workflow sollte jeder Prompt einzeln geprüft und kontinuierlich verbessert werden. In diesem Beispiel wurde das Modell Mistral-Small-3.2-24B ausgewählt, weil es eine geringe Latenz bietet, mit einer mittleren Antwortzeit von unter 0,5 Sekunden arbeitet, konkrete Anweisungen zuverlässig befolgt und auf einer einzelnen GPU bereitgestellt werden kann. Auch GPT-OSS-120b, GPT-OSS-20b und Llama 3.1 8b wurden getestet, doch für diesen konkreten Anwendungsfall erzielte das Mistral-Modell die besten Ergebnisse. Die Auswahl des passenden Modells für jeden Prompt oder jede Aufgabe ist ein iterativer Prozess.

Wenn du Agenten mit mehr Autonomie einsetzen möchtest, kannst du den System-Prompts mehr oder weniger Eigenständigkeit geben, indem du zusätzlichen Kontext oder Zugriff auf weitere Tools bereitstellst. Außerdem lassen sich nachgelagerte LLM-Aufrufe für Schlussfolgerungen oder andere Zwecke miteinander verketten. Erfordert ein Prompt jedoch mehrere API-Aufrufe nacheinander, erhöht sich die Gesamtlatenz, da der Workflow nicht mehr nur auf eine einzelne Gruppe asynchroner Modellaufrufe wartet.

Das Mistral-Modell in diesem Tutorial wurde mit vLLM bereitgestellt, sodass das Prompt-Format nicht für verschiedene Modelle angepasst werden musste. OpenAI, Claude und Open-Source-Modelle können jedoch unterschiedliche Anforderungen an das Format der Prompts stellen, die an ihre APIs gesendet werden. Wenn du für einzelne Aufrufe unterschiedliche Modellanbieter nutzt, musst du möglicherweise Python-Funktionen schreiben, die Prompts in das jeweils erforderliche Format umwandeln. Beispielsweise kann eine Funktion erforderlich sein, die einen vLLM-formatierten Prompt in ein Claude-kompatibles Prompt-Format konvertiert.

Einige Modelle können deutlich langsamer antworten als andere. Dadurch kann es passieren, dass der Workflow auf die langsamste Antwort warten muss, bevor alle Ergebnisse verfügbar sind. Mit der Python-Bibliothek asyncio kannst du Modellantworten direkt verarbeiten, sobald sie eintreffen, indem du asyncio.as_completed() anstelle von asyncio.gather() verwendest. Anschließend kannst du die frühesten Antworten prüfen, um festzustellen, ob sie bereits die passende Route enthalten, und fortfahren, ohne auf alle übrigen Antworten zu warten. Dafür wären ebenfalls Anpassungen an der Logikschicht erforderlich.

Wenn du weitere System-Prompts ergänzt, wird der wichtigste Engpass voraussichtlich in den Rate Limits der LLM-API oder in den Verarbeitungsgrenzen deiner GPU liegen. Dem kannst du begegnen, indem du ein eigenes Modell auf einem GPU-Server hostest, API-Limits erhöhst, Anfragen auf mehrere APIs verteilst sowie Wiederholungslogik, Timeout-Behandlung und Fehlerbehandlung für Situationen implementierst, in denen Rate Limits erreicht werden.

Quelle: digitalocean.com

Jetzt 200€ Guthaben sichern

Registrieren Sie sich jetzt in unserer ccloud³ und erhalten Sie 200€ Startguthaben für Ihr Projekt.

Das könnte Sie auch interessieren:

Moderne Hosting Services mit Cloud Server, Managed Server und skalierbarem Cloud Hosting für professionelle IT-Infrastrukturen

Kimi Linear: Effiziente KI-Inferenz für lange Kontexte

AI/ML, Tutorial
Vijonavor 43 Minuten Kimi Linear: Eine hardwarebewusste Architektur für effiziente KI-Inferenz mit langen Kontexten Moonshot AI hat erneut eine bemerkenswerte Veröffentlichung vorgestellt. Nachdem Kimi-K2 und der dazugehörige Post-Training-Ansatz bereits einen starken…
Moderne Hosting Services mit Cloud Server, Managed Server und skalierbarem Cloud Hosting für professionelle IT-Infrastrukturen

Apache Airflow: Workflow-Orchestrierung erklärt

Python, Tutorial
Vijonavor 1 Stunde Apache Airflow: Workflow-Orchestrierung für Datenpipelines Moderne datengetriebene Organisationen arbeiten mit Pipelines, die Informationen erfassen, umwandeln, anreichern und von einem System in ein anderes übertragen. Solche Datenpipelines bestehen häufig…