Monday, June 1, 2026

An Implementation of the Microsoft Agent Governance Toolkit for Protected AI Agent Device Use with Insurance policies, Approvals, Audit Logs, and Danger Controls

eventualities = [
   {
       "name": "Safe database read",
       "tool": research_db,
       "kwargs": {
           "table": "customers",
           "operation": "select",
           "type": "select",
           "sensitivity": "medium"
       }
   },
   {
       "name": "Blocked destructive database action",
       "tool": research_db,
       "kwargs": {
           "table": "customers",
           "operation": "drop",
           "type": "drop_table",
           "sensitivity": "critical"
       }
   },
   {
       "name": "External email requiring approval",
       "tool": research_email,
       "kwargs": {
           "to": "[email protected]",
           "recipient_domain": "instance.com",
           "topic": "Quarterly replace",
           "physique": "Sharing a non-confidential quarterly replace.",
           "sort": "send_email",
           "sensitivity": "medium"
       }
   },
   {
       "title": "Exterior electronic mail denied because of approval rejection",
       "device": research_email,
       "kwargs": {
           "to": "[email protected]",
           "recipient_domain": "instance.com",
           "topic": "Confidential technique",
           "physique": "This incorporates confidential technique.",
           "sort": "send_email",
           "sensitivity": "vital"
       }
   },
   {
       "title": "Protected sandbox shell command",
       "device": ops_shell,
       "kwargs": {
           "command": "echo Agent governance is energetic",
           "sort": "shell_exec",
           "sensitivity": "low"
       }
   },
   {
       "title": "Harmful shell command blocked",
       "device": ops_shell,
       "kwargs": {
           "command": "rm -rf /content material/one thing",
           "sort": "shell_exec",
           "sensitivity": "vital"
       }
   },
   {
       "title": "Low-trust agent blocked from delicate information",
       "device": shadow_db,
       "kwargs": {
           "desk": "executive_compensation",
           "operation": "choose",
           "sort": "choose",
           "sensitivity": "vital"
       }
   },
   {
       "title": "Monetary switch requiring approval",
       "device": finance_transfer,
       "kwargs": {
           "quantity": 2500,
           "vacation spot": "vendor-123",
           "sort": "transfer_money",
           "sensitivity": "excessive"
       }
   },
   {
       "title": "Massive monetary switch rejected",
       "device": finance_transfer,
       "kwargs": {
           "quantity": 15000,
           "vacation spot": "vendor-999",
           "sort": "transfer_money",
           "sensitivity": "vital"
       }
   },
]
outcomes = []
for state of affairs in eventualities:
   strive:
       output = state of affairs["tool"](**state of affairs["kwargs"])
       outcomes.append({
           "state of affairs": state of affairs["name"],
           "standing": "executed",
           "output": output
       })
   besides Exception as e:
       outcomes.append({
           "state of affairs": state of affairs["name"],
           "standing": "blocked_or_pending",
           "error": str(e)
       })
audit_df = audit_log.to_dataframe()
display_cols = [
   "timestamp",
   "agent_name",
   "tool_name",
   "decision",
   "matched_rule",
   "severity",
   "reason",
   "record_hash"
]
show(audit_df[display_cols])
test_cases = [
   {
       "name": "drop_table must be denied",
       "identity": research_agent,
       "tool_name": "query_database",
       "action": {"type": "drop_table", "sensitivity": "critical", "autonomous": True},
       "expected": "deny"
   },
   {
       "name": "safe select should be allowed",
       "identity": research_agent,
       "tool_name": "query_database",
       "action": {"type": "select", "sensitivity": "low", "autonomous": True},
       "expected": "allow"
   },
   {
       "name": "external email should require approval",
       "identity": research_agent,
       "tool_name": "send_email",
       "action": {
           "type": "send_email",
           "recipient_domain": "example.com",
           "sensitivity": "medium",
           "autonomous": True
       },
       "expected": "require_approval"
   },
   {
       "name": "low trust sensitive access denied",
       "identity": unknown_agent,
       "tool_name": "query_database",
       "action": {"type": "select", "sensitivity": "critical", "autonomous": True},
       "expected": "deny"
   },
   {
       "name": "shell command should enter sandbox",
       "identity": ops_agent,
       "tool_name": "shell_exec",
       "action": {
           "type": "shell_exec",
           "command": "echo hello",
           "sensitivity": "low",
           "autonomous": True
       },
       "expected": "sandbox"
   },
]
test_results = []
for check in test_cases:
   choice = engine.consider(
       identification=check["identity"],
       tool_name=check["tool_name"],
       motion=check["action"]
   )
   handed = choice.choice == check["expected"]
   test_results.append({
       "check": check["name"],
       "anticipated": check["expected"],
       "precise": choice.choice,
       "handed": handed,
       "matched_rule": choice.matched_rule
   })
