Complete implementation guide with Python examples
This comprehensive tutorial will guide you through implementing A2A Protocol in Python, from basic message handling to building a complete agent system.
# Create virtual environment
python -m venv a2a_env
source a2a_env/bin/activate # On Windows: a2a_env\Scripts\activate
# Install dependencies
pip install requests asyncio aiohttp websockets pydantic
pip install fastapi uvicorn # For building agent servers
pip install pytest pytest-asyncio # For testing
import json
import uuid
from datetime import datetime
from typing import Dict, Any, Optional
class A2AMessage:
def __init__(self, message_type: str, sender_id: str, recipient_id: str,
payload: Dict[str, Any], priority: str = "NORMAL"):
self.protocol_version = "1.0"
self.message_id = str(uuid.uuid4())
self.timestamp = datetime.utcnow().isoformat() + "Z"
self.message_type = message_type
self.priority = priority
self.sender = {"agent_id": sender_id}
self.recipient = {"agent_id": recipient_id}
self.payload = payload
def to_dict(self) -> Dict[str, Any]:
return {
"protocol_version": self.protocol_version,
"message_id": self.message_id,
"timestamp": self.timestamp,
"message_type": self.message_type,
"priority": self.priority,
"sender": self.sender,
"recipient": self.recipient,
"payload": self.payload
}
def to_json(self) -> str:
return json.dumps(self.to_dict(), indent=2)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'A2AMessage':
message = cls(
message_type=data["message_type"],
sender_id=data["sender"]["agent_id"],
recipient_id=data["recipient"]["agent_id"],
payload=data["payload"],
priority=data.get("priority", "NORMAL")
)
message.message_id = data["message_id"]
message.timestamp = data["timestamp"]
return message
import asyncio
import aiohttp
from typing import Dict, Any, Optional
class A2AClient:
def __init__(self, agent_id: str, endpoint: str):
self.agent_id = agent_id
self.endpoint = endpoint
self.session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def send_request(self, target_agent: str, service: str,
parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Send a REQUEST message to another agent"""
message = A2AMessage(
message_type="REQUEST",
sender_id=self.agent_id,
recipient_id=target_agent,
payload={
"service": service,
"parameters": parameters,
"response_required": True
}
)
async with self.session.post(
f"{self.endpoint}/a2a/message",
json=message.to_dict(),
headers={"Content-Type": "application/json"}
) as response:
if response.status == 200:
return await response.json()
else:
raise Exception(f"Request failed: {response.status}")
async def discover_agents(self, capability: str) -> list:
"""Discover agents with specific capabilities"""
discovery_message = A2AMessage(
message_type="REQUEST",
sender_id=self.agent_id,
recipient_id="registry_service",
payload={
"service": "discover_agents",
"parameters": {
"capability": capability,
"max_results": 10
}
}
)
# Implementation would connect to registry service
# For demo purposes, return mock data
return [
{"agent_id": "data_processor_001", "endpoint": "https://dp1.example.com"},
{"agent_id": "text_analyzer_002", "endpoint": "https://ta2.example.com"}
]
This example demonstrates how multiple agents collaborate to handle customer inquiries.
class CustomerServiceOrchestrator:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.client = A2AClient(agent_id, "https://orchestrator.example.com")
async def handle_customer_inquiry(self, inquiry: str, customer_id: str):
"""Handle customer inquiry using multiple specialized agents"""
# Step 1: Analyze inquiry intent
intent_result = await self.client.send_request(
target_agent="intent_analyzer",
service="analyze_intent",
parameters={
"text": inquiry,
"customer_id": customer_id
}
)
intent = intent_result["payload"]["intent"]
confidence = intent_result["payload"]["confidence"]
# Step 2: Route to appropriate specialist agent
if intent == "technical_support" and confidence > 0.8:
response = await self.handle_technical_support(inquiry, customer_id)
elif intent == "billing_inquiry":
response = await self.handle_billing_inquiry(inquiry, customer_id)
elif intent == "product_recommendation":
response = await self.handle_product_recommendation(inquiry, customer_id)
else:
response = await self.handle_general_inquiry(inquiry, customer_id)
# Step 3: Generate customer response
final_response = await self.client.send_request(
target_agent="response_generator",
service="generate_customer_response",
parameters={
"intent": intent,
"agent_response": response,
"customer_id": customer_id,
"tone": "friendly"
}
)
return final_response["payload"]["response"]
async def handle_technical_support(self, inquiry: str, customer_id: str):
"""Route technical inquiries to support specialist"""
return await self.client.send_request(
target_agent="technical_support_agent",
service="solve_technical_issue",
parameters={
"issue_description": inquiry,
"customer_id": customer_id,
"priority": "HIGH"
}
)
async def handle_billing_inquiry(self, inquiry: str, customer_id: str):
"""Handle billing-related questions"""
# Get customer billing information
billing_info = await self.client.send_request(
target_agent="billing_agent",
service="get_customer_billing",
parameters={"customer_id": customer_id}
)
# Analyze billing inquiry
return await self.client.send_request(
target_agent="billing_agent",
service="analyze_billing_inquiry",
parameters={
"inquiry": inquiry,
"billing_data": billing_info["payload"]["billing_data"]
}
)
import pytest
import asyncio
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_a2a_message_creation():
"""Test basic A2A message creation"""
message = A2AMessage(
message_type="REQUEST",
sender_id="test_agent_001",
recipient_id="test_agent_002",
payload={"service": "test_service", "data": "test_data"}
)
assert message.message_type == "REQUEST"
assert message.sender["agent_id"] == "test_agent_001"
assert message.recipient["agent_id"] == "test_agent_002"
assert message.protocol_version == "1.0"
@pytest.mark.asyncio
async def test_agent_client_request():
"""Test agent client request functionality"""
with patch('aiohttp.ClientSession.post') as mock_post:
# Mock successful response
mock_response = AsyncMock()
mock_response.status = 200
mock_response.json.return_value = {
"message_type": "RESPONSE",
"payload": {"result": "success", "data": "processed_data"}
}
mock_post.return_value.__aenter__.return_value = mock_response
async with A2AClient("test_agent", "http://test.example.com") as client:
result = await client.send_request(
target_agent="target_agent",
service="test_service",
parameters={"input": "test_input"}
)
assert result["payload"]["result"] == "success"
assert result["payload"]["data"] == "processed_data"
@pytest.mark.asyncio
async def test_customer_service_orchestrator():
"""Test customer service orchestrator workflow"""
orchestrator = CustomerServiceOrchestrator("orchestrator_001")
with patch.object(orchestrator.client, 'send_request') as mock_request:
# Mock intent analysis response
mock_request.side_effect = [
{"payload": {"intent": "technical_support", "confidence": 0.9}},
{"payload": {"solution": "Restart your device"}},
{"payload": {"response": "Please try restarting your device."}}
]
response = await orchestrator.handle_customer_inquiry(
"My device is not working",
"customer_123"
)
assert "restart" in response.lower()
assert mock_request.call_count == 3
Implement comprehensive error handling and retry mechanisms
Use authentication tokens and validate all incoming messages
Implement connection pooling and async processing
Add comprehensive logging and metrics collection
Design for horizontal scaling and load distribution
Document agent capabilities and API contracts