This tutorial builds a complete news summarization pipeline using GraphBus. By the end you'll have three agents that coordinate via a typed message bus, negotiate shared schemas during a build phase, and then run reliably in production with structured, observable inter-agent messaging.

The pipeline: FetcherService pulls headlines → CleanerService deduplicates and normalizes → FormatterService renders a digest. Three agents, clean contracts between them, negotiated once at build time and enforced at runtime.

Prerequisites: Python 3.9+, GraphBus installed (pip install -e . from the repo). Optional: ANTHROPIC_API_KEY for build-time agent negotiation.

Project setup

Start by scaffolding the project:

graphbus init news-digest && cd news-digest
# Creates: agents/, tests/, graphbus.yaml, README.md

We'll replace the default agents with our three. Create the agents directory structure:

rm agents/*.py
touch agents/__init__.py
touch agents/fetcher.py agents/cleaner.py agents/formatter.py

Agent 1: FetcherService

The fetcher pulls headlines for a given topic. In a real pipeline this would call a news API; here we use structured mock data to keep the tutorial self-contained.

# agents/fetcher.py
from graphbus_core import GraphBusNode, schema_method


class FetcherService(GraphBusNode):
    SYSTEM_PROMPT = """
    I fetch news headlines from external sources.

    During build cycles, I can propose improvements to:
    - The structure of the headlines I return (add source, timestamp, score)
    - Filtering logic (remove paywalled content, duplicates by URL)
    - The topics I support

    I care about data quality and want my output schema to serve
    both CleanerService and FormatterService well.
    """

    @schema_method(
        input_schema={"topic": str, "limit": int},
        output_schema={"headlines": list, "topic": str, "fetched_at": str}
    )
    def fetch_headlines(self, topic: str = "technology", limit: int = 10) -> dict:
        """Fetch headlines for a given topic."""
        import datetime

        # In production: replace with a real news API call
        mock_headlines = {
            "technology": [
                {"title": "GraphBus framework hits 1,000 stars on GitHub", "url": "https://example.com/1"},
                {"title": "Multi-agent systems reshape software development", "url": "https://example.com/2"},
                {"title": "LLM-powered code refactoring is production-ready in 2026", "url": "https://example.com/3"},
                {"title": "  Whitespace-heavy headline   needs   cleaning  ", "url": "https://example.com/4"},
                {"title": "Python remains top language for ML pipelines", "url": "https://example.com/5"},
                {"title": "GraphBus framework hits 1,000 stars on GitHub", "url": "https://example.com/1"},  # duplicate
            ],
            "ai": [
                {"title": "Claude 4 sets new reasoning benchmarks", "url": "https://example.com/6"},
                {"title": "Open-source LLMs close gap with proprietary models", "url": "https://example.com/7"},
            ],
        }

        headlines = mock_headlines.get(topic, mock_headlines["technology"])[:limit]
        return {
            "headlines": headlines,
            "topic": topic,
            "fetched_at": datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z"),
        }
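
Two details of fetch_headlines are worth noticing: unknown topics fall back to the "technology" list, and limit truncates after the fallback. A quick standalone sketch of that lookup (plain Python, abbreviated mock data, no GraphBus imports):

# Sketch of the fetch lookup: fall back to "technology" for unknown
# topics, then truncate to the requested limit. Data abbreviated.
mock_headlines = {
    "technology": [
        {"title": f"Tech story {i}", "url": f"https://example.com/{i}"}
        for i in range(1, 7)
    ],
    "ai": [{"title": "AI story", "url": "https://example.com/8"}],
}

def pick(topic: str, limit: int) -> list:
    return mock_headlines.get(topic, mock_headlines["technology"])[:limit]

print(len(pick("technology", 5)))  # 5
print(len(pick("ai", 10)))         # 1 (limit caps at what's available)
print(len(pick("sports", 3)))      # 3 (unknown topic falls back)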

Agent 2: CleanerService

The cleaner normalizes whitespace, deduplicates by URL, and validates the structure. Note the @subscribe decorator on on_raw_news: the clean method can be called directly, while the subscriber listens for raw news events on the bus.

# agents/cleaner.py
from graphbus_core import GraphBusNode, schema_method, subscribe


class CleanerService(GraphBusNode):
    SYSTEM_PROMPT = """
    I clean and normalize news headlines from FetcherService.

    During build cycles, I can propose:
    - Additional validation rules (e.g. minimum title length)
    - Deduplication strategies (URL-based vs title similarity)
    - Structured output improvements (add 'cleaned_at' timestamp)

    I depend on FetcherService's output schema. If it changes,
    I need to update my input expectations accordingly.
    """

    @schema_method(
        input_schema={"headlines": list, "topic": str},
        output_schema={"cleaned": list, "removed_count": int, "topic": str}
    )
    def clean(self, headlines: list, topic: str = "") -> dict:
        """Normalize whitespace and deduplicate by URL."""
        seen_urls = set()
        cleaned = []
        removed = 0

        for item in headlines:
            if not isinstance(item, dict):
                removed += 1
                continue

            title = " ".join(item.get("title", "").split())  # normalize whitespace
            url = item.get("url", "")

            if not title or len(title) < 10:  # skip too-short titles
                removed += 1
                continue

            if url in seen_urls:  # deduplicate by URL
                removed += 1
                continue

            seen_urls.add(url)
            cleaned.append({"title": title, "url": url})

        return {
            "cleaned": cleaned,
            "removed_count": removed,
            "topic": topic,
        }

    @subscribe("/News/Raw")
    def on_raw_news(self, event):
        """Handle raw news events from the message bus."""
        result = self.clean(
            headlines=event.get("headlines", []),
            topic=event.get("topic", ""),
        )
        self.log(f"[Cleaner] Processed {len(result['cleaned'])} headlines "
                 f"(removed {result['removed_count']})")
        self.publish("/News/Cleaned", result)
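
The three cleaning rules are plain Python and easy to exercise without the bus. A condensed standalone sketch of the same loop (it skips the isinstance guard for brevity):

def clean_headlines(headlines: list) -> tuple:
    """Same rules as CleanerService.clean: collapse whitespace,
    drop titles under 10 characters, deduplicate by URL."""
    seen, cleaned, removed = set(), [], 0
    for item in headlines:
        title = " ".join(item.get("title", "").split())
        url = item.get("url", "")
        if len(title) < 10 or url in seen:
            removed += 1
            continue
        seen.add(url)
        cleaned.append({"title": title, "url": url})
    return cleaned, removed

sample = [
    {"title": "  Spaced   out   headline  ", "url": "https://example.com/a"},
    {"title": "short", "url": "https://example.com/b"},  # dropped: under 10 chars
    {"title": "Spaced out headline", "url": "https://example.com/a"},  # dropped: duplicate URL
]
kept, removed = clean_headlines(sample)
print(kept)     # [{'title': 'Spaced out headline', 'url': 'https://example.com/a'}]
print(removed)  # 2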

Agent 3: FormatterService

The formatter takes cleaned headlines and renders a human-readable digest with a header, numbered list, and summary line.

# agents/formatter.py
from graphbus_core import GraphBusNode, schema_method, subscribe


class FormatterService(GraphBusNode):
    SYSTEM_PROMPT = """
    I format cleaned news headlines into human-readable digests.

    During build cycles, I can propose:
    - Richer output formats (Markdown, HTML, plain text variants)
    - Adding metadata like reading time estimates
    - Priority scoring or categorization of headlines
    - Summary generation from multiple headlines

    I depend on CleanerService's output schema.
    """

    @schema_method(
        input_schema={"cleaned": list, "topic": str},
        output_schema={"digest": str, "headline_count": int, "topic": str}
    )
    def format_digest(self, cleaned: list, topic: str = "") -> dict:
        """Format cleaned headlines into a readable digest."""
        import datetime

        if not cleaned:
            return {"digest": "No headlines available.", "headline_count": 0, "topic": topic}

        now = datetime.datetime.now(datetime.timezone.utc).strftime("%B %d, %Y – %H:%M UTC")
        lines = [
            f"# News Digest: {topic.title() or 'Top Headlines'}",
            f"*{now}*",
            f"*{len(cleaned)} headline{'s' if len(cleaned) != 1 else ''}*",
            "",
        ]

        for i, item in enumerate(cleaned, 1):
            lines.append(f"{i}. **{item['title']}**")
            if item.get("url"):
                lines.append(f"   {item['url']}")

        lines.append("")
        lines.append("---")
        lines.append("*Generated by GraphBus FormatterService*")

        return {
            "digest": "\n".join(lines),
            "headline_count": len(cleaned),
            "topic": topic,
        }

    @subscribe("/News/Cleaned")
    def on_cleaned_news(self, event):
        """Handle cleaned news events from the message bus."""
        result = self.format_digest(
            cleaned=event.get("cleaned", []),
            topic=event.get("topic", ""),
        )
        self.log(f"[Formatter] Generated digest ({result['headline_count']} headlines)")
        print(result["digest"])
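
Two easy-to-miss details in format_digest: an empty topic falls back to "Top Headlines" (because "".title() is falsy), and the headline count pluralizes. A standalone sketch of just the header logic:

# Just the header logic from format_digest, isolated for experimentation.
def digest_header(topic: str, count: int) -> str:
    title = topic.title() or "Top Headlines"  # "" -> fallback
    plural = "s" if count != 1 else ""        # 1 headline / 2 headlines
    return f"# News Digest: {title}\n*{count} headline{plural}*"

print(digest_header("technology", 4))
# # News Digest: Technology
# *4 headlines*
print(digest_header("", 1))
# # News Digest: Top Headlines
# *1 headline*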

Building the pipeline

Step 1: Static build (no LLM; fast, always works)

graphbus build agents/

You'll see:

GraphBus Build
ℹ Source: agents/   Output: .graphbus/

✓ Build completed

Build Summary
┌───────────────┬───────┐
│ Component     │ Count │
├───────────────┼───────┤
│ Agents        │     3 │
│ Topics        │     2 │
│ Subscriptions │     2 │
│ Dependencies  │     0 │
└───────────────┴───────┘

Agents:
  • FetcherService   (1 methods, 0 subscriptions)
  • CleanerService   (1 methods, 1 subscriptions)
  • FormatterService (1 methods, 1 subscriptions)

✓ Artifacts written to: .graphbus/

Step 2: LLM build – agents negotiate the schema

export ANTHROPIC_API_KEY=sk-ant-...
graphbus build agents/ --enable-agents

Watch the negotiation live:

GraphBus Build  ✦ agent mode
ℹ Activating LLM agents...

[AGENT] FetcherService: "I propose adding a 'source' field to my output schema"
         rationale: FormatterService could display source alongside headline
         diff: output_schema["source"] = str  (optional)
         affects: CleanerService, FormatterService

[AGENT] CleanerService:   "Accepted — I'll pass 'source' through unchanged"
[AGENT] FormatterService: "Accepted — I'll display it when present"

[ARBITER] Consensus (2/2). Committing.
  ✓ Modified: agents/fetcher.py
  ✓ Modified: agents/cleaner.py
  ✓ Modified: agents/formatter.py

[AGENT] FormatterService: "I propose adding reading time to the digest header"
         rationale: Improves digest usefulness for readers

[AGENT] FetcherService:  "Accepted — no schema impact on my end"
[AGENT] CleanerService:  "Accepted — no schema impact on my end"

[ARBITER] Consensus (2/2). Committing.
  ✓ Modified: agents/formatter.py

✓ Build complete — 2 rounds, 4 agent edits across 3 files
✓ Artifacts written to: .graphbus/

After the build, your source files have been improved by the agents. Run git diff agents/ to review the changes before committing them.

When do LLMs run? You control this completely. By default, graphbus build analyzes your code statically, with no LLM calls. Add --enable-agents to activate LLM-powered negotiation at build time. At runtime, your agents call LLMs whenever their logic requires it; the bus routes messages regardless.

Step 3: Run the pipeline

graphbus run .graphbus/
GraphBus Runtime
ℹ Loading artifacts from: .graphbus/

============================================================
RUNTIME READY — 3 nodes active
  Contract Validation: ENABLED
  Coherence Tracking: ENABLED
============================================================

✓ Runtime started successfully

Step 4: Invoke the pipeline programmatically

You can drive the pipeline from Python code without going through the CLI REPL:

# run_pipeline.py
from graphbus_core.runtime import RuntimeExecutor, RuntimeConfig

config = RuntimeConfig(artifacts_dir=".graphbus/")
executor = RuntimeExecutor(config)
executor.start()

# Step 1: fetch
fetch_result = executor.invoke(
    "FetcherService",
    "fetch_headlines",
    {"topic": "technology", "limit": 6}  # 6 so the duplicate entry comes through
)
print(f"Fetched: {len(fetch_result['headlines'])} headlines")

# Step 2: clean
clean_result = executor.invoke(
    "CleanerService",
    "clean",
    {"headlines": fetch_result["headlines"], "topic": fetch_result["topic"]}
)
print(f"Cleaned: {clean_result['removed_count']} removed, {len(clean_result['cleaned'])} kept")

# Step 3: format
digest_result = executor.invoke(
    "FormatterService",
    "format_digest",
    {"cleaned": clean_result["cleaned"], "topic": clean_result["topic"]}
)

print()
print(digest_result["digest"])

executor.stop()

Output:

Fetched: 6 headlines
Cleaned: 1 removed, 5 kept

# News Digest: Technology
*February 20, 2026 – 15:30 UTC*
*5 headlines*

1. **GraphBus framework hits 1,000 stars on GitHub**
   https://example.com/1
2. **Multi-agent systems reshape software development**
   https://example.com/2
3. **LLM-powered code refactoring is production-ready in 2026**
   https://example.com/3
4. **Whitespace-heavy headline needs cleaning**
   https://example.com/4
5. **Python remains top language for ML pipelines**
   https://example.com/5

---
*Generated by GraphBus FormatterService*

Using the message bus for event-driven flow

The @subscribe handlers let you drive the pipeline via events rather than direct method calls. This is useful for continuous pipelines where FetcherService runs on a schedule, or any time you want loosely coupled, observable inter-agent communication:

# Event-driven version
executor.publish("/News/Raw", {
    "headlines": fetch_result["headlines"],
    "topic": "technology"
})
# → CleanerService.on_raw_news() fires automatically
# → publishes /News/Cleaned
# → FormatterService.on_cleaned_news() fires automatically
# → digest printed to stdout

Every message on the bus is typed, validated against the negotiated schema, and observable. You can attach a listener to any topic for logging, debugging, or metrics, without modifying the agents themselves.
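
Conceptually, the bus is a topic-to-handlers routing table: publish looks up a topic and invokes each subscriber in order. A toy sketch of that dispatch model (this illustrates the idea behind @subscribe and publish, not the GraphBus implementation, which adds schema validation and observability on top):

from collections import defaultdict

class ToyBus:
    """Minimal pub/sub: maps topic -> handler list. Illustration only."""
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._handlers[topic].append(handler)

    def publish(self, topic, event):
        # Deliver the event to every handler registered for the topic.
        for handler in self._handlers[topic]:
            handler(event)

bus = ToyBus()
seen = []

# A passive listener: observes a topic without modifying any agent.
bus.subscribe("/News/Cleaned", lambda e: seen.append(len(e["cleaned"])))
bus.publish("/News/Cleaned", {"cleaned": [{"title": "Hello from the bus"}]})
print(seen)  # [1]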

Deploying to production

Docker

graphbus docker build
# → writes Dockerfile
docker build -t news-digest:latest .
docker run --rm news-digest:latest python run_pipeline.py

Kubernetes

graphbus k8s generate --namespace production
# → writes deployment.yaml, service.yaml, configmap.yaml
kubectl apply -f k8s/

GitHub Actions CI

graphbus ci github
# → writes .github/workflows/graphbus.yml
# Runs: static build on every PR, LLM build on merge to main

Inspecting the negotiation history

After an agent build, the full negotiation is logged:

graphbus inspect-negotiation
# Shows: every proposal, every accept/reject, every arbiter decision
# Useful for understanding what your agents changed and why
graphbus inspect .graphbus/
# Shows: agent graph, schemas, topic subscriptions
# Useful for verifying the build output before deploying

What the agents actually improved

If you ran with --enable-agents, your source files were improved. The changes agents propose in this pipeline mirror their system prompts: an optional source field on fetched headlines, stricter validation and dedup rules in the cleaner, and richer digest formats (such as reading-time estimates) in the formatter.

The agents reason about the pipeline end-to-end. FetcherService knows FormatterService wants more data; FormatterService knows CleanerService is a dependency; all three agents converge on a schema that works for the whole pipeline. That's the negotiation protocol doing its job.

After the build, run git diff agents/ to see exactly what changed. If any proposal looks wrong, revert it and re-run. The negotiation is additive and auditable: every proposal, vote, and commit is logged in .graphbus/negotiation.log.

Scaling this pattern

This 3-agent pipeline is a template; real production news pipelines built on GraphBus typically add further agents to the chain.

Each new agent brings its own SYSTEM_PROMPT describing its negotiation strategy. The build automatically incorporates it into the consensus process. At runtime, agents can call LLMs whenever their logic requires it; the bus routes messages regardless of whether an LLM is involved in producing them.

Next steps


The full working version of this example is in examples/news_summarizer/ in the GraphBus repo. The agents there have been through real negotiation rounds and reflect what agents actually propose.

Build your own GraphBus pipeline

Join the alpha waitlist: we're working directly with early adopters to get your first pipeline running.
