A2A Protocol Tutorial

Complete implementation guide with Python examples

Tutorial examples designed for A2A Protocol learning and development

Getting Started with A2A Protocol

This comprehensive tutorial will guide you through implementing A2A Protocol in Python, from basic message handling to building a complete agent system.

Prerequisites

  • • Python 3.8 or higher
  • • Basic understanding of REST APIs
  • • Familiarity with JSON data structures
  • • Knowledge of asynchronous programming (recommended)

Environment Setup

Install Required Dependencies

# Create virtual environment
python -m venv a2a_env
source a2a_env/bin/activate  # On Windows: a2a_env\Scripts\activate

# Install dependencies
pip install requests asyncio aiohttp websockets pydantic
pip install fastapi uvicorn  # For building agent servers
pip install pytest pytest-asyncio  # For testing

Basic Agent Implementation

A2A Message Handler

import json
import uuid
from datetime import datetime
from typing import Dict, Any, Optional

class A2AMessage:
    def __init__(self, message_type: str, sender_id: str, recipient_id: str, 
                 payload: Dict[str, Any], priority: str = "NORMAL"):
        self.protocol_version = "1.0"
        self.message_id = str(uuid.uuid4())
        self.timestamp = datetime.utcnow().isoformat() + "Z"
        self.message_type = message_type
        self.priority = priority
        self.sender = {"agent_id": sender_id}
        self.recipient = {"agent_id": recipient_id}
        self.payload = payload
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "protocol_version": self.protocol_version,
            "message_id": self.message_id,
            "timestamp": self.timestamp,
            "message_type": self.message_type,
            "priority": self.priority,
            "sender": self.sender,
            "recipient": self.recipient,
            "payload": self.payload
        }
    
    def to_json(self) -> str:
        return json.dumps(self.to_dict(), indent=2)
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'A2AMessage':
        message = cls(
            message_type=data["message_type"],
            sender_id=data["sender"]["agent_id"],
            recipient_id=data["recipient"]["agent_id"],
            payload=data["payload"],
            priority=data.get("priority", "NORMAL")
        )
        message.message_id = data["message_id"]
        message.timestamp = data["timestamp"]
        return message

A2A Agent Client

Python Client Implementation

import asyncio
import aiohttp
from typing import Dict, Any, Optional

class A2AClient:
    def __init__(self, agent_id: str, endpoint: str):
        self.agent_id = agent_id
        self.endpoint = endpoint
        self.session: Optional[aiohttp.ClientSession] = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def send_request(self, target_agent: str, service: str, 
                          parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Send a REQUEST message to another agent"""
        message = A2AMessage(
            message_type="REQUEST",
            sender_id=self.agent_id,
            recipient_id=target_agent,
            payload={
                "service": service,
                "parameters": parameters,
                "response_required": True
            }
        )
        
        async with self.session.post(
            f"{self.endpoint}/a2a/message",
            json=message.to_dict(),
            headers={"Content-Type": "application/json"}
        ) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"Request failed: {response.status}")
    
    async def discover_agents(self, capability: str) -> list:
        """Discover agents with specific capabilities"""
        discovery_message = A2AMessage(
            message_type="REQUEST",
            sender_id=self.agent_id,
            recipient_id="registry_service",
            payload={
                "service": "discover_agents",
                "parameters": {
                    "capability": capability,
                    "max_results": 10
                }
            }
        )
        
        # Implementation would connect to registry service
        # For demo purposes, return mock data
        return [
            {"agent_id": "data_processor_001", "endpoint": "https://dp1.example.com"},
            {"agent_id": "text_analyzer_002", "endpoint": "https://ta2.example.com"}
        ]

Real-world Example: Smart Customer Service

Multi-Agent Customer Service System

This example demonstrates how multiple agents collaborate to handle customer inquiries.

class CustomerServiceOrchestrator:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.client = A2AClient(agent_id, "https://orchestrator.example.com")
    
    async def handle_customer_inquiry(self, inquiry: str, customer_id: str):
        """Handle customer inquiry using multiple specialized agents"""
        
        # Step 1: Analyze inquiry intent
        intent_result = await self.client.send_request(
            target_agent="intent_analyzer",
            service="analyze_intent",
            parameters={
                "text": inquiry,
                "customer_id": customer_id
            }
        )
        
        intent = intent_result["payload"]["intent"]
        confidence = intent_result["payload"]["confidence"]
        
        # Step 2: Route to appropriate specialist agent
        if intent == "technical_support" and confidence > 0.8:
            response = await self.handle_technical_support(inquiry, customer_id)
        elif intent == "billing_inquiry":
            response = await self.handle_billing_inquiry(inquiry, customer_id)
        elif intent == "product_recommendation":
            response = await self.handle_product_recommendation(inquiry, customer_id)
        else:
            response = await self.handle_general_inquiry(inquiry, customer_id)
        
        # Step 3: Generate customer response
        final_response = await self.client.send_request(
            target_agent="response_generator",
            service="generate_customer_response",
            parameters={
                "intent": intent,
                "agent_response": response,
                "customer_id": customer_id,
                "tone": "friendly"
            }
        )
        
        return final_response["payload"]["response"]
    
    async def handle_technical_support(self, inquiry: str, customer_id: str):
        """Route technical inquiries to support specialist"""
        return await self.client.send_request(
            target_agent="technical_support_agent",
            service="solve_technical_issue",
            parameters={
                "issue_description": inquiry,
                "customer_id": customer_id,
                "priority": "HIGH"
            }
        )
    
    async def handle_billing_inquiry(self, inquiry: str, customer_id: str):
        """Handle billing-related questions"""
        # Get customer billing information
        billing_info = await self.client.send_request(
            target_agent="billing_agent",
            service="get_customer_billing",
            parameters={"customer_id": customer_id}
        )
        
        # Analyze billing inquiry
        return await self.client.send_request(
            target_agent="billing_agent",
            service="analyze_billing_inquiry",
            parameters={
                "inquiry": inquiry,
                "billing_data": billing_info["payload"]["billing_data"]
            }
        )

Testing Your Implementation

Unit Tests Example

import pytest
import asyncio
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_a2a_message_creation():
    """Test basic A2A message creation"""
    message = A2AMessage(
        message_type="REQUEST",
        sender_id="test_agent_001",
        recipient_id="test_agent_002",
        payload={"service": "test_service", "data": "test_data"}
    )
    
    assert message.message_type == "REQUEST"
    assert message.sender["agent_id"] == "test_agent_001"
    assert message.recipient["agent_id"] == "test_agent_002"
    assert message.protocol_version == "1.0"

@pytest.mark.asyncio
async def test_agent_client_request():
    """Test agent client request functionality"""
    
    with patch('aiohttp.ClientSession.post') as mock_post:
        # Mock successful response
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_response.json.return_value = {
            "message_type": "RESPONSE",
            "payload": {"result": "success", "data": "processed_data"}
        }
        mock_post.return_value.__aenter__.return_value = mock_response
        
        async with A2AClient("test_agent", "http://test.example.com") as client:
            result = await client.send_request(
                target_agent="target_agent",
                service="test_service",
                parameters={"input": "test_input"}
            )
            
            assert result["payload"]["result"] == "success"
            assert result["payload"]["data"] == "processed_data"

@pytest.mark.asyncio
async def test_customer_service_orchestrator():
    """Test customer service orchestrator workflow"""
    
    orchestrator = CustomerServiceOrchestrator("orchestrator_001")
    
    with patch.object(orchestrator.client, 'send_request') as mock_request:
        # Mock intent analysis response
        mock_request.side_effect = [
            {"payload": {"intent": "technical_support", "confidence": 0.9}},
            {"payload": {"solution": "Restart your device"}},
            {"payload": {"response": "Please try restarting your device."}}
        ]
        
        response = await orchestrator.handle_customer_inquiry(
            "My device is not working",
            "customer_123"
        )
        
        assert "restart" in response.lower()
        assert mock_request.call_count == 3

Best Practices

Error Handling

Implement comprehensive error handling and retry mechanisms

Security

Use authentication tokens and validate all incoming messages

Performance

Implement connection pooling and async processing

Monitoring

Add comprehensive logging and metrics collection

Scalability

Design for horizontal scaling and load distribution

Documentation

Document agent capabilities and API contracts