Friday, May 30, 2025

Multi-Agent Communication with the A2A Python SDK


If below a rock and you’re employed with AI, you’ve most likely heard about Agent2Agent (A2A) Protocol, “an open commonplace designed to allow communication and collaboration between AI Brokers”. It’s nonetheless fairly new, but it surely’s already getting plenty of buzz. Because it performs so properly with MCP (which seems prefer it’s changing into the business’s commonplace), A2A is shaping as much as be the go-to commonplace for multi-agent communication within the business.

When Google first dropped the Protocol Specification, my first response was principally: “Okay, cool… however what am I presupposed to do with this?” Fortunately, this week they launched the official Python SDK for the protocol, so now it lastly speaks a language I perceive.

On this article we’re going to dive into how the protocol truly units up communication between brokers and purchasers. Spoiler: it’s all in a task-oriented method. To make issues much less summary, let’s construct just a little toy instance collectively.

Communication between the Occasion Detector Agent and an A2A Consumer

In our techniques we have now an Occasion Detector AI Agent (answerable for detecting occasions) and an Alert AI Agent (answerable for alerting the person of the occasions). Since I’m specializing in the A2A protocol right here, each brokers are mocked as easy Python strategies that return strings. However in actual life you’ll be able to construct your brokers with any framework you want (LangGraph, Google ADK, CrewAI and so forth).

We’ve three characters in our system, the person, the Occasion Agent and the Alert Agent. All of them talk utilizing Messages. A Message represents a single flip of communication within the A2A protocol. We wrap the brokers into A2A Servers. The servers expose an HTTP endpoint implementing the protocol. Every A2A Server has Occasion queues that act as a buffer between the agent’s asynchronous execution and the server’s response dealing with.

The A2A Consumer initiates communication, and if two brokers want to speak an A2A Server also can play the position of a A2A Consumer. The diagram beneath reveals how a Consumer and a Server talk inside the protocol.

Picture by writer

The EventQueue shops Messages, Duties, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, A2AError and JSONRPCError objects. The Job could be a very powerful object to know find out how to construct multi-agent techniques with A2A. In accordance with the A2A documentation:

  • When a consumer sends a message to an agent, the agent may decide that fulfilling the request requires a stateful activity to be accomplished (e.g., “generate a report,” “ebook a flight,” “reply a query”).
  • Every activity has a novel ID outlined by the agent and progresses by way of an outlined lifecycle (e.g., submitted, working, input-required, accomplished, failed).
  • Duties are stateful and may contain a number of exchanges (messages) between the consumer and the server.

Consider a Job as one thing in your multi-agent system that has a clear and distinctive objective. We’ve two duties in our system:

  1. Detect an occasion
  2. Alert the person

Every Agent does its personal factor (activity). Let’s construct the A2A Server for the Occasion Agent so issues grow to be extra tangible.

Constructing the A2A Server for the Occasion Agent

First up: the Agent Card. The Agent Card is JSON doc used to get to know different brokers obtainable. :

  • Server’s identification
  • Capabilities
  • Expertise
  • Service endpoint
  • URL
  • How purchasers ought to authenticate and work together with the agent

Let’s first outline the Agent Card for the Occasion Detector AI Agent (I’ve outlined the abilities primarily based on this instance from Google):

agent_card = AgentCard(  
    title='Occasion Detection Agent',  
    description='Detects related occasions and alerts the person',  
    url='http://localhost:10008/',  
    model='1.0.0',  
    defaultInputModes=['text'],  
    defaultOutputModes=['text'],  
    capabilities=AgentCapabilities(streaming=False),  
    authentication={ "schemes": ["basic"] },  
    abilities=[  
        AgentSkill(  
            id='detect_events',  
            name='Detect Events',  
            description='Detects events and alert the user',  
            tags=['event'],  
        ),  
    ],  
)

You’ll be able to study extra in regards to the Agent Card object construction right here: https://google.github.io/A2A/specification/#55-agentcard-object-structure

The agent itself will truly be a Uvicorn server, so let’s construct the foremost() technique to get it up and working. All of the request will likely be dealt with by the DefaultRequestHandler of the a2a-python SDK. The handler wants a TaskStore to retailer the Duties and an AgentExecutor which has the implementation of the core logic of the agent (we’ll construct the EventAgentExecutor in a minute).

