Schnellere agentenbasierte LLM-Workflows mit asynchronen Python-Aufrufen erstellen
Große Sprachmodelle können im produktiven Einsatz anspruchsvoll sein, da sie ungenaue Antworten, uneinheitliches Verhalten oder spürbare Verzögerungen verursachen können. Je leistungsfähiger Modelle werden, desto naheliegender erscheint es, jedem Prompt mehr Kontext und ausführlichere Anweisungen mitzugeben und davon auszugehen, dass das Modell alles in einem einzigen Schritt korrekt verarbeitet. Wird jedoch ein einzelner System-Prompt an ein vergleichsweise großes Modell gesendet, kann die Antwortzeit häufig zwischen 3 und 15 Sekunden liegen. Für zeitkritische Agenten, etwa sprachbasierte Telefonassistenten, ist eine solche Pause zu lang.
Ein schnellerer Ansatz besteht darin, die Gesamtaufgabe in mehrere kleinere Teilaufgaben aufzuteilen und diese im LLM-Workflow parallel ausführen zu lassen. Dadurch können kleinere Modelle einzelne Aufgaben leichter zuverlässig bearbeiten. Gleichzeitig lässt sich die Latenz deutlich reduzieren und in vielen Fällen können auch die Betriebskosten sinken.
Es gibt verschiedene Möglichkeiten, agentenbasierte Workflows parallel auszuführen. In vielen agentischen Systemen ist jedoch der eigentliche Aufruf des LLM der zentrale Engpass. Dieser Engpass lässt sich verringern, indem mehrere LLM-API-Anfragen gleichzeitig mit den Python-Bibliotheken asyncio und aiohttp gesendet werden.
In diesem Tutorial erstellst du mit Python agentenbasierte Workflows, die mehrere LLM-Anfragen asynchron versenden. Dadurch kannst du schnellere und genauere LLM-Anwendungen entwickeln, die auf einem virtuellen Server, GPU-Server oder in einer serverlosen Umgebung als API-Endpunkt bereitgestellt werden können.
Du erstellst einen Kundenservice-Workflow für das Telefonsystem einer Immobilienagentur. Der Workflow beantwortet Fragen zu Immobilienangeboten, plant Termine und verbindet Anrufer mit menschlichen Ansprechpartnern. Dafür sendet er mehrere asynchrone Anfragen an unterschiedliche LLM-Endpunkte, reduziert die Antwortzeit und bietet mehr Flexibilität bei der Auswahl des passenden Modells für jede Aufgabe.
Wichtige Erkenntnisse
- Asynchrone Anfragen an kleine, schnelle Modelle können dabei helfen, effektive agentenbasierte Workflows mit geringer Latenz für Anwendungen zu erstellen, bei denen kurze Antwortzeiten entscheidend sind.
- Multi-Modell-Workflows ermöglichen es, jeden Prompt gezielt auf ein bestimmtes Modell abzustimmen. Dadurch können der gesamte Rechenaufwand sowie die Kosten für selbst gehostete LLM-Bereitstellungen und externe API-Nutzung reduziert werden.
Voraussetzungen
Für dieses Tutorial benötigst du:
- Eine lokale Entwicklungsumgebung für Python 3.
- Zugriff auf eine LLM-API. In diesem Tutorial wird das Modell Mistral-Small-3.2-24B verwendet, das mit vLLM auf einem GPU-Server bereitgestellt wird. Du kannst einen GPU-basierten Server mit vLLM einrichten, indem du einer passenden Anleitung für den Betrieb von vLLM in deiner bevorzugten GPU-Umgebung folgst.
- Grundkenntnisse der Python-Syntax
asyncundawait. Diese sind optional, aber hilfreich.
Schritt 1 — Umgebung einrichten
Bereite zunächst deine Umgebung vor und installiere die erforderlichen Abhängigkeiten. In diesem Tutorial werden Anfragen an ein Mistral-Small-3.2-24B-Modell gesendet, das auf einem H200-GPU-Server bereitgestellt wurde. Du kannst entweder Anfragen an dein eigenes bereitgestelltes Modell senden oder eine externe LLM-API verwenden. Dieses Tutorial sendet nur Anfragen an ein einzelnes LLM. Bei Bedarf kannst du jedoch für jeden Prompt unterschiedliche Modelle von verschiedenen Anbietern verwenden.
Erstelle eine virtuelle Umgebung und installiere die benötigte Abhängigkeit:
python3 -m venv venv
source venv/bin/activate
pip install aiohttp
Da die Modellanfragen asynchron ausgeführt werden, musst du asynchrone Funktionen definieren, die diese Aufrufe verwalten können. Dafür verwendest du die Python-Bibliotheken asyncio und aiohttp. Wenn du eine andere LLM-API nutzt, installiere bei Bedarf die Client-Bibliothek des jeweiligen Anbieters, um mit dem Endpunkt zu kommunizieren.
Erstelle eine neue Datei mit dem Namen agentic_workflows.py. Füge den folgenden Code ein, um die erforderlichen Abhängigkeiten zu importieren und den API-Endpunkt zu definieren.
Ersetze your_server_ip durch die IP-Adresse deiner GPU-Server-Bereitstellung oder definiere den Client für die von dir verwendete LLM-API:
agentic_workflows.py
import asyncio
import aiohttp
# URL deines vLLM-Servers
VLLM_SERVER_URL = "http://your_server_ip:8000/v1/chat/completions"
Schritt 2 — Asynchrone Aufruflogik schreiben
Die asynchrone Aufruflogik sollte eine Liste von Dictionaries entgegennehmen. Jedes Dictionary enthält die Modell-ID und den Prompt, der an das LLM gesendet werden soll. Die Funktion soll alle Anfragen abschicken, ohne auf die einzelne Antwort einer Anfrage zu warten, bevor die nächste gestartet wird. Jeder Prompt enthält den Gesprächsverlauf und den System-Prompt für genau diesen Aufruf. Zusätzlich fügst du eine synchrone Funktion call_models hinzu, die in dein Hauptskript importiert und dort genutzt werden kann.
Füge die folgenden asynchronen Funktionen zu agentic_workflows.py hinzu:
agentic_workflows.py
async def _call_single_model(call_spec):
"""Führt einen asynchronen Aufruf an den GPU-Server mit einem bestimmten Modell und Nachrichten aus"""
model_id = call_spec["model_id"]
messages = call_spec["messages"]
payload = {
"model": model_id,
"messages": messages,
"max_tokens": 100,
"temperature": 0.1
}
async with aiohttp.ClientSession() as session:
async with session.post(VLLM_SERVER_URL, json=payload) as response:
result = await response.json()
message = result["choices"][0]["message"]["content"]
return message
async def _call_models_async(call_list):
"""Ruft mehrere Modelle asynchron auf und gibt die Antworten in derselben Reihenfolge zurück"""
# Tasks für alle Modellaufrufe erstellen
tasks = [_call_single_model(call_spec) for call_spec in call_list]
# Alle Tasks parallel ausführen und Antworten geordnet zurückgeben
responses = await asyncio.gather(*tasks)
return responses
def call_models(call_list):
return asyncio.run(_call_models_async(call_list))
Die Methode aiohttp.ClientSession() erstellt eine asynchrone Client-Sitzung, die POST-Anfragen an den API-Endpunkt sendet und HTTP-Antworten verarbeitet, ohne den restlichen Programmablauf zu blockieren. So können mehrere LLM-API-Aufrufe gleichzeitig statt nacheinander ausgeführt werden.
Die Funktion _call_models_async() sammelt die übergebenen Anfragedetails und sendet alle API-Aufrufe, ohne vor dem Start des nächsten Aufrufs auf eine Antwort zu warten. Sobald die Antworten eintreffen, werden sie in einer Liste in derselben Reihenfolge gespeichert, in der die Anfragen gesendet wurden. Die Funktion call_models() ist synchron und erleichtert dadurch den Aufruf des asynchronen Workflows aus anderem Code heraus.
Schritt 3 — Prompts schreiben
Nachdem die grundlegende Logik für asynchrone API-Anfragen vorbereitet ist, musst du die Prompts definieren. Dieser Workflow verwendet drei separate System-Prompts. Ein Prompt verarbeitet Preisfragen, ein weiterer behandelt Terminwünsche, und ein dritter erkennt andere angebotsbezogene Fragen, die an einen menschlichen Ansprechpartner weitergeleitet werden sollen. Du kannst später weitere Prompts ergänzen, auch solche mit stärkerer autonomer Entscheidungslogik. Für dieses Beispiel reichen diese drei Prompts aus.
agentic_workflows.py
SYSTEM_PROMPT_CONFIGURATIONS = {
"pricing_prompt": {
"model_id": "mistralai/Mistral-Small-3.2-24B-Instruct-2506",
"prompt": "You are a customer service agent. Determine if the user's most recent request is asking about the price of a listing. If they are asking about the price of a listing AND if they have included the listing_id, return only the listing_id of the item they are asking about in the following format: 'listing_id: XXXXXX'. \nIf they are asking about the pricing of a listing AND did NOT mention the specific listing_id number, ask them for the listing id number. If they are requesting something other than the price of a listing: return only the word 'false'."
},
"scheduling_prompt": {
"model_id": "mistralai/Mistral-Small-3.2-24B-Instruct-2506",
"prompt": "You are a customer service agent. Determine if the user's most recent request is asking to schedule a call with a real estate agent. If they are trying to schedule a call, return the date and time they would like to schedule the call in the following format: 'date: YYYY-MM-DD, time: HH:MM'. If they are trying to schedule a call but they did not mention the specific date and time they are available, ask them what day and time they are available. Do not provide any additional information, such as the agent's availability. If they are not asking to schedule a call, return only the word 'false'."
},
"listing_prompt": {
"model_id": "mistralai/Mistral-Small-3.2-24B-Instruct-2506",
"prompt": "You are a customer service agent. Determine if the user is asking a question about a listing. If they are asking a question about a listing, return only the word 'true'. Otherwise, return only the word 'false'."
}
}
Diese System-Prompt-Konfigurationen enthalten den Prompt-Namen für die spätere Referenzierung, die Modell-ID des Modells, an das der Prompt gesendet werden soll, sowie den Prompt-Text, der mit dem Gesprächsverlauf kombiniert wird.
Schritt 4 — Benutzereingaben durch die Modelle verarbeiten
Als Nächstes erstellst du eine Funktion, die den Gesprächsverlauf des Nutzers entgegennimmt, jeden System-Prompt zu diesem Verlauf hinzufügt und die asynchronen Anfragen mit der zuvor erstellten Funktion call_models() an das jeweils passende Modell sendet. Dieses Beispiel geht davon aus, dass der Gesprächsverlauf im Frontend im selben Format gespeichert wird, das auch von der API akzeptiert wird, und zusammen mit der neuesten Nutzereingabe übermittelt wird.
Füge die folgende Funktion zu agentic_workflows.py hinzu:
agentic_workflows.py
def run_agentic_workflow(conversation_history):
model_calls_list = []
prompt_names = [] # Keep track of prompt order for response mapping
for prompt_name, config in SYSTEM_PROMPT_CONFIGURATIONS.items():
model_id = config["model_id"]
system_prompt = config["prompt"]
# Construct the full message prompt: system prompt + conversation history
full_messages = [{"role": "system", "content": system_prompt}] + conversation_history
model_calls_list.append({
"model_id": model_id,
"messages": full_messages
})
prompt_names.append(prompt_name)
prompt_responses = call_models(model_calls_list)
Diese Funktion erstellt für jeden System-Prompt eine Liste von Dictionaries. Jedes Dictionary enthält die Modell-ID und den vollständigen Nachrichtensatz, bestehend aus System-Prompt und Gesprächsverlauf. Anschließend ruft sie mit der Funktion call_models() jedes Modell für jeden System-Prompt asynchron auf.
Nachdem die Modelle aufgerufen wurden, werden die Antworten in der Variable prompt_responses gespeichert. Die Reihenfolge entspricht dabei der Reihenfolge, in der die Anfragen gesendet wurden. Danach müssen diese Antworten konkreten Variablen zugeordnet werden.
Erweitere die Funktion run_agentic_workflow wie folgt:
agentic_workflows.py
# Map responses to their respective prompts
pricing_response = prompt_responses[prompt_names.index("pricing_prompt")]
scheduling_response = prompt_responses[prompt_names.index("scheduling_prompt")]
listing_response = prompt_responses[prompt_names.index("listing_prompt")]
Schritt 5 — Workflow-Logik definieren
Zum Schluss ergänzt du diese Funktion um eine Logikschicht. Diese Ebene prüft die Antworten und entscheidet, welcher nächste Schritt ausgeführt werden soll. Sie kann einen Angebotspreis in einer Datenbank nachschlagen, den Anruf an einen menschlichen Ansprechpartner weiterleiten, einen Termin mit einem Immobilienberater planen oder dem Nutzer eine Rückfrage stellen, wenn weitere Informationen benötigt werden.
Füge der Funktion run_agentic_workflow in agentic_workflows.py weiterhin folgenden Code hinzu:
agentic_workflows.py
# Route 1: Handle pricing inquiries
if pricing_response.lower() != "false":
if pricing_response.startswith("listing_id:"):
# Extract listing ID from response
listing_id = pricing_response.split("listing_id: ")[1].strip()
# Simulate database/API lookup
# You can add database querying logic here to look for a target listing ID. For our example, we will use a simple Python dictionary
example_price_database = {
"123456": "$350,000",
"654321": "$450,000",
"112233": "$550,000"
}
found_price = example_price_database.get(listing_id)
if found_price:
final_response = f"The price for listing {listing_id} is {found_price}."
else:
final_response = "We are unable to find that listing ID in our records. Are you sure you have the correct listing ID?"
else:
final_response = pricing_response # Response asking for listing ID
# Route 2: Handle scheduling requests
elif scheduling_response.lower() != "false":
if scheduling_response.startswith("date:"):
final_response = f"Perfect! I've scheduled a call for you on that date and time. A sales representative will reach out to you at that time."
# In production: Add logic to actually book the appointment, and consider customizing the message to confirm the date and time selected.
else:
final_response = scheduling_response # Response asking for specific time
# Route 3: Handle general listing questions
elif listing_response.lower() != "false":
final_response = "Please hold while I transfer you to a specialist for further assistance."
# In production: Add logic to transfer chat to human representative
# You could alternatively add logic to access listing details and answer the user's specific question
else:
final_response = "I apologize, I'm not sure how I can help with that. Let me transfer you to a human representative who can better assist you."
# In production: Add logic to transfer chat to human representative
return final_response
Schritt 6 — Workflow testen
Nachdem der Code für einen einfachen agentenbasierten Workflow fertiggestellt ist, kannst du testen, wie er drei verschiedene Kategorien von Kundenfragen verarbeitet. Zunächst testest du ein Gespräch über den Preis eines Immobilienangebots. In diesem Beispiel nennt der Nutzer in der ersten Anfrage keine Angebots-ID, daher stellt das Modell eine Rückfrage. Anschließend übermittelt der Nutzer die ID, und das Modell gibt den passenden Preis zurück.
Erstelle eine neue Datei mit dem Namen test_workflow.py und füge den folgenden Testcode ein:
test_workflow.py
from agentic_workflows import run_agentic_workflow
conversation_history = [
{"role": "user", "content": "Hi, can you tell me the price of one of your listings?"},
]
final_response = run_agentic_workflow(conversation_history)
print(f"Response: {final_response}")
Führe das Testskript aus:
python3 test_workflow.py
Du siehst eine Ausgabe, die ungefähr so aussieht:
Ausgabe
Response: Could you please provide the listing_id of the item you're asking about?
Teste nun ein vollständiges Gespräch, bei dem der Nutzer die Angebots-ID angibt:
test_workflow.py
conversation_history = [
{"role": "user", "content": "Hi, can you tell me the price of one of your listings?"},
{"role": "assistant", "content": "Could you please provide the listing_id of the item you're asking about?"},
{"role": "user", "content": "Yes, the listing ID is 123456"},
]
final_response = run_agentic_workflow(conversation_history)
print(f"Response: {final_response}")
Führe das Skript erneut aus:
python3 test_workflow.py
Du erhältst die Preisantwort:
Ausgabe
Response: The price for listing 123456 is $350,000.
Fazit
In diesem Tutorial hast du einen parallelen agentenbasierten Workflow von Grund auf mit Python erstellt. Du hast asynchrone Funktionen geschrieben, die mehrere LLM-Anfragen gleichzeitig versenden, mehrere System-Prompts für unterschiedliche Aufgaben konfiguriert und eine Routing-Logik ergänzt, um Nutzeranfragen zu verarbeiten.
Damit dieses Framework produktionsreif wird, musst du es testen, Prompts überarbeiten, Modelle austauschen und erneut mit unterschiedlichen Gesprächsverläufen testen, bis Genauigkeit und Geschwindigkeit deinen Anforderungen entsprechen. Bei dieser Art von Workflow sollte jeder Prompt einzeln geprüft und kontinuierlich verbessert werden. In diesem Beispiel wurde das Modell Mistral-Small-3.2-24B ausgewählt, weil es eine geringe Latenz bietet, mit einer mittleren Antwortzeit von unter 0,5 Sekunden arbeitet, konkrete Anweisungen zuverlässig befolgt und auf einer einzelnen GPU bereitgestellt werden kann. Auch GPT-OSS-120b, GPT-OSS-20b und Llama 3.1 8b wurden getestet, doch für diesen konkreten Anwendungsfall erzielte das Mistral-Modell die besten Ergebnisse. Die Auswahl des passenden Modells für jeden Prompt oder jede Aufgabe ist ein iterativer Prozess.
Wenn du Agenten mit mehr Autonomie einsetzen möchtest, kannst du den System-Prompts mehr oder weniger Eigenständigkeit geben, indem du zusätzlichen Kontext oder Zugriff auf weitere Tools bereitstellst. Außerdem lassen sich nachgelagerte LLM-Aufrufe für Schlussfolgerungen oder andere Zwecke miteinander verketten. Erfordert ein Prompt jedoch mehrere API-Aufrufe nacheinander, erhöht sich die Gesamtlatenz, da der Workflow nicht mehr nur auf eine einzelne Gruppe asynchroner Modellaufrufe wartet.
Das Mistral-Modell in diesem Tutorial wurde mit vLLM bereitgestellt, sodass das Prompt-Format nicht für verschiedene Modelle angepasst werden musste. OpenAI, Claude und Open-Source-Modelle können jedoch unterschiedliche Anforderungen an das Format der Prompts stellen, die an ihre APIs gesendet werden. Wenn du für einzelne Aufrufe unterschiedliche Modellanbieter nutzt, musst du möglicherweise Python-Funktionen schreiben, die Prompts in das jeweils erforderliche Format umwandeln. Beispielsweise kann eine Funktion erforderlich sein, die einen vLLM-formatierten Prompt in ein Claude-kompatibles Prompt-Format konvertiert.
Einige Modelle können deutlich langsamer antworten als andere. Dadurch kann es passieren, dass der Workflow auf die langsamste Antwort warten muss, bevor alle Ergebnisse verfügbar sind. Mit der Python-Bibliothek asyncio kannst du Modellantworten direkt verarbeiten, sobald sie eintreffen, indem du asyncio.as_completed() anstelle von asyncio.gather() verwendest. Anschließend kannst du die frühesten Antworten prüfen, um festzustellen, ob sie bereits die passende Route enthalten, und fortfahren, ohne auf alle übrigen Antworten zu warten. Dafür wären ebenfalls Anpassungen an der Logikschicht erforderlich.
Wenn du weitere System-Prompts ergänzt, wird der wichtigste Engpass voraussichtlich in den Rate Limits der LLM-API oder in den Verarbeitungsgrenzen deiner GPU liegen. Dem kannst du begegnen, indem du ein eigenes Modell auf einem GPU-Server hostest, API-Limits erhöhst, Anfragen auf mehrere APIs verteilst sowie Wiederholungslogik, Timeout-Behandlung und Fehlerbehandlung für Situationen implementierst, in denen Rate Limits erreicht werden.


