Overview
エムスリーエンジニアリンググループ AI・機械学習チームでソフトウェアエンジニアをしている中村(po3rin) です。検索とGoが好きです。
エムスリーではChatGPTの可能性にいち早く注目して活用を検討している段階ですが、本格的なデータ投入にはまだ懸念もあり、セキュリティチームと検討を進めている段階です。
そんな中で個人または組織のドキュメントのセマンティック検索と取得を可能にするChatGPTプラグイン「ChatGPT Retrieval Plugin」が登場しました。
情報検索好きとしては黙っていられず、外部公開用のエムスリーAI・機械学習チームのメンバー紹介ドキュメントを使ってローカルで試してみました。
# 用意したドキュメント 中村弘武は東京都在住で、エムスリーという企業で働いでいます。 エムスリーの検索基盤を主に担当しています。また、書籍推薦システム開発なども行っています。 農見俊明は前職では自然言語処理をタンパク質解析に応用するモデルの開発に従事 エムスリーでは推薦アルゴリズム・システムの構築とREST API開発を担当 浮田純平は医学部出身 (MD)・医学博士 (PhD)。学部時より約8年間、機械学習 (特に深層学習) を用いた生体データの解析や深層学習の研究を行った。 大学院では自身の研究のほか、計算機サーバーの立ち上げや学部生の研究指導にも従事した。
質問
# (省略: コードは後ほど紹介します).. response = index.query("エムスリーAIチームに所属する医学部出身のエンジニアを“名前:特徴“のフォーマットで一人教えてください") print(response)
結果
名前: 浮田純平 特徴: 医学部出身 (MD)・医学博士 (PhD)であり、機械学習 (特に深層学習) を用いた生体データの解析や深層学習の研究を約8年間行ってきた。また、大学院では自身の研究のほか、計算機サーバーの立ち上げや学部生の研究指導にも従事している。さらに、エムスリーでは検索基盤を主に担当し、書籍推薦システム...
なんか僕のプロフィールをしれっと最後に混ざて嘘を言っていますが(笑)、それらしい回答は得られました。これでChatGPTの情報源を拡張できることを確認できました。
動かすだけだとつまらないので、今回はChatGPT Retrieval Pluginがサポートしているベクトル検索エンジンではなく、AWSで利用できるOpenSearch のProviderを実装して、ChatGPTにOpenSearchのベクトル検索を提供する方法を試してみました。
このブログを読むことで、ChatGPT Retrieval Pluginの動作の理解と、皆さんが日頃使っている慣れ親しんだベクトル検索エンジンをChatGPT Retrieval Pluginに対応させる知見を獲得できます。
- Overview
- ChatGPT Retrieval Plugin とは
- ChatGPTがPluginとやり取りする仕組み
- 既にサポートされているベクトル検索エンジンを使ってChatGPT Retrieval Pluginを試す
- 任意のベクトル検索エンジンが使えるようにProviderを実装する
- DataStoreの用意
- delete実装
- Elasticsearch Providerは作れるか
- まとめ
- We're hiring!
ChatGPT Retrieval Plugin とは
個人または組織のドキュメントのセマンティック検索と取得を可能にするChatGPTプラグインであり、ソースから最も関連性の高いドキュメントスニペットを取得してChatGPTで利用します。これを利用することで社内ドキュメントや個人のTODOリストなどのプラグインなどが作れます。プラグインには認証機能もつけられるので、サービスの有料ユーザー限定の医療相談プラグインなども作れます。
内部ではベクトル検索エンジンが使われており、図にすると下記のような構造になっています。
ChatGPTがChatGPT Retrieval Pluginとして立てたAPIにクエリを投げて、返ってきたスニペットをユーザーへの回答に利用します。内部ではtext-embedding-ada-002
embeddings modelを使ってクエリやドキュメントのベクトルを取得して、ベクトル検索エンジンにインデックスしたり検索したりします。ベクトル検索エンジンはChatGPT Retrieval Pluginがサポートしているものであれば、すぐに使い始めることができます。
ChatGPTがPluginとやり取りする仕組み
ChatGPT Retrieval Pluginを触ってみる前にChatGPTが各種プラグインを利用する仕組みを確認しておきましょう。これを理解することで、後に説明するChatGPT Retrieval Pluginの理解が容易になるとともに、ChatGPT Retrieval Pluginに囚われない独自のPluginを作ることも可能です。
PluginはAPIとして立てる必要があります。APIは/.well-known/ai-plugin.json
でPluginのマニフェストをホストする必要があります。下記はドキュメントから持ってきたマニフェストの例です。
{ "schema_version": "v1", "name_for_human": "TODO Plugin", "name_for_model": "todo", "description_for_human": "Plugin for managing a TODO list. You can add, remove and view your TODOs.", "description_for_model": "Plugin for managing a TODO list. You can add, remove and view your TODOs.", "auth": { "type": "none" }, "api": { "type": "openapi", "url": "http://localhost:3333/openapi.yaml", "is_user_authenticated": false }, "logo_url": "https://vsq7s0-5001.preview.csb.app/logo.png", "contact_email": "support@example.com", "legal_info_url": "http://www.example.com/legal" }
modelへのAPIの説明やAPIのドキュメントへのリンク(http://localhost:3333/openapi.yaml
)などがあります。APIのドキュメントはOpenAPIで記述されている必要があります。ChatGPTがこれらの設定を読み込み、APIの使い方を理解します。
つまり、マニフェストを返すAPIさえ作れれば任意のPluginがすぐに実装可能です。
今回のChatGPT Retrieval Pluginを実行すると、PythonのWebフレームワークであるFastAPIでAPIが立ち上がるようになっており、マニフェストファイルだけ修正すれば、OpenAPIドキュメント含めすぐに利用できる状態になっています。
Pluginの作り方については下記のドキュメントをご覧ください。
既にサポートされているベクトル検索エンジンを使ってChatGPT Retrieval Pluginを試す
今回のブログの本題ではないのでサクッと紹介します。
初めてChatGPT Retrieval Pluginを触る際にはLlamaIndexを使ってローカルで試してみるのが簡単です。 LlamaIndexはOpenAIのLLMに独自のデータを読み込ませる仕組みでローカルでも動かせます。
実行方法の紹介はnpakaさんのブログの方が詳しいのでこちらもご覧ください。
今回はRust製ベクトル検索エンジンであるQdrantを利用します。
Qdrantはコンテナイメージが提供されているのでそちらを使います。
$ docker pull qdrant/qdrant $ docker run -p 6333:6333 qdrant/qdrant
そして、Plugin環境変数をセットします。Qdrantの接続先はローカルホストに向いたデフォルトの値が用意されているので、今回は設定しなくてOKです。
DATASTORE=qdrant BEARER_TOKEN=XXXXXXXXX OPENAI_API_KEY=XXXXXXXXX
BEARER_TOKEN
はhttps://jwt.io/で所得でき、OPENAI_API_KEY
下記から作成できます。
今回使うPluginをローカルに持ってきて環境変数を読み込んでAPIサーバーを起動します。
$ git clone https://github.com/openai/chatgpt-retrieval-plugin.git $ cd chatgpt-retrieval-plugin $ poetry install $ poetry run start
これでPluginとして呼び出すAPIが立ち上がりました。OpenAPIの仕様をhttp://0.0.0.0:8000/docsで確認できます。
このAPIを呼び出すことで、ChatGPTがクエリに関するスニペットを取得して利用します。
このAPI経由でベクトル検索エンジンにインデックスするドキュメントを用意します。data/sample.txt
にドキュメントの内容を記載します。私の動作確認では最近作ったAI・機械学習チームのメンバー紹介スライドの文章を拝借します。皆さんは各々試してみたいドキュメントを用意してください。
今回のローカルにおける動作確認はLlamaIndex
を使います。
import os import openai import numpy as np from dotenv import load_dotenv from llama_index import SimpleDirectoryReader from llama_index.indices.vector_store import ChatGPTRetrievalPluginIndex documents = SimpleDirectoryReader("data").load_data() openai.api_key = os.getenv('OPENAI_API_KEY') index = ChatGPTRetrievalPluginIndex( documents, endpoint_url="http://localhost:8000", bearer_token=os.getenv("BEARER_TOKEN"), ) response = index.query("エムスリーで働く中村弘武とはどういう人物ですか?") print(response)
結果
中村弘武は32歳の東京都在住のエムスリーのAI・機械学習チームで働くメンバーです。エムスリーの検索基盤を主に担当しています。また、書籍推薦システム開発なども行っています。趣味は麻雀、サウナ、筋トレです。
ドキュメントから取得したスニペットを使って結果が返ってきました。
任意のベクトル検索エンジンが使えるようにProviderを実装する
本題です。今回はChatGPT Retrieval Pluginの内部を覗いて、サポートされていないベクトル検索エンジンであるOpenSearchのProvider実装を試みます。
OpenSearchはベクトル検索をサポートしており、NMSLIBやFaiss、Lucene各種ベクトル検索ライブラリを選ぶことができます。
下記のステップで実装していきます。
- 実装するべきものをコードから探る
- DataStoreの用意
- _upsert実装
- _query実装
- delete実装
- Provider動作確認
実装するべきものをコードから探る
Provider実装の方法を確認するために、現在サポートされているベクトル検索エンジンの実装を見ていきます。Providerの実装はdatastore/providers
で見ることができます。例えばRust製のベクトル検索エンジンのQdrantのProviderはdatastore/providers/qdrant_datastore.py
にあります。
class QdrantDataStore(DataStore): # ...
ここで抽象クラスのDataStore
を継承しているのを確認できます。DataStore
の実装はdatastore/datastore.py
にあります。
class DataStore(ABC): # ...
このクラスを見ると_upsert
、_query
、delete
メソッドを実装すればいいことがわかります。どのDataStoreを使うかはdatastore/factory.py
で決定しています。
async def get_datastore() -> DataStore: datastore = os.environ.get("DATASTORE") assert datastore is not None match datastore: case "pinecone": from datastore.providers.pinecone_datastore import PineconeDataStore return PineconeDataStore() # (省略)... case "qdrant": from datastore.providers.qdrant_datastore import QdrantDataStore return QdrantDataStore() case _: raise ValueError(f"Unsupported vector database: {datastore}")
つまり私たちが任意のベクトル検索エンジンを使いたい時はDataStore
の具象を実装し、get_datastore
関数の条件に加えてあげるだけで対応が完了します。これならすぐにできそうです。
DataStoreの用意
早速、datastore/providers/opensearch.py
を追加します。実装はぱっと見る限りOpenSearchに必要な処理に一番近そうだったdatastore/providers・pinecone_datastore.py
を参考に実装していきます。最初に必要なmoduleをimportしておきます。
import os import json from typing import Dict, List, Optional from opensearchpy import OpenSearch from opensearchpy.helpers import bulk from tenacity import retry, wait_random_exponential, stop_after_attempt import asyncio from datastore.datastore import DataStore from models.models import ( DocumentChunk, DocumentChunkMetadata, DocumentChunkWithScore, DocumentMetadataFilter, QueryResult, QueryWithEmbedding, Source, )
続いてOpenSearchのクライアントの初期化、次元数などを設定しておきます。
OPENSEARCH_INDEX = os.environ.get("OPENSEARCH_INDEX") OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL") or "http://localhost:9200" OPENSEARCH_USER = os.environ.get("OPENSEARCH_USER") OPENSEARCH_PASSWORD = os.environ.get("OPENSEARCH_PASSWORD") assert OPENSEARCH_INDEX is not None assert OPENSEARCH_URL is not None es = OpenSearch(hosts=[OPENSEARCH_URL], basic_auth=f"{OPENSEARCH_USER}:{OPENSEARCH_PASSWORD}") UPSERT_BATCH_SIZE = 100 OUTPUT_DIM = 1536
続いてProviderに渡されるデータを格納するためのmappingを用意します。OpenSearchでベクトル検索する時にはindex
の設定とフィールドのtypeをknn_vector
に設定する必要があります。
mapping = { "settings": { "index.knn": True }, "mappings": { "properties": { "chunk_id": {"type": "keyword"}, "document_id": {"type": "keyword"}, "text": {"type": "text"}, "text_vector": { "type": "knn_vector", "dimension": OUTPUT_DIM, }, "source": {"type": "keyword"}, "source_id": {"type": "keyword"}, "url": {"type": "text"}, "created_at": {"type": "date"}, "author": {"type": "text"}, } } }
より詳細な設定は下記のドキュメントをご覧ください。今回はデフォルトのNMSLIBを使っています。
そして、DataStore
クラスを継承したOpenSearchDataStore
を実装します。ChatGPT Retrieval Pluginの他のProviderでは初期化時にインデックスの作成をしているので、その方法に合わせてOpenSearchDataStore
の初期化時にもインデックスを作成、もしくは存在チェックを行います。
class OpenSearchDataStore(DataStore): def __init__(self): # Check if the index name is specified and exists in Pinecone index_exists = es.indices.exists(index=OPENSEARCH_INDEX) if OPENSEARCH_INDEX and not index_exists: # Get all fields in the metadata object in a list fields_to_index = list(DocumentChunkMetadata.__fields__.keys()) # Create a new index with the specified name, dimension, and metadata configuration try: print( f"Creating index {OPENSEARCH_INDEX} with metadata config {fields_to_index}" ) es.indices.create(index=OPENSEARCH_INDEX, body=mapping) print(f"Index {OPENSEARCH_INDEX} created successfully") except Exception as e: print(f"Error creating index {OPENSEARCH_INDEX}: {e}") raise e elif OPENSEARCH_INDEX and index_exists: # Connect to an existing index with the specified name try: print(f"Connected to index {OPENSEARCH_INDEX} successfully") except Exception as e: print(f"Error connecting to index {OPENSEARCH_INDEX}: {e}") raise e
_upsert実装
続いて、_upsert
です。_upsert
メソッドを呼ぶ前に、ドキュメントをchunkに分けてembeddingを生成しているので、それをOpenSearchにbulk insertします。bulk insertの細かい設定はドキュメントをご覧ください。
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3)) async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]: """ Takes in a dict from document id to list of document chunks and inserts them into the index. Return a list of document ids. """ # Initialize a list of ids to return doc_ids: List[str] = [] # Initialize a list of vectors to upsert index_actions = [] # Loop through the dict items for doc_id, chunk_list in chunks.items(): # Append the id to the ids list doc_ids.append(doc_id) print(f"Upserting document_id: {doc_id}") for chunk in chunk_list: print(f"chunk: {chunk.id}") print(chunk.text) index_action = { "_id": f"{doc_id}-{chunk.id}", "_op_type": "update", "doc_as_upsert": True, "doc": { "chunk_id": chunk.id, "text": chunk.text, "text_vector": chunk.embedding, "source": chunk.metadata.source, "source_id": chunk.metadata.source_id, "url": chunk.metadata.url, "created_at": chunk.metadata.created_at, "author": chunk.metadata.author }, } index_actions.append(index_action) try: bulk(es, index_actions, index=OPENSEARCH_INDEX, raise_on_error=False) except Exception as e: print(f"Error upserting batch: {e}") raise e return doc_ids
_query実装
そして_query
メソッドを実装します。ベクトル検索の結果をQueryResultにつめて返却します。
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3)) async def _query( self, queries: List[QueryWithEmbedding], ) -> List[QueryResult]: """ Takes in a list of queries with embeddings and filters and returns a list of query results with matching document chunks and scores. """ # Define a helper coroutine that performs a single query and returns a QueryResult async def _single_query(query: QueryWithEmbedding) -> QueryResult: print(f"Query: {query.query}") q = { "query": { "knn": { "text_vector": { "vector": query.embedding, "k": query.top_k } } } } try: # Query the index with the query embedding, filter, and top_k query_response = es.search(index=OPENSEARCH_INDEX, body=json.dumps(q)) except Exception as e: print(f"Error querying index: {e}") raise e query_results: List[DocumentChunkWithScore] = [] for result in query_response["hits"]["hits"]: score = result["_score"] metadata = result["_source"] # Remove document id and text from metadata and store it in a new variable metadata_without_text = ( {key: value for key, value in metadata.items() if key != "text"} if metadata else None ) # If the source is not a valid Source in the Source enum, set it to None if ( metadata_without_text and "source" in metadata_without_text and metadata_without_text["source"] not in Source.__members__ ): metadata_without_text["source"] = None # Create a document chunk with score object with the result data result = DocumentChunkWithScore( id=result["_id"], score=score, text=metadata["text"] if metadata and "text" in metadata else None, metadata=metadata_without_text, ) query_results.append(result) return QueryResult(query=query.query, results=query_results) # Use asyncio.gather to run multiple _single_query coroutines concurrently and collect their results results: List[QueryResult] = await asyncio.gather( *[_single_query(query) for query in queries] ) return results
delete実装
そして最後にdelete
を実装します。
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(3)) async def delete( self, ids: Optional[List[str]] = None, filter: Optional[DocumentMetadataFilter] = None, delete_all: Optional[bool] = None, ) -> bool: """ Removes vectors by ids, filter, or everything from the index. """ # Delete all vectors from the index if delete_all is True if delete_all == True: try: print(f"Deleting all vectors from index") es.delete_by_query(index=OPENSEARCH_INDEX, body={"query": {"match_all": {}}}) print(f"Deleted all vectors successfully") return True except Exception as e: print(f"Error deleting all vectors: {e}") raise e # Delete vectors that match the document ids from the index if the ids list is not empty if ids != None and len(ids) > 0: try: print(f"Deleting vectors with ids {ids}") for document_id in ids: es.delete(index=OPENSEARCH_INDEX, id=document_id) # type: ignore print(f"Deleted vectors with ids successfully") except Exception as e: print(f"Error deleting vectors with ids: {e}") raise e return True
_delete
は下記のコードのようにDataStore
クラスのupsert
で呼ばれるのでupsert時にも必要な処理です。ここはupsertを直接的にサポートしているベクトル検索エンジンにとっては不要な処理なので、今後改善余地がある箇所だと思います(PRチャンスか?)。
class DataStore(ABC): async def upsert( self, documents: List[Document], chunk_token_size: Optional[int] = None ) -> List[str]: """ Takes in a list of documents and inserts them into the database. First deletes all the existing vectors with the document id (if necessary, depends on the vector db), then inserts the new ones. Return a list of document ids. """ # Delete any existing vectors for documents with the input document ids await asyncio.gather( *[ self.delete( filter=DocumentMetadataFilter( document_id=document.id, ), delete_all=False, ) for document in documents if document.id ] ) chunks = get_document_chunks(documents, chunk_token_size) return await self._upsert(chunks)
Provider動作確認
これでOpenSearch Providerの実装が完了しました。最後に環境変数DATASTORE
にopensearch
が指定されたときにこのProviderを使うようにdatastore/factory.py
に条件を追加します。
def get_datastore() -> DataStore: datastore = os.environ.get("DATASTORE") assert datastore is not None match datastore: # ... case "opensearch": from datastore.providers.opensearch_datastore import OpenSearchDataStore return OpenSearchDataStore() case _: raise ValueError(f"Unsupported vector database: {datastore}")
全ての準備が整いました。最初に動作確認用に作ったスクリプトを実行すれば同じような結果が取得できるはずです。これでOpenSearch Providerの実装ができました。
Elasticsearch Providerは作れるか
実は最初は自分が1番慣れ親しんだElasticsearchで試してみようと思ったのですが、現在ElasticsearchはChatGPT Retrieval Pluginでは利用できないことが分かりました。text-embedding-ada-002が出力する次元数が1536
で、Elasticsearchが内部で利用しているLuceneがサポートする次元数の最大値が1024
なので、ElasticsearchをProviderとして利用できません。
次元数の制限を増やす議論がされているので、将来的にはこの制限がなくなるかもしれません。
日本語でこの辺の動向をまとめてくれている記事もあります。
まとめ
今回はChatGPT Retrieval Pluginがサポートしているベクトル検索エンジンではなく、AWSのOpenSearch Providerを実装してChatGPTにベクトル検索を提供する方法を試してみました。
今回の実装を通して、ChatGPT Retrieval PluginのProviderとして利用できる条件としては下記あります。
- 1536次元のベクトルを扱える
- upsert/search/deleteが行える
上記を満たすベクトル検索エンジンならDataStore抽象クラスを実装するだけで対応できます。ValdやFaissなんかのProviderも実装できますね。ベクトル検索エンジン実装者や、自分のお気に入りのベクトル検索エンジンがある方は是非試してみてください。
今回の実装はもう少し綺麗にして隙があればPRを投げる予定です(OpenSearchを使ったことがないので、これで良いか確認する必要あり)。
弊社でもOpenAIを活用していく流れが来ているので楽しみです。
We're hiring!
弊社では情報検索や機械学習の力で医療を前進させるメンバーを募集中です。少しでも興味があれば1on1しましょう!