Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Addition of OpenSearchChatMessageHistory to langchain #701

Open
eswarthammana opened this issue Mar 22, 2024 · 1 comment
Open
Labels
enhancement New feature or request

Comments

@eswarthammana
Copy link

Tested the following code with
python 3.11,
langchain 0.1.13
langchain-community 0.0.29
opensearch-py 2.4.2

from time import time
from typing import List, Optional
import json

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
    BaseMessage,
    message_to_dict,
    messages_from_dict,
)

from opensearchpy import OpenSearch
from logs.logger import Log #my custom logger class


class OpenSearchChatMessageHistory(Log, BaseChatMessageHistory):
    """Chat message history that stores history in OpenSearch.

    Each message is stored as its own document carrying the session id, a
    creation timestamp (epoch milliseconds) and the JSON-serialized message.

    Args:
        index (str): Name of the index to use.
        session_id (str): Arbitrary key that is used to store the messages
            of a single chat session.
        opensearch_url (Optional[str]): URL of the OpenSearch instance to connect to.
            Defaults to "http://localhost:9200".
        ensure_ascii (Optional[bool]): Passed to ``json.dumps``; when True,
            non-ASCII characters in the serialized message are escaped.
            Defaults to True.
    """

    # Number of hits requested when loading a session. Without an explicit
    # size the search API returns only its default page (10 hits), which
    # silently truncates longer conversations.
    # NOTE(review): 10 000 matches the usual default `index.max_result_window`;
    # sessions longer than that would need scroll / search_after — confirm.
    _MAX_MESSAGES: int = 10_000

    def __init__(
        self,
        index: str,
        session_id: str,
        opensearch_url: Optional[str] = "http://localhost:9200",
        ensure_ascii: Optional[bool] = True,
    ) -> None:
        """Connect to OpenSearch and create the history index if it is missing."""
        super().__init__()
        self.log_info("Initializing the OpenSearchChatMessageHistory class.")
        self.index: str = index
        self.session_id: str = session_id
        self.ensure_ascii: bool = ensure_ascii

        self.client: OpenSearch = OpenSearch([opensearch_url])

        if self.client.indices.exists(index=index):
            self.log_info(
                f"Chat history index '{index}' already exists, skipping creation."
            )
        else:
            self.log_info(f"Creating index '{index}' for storing chat history.")
            self.client.indices.create(
                index=index,
                body={
                    "mappings": {
                        "properties": {
                            # keyword: exact-match lookups via `term` queries.
                            "session_id": {"type": "keyword"},
                            "created_at": {"type": "date"},
                            "history": {"type": "text"},
                        }
                    }
                },
            )
        self.log_info("OpenSearchChatMessageHistory class initialized successfully.")

    @property
    def messages(self) -> List[BaseMessage]:
        """Retrieve the messages of this session from OpenSearch.

        Returns:
            The session's messages as a flat list, ordered by ascending
            ``created_at``. Empty when the session has no stored messages.
        """
        self.log_info("Loading messages from OpenSearch to buffer.")
        result = self.client.search(
            index=self.index,
            body={"query": {"term": {"session_id": self.session_id}}},
            sort="created_at:asc",
            size=self._MAX_MESSAGES,
        )

        hits = result.get("hits", {}).get("hits", []) if result else []
        items = [json.loads(hit["_source"]["history"]) for hit in hits]

        self.log_info("Messages loaded from OpenSearch to buffer.")
        # messages_from_dict expects the whole sequence of message dicts;
        # calling it once per dict would not yield a flat List[BaseMessage].
        return messages_from_dict(items)

    def add_message(self, message: BaseMessage) -> None:
        """Add a message to the chat session in OpenSearch.

        Args:
            message: The message to serialize and index for this session.
        """
        self.log_info("Adding messages to OpenSearch.")
        self.client.index(
            index=self.index,
            body={
                "session_id": self.session_id,
                # Epoch milliseconds; used as the sort key when reading back.
                "created_at": round(time() * 1000),
                "history": json.dumps(
                    message_to_dict(message),
                    ensure_ascii=self.ensure_ascii,
                ),
            },
            # Refresh so the message is visible to the next `messages` read.
            refresh=True,
        )
        self.log_info("Messages added to OpenSearch.")

    def clear(self) -> None:
        """Clear session memory in OpenSearch.

        Deletes every document in the index whose ``session_id`` matches this
        session; other sessions stored in the same index are not targeted.
        """
        self.log_info("Purging data in OpenSearch started.")
        self.client.delete_by_query(
            index=self.index,
            body={"query": {"term": {"session_id": self.session_id}}},
            refresh=True,
        )
        self.log_info("OpenSearch data purged.")

Please add this or a similar feature to langchain. It is useful when running chat conversations with ConversationSummaryBufferMemory or ConversationalRetrievalChain from the langchain implementations.

@eswarthammana eswarthammana added enhancement New feature or request untriaged Need triage labels Mar 22, 2024
@dblock dblock removed the untriaged Need triage label Mar 22, 2024
@wbeckler
Copy link
Contributor

Would this be a PR on the langchain repo?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants