"""Interactive command-line explorer for ``living_agents`` characters.

The script lets the user pick a YAML character template, builds (or loads
from a pickle cache) a ``CharacterAgent`` for it, and then offers a text menu
for questioning the character and inspecting its memory stream.
"""
import asyncio
import logging
import os
import pickle
import traceback
from collections import Counter
from datetime import datetime
from pathlib import Path
from pprint import pprint
from typing import List, Optional

import yaml
from dotenv import load_dotenv

from living_agents import CharacterAgent, PromptManager
from llm_connector import LLMMessage

logger = logging.getLogger(__name__)

# Directory (relative to the current working directory) holding pickled agents.
CACHE_DIR = Path('.cache')
CACHE_VERSION = '1.0'

# Keys a cache payload must contain before it is trusted.
_REQUIRED_CACHE_KEYS = ('character', 'memories', 'template_path', 'cached_at')

# Maximum number of description characters shown in one-line memory previews.
_PREVIEW_LENGTH = 80


def _cache_path_for(template_path: Path) -> Path:
    """Return the cache file path that belongs to *template_path*."""
    return CACHE_DIR / f"{Path(template_path).stem}.pkl"


def _preview(text: str) -> str:
    """Return *text* cut to ``_PREVIEW_LENGTH`` characters, with '...' if cut."""
    suffix = '...' if len(text) > _PREVIEW_LENGTH else ''
    return f"{text[:_PREVIEW_LENGTH]}{suffix}"


def save_character_to_cache(agent: CharacterAgent, template_path: Path):
    """Pickle the agent's character and memories next to a timestamp.

    Best effort: any failure is reported on stdout and otherwise ignored.

    Args:
        agent: The agent whose ``character`` and ``memory_stream.memories``
            are stored.
        template_path: Template the agent was built from; its stem names the
            cache file.
    """
    cache_path = _cache_path_for(template_path)

    try:
        # The cache directory does not exist on a fresh checkout; without
        # this the open() below fails and nothing is ever cached.
        cache_path.parent.mkdir(parents=True, exist_ok=True)

        cache_data = {
            'character': agent.character,
            'memories': agent.memory_stream.memories,
            'template_path': template_path,
            'cached_at': datetime.now(),
            'cache_version': CACHE_VERSION,
        }

        with open(cache_path, 'wb') as f:
            pickle.dump(cache_data, f)

        print(f"💾 Character saved to cache: {cache_path.name}")

    except Exception as e:
        print(f"⚠️ Warning: Could not save character to cache: {e}")


def load_character_from_cache(template_path: Path) -> Optional[CharacterAgent]:
    """Rebuild an agent from its cache file.

    Args:
        template_path: Template whose cache entry should be loaded.

    Returns:
        The reconstructed agent, or ``None`` when there is no cache file, the
        payload lacks a required key, the template file was modified after
        the cache was written, or loading raised an exception.
    """
    cache_path = _cache_path_for(template_path)

    if not cache_path.exists():
        return None

    try:
        # SECURITY: pickle.load executes arbitrary code embedded in the file.
        # Only load cache files this program wrote itself.
        with open(cache_path, 'rb') as f:
            cache_data = pickle.load(f)

        is_valid = isinstance(cache_data, dict) and all(
            key in cache_data for key in _REQUIRED_CACHE_KEYS
        )
        if not is_valid:
            print(f"⚠️ Invalid cache file format: {cache_path.name}")
            return None

        # A template edited after caching invalidates the cached character.
        template_mtime = Path(template_path).stat().st_mtime
        cache_time = cache_data['cached_at'].timestamp()
        if template_mtime > cache_time:
            print("📝 Template file is newer than cache, will recreate character")
            return None

        # Imported here, as in the original code, rather than at module level.
        from living_agents import LLMAgent

        agent = CharacterAgent(cache_data['character'], LLMAgent())
        agent.memory_stream.memories = cache_data['memories']

        cached_date = cache_data['cached_at'].strftime('%Y-%m-%d %H:%M:%S')
        print(f"💾 Loaded character from cache (created: {cached_date})")
        return agent

    except Exception as e:
        print(f"⚠️ Could not load character from cache: {e}")
        return None


