Add Mem0 integration - support for Mem0 platform #391
…ental into mem0-integration
haystack_experimental/memory_stores/examples/memory_store_with_agent.py
| confirmation_strategies: Optional[dict[str, ConfirmationStrategy]] = None, | ||
| tool_invoker_kwargs: Optional[dict[str, Any]] = None, | ||
| chat_message_store: Optional[ChatMessageStore] = None, | ||
| memory_store: Optional["Mem0MemoryStore"] = None, |
This should use the type defined in memory_stores.types.protocol
| # SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]> | ||
| # | ||
| # SPDX-License-Identifier: Apache-2.0 |
I think we should move this example out of this PR and it could be the start of the cookbook or tutorial
| from __future__ import annotations | ||
|
|
Is this needed?
| if TYPE_CHECKING: | ||
| from haystack_experimental.memory_stores.mem0.memory_store import Mem0MemoryStore |
This can be dropped once this comment is addressed
| result = {**exe_context.state.data} | ||
| if msgs := result.get("messages"): | ||
| result["last_message"] = msgs[-1] | ||
| result["messages"] = msgs |
This should be removed, it looks redundant. msgs is already equal to result["messages"]
| new_memories = [ | ||
| message for message in msgs if message.role.value == "user" or message.role.value == "assistant" | ||
| ] | ||
| if self._memory_store: | ||
| self._memory_store.add_memories(messages=new_memories, **memory_store_kwargs) |
Let's create the new list only if it will be used. Also I think it's simpler to just throw out the system message, since that's what contains the retrieved memories currently right?
| - new_memories = [
| -     message for message in msgs if message.role.value == "user" or message.role.value == "assistant"
| - ]
| - if self._memory_store:
| -     self._memory_store.add_memories(messages=new_memories, **memory_store_kwargs)
| + if self._memory_store:
| +     new_memories = [
| +         message for message in msgs if message.role.value != "system"
| +     ]
| +     self._memory_store.add_memories(messages=new_memories, **memory_store_kwargs)
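A minimal sketch of what the suggested filter keeps, assuming a four-role enum like the one the diff relies on. `Role`, `Message`, and `select_new_memories` are hypothetical stand-ins for illustration, not the Haystack classes. One point that may be worth checking: excluding only system messages also lets tool messages through, which the original user/assistant filter dropped.

```python
from dataclasses import dataclass
from enum import Enum


class Role(Enum):
    # Hypothetical stand-in for the chat role enum used in the diff.
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL = "tool"


@dataclass
class Message:
    # Hypothetical minimal stand-in for ChatMessage.
    role: Role
    text: str


def select_new_memories(msgs: list[Message]) -> list[Message]:
    """Keep every message except system messages, as in the suggestion."""
    return [m for m in msgs if m.role.value != "system"]


msgs = [
    Message(Role.SYSTEM, "Here are the relevant memories ..."),
    Message(Role.USER, "I prefer window seats."),
    Message(Role.ASSISTANT, "Noted."),
    Message(Role.TOOL, "booking confirmed"),
]
kept = select_new_memories(msgs)
print([m.role.value for m in kept])
```

If tool output should not be stored as memories, the filter would need to exclude that role as well.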
| if msgs := result.get("messages"): | ||
| result["last_message"] = msgs[-1] | ||
| result["messages"] = msgs | ||
| result["last_message"] = msgs[-1] if msgs else None |
Revert back. The if msgs := result.get("messages"): already covers this. Using the walrus operator here means that if msgs is None then everything under the if statement is skipped
| - result["last_message"] = msgs[-1] if msgs else None
| + result["last_message"] = msgs[-1]
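The guard behaviour described in this comment can be checked in isolation. This is a small sketch; `summarize` is a hypothetical helper that mirrors only the three lines under discussion, not the Agent code.

```python
def summarize(state: dict) -> dict:
    result = {**state}
    # The walrus operator binds msgs and tests it in one step. Both None and an
    # empty list are falsy, so the body runs only when there is at least one message.
    if msgs := result.get("messages"):
        result["last_message"] = msgs[-1]
    return result


with_messages = summarize({"messages": ["hello", "bye"]})
without_key = summarize({})
empty_list = summarize({"messages": []})
```

Because the body is skipped for a missing or empty list, `msgs[-1]` never needs its own `if msgs else None` fallback inside the block.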
|
@Amnah199 some remaining tasks:
|
| Search for memories in the store. | ||
| :param query: Text query to search for. If not provided, all memories may be returned. | ||
| :param filters: Backend-specific filter structure. |
I think ideally we don't directly use the backend-specific filter structure but instead use the Haystack filter syntax. I think we should reuse the filter structure we already have here https://docs.haystack.deepset.ai/docs/metadata-filtering
So the action items would be:
- update this docstring to indicate that Haystack filters are used. Look at the protocol for DocumentStore for inspiration
- update the Mem0MemoryStore to convert Haystack filters into the ones used by Mem0. You can find an example of how we do this for our OpenSearch integration here. Basically, the normalize_filters function converts Haystack filters to OpenSearch filters.
Thanks for the clear instructions!
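A rough sketch of the conversion step described above. The input follows the Haystack filter syntax (`field` / `operator` / `value` comparisons nested under `operator` / `conditions`). The output shape and the operator names in `_COMPARISON_MAP` are assumptions chosen for illustration; the actual Mem0 filter format should be taken from the Mem0 documentation, and this is not the OpenSearch `normalize_filters` implementation.

```python
from typing import Any

# Hypothetical operator names for the target backend; the real names may differ.
_COMPARISON_MAP = {
    "==": "eq", "!=": "ne", ">": "gt", ">=": "gte",
    "<": "lt", "<=": "lte", "in": "in", "not in": "nin",
}
_LOGICAL_OPERATORS = {"AND", "OR", "NOT"}


def normalize_filters(filters: dict[str, Any]) -> dict[str, Any]:
    """Recursively convert a Haystack-style filter into a nested backend dict."""
    if "conditions" in filters:
        operator = filters.get("operator")
        if operator not in _LOGICAL_OPERATORS:
            raise ValueError(f"Unknown logical operator: {operator!r}")
        return {operator: [normalize_filters(c) for c in filters["conditions"]]}

    try:
        field, operator, value = filters["field"], filters["operator"], filters["value"]
    except KeyError as exc:
        raise ValueError(f"Comparison filter is missing key {exc}") from exc
    if operator not in _COMPARISON_MAP:
        raise ValueError(f"Unknown comparison operator: {operator!r}")

    # Haystack addresses metadata as "meta.<name>"; this sketch drops the prefix.
    return {field.removeprefix("meta."): {_COMPARISON_MAP[operator]: value}}


haystack_filters = {
    "operator": "AND",
    "conditions": [
        {"field": "meta.topic", "operator": "==", "value": "travel"},
        {"field": "meta.score", "operator": ">=", "value": 0.5},
    ],
}
converted = normalize_filters(haystack_filters)
```

Keeping the conversion in one function means the protocol docstring can promise Haystack filters while each backend owns its own mapping.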
| self, | ||
| *, | ||
| query: Optional[str] = None, |
I get the impression that query should always be required right? Or is there a way to call Mem0 without a query to search for memories?
| user_id: Optional[str] = None, | ||
| run_id: Optional[str] = None, | ||
| agent_id: Optional[str] = None, | ||
| include_memory_metadata: bool = False, |
Let's replace this with just kwargs
| - include_memory_metadata: bool = False,
| + **kwargs: Any,
This way any specific implementation of MemoryStore can add additional keyword arguments and still satisfy the protocol.
Also, include_memory_metadata doesn't feel appropriate to enforce for a generic protocol.
| def add_memories( | ||
| self, | ||
| *, | ||
| messages: list[ChatMessage], | ||
| user_id: Optional[str] = None, | ||
| run_id: Optional[str] = None, | ||
| agent_id: Optional[str] = None, | ||
| ) -> list[str]: |
I think a good middle of the road approach here would be to change this to
| - def add_memories(
| -     self,
| -     *,
| -     messages: list[ChatMessage],
| -     user_id: Optional[str] = None,
| -     run_id: Optional[str] = None,
| -     agent_id: Optional[str] = None,
| - ) -> list[str]:
| + def add_memories(
| +     self,
| +     *,
| +     messages: list[ChatMessage],
| +     user_id: Optional[str] = None,
| +     **kwargs: Any,
| + ) -> list[str]:
so we always require that an implementation of MemoryStore accepts at least a user_id, while any other IDs that are specific to a particular backend aren't required for all implementations.
If you agree let's update all functions in this protocol to work this way
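A sketch of how the proposed signature plays out with `typing.Protocol`. `ChatMessage` here is a minimal stand-in, and `InMemoryStore` is a hypothetical backend that reads an extra `run_id` from `**kwargs`; neither is code from this PR.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol, runtime_checkable


@dataclass
class ChatMessage:
    # Minimal stand-in for the Haystack ChatMessage dataclass.
    text: str


@runtime_checkable
class MemoryStore(Protocol):
    def add_memories(
        self, *, messages: list[ChatMessage], user_id: Optional[str] = None, **kwargs: Any
    ) -> list[str]: ...


class InMemoryStore:
    """Hypothetical backend that understands an extra run_id keyword."""

    def __init__(self) -> None:
        self.records: list[dict[str, Any]] = []

    def add_memories(
        self, *, messages: list[ChatMessage], user_id: Optional[str] = None, **kwargs: Any
    ) -> list[str]:
        ids = []
        for message in messages:
            memory_id = f"mem-{len(self.records)}"
            self.records.append(
                {"id": memory_id, "text": message.text, "user_id": user_id, "run_id": kwargs.get("run_id")}
            )
            ids.append(memory_id)
        return ids


store = InMemoryStore()
ids = store.add_memories(messages=[ChatMessage("likes tea")], user_id="u1", run_id="r1")
```

Note that `isinstance` against a `runtime_checkable` protocol only checks that the method exists, not its signature, so a static type checker is still what enforces the `user_id` and `**kwargs` contract.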
|
|
||
| # Retrieve memories from the memory store | ||
| if self._memory_store: | ||
| retrieved_memory = self._memory_store.search_memories_as_single_message( |
We should use the search_memories method here. Otherwise, if we implement another MemoryStore backend, it won't work with Agent, since Agent would rely on a method that is not defined in the protocol.
Right, but I think one of the concerns was that we wanted to pass memories as a single message instead of a bunch of system messages for the model to process memories more efficiently.
I am thinking we can add this method to the protocol perhaps.
Right okay. Perhaps we can go back on what I said earlier and do the formatting of the single ChatMessage inside of agent here and drop the search_memories_as_single_message from the memory store
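One way the Agent-side formatting could look, as a sketch only. It assumes `search_memories` returns a list of messages with a `text` attribute; `ChatMessage` is a minimal stand-in, `memories_to_single_message` is a hypothetical helper name, and the bullet layout is an arbitrary choice. The intro sentence is the one already used in the diff.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ChatMessage:
    # Minimal stand-in for the Haystack ChatMessage dataclass.
    role: str
    text: str
    meta: dict[str, Any] = field(default_factory=dict)


def memories_to_single_message(memories: list[ChatMessage]) -> Optional[ChatMessage]:
    """Collapse the messages returned by search_memories into one message."""
    if not memories:
        return None
    bullet_lines = "\n".join(f"- {memory.text}" for memory in memories)
    text = "Here are the relevant memories for the user's query:\n" + bullet_lines
    return ChatMessage(role="system", text=text)


retrieved = [ChatMessage("system", "likes tea"), ChatMessage("system", "lives in Berlin")]
combined = memories_to_single_message(retrieved)
```

With the formatting done in the Agent, any backend that implements only the protocol's `search_memories` would work unchanged.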
| if include_memory_metadata: | ||
| # we also include the mem0 related metadata i.e. memory_id, score, etc. | ||
| # metadata | ||
| for memory in memories["results"]: | ||
| meta = memory["metadata"].copy() if memory["metadata"] else {} | ||
| meta["retrieved_memory_metadata"] = memory.copy() | ||
| meta["retrieved_memory_metadata"].pop("memory") | ||
| messages = [ | ||
| ChatMessage.from_system(text=memory["memory"], meta=meta) for memory in memories["results"] | ||
| ] | ||
| else: | ||
| # we only include the metadata stored in the memory in ChatMessage | ||
| messages = [ | ||
| ChatMessage.from_system(text=memory["memory"], meta=memory["metadata"]) | ||
| for memory in memories["results"] | ||
| ] | ||
| return messages |
We can simplify this to
| - if include_memory_metadata:
| -     # we also include the mem0 related metadata i.e. memory_id, score, etc.
| -     # metadata
| -     for memory in memories["results"]:
| -         meta = memory["metadata"].copy() if memory["metadata"] else {}
| -         meta["retrieved_memory_metadata"] = memory.copy()
| -         meta["retrieved_memory_metadata"].pop("memory")
| -     messages = [
| -         ChatMessage.from_system(text=memory["memory"], meta=meta) for memory in memories["results"]
| -     ]
| - else:
| -     # we only include the metadata stored in the memory in ChatMessage
| -     messages = [
| -         ChatMessage.from_system(text=memory["memory"], meta=memory["metadata"])
| -         for memory in memories["results"]
| -     ]
| - return messages
| + messages = []
| + for memory in memories["results"]:
| +     meta = memory["metadata"].copy() if memory["metadata"] else {}
| +     # we also include the mem0 related metadata i.e. memory_id, score, etc.
| +     if include_memory_metadata:
| +         meta["retrieved_memory_metadata"] = memory.copy()
| +         meta["retrieved_memory_metadata"].pop("memory")
| +     messages.append(
| +         ChatMessage.from_system(text=memory["memory"], meta=meta)
| +     )
| + return messages
|
|
||
| try: | ||
| self.client.delete_all(**ids, **kwargs) | ||
| logger.info(f"All memories deleted successfully for scope {ids}") |
Let's make sure to not use f-strings in log statements and use the expected Haystack format
| - logger.info(f"All memories deleted successfully for scope {ids}")
| + logger.info("All memories deleted successfully for scope {ids}", ids=ids)
| """ | ||
| try: | ||
| self.client.delete(memory_id=memory_id, **kwargs) | ||
| logger.info(f"Memory {memory_id} deleted successfully") |
| - logger.info(f"Memory {memory_id} deleted successfully")
| + logger.info("Memory {memory_id} deleted successfully", memory_id=memory_id)
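The reason for avoiding f-strings can be illustrated with the standard library alone. This sketch uses stdlib `logging` with %-style deferred arguments as an analogue; it is not the Haystack logger or its keyword-argument format shown in the suggestions above. `ListHandler` and the logger name are hypothetical.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps records in memory for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("memory_store_demo")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

memory_id = "mem-42"
# The template and its argument travel separately; formatting happens only on demand.
logger.info("Memory %s deleted successfully", memory_id)
record = handler.records[0]
```

Because the template stays constant, handlers can group or index records by message and treat the values as structured data, which an f-string would have already flattened into text.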
| # mem0 doesn't allow passing filter to delete endpoint, | ||
| # we can delete all memories for a user by passing the user_id |
It makes sense that it doesn't allow passing filters, otherwise it wouldn't be "all", right?
I think it's fine to remove this comment.
| def delete_all_memories( | ||
| self, | ||
| *, | ||
| user_id: Optional[str] = None, | ||
| run_id: Optional[str] = None, | ||
| agent_id: Optional[str] = None, | ||
| ) -> None: |
I noticed that your Mem0 implementation exposes additional kwargs in this function, so we should also add them to the protocol. Dropping run_id and agent_id based on this comment
| - def delete_all_memories(
| -     self,
| -     *,
| -     user_id: Optional[str] = None,
| -     run_id: Optional[str] = None,
| -     agent_id: Optional[str] = None,
| - ) -> None:
| + def delete_all_memories(
| +     self,
| +     *,
| +     user_id: Optional[str] = None,
| +     **kwargs: Any
| + ) -> None:
| """ | ||
| ... | ||
|
|
||
| def delete_memory(self, memory_id: str) -> None: |
Same here: I noticed that you expose kwargs in the Mem0 implementation, so let's add it to the protocol
| - def delete_memory(self, memory_id: str) -> None:
| + def delete_memory(self, memory_id: str, **kwargs: Any) -> None:
| memory_text = f"Here are the relevant memories for the user's query: {retrieved_memory.text}" | ||
| updated_memory = ChatMessage.from_system(text=memory_text, meta=retrieved_memory.meta) | ||
| else: | ||
| updated_memory = None | ||
|
|
||
| combined_messages = messages + [updated_memory] if updated_memory else messages | ||
| if updated_system_prompt is not None: | ||
| combined_messages = [ChatMessage.from_system(updated_system_prompt)] + combined_messages |
I'm not sure this is a good idea. Typically only one system prompt is ever used in the Chat History.
Sorry if we discussed this before, but which model providers have you tested that this works with?
If we really want to put the memories in a system prompt (I'm still not 100% sure this is the best idea), we should probably add them directly to the updated_system_prompt so that we at least have a single system message.
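A sketch of the single-system-message alternative raised here, under the assumption that the retrieved memories are available as plain strings. `build_system_prompt` is a hypothetical helper, and the blank-line separator is an arbitrary choice; the intro sentence is the one from the diff.

```python
from typing import Optional


def build_system_prompt(system_prompt: Optional[str], memory_texts: list[str]) -> Optional[str]:
    """Append retrieved memories to the system prompt so only one system message is sent."""
    if not memory_texts:
        return system_prompt
    memory_block = "Here are the relevant memories for the user's query:\n" + "\n".join(
        f"- {text}" for text in memory_texts
    )
    if system_prompt:
        return f"{system_prompt}\n\n{memory_block}"
    return memory_block


prompt = build_system_prompt("You are a helpful assistant.", ["likes tea"])
```

The result would replace `updated_system_prompt` before the single `ChatMessage.from_system(...)` call, so no second system message is appended to the history.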
Related Issues
Proposed Changes:
Key Changes
Mem0MemoryStore using the latest user message as the query.
How did you test it?
Notes for the reviewer
Support for Memory.from_config will be revised in a separate PR as it has a slightly different implementation than mem0 MemoryClient. https://docs.mem0.ai/platform/platform-vs-oss
Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:.