The final element of the foremost() technique is the A2AStarletteApplication, which is the Starlette utility that implements the A2A protocol server endpoints. We have to present the Agent Card and the DefaultRequestHandler to initialize it. Now the final step is to run the app utilizing uvicorn. Right here is the total code of the foremost() technique:

import click on  
import uvicorn  
from a2a.varieties import (  
    AgentCard, AgentCapabilities, AgentSkill
) 
from a2a.server.request_handlers import DefaultRequestHandler  
from a2a.server.duties import InMemoryTaskStore  
from a2a.server.apps import A2AStarletteApplication 
 
@click on.command()  
@click on.choice('--host', default='localhost')  
@click on.choice('--port', default=10008)  
def foremost(host: str, port: int):  
    agent_executor = EventAgentExecutor()

    agent_card = AgentCard(  
        title='Occasion Detection Agent',  
        description='Detects related occasions and alerts the person',  
        url='http://localhost:10008/',  
        model='1.0.0',  
        defaultInputModes=['text'],  
        defaultOutputModes=['text'],  
        capabilities=AgentCapabilities(streaming=False),  
        authentication={ "schemes": ["basic"] },  
        abilities=[              AgentSkill(                  id='detect_events',                  name='Detect Events',                  description='Detects events and alert the user',                  tags=['event'],  
            ),  
        ],  
    )
      
    request_handler = DefaultRequestHandler(  
        agent_executor=agent_executor,  
        task_store=InMemoryTaskStore()  
    ) 
 
    a2a_app = A2AStarletteApplication(  
        agent_card=agent_card,  
        http_handler=request_handler  
    )  

    uvicorn.run(a2a_app.construct(), host=host, port=port)

Creating the EventAgentExecutor

Now it’s time to construct the core of our agent and eventually see find out how to use the Duties to make the brokers work together with one another. The EventAgentExecutor class inherits from AgentExecutor interface and thus we have to implement the execute() and the cancel() strategies. Each take a RequestContext and an EventQueue object as parameters. The RequestContext holds details about the present request being processed by the server and the EventQueue acts as a buffer between the agent’s asynchronous execution and the server’s response dealing with.

Our agent will simply examine if the string “occasion” is within the message the person have despatched (KISS ✨). If the “occasion” is there then we should always name the Alert Agent. We’ll try this by sending a Message to this different Alert agent. That is the Direct Configuration technique, that means we’ll configure the agent with a URL to fetch the Agent Card of the Alert Agent. To do this our Occasion Agent will act like a A2A Consumer. 

Let’s construct the Executor step-by-step. First let’s create the primary Job (the duty to detect the occasions). We have to instantiate a TaskUpdater object (a helper class for brokers to publish updates to a activity’s occasion queue), then submit the duty and announce we’re engaged on it with the start_work() technique:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

The message the person will ship to the agent will appear to be this:

send_message_payload = {  
        'message': {  
            'position': 'person',  
            'components': [{'type': 'text', 'text': f'it has an event!'}],  
            'messageId': uuid4().hex,  
        }  
    }

A Half represents a definite piece of content material inside a Message, representing exportable content material as both TextPart, FilePart, or DataPart. We’ll use a TextPart so we have to unwrap it within the executor:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's faux we're truly doing one thing

        user_message = context.message.components[0].root.textual content # unwraping the TextPart

Time to create the tremendous superior logic of our agent. If the message doesn’t have the string “occasion” we don’t must name the Alert Agent and the duty is completed:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's faux we're truly doing one thing

        user_message = context.message.components[0].root.textual content # unwraping the TextPart

        if "occasion" not in user_message:  
            task_updater.update_status(  
                TaskState.accomplished,  
                message=task_updater.new_agent_message(components=[TextPart(text=f"No event detected")]),
            )

Creating an A2A Consumer for the Consumer

Let’s create an A2A Consumer so we will check the agent as it’s. The consumer makes use of the get_client_from_agent_card_url() technique from A2AClient class to (guess what) get the agent card. Then we wrap the message in a SendMessageRequest object and ship it to the agent utilizing the send_message() technique of the consumer. Right here is the total code:

import httpx  
import asyncio  
from a2a.consumer import A2AClient  
from a2a.varieties import SendMessageRequest, MessageSendParams  
from uuid import uuid4  
from pprint import pprint
  
