In this tutorial, we build an advanced, end-to-end learning pipeline around Atomic Agents by wiring together typed agent interfaces, structured prompting, and a compact retrieval layer that grounds outputs in real project documentation. We also demonstrate how to plan retrieval, fetch relevant context, inject it dynamically into an answering agent, and run an interactive loop that turns the setup into a reusable research assistant for any new Atomic Agents question. Check out the FULL CODES here.
import os, sys, textwrap, time, json, re
from typing import List, Optional, Dict, Tuple
from dataclasses import dataclass
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                       "atomic-agents", "instructor", "openai", "pydantic",
                       "requests", "beautifulsoup4", "scikit-learn"])
from getpass import getpass
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
from pydantic import Field
from openai import OpenAI
import instructor
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator, ChatHistory, BaseDynamicContextProvider
import requests
from bs4 import BeautifulSoup
We install all required packages, import the core Atomic Agents primitives, and set up Colab-compatible dependencies in one place. We securely capture the OpenAI API key from the keyboard and store it in the environment so downstream code never hardcodes secrets. We also lock in a default model name while keeping it configurable via an environment variable.
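If you want to experiment with a different model, a small optional sketch like the one below overrides the environment variable and re-reads MODEL; the model name shown is only an example value, not something the tutorial requires.
# Optional: point the tutorial at a different model (example value only).
os.environ["OPENAI_MODEL"] = "gpt-4o"
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")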
def fetch_url_text(url: str, timeout: int = 20) -> str:
    r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    for tag in soup(["script", "style", "nav", "header", "footer", "noscript"]):
        tag.decompose()
    text = soup.get_text("\n")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    return text
def chunk_text(text: str, max_chars: int = 1400, overlap: int = 200) -> List[str]:
    if not text:
        return []
    chunks = []
    i = 0
    while i < len(text):
        chunk = text[i:i+max_chars].strip()
        if chunk:
            chunks.append(chunk)
        i += max_chars - overlap
    return chunks
def clamp(s: str, n: int = 800) -> str:
    s = (s or "").strip()
    return s if len(s) <= n else s[:n].rstrip() + "…"
We fetch web pages from the Atomic Agents repo and docs, then clean them into plain text so retrieval becomes reliable. We chunk long documents into overlapping segments, preserving context while keeping each chunk small enough for ranking and citation. We also add a small helper to clamp long snippets so our injected context stays readable.
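As a quick sanity check, here is a minimal sketch that exercises the two helpers on a made-up sample string (the text and sizes are illustrative only, chosen so the output is easy to inspect):
# Synthetic document: neighbouring chunks should share roughly `overlap` characters.
sample = "Atomic Agents is a framework for building modular AI agents. " * 20
pieces = chunk_text(sample, max_chars=200, overlap=50)
print(len(pieces), "chunks")
print(clamp(pieces[0], 80))  # long snippets get truncated with an ellipsis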
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
@dataclass
class Snippet:
    doc_id: str
    url: str
    chunk_id: int
    text: str
    score: float
class MiniCorpusRetriever:
    def __init__(self, docs: Dict[str, Tuple[str, str]]):
        self.items: List[Tuple[str, str, int, str]] = []
        for doc_id, (url, raw) in docs.items():
            for idx, ch in enumerate(chunk_text(raw)):
                self.items.append((doc_id, url, idx, ch))
        if not self.items:
            raise RuntimeError("No documents were fetched; cannot build TF-IDF index.")
        self.vectorizer = TfidfVectorizer(stop_words="english", max_features=50000)
        self.matrix = self.vectorizer.fit_transform([it[3] for it in self.items])
    def search(self, query: str, k: int = 6) -> List[Snippet]:
        qv = self.vectorizer.transform([query])
        sims = cosine_similarity(qv, self.matrix).ravel()
        top = sims.argsort()[::-1][:k]
        out = []
        for j in top:
            doc_id, url, chunk_id, txt = self.items[j]
            out.append(Snippet(doc_id=doc_id, url=url, chunk_id=chunk_id, text=txt, score=float(sims[j])))
        return out
class RetrievedContextProvider(BaseDynamicContextProvider):
    def __init__(self, title: str, snippets: List[Snippet]):
        super().__init__(title=title)
        self.snippets = snippets
    def get_info(self) -> str:
        blocks = []
        for s in self.snippets:
            blocks.append(
                f"[{s.doc_id}#{s.chunk_id}] (score={s.score:.3f}) {s.url}\n{clamp(s.text, 900)}"
            )
        return "\n\n".join(blocks)
We build a mini retrieval system using TF-IDF and cosine similarity over the chunked documentation corpus. We wrap each retrieved chunk in a structured Snippet object to track doc IDs, chunk IDs, and similarity scores for citation. We then inject top-ranked chunks into the agent's runtime via a dynamic context provider, keeping the answering agent grounded. Check out the FULL CODES here.
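Before wiring the retriever into the agents, a toy, network-free sketch like the following can confirm it behaves as expected (the corpus text and URL are made up for illustration):
# Toy in-memory corpus: one synthetic "document" long enough to produce a couple of chunks.
toy_text = ("Atomic Agents uses typed input and output schemas. "
            "Context providers inject extra information into the system prompt at runtime. ") * 12
toy_docs = {"notes": ("https://example.com/notes", toy_text)}
toy_retriever = MiniCorpusRetriever(toy_docs)
hits = toy_retriever.search("How do context providers work?", k=2)
for h in hits:
    print(h.doc_id, h.chunk_id, round(h.score, 3))
# The provider renders snippets exactly as the answering agent will see them.
print(RetrievedContextProvider(title="Toy Context", snippets=hits).get_info()[:300])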
class PlanInput(BaseIOSchema):
    """Input schema for the planner agent: describes the user's task and how many retrieval queries to draft."""
    task: str = Field(...)
    num_queries: int = Field(4)
class PlanOutput(BaseIOSchema):
    """Output schema from the planner agent: retrieval queries, coverage checklist, and safety checks."""
    queries: List[str]
    must_cover: List[str]
    safety_checks: List[str]
class AnswerInput(BaseIOSchema):
    """Input schema for the answering agent: user question plus style constraints."""
    question: str
    style: str = "concise but advanced"
class AnswerOutput(BaseIOSchema):
    """Output schema for the answering agent: grounded answer, next steps, and which citations were used."""
    answer: str
    next_steps: List[str]
    used_citations: List[str]
client = instructor.from_openai(OpenAI(api_key=os.environ["OPENAI_API_KEY"]))
planner_prompt = SystemPromptGenerator(
    background=[
        "You are a rigorous research planner for a small RAG system.",
        "You propose retrieval queries that are diverse (lexical + semantic) and designed to find authoritative info.",
        "You do NOT answer the task; you only plan retrieval."
    ],
    steps=[
        "Read the task.",
        "Propose diverse retrieval queries (not too long).",
        "List must-cover aspects and safety checks."
    ],
    output_instructions=[
        "Return strictly the PlanOutput schema.",
        "Queries must be directly usable as search strings.",
        "Must-cover should be 4–8 bullets."
    ]
)
planner = AtomicAgent[PlanInput, PlanOutput](
    config=AgentConfig(
        client=client,
        model=MODEL,
        system_prompt_generator=planner_prompt,
        history=ChatHistory(),
    )
)
answerer_prompt = SystemPromptGenerator(
    background=[
        "You are an expert technical tutor for Atomic Agents (atomic-agents).",
        "You are given retrieved context snippets with IDs like [doc#chunk].",
        "You must ground claims in the provided snippets and cite them inline."
    ],
    steps=[
        "Read the question and the provided context.",
        "Synthesize an accurate answer using only supported facts.",
        "Cite claims inline using the provided snippet IDs."
    ],
    output_instructions=[
        "Use inline citations like [readme#12] or [docs_home#3].",
        "If the context doesn't support something, say so briefly and suggest what to retrieve next.",
        "Return strictly the AnswerOutput schema."
    ]
)
answerer = AtomicAgent[AnswerInput, AnswerOutput](
    config=AgentConfig(
        client=client,
        model=MODEL,
        system_prompt_generator=answerer_prompt,
        history=ChatHistory(),
    )
)
We define strictly typed schemas for planner and answerer inputs and outputs, and include docstrings to satisfy Atomic Agents' schema requirements. We create an Instructor-wrapped OpenAI client and configure two Atomic Agents with explicit system prompts and chat history. We enforce structured outputs so the planner produces queries and the answerer produces a cited response with clear next steps.
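If you want to inspect the planner in isolation before running the full pipeline, a minimal sketch like this works (it makes one small API call, and the exact contents of the output will vary by model; the task string is just an example):
# Standalone planner check: the typed PlanOutput makes the result easy to inspect.
plan_preview = planner.run(PlanInput(task="Explain Atomic Agents context providers", num_queries=3))
print(plan_preview.queries)        # search strings the planner drafted
print(plan_preview.must_cover)     # aspects the final answer should touch on
print(plan_preview.safety_checks)  # checks the planner wants applied downstream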
SOURCES = {
    "readme": "https://github.com/BrainBlend-AI/atomic-agents",
    "docs_home": "https://brainblend-ai.github.io/atomic-agents/",
    "examples_index": "https://brainblend-ai.github.io/atomic-agents/examples/index.html",
}
raw_docs: Dict[str, Tuple[str, str]] = {}
for doc_id, url in SOURCES.items():
    try:
        raw_docs[doc_id] = (url, fetch_url_text(url))
    except Exception:
        raw_docs[doc_id] = (url, "")
non_empty = [d for d in raw_docs.values() if d[1].strip()]
if not non_empty:
    raise RuntimeError("All source fetches failed or were empty. Check network access in Colab and retry.")
retriever = MiniCorpusRetriever(raw_docs)
def run_atomic_rag(question: str, k: int = 7, verbose: bool = True) -> AnswerOutput:
    t0 = time.time()
    plan = planner.run(PlanInput(task=question, num_queries=4))
    all_snips: List[Snippet] = []
    for q in plan.queries:
        all_snips.extend(retriever.search(q, k=max(2, k // 2)))
    best: Dict[Tuple[str, int], Snippet] = {}
    for s in all_snips:
        key = (s.doc_id, s.chunk_id)
        if (key not in best) or (s.score > best[key].score):
            best[key] = s
    snips = sorted(best.values(), key=lambda x: x.score, reverse=True)[:k]
    ctx = RetrievedContextProvider(title="Retrieved Atomic Agents Context", snippets=snips)
    answerer.register_context_provider("retrieved_context", ctx)
    out = answerer.run(AnswerInput(question=question, style="concise, advanced, practical"))
    if verbose:
        print(out.answer)
    return out
demo_q = "Teach me Atomic Agents at an advanced level: explain the core building blocks and show how to chain agents with typed schemas and dynamic context."
run_atomic_rag(demo_q, k=7, verbose=True)
while True:
    user_q = input("\nYour question> ").strip()
    if not user_q or user_q.lower() in {"exit", "quit"}:
        break
    run_atomic_rag(user_q, k=7, verbose=True)
We fetch a small set of authoritative Atomic Agents sources and build a local retrieval index from them. We implement a full pipeline function that plans queries, retrieves relevant context, injects it, and produces a grounded final answer. We finish by running a demo query and launching an interactive loop so we can keep asking questions and getting cited answers.
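Because the pipeline returns a structured AnswerOutput rather than plain text, its fields can also be consumed programmatically; a brief sketch (with an example question) looks like this:
# Inspect the structured result instead of relying on the printed answer.
result = run_atomic_rag("How do I attach a dynamic context provider to an AtomicAgent?", k=5, verbose=False)
print(result.answer[:500])
print("Citations used:", result.used_citations)
print("Suggested next steps:", result.next_steps)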
In conclusion, we completed the Atomic Agents workflow in Colab, cleanly separating planning, retrieval, and answering while ensuring strong typing. We kept the system grounded by injecting only the highest-signal documentation chunks as dynamic context, and we enforced a citation discipline that makes outputs auditable. From here, we can scale this pattern by adding more sources, swapping in stronger retrievers or rerankers, introducing tool-use agents, and turning the pipeline into a production-grade research assistant that remains both fast and trustworthy.

