From 5405bc8f44314a329c1878f47158e024c834be14 Mon Sep 17 00:00:00 2001 From: Sutidon Date: Tue, 7 Oct 2025 23:50:00 +0700 Subject: [PATCH] Create summary.yml Guardian agents ai Thailand --- .github/workflows/summary.yml | 127 ++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 .github/workflows/summary.yml diff --git a/.github/workflows/summary.yml b/.github/workflows/summary.yml new file mode 100644 index 000000000..de5623c5a --- /dev/null +++ b/.github/workflows/summary.yml @@ -0,0 +1,127 @@ +name: Summarize new issues + +on: + issues: + types: [opened] + +jobs: + summary: + runs-on: ubuntu-latest + permissions: + issues: write + models: read + contents: read + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Run AI inference + id: inference + uses: actions/ai-inference@v1 + with: + prompt: | + Summarize the following GitHub issue in one paragraph: + Title: ${{ github.event.issue.title }} + Body: ${{ github.event.issue.body }} + + - name: Comment with AI summary + run: | + gh issue comment $ISSUE_NUMBER --body '${{ steps.inference.outputs.response }}' + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + ISSUE_NUMBER: ${{ github.event.issue.number }} + RESPONSE: ${{ steps.inference.outputs.response }}""" guardian_ai_system.py A self-contained prototype for Guardian AI - Thailand +Features included: +Zero-Day Detector (simple honeypot log scanner & signature matcher) +LINE Notify integration for immediate alerts +Daily report generation (PDF if reportlab available; otherwise HTML) +Simple persistent event store (events.json) +CLI interface: scan, watch (daemon), generate-report +How to use: +Place any honeypot logs under ./honeypot_logs/ (text files). The scanner will parse them. +Configure LINE token in config.json or set env var LINE_NOTIFY_TOKEN. +Run quick scan: python guardian_ai_system.py --scan +Run as watcher (polls every N seconds): python guardian_ai_system.py --watch --interval 30 +Generate daily report: python guardian_ai_system.py --report --output report_YYYYMMDD.pdf +Dependencies (optional but recommended): +requests (for LINE Notify) +reportlab (for PDF report generation) +PyPDF2 (to extract text from source PDFs for use in report) +This file is a prototype and should be adapted for production, hardened for security, and integrated with real honeypots, SIEMs, and authenticated services. +Author: S.A.I. Guardian prototype """ +import os import re import time import json import argparse import datetime import threading from pathlib import Path +Optional deps +try: import requests except Exception: requests = None +try: from reportlab.lib.pagesizes import A4 from reportlab.pdfgen import canvas except Exception: reportlab = None +try: import PyPDF2 except Exception: PyPDF2 = None +CONFIG_PATH = Path('guardian_config.json') EVENTS_PATH = Path('events.json') HONEYPOT_DIR = Path('honeypot_logs') +Default signature list - in real deployment use curated YARA rules or threat intel +DEFAULT_SIGNATURES = [ r"ms17_010", # EternalBlue r"CVE-2021-34527", # PrintNightmare r"nmap scan", r"unauthorized login", r"reverse shell", r"/etc/shadow", ] +class Config: def init(self, path=CONFIG_PATH): self.path = Path(path) self.data = { "line_token": None, "signatures": DEFAULT_SIGNATURES, "report_sources": ["Daily_Brief_230725.pdf", "GuardianAI_TH.pdf"], "honeypot_dir": str(HONEYPOT_DIR), } self.load() +def load(self): if self.path.exists(): try: with open(self.path, 'r', encoding='utf-8') as f: d = json.load(f) self.data.update(d) except Exception: print("[Config] failed to parse config file; using defaults") def save(self): with open(self.path, 'w', encoding='utf-8') as f: json.dump(self.data, f, ensure_ascii=False, indent=2) def get(self, key, default=None): return self.data.get(key, default) +class EventStore: def init(self, path=EVENTS_PATH): self.path = Path(path) self.events = [] self._load() +def _load(self): if self.path.exists(): try: with open(self.path, 'r', encoding='utf-8') as f: self.events = json.load(f) except Exception: self.events = [] def add(self, event): self.events.append(event) self._save() def _save(self): with open(self.path, 'w', encoding='utf-8') as f: json.dump(self.events, f, ensure_ascii=False, indent=2) def all(self): return self.events +class ZeroDayDetector: def init(self, signatures=None, honeypot_dir=HONEYPOT_DIR, store=None): self.signatures = [re.compile(s, re.IGNORECASE) if isinstance(s, str) else s for s in (signatures or DEFAULT_SIGNATURES)] self.honeypot_dir = Path(honeypot_dir) self.honeypot_dir.mkdir(exist_ok=True) self.store = store or EventStore() +def scan_file(self, path: Path): matches = [] try: text = path.read_text(errors='ignore') except Exception as e: print(f"[ZeroDayDetector] failed to read {path}: {e}") return matches for sig in self.signatures: for m in sig.finditer(text): snippet = text[max(0, m.start()-40):m.end()+40] evt = { "time": datetime.datetime.utcnow().isoformat() + 'Z', "file": str(path), "signature": sig.pattern if hasattr(sig, 'pattern') else str(sig), "match_text": snippet, } matches.append(evt) return matches def scan(self): found = [] for p in self.honeypot_dir.glob('**/*'): if p.is_file(): m = self.scan_file(p) for evt in m: self.store.add(evt) found.append(evt) return found def watch(self, interval=30, callback=None): """Watch the honeypot dir and scan periodically. callback(event_list) will be called when events found.""" print(f"[ZeroDayDetector] watching {self.honeypot_dir} every {interval}s") try: while True: new = self.scan() if new and callback: callback(new) time.sleep(interval) except KeyboardInterrupt: print("[ZeroDayDetector] watcher stopped by user") +LINE Notify integration +def line_notify_send(token: str, message: str, image_path: str = None): if not token: print("[LINE] No token provided; skip sending") return False if requests is None: print("[LINE] requests library missing. Install with: pip install requests") return False url = "https://notify-api.line.me/api/notify" headers = {"Authorization": f"Bearer {token}"} data = {"message": message} files = None if image_path and Path(image_path).exists(): files = {"imageFile": open(image_path, 'rb')} try: resp = requests.post(url, headers=headers, data=data, files=files) if files: files['imageFile'].close() if resp.status_code == 200: print("[LINE] Notification sent") return True else: print(f"[LINE] Failed to send ({resp.status_code}): {resp.text}") return False except Exception as e: print(f"[LINE] Exception sending: {e}") return False +Reporting +def extract_text_from_pdfs(paths): texts = [] if PyPDF2 is None: print("[Report] PyPDF2 not available; cannot extract PDF text") return texts for p in paths: pth = Path(p) if not pth.exists(): continue try: with open(pth, 'rb') as f: reader = PyPDF2.PdfReader(f) text = [] for page in reader.pages: try: t = page.extract_text() or '' except Exception: t = '' text.append(t) texts.append('\n'.join(text)) except Exception as e: print(f"[Report] failed to read {p}: {e}") return texts +def generate_pdf_report(output_path, events, extracted_texts=None): if reportlab is None: print("[Report] reportlab not available; will generate HTML report instead") return generate_html_report(output_path.replace('.pdf', '.html'), events, extracted_texts) +c = canvas.Canvas(output_path, pagesize=A4) width, height = A4 title = "Guardian AI - Daily Brief" c.setFont("Helvetica-Bold", 18) c.drawString(40, height - 60, title) c.setFont("Helvetica", 10) c.drawString(40, height - 80, f"Generated: {datetime.datetime.utcnow().isoformat()}Z") y = height - 110 # Brief extracted text snippets if extracted_texts: c.setFont("Helvetica-Bold", 12) c.drawString(40, y, "Sources summary:") y -= 20 c.setFont("Helvetica", 9) for t in extracted_texts[:3]: for line in (t.strip().split('\n')[:4]): c.drawString(50, y, line[:100]) y -= 12 if y < 80: c.showPage(); y = height - 40 # Events c.setFont("Helvetica-Bold", 12) c.drawString(40, y, "Detected Events:") y -= 20 c.setFont("Helvetica", 9) for e in events[-50:]: s = f"[{e.get('time')}] {Path(e.get('file')).name} - {e.get('signature')}" c.drawString(50, y, s[:120]) y -= 12 if y < 80: c.showPage(); y = height - 40 c.save() print(f"[Report] PDF report written to {output_path}") return output_path +def generate_html_report(output_path, events, extracted_texts=None): html = [' +Guardian AI - Daily Brief'] html.append(f" +Guardian AI - Daily Brief +") html.append(f" +Generated: {datetime.datetime.utcnow().isoformat()}Z +") if extracted_texts: html.append(' +Source summaries +') for t in extracted_texts[:3]: html.append('') html.append(t[:2000]) html.append('') html.append(' +Detected Events +') html.append('') for e in events[-200:]: html.append(f"{e.get('time')} - {Path(e.get('file')).name} - {e.get('signature')}{e.get('match_text')[:300]}") html.append('') html.append('') with open(output_path, 'w', encoding='utf-8') as f: f.write('\n'.join(html)) print(f"[Report] HTML report written to {output_path}") return output_path +Glue: CLI and orchestration +def load_config(): cfg = Config() return cfg +def on_new_events(events): cfg = load_config() token = cfg.get('line_token') for e in events: msg = f"[Guardian AI] Detected possible Zero-Day signature: {e.get('signature')}\nHost/file: {e.get('file')}\nTime: {e.get('time')}" print(msg) # send via LINE if token if token: line_notify_send(token, msg) +def quick_scan_and_alert(cfg=None): cfg = cfg or load_config() store = EventStore() detector = ZeroDayDetector(signatures=cfg.get('signatures'), honeypot_dir=cfg.get('honeypot_dir'), store=store) found = detector.scan() if found: print(f"[Main] Found {len(found)} events") token = cfg.get('line_token') if token: # send summary s = f"Guardian AI Alert: {len(found)} event(s) detected. Latest: {found[-1].get('signature')} in {Path(found[-1].get('file')).name}" line_notify_send(token, s) else: print("[Main] No suspicious events detected") return store +def run_watcher(cfg=None, interval=30): cfg = cfg or load_config() store = EventStore() detector = ZeroDayDetector(signatures=cfg.get('signatures'), honeypot_dir=cfg.get('honeypot_dir'), store=store) try: detector.watch(interval=interval, callback=on_new_events) except KeyboardInterrupt: print("[Main] watcher stopped") +def generate_daily_report(cfg=None, output=None): cfg = cfg or load_config() store = EventStore() events = store.all() srcs = cfg.get('report_sources') or [] extracted = extract_text_from_pdfs(srcs) if not output: output = f"report_{datetime.datetime.utcnow().strftime('%Y%m%d')}.pdf" out = generate_pdf_report(output, events, extracted_texts=extracted) return out +def ensure_example_files(): # create honeypot sample HONEYPOT_DIR.mkdir(exist_ok=True) sample = HONEYPOT_DIR / 'sample_connection.log' if not sample.exists(): sample.write_text('Oct 10 12:00 Connection from 10.10.10.10 - nmap scan detected\nUnauthorized login attempt user=admin\n') # create config if missing cfg = Config() if not cfg.get('line_token'): # keep as None unless user sets cfg.save() +def main(): parser = argparse.ArgumentParser(description='Guardian AI System - prototype') parser.add_argument('--scan', action='store_true', help='Run a quick scan of honeypot logs') parser.add_argument('--watch', action='store_true', help='Run watcher that polls periodically') parser.add_argument('--interval', type=int, default=30, help='Watcher interval in seconds') parser.add_argument('--report', action='store_true', help='Generate daily report') parser.add_argument('--output', type=str, help='Output path for report') parser.add_argument('--setup-line', action='store_true', help='Interactive setup for LINE token (writes config)') +args = parser.parse_args() ensure_example_files() cfg = load_config() if args.setup_line: token = input('Paste LINE Notify token (or leave blank): ').strip() if token: cfg.data['line_token'] = token cfg.save() print('[Setup] Token saved to guardian_config.json') else: print('[Setup] No token provided; config unchanged') return if args.scan: quick_scan_and_alert(cfg) return if args.watch: run_watcher(cfg, interval=args.interval) return if args.report: out = generate_daily_report(cfg, output=args.output) print(f"[Main] Report created: {out}") return parser.print_help() +if name == 'main': main() + +SAI - ภาพรวมคลังข้อมูลตัวแทนผู้พิทักษ์ +0-welcome.yml +คุณพูดว่า: ตัวแทนผู้พิทักษ์ +ตัวแทนผู้พิทักษ์ +Copilot กล่าวว่า: ดูเหมือนว่าคุณจะพูดถึง "ตัวแทนผู้พิทักษ์" +ดูเหมือนว่าคุณจะพูดถึง "ตัวแทนผู้พิทักษ์" แต่คำขอของคุณยังไม่ชัดเจน ต่อไปนี้คือการตีความที่เป็นไปได้: +หากคุณถามว่า "Guardian Agents" หมายถึงอะไร:โดยทั่วไป "ตัวแทนผู้พิทักษ์" อาจหมายถึงตัวแทน (ซอฟต์แวร์ มนุษย์ หรืออื่นๆ) ที่ออกแบบมาเพื่อปกป้อง ตรวจสอบ หรือปกป้องระบบหรือกระบวนการ +ในบริบทของซอฟต์แวร์หรือ AI อาจเป็นการตรวจสอบตัวแทนที่รับรองว่าเป็นไปตามเงื่อนไขบางประการหรือระบบทำงานอย่างปลอดภัย +หากคุณกำลังถามถึงวิธีการนำ Guardian Agents ไปใช้งานใน GitHub Actions หรือเวิร์กโฟลว์ที่ให้มา:GitHub Actions ไม่มีแนวคิดดั้งเดิมที่เรียกว่า "Guardian Agents" +หากคุณหมายถึงการเพิ่มการดำเนินการหรืองานที่ปกป้องหรือตรวจสอบบางสิ่งบางอย่างในเวิร์กโฟลว์ของคุณ (เช่น การรันการตรวจสอบความปลอดภัย การบังคับใช้การป้องกันสาขา หรือการตรวจสอบความลับ) คุณสามารถเพิ่มขั้นตอนในเวิร์กโฟลว์ YAML ของคุณเพื่อจุดประสงค์นั้นได้ +หากคุณกำลังอ้างอิงถึงเครื่องมือ คลังข้อมูล หรือกรอบงานเฉพาะที่ชื่อว่า "Guardian Agents":โปรดให้รายละเอียดหรือบริบทเพิ่มเติมเพื่อให้ฉันสามารถช่วยคุณบูรณาการ กำหนดค่า หรืออธิบายได้ +คุณช่วยอธิบายให้ชัดเจนหน่อยได้ไหมว่า "ตัวแทนผู้พิทักษ์" หมายถึงอะไร +หากคุณมีข้อกำหนดเฉพาะ หรือต้องการเพิ่มขั้นตอนผู้พิทักษ์/การตรวจสอบ/ความปลอดภัยในเวิร์กโฟลว์ GitHub Actions ของคุณ โปรดแจ้งให้ฉันทราบถึงเป้าหมายของคุณ! +You said: """ Repository: S.A.I. - Guardian Agents Single-file +ที่เก็บ """: SAI - Guardian Agents ชุดไฟล์เดี่ยวสำหรับดูตัวอย่าง แบ่งออกเป็นไฟล์ตามเครื่องหมาย (### ไฟล์: path) วัตถุประสงค์: ต้นแบบแบบฟูลสแตกที่แสดงแกนหลักของ SAI ส่วนกลาง, เฟรมเวิร์ก Agents, message bus (in-memory + Redis เสริม), FastAPI control API, ตัวอย่างเอเจนต์, ความปลอดภัย (คีย์ API / JWT), Docker Compose สำหรับรันกับ Redis และสคริปต์สาธิต หมายเหตุ: นี่คือต้นแบบ สำหรับการผลิต ให้เพิ่มความปลอดภัยให้กับความลับ ใช้ TLS, RBAC, การจัดการคีย์ที่ปลอดภัย, ขีดจำกัดอัตรา, การตรวจสอบ และการตรวจสอบทางกฎหมาย ไฟล์ """: README.md """ SAI - Guardian Agents (ต้นแบบ) สิ่งที่รวมอยู่ sai_core.py — จิตสำนึกส่วนกลาง, รีจิสทรี, การตัดสินใจ agent_base.py — คลาสพื้นฐานและวงจรชีวิตของเอเจนต์ agents/agent_cybershield.py — ตัวอย่างเอเจนต์: CyberShield agents/agent_zeroday.py — ตัวอย่างเอเจนต์: ZeroDay detector messagebus.py — พับ/ซับในหน่วยความจำและตัวเลือก Redis backend api/main.py — แอป FastAPI สำหรับจัดการ SAI และเอเจนต์ auth.py — คีย์ API แบบง่าย/ยูทิลิตี้ JWT docker-compose.yml — เริ่ม Redis + app requirements.txt — การอ้างอิง Python run_demo.sh — เรียกใช้เดโมท้องถิ่นโดยใช้บัสในหน่วยความจำ วิธีเรียกใช้ (เดโมโดยไม่ใช้ Redis) สร้าง Python 3.10+ venv pip install -r requirements.txt bash run_demo.sh เปิด http://localhost:8000/docs เพื่อโต้ตอบกับ API วิธีเรียกใช้ Redis (แนะนำสำหรับหลายกระบวนการ) docker-compose up --build แอปจะเชื่อมต่อกับ Redis ที่ redis://redis:6379 สรุปสถาปัตยกรรม แกนหลักของ SAI ส่วนกลางจะควบคุมดูแลนโยบาย รับรายงานจากเอเจนต์ และออกคำสั่งผ่านบัสข้อความ เอเจนต์จะสมัครรับหัวข้อข้อความ ดำเนินงาน และเผยแพร่เหตุการณ์ API เปิดเผยจุดสิ้นสุดเพื่อแสดงรายการเอเจนต์ ส่งคำสั่ง และอ่านการแจ้งเตือน ไฟล์ """: requirements.txt """ fastapi uvicorn[standard] httpx pydantic python-jose[cryptography] aioredis asyncio watchfiles """ FILE: docker-compose.yml """ version: '3.8' services: redis: image: redis:7-alpine ports: - '6379:6379' app: build: . command: uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload volumes: - .:/app depends_on: - redis ports: - '8000:8000' """ FILE: messagebus.py """ บัสข้อความแบบอะซิงโครนัสที่เรียบง่ายพร้อมแบ็กเอนด์แบบปลั๊กอิน import asyncio from typing import Callable, Dict, List, Any, Optional class InMemoryBus: def init(self): self.subscribers: Dict[str, List[Callable[[Any], None]]] = {} def subscribe(self, topic: str, callback: Callable[[Any], None]): self.subscribers.setdefault(topic, []).append(callback) async def publish(self, topic: str, message: Any): # ส่งมอบให้กับสมาชิกแบบอะซิงโครนัส หาก topic อยู่ใน self.subscribers: สำหรับ cb ในรายการ(self.subscribers[topic]): # กำหนดเวลาโดยไม่ต้องรอเพื่อหลีกเลี่ยงการบล็อก asyncio.create_task(cb(message)) บัสที่รองรับ Redis ที่เป็นทางเลือก (ต้องใช้ aioredis) ลอง: นำเข้า aioredis ยกเว้น ข้อยกเว้น: aioredis = None คลาส RedisBus: def init(self, url: str = 'redis://localhost:6379'): หาก aioredis เป็น None:เพิ่ม RuntimeError('aioredis ไม่ได้ติดตั้ง') self.url = url self._pub = None self._sub = None self._listeners: Dict[str, List[Callable[[Any], None]]] = {} async def connect(self): self._pub = await aioredis.from_url(self.url) self._sub = await aioredis.from_url(self.url) async def subscribe(self, topic: str, callback: Callable[[Any], None]): self._listeners.setdefault(topic, []).append(callback) # เริ่มตัวรับฟังพื้นหลังสำหรับหัวข้อ asyncio.create_task(self._listener(topic)) async def _listener(self, topic: str): pubsub = self._sub.pubsub() await pubsub.subscribe(topic) async สำหรับข้อความใน pubsub.listen(): ถ้า msg เป็น None: ดำเนินการต่อ # msg เป็น dict ที่มี 'type' และ 'data' ถ้า msg.get('type') == 'message': data = msg.get('data') สำหรับ cb ในรายการ(self._listeners.get(topic, [])): asyncio.create_task(cb(data)) async def publish(self, topic: str, message: Any): รอ self._pub.publish(topic, str(message)) """ FILE: agent_base.py """ import asyncio import uuid from typing import Any, Dict class AgentBase: def init(self, name: str, bus, core_ref=None, config: Dict = None): self.id = str(uuid.uuid4()) self.name = name self.bus = bus self.core = core_ref self.config = config หรือ {} self.running = False async def start(self): self.running = True # โดยทั่วไปตัวแทนจะสมัครรับหัวข้อที่เกี่ยวข้องกับบทบาทของพวกเขา รอ self.subscribe() asyncio.create_task(self.run()) async def stop(self): self.running = False async def subscribe(self): # override เพื่อสมัครรับหัวข้อ ส่ง async def run(self): # override ลูปหลัก ขณะที่ self.running: รอ asyncio.sleep(1) async def handle_message(self, message: Any): # override: ประมวลผลข้อความที่เผยแพร่บนบัส ส่ง async def publish_event(self, topic: str, event: Any): รอ self.bus.publish(topic, { 'agent': self.name, 'agent_id': self.id, 'event': event }) """ FILE: agents/agent_cybershield.py """ from agent_base import AgentBase import asyncio class AgentCyberShield(AgentBase): async def subscribe(self): # รับฟังเหตุการณ์ 'network.alert' และ 'command.cybershield' รอ self.bus.subscribe('network.alert', self.handle_message) รอ self.bus.subscribe('command.cybershield', self.handle_message) async def run(self): # การตรวจสอบสุขภาพเป็นระยะ / ลูปการล่าภัยคุกคาม ในขณะที่ self.running: # การตรวจสอบจำลองแบบง่าย ๆ ภัยคุกคาม = รอ self._perform_scan() ถ้าภัยคุกคาม: รอ self.publish_event('sai.alert', {'type': 'cyber', 'threat': threat}) รอ asyncio.sleep(self.config.get('scan_interval', 10)) async def _perform_scan(self): # ตัวแทน: รวม IDS, YARA หรือบันทึก การวิเคราะห์ # คืนค่า None หรือ threat dict return None async def handle_message(self, message): print(f"[CyberShield] received message: {message}") # ประมวลผลคำสั่ง if isinstance(message, dict) and message.get('command') == 'force_scan':threat = await self._perform_scan() await self.publish_event('sai.alert', {'type': 'cyber', 'threat': threat or 'no-threat'}) """ FILE: agents/agent_zeroday.py """ from agent_base import AgentBase import asyncio class AgentZeroDay(AgentBase): async def subscribe(self): await self.bus.subscribe('feed.external', self.handle_message) async def run(self): # ตัวแทนที่ไม่ได้ใช้งาน — ตอบสนองต่อฟีดในขณะที่ self.running: await asyncio.sleep(1) async def handle_message(self, message): # จำลองการตรวจจับรูปแบบที่น่าสงสัย data = message # ฮิวริสติกแบบไร้เดียงสา: ถ้า 'exploit' ใน data -> แจ้งเตือน ถ้า isinstance(data, str) และ 'exploit' ใน data.lower(): รอ self.publish_event('sai.alert', {'type': 'zeroday', 'evidence': data}) """ FILE: sai_core.py """ import asyncio from typing import Dict, Any class SAICore: def init(self, bus): self.bus = bus self.agents: Dict[str, Any] = {} self.policies = {} # central alert queue self.alerts = [] async def register_agent(self, agent): self.agents[agent.id] = agent # อนุญาตให้แกนกลางรับฟังเหตุการณ์ของตัวแทน await self.bus.subscribe('sai.alert', self._handle_alert) async def _handle_alert(self, message): # standardize and store alert print(f"[SAI] Alert received: {message}") self.alerts.append(message) # simple decision ตรรกะ: ยกระดับถ้าเหตุการณ์ทางไซเบอร์หรือ zeroday = message.get('event') ถ้า isinstance(message, dict) มิฉะนั้นจะเป็น None ถ้า event และ event.get('type') ใน ('cyber', 'zeroday'): # เผยแพร่คำสั่งไปยังผู้ตอบสนองรอ self.bus.publish('command.cordon', {'action': 'isolate_host', 'details': event}) def list_agents(self): return [{ 'id': k, 'name': v.name } for k,v in self.agents.items()] def get_alerts(self): return list(self.alerts) """ FILE: auth.py """ from datetime import datetime, timedelta from jose import jwt SECRET_KEY = 'please-change-this-in-prod' ALGORITHM = 'HS256' def create_token(subject: str, expires_minutes: int = 60): to_encode = {'sub': subject, 'exp': datetime.utcnow() + timedelta(minutes=expires_minutes)} return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def decode_token(token: str): from jose import JWTError try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload.get('sub') except JWTError: return None """ FILE: api/main.py """ from fastapi import FastAPI, HTTPException, Depends, Header import asyncio from messagebus import InMemoryBus, RedisBus from sai_core import SAICore from agent_base import AgentBase from agents.agent_cybershield import AgentCyberShield from agents.agent_zeroday นำเข้า AgentZeroDay จากการรับรองความถูกต้อง นำเข้า create_token, decode_token app = FastAPI(title='SAI - Guardian Agents API') เลือกบัส:หากตั้งค่า REDIS_URL env var ให้ใช้ RedisBus มิฉะนั้น in-memory BUS = InMemoryBus() CORE = SAICore(BUS) REGISTERED = {} async def require_auth(authorization: str = Header(None)): ถ้าไม่ใช่ authorization: เพิ่ม HTTPException(status_code=401, detail='Missing Authorization') token = authorization.replace('Bearer ', '') sub = decode_token(token) ถ้าไม่ใช่ sub: เพิ่ม HTTPException(status_code=401, detail='Invalid token') return sub @app.on_event('startup') async def startup_event(): # spawn ตัวอย่างตัวแทน cyber = AgentCyberShield('Agent-CyberShield', BUS, core_ref=CORE, config={'scan_interval': 10}) zeroday = AgentZeroDay('Agent-ZeroDay', BUS, core_ref=CORE) รอ cyber.start() รอ zeroday.start() รอ CORE.register_agent(cyber) รอ CORE.register_agent(zeroday) REGISTERED[cyber.id] = cyber REGISTERED[zeroday.id] = zeroday @app.get('/agents') อะซิงโครนัส def list_agents(user=Depends(require_auth)): return CORE.list_agents() @app.post('/agents/{agent_id}/command') อะซิงโครนัส def command_agent(agent_id: str, command: dict, user=Depends(require_auth)): agent = REGISTERED.get(agent_id) ถ้าไม่ใช่ agent: เพิ่ม HTTPException(404, 'ไม่พบ Agent') # เผยแพร่คำสั่งที่กำหนดเป้าหมายไปยัง Agent ตามหัวข้อ topic = f'command.{agent.name.lower()}' รอ BUS.publish(หัวข้อ, คำสั่ง) กลับ {'status': 'ตกลง'} @app.ge +### FILE: guardian_ai_system.py +# guardian_ai_system.py import json import threading import time from typing import List, Dict, Any # ------------------------------- # Law Compliance Module # ------------------------------- class LawComplianceModule: def __init__(self, law_db: Dict[str, Any]): self.law_db = law_db def is_compliant(self, action: dict, country_code: str) -> bool: law_rules = self.law_db.get(country_code, {}) action_type = action.get("type") return law_rules.get(action_type, True) # ------------------------------- # Plan Manager # ------------------------------- class PlanManager: def __init__(self): self.backup_plans = {} def add_backup_plan(self, main_plan: str, backup_plan: str): self.backup_plans[main_plan] = backup_plan def get_backup_plan(self, main_plan: str): return self.backup_plans.get(main_plan, None) # ------------------------------- # Team Member # ------------------------------- class TeamMember: def __init__(self, name: str): self.name = name def suggest_solution(self, event: dict) -> dict: return { "solution": f"{self.name}'s solution for {event.get('description', '')}", "impact_score": event.get("risk_level", 1.0) } # ------------------------------- # Guardian AI Core # ------------------------------- class GuardianAI: def __init__(self, law_db: Dict[str, Any], team_members: List[TeamMember]): self.law_module = LawComplianceModule(law_db) self.plan_manager = PlanManager() self.team_members = team_members self.feedback_log: List[Dict[str, Any]] = [] # ----- Analysis ----- def analyze_intent(self, action: dict) -> Dict[str, Any]: intent = action.get("intent", "unknown") impact = action.get("impact", {}) return {"intent": intent, "impact": impact} def assess_impact(self, action: dict) -> float: return float(action.get("impact", {}).get("risk_level", 0.5)) def check_compliance(self, action: dict, country_code: str) -> bool: return self.law_module.is_compliant(action, country_code) def contingency(self, main_plan: str) -> str: return self.plan_manager.get_backup_plan(main_plan) def collaborative_solution(self, event: dict) -> dict: solutions = [member.suggest_solution(event) for member in self.team_members] return min(solutions, key=lambda s: s.get("impact_score", 100)) # ----- Feedback & Learning ----- def empathetic_feedback(self, action: dict, outcome: dict): self.feedback_log.append({"action": action, "outcome": outcome}) self.learn_from_feedback() def learn_from_feedback(self): if not self.feedback_log: return avg_risk = sum(float(f['outcome'].get('risk_level', 0.0)) for f in self.feedback_log) / len(self.feedback_log) print(f"[Learning] Average risk updated: {avg_risk:.2f}") # ----- Intervention ----- def intervene(self, risk_level: float, threshold: float = 0.7) -> bool: if risk_level > threshold: self.empathetic_feedback({"type": "intervene"}, {"risk_level": risk_level}) return True return False # ----- Main Workflow ----- def workflow(self, action: dict, country_code: str = "TH") -> dict: print("\n----- Guardian AI Workflow -----") print(f"Action: {action}") analysis = self.analyze_intent(action) print(f"Intent: {analysis['intent']}, Impact: {analysis['impact']}") compliant = self.check_compliance(action, country_code) print(f"Law Compliance: {compliant}") risk_level = self.assess_impact(action) print(f"Risk Level: {risk_level}") backup_plan = self.contingency(action.get("plan", "default")) print(f"Backup Plan: {backup_plan}") event = {"description": action.get("description", ""), "risk_level": risk_level} solution = self.collaborative_solution(event) print(f"Collaborative Solution: {solution['solution']} (Impact Score: {solution['impact_score']})") if self.intervene(risk_level): print("Guardian AI Intervened due to high risk.") else: print("No intervention needed.") self.empathetic_feedback(action, { "compliance": compliant, "risk_level": risk_level, "solution": solution }) print("----- End Workflow -----\n") return { "action": action, "analysis": analysis, "compliance": compliant, "risk_level": risk_level, "backup_plan": backup_plan, "solution": solution } # ----- Dashboard Updater ----- def start_dashboard_updater(self, filepath: str = "feedback_log.json", interval_sec: int = 5): def updater(): while True: try: with open(filepath, "w") as f: json.dump(self.feedback_log, f, indent=2) except Exception as e: print("[dashboard_updater] write error:", e) time.sleep(interval_sec) threading.Thread(target=updater, daemon=True).start() # ------------------------------- # Main Execution (demo) # ------------------------------- if __name__ == "__main__": law_db = { "TH": {"send_data": True, "delete_data": False, "update_profile": True}, "US": {"send_data": False, "delete_data": True, "update_profile": True} } team = [TeamMember("Alice"), TeamMember("Bob")] guardian = GuardianAI(law_db, team) guardian.plan_manager.add_backup_plan("send_data", "notify_admin") guardian.plan_manager.add_backup_plan("delete_data", "archive_data") guardian.start_dashboard_updater("data/feedback_log.json", interval_sec=3) actions = [ {"type": "send_data", "intent": "help_user", "impact": {"risk_level": 0.8}, "plan": "send_data", "description": "Request to send sensitive data"}, {"type": "update_profile", "intent": "improve_account", "impact": {"risk_level": 0.2}, "plan": "update_profile", "description": "User updates social profile"}, {"type": "delete_data", "intent": "remove_personal_info", "impact": {"risk_level": 0.6}, "plan": "delete_data", "description": "Request to delete user data"} ] for action in actions: guardian.workflow(action, country_code="TH") +### FILE: requirements.txt +fastapi==0.95.2 uvicorn[standard]==0.22.0 httpx==0.24.1 pydantic==1.10.11 python-jose[cryptography]==3.3.0 aioredis==2.0.1 +### FILE: docker-compose.yml +version: '3.8' services: redis: image: redis:7-alpine ports: - '6379:6379' app: build: . command: uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload volumes: - .:/app depends_on: - redis ports: - '8000:8000' +### FILE: Dockerfile +FROM python:3.10-slim WORKDIR /app COPY . /app RUN pip install --no-cache-dir -r requirements.txt CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"] +### FILE: messagebus.py +# messagebus.py import asyncio from typing import Callable, Dict, List, Any, Optional import json import os # Simple async in-memory pub/sub bus class InMemoryBus: def __init__(self): self.subscribers: Dict[str, List[Callable[[Any], Any]]] = {} self._lock = asyncio.Lock() async def subscribe(self, topic: str, callback: Callable[[Any], Any]): async with self._lock: self.subscribers.setdefault(topic, []).append(callback) async def publish(self, topic: str, message: Any): # deliver to subscribers without blocking if topic in self.subscribers: for cb in list(self.subscribers[topic]): try: asyncio.create_task(cb(message)) except Exception: # swallow per-subscriber errors pass # Optional Redis bus (requires aioredis) try: import aioredis except Exception: aioredis = None class RedisBus: def __init__(self, url: str = 'redis://localhost:6379'): if aioredis is None: raise RuntimeError("aioredis not installed") self.url = url self._pub = None self._sub = None self._listeners: Dict[str, List[Callable[[Any], Any]]] = {} self._tasks = [] async def connect(self): self._pub = aioredis.from_url(self.url) self._sub = aioredis.from_url(self.url) # nothing else yet async def subscribe(self, topic: str, callback: Callable[[Any], Any]): self._listeners.setdefault(topic, []).append(callback) # start a listener task per topic task = asyncio.create_task(self._listener(topic)) self._tasks.append(task) async def _listener(self, topic: str): pubsub = self._sub.pubsub() await pubsub.subscribe(topic) async for msg in pubsub.listen(): if msg is None: continue if msg.get('type') == 'message': raw = msg.get('data') # aioredis returns bytes - try decode data = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw for cb in list(self._listeners.get(topic, [])): asyncio.create_task(cb(data)) async def publish(self, topic: str, message: Any): if self._pub is None: await self.connect() await self._pub.publish(topic, json.dumps(message)) +### FILE: agent_base.py +# agent_base.py import asyncio import uuid from typing import Any, Dict class AgentBase: def __init__(self, name: str, bus, core_ref=None, config: Dict = None): self.id = str(uuid.uuid4()) self.name = name self.bus = bus self.core = core_ref self.config = config or {} self.running = False self._tasks = [] async def start(self): self.running = True await self.subscribe() task = asyncio.create_task(self.run()) self._tasks.append(task) async def stop(self): self.running = False for t in self._tasks: t.cancel() self._tasks = [] async def subscribe(self): """Override: subscribe to topics on bus""" pass async def run(self): """Override: main loop""" while self.running: await asyncio.sleep(1) async def handle_message(self, message: Any): """Override: called when bus delivers a message""" pass async def publish_event(self, topic: str, event: Any): await self.bus.publish(topic, { 'agent': self.name, 'agent_id': self.id, 'event': event }) +### FILE: agents/agent_cybershield.py +# agents/agent_cybershield.py import asyncio from agent_base import AgentBase class AgentCyberShield(AgentBase): async def subscribe(self): await self.bus.subscribe('network.alert', self.handle_message) await self.bus.subscribe('command.cybershield', self.handle_message) async def run(self): while self.running: threat = await self._perform_scan() if threat: await self.publish_event('sai.alert', {'type': 'cyber', 'threat': threat}) await asyncio.sleep(self.config.get('scan_interval', 10)) async def _perform_scan(self): # placeholder: integrate IDS / scanning # return dict if threat found else None # For demo, randomly no threat return None async def handle_message(self, message): print(f"[AgentCyberShield] message: {message}") if isinstance(message, dict) and message.get('command') == 'force_scan': threat = await self._perform_scan() await self.publish_event('sai.alert', {'type': 'cyber', 'threat': threat or 'no-threat'}) +### FILE: agents/agent_zeroday.py +# agents/agent_zeroday.py import asyncio from agent_base import AgentBase class AgentZeroDay(AgentBase): async def subscribe(self): await self.bus.subscribe('feed.external', self.handle_message) async def run(self): while self.running: await asyncio.sleep(1) async def handle_message(self, message): # simple heuristic: if payload contains 'exploit' -> alert data = message if isinstance(data, str) and 'exploit' in data.lower(): await self.publis + + . +