92 lines
3.8 KiB
Python
92 lines
3.8 KiB
Python
from typing import List, Dict, Literal
|
|
from living_agents import LLMAgent, Character, CharacterAgent, Memory
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
class RoleplaySystem:
|
|
agents: Dict[Character, CharacterAgent]
|
|
global_time: datetime
|
|
scene_state: Dict
|
|
|
|
def __init__(self):
|
|
self.agents = {}
|
|
self.global_time = datetime.now()
|
|
self.scene_state = {
|
|
"location": "cozy coffee shop",
|
|
"time": "afternoon",
|
|
"atmosphere": "quiet and peaceful",
|
|
"active_conversations": [],
|
|
"events": []
|
|
}
|
|
|
|
async def setup_characters(self, characters: List[Character]):
|
|
logger.info('Setting up Characters.')
|
|
|
|
for character in characters:
|
|
self.agents[character] = CharacterAgent(character, LLMAgent(temperature=0.9))
|
|
await self.agents[character].initialize_memories()
|
|
|
|
async def get_character_response(self, character: Character, user_input: str) -> str:
|
|
|
|
print(f"🧠 {character} accessing memories...")
|
|
|
|
# Agent perceives user interaction
|
|
await self.agents[character].perceive(f"Someone asked me: '{user_input}'")
|
|
|
|
# Generate response
|
|
response = await self.agents[character].react_to_situation(user_input)
|
|
return response
|
|
|
|
async def character_chat(self, character_1: Character, character_2: Character, context: str) -> str:
|
|
"""Make two characters interact with each other"""
|
|
interaction = await self.character_interaction(character_1, character_2, context)
|
|
|
|
result = f"\n💬 **{character_1}**: {interaction[character_1]}\n💬 **{character_2}**: {interaction[character_2]}\n"
|
|
return result
|
|
|
|
async def advance_time(self, hours: int = 1):
|
|
"""Advance scene time and trigger agent planning"""
|
|
self.global_time += timedelta(hours=hours)
|
|
self.scene_state["time"] = self.global_time.strftime("%I:%M %p")
|
|
|
|
# Each agent plans their next actions
|
|
for character, agent in self.agents.items():
|
|
await agent.perceive(f"Time is now {self.scene_state['time']}")
|
|
|
|
def get_character_memories(self, character: Character, memory_type: Literal['all', 'observation', 'reflection', 'plan'] = "all") -> List[Memory]:
|
|
return self.agents[character].memory_stream.memories
|
|
|
|
async def get_character_summary(self, character: Character) -> str:
|
|
"""Get AI-generated summary of character based on their memories"""
|
|
|
|
summary = await self.agents[character].get_summary()
|
|
|
|
return f"\n📝 Current summary of {character}:\n{summary}\n"
|
|
|
|
async def character_interaction(self, character_1: Character, character_2: Character, context: str) -> Dict[Character, str]:
|
|
"""Handle interaction between two characters"""
|
|
|
|
char1_agent = self.agents[character_1]
|
|
char2_agent = self.agents[character_2]
|
|
|
|
# Both characters observe the interaction context
|
|
await char1_agent.perceive(f"Interacting with {character_2}: {context}")
|
|
await char2_agent.perceive(f"Interacting with {character_1}: {context}")
|
|
|
|
# Generate responses
|
|
char1_response = await char1_agent.react_to_situation(f"You are talking with {character_2}. Context: {context}")
|
|
char2_response = await char2_agent.react_to_situation(f"{character_1} said: '{char1_response}'")
|
|
|
|
# Both remember the conversation
|
|
await char1_agent.perceive(f"Conversation with {character_2}: I said '{char1_response}', they replied '{char2_response}'")
|
|
await char2_agent.perceive(f"Conversation with {character_1}: They said '{char1_response}', I replied '{char2_response}'")
|
|
|
|
return {
|
|
character_1: char1_response,
|
|
character_2: char2_response
|
|
}
|