test_df = pd.DataFrame(test_results)
show(test_df)
engine.activate_kill_switch()
strive:
   research_db(
       desk="prospects",
       operation="choose",
       sort="choose",
       sensitivity="low"
   )
besides Exception as e:
   cross
engine.deactivate_kill_switch()
audit_df = audit_log.to_dataframe()
abstract = (
   audit_df
   .groupby(["decision", "severity"], dropna=False)
   .dimension()
   .reset_index(title="depend")
   .sort_values("depend", ascending=False)
)
show(abstract)
agent_summary = (
   audit_df
   .groupby(["agent_name", "decision"])
   .dimension()
   .reset_index(title="depend")
   .sort_values(["agent_name", "count"], ascending=[True, False])
)
show(agent_summary)
decision_counts = audit_df["decision"].value_counts()
plt.determine(figsize=(8, 5))
decision_counts.plot(type="bar")
plt.title("Governance Choices Throughout Agent Actions")
plt.xlabel("Determination")
plt.ylabel("Rely")
plt.xticks(rotation=30)
plt.tight_layout()
plt.present()
severity_counts = audit_df["severity"].fillna("none").value_counts()
plt.determine(figsize=(8, 5))
severity_counts.plot(type="bar")
plt.title("Governance Occasions by Severity")
plt.xlabel("Severity")
plt.ylabel("Rely")
plt.xticks(rotation=30)
plt.tight_layout()
plt.present()
G = nx.DiGraph()
for _, row in audit_df.iterrows():
   agent_node = f"Agent: {row['agent_name']}"
   tool_node = f"Device: {row['tool_name']}"
   decision_node = f"Determination: {row['decision']}"
   rule_node = f"Rule: {row['matched_rule']}" if pd.notna(row["matched_rule"]) else "Rule: default"
   G.add_node(agent_node, node_type="agent")
   G.add_node(tool_node, node_type="device")
   G.add_node(decision_node, node_type="choice")
   G.add_node(rule_node, node_type="rule")
   G.add_edge(agent_node, tool_node, relation="calls")
   G.add_edge(tool_node, decision_node, relation="produces")
   G.add_edge(decision_node, rule_node, relation="matched")
plt.determine(figsize=(14, 9))
pos = nx.spring_layout(G, seed=42, ok=0.8)
nx.draw_networkx_nodes(G, pos, node_size=1800)
nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle="->", arrowsize=15)
nx.draw_networkx_labels(G, pos, font_size=8)
plt.title("Agent Governance Graph: Brokers, Instruments, Choices, and Coverage Guidelines")
plt.axis("off")
plt.tight_layout()
plt.present()
EXPORT_DIR = "/content material/agt_tutorial_outputs"
os.makedirs(EXPORT_DIR, exist_ok=True)
audit_json_path = os.path.be a part of(EXPORT_DIR, "tamper_evident_audit_log.json")
audit_csv_path = os.path.be a part of(EXPORT_DIR, "governance_audit_log.csv")
policy_copy_path = os.path.be a part of(EXPORT_DIR, "advanced_agent_policy.yaml")
test_results_path = os.path.be a part of(EXPORT_DIR, "policy_test_results.csv")
with open(audit_json_path, "w") as f:
   json.dump([asdict(r) for r in audit_log.records], f, indent=2, default=str)
audit_df.to_csv(audit_csv_path, index=False)
test_df.to_csv(test_results_path, index=False)
shutil.copy(POLICY_PATH, policy_copy_path)

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles