← Back to Home Page

Multi-Agent System (MAS) Template Code

Check the MAS Collection if you are looking for discovered MAS.


    import copy
    import json
    import os
    import random
    from collections import namedtuple
    from concurrent.futures import ThreadPoolExecutor
    import numpy as np
    from tqdm import tqdm
    import re
    from typing import Any
    from datasets import load_dataset
    from utils import random_id
    
    
    # Record passed between agents. Fields, as populated by the code in this file:
    #   name          - output field the record carries (e.g. 'task', or a key of the model response)
    #   author        - repr() of the agent that produced the record
    #   content       - the payload text
    #   prompt        - message list that was sent to the model to obtain the record (may be None)
    #   sub_tasks     - newline-joined sub-task descriptions, or None
    #   agents        - newline-joined agent descriptions, or None
    #   iteration_idx - zero-based iteration index; -1 means "not part of an iteration"
    Info = namedtuple('Info', ['name', 'author', 'content', 'prompt', 'sub_tasks', 'agents', 'iteration_idx'])


    def ROLE_DESC(role):
        """Return the role sentence that opens an agent's system prompt."""
        return f"You are a {role}."
    
    class LLMAgentBase():
        """
        Define the LLM Agent Class. It will construct the prompt and call the LLM.

        An agent turns a list of `Info` records plus an instruction into a
        (system prompt, user prompt) pair, sends it through
        `get_json_response_from_gpt`, and wraps every returned field in a new `Info`.
        """

        # Matches the sub-task question that generate_prompt() embeds after this fixed
        # lead-in sentence; the capture stops at the first blank line after the question.
        _SUB_QUESTION_PATTERN = re.compile(
            r"Given the above, answer the following question: \s*(.*?)\s*\n\n", re.DOTALL)

        def __init__(self, output_fields: list, agent_name: str,
                     role='helpful assistant', model=None, temperature=None) -> None:
            """
            Args:
                output_fields: names of the fields the model is asked to return.
                agent_name: human-readable name, used in `repr()` and as the record author.
                role: role inserted into the system prompt via `ROLE_DESC`.
                model: passed through unchanged to `get_json_response_from_gpt`.
                temperature: passed through unchanged to `get_json_response_from_gpt`.
            """
            self.output_fields = output_fields
            self.agent_name = agent_name

            self.role = role
            self.model = model
            self.temperature = temperature
            # give each instance a unique id
            self.id = random_id()

        def extract_pattern(self, prompt):
            """
            Recover the sub-task question from a previously sent message list.

            Args:
                prompt: sequence of messages; the last one must support `['content']`
                    and hold the user prompt text.

            Returns:
                The question text found after the "Given the above, answer the
                following question:" lead-in, or '' when `prompt` is empty or the
                lead-in is absent (e.g. the record came from a non-sub-task query).
            """
            if not prompt:
                return ''

            sub_question = prompt[-1]['content']
            match = self._SUB_QUESTION_PATTERN.search(sub_question)
            # search() yields None when the lead-in is missing; report "no question" instead of crashing
            if match is None:
                return ''

            return match.group(1)

        def generate_prompt(self, input_infos, instruction, is_sub_task=False) -> tuple:
            """
            Build the system prompt and the user prompt for one query.

            Args:
                input_infos: iterable of context items; anything that is not an `Info` is skipped.
                instruction: the instruction / question for this query.
                is_sub_task: when True, context is laid out as "original question +
                    sub-task Q&A" and the [TOO_HARD] guidance is appended.

            Returns:
                A `(system_prompt, prompt)` tuple of strings.

            Raises:
                NotImplementedError: if `global_format_choice` is neither 'json' nor 'xml'.
            """
            if global_format_choice == 'json':
                output_fields_and_description = {key: f"Your {key}." if 'answer' not in key else f"Your {key}. {global_output_description}" for key in self.output_fields}
            elif global_format_choice == 'xml':
                output_fields_and_description = '\n'.join([f"<{key}> [Your {key}.] " if 'answer' not in key else f"<{key}> [Your {key}. {global_output_description}] \n" for key in self.output_fields])
            else:
                raise NotImplementedError(f"Unsupported format choice: {global_format_choice!r}")

            system_prompt = ROLE_DESC(self.role) + "\n\n" + global_FORMAT_INST(output_fields_and_description)

            # construct input infos text
            own_name = repr(self)
            pieces = []
            prev_extracted_question = ''
            for input_info in input_infos:
                if not isinstance(input_info, Info):
                    continue
                (field_name, author, content, source_prompt, _, _, iteration_idx) = input_info

                if author == own_name:
                    author += ' (yourself)'

                if field_name == 'task':
                    if is_sub_task:
                        pieces.append(f'Related original question:\n\n{content}. \n\nRelated sub-task questions and answers:\n\n')
                    else:
                        pieces.append(f'{content}\n\n')
                    continue

                # iteration_idx == -1 marks a record that is not part of an iteration
                label = field_name if iteration_idx == -1 else f'{field_name} #{iteration_idx + 1}'
                entry = f'### {label} by {author}:\n{content}\n\n'

                if is_sub_task and source_prompt is not None:
                    extracted_question = self.extract_pattern(source_prompt)
                    # print the question once per run of records that share it; we do not
                    # want to duplicate the prompt, and an empty question gets no header
                    if extracted_question and extracted_question != prev_extracted_question:
                        entry = f'### {extracted_question} \n\n {entry}'
                        prev_extracted_question = extracted_question

                pieces.append(entry)

            input_infos_text = ''.join(pieces)

            # NOTE: the prompt wording below is kept verbatim (spelling included);
            # extract_pattern() depends on the "Given the above, ..." lead-in sentence.
            if is_sub_task:

                if global_format_choice == 'json':
                    prompt = input_infos_text + f'Given the above, answer the following question: {instruction}\n\nIf the question is too complicated or informaion is missing, you still need to give your best answer but add (1) an additional mark [TOO_HARD] in the next line of your final answer (2) information request or decomposison suggestion in the next line of the [TOO_HARD] mark, in the "answer" entry (for example, 300\n[TOO_HARD]\nSuggestion:...) and justify why you think so in the "thinking" entry'

                elif global_format_choice == 'xml':
                    prompt = input_infos_text + f"""Given the above, answer the following question: {instruction}\n\n 
                    
                    If the question is too complicated or informaion is missing, you still need to give your best guess but add (1) an additional mark [TOO_HARD] in the next line of your final answer (2) information request or decomposison suggestion in the next line of the [TOO_HARD] mark, in the "answer" entry. In the "thinking", justify why you think so. Following the format below:
                    
                    "answer" entry: [Your best guess, e.g., 300]\n[TOO_HARD]\nSuggestion: [your suggestion]
                    "thinking" entry:  [why you thinking is is too complicated or missing information. How to you arrive your best guess regardless]
    
                    Otherwise, give your answer and thinking normally.
    
                    "answer" entry: [your answer]
                    "thinking" entry: [How do you arrive your answer]
    
                    IMPORTANT: You need to give your best guess in both cases. Do not give [TOO_HARD] directly but always give your best guess first
    
                    """
                else:
                    raise NotImplementedError(f"Unsupported format choice: {global_format_choice!r}")

            else:
                prompt = input_infos_text + instruction
            return system_prompt, prompt

        def query(self, input_infos: list, instruction, iteration_idx=-1, is_sub_task=False) -> list:
            """
            Send one request to the model and wrap each returned field in an `Info`.

            Args:
                input_infos: context items forwarded to `generate_prompt`.
                instruction: the instruction / question for this query.
                iteration_idx: stored on every returned record; -1 means "no iteration".
                is_sub_task: forwarded to `generate_prompt`.

            Returns:
                A list with one `Info` per key of the model response; each record keeps
                the message list that produced it in its `prompt` field.
            """
            system_prompt, user_prompt = self.generate_prompt(input_infos, instruction, is_sub_task=is_sub_task)

            # use system prompt
            messages = [
                _pack_message(content=system_prompt, role="system"),
                _pack_message(content=user_prompt, role="user")]

            response_json = get_json_response_from_gpt(messages, self.model, self.output_fields, self.temperature)

            author = repr(self)
            return [Info(key, author, value, messages, None, None, iteration_idx)
                    for key, value in response_json.items()]

        def __repr__(self):
            """Return '<agent_name> <id>', which is also used as the author of produced records."""
            return f"{self.agent_name} {self.id}"

        def __call__(self, input_infos: list, instruction, iteration_idx=-1, is_sub_task=False):
            """Shorthand for `query` with the same arguments."""
            return self.query(input_infos, instruction, iteration_idx=iteration_idx,  is_sub_task=is_sub_task)
    
    
    
    
    class AgentSystem():
        """
        Define the Agent System Class. It will call the LLM Agent Class following the MAS (the forward function).
        """
        def __init__(self) -> None:
            pass

        def make_final_answer(self, thinking, answer, sub_tasks=None, agents=None):
            """
            Merge a thinking record and an answer into one final `Info`.

            Args:
                thinking: record providing `name`, `author`, `content`, `prompt`
                    and `iteration_idx` for the result.
                answer: either the answer text itself or an object exposing `.content`.
                sub_tasks: optional iterable of sub-task strings. If `agents` is
                    omitted, this argument is interpreted as the agents list instead.
                agents: optional iterable of agent strings.

            Returns:
                An `Info` whose content is '<thinking>\\n\\nAnswer:<answer>' and whose
                `sub_tasks` / `agents` fields are the newline-joined inputs (None when
                the corresponding input is absent).
            """
            answer_content = answer if isinstance(answer, str) else answer.content

            if agents is None:  # this means sub_task is None, according to the propose prompt
                # a single list arriving in the sub_tasks slot is really the agents list
                sub_tasks, agents = agents, sub_tasks

            # Join each list independently so that sub_tasks survive when both are supplied
            sub_tasks_text = None if sub_tasks is None else '\n'.join(sub_tasks)
            agents_text = None if agents is None else '\n'.join(agents)

            return Info(
                thinking.name,
                thinking.author,
                f'{thinking.content}\n\nAnswer:{answer_content}',
                thinking.prompt,
                sub_tasks_text,
                agents_text,
                thinking.iteration_idx,
            )

        def forward(self, task_queue):
            '''
            TODO: 
            (1) Insert your MAS code here. 
            (2) Set your preferred global variables.
            (3) Implement the call LLM function (get_json_response_from_gpt())
            '''
    
    
    def main():
        """
        Run `AgentSystem.forward` on every entry of `global_task_queue` using a thread pool.

        Returns:
            The list of `forward` results, in the same order as `global_task_queue`.
        """
        agent_system = AgentSystem()

        # executor.map yields results in input order; list() drains it while tqdm reports progress
        with ThreadPoolExecutor(max_workers=global_max_workers) as executor:
            results = list(tqdm(executor.map(agent_system.forward, global_task_queue), total=len(global_task_queue)))

        return results


    if __name__ == "__main__":
        main()