エムスリーテックブログ

エムスリー(m3)のエンジニア・開発メンバーによる技術ブログです

ChatGPT Retrieval Pluginに任意のベクトル検索エンジンProviderを実装する

Overview

エムスリーエンジニアリンググループ AI・機械学習チームでソフトウェアエンジニアをしている中村(po3rin) です。検索とGoが好きです。

エムスリーではChatGPTの可能性にいち早く注目して活用を検討している段階ですが、本格的なデータ投入にはまだ懸念もあり、セキュリティチームと検討を進めている段階です。

そんな中で個人または組織のドキュメントのセマンティック検索と取得を可能にするChatGPTプラグイン「ChatGPT Retrieval Plugin」が登場しました。

github.com

情報検索好きとしては黙っていられず、外部公開用のエムスリーAI・機械学習チームのメンバー紹介ドキュメントを使ってローカルで試してみました。

# 用意したドキュメント

中村弘武は東京都在住で、エムスリーという企業で働いでいます。
エムスリーの検索基盤を主に担当しています。また、書籍推薦システム開発なども行っています。

農見俊明は前職では自然言語処理をタンパク質解析に応用するモデルの開発に従事
エムスリーでは推薦アルゴリズム・システムの構築とREST API開発を担当

浮田純平は医学部出身 (MD)・医学博士 (PhD)。学部時より約8年間、機械学習 (特に深層学習) を用いた生体データの解析や深層学習の研究を行った。
大学院では自身の研究のほか、計算機サーバーの立ち上げや学部生の研究指導にも従事した。

質問

# (省略: コードは後ほど紹介します)..

response = index.query("エムスリーAIチームに所属する医学部出身のエンジニアを“名前:特徴“のフォーマットで一人教えてください")
print(response)

結果

名前: 浮田純平 特徴: 医学部出身 (MD)・医学博士 (PhD)であり、機械学習 (特に深層学習) を用いた生体データの解析や深層学習の研究を約8年間行ってきた。また、大学院では自身の研究のほか、計算機サーバーの立ち上げや学部生の研究指導にも従事している。さらに、エムスリーでは検索基盤を主に担当し、書籍推薦システム...

なんか僕のプロフィールをしれっと最後に混ざて嘘を言っていますが(笑)、それらしい回答は得られました。これでChatGPTの情報源を拡張できることを確認できました。

動かすだけだとつまらないので、今回はChatGPT Retrieval Pluginがサポートしているベクトル検索エンジンではなく、AWSで利用できるOpenSearch のProviderを実装して、ChatGPTにOpenSearchのベクトル検索を提供する方法を試してみました。

このブログを読むことで、ChatGPT Retrieval Pluginの動作の理解と、皆さんが日頃使っている慣れ親しんだベクトル検索エンジンをChatGPT Retrieval Pluginに対応させる知見を獲得できます。

ChatGPT Retrieval Plugin とは

個人または組織のドキュメントのセマンティック検索と取得を可能にするChatGPTプラグインであり、ソースから最も関連性の高いドキュメントスニペットを取得してChatGPTで利用します。これを利用することで社内ドキュメントや個人のTODOリストなどのプラグインなどが作れます。プラグインには認証機能もつけられるので、サービスの有料ユーザー限定の医療相談プラグインなども作れます。

内部ではベクトル検索エンジンが使われており、図にすると下記のような構造になっています。

ChatGPT Retrieval Pluginを利用する際のアーキテクチャ

ChatGPTがChatGPT Retrieval Pluginとして立てたAPIにクエリを投げて、返ってきたスニペットをユーザーへの回答に利用します。内部ではtext-embedding-ada-002 embeddings modelを使ってクエリやドキュメントのベクトルを取得して、ベクトル検索エンジンにインデックスしたり検索したりします。ベクトル検索エンジンはChatGPT Retrieval Pluginがサポートしているものであれば、すぐに使い始めることができます。

ChatGPTがPluginとやり取りする仕組み

ChatGPT Retrieval Pluginを触ってみる前にChatGPTが各種プラグインを利用する仕組みを確認しておきましょう。これを理解することで、後に説明するChatGPT Retrieval Pluginの理解が容易になるとともに、ChatGPT Retrieval Pluginに囚われない独自のPluginを作ることも可能です。

PluginはAPIとして立てる必要があります。APIは/.well-known/ai-plugin.jsonでPluginのマニフェストをホストする必要があります。下記はドキュメントから持ってきたマニフェストの例です。

{
  "schema_version": "v1",
  "name_for_human": "TODO Plugin",
  "name_for_model": "todo",
  "description_for_human": "Plugin for managing a TODO list. You can add, remove and view your TODOs.",
  "description_for_model": "Plugin for managing a TODO list. You can add, remove and view your TODOs.",
  "auth": {
    "type": "none"
  },
  "api": {
    "type": "openapi",
    "url": "http://localhost:3333/openapi.yaml",
    "is_user_authenticated": false
  },
  "logo_url": "https://vsq7s0-5001.preview.csb.app/logo.png",
  "contact_email": "support@example.com",
  "legal_info_url": "http://www.example.com/legal"
}

modelへのAPIの説明やAPIのドキュメントへのリンク(http://localhost:3333/openapi.yaml)などがあります。APIのドキュメントはOpenAPIで記述されている必要があります。ChatGPTがこれらの設定を読み込み、APIの使い方を理解します。

つまり、マニフェストを返すAPIさえ作れれば任意のPluginがすぐに実装可能です。

今回のChatGPT Retrieval Pluginを実行すると、PythonのWebフレームワークであるFastAPIでAPIが立ち上がるようになっており、マニフェストファイルだけ修正すれば、OpenAPIドキュメント含めすぐに利用できる状態になっています。

Pluginの作り方については下記のドキュメントをご覧ください。

platform.openai.com

既にサポートされているベクトル検索エンジンを使ってChatGPT Retrieval Pluginを試す

今回のブログの本題ではないのでサクッと紹介します。

初めてChatGPT Retrieval Pluginを触る際にはLlamaIndexを使ってローカルで試してみるのが簡単です。 LlamaIndexはOpenAIのLLMに独自のデータを読み込ませる仕組みでローカルでも動かせます。

gpt-index.readthedocs.io

実行方法の紹介はnpakaさんのブログの方が詳しいのでこちらもご覧ください。

note.com

今回はRust製ベクトル検索エンジンであるQdrantを利用します。

qdrant.tech

Qdrantはコンテナイメージが提供されているのでそちらを使います。

$ docker pull qdrant/qdrant
$ docker run -p 6333:6333 qdrant/qdrant

そして、Plugin環境変数をセットします。Qdrantの接続先はローカルホストに向いたデフォルトの値が用意されているので、今回は設定しなくてOKです。

DATASTORE=qdrant
BEARER_TOKEN=XXXXXXXXX
OPENAI_API_KEY=XXXXXXXXX

BEARER_TOKENhttps://jwt.io/で所得でき、OPENAI_API_KEY下記から作成できます。

platform.openai.com

今回使うPluginをローカルに持ってきて環境変数を読み込んでAPIサーバーを起動します。

$ git clone https://github.com/openai/chatgpt-retrieval-plugin.git
$ cd chatgpt-retrieval-plugin
$ poetry install
$ poetry run start

これでPluginとして呼び出すAPIが立ち上がりました。OpenAPIの仕様をhttp://0.0.0.0:8000/docsで確認できます。

OpenAPIの仕様を確認

このAPIを呼び出すことで、ChatGPTがクエリに関するスニペットを取得して利用します。

このAPI経由でベクトル検索エンジンにインデックスするドキュメントを用意します。data/sample.txtにドキュメントの内容を記載します。私の動作確認では最近作ったAI・機械学習チームのメンバー紹介スライドの文章を拝借します。皆さんは各々試してみたいドキュメントを用意してください。

今回のローカルにおける動作確認はLlamaIndexを使います。

import os

import openai
import numpy as np
from dotenv import load_dotenv
from llama_index import SimpleDirectoryReader
from llama_index.indices.vector_store import ChatGPTRetrievalPluginIndex

documents = SimpleDirectoryReader("data").load_data()
openai.api_key = os.getenv('OPENAI_API_KEY')

index = ChatGPTRetrievalPluginIndex(
    documents, 
    endpoint_url="http://localhost:8000",
    bearer_token=os.getenv("BEARER_TOKEN"),
)

response = index.query("エムスリーで働く中村弘武とはどういう人物ですか?")
print(response)

結果

中村弘武は32歳の東京都在住のエムスリーのAI・機械学習チームで働くメンバーです。エムスリーの検索基盤を主に担当しています。また、書籍推薦システム開発なども行っています。趣味は麻雀、サウナ、筋トレです。

ドキュメントから取得したスニペットを使って結果が返ってきました。

任意のベクトル検索エンジンが使えるようにProviderを実装する

本題です。今回はChatGPT Retrieval Pluginの内部を覗いて、サポートされていないベクトル検索エンジンであるOpenSearchのProvider実装を試みます。

OpenSearchはベクトル検索をサポートしており、NMSLIBFaissLucene各種ベクトル検索ライブラリを選ぶことができます。

opensearch.org

下記のステップで実装していきます。

  • 実装するべきものをコードから探る
  • DataStoreの用意
  • _upsert実装
  • _query実装
  • delete実装
  • Provider動作確認

実装するべきものをコードから探る

Provider実装の方法を確認するために、現在サポートされているベクトル検索エンジンの実装を見ていきます。Providerの実装はdatastore/providersで見ることができます。例えばRust製のベクトル検索エンジンのQdrantのProviderはdatastore/providers/qdrant_datastore.pyにあります。

class QdrantDataStore(DataStore):
    # ...

ここで抽象クラスのDataStoreを継承しているのを確認できます。DataStoreの実装はdatastore/datastore.pyにあります。

class DataStore(ABC):
    # ...

このクラスを見ると_upsert_querydeleteメソッドを実装すればいいことがわかります。どのDataStoreを使うかはdatastore/factory.pyで決定しています。

async def get_datastore() -> DataStore:
    datastore = os.environ.get("DATASTORE")
    assert datastore is not None

    match datastore:
        case "pinecone":
            from datastore.providers.pinecone_datastore import PineconeDataStore

            return PineconeDataStore()

        # (省略)...

        case "qdrant":
            from datastore.providers.qdrant_datastore import QdrantDataStore

            return QdrantDataStore()
        case _:
            raise ValueError(f"Unsupported vector database: {datastore}")

つまり私たちが任意のベクトル検索エンジンを使いたい時はDataStoreの具象を実装し、get_datastore関数の条件に加えてあげるだけで対応が完了します。これならすぐにできそうです。

DataStoreの用意

早速、datastore/providers/opensearch.pyを追加します。実装はぱっと見る限りOpenSearchに必要な処理に一番近そうだったdatastore/providers・pinecone_datastore.pyを参考に実装していきます。最初に必要なmoduleをimportしておきます。

import os
import json
from typing import Dict, List, Optional
from opensearchpy import OpenSearch
from opensearchpy.helpers import bulk
from tenacity import retry, wait_random_exponential, stop_after_attempt
import asyncio

from datastore.datastore import DataStore
from models.models import (
    DocumentChunk,
    DocumentChunkMetadata,
    DocumentChunkWithScore,
    DocumentMetadataFilter,
    QueryResult,
    QueryWithEmbedding,
    Source,
)

続いてOpenSearchのクライアントの初期化、次元数などを設定しておきます。

OPENSEARCH_INDEX = os.environ.get("OPENSEARCH_INDEX")
OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL") or "http://localhost:9200"
OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER")
OPENSEARCH_PASSWORD = os.environ.get("OPENSEARCH_PASSWORD")
assert OPENSEARCH_INDEX is not None
assert OPENSEARCH_URL is not None

es = OpenSearch(hosts=[OPENSEARCH_URL], basic_auth=f"{OPENSEARCH_USER}:{OPENSEARCH_PASSWORD}")

UPSERT_BATCH_SIZE = 100
OUTPUT_DIM = 1536

続いてProviderに渡されるデータを格納するためのmappingを用意します。OpenSearchでベクトル検索する時にはindexの設定とフィールドのtypeをknn_vectorに設定する必要があります。

mapping = {
    "settings": {
        "index.knn": True
    },
    "mappings": {
        "properties": {
            "chunk_id": {"type": "keyword"},
            "document_id": {"type": "keyword"},
            "text": {"type": "text"},
            "text_vector": {
                "type": "knn_vector",
                "dimension": OUTPUT_DIM,
            },
            "source": {"type": "keyword"},
            "source_id": {"type": "keyword"},
            "url": {"type": "text"},
            "created_at": {"type": "date"},
            "author": {"type": "text"},
        }
    }
}

より詳細な設定は下記のドキュメントをご覧ください。今回はデフォルトのNMSLIBを使っています。

opensearch.org

そして、DataStoreクラスを継承したOpenSearchDataStoreを実装します。ChatGPT Retrieval Pluginの他のProviderでは初期化時にインデックスの作成をしているので、その方法に合わせてOpenSearchDataStoreの初期化時にもインデックスを作成、もしくは存在チェックを行います。

class OpenSearchDataStore(DataStore):
    def __init__(self):
        # Check if the index name is specified and exists in Pinecone
        index_exists = es.indices.exists(index=OPENSEARCH_INDEX)
        if OPENSEARCH_INDEX and not index_exists:

            # Get all fields in the metadata object in a list
            fields_to_index = list(DocumentChunkMetadata.__fields__.keys())

            # Create a new index with the specified name, dimension, and metadata configuration
            try:
                print(
                    f"Creating index {OPENSEARCH_INDEX} with metadata config {fields_to_index}"
                )
                es.indices.create(index=OPENSEARCH_INDEX, body=mapping)
                print(f"Index {OPENSEARCH_INDEX} created successfully")
            except Exception as e:
                print(f"Error creating index {OPENSEARCH_INDEX}: {e}")
                raise e
        elif OPENSEARCH_INDEX and index_exists:
            # Connect to an existing index with the specified name
            try:
                print(f"Connected to index {OPENSEARCH_INDEX} successfully")
            except Exception as e:
                print(f"Error connecting to index {OPENSEARCH_INDEX}: {e}")
                raise e

_upsert実装

続いて、_upsertです。_upsertメソッドを呼ぶ前に、ドキュメントをchunkに分けてembeddingを生成しているので、それをOpenSearchにbulk insertします。bulk insertの細かい設定はドキュメントをご覧ください。

opensearch-project.github.io

    @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
    async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
        """
        Takes in a dict from document id to list of document chunks and inserts them into the index.
        Return a list of document ids.
        """

        # Initialize a list of ids to return
        doc_ids: List[str] = []
        # Initialize a list of vectors to upsert
        index_actions = []
        # Loop through the dict items
        for doc_id, chunk_list in chunks.items():
            # Append the id to the ids list
            doc_ids.append(doc_id)
            print(f"Upserting document_id: {doc_id}")
            for chunk in chunk_list:
                print(f"chunk: {chunk.id}")
                print(chunk.text)
                index_action = {
                    "_id": f"{doc_id}-{chunk.id}",
                    "_op_type": "update",
                    "doc_as_upsert": True,
                    "doc": {
                        "chunk_id": chunk.id,
                        "text": chunk.text,
                        "text_vector": chunk.embedding,
                        "source": chunk.metadata.source,
                        "source_id": chunk.metadata.source_id,
                        "url": chunk.metadata.url,
                        "created_at": chunk.metadata.created_at,
                        "author": chunk.metadata.author
                    },
                }
                index_actions.append(index_action)

        try:
            bulk(es, index_actions, index=OPENSEARCH_INDEX, raise_on_error=False)
        except Exception as e:
            print(f"Error upserting batch: {e}")
            raise e

        return doc_ids

_query実装

そして_queryメソッドを実装します。ベクトル検索の結果をQueryResultにつめて返却します。

    @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
    async def _query(
        self,
        queries: List[QueryWithEmbedding],
    ) -> List[QueryResult]:
        """
        Takes in a list of queries with embeddings and filters and returns a list of query results with matching document chunks and scores.
        """

        # Define a helper coroutine that performs a single query and returns a QueryResult
        async def _single_query(query: QueryWithEmbedding) -> QueryResult:
            print(f"Query: {query.query}")

            q = {
                "query": {
                    "knn": {
                        "text_vector": {
                            "vector": query.embedding,
                            "k": query.top_k
                        }
                    }
                }
            }
            
            try:
                # Query the index with the query embedding, filter, and top_k
                query_response = es.search(index=OPENSEARCH_INDEX, body=json.dumps(q))
            except Exception as e:
                print(f"Error querying index: {e}")
                raise e

            query_results: List[DocumentChunkWithScore] = []
            for result in query_response["hits"]["hits"]:
                score = result["_score"]
                metadata = result["_source"]

                # Remove document id and text from metadata and store it in a new variable
                metadata_without_text = (
                    {key: value for key, value in metadata.items() if key != "text"}
                    if metadata
                    else None
                )

                # If the source is not a valid Source in the Source enum, set it to None
                if (
                    metadata_without_text
                    and "source" in metadata_without_text
                    and metadata_without_text["source"] not in Source.__members__
                ):
                    metadata_without_text["source"] = None

                # Create a document chunk with score object with the result data
                result = DocumentChunkWithScore(
                    id=result["_id"],
                    score=score,
                    text=metadata["text"] if metadata and "text" in metadata else None,
                    metadata=metadata_without_text,
                )
                query_results.append(result)

            return QueryResult(query=query.query, results=query_results)

        # Use asyncio.gather to run multiple _single_query coroutines concurrently and collect their results
        results: List[QueryResult] = await asyncio.gather(
            *[_single_query(query) for query in queries]
        )

        return results

delete実装

そして最後にdeleteを実装します。

    @retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3))
    async def delete(
        self,
        ids: Optional[List[str]] = None,
        filter: Optional[DocumentMetadataFilter] = None,
        delete_all: Optional[bool] = None,
    ) -> bool:
        """
        Removes vectors by ids, filter, or everything from the index.
        """
        # Delete all vectors from the index if delete_all is True
        if delete_all == True:
            try:
                print(f"Deleting all vectors from index")
                es.delete_by_query(index=OPENSEARCH_INDEX, body={"query": {"match_all": {}}})
                print(f"Deleted all vectors successfully")
                return True
            except Exception as e:
                print(f"Error deleting all vectors: {e}")
                raise e

        # Delete vectors that match the document ids from the index if the ids list is not empty
        if ids != None and len(ids) > 0:
            try:
                print(f"Deleting vectors with ids {ids}")
                for document_id in ids:
                    es.delete(index=OPENSEARCH_INDEX, id=document_id)  # type: ignore
                print(f"Deleted vectors with ids successfully")
            except Exception as e:
                print(f"Error deleting vectors with ids: {e}")
                raise e

        return True

_deleteは下記のコードのようにDataStoreクラスのupsertで呼ばれるのでupsert時にも必要な処理です。ここはupsertを直接的にサポートしているベクトル検索エンジンにとっては不要な処理なので、今後改善余地がある箇所だと思います(PRチャンスか?)。

class DataStore(ABC):
    async def upsert(
        self, documents: List[Document], chunk_token_size: Optional[int] = None
    ) -> List[str]:
        """
        Takes in a list of documents and inserts them into the database.
        First deletes all the existing vectors with the document id (if necessary, depends on the vector db), then inserts the new ones.
        Return a list of document ids.
        """
        # Delete any existing vectors for documents with the input document ids
        await asyncio.gather(
            *[
                self.delete(
                    filter=DocumentMetadataFilter(
                        document_id=document.id,
                    ),
                    delete_all=False,
                )
                for document in documents
                if document.id
            ]
        )

        chunks = get_document_chunks(documents, chunk_token_size)

        return await self._upsert(chunks)

Provider動作確認

これでOpenSearch Providerの実装が完了しました。最後に環境変数DATASTOREopensearchが指定されたときにこのProviderを使うようにdatastore/factory.pyに条件を追加します。

def get_datastore() -> DataStore:
    datastore = os.environ.get("DATASTORE")
    assert datastore is not None

    match datastore:
        # ...
        case "opensearch":
            from datastore.providers.opensearch_datastore import OpenSearchDataStore

            return OpenSearchDataStore()
        case _:
            raise ValueError(f"Unsupported vector database: {datastore}")

全ての準備が整いました。最初に動作確認用に作ったスクリプトを実行すれば同じような結果が取得できるはずです。これでOpenSearch Providerの実装ができました。

Elasticsearch Providerは作れるか

実は最初は自分が1番慣れ親しんだElasticsearchで試してみようと思ったのですが、現在ElasticsearchはChatGPT Retrieval Pluginでは利用できないことが分かりました。text-embedding-ada-002が出力する次元数が1536で、Elasticsearchが内部で利用しているLuceneがサポートする次元数の最大値が1024なので、ElasticsearchをProviderとして利用できません。

次元数の制限を増やす議論がされているので、将来的にはこの制限がなくなるかもしれません。

github.com

日本語でこの辺の動向をまとめてくれている記事もあります。

shunyaueta.com

まとめ

今回はChatGPT Retrieval Pluginがサポートしているベクトル検索エンジンではなく、AWSのOpenSearch Providerを実装してChatGPTにベクトル検索を提供する方法を試してみました。

今回の実装を通して、ChatGPT Retrieval PluginのProviderとして利用できる条件としては下記あります。

  • 1536次元のベクトルを扱える
  • upsert/search/deleteが行える

上記を満たすベクトル検索エンジンならDataStore抽象クラスを実装するだけで対応できます。ValdFaissなんかのProviderも実装できますね。ベクトル検索エンジン実装者や、自分のお気に入りのベクトル検索エンジンがある方は是非試してみてください。

今回の実装はもう少し綺麗にして隙があればPRを投げる予定です(OpenSearchを使ったことがないので、これで良いか確認する必要あり)。

弊社でもOpenAIを活用していく流れが来ているので楽しみです。

We're hiring!

弊社では情報検索や機械学習の力で医療を前進させるメンバーを募集中です。少しでも興味があれば1on1しましょう!

jobs.m3.com