def get_cache_info(template_path: Path) -> Optional[dict]:
    """Summarise the cache entry for *template_path*.

    Returns:
        A dict with ``cache_file``, ``cached_at``, ``character_name`` and
        ``memory_count``, or ``None`` if the cache file is missing or cannot
        be read.
    """
    cache_path = _cache_path_for(template_path)

    if not cache_path.exists():
        return None

    try:
        # SECURITY: see load_character_from_cache - pickle is not safe for
        # untrusted files.
        with open(cache_path, 'rb') as f:
            cache_data = pickle.load(f)

        return {
            'cache_file': cache_path.name,
            'cached_at': cache_data['cached_at'],
            'character_name': cache_data['character'].name,
            'memory_count': len(cache_data['memories']),
        }
    except Exception:
        # An unreadable cache is treated the same as "no cache".
        return None


class CharacterExplorer:
    """Interactive explorer for testing CharacterAgent functionality"""

    # Messages of the current chat; seeded with a system "scenario" message.
    chat_history: List[LLMMessage]

    def __init__(self, character_agent: CharacterAgent):
        self.agent = character_agent
        self.chat_history = []

    async def start_interactive_session(self):
        """Run the menu loop until the user exits, regenerates, or hits Ctrl-C/EOF."""
        print(f"\n🎭 Character Explorer - {self.agent.character.name}")
        print("=" * 50)

        handlers = {
            "1": self._handle_ask_question,
            "2": self._handle_chat,
            "3": self._handle_list_memories,
            "4": self._handle_view_memory,
            "5": self._handle_memory_stats,
            "6": self._handle_character_summary,
        }

        while True:
            self._show_menu()

            try:
                choice = input("\nChoose option (1-8): ").strip()

                if choice in handlers:
                    await handlers[choice]()
                elif choice == "7":
                    # A successful regeneration runs its own explorer session,
                    # so this one ends afterwards.
                    if await self._handle_regenerate_character():
                        break
                elif choice == "8":
                    print("👋 Goodbye!")
                    break
                else:
                    print("❌ Invalid choice. Please enter 1-8.")

            except (KeyboardInterrupt, EOFError):
                # EOF (closed stdin) must end the session; otherwise input()
                # would fail again on every iteration.
                print("\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"❌ Error: {e}")

    def clear_character_cache(self) -> bool:
        """Delete the cache file of the current character.

        Returns:
            ``True`` if a cache file existed and was removed; ``False`` if
            there was none, the template file is unknown, or removal failed.
        """
        template_file = self.agent.character.template_file
        if template_file is None:
            return False

        cache_file = _cache_path_for(template_file)
        if not cache_file.exists():
            return False

        try:
            cache_file.unlink()
            return True
        except Exception as e:
            print(f"❌ Could not clear cache: {e}")
            return False

    def _show_menu(self):
        """Display the interactive menu options"""
        name = self.agent.character.name
        print("\n🎭 Character Explorer Menu")
        print("=" * 30)
        print(f"1. 💬 Ask {name} a question")
        print(f"2. 💬 Chat with {name}")
        print("3. 📚 List all memories")
        print("4. 🔍 View specific memory (with related)")
        print("5. 📊 Memory statistics")
        print("6. 👤 Character summary")
        print("7. 🔄 Regenerate character")
        print("8. 🚪 Exit")

    async def _respond_and_show_memories(self, prompt: str):
        """Print the agent's reaction to *prompt* and the memories related to it."""
        name = self.agent.character.name
        print(f"\n🤔 {name} is thinking...")

        try:
            response = await self.agent.react_to_situation(prompt)
            print(f"💬 {name}: {response}")

            # This is a second retrieval with the same text, done for display.
            # NOTE(review): presumably matches what the agent used - confirm.
            relevant_memories = await self.agent.memory_stream.retrieve_related_memories(prompt, k=5)
            print("\n🧠 Memories used for this response:")
            for i, mem in enumerate(relevant_memories, 1):
                print(f" {i}. [{mem.memory_type}] {_preview(mem.description)}")

        except Exception as e:
            print(f"❌ Error getting response: {e}")

    async def _handle_ask_question(self):
        """Handle asking questions to the character"""
        question = input(f"\n💬 Ask {self.agent.character.name}: ").strip()
        if not question:
            return

        await self._respond_and_show_memories(question)

    async def _handle_chat(self):
        """Handle chat with a character.

        On the first call the user is asked for a scenario, which is stored
        as a system message in ``chat_history``.
        """
        name = self.agent.character.name

        if not self.chat_history:
            scenario_content = input(f"\n💬 Set a scenario for a new chat with {name}: ").strip()
            if not scenario_content:
                return
            self.chat_history.append({'role': 'system', 'content': scenario_content})

        # NOTE(review): chat_history is recorded but not passed to the agent
        # below; each message is answered on its own. Confirm intended design.
        message = input(f"\n💬 Ask {name}: ").strip()
        if not message:
            return

        await self._respond_and_show_memories(message)

    async def _handle_list_memories(self):
        """List memories, optionally filtered by type or sorted.

        The number printed for each memory is its index in the agent's memory
        stream, which is the number ``_handle_view_memory`` expects.
        """
        print("\n📚 Memory Filter Options:")
        print("1. All memories")
        print("2. Observations only")
        print("3. Reflections only")
        print("4. Plans only")
        print("5. By importance (high to low)")
        print("6. By recency (newest first)")

        filter_choice = input("Choose filter (1-6): ").strip()

        type_filters = {
            "2": ("observation", "Observations"),
            "3": ("reflection", "Reflections"),
            "4": ("plan", "Plans"),
        }

        # Keep each memory paired with its stream index so filtering and
        # sorting do not change the number shown to the user.
        indexed = list(enumerate(self.agent.memory_stream.memories))

        if filter_choice in type_filters:
            wanted_type, title = type_filters[filter_choice]
            indexed = [(i, m) for i, m in indexed if m.memory_type == wanted_type]
        elif filter_choice == "5":
            indexed.sort(key=lambda pair: pair[1].importance_score, reverse=True)
            title = "All Memories (by importance)"
        elif filter_choice == "6":
            indexed.sort(key=lambda pair: pair[1].creation_time, reverse=True)
            title = "All Memories (by recency)"
        else:
            title = "All Memories"

        print(f"\n📋 {title} ({len(indexed)} total):")
        for i, memory in indexed:
            # "age" here is the span between creation and last access.
            age_hours = (memory.last_accessed - memory.creation_time).total_seconds() / 3600
            print(
                f"[#{i:3d}] [{memory.memory_type[:4]}] [imp:{memory.importance_score}] "
                f"[age:{age_hours:.1f}h] {memory.description}")

        if indexed:
            print("💡 Tip: Use option 4 to view specific memories in detail")

    async def _handle_view_memory(self):
        """View one memory by its stream index, with its related memories."""
        memories = self.agent.memory_stream.memories
        if not memories:
            print("❌ No memories to view.")
            return

        last_index = len(memories) - 1
        raw_number = input(f"\nEnter memory number (0-{last_index}): ").strip()

        try:
            memory_num = int(raw_number)
        except ValueError:
            memory_num = -1  # falls through to the range error below

        if not 0 <= memory_num <= last_index:
            print(f"❌ Invalid memory number. Range: 0-{last_index}")
            return

        memory = memories[memory_num]

        print(f"\n🔍 Memory #{memory_num} Details:")
        print(f" Type: {memory.memory_type}")
        print(f" Importance: {memory.importance_score}/10")
        print(f" Created: {memory.creation_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Last accessed: {memory.last_accessed.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Description: {memory.description}")

        # Related memories are only listed for observations and plans.
        if memory.memory_type in ('observation', 'plan'):
            print("\n🔗 Related memories:")
            for rel_mem in memory.related_memories:
                try:
                    rel_label = f"{memories.index(rel_mem):3d}"
                except ValueError:
                    # The related memory is not part of the stream.
                    rel_label = "  ?"
                print(f" [#{rel_label}] [{rel_mem.memory_type}] {rel_mem.description}")

    async def _handle_memory_stats(self):
        """Show counts per memory type, importance distribution and time span."""
        memories = self.agent.memory_stream.memories

        print(f"\n📊 Memory Statistics for {self.agent.character.name}:")
        print(f" Total memories: {len(memories)}")

        by_type = Counter(mem.memory_type for mem in memories)

        print("\n📂 Memory Types:")
        # The loop body only runs when there is at least one memory, so the
        # division is safe.
        for mem_type, count in by_type.items():
            percentage = (count / len(memories)) * 100
            print(f" {mem_type.title()}: {count} ({percentage:.1f}%)")

        importance_scores = [m.importance_score for m in memories]
        if importance_scores:
            print("\n📈 Importance Distribution:")
            print(f" Average: {sum(importance_scores) / len(importance_scores):.1f}")
            print(f" Range: {min(importance_scores)}-{max(importance_scores)}")

            # Text histogram over scores 1-10: one block per two memories,
            # at least one block for any non-zero count.
            score_counts = Counter(importance_scores)
            print(" Score distribution:")
            for score in range(1, 11):
                count = score_counts[score]
                bar = "█" * max(1, count // 2) if count > 0 else ""
                print(f" {score:2d}: {count:2d} {bar}")

        if memories:
            oldest = min(memories, key=lambda m: m.creation_time)
            newest = max(memories, key=lambda m: m.creation_time)
            print("\n⏰ Time Span:")
            print(f" Oldest: {oldest.creation_time.strftime('%Y-%m-%d %H:%M')}")
            print(f" Newest: {newest.creation_time.strftime('%Y-%m-%d %H:%M')}")

    async def _handle_character_summary(self):
        """Print the structured character data (name, traits, goals, ...)."""
        character = self.agent.character
        print(f"\n👤 Generating summary for {character.name}...")

        try:
            # TODO: the AI summary call (`await self.agent.get_summary()`) is
            # disabled, so only the header is printed for that section.
            print("\n📝 AI-Generated Character Summary:")

            print("\n📋 Structured Character Data:")
            print(f" Name: {character.name}")
            print(f" Age: {character.age}")
            print(f" Personality: {character.personality}")
            print(f" Occupation: {character.occupation}")
            print(f" Location: {character.location}")
            print("")
            print(f" - File: {character.template_file}")
            print(" Traits")
            if character.traits:
                traits_summary = "\n".join(
                    f" - {trait.strength}/10 {trait.name} ({trait.description})"
                    for trait in character.traits
                )
            else:
                traits_summary = "No traits yet."
            print(traits_summary)

            if character.relationships:
                print(" Relationships:")
                for person, relationship in character.relationships.items():
                    print(f" • {person}: {relationship}")

            if character.goals:
                print(" Goals:")
                for goal in character.goals:
                    print(f" • {goal}")

        except Exception as e:
            print(f"❌ Error generating summary: {e}")

    async def _handle_regenerate_character(self) -> bool:
        """Drop the cached character and rebuild it from its template.

        Returns:
            ``True`` if regeneration was started (the caller should then end
            its own session); ``False`` if the user cancelled, the template
            file is unknown, or an existing cache file could not be removed.
        """
        character_name = self.agent.character.name

        print(f"\n🔄 Regenerate {character_name}")
        print("This will clear the cache and recreate the character from scratch.")
        print("All current memories and traits will be lost!")

        confirm = input(f"Regenerate {character_name}? (y/N): ").strip().lower()
        if confirm not in ('y', 'yes'):
            print("❌ Regeneration cancelled")
            return False

        template_file = self.agent.character.template_file
        if template_file is None:
            print(f"❌ Could not find template file for {character_name}")
            return False

        # A missing cache file is not an error: there is simply nothing to
        # clear. Only a cache that exists but cannot be removed aborts.
        if _cache_path_for(template_file).exists():
            if self.clear_character_cache():
                print(f"🗑️ Cleared cache for {character_name}")
            else:
                print("❌ Failed to clear cache")
                return False

        await load_and_explore_character(template_file)
        return True


def select_character_template(show_cache_info: bool = True) -> Optional[Path]:
    """Show the template selection menu and return the chosen template path.

    Templates are the ``*.yml`` files in the ``character_templates`` directory
    next to this file.

    Args:
        show_cache_info: When true, each entry also shows whether a cached
            character exists for it.

    Returns:
        The selected template path, or ``None`` if there are no templates or
        the user chose to exit (menu entry, Ctrl-C or EOF).
    """
    templates_dir = Path(__file__).parent / 'character_templates'

    # Sorted so the menu numbering is stable between runs.
    template_files = sorted(templates_dir.glob('*.yml'))

    if not template_files:
        print("❌ No character templates found in character_templates/")
        return None

    print("\n🎭 Available Character Templates:")
    print("=" * 60)

    for i, template_file in enumerate(template_files, 1):
        # Prefer the name stored in the template; fall back to the file stem
        # when the file cannot be read or is not a mapping.
        try:
            with open(template_file, 'r', encoding='utf-8') as f:
                template_data = yaml.safe_load(f)
            character_name = template_data.get('name', template_file.stem)
        except Exception:
            character_name = template_file.stem

        if show_cache_info:
            cache_info = get_cache_info(template_file)
            if cache_info:
                cache_status = f"💾 (cached {cache_info['cached_at'].strftime('%m/%d %H:%M')})"
            else:
                cache_status = "🔄 (will create new)"
            print(f"{i}. {character_name} ({template_file.name}) {cache_status}")
        else:
            print(f"{i}. {character_name} ({template_file.name})")

    exit_choice = len(template_files) + 1
    print(f"{exit_choice}. 🚪 Exit")

    while True:
        try:
            choice = input(f"\nChoose a character template (1-{exit_choice}): ").strip()
            choice_num = int(choice)
        except ValueError:
            print(f"❌ Please enter a valid number (1-{exit_choice}).")
            continue
        except (KeyboardInterrupt, EOFError):
            return None

        if choice_num == exit_choice:
            return None
        if 1 <= choice_num <= len(template_files):
            return template_files[choice_num - 1]
        print(f"❌ Invalid choice. Please enter 1-{exit_choice}.")


async def load_and_explore_character(template_path: Path):
    """Load (or create) the character for *template_path* and explore it.

    The cache is tried first; otherwise the agent is created from the YAML
    template and saved to the cache. An interactive explorer session is then
    run for the agent.

    Returns:
        The agent after the session ends, or ``None`` if the template could
        not be read or the agent could not be created.
    """
    load_dotenv()

    print(f"📁 Loading character from {template_path.name}...")
    agent = load_character_from_cache(template_path)

    if agent is None:
        # No usable cache: build the character from its template.
        try:
            with open(template_path, 'r', encoding='utf-8') as f:
                template = yaml.safe_load(f)
            # NOTE(review): stored under 'yaml_file', while the explorer reads
            # character.template_file - confirm create_from_template maps it.
            template['yaml_file'] = template_path
        except Exception as e:
            print(f"❌ Error loading template: {e}")
            return None

        print("🤖 Creating memories and scoring importance...")

        try:
            agent = await CharacterAgent.create_from_template(template)
            print(f"✅ {agent.character.name} created successfully!")

            save_character_to_cache(agent, template_path)

        except Exception as e:
            print(f"❌ Error creating character: {e}")
            traceback.print_exc()
            return None
    else:
        print(f"✅ {agent.character.name} loaded successfully!")

    explorer = CharacterExplorer(agent)
    await explorer.start_interactive_session()

    return agent


async def main():
    """Main function with character selection"""
    print("🎭 Welcome to the Living Agents Character Explorer!")

    selected_template = select_character_template(show_cache_info=True)

    if selected_template is None:
        print("👋 Goodbye!")
        return

    await load_and_explore_character(selected_template)


if __name__ == '__main__':
    asyncio.run(main())