This tutorial builds a complete news summarization pipeline using GraphBus. By the end you'll have three agents that coordinate via a typed message bus, negotiate shared schemas during a build phase, and then run reliably in production with structured, observable inter-agent messaging.
The pipeline: FetcherService pulls headlines → CleanerService deduplicates and normalizes → FormatterService renders a digest. Three agents, clean contracts between them, negotiated once at build time and enforced at runtime.
Prerequisites: GraphBus installed (pip install -e . from the repo). Optional: ANTHROPIC_API_KEY for build-time agent negotiation.
Project setup
Start by scaffolding the project:
graphbus init news-digest && cd news-digest
# Creates: agents/, tests/, graphbus.yaml, README.md
We'll replace the default agents with our three. Create the agents directory structure:
rm agents/*.py
touch agents/__init__.py
touch agents/fetcher.py agents/cleaner.py agents/formatter.py
Agent 1: FetcherService
The fetcher pulls headlines for a given topic. In a real pipeline this would call a news API; here we use structured mock data to keep the tutorial self-contained.
# agents/fetcher.py
from graphbus_core import GraphBusNode, schema_method


class FetcherService(GraphBusNode):
    SYSTEM_PROMPT = """
    I fetch news headlines from external sources.
    During build cycles, I can propose improvements to:
    - The structure of the headlines I return (add source, timestamp, score)
    - Filtering logic (remove paywalled content, duplicates by URL)
    - The topics I support
    I care about data quality and want my output schema to serve
    both CleanerService and FormatterService well.
    """

    @schema_method(
        input_schema={"topic": str, "limit": int},
        output_schema={"headlines": list, "topic": str, "fetched_at": str}
    )
    def fetch_headlines(self, topic: str = "technology", limit: int = 10) -> dict:
        """Fetch headlines for a given topic."""
        import datetime

        # In production: replace with a real news API call
        mock_headlines = {
            "technology": [
                {"title": "GraphBus framework hits 1,000 stars on GitHub", "url": "https://example.com/1"},
                {"title": "Multi-agent systems reshape software development", "url": "https://example.com/2"},
                {"title": "LLM-powered code refactoring is production-ready in 2026", "url": "https://example.com/3"},
                {"title": "GraphBus framework hits 1,000 stars on GitHub", "url": "https://example.com/1"},  # duplicate
                {"title": "Python remains top language for ML pipelines", "url": "https://example.com/5"},
                {"title": "  Whitespace-heavy   headline needs cleaning  ", "url": "https://example.com/4"},
            ],
            "ai": [
                {"title": "Claude 4 sets new reasoning benchmarks", "url": "https://example.com/6"},
                {"title": "Open-source LLMs close gap with proprietary models", "url": "https://example.com/7"},
            ],
        }
        headlines = mock_headlines.get(topic, mock_headlines["technology"])[:limit]
        return {
            "headlines": headlines,
            "topic": topic,
            "fetched_at": datetime.datetime.utcnow().isoformat() + "Z",
        }
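When you later swap the mock for a real API, the response still has to be mapped into the negotiated output schema. A minimal sketch, assuming a NewsAPI-style payload — the articles/title/url field names belong to that API's convention, not to GraphBus, and parse_news_payload is an illustrative helper:

```python
import datetime

def parse_news_payload(payload: dict, topic: str, limit: int) -> dict:
    """Map an articles-style JSON response into FetcherService's output schema."""
    headlines = [
        {"title": a.get("title", ""), "url": a.get("url", "")}
        for a in payload.get("articles", [])[:limit]
    ]
    return {
        "headlines": headlines,
        "topic": topic,
        "fetched_at": datetime.datetime.utcnow().isoformat() + "Z",
    }

# A response shaped like NewsAPI's (hypothetical sample data)
sample = {"articles": [
    {"title": "Example headline", "url": "https://example.com/a"},
    {"title": "Another headline", "url": "https://example.com/b"},
]}
result = parse_news_payload(sample, "technology", limit=1)
```

Because the mapping function returns exactly the fetch_headlines output schema, the rest of the pipeline doesn't care whether the data came from the mock or a live API.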
Agent 2: CleanerService
The cleaner normalizes whitespace, deduplicates by URL, and validates the structure. Notice the @subscribe – it listens for raw news events on the bus and can also be called directly.
# agents/cleaner.py
from graphbus_core import GraphBusNode, schema_method, subscribe


class CleanerService(GraphBusNode):
    SYSTEM_PROMPT = """
    I clean and normalize news headlines from FetcherService.
    During build cycles, I can propose:
    - Additional validation rules (e.g. minimum title length)
    - Deduplication strategies (URL-based vs title similarity)
    - Structured output improvements (add 'cleaned_at' timestamp)
    I depend on FetcherService's output schema. If it changes,
    I need to update my input expectations accordingly.
    """

    @schema_method(
        input_schema={"headlines": list, "topic": str},
        output_schema={"cleaned": list, "removed_count": int, "topic": str}
    )
    def clean(self, headlines: list, topic: str = "") -> dict:
        """Normalize whitespace and deduplicate by URL."""
        seen_urls = set()
        cleaned = []
        removed = 0
        for item in headlines:
            if not isinstance(item, dict):
                removed += 1
                continue
            title = " ".join(item.get("title", "").split())  # normalize whitespace
            url = item.get("url", "")
            if not title or len(title) < 10:  # skip too-short titles
                removed += 1
                continue
            if url in seen_urls:  # deduplicate by URL
                removed += 1
                continue
            seen_urls.add(url)
            cleaned.append({"title": title, "url": url})
        return {
            "cleaned": cleaned,
            "removed_count": removed,
            "topic": topic,
        }

    @subscribe("/News/Raw")
    def on_raw_news(self, event):
        """Handle raw news events from the message bus."""
        result = self.clean(
            headlines=event.get("headlines", []),
            topic=event.get("topic", ""),
        )
        self.log(f"[Cleaner] Processed {len(result['cleaned'])} headlines "
                 f"(removed {result['removed_count']})")
        self.publish("/News/Cleaned", result)
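CleanerService's SYSTEM_PROMPT mentions title similarity as an alternative deduplication strategy. A standard-library sketch of what that could look like — dedupe_by_title and the 0.9 threshold are illustrative choices, not part of GraphBus:

```python
from difflib import SequenceMatcher

def dedupe_by_title(items: list, threshold: float = 0.9) -> list:
    """Keep an item only if its title isn't near-identical to one already kept."""
    kept = []
    for item in items:
        title = item["title"].lower()
        if any(SequenceMatcher(None, title, k["title"].lower()).ratio() >= threshold
               for k in kept):
            continue  # near-duplicate of something we already kept
        kept.append(item)
    return kept

items = [
    {"title": "GraphBus framework hits 1,000 stars on GitHub", "url": "https://example.com/1"},
    {"title": "GraphBus framework hits 1000 stars on GitHub", "url": "https://example.com/9"},
    {"title": "Python remains top language for ML pipelines", "url": "https://example.com/5"},
]
deduped = dedupe_by_title(items)  # drops the second item despite its different URL
```

Unlike URL matching, this catches syndicated copies of the same story. It is O(n²) in the number of headlines, which is fine at digest scale.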
Agent 3: FormatterService
The formatter takes cleaned headlines and renders a human-readable digest with a header, numbered list, and summary line.
# agents/formatter.py
from graphbus_core import GraphBusNode, schema_method, subscribe


class FormatterService(GraphBusNode):
    SYSTEM_PROMPT = """
    I format cleaned news headlines into human-readable digests.
    During build cycles, I can propose:
    - Richer output formats (Markdown, HTML, plain text variants)
    - Adding metadata like reading time estimates
    - Priority scoring or categorization of headlines
    - Summary generation from multiple headlines
    I depend on CleanerService's output schema.
    """

    @schema_method(
        input_schema={"cleaned": list, "topic": str},
        output_schema={"digest": str, "headline_count": int, "topic": str}
    )
    def format_digest(self, cleaned: list, topic: str = "") -> dict:
        """Format cleaned headlines into a readable digest."""
        import datetime

        if not cleaned:
            return {"digest": "No headlines available.", "headline_count": 0, "topic": topic}
        now = datetime.datetime.utcnow().strftime("%B %d, %Y – %H:%M UTC")
        lines = [
            f"# News Digest: {topic.title() or 'Top Headlines'}",
            f"*{now}*",
            f"*{len(cleaned)} headline{'s' if len(cleaned) != 1 else ''}*",
            "",
        ]
        for i, item in enumerate(cleaned, 1):
            lines.append(f"{i}. **{item['title']}**")
            if item.get("url"):
                lines.append(f"   {item['url']}")
            lines.append("")
        lines.append("---")
        lines.append("*Generated by GraphBus FormatterService*")
        return {
            "digest": "\n".join(lines),
            "headline_count": len(cleaned),
            "topic": topic,
        }

    @subscribe("/News/Cleaned")
    def on_cleaned_news(self, event):
        """Handle cleaned news events from the message bus."""
        result = self.format_digest(
            cleaned=event.get("cleaned", []),
            topic=event.get("topic", ""),
        )
        self.log(f"[Formatter] Generated digest ({result['headline_count']} headlines)")
        print(result["digest"])
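FormatterService's SYSTEM_PROMPT floats reading-time estimates as a build-time proposal. One plausible helper it could adopt — the 200 wpm default and the function name are illustrative assumptions, not what the agents actually generate:

```python
import math

def reading_time_minutes(headlines: list, wpm: int = 200) -> int:
    """Estimate digest reading time from total word count (ceiling, minimum 1)."""
    words = sum(len(h["title"].split()) for h in headlines)
    return max(1, math.ceil(words / wpm))
```

The result would slot into the digest header alongside the date and headline count, e.g. as another starred metadata line.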
Building the pipeline
Step 1: Static build (no LLM – fast, always works)
graphbus build agents/
You'll see:
GraphBus Build
ℹ Source: agents/   Output: .graphbus/
✓ Build completed

Build Summary
┌───────────────┬───────┐
│ Component     │ Count │
├───────────────┼───────┤
│ Agents        │ 3     │
│ Topics        │ 2     │
│ Subscriptions │ 2     │
│ Dependencies  │ 0     │
└───────────────┴───────┘

Agents:
• FetcherService (1 method, 0 subscriptions)
• CleanerService (1 method, 1 subscription)
• FormatterService (1 method, 1 subscription)

✓ Artifacts written to: .graphbus/
Step 2: LLM build – agents negotiate the schema
export ANTHROPIC_API_KEY=sk-ant-...
graphbus build agents/ --enable-agents
Watch the negotiation live:
GraphBus Build … agent mode
ℹ Activating LLM agents...
[AGENT] FetcherService: "I propose adding a 'source' field to my output schema"
  rationale: FormatterService could display source alongside headline
  diff: output_schema["source"] = str (optional)
  affects: CleanerService, FormatterService
[AGENT] CleanerService: "Accepted – I'll pass 'source' through unchanged"
[AGENT] FormatterService: "Accepted – I'll display it when present"
[ARBITER] Consensus (2/2). Committing.
✓ Modified: agents/fetcher.py
✓ Modified: agents/cleaner.py
✓ Modified: agents/formatter.py
[AGENT] FormatterService: "I propose adding reading time to the digest header"
  rationale: Improves digest usefulness for readers
[AGENT] FetcherService: "Accepted – no schema impact on my end"
[AGENT] CleanerService: "Accepted – no schema impact on my end"
[ARBITER] Consensus (2/2). Committing.
✓ Modified: agents/formatter.py
✓ Build complete – 2 rounds, 4 files improved by agents
✓ Artifacts written to: .graphbus/
After the build, your source files have been improved by the agents. Run git diff agents/ to review the changes before committing them.
graphbus build analyzes your code statically – no LLM calls. Add --enable-agents to activate LLM-powered negotiation at build time. At runtime, your agents call LLMs whenever their logic requires it – the bus routes messages regardless.
Step 3: Run the pipeline
graphbus run .graphbus/
GraphBus Runtime
ℹ Loading artifacts from: .graphbus/
============================================================
RUNTIME READY – 3 nodes active
Contract Validation: ENABLED
Coherence Tracking: ENABLED
============================================================
✓ Runtime started successfully
Step 4: Invoke the pipeline programmatically
You can drive the pipeline from Python code without going through the CLI REPL:
# run_pipeline.py
from graphbus_core.runtime import RuntimeExecutor, RuntimeConfig

config = RuntimeConfig(artifacts_dir=".graphbus/")
executor = RuntimeExecutor(config)
executor.start()

# Step 1: fetch
fetch_result = executor.invoke(
    "FetcherService",
    "fetch_headlines",
    {"topic": "technology", "limit": 5}
)
print(f"Fetched: {len(fetch_result['headlines'])} headlines")

# Step 2: clean
clean_result = executor.invoke(
    "CleanerService",
    "clean",
    {"headlines": fetch_result["headlines"], "topic": fetch_result["topic"]}
)
print(f"Cleaned: {clean_result['removed_count']} removed, {len(clean_result['cleaned'])} kept")

# Step 3: format
digest_result = executor.invoke(
    "FormatterService",
    "format_digest",
    {"cleaned": clean_result["cleaned"], "topic": clean_result["topic"]}
)
print()
print(digest_result["digest"])

executor.stop()
Output:
Fetched: 5 headlines
Cleaned: 1 removed, 4 kept
# News Digest: Technology
*February 20, 2026 – 15:30 UTC*
*4 headlines*
1. **GraphBus framework hits 1,000 stars on GitHub**
https://example.com/1
2. **Multi-agent systems reshape software development**
https://example.com/2
3. **LLM-powered code refactoring is production-ready in 2026**
https://example.com/3
4. **Python remains top language for ML pipelines**
https://example.com/5
---
*Generated by GraphBus FormatterService*
Using the message bus for event-driven flow
The @subscribe handlers let you drive the pipeline via events rather than direct method calls. This is useful for continuous pipelines where FetcherService runs on a schedule – or any time you want loosely coupled, observable inter-agent communication:
# Event-driven version
executor.publish("/News/Raw", {
    "headlines": fetch_result["headlines"],
    "topic": "technology"
})
# → CleanerService.on_raw_news() fires automatically
#   → publishes /News/Cleaned
#   → FormatterService.on_cleaned_news() fires automatically
#   → digest printed to stdout
Every message on the bus is typed, validated against the negotiated schema, and observable. You can attach a listener to any topic for logging, debugging, or metrics – without modifying the agents themselves.
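The topic-listener idea is the plain publish/subscribe pattern. A toy illustration in pure Python — MiniBus is a teaching sketch, not GraphBus's actual bus class, and it omits the schema validation the real bus performs:

```python
from collections import defaultdict

class MiniBus:
    """Toy topic bus illustrating publish/subscribe (not the GraphBus API)."""
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler) -> None:
        """Register a handler to be called for every event on a topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, event: dict) -> None:
        """Deliver an event to every handler subscribed to the topic."""
        for handler in self._subscribers[topic]:
            handler(event)

bus = MiniBus()
received = []
# A metrics/logging tap: observes traffic without touching the agents
bus.subscribe("/News/Cleaned", received.append)
bus.publish("/News/Cleaned", {"cleaned": [], "topic": "ai"})
```

Because subscribers never know who published an event, taps like this can be added or removed without changing any agent code — the same property the tutorial relies on for observability.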
Deploying to production
Docker
graphbus docker build
# → writes Dockerfile
docker build -t news-digest:latest .
docker run --rm news-digest:latest python run_pipeline.py
Kubernetes
graphbus k8s generate --namespace production
# → writes deployment.yaml, service.yaml, configmap.yaml
kubectl apply -f k8s/
GitHub Actions CI
graphbus ci github
# → writes .github/workflows/graphbus.yml
# Runs: static build on every PR, LLM build on merge to main
Inspecting the negotiation history
After an agent build, the full negotiation is logged:
graphbus inspect-negotiation
# Shows: every proposal, every accept/reject, every arbiter decision
# Useful for understanding what your agents changed and why
graphbus inspect .graphbus/
# Shows: agent graph, schemas, topic subscriptions
# Useful for verifying the build output before deploying
What the agents actually improved
If you ran with --enable-agents, your source files were improved. Common changes agents propose in this pipeline:
- FetcherService: adds a source field, a score field, or pagination support to the output schema
- CleanerService: adds title length validation, URL normalization, or language detection
- FormatterService: adds reading time estimates, Markdown vs plain-text variants, or priority ordering
The agents reason about the pipeline end-to-end. FetcherService knows FormatterService wants more data; FormatterService knows CleanerService is a dependency; all three agents converge on a schema that works for the whole pipeline. That's the negotiation protocol doing its job.
Run git diff agents/ to see exactly what changed. If any proposal looks wrong, revert it and re-run. The negotiation is additive and auditable – every proposal, vote, and commit is logged in .graphbus/negotiation.log.
Scaling this pattern
This 3-agent pipeline is a template. Real production news pipelines built on GraphBus typically add:
- A ScorerService that ranks headlines by relevance – it can call an LLM at runtime if scoring needs it
- A DeduplicatorService that does semantic similarity (build-time: negotiates the embedding strategy)
- A RouterService that sends digests to different channels (email, Slack, RSS)
- A StorageService that caches fetch results to avoid repeated API calls
Each new agent brings its own SYSTEM_PROMPT describing its negotiation strategy. The build automatically incorporates it into the consensus process. At runtime, agents can call LLMs whenever their logic requires it – the bus routes messages regardless of whether an LLM is involved in producing them.
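Before writing a full ScorerService, it helps to see the kind of logic such an agent would wrap. An LLM-free keyword-overlap baseline — score_headline, the interests set, and the scoring formula are all illustrative assumptions:

```python
def score_headline(title: str, interests: set) -> float:
    """Score a headline by the fraction of its words matching reader interests."""
    words = {w.strip(".,!?").lower() for w in title.split()}
    if not words:
        return 0.0
    return len(words & interests) / len(words)

interests = {"python", "llm", "agents"}
headlines = [
    {"title": "Local bakery wins award"},
    {"title": "Python remains top language for ML pipelines"},
]
# Rank most-relevant first
ranked = sorted(headlines, key=lambda h: score_headline(h["title"], interests), reverse=True)
```

A real ScorerService would expose this behind a @schema_method so the ranking criteria become part of the negotiated contract, and could swap the heuristic for an LLM call without changing its output schema.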
Next steps
- Replace the mock data in FetcherService with a real news API (NewsAPI, GNews, etc.)
- Run graphbus validate agents/ to check schema contracts before building – see the docs for the full CLI reference
- Add IS_ARBITER = True to a dedicated arbiter agent for larger pipelines with more conflict potential
- Deploy with graphbus k8s generate and run the --enable-agents build phase in CI only – keep runtime configuration separate
The full working version of this example is in examples/news_summarizer/ in the GraphBus repo. The agents there have been through real negotiation rounds and reflect what agents actually propose.
Build your own GraphBus pipeline
Join the alpha waitlist – we're working directly with early adopters to get your first pipeline running.