async def foremost():    
    send_message_payload = {  
        'message': {  
            'position': 'person',  
            'components': [{'type': 'text', 'text': f'nothing happening here'}],  
            'messageId': uuid4().hex,  
        }  
    }  

    async with httpx.AsyncClient() as httpx_client:  
        consumer = await A2AClient.get_client_from_agent_card_url(  
            httpx_client, 'http://localhost:10008'  
        )  
        request = SendMessageRequest(  
            params=MessageSendParams(**send_message_payload)  
        )  
        response = await consumer.send_message(request)  
        pprint(response.model_dump(mode='json', exclude_none=True))  
  
if __name__ == "__main__":  
    asyncio.run(foremost())

That is what occurs within the terminal that’s working the EventAgent server:

Picture by writer

And that is the message the consumer sees:

Picture by writer

The duty to detect the occasion was created and no occasion was detected, good! However the entire level of A2A is to make Brokers talk with one another, so let’s make the Occasion Agent discuss to the Alert Agent. 

Making the Occasion Agent discuss to the Alert Agent

To make the Occasion Agent discuss to the Alert Agent the Occasion Agent will act as a consumer as properly:

from a2a.server.agent_execution import AgentExecutor

ALERT_AGENT_URL = "http://localhost:10009/" 

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's faux we're truly doing one thing

        user_message = context.message.components[0].root.textual content # unwraping the TextPart

        if "occasion" not in user_message:  
            task_updater.update_status(  
                TaskState.accomplished,  
                message=task_updater.new_agent_message(components=[TextPart(text=f"No event detected")]),
            )
        else:
            alert_message = task_updater.new_agent_message(components=[TextPart(text="Event detected!")])

            send_alert_payload = SendMessageRequest(  
                params=MessageSendParams(  
                    message=alert_message  
                )  
            )  

            async with httpx.AsyncClient() as consumer:  
                alert_agent = A2AClient(httpx_client=consumer, url=ALERT_AGENT_URL)  
                response = await alert_agent.send_message(send_alert_payload)  

                if hasattr(response.root, "outcome"):  
                    alert_task = response.root.outcome  
                    # Polling till the duty is completed
                    whereas alert_task.standing.state not in (  
                        TaskState.accomplished, TaskState.failed, TaskState.canceled, TaskState.rejected  
                    ):  
                        await asyncio.sleep(0.5)  
                        get_resp = await alert_agent.get_task(  
                            GetTaskRequest(params=TaskQueryParams(id=alert_task.id))  
                        )  
                        if isinstance(get_resp.root, GetTaskSuccessResponse):  
                            alert_task = get_resp.root.outcome  
                        else:  
                            break  
  
                    # Full the unique activity  
                    if alert_task.standing.state == TaskState.accomplished:  
                        task_updater.update_status(  
                            TaskState.accomplished,  
                            message=task_updater.new_agent_message(components=[TextPart(text="Event detected and alert sent!")]),  
                        )  
                    else:  
                        task_updater.update_status(  
                            TaskState.failed,  
                            message=task_updater.new_agent_message(components=[TextPart(text=f"Failed to send alert: {alert_task.status.state}")]),  
                        )  
                else:  
                    task_updater.update_status(  
                        TaskState.failed,  
                        message=task_updater.new_agent_message(components=[TextPart(text=f"Failed to create alert task")]),  
                    )

We name the Alert Agent simply as we known as the Occasion Agent because the person, and when the Alert Agent activity is completed, we full the unique Occasion Agent activity. Let’s name the Occasion Agent once more however this time with an occasion:

Picture by writer

The sweetness right here is that we merely known as the Alert Agent and we don’t must know something about the way it alerts the person. We simply ship a message to it and await it to complete.

The Alert Agent is tremendous much like the Occasion Agent. You’ll be able to examine the entire code right here: https://github.com/dmesquita/multi-agent-communication-a2a-python

Closing Ideas

Understanding find out how to construct multi-agent techniques with A2A could be daunting at first, however ultimately you simply ship messages to let the brokers do their factor. All that you must do to combine you brokers with A2A is to create a category with the agent’s logic that inherit from the AgentExecutor and run the agent as a server.

I hope this text have helped you in your A2A journey, thanks for studying!

References

[1] Padgham, Lin, and Michael Winikoff. Creating clever agent techniques: A sensible information. John Wiley & Sons, 2005.

[2] https://github.com/google/a2a-python

[3] https://github.com/google/a2a-python/tree/foremost/examples/google_adk

[4] https://builders.googleblog.com/en/agents-adk-agent-engine-a2a-enhancements-google-io/

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles

PHP Code Snippets Powered By : XYZScripts.com