Files
LivingAgents/character_explorer.py
2025-09-05 19:00:24 +02:00

510 lines
19 KiB
Python

import asyncio
import os
import pickle
from pprint import pprint
from datetime import datetime
from typing import Optional, List
import yaml
from pathlib import Path
from dotenv import load_dotenv
from living_agents import CharacterAgent, PromptManager
import logging
from llm_connector import LLMMessage
logger = logging.getLogger(__name__)
def save_character_to_cache(agent: CharacterAgent, template_path: Path):
    """Persist a character agent to the on-disk pickle cache.

    The cache file is ``.cache/<template stem>.pkl`` relative to the current
    working directory. It stores the agent's character, its memories, the
    template path, the time of caching and a format version string.

    Saving is best-effort: any failure is reported as a warning on stdout
    and is never raised to the caller.

    Args:
        agent: Agent whose ``character`` and ``memory_stream.memories`` are
            written to the cache.
        template_path: Template the agent was built from; its stem names the
            cache file.
    """
    cache_path = Path('.cache') / f"{template_path.stem}.pkl"

    try:
        # The cache directory does not exist on a fresh checkout; without
        # this the open() below fails and nothing is ever cached.
        cache_path.parent.mkdir(parents=True, exist_ok=True)

        # Create cache data with metadata
        cache_data = {
            'character': agent.character,
            'memories': agent.memory_stream.memories,
            'template_path': template_path,
            'cached_at': datetime.now(),
            'cache_version': '1.0',
        }

        with open(cache_path, 'wb') as f:
            pickle.dump(cache_data, f)

        print(f"💾 Character saved to cache: {cache_path.name}")
    except Exception as e:
        print(f"⚠️ Warning: Could not save character to cache: {e}")
def load_character_from_cache(template_path: Path) -> Optional[CharacterAgent]:
    """Load a character agent from the pickle cache.

    Looks for ``.cache/<template stem>.pkl`` relative to the current working
    directory and rebuilds a ``CharacterAgent`` from it.

    Args:
        template_path: Template whose stem names the cache file and whose
            modification time is compared with the cache timestamp.

    Returns:
        The reconstructed agent, or ``None`` when the cache file is missing,
        has an unexpected structure, is older than the template file, or
        cannot be read for any other reason (a message is printed for every
        case except a missing file).
    """
    cache_path = Path('.cache') / f"{template_path.stem}.pkl"

    if not cache_path.exists():
        return None

    try:
        # SECURITY: unpickling executes arbitrary code embedded in the file.
        # Only load cache files written by this tool on a trusted machine.
        with open(cache_path, 'rb') as f:
            cache_data = pickle.load(f)

        # Verify cache data structure; anything that is not a mapping with
        # the expected keys is rejected up front.
        required_keys = ['character', 'memories', 'template_path', 'cached_at']
        if not isinstance(cache_data, dict) or not all(key in cache_data for key in required_keys):
            print(f"⚠️ Invalid cache file format: {cache_path.name}")
            return None

        # Check if template file is newer than cache
        template_mtime = Path(template_path).stat().st_mtime
        cache_time = cache_data['cached_at'].timestamp()
        if template_mtime > cache_time:
            print("📝 Template file is newer than cache, will recreate character")
            return None

        # Reconstruct CharacterAgent (imported here, as in the original code,
        # so the dependency is only needed when a usable cache exists).
        from living_agents import LLMAgent
        agent = CharacterAgent(cache_data['character'], LLMAgent())
        agent.memory_stream.memories = cache_data['memories']

        cached_date = cache_data['cached_at'].strftime('%Y-%m-%d %H:%M:%S')
        print(f"💾 Loaded character from cache (created: {cached_date})")
        return agent
    except Exception as e:
        # Best-effort: a broken cache must never prevent creating the
        # character from its template.
        print(f"⚠️ Could not load character from cache: {e}")
        return None
def get_cache_info(template_path: Path) -> Optional[dict]:
    """Return summary information about a template's cached character.

    Args:
        template_path: Template whose stem names the cache file
            ``.cache/<stem>.pkl`` (relative to the current working directory).

    Returns:
        A dict with the keys ``cache_file``, ``cached_at``,
        ``character_name`` and ``memory_count``, or ``None`` when there is no
        cache file or it cannot be read or interpreted.
    """
    cache_path = Path('.cache') / f"{template_path.stem}.pkl"

    if not cache_path.exists():
        return None

    try:
        # SECURITY: unpickling executes arbitrary code embedded in the file.
        # Only inspect cache files written by this tool on a trusted machine.
        with open(cache_path, 'rb') as f:
            cache_data = pickle.load(f)

        if not isinstance(cache_data, dict):
            return None

        return {
            'cache_file': cache_path.name,
            'cached_at': cache_data['cached_at'],
            'character_name': cache_data['character'].name,
            'memory_count': len(cache_data['memories']),
        }
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        return None
class CharacterExplorer:
    """Interactive console explorer for exercising a ``CharacterAgent``.

    Shows a numbered menu on stdout, reads choices with ``input()`` and
    dispatches to the ``_handle_*`` coroutines. Memory numbers shown anywhere
    in the UI are 0-based positions in ``agent.memory_stream.memories``.
    """

    # Seeded by ``_handle_chat`` with the scenario as a system message.
    chat_history: List[LLMMessage]

    def __init__(self, character_agent: CharacterAgent):
        """Bind the explorer to ``character_agent`` with an empty chat history."""
        self.agent = character_agent
        self.chat_history = []

    async def start_interactive_session(self):
        """Run the menu loop until the user exits or regenerates the character.

        Ctrl-C and end-of-input (closed stdin) both end the session; any other
        exception raised by a handler is printed and the menu is shown again.
        """
        print(f"\n🎭 Character Explorer - {self.agent.character.name}")
        print("=" * 50)

        # Options that run a handler and then return to the menu. A single
        # dispatch guarantees exactly one branch runs per choice.
        simple_handlers = {
            "1": self._handle_ask_question,
            "2": self._handle_chat,
            "3": self._handle_list_memories,
            "4": self._handle_view_memory,
            "5": self._handle_memory_stats,
            "6": self._handle_character_summary,
        }

        while True:
            self._show_menu()

            try:
                choice = input("\nChoose option (1-8): ").strip()

                if choice in simple_handlers:
                    await simple_handlers[choice]()
                elif choice == "7":
                    # Regenerate character
                    if await self._handle_regenerate_character():
                        break  # Exit to main menu after regeneration
                elif choice == "8":
                    print("👋 Goodbye!")
                    break
                else:
                    print("❌ Invalid choice. Please enter 1-8.")

            except (KeyboardInterrupt, EOFError):
                # EOFError must end the loop: once stdin is closed every
                # further input() raises again, which would spin forever.
                print("\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"❌ Error: {e}")

    def clear_character_cache(self) -> bool:
        """Delete the cache file belonging to the current character's template.

        Returns:
            ``True`` if a cache file existed and was removed; ``False`` if the
            character has no template file, no cache file exists, or removal
            failed (the failure is printed).
        """
        template_file = self.agent.character.template_file
        if template_file is None:
            return False

        cache_file = Path('.cache') / f"{template_file.stem}.pkl"
        if not cache_file.exists():
            return False

        try:
            cache_file.unlink()
            return True
        except Exception as e:
            print(f"❌ Could not clear cache: {e}")
            return False

    def _show_menu(self):
        """Display the interactive menu options."""
        print("\n🎭 Character Explorer Menu")
        print("=" * 30)
        print(f"1. 💬 Ask {self.agent.character.name} a question")
        print(f"2. 💬 Chat with {self.agent.character.name}")
        print("3. 📚 List all memories")
        print("4. 🔍 View specific memory (with related)")
        print("5. 📊 Memory statistics")
        print("6. 👤 Character summary")
        print("7. 🔄 Regenerate character")
        print("8. 🚪 Exit")

    async def _respond_and_show_memories(self, prompt_text: str):
        """Print the agent's reaction to ``prompt_text`` and related memories.

        The memories are fetched with a separate retrieval call after the
        response has been produced, so they are the top matches for the same
        text rather than a record of what the agent actually consulted.
        """
        name = self.agent.character.name
        print(f"\n🤔 {name} is thinking...")

        try:
            response = await self.agent.react_to_situation(prompt_text)
            print(f"💬 {name}: {response}")

            relevant_memories = await self.agent.memory_stream.retrieve_related_memories(prompt_text, k=5)
            print("\n🧠 Memories used for this response:")
            for i, mem in enumerate(relevant_memories, 1):
                suffix = '...' if len(mem.description) > 80 else ''
                print(f" {i}. [{mem.memory_type}] {mem.description[:80]}{suffix}")
        except Exception as e:
            print(f"❌ Error getting response: {e}")

    async def _handle_ask_question(self):
        """Ask the character a single question; empty input returns to the menu."""
        question = input(f"\n💬 Ask {self.agent.character.name}: ").strip()
        if not question:
            return
        await self._respond_and_show_memories(question)

    async def _handle_chat(self):
        """Chat with the character, asking for a scenario on first use.

        NOTE(review): only the scenario is stored in ``chat_history``; the
        history is not passed to the agent and later turns are not recorded.
        Confirm whether that is intended.
        """
        if not self.chat_history:
            scenario_content = input(
                f"\n💬 Set a scenario for a new chat with {self.agent.character.name}: ").strip()
            if not scenario_content:
                return
            self.chat_history.append({'role': 'system', 'content': scenario_content})

        message = input(f"\n💬 Ask {self.agent.character.name}: ").strip()
        if not message:
            return
        await self._respond_and_show_memories(message)

    async def _handle_list_memories(self):
        """List memories, optionally filtered by type or sorted.

        Each line is prefixed with the memory's position in the full memory
        stream (not its position in the filtered/sorted view), so the number
        can be fed directly to the "view specific memory" option.
        """
        print("\n📚 Memory Filter Options:")
        print("1. All memories")
        print("2. Observations only")
        print("3. Reflections only")
        print("4. Plans only")
        print("5. By importance (high to low)")
        print("6. By recency (newest first)")

        filter_choice = input("Choose filter (1-6): ").strip()
        stream = self.agent.memory_stream.memories

        type_filters = {
            "2": ("observation", "Observations"),
            "3": ("reflection", "Reflections"),
            "4": ("plan", "Plans"),
        }

        if filter_choice in type_filters:
            wanted_type, title = type_filters[filter_choice]
            memories = [m for m in stream if m.memory_type == wanted_type]
        elif filter_choice == "5":
            memories = sorted(stream, key=lambda m: m.importance_score, reverse=True)
            title = "All Memories (by importance)"
        elif filter_choice == "6":
            memories = sorted(stream, key=lambda m: m.creation_time, reverse=True)
            title = "All Memories (by recency)"
        else:
            # "1" and any unrecognised input both show everything.
            memories = stream
            title = "All Memories"

        # Every listed memory is one of the stream's own objects, so identity
        # is enough to recover its stream position.
        stream_position = {id(m): position for position, m in enumerate(stream)}

        print(f"\n📋 {title} ({len(memories)} total):")
        for memory in memories:
            i = stream_position[id(memory)]
            # "age" is the span between creation and last access.
            age_hours = (memory.last_accessed - memory.creation_time).total_seconds() / 3600
            print(
                f"[#{i:3d}] [{memory.memory_type[:4]}] [imp:{memory.importance_score}] "
                f"[age:{age_hours:.1f}h] {memory.description}")

        if len(memories) > 20:
            # The full list is printed, so only point at the detail view
            # (menu option 4).
            print("\n💡 Tip: Use option 4 to view specific memories in detail")

    async def _handle_view_memory(self):
        """Show one memory, chosen by its 0-based stream index, in detail."""
        stream = self.agent.memory_stream.memories
        if not stream:
            print("❌ No memories to view.")
            return

        last_index = len(stream) - 1
        raw_number = input(f"\nEnter memory number (0-{last_index}): ").strip()

        try:
            memory_num = int(raw_number)
        except ValueError:
            print(f"❌ Invalid memory number. Range: 0-{last_index}")
            return

        if not 0 <= memory_num <= last_index:
            print(f"❌ Invalid memory number. Range: 0-{last_index}")
            return

        memory = stream[memory_num]

        print(f"\n🔍 Memory #{memory_num} Details:")
        print(f" Type: {memory.memory_type}")
        print(f" Importance: {memory.importance_score}/10")
        print(f" Created: {memory.creation_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Last accessed: {memory.last_accessed.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Description: {memory.description}")

        # Related memories are only listed for observations and plans.
        if memory.memory_type in ('observation', 'plan'):
            print("\n🔗 Related memories:")
            for rel_mem in memory.related_memories:
                try:
                    rel_index = stream.index(rel_mem)
                except ValueError:
                    # The related memory is no longer part of the stream;
                    # show it without an index instead of aborting.
                    print(f" [#  ?] [{rel_mem.memory_type}] {rel_mem.description}")
                    continue
                print(f" [#{rel_index:3d}] [{rel_mem.memory_type}] {rel_mem.description}")

    async def _handle_memory_stats(self):
        """Print counts by type, importance distribution and time span."""
        memories = self.agent.memory_stream.memories

        print(f"\n📊 Memory Statistics for {self.agent.character.name}:")
        print(f" Total memories: {len(memories)}")

        # Breakdown by type
        by_type = {}
        for mem in memories:
            by_type[mem.memory_type] = by_type.get(mem.memory_type, 0) + 1

        print("\n📂 Memory Types:")
        for mem_type, count in by_type.items():
            percentage = (count / len(memories)) * 100
            print(f" {mem_type.title()}: {count} ({percentage:.1f}%)")

        # Importance distribution
        importance_scores = [m.importance_score for m in memories]
        if importance_scores:
            print("\n📈 Importance Distribution:")
            print(f" Average: {sum(importance_scores) / len(importance_scores):.1f}")
            print(f" Range: {min(importance_scores)}-{max(importance_scores)}")

            # Visual histogram: one block per two memories, at least one
            # block for any non-empty bucket.
            print(" Score distribution:")
            for score in range(1, 11):
                count = importance_scores.count(score)
                bar = "█" * max(1, count // 2) if count > 0 else ""
                print(f" {score:2d}: {count:2d} {bar}")

        # Recency info
        if memories:
            oldest = min(memories, key=lambda m: m.creation_time)
            newest = max(memories, key=lambda m: m.creation_time)
            print("\n⏰ Time Span:")
            print(f" Oldest: {oldest.creation_time.strftime('%Y-%m-%d %H:%M')}")
            print(f" Newest: {newest.creation_time.strftime('%Y-%m-%d %H:%M')}")

    async def _handle_character_summary(self):
        """Print the character's structured data.

        The AI-generated summary call is currently disabled, so only its
        heading is printed.
        """
        character = self.agent.character
        print(f"\n👤 Generating summary for {character.name}...")

        try:
            # TODO: re-enable once the agent exposes a summary call:
            # summary = await self.agent.get_summary()
            print("\n📝 AI-Generated Character Summary:")
            # print(f" {summary}")

            print("\n📋 Structured Character Data:")
            print(f" Name: {character.name}")
            print(f" Age: {character.age}")
            print(f" Personality: {character.personality}")
            print(f" Occupation: {character.occupation}")
            print(f" Location: {character.location}")
            print("")
            print(f" - File: {character.template_file}")
            print(" Traits")
            if character.traits:
                traits_summary = "\n".join(
                    f" - {trait.strength}/10 {trait.name} ({trait.description})"
                    for trait in character.traits)
            else:
                traits_summary = "No traits yet."
            print(traits_summary)

            if character.relationships:
                print(" Relationships:")
                for person, relationship in character.relationships.items():
                    print(f"{person}: {relationship}")

            if character.goals:
                print(" Goals:")
                for goal in character.goals:
                    print(f"{goal}")

        except Exception as e:
            print(f"❌ Error generating summary: {e}")

    async def _handle_regenerate_character(self) -> bool:
        """Clear the cache and rebuild the current character after confirmation.

        Returns:
            ``True`` once a new exploration session for the regenerated
            character has finished (the caller should then leave its own
            loop); ``False`` if the user declined, the character has no
            template file, or an existing cache file could not be removed.
        """
        character_name = self.agent.character.name

        print(f"\n🔄 Regenerate {character_name}")
        print("This will clear the cache and recreate the character from scratch.")
        print("All current memories and traits will be lost!")

        # Confirm regeneration
        confirm = input(f"Regenerate {character_name}? (y/N): ").strip().lower()
        if confirm not in ['y', 'yes']:
            print("❌ Regeneration cancelled")
            return False

        template_file = self.agent.character.template_file
        if template_file is None:
            print(f"❌ Could not find template file for {character_name}")
            return False

        # Clear the cache. A missing cache file is not a failure: there is
        # simply nothing to clear, and regeneration can proceed.
        cache_file = Path('.cache') / f"{template_file.stem}.pkl"
        if cache_file.exists():
            if not self.clear_character_cache():
                print("❌ Failed to clear cache")
                return False
            print(f"🗑️ Cleared cache for {character_name}")

        await load_and_explore_character(template_file)
        return True
def select_character_template(show_cache_info: bool = True) -> Optional[Path]:
    """Show the character template menu and return the chosen template path.

    Templates are the ``*.yml`` files in the ``character_templates`` directory
    next to this module. Each entry shows the template's ``name`` field when
    it can be read, otherwise the file stem.

    Args:
        show_cache_info: When true, append whether a cached character exists
            for each template.

    Returns:
        The selected template path, or ``None`` if no templates were found,
        the user picked the exit entry, or input was interrupted (Ctrl-C or
        end-of-input).
    """
    templates_dir = Path(__file__).parent / 'character_templates'

    # Find all .yml files in the character_templates directory. Sorted so
    # the menu numbering is stable between runs; glob order is arbitrary.
    template_files = sorted(templates_dir.glob('*.yml'))

    if not template_files:
        print("❌ No character templates found in character_templates/")
        return None

    print("\n🎭 Available Character Templates:")
    print("=" * 60)

    # Display template options with cache info
    for i, template_file in enumerate(template_files, 1):
        # Try to load the template to get the name
        try:
            with open(template_file, 'r', encoding='utf-8') as f:
                template_data = yaml.safe_load(f)
            character_name = template_data.get('name', template_file.stem)
        except Exception:
            # Unreadable, malformed or non-mapping template: fall back to the
            # file name rather than hiding the entry.
            character_name = template_file.stem

        # Check for cached version if requested
        if show_cache_info:
            cache_info = get_cache_info(template_file)
            if cache_info:
                cache_status = f"💾 (cached {cache_info['cached_at'].strftime('%m/%d %H:%M')})"
            else:
                cache_status = "🔄 (will create new)"
            print(f"{i}. {character_name} ({template_file.name}) {cache_status}")
        else:
            print(f"{i}. {character_name} ({template_file.name})")

    exit_choice = len(template_files) + 1
    print(f"{exit_choice}. 🚪 Exit")

    # Get user choice
    while True:
        try:
            choice = input(f"\nChoose a character template (1-{exit_choice}): ").strip()
            choice_num = int(choice)
        except ValueError:
            print(f"❌ Please enter a valid number (1-{exit_choice}).")
            continue
        except (KeyboardInterrupt, EOFError):
            # Closed stdin is treated like Ctrl-C instead of crashing.
            return None

        if choice_num == exit_choice:
            return None  # Exit choice
        if 1 <= choice_num <= len(template_files):
            return template_files[choice_num - 1]
        print(f"❌ Invalid choice. Please enter 1-{exit_choice}.")
async def load_and_explore_character(template_path: Path):
    """Obtain an agent for ``template_path`` and run an explorer session on it.

    The cache is tried first. On a miss the YAML template is loaded, the
    agent is created from it and then written to the cache. Returns the agent
    once the interactive session ends, or ``None`` if the template could not
    be loaded or the character could not be created.
    """
    # Load environment
    load_dotenv()

    print(f"📁 Loading character from {template_path.name}...")
    agent = load_character_from_cache(template_path)

    if agent is not None:
        print(f"{agent.character.name} loaded successfully!")
    else:
        # Cache miss (or unusable cache): build the character from its template.
        try:
            with open(template_path, 'r', encoding='utf-8') as template_stream:
                template = yaml.safe_load(template_stream)
            template['yaml_file'] = template_path
        except Exception as load_error:
            print(f"❌ Error loading template: {load_error}")
            return None

        print("🤖 Creating memories and scoring importance...")
        try:
            agent = await CharacterAgent.create_from_template(template)
            print(f"{agent.character.name} created successfully!")
            # Keep the freshly built character for the next run.
            save_character_to_cache(agent, template_path)
        except Exception as creation_error:
            print(f"❌ Error creating character: {creation_error}")
            import traceback
            traceback.print_exc()
            return None

    # Hand the agent to the interactive explorer.
    session = CharacterExplorer(agent)
    await session.start_interactive_session()
    return agent
async def main():
    """Greet the user, let them pick a template, then explore that character."""
    print("🎭 Welcome to the Living Agents Character Explorer!")

    chosen_template = select_character_template(show_cache_info=True)
    if chosen_template is not None:
        # Load and explore the selected character
        await load_and_explore_character(chosen_template)
        return

    # No template chosen (exit entry, interruption or no templates found).
    print("👋 Goodbye!")
# Script entry point: run the async main() only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())