From 78911cf268452df1804355bc5138743ac973a13c Mon Sep 17 00:00:00 2001 From: David Grieve Date: Tue, 6 Aug 2024 16:51:53 -0400 Subject: [PATCH 1/7] make ChatHistory thread safe --- .../chatcompletion/OpenAIChatCompletion.java | 27 +++++++++++-------- .../OpenAIChatMessageContent.java | 2 +- .../services/chatcompletion/ChatHistory.java | 16 ++++++----- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java index 6bdb4f1c..f4a57f00 100644 --- a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java +++ b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java @@ -183,7 +183,7 @@ private static class ChatMessages { private final List newMessages; private final List allMessages; - private final List newChatMessageContent; + private final List> newChatMessageContent; public ChatMessages(List allMessages) { this.allMessages = Collections.unmodifiableList(allMessages); @@ -194,7 +194,7 @@ public ChatMessages(List allMessages) { private ChatMessages( List allMessages, List newMessages, - List newChatMessageContent) { + List> newChatMessageContent) { this.allMessages = Collections.unmodifiableList(allMessages); this.newMessages = Collections.unmodifiableList(newMessages); this.newChatMessageContent = Collections.unmodifiableList(newChatMessageContent); @@ -218,8 +218,8 @@ public ChatMessages add(ChatRequestMessage requestMessage) { } @CheckReturnValue - public ChatMessages addChatMessage(List chatMessageContent) { - ArrayList tmpChatMessageContent = new ArrayList<>( + public ChatMessages addChatMessage(List> chatMessageContent) { + ArrayList> tmpChatMessageContent = new ArrayList<>( newChatMessageContent); tmpChatMessageContent.addAll(chatMessageContent); @@ -580,7 +580,7 @@ private OpenAIFunctionToolCall extractOpenAIFunctionToolCall( arguments); } - private Mono> getChatMessageContentsAsync( + private Mono>> getChatMessageContentsAsync( ChatCompletions completions) { FunctionResultMetadata completionMetadata = FunctionResultMetadata.build( completions.getId(), @@ -594,22 +594,27 @@ private Mono> getChatMessageContentsAsync( .filter(Objects::nonNull) .collect(Collectors.toList()); - return Flux.fromIterable(responseMessages) - .flatMap(response -> { + List> chatMessageContent = + responseMessages + .stream() + .map(response -> { try { - return Mono.just(new OpenAIChatMessageContent( + return new OpenAIChatMessageContent<>( AuthorRole.ASSISTANT, response.getContent(), this.getModelId(), null, null, completionMetadata, - formOpenAiToolCalls(response))); + formOpenAiToolCalls(response)); } catch (Exception e) { - return Mono.error(e); + return null; } }) - .collectList(); + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + return Mono.just(chatMessageContent); } private List> toOpenAIChatMessageContent( diff --git a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java index f2cbf858..89f45014 100644 --- a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java +++ b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatMessageContent.java @@ -36,7 +36,7 @@ public OpenAIChatMessageContent( @Nullable String modelId, @Nullable T innerContent, @Nullable Charset encoding, - @Nullable FunctionResultMetadata metadata, + @Nullable FunctionResultMetadata metadata, @Nullable List toolCall) { super(authorRole, content, modelId, innerContent, encoding, metadata); diff --git a/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java b/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java index d2f391ff..170aac40 100644 --- a/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java +++ b/semantickernel-api/src/main/java/com/microsoft/semantickernel/services/chatcompletion/ChatHistory.java @@ -5,11 +5,13 @@ import com.microsoft.semantickernel.services.chatcompletion.message.ChatMessageTextContent; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Spliterator; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -18,7 +20,7 @@ */ public class ChatHistory implements Iterable> { - private final List> chatMessageContents; + private final Collection> chatMessageContents; /** * The default constructor @@ -33,7 +35,7 @@ public ChatHistory() { * @param instructions The instructions to add to the chat history */ public ChatHistory(@Nullable String instructions) { - this.chatMessageContents = new ArrayList<>(); + this.chatMessageContents = new ConcurrentLinkedQueue<>(); if (instructions != null) { this.chatMessageContents.add( ChatMessageTextContent.systemMessage(instructions)); @@ -45,8 +47,8 @@ public ChatHistory(@Nullable String instructions) { * * @param chatMessageContents The chat message contents to add to the chat history */ - public ChatHistory(List chatMessageContents) { - this.chatMessageContents = new ArrayList(chatMessageContents); + public ChatHistory(List> chatMessageContents) { + this.chatMessageContents = new ConcurrentLinkedQueue<>(chatMessageContents); } /** @@ -55,7 +57,7 @@ public ChatHistory(List chatMessageContents) { * @return List of messages in the chat */ public List> getMessages() { - return Collections.unmodifiableList(chatMessageContents); + return Collections.unmodifiableList(new ArrayList<>(chatMessageContents)); } /** @@ -67,7 +69,7 @@ public Optional> getLastMessage() { if (chatMessageContents.isEmpty()) { return Optional.empty(); } - return Optional.of(chatMessageContents.get(chatMessageContents.size() - 1)); + return Optional.of(((ConcurrentLinkedQueue>)chatMessageContents).peek()); } /** @@ -114,7 +116,7 @@ public Spliterator> spliterator() { * @param metadata The metadata of the message */ public void addMessage(AuthorRole authorRole, String content, Charset encoding, - FunctionResultMetadata metadata) { + FunctionResultMetadata metadata) { chatMessageContents.add( ChatMessageTextContent.builder() .withAuthorRole(authorRole) From 1257fee6250582d36aa48e3d24ac30a6042bf10d Mon Sep 17 00:00:00 2001 From: David Grieve Date: Wed, 7 Aug 2024 11:09:51 -0400 Subject: [PATCH 2/7] Log message if formOpenAiToolCalls throws exception --- .../openai/chatcompletion/OpenAIChatCompletion.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java index 4bd424dd..c02c8ef0 100644 --- a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java +++ b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java @@ -619,7 +619,8 @@ private Mono>> getChatMessageContentsAsync( null, completionMetadata, formOpenAiToolCalls(response)); - } catch (Exception e) { + } catch (SKCheckedException e) { + LOGGER.warn("Failed to form chat message content", e); return null; } }) @@ -936,7 +937,7 @@ private static boolean hasToolCallBeenExecuted(List chatRequ } private static List getChatRequestMessages( - List messages) { + List> messages) { if (messages == null || messages.isEmpty()) { return new ArrayList<>(); } From 8d8d5df3b3de90571defe0ec8661ce3206278a62 Mon Sep 17 00:00:00 2001 From: David Grieve Date: Wed, 7 Aug 2024 12:24:46 -0400 Subject: [PATCH 3/7] Remove Mono from private getChatMessageContentsAsync --- .../chatcompletion/OpenAIChatCompletion.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java index 4bd424dd..ca8eb5b3 100644 --- a/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java +++ b/aiservices/openai/src/main/java/com/microsoft/semantickernel/aiservices/openai/chatcompletion/OpenAIChatCompletion.java @@ -357,19 +357,16 @@ private Mono internalChatMessageContentsAsync( // If we don't want to attempt to invoke any functions // Or if we are auto-invoking, but we somehow end up with other than 1 choice even though only 1 was requested if (autoInvokeAttempts == 0 || responseMessages.size() != 1) { - return getChatMessageContentsAsync(completions) - .flatMap(m -> { - return Mono.just(messages.addChatMessage(m)); - }); + List> chatMessageContents = getChatMessageContentsAsync(completions); + return Mono.just(messages.addChatMessage(chatMessageContents)); } // Or if there are no tool calls to be done ChatResponseMessage response = responseMessages.get(0); List toolCalls = response.getToolCalls(); if (toolCalls == null || toolCalls.isEmpty()) { - return getChatMessageContentsAsync(completions) - .flatMap(m -> { - return Mono.just(messages.addChatMessage(m)); - }); + List> chatMessageContents = getChatMessageContentsAsync( + completions); + return Mono.just(messages.addChatMessage(chatMessageContents)); } ChatRequestAssistantMessage requestMessage = new ChatRequestAssistantMessage( @@ -592,7 +589,7 @@ private OpenAIFunctionToolCall extractOpenAIFunctionToolCall( arguments); } - private Mono>> getChatMessageContentsAsync( + private List> getChatMessageContentsAsync( ChatCompletions completions) { FunctionResultMetadata completionMetadata = FunctionResultMetadata.build( completions.getId(), @@ -619,14 +616,15 @@ private Mono>> getChatMessageContentsAsync( null, completionMetadata, formOpenAiToolCalls(response)); - } catch (Exception e) { + } catch (SKCheckedException e) { + LOGGER.warn("Failed to form chat message content", e); return null; } }) .filter(Objects::nonNull) .collect(Collectors.toList()); - return Mono.just(chatMessageContent); + return chatMessageContent; } private List> toOpenAIChatMessageContent( @@ -936,7 +934,7 @@ private static boolean hasToolCallBeenExecuted(List chatRequ } private static List getChatRequestMessages( - List messages) { + List> messages) { if (messages == null || messages.isEmpty()) { return new ArrayList<>(); } From 1c4aff3e67cb13b3cee243ecc1c237f14369b96f Mon Sep 17 00:00:00 2001 From: David Grieve Date: Wed, 7 Aug 2024 12:58:30 -0400 Subject: [PATCH 4/7] Initial Agent API --- .../semantickernel/agents/Agent.java | 124 ++++++++++++++++++ .../semantickernel/agents/AgentChannel.java | 37 ++++++ .../semantickernel/agents/AgentChat.java | 93 +++++++++++++ .../agents/ChatHistoryChannel.java | 59 +++++++++ .../agents/ChatHistoryHandler.java | 22 ++++ .../agents/ChatHistoryKernelAgent.java | 66 ++++++++++ .../semantickernel/agents/KernelAgent.java | 66 ++++++++++ 7 files changed, 467 insertions(+) create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java new file mode 100644 index 00000000..08e6eeae --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.List; + +import javax.annotation.Nullable; + +import reactor.core.publisher.Mono; + +/** + * Base abstraction for all Semantic Kernel agents. An agent instance + * may participate in one or more conversations, or {@link AgentChat}. + * A conversation may include one or more agents. + * + * In addition to identity and descriptive meta-data, an {@link Agent} + * must define its communication protocol, or {@link AgentChannel}. + * + * @param The type of {@code AgentChannel} associated with the agent. + */ +public abstract class Agent>> { + + /** + * The description of the agent (optional) + */ + private final String description; + + /** + * The identifier of the agent (optional). + * Default to a random guid value, but may be overridden. + */ + private final String id; + + /** + * The name of the agent (optional) + */ + private final String name; + + /** + * Construct a new {@link Agent} instance. + * @param id The identifier of the agent. + * @param name The name of the agent. + * @param description The description of the agent. + */ + protected Agent( + @Nullable String id, + @Nullable String name, + @Nullable String description) { + this.id = id; + this.name = name; + this.description = description; + } + + /** + * Get the description of the agent. + * @return The description of the agent. + */ + public String getDescription() { + return description; + } + + /** + * Get the identifier of the agent. + * @return The identifier of the agent. + */ + public String getId() { + return id; + } + + /** + * Get the name of the agent. + * @return The name of the agent. + */ + public String getName() { + return name; + } + + /** + * Set of keys to establish channel affinity. + * Two specific agents of the same type may each require their own channel. This is + * why the channel type alone is insufficient. + * For example, two OpenAI Assistant agents each targeting a different Azure OpenAI endpoint + * would require their own channel. In this case, the endpoint could be expressed as an additional key. + */ + protected abstract List getChannelKeys(); + + /** + * Produce the an {@link AgentChannel} appropriate for the agent type. + * Every agent conversation, or {@link AgentChat}, will establish one or more + * {@link AgentChannel} objects according to the specific {@link Agent} type. + * + * @return An {@link AgentChannel} appropriate for the agent type. + */ + protected abstract Mono createChannelAsync(); + + /** + * Base class for agent builders. + */ + public abstract static class Builder> { + + protected String id; + protected String name; + protected String description; + + public Builder withId(String id) { + this.id = id; + return this; + } + + public Builder withName(String name) { + this.name = name; + return this; + } + + public Builder withDescription(String description) { + this.description = description; + return this; + } + + public abstract TAgent build(); + + protected Builder() { + } + } +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java new file mode 100644 index 00000000..154bdd86 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.List; + +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Flux; + +/** + * Defines the communication protocol for a particular {@code Agent} type. + * An {@code Agent} provides its own {@code AgentChannel} via + * {@link Agent#createChannelAsync()}. + * @param The type of agent that this channel is associated with. + */ +public interface AgentChannel>> { + + + /** + * Receive the conversation messages. Used when joining a conversation and also during each agent interaction. + * @param history The chat history at the point the channel is created. + */ + void receiveAsync(List> history); + + /** + * Perform a discrete incremental interaction between a single Agent and AgentChat. + * @param agent The agent actively interacting with the chat. + * @return Asynchronous enumeration of messages. + */ + Flux> invokeAsync(TAgent agent); + + /** + * Retrieve the message history specific to this channel. + * @return Asynchronous enumeration of messages. + */ + Flux> getHistoryAsync(); +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java new file mode 100644 index 00000000..f09fef15 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.List; +import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Flux; + +/** + * Point of interaction for one or more agents. + * + * An AgentChat instance does not support concurrent invocation and + * are synchronized using {@code java.util.concurrent.locks.Lock}. Any + * thread attempting to invoke a public method while another thread is + * holding the lock will block until the lock is released. + */ +public abstract class AgentChat { + + + protected AgentChat() { + } + + /** + * Exposes the internal history to subclasses. + */ + protected ChatHistory getHistory() { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Process a series of interactions between the agents participating in this chat. + * + * @return Asynchronous enumeration of messages. + */ + public abstract List> invokeAsync(); + + /** + * Retrieve the chat history. + * + * @return The message history + */ + public Flux> getChatMessagesAsync() { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Retrieve the message history, either the primary history or + * an agent specific version. + * + * @param agent An optional agent, if requesting an agent history. + * @return The message history + * + * Any AgentChat instance does not support concurrent invocation and + * will throw exception if concurrent activity is attempted. + */ + public Flux> getChatMessagesAsync(Agent agent) { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Append a message to the conversation. Adding a message while an agent is active is not allowed. + * + * @param message A non-system message with which to append to the conversation. + * @throws KernelException if a system message is present, without taking any other action + * @throws KernelException chat has current activity. + */ + public void addChatMessage(ChatMessageContent message) { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Append messages to the conversation. Adding messages while an agent is active is not allowed. + * + * @param messages Set of non-system messages with which to append to the conversation. + * @throws KernelException if a system message is present, without taking any other action + * @throws KernelException chat has current activity. + */ + public void addChatMessages(List> messages) { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Process a discrete incremental interaction between a single Agent an a AgentChat. + * + * @param agent The agent actively interacting with the chat. + * @return Asynchronous enumeration of messages. + */ + protected Flux> invokeAgentAsync(Agent agent) { + throw new UnsupportedOperationException("Not implemented"); + } + +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java new file mode 100644 index 00000000..045fb506 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.List; +import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Flux; + +/** + * A {@link AgentChannel} specialization that acts upon a {@link ChatHistoryHandler}. + */ +public class ChatHistoryChannel implements AgentChannel { + + private final ChatHistory history; + + public ChatHistoryChannel(ChatHistory history) { + this.history = history; + } + + /** + * Invokes the channel asynchronously. + * + * @param agent The agent. + * @return An asynchronous stream of chat messages. + */ + @Override + public Flux> invokeAsync(ChatHistoryKernelAgent agent) { + return Flux.error(new UnsupportedOperationException("Not implemented")); + } + + /** + * Receives chat messages asynchronously. + * + * @param history The chat message history. + */ + @Override + public void receiveAsync(List> history) { + this.history.addAll(history); + } + + /** + * Gets the chat message history asynchronously. + * + * @return An asynchronous stream of chat messages. + */ + @Override + public Flux> getHistoryAsync() { + return Flux.fromIterable(this.history.getMessages()); + } + + /** + * Initializes a new instance of the {@link ChatHistoryChannel} class. + */ + public ChatHistoryChannel() { + this.history = new ChatHistory(); + } + +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java new file mode 100644 index 00000000..2d34ed34 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Flux; + +/** + * Contract for an agent that utilizes a {@link ChatHistoryChannel}. + */ +public interface ChatHistoryHandler extends AgentChannel { + + /** + * Entry point for calling into an agent from a {@link ChatHistoryChannel}. + * + * @param history The chat history at the point the channel is created. + * @return A {@link Publisher} of {@link ChatMessageContent}. + */ + Flux> invokeAsync(ChatHistory history); + +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java new file mode 100644 index 00000000..f29e2abd --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import java.util.Collections; +import java.util.List; + +import javax.annotation.Nullable; + +import com.microsoft.semantickernel.Kernel; +import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * A {@link KernelAgent} specialization bound to a {@link ChatHistoryChannel}. + */ +public abstract class ChatHistoryKernelAgent extends KernelAgent implements ChatHistoryHandler { + + /** + * Construct a new {@link ChatHistoryKernelAgent} instance. + * + * @param id The identifier of the agent. + * @param name The name of the agent. + * @param description The description of the agent. + * @param instructions The instructions for the agent. + * @param kernel The kernel. + */ + public ChatHistoryKernelAgent( + @Nullable String id, + @Nullable String name, + @Nullable String description, + @Nullable String instructions, + @Nullable Kernel kernel) { + super(id, name, description, instructions, kernel); + } + + @Override + public List getChannelKeys() { + return Collections.singletonList(ChatHistoryChannel.class.getName()); + } + + + @Override + public Mono createChannelAsync() { + return Mono.just(new ChatHistoryChannel()); + } + + /** + * Invokes asynchronously. + * + * @param history The chat history. + * @return An asynchronous sequence of {@link ChatMessageContent}. + */ + public abstract Flux> invokeAsync(ChatHistory history); + + public static abstract class Builder extends KernelAgent.Builder { + // No additional properties. + + protected Builder() { + super(); + } + } + +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java new file mode 100644 index 00000000..04391acc --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents; + +import javax.annotation.Nullable; + +import com.microsoft.semantickernel.Kernel; + +/** + * Base class for agents utilizing {@link Microsoft.SemanticKernel.Kernel} plugins or services. + * @param The type of {@code AgentChannel} associated with the agent. + */ +public abstract class KernelAgent>> extends Agent { + + private final String instructions; + private final Kernel kernel; + + protected KernelAgent( + @Nullable String id, + @Nullable String name, + @Nullable String description, + @Nullable String instructions, + Kernel kernel) { + super(id, name, description); + this.instructions = instructions; + this.kernel = kernel; + } + + /** + * The instructions of the agent (optional). + */ + public String getInstructions() { + return instructions; + } + + /** + * The {@link Kernel} containing services, plugins, and filters for use throughout the agent lifetime. + * Defaults to empty Kernel, but may be overridden. + */ + public Kernel getKernel() { + return kernel; + + } + + /** + * Builder for {@link KernelAgent} instances. + */ + public abstract static class Builder extends Agent.Builder> { + + protected String instructions; + protected Kernel kernel; + + public Builder withInstructions(String instructions) { + this.instructions = instructions; + return this; + } + + public Builder withKernel(Kernel kernel) { + this.kernel = kernel; + return this; + } + + protected Builder() { + super(); + } + } +} From d989163eb7ab299910d14b361ca2366ff5b39723 Mon Sep 17 00:00:00 2001 From: David Grieve Date: Wed, 7 Aug 2024 13:24:23 -0400 Subject: [PATCH 5/7] replace Flux with Mono --- .../microsoft/semantickernel/agents/AgentChannel.java | 6 +++--- .../microsoft/semantickernel/agents/AgentChat.java | 9 +++++---- .../semantickernel/agents/ChatHistoryChannel.java | 11 ++++++----- .../semantickernel/agents/ChatHistoryHandler.java | 8 ++++++-- .../semantickernel/agents/ChatHistoryKernelAgent.java | 3 +-- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java index 154bdd86..d00718ef 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java @@ -5,7 +5,7 @@ import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Defines the communication protocol for a particular {@code Agent} type. @@ -27,11 +27,11 @@ public interface AgentChannel> invokeAsync(TAgent agent); + Mono>> invokeAsync(TAgent agent); /** * Retrieve the message history specific to this channel. * @return Asynchronous enumeration of messages. */ - Flux> getHistoryAsync(); + Mono>> getHistoryAsync(); } diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java index f09fef15..e30ff260 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java @@ -2,10 +2,11 @@ package com.microsoft.semantickernel.agents; import java.util.List; + import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Point of interaction for one or more agents. @@ -40,7 +41,7 @@ protected ChatHistory getHistory() { * * @return The message history */ - public Flux> getChatMessagesAsync() { + public Mono>> getChatMessagesAsync() { throw new UnsupportedOperationException("Not implemented"); } @@ -54,7 +55,7 @@ public Flux> getChatMessagesAsync() { * Any AgentChat instance does not support concurrent invocation and * will throw exception if concurrent activity is attempted. */ - public Flux> getChatMessagesAsync(Agent agent) { + public Mono>> getChatMessagesAsync(Agent agent) { throw new UnsupportedOperationException("Not implemented"); } @@ -86,7 +87,7 @@ public void addChatMessages(List> messages) { * @param agent The agent actively interacting with the chat. * @return Asynchronous enumeration of messages. */ - protected Flux> invokeAgentAsync(Agent agent) { + protected Mono>> invokeAgentAsync(Agent agent) { throw new UnsupportedOperationException("Not implemented"); } diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java index 045fb506..6203a028 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java @@ -2,10 +2,11 @@ package com.microsoft.semantickernel.agents; import java.util.List; + import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * A {@link AgentChannel} specialization that acts upon a {@link ChatHistoryHandler}. @@ -25,8 +26,8 @@ public ChatHistoryChannel(ChatHistory history) { * @return An asynchronous stream of chat messages. */ @Override - public Flux> invokeAsync(ChatHistoryKernelAgent agent) { - return Flux.error(new UnsupportedOperationException("Not implemented")); + public Mono>> invokeAsync(ChatHistoryKernelAgent agent) { + return Mono.error(new UnsupportedOperationException("Not implemented")); } /** @@ -45,8 +46,8 @@ public void receiveAsync(List> history) { * @return An asynchronous stream of chat messages. */ @Override - public Flux> getHistoryAsync() { - return Flux.fromIterable(this.history.getMessages()); + public Mono>> getHistoryAsync() { + return Mono.just(this.history.getMessages()); } /** diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java index 2d34ed34..00cf63d1 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java @@ -1,10 +1,14 @@ // Copyright (c) Microsoft. All rights reserved. package com.microsoft.semantickernel.agents; +import java.util.List; + +import org.reactivestreams.Publisher; + import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Contract for an agent that utilizes a {@link ChatHistoryChannel}. @@ -17,6 +21,6 @@ public interface ChatHistoryHandler extends AgentChannel * @param history The chat history at the point the channel is created. * @return A {@link Publisher} of {@link ChatMessageContent}. */ - Flux> invokeAsync(ChatHistory history); + Mono>> invokeAsync(ChatHistory history); } diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java index f29e2abd..5feaba5d 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java @@ -10,7 +10,6 @@ import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -53,7 +52,7 @@ public Mono createChannelAsync() { * @param history The chat history. * @return An asynchronous sequence of {@link ChatMessageContent}. */ - public abstract Flux> invokeAsync(ChatHistory history); + public abstract Mono>> invokeAsync(ChatHistory history); public static abstract class Builder extends KernelAgent.Builder { // No additional properties. From f592d468a5b1147ae9bfe7fc790c5090e4335850 Mon Sep 17 00:00:00 2001 From: David Grieve Date: Fri, 9 Aug 2024 14:46:33 -0400 Subject: [PATCH 6/7] wip: method body implementation --- .../semantickernel/agents/Agent.java | 14 +- .../semantickernel/agents/AgentChannel.java | 10 +- .../semantickernel/agents/AgentChat.java | 147 ++++++++-- .../agents/ChatHistoryChannel.java | 8 +- .../agents/ChatHistoryHandler.java | 2 +- .../agents/ChatHistoryKernelAgent.java | 4 +- .../semantickernel/agents/KernelAgent.java | 6 +- .../agents/internal/BroadcastQueue.java | 250 ++++++++++++++++++ .../agents/internal/ChannelReference.java | 34 +++ .../agents/internal/KeyEncoder.java | 35 +++ 10 files changed, 471 insertions(+), 39 deletions(-) create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java create mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java index 08e6eeae..d425fe00 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/Agent.java @@ -15,9 +15,9 @@ * In addition to identity and descriptive meta-data, an {@link Agent} * must define its communication protocol, or {@link AgentChannel}. * - * @param The type of {@code AgentChannel} associated with the agent. + * @param The type of {@code AgentChannel} associated with the agent. */ -public abstract class Agent>> { +public abstract class Agent { /** * The description of the agent (optional) @@ -90,28 +90,28 @@ public String getName() { * * @return An {@link AgentChannel} appropriate for the agent type. */ - protected abstract Mono createChannelAsync(); + protected abstract Mono createChannelAsync(); /** * Base class for agent builders. */ - public abstract static class Builder> { + public abstract static class Builder { protected String id; protected String name; protected String description; - public Builder withId(String id) { + public Builder withId(String id) { this.id = id; return this; } - public Builder withName(String name) { + public Builder withName(String name) { this.name = name; return this; } - public Builder withDescription(String description) { + public Builder withDescription(String description) { this.description = description; return this; } diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java index d00718ef..3c7b12ca 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java @@ -11,23 +11,25 @@ * Defines the communication protocol for a particular {@code Agent} type. * An {@code Agent} provides its own {@code AgentChannel} via * {@link Agent#createChannelAsync()}. - * @param The type of agent that this channel is associated with. + * @param The type of agent that this channel is associated with. */ -public interface AgentChannel>> { +public interface AgentChannel { /** * Receive the conversation messages. Used when joining a conversation and also during each agent interaction. + * * @param history The chat history at the point the channel is created. + * @return A future task that completes when the conversation messages are received. */ - void receiveAsync(List> history); + Mono receiveAsync(List> history); /** * Perform a discrete incremental interaction between a single Agent and AgentChat. * @param agent The agent actively interacting with the chat. * @return Asynchronous enumeration of messages. */ - Mono>> invokeAsync(TAgent agent); + Mono>> invokeAsync(Agent agent); /** * Retrieve the message history specific to this channel. diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java index e30ff260..4b7a2181 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChat.java @@ -1,8 +1,21 @@ // Copyright (c) Microsoft. All rights reserved. package com.microsoft.semantickernel.agents; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + +import com.microsoft.semantickernel.agents.internal.BroadcastQueue; +import com.microsoft.semantickernel.agents.internal.ChannelReference; +import com.microsoft.semantickernel.agents.internal.KeyEncoder; +import com.microsoft.semantickernel.exceptions.SKException; +import com.microsoft.semantickernel.services.chatcompletion.AuthorRole; import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; @@ -18,16 +31,13 @@ */ public abstract class AgentChat { - - protected AgentChat() { - } - - /** - * Exposes the internal history to subclasses. - */ - protected ChatHistory getHistory() { - throw new UnsupportedOperationException("Not implemented"); - } + private final BroadcastQueue broadcastQueue; + // Map channel hash to channel: one entry per channel. + private final Map agentChannels; + // Map agent to its channel-hash: one entry per agent. + private final Map channelMap; + private final ChatHistory chatHistory; + private final Lock lock; /** * Process a series of interactions between the agents participating in this chat. @@ -42,10 +52,10 @@ protected ChatHistory getHistory() { * @return The message history */ public Mono>> getChatMessagesAsync() { - throw new UnsupportedOperationException("Not implemented"); + return getChatMessagesAsync(null); } - /** + /** * Retrieve the message history, either the primary history or * an agent specific version. * @@ -55,8 +65,19 @@ public Mono>> getChatMessagesAsync() { * Any AgentChat instance does not support concurrent invocation and * will throw exception if concurrent activity is attempted. */ - public Mono>> getChatMessagesAsync(Agent agent) { - throw new UnsupportedOperationException("Not implemented"); + public Mono>> getChatMessagesAsync(Agent agent) { + lock.lock(); + try { + if (agent == null) { + return Mono.just(chatHistory.getMessages()); + } else { + String channelHash = getAgentHash(agent); + return synchronizeChannelAsync(channelHash) + .flatMap(channel -> channel.getHistoryAsync()); + } + } finally { + lock.unlock(); + } } /** @@ -67,7 +88,7 @@ public Mono>> getChatMessagesAsync(Agent agent) { * @throws KernelException chat has current activity. */ public void addChatMessage(ChatMessageContent message) { - throw new UnsupportedOperationException("Not implemented"); + addChatMessages(Arrays.asList(message)); } /** @@ -78,7 +99,47 @@ public void addChatMessage(ChatMessageContent message) { * @throws KernelException chat has current activity. */ public void addChatMessages(List> messages) { - throw new UnsupportedOperationException("Not implemented"); + lock.lock(); + try { + + if (messages.stream().anyMatch(it -> it.getAuthorRole() == AuthorRole.SYSTEM)) { + throw new SKException( + String.format("History does not support messages with Rople of %s", AuthorRole.SYSTEM)); + } + + // Append chat history + chatHistory.addAll(messages); + + // Broadcast message to other channels (in parallel) + // Note: Able to queue messages without synchronizing channels. + List channelRefs = + agentChannels.entrySet().stream() + .map(entry -> new ChannelReference(entry.getValue(), entry.getKey())) + .collect(Collectors.toList()); + + broadcastQueue.enqueue(channelRefs, messages); + + } finally { + lock.unlock(); + } + } + + /** + * Construct a new {@link AgentChat} instance. + */ + protected AgentChat() { + broadcastQueue = new BroadcastQueue(); + agentChannels = new HashMap<>(); + channelMap = new HashMap<>(); + chatHistory = new ChatHistory(); + lock = new ReentrantLock(); + } + + /** + * Exposes the internal history to subclasses. + */ + protected ChatHistory getHistory() { + return chatHistory; } /** @@ -87,8 +148,58 @@ public void addChatMessages(List> messages) { * @param agent The agent actively interacting with the chat. * @return Asynchronous enumeration of messages. */ - protected Mono>> invokeAgentAsync(Agent agent) { - throw new UnsupportedOperationException("Not implemented"); + protected Mono>> invokeAgentAsync(Agent agent) { + lock.lock(); + try { + return getOrCreateChannel(agent) + .flatMap(channel -> channel.invokeAsync(agent) + .doOnNext(messages -> { + chatHistory.addAll(messages); + List channelRefs = agentChannels.entrySet().stream() + .filter(entry -> !entry.getKey().equals(agent)) + .map(entry -> new ChannelReference(entry.getValue(), entry.getKey())) + .collect(Collectors.toList()); + broadcastQueue.enqueue(channelRefs, messages); + }) + ); + } finally { + lock.unlock(); + } + } + + private String getAgentHash(@Nonnull Agent agent) { + String hash = channelMap.computeIfAbsent(agent, key -> KeyEncoder.generateHash(key.getChannelKeys())); + return hash; } + private Mono synchronizeChannelAsync(String channelHash) { + AgentChannel channel = agentChannels.get(channelHash); + if (channel != null) { + return broadcastQueue.ensureSynchronizedAsync( + new ChannelReference(channel, channelHash) + ); + } + return Mono.empty(); + } + + private Mono getOrCreateChannel(Agent agent) { + String channelHash = getAgentHash(agent); + return synchronizeChannelAsync(channelHash) + .flatMap(channel -> { + if (channel == null) { + return agent.createChannelAsync() + .doOnNext(newChannel -> { + agentChannels.put(channelHash, newChannel); + }) + .flatMap(newChannel -> { + if (chatHistory.getMessages().size() > 0) { + return newChannel.receiveAsync(chatHistory.getMessages()); + } + return Mono.empty(); + }) + .then(Mono.just(agentChannels.get(channelHash))); + } + return Mono.just(channel); + }); + } } diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java index 6203a028..7237dd33 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java @@ -11,7 +11,7 @@ /** * A {@link AgentChannel} specialization that acts upon a {@link ChatHistoryHandler}. */ -public class ChatHistoryChannel implements AgentChannel { +public class ChatHistoryChannel implements AgentChannel { private final ChatHistory history; @@ -26,7 +26,7 @@ public ChatHistoryChannel(ChatHistory history) { * @return An asynchronous stream of chat messages. */ @Override - public Mono>> invokeAsync(ChatHistoryKernelAgent agent) { + public Mono>> invokeAsync(Agent agent) { return Mono.error(new UnsupportedOperationException("Not implemented")); } @@ -36,8 +36,8 @@ public Mono>> invokeAsync(ChatHistoryKernelAgent agen * @param history The chat message history. */ @Override - public void receiveAsync(List> history) { - this.history.addAll(history); + public Mono receiveAsync(List> history) { + return null; } /** diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java index 00cf63d1..aaa22b73 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java @@ -13,7 +13,7 @@ /** * Contract for an agent that utilizes a {@link ChatHistoryChannel}. */ -public interface ChatHistoryHandler extends AgentChannel { +public interface ChatHistoryHandler extends AgentChannel { /** * Entry point for calling into an agent from a {@link ChatHistoryChannel}. diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java index 5feaba5d..a8c5004b 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java @@ -15,7 +15,7 @@ /** * A {@link KernelAgent} specialization bound to a {@link ChatHistoryChannel}. */ -public abstract class ChatHistoryKernelAgent extends KernelAgent implements ChatHistoryHandler { +public abstract class ChatHistoryKernelAgent extends KernelAgent implements ChatHistoryHandler { /** * Construct a new {@link ChatHistoryKernelAgent} instance. @@ -42,7 +42,7 @@ public List getChannelKeys() { @Override - public Mono createChannelAsync() { + public Mono createChannelAsync() { return Mono.just(new ChatHistoryChannel()); } diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java index 04391acc..23fe342b 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/KernelAgent.java @@ -7,9 +7,9 @@ /** * Base class for agents utilizing {@link Microsoft.SemanticKernel.Kernel} plugins or services. - * @param The type of {@code AgentChannel} associated with the agent. + * @param The type of {@code AgentChannel} associated with the agent. */ -public abstract class KernelAgent>> extends Agent { +public abstract class KernelAgent extends Agent { private final String instructions; private final Kernel kernel; @@ -44,7 +44,7 @@ public Kernel getKernel() { /** * Builder for {@link KernelAgent} instances. */ - public abstract static class Builder extends Agent.Builder> { + public abstract static class Builder extends Agent.Builder { protected String instructions; protected Kernel kernel; diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java new file mode 100644 index 00000000..9d49447d --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/BroadcastQueue.java @@ -0,0 +1,250 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents.internal; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.reactivestreams.Publisher; + +import com.microsoft.semantickernel.agents.Agent; +import com.microsoft.semantickernel.agents.AgentChannel; +import com.microsoft.semantickernel.exceptions.SKException; +import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; + +import reactor.core.publisher.Mono; + +/** + * Utility class used by {@link AgentChat} to manage the broadcast of + * conversation messages via the {@link com.microsoft.semantickernel.agents.AgentChannel#receiveAsync}. + * Interaction occurs via two methods: + *
    + *
  • {@link BroadcastQueue#enqueue}: Adds messages to a channel specific queue for processing.
  • + *
  • {@link BroadcastQueue#ensureSynchronizedAsync}: Blocks until the specified channel's processing queue is empty.
  • + *
+ * Maintains a set of channel specific queues, each with individual locks. + * Queue specific locks exist to synchronize access to an individual queue only. + * Due to the closed "friend" relationship between with {@link AgentChat}, + * {@link BroadcastQueue} is never invoked concurrently, which eliminates + * race conditions over the queue dictionary. + */ +public class BroadcastQueue { + + /** + * The queue reference structure. + */ + private static class QueueReference { + + private final ConcurrentLinkedQueue>> queue = new ConcurrentLinkedQueue<>(); + private final Lock queueLock = new ReentrantLock(); + private FutureTask receiveTask; + + // Any failure that occured during execution of {@link #receiveTask}. + private Exception receiveFailure; + + /** + * Convenience logic + */ + private boolean isEmpty() { + return this.queue.isEmpty(); + } + + + private Lock getQueueLock() { + return queueLock; + } + + private Queue>> getQueue() { + return queue; + } + + /** + * Capture any failure that may occur during execution of {@link #receiveTask}. + */ + private void setReceiveFailure(Exception receiveFailure) { + this.receiveFailure = receiveFailure; + } + + private Exception getReceiveFailure() { + return receiveFailure; + } + + private FutureTask getReceiveTask() { + return receiveTask; + } + + private void setReceiveTask(FutureTask receiveTask) { + this.receiveTask = receiveTask; + } + } + + private final Map queues = new ConcurrentHashMap<>(); + + + // Defines the yield duration when waiting on a channel-queue to drain. + // TODO: This should be a configuration setting. See Duration#parse + private static final Duration blockDuration = Duration.ofMillis(100L); + + private final ExecutorService executorService = + Executors.newCachedThreadPool(runnable -> { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setDaemon(true); + return thread; + } + ); + + /** + * Enqueue a set of messages for a given channel. + * + * @param channelRefs The target channels for which to broadcast. + * @param messages The messages being broadcast. + */ + public void enqueue(List channelRefs, List> messages) { + for (ChannelReference channelRef : channelRefs) { + QueueReference queueRef = queues.computeIfAbsent(channelRef.getHash(), key -> new QueueReference()); + + queueRef.getQueueLock().lock(); + try { + queueRef.getQueue().add(messages); + if (queueRef.getReceiveTask() == null || queueRef.getReceiveTask().isDone()) { + queueRef.setReceiveTask(new FutureTask<>(receiveAsync(channelRef, queueRef), null)); + executorService.submit(queueRef.getReceiveTask()); + } + } finally { + queueRef.getQueueLock().unlock(); + } + } + + } + + /** + * Blocks until a channel-queue is not in a receive state to ensure that + * channel history is complete. + * + * @param channelRef A {@link ChannelReference} structure. + * @return false when channel is no longer receiving. + * @throws KernelException When channel is out of sync. + */ + public Mono ensureSynchronizedAsync(ChannelReference channelRef) { + // Either won race with Enqueue or lost race with ReceiveAsync. + // Missing queue is synchronized by definition. + QueueReference queueRef = queues.get(channelRef.getHash()); + if (queueRef == null) { + return Mono.just(channelRef.getChannel()); + } + + FutureTask receiveTask = queueRef.getReceiveTask(); + if (receiveTask == null || receiveTask.isDone()) { + return Mono.just(channelRef.getChannel()); + } + + return Mono.fromRunnable(() -> { + try { + receiveTask.get(blockDuration.toMillis(), TimeUnit.MILLISECONDS); + } catch (CancellationException | TimeoutException e) { + // TODO: Should log the TimeoutException + // Swallow the exception and move on. + // If a TimeoutException occurs, the queue is probably still processing. + // If a CancellationException occurs, the task was cancelled so there is no point in waiting. + } catch (InterruptedException e) { + // Propogate the interrupt + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + // Exception thrown by the receiveTask + queueRef.setReceiveFailure(e); + } + + // Propagate prior failure (inform caller of synchronization issue) + Exception failure = queueRef.getReceiveFailure(); + if (failure != null) { + queueRef.setReceiveFailure(null); + throw new SKException("Unexpected failure broadcasting to channel: " + channelRef.getChannel().getClass(), failure); + } + }) + .then(Mono.just(channelRef.getChannel())); + } + + /** + * Processes the specified queue with the provided channel, until queue is empty. + * @param channelRef the channel reference + * @param queueRef the queue reference + */ + private static Runnable receiveAsync(ChannelReference channelRef, QueueReference queueRef) + { + return () -> { + // Need to capture any failure that may occur during execution of receiveTask. + // It's an array to get around the final requirement for lambdas. + Exception[] failures = new Exception[1]; + + boolean isEmpty = true; // Default to fall-through state + + // This is a somewhat faithful translation of the .NET code. + do + { + failures[0] = null; + + Mono receiveTask; + + // Queue state is only changed within acquired QueueLock. + // If its empty here, it is synchronized. + queueRef.getQueueLock().lock(); + try { + isEmpty = queueRef.isEmpty(); + + // Process non empty queue + if (isEmpty) { + break; + } + + List> messages = queueRef.getQueue().peek(); + receiveTask = channelRef.getChannel().receiveAsync(messages); + } finally { + queueRef.getQueueLock().unlock(); + } + + // Queue not empty. + receiveTask.onErrorMap(e -> { + if (e instanceof Exception) { + failures[0] = (Exception)e; + } + return e; + }) + .block(); + + queueRef.getQueueLock().lock(); + try { + // Propagate failure or update queue + if (failures[0] != null) { + queueRef.setReceiveFailure(failures[0]); + break; // Failure on non-empty queue means, still not empty. + } + + // Queue has already been peeked. Remove head on success. + queueRef.getQueue().remove(); + + isEmpty = queueRef.isEmpty(); // Re-evaluate state + } finally { + queueRef.getQueueLock().unlock(); + } + } + while (!isEmpty); + }; + } + +} + diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java new file mode 100644 index 00000000..b0b5f573 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/ChannelReference.java @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents.internal; + +import com.microsoft.semantickernel.agents.AgentChannel; + +/** + * Tracks channel along with its hashed key. + */ +public class ChannelReference { + + private final AgentChannel channel; + private final String hash; + + public ChannelReference(AgentChannel channel, String hash) { + this.channel = channel; + this.hash = hash; + } + + /** + * The referenced channel. + * @return The referenced channel. + */ + public AgentChannel getChannel() { + return channel; + } + + /** + * The channel hash. + * @return The channel hash. + */ + public String getHash() { + return hash; + } +} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java new file mode 100644 index 00000000..78f0f062 --- /dev/null +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/internal/KeyEncoder.java @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft. All rights reserved. +package com.microsoft.semantickernel.agents.internal; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; + +import com.microsoft.semantickernel.exceptions.SKException; + +import java.util.Base64; + +/** + * Utility to encode a list of string keys to a base-64 encoded hash. + */ +public class KeyEncoder { + /** + * Produces a base-64 encoded hash for a set of input strings. + * + * @param keys A set of input strings + * @return A base-64 encoded hash + */ + public static String generateHash(List keys) { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + // if this happens, something is very wrong with the JVM + throw new SKException("Failed to generate hash", e); + } + String key = String.join(":", keys); + byte[] hash = digest.digest(key.getBytes(StandardCharsets.UTF_8)); + return Base64.getEncoder().encodeToString(hash); + } +} From 869d571d63db80b9abeeea09da71e7e9f840060d Mon Sep 17 00:00:00 2001 From: David Grieve Date: Tue, 13 Aug 2024 14:34:49 -0400 Subject: [PATCH 7/7] Agents work in progress --- .../semantickernel/agents/AgentChannel.java | 4 +- .../agents/ChatHistoryChannel.java | 60 ----------------- .../agents/ChatHistoryHandler.java | 26 -------- .../agents/ChatHistoryKernelAgent.java | 65 ------------------- 4 files changed, 3 insertions(+), 152 deletions(-) delete mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java delete mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java delete mode 100644 semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java index 3c7b12ca..627a68f6 100644 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java +++ b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/AgentChannel.java @@ -3,6 +3,8 @@ import java.util.List; +import javax.annotation.Nonnull; + import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; import reactor.core.publisher.Mono; @@ -29,7 +31,7 @@ public interface AgentChannel { * @param agent The agent actively interacting with the chat. * @return Asynchronous enumeration of messages. */ - Mono>> invokeAsync(Agent agent); + Mono>> invokeAsync(@Nonnull Agent agent); /** * Retrieve the message history specific to this channel. diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java deleted file mode 100644 index 7237dd33..00000000 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryChannel.java +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -package com.microsoft.semantickernel.agents; - -import java.util.List; - -import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; -import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; - -import reactor.core.publisher.Mono; - -/** - * A {@link AgentChannel} specialization that acts upon a {@link ChatHistoryHandler}. - */ -public class ChatHistoryChannel implements AgentChannel { - - private final ChatHistory history; - - public ChatHistoryChannel(ChatHistory history) { - this.history = history; - } - - /** - * Invokes the channel asynchronously. - * - * @param agent The agent. - * @return An asynchronous stream of chat messages. - */ - @Override - public Mono>> invokeAsync(Agent agent) { - return Mono.error(new UnsupportedOperationException("Not implemented")); - } - - /** - * Receives chat messages asynchronously. - * - * @param history The chat message history. - */ - @Override - public Mono receiveAsync(List> history) { - return null; - } - - /** - * Gets the chat message history asynchronously. - * - * @return An asynchronous stream of chat messages. - */ - @Override - public Mono>> getHistoryAsync() { - return Mono.just(this.history.getMessages()); - } - - /** - * Initializes a new instance of the {@link ChatHistoryChannel} class. - */ - public ChatHistoryChannel() { - this.history = new ChatHistory(); - } - -} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java deleted file mode 100644 index aaa22b73..00000000 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryHandler.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -package com.microsoft.semantickernel.agents; - -import java.util.List; - -import org.reactivestreams.Publisher; - -import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; -import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; - -import reactor.core.publisher.Mono; - -/** - * Contract for an agent that utilizes a {@link ChatHistoryChannel}. - */ -public interface ChatHistoryHandler extends AgentChannel { - - /** - * Entry point for calling into an agent from a {@link ChatHistoryChannel}. - * - * @param history The chat history at the point the channel is created. - * @return A {@link Publisher} of {@link ChatMessageContent}. - */ - Mono>> invokeAsync(ChatHistory history); - -} diff --git a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java b/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java deleted file mode 100644 index a8c5004b..00000000 --- a/semantickernel-experimental/src/main/java/com/microsoft/semantickernel/agents/ChatHistoryKernelAgent.java +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -package com.microsoft.semantickernel.agents; - -import java.util.Collections; -import java.util.List; - -import javax.annotation.Nullable; - -import com.microsoft.semantickernel.Kernel; -import com.microsoft.semantickernel.services.chatcompletion.ChatHistory; -import com.microsoft.semantickernel.services.chatcompletion.ChatMessageContent; - -import reactor.core.publisher.Mono; - -/** - * A {@link KernelAgent} specialization bound to a {@link ChatHistoryChannel}. - */ -public abstract class ChatHistoryKernelAgent extends KernelAgent implements ChatHistoryHandler { - - /** - * Construct a new {@link ChatHistoryKernelAgent} instance. - * - * @param id The identifier of the agent. - * @param name The name of the agent. - * @param description The description of the agent. - * @param instructions The instructions for the agent. - * @param kernel The kernel. - */ - public ChatHistoryKernelAgent( - @Nullable String id, - @Nullable String name, - @Nullable String description, - @Nullable String instructions, - @Nullable Kernel kernel) { - super(id, name, description, instructions, kernel); - } - - @Override - public List getChannelKeys() { - return Collections.singletonList(ChatHistoryChannel.class.getName()); - } - - - @Override - public Mono createChannelAsync() { - return Mono.just(new ChatHistoryChannel()); - } - - /** - * Invokes asynchronously. - * - * @param history The chat history. - * @return An asynchronous sequence of {@link ChatMessageContent}. - */ - public abstract Mono>> invokeAsync(ChatHistory history); - - public static abstract class Builder extends KernelAgent.Builder { - // No additional properties. - - protected Builder() { - super(); - } - } - -}