
あなたが作ったRAG環境、思ったよりも検索結果が的外れで「これじゃ使えない」と感じたことはありませんか?
pgvectorにデータを入れても、肝心の会話で意図通りに参照されず、まるで無関係な情報を答えてしまう。あるいは、せっかく検索された情報がプロンプトにうまく組み込まれず、AIが曖昧にごまかす返答を返す…。
これは多くの人が最初に直面する壁です。では、なぜ精度が出ないのか?原因は「Embeddingの扱い」「検索の閾値」「プロンプト設計」の3つにあります。
ベクトル検索は強力ですが、正しくチューニングしなければただのノイズになりますし、プロンプトが曖昧だとAIの応答も揺らぎます。
本記事では、検索精度を引き上げる工夫と、安定した回答を引き出すためのプロンプト設計を具体的に解説していきます。
個人ローカルAI環境
🔴 個人ローカルAI環境
📌 LMStudioとpgvectorで実現するパーソナルRAG環境
└─Mac miniをAIサーバーに!LMStudioとpgvectorで作る個人ローカルAI環境
├─PostgreSQL16+pgvector導入手順|個人環境にベクトルDBを構築する方法
├─Embeddingでテキストをベクトル化|pgvectorに保存して検索可能にする手順
├─Discord Botで作るRAG環境|pgvectorとLMStudioを活用した会話システム実装
├─LMStudioをAPIサーバーとして利用|Embedding・GPTモデルを呼び出す仕組みを構築
├─RAG環境の検索精度を高める!プロンプト設計と改善テクニック
├─探しても友達は増えない。ならAIで作っちまえ!Mac miniで個人ローカルAI環境を構築
└─個人ローカルAI環境を持ち歩く|iPad miniからDiscord経由で相談する新しいワークスタイル
RAG環境で直面する課題
RAG環境を構築すると、基本的な動作はすぐに実現できます。しかし実際の運用段階になると「検索精度が思ったように出ない」「プロンプトの回答が安定しない」といった課題に直面することが多いです。
これらは技術的な要因が複雑に絡み合っており、コード設計やパラメータ調整の工夫が求められます。
ここでは代表的な問題点を整理します。
検索精度が出ない典型的な問題
RAG環境では、ベクトル検索の精度が低いと回答全体の品質が大きく損なわれます。
例えば、チャンクを大きくしすぎると文脈がぼやけ、逆に小さくしすぎると関連情報が失われます。また、Embeddingモデルの次元数や学習データの性質によっても結果が変わり、特定の質問に対して全く関係のない文章がヒットしてしまうケースもあります。
さらに、類似度スコアのしきい値を適切に設定しないと、ノイズだらけの結果を返してしまうこともあります。これらは「RAGは動くが使い物にならない」と感じさせる典型的な要因です。
プロンプトが不安定になる原因
検索精度の問題に加えて、プロンプト設計が不十分だと応答が安定しません。
会話履歴を適切に組み込めていない場合、AIが直前の文脈を無視してしまい、矛盾した返答をすることがあります。また、役割を明示していないとAIが余計な解説を加えたり、ユーザーの意図を誤解する可能性が高まります。
さらに、長文応答を制御する仕組みがなければ、出力が途中で途切れたり、Discordの文字数制限を超えてエラーになることもあります。これらの要因は、コードレベルでの設計不足やプロンプトの構成不備によって引き起こされるものであり、検索精度と並んで改善の優先度が高い課題です。
bot.py の全体ソース
bot.py は、ユーザーからの入力を受け取り、Embedding生成・ベクトル検索・プロンプト構築・応答出力までを担う中核スクリプトです。RAG環境の検索精度やプロンプト設計は、このファイルの処理に大きく依存しています。ここではまずソースを掲載し、その後に重要要素を抜粋して解説します。
実行環境のディレクトリ構成
RAG環境は /opt/gpt-oss/ 配下に配置されています。
以下のようなディレクトリツリー構成で管理することで、役割ごとに整理された形で実行できます。
/opt/gpt-oss/
├─ bin/
│ └─ llama-server
├─ bot/
│ ├─ bot.py
│ ├─ .env
│ └─ modules/
│ └─ db_utils.py
└─ models/
├─ gpt-oss-20b-MXFP4.gguf <- 未使用(LMSTUDIOのGUI版で起動)
└─ mxbai-embed-large-v1/
└─ gguf/
└─ mxbai-embed-large-v1-f16.gguf
また、RAG環境を安定して動作させるには、モデルの実行、Botの制御、データベース管理がそれぞれ明確に分かれた構成が必要です。
全体像としては以下のように整理できます。
ディレクトリ / ファイル(/opt/gpt-oss) | 役割 |
---|---|
bin/llama-server | LLMサーバー本体。モデルを読み込みAPIとして提供します。 |
bot/bot.py | Discord Bot スクリプト。ユーザー入力を処理し、Embedding生成やDB検索を実行します。 |
bot/.env | 環境変数設定ファイル。DiscordトークンやDB接続情報を管理します。 |
bot/modules/db_utils.py | データベース処理モジュール。pgvectorを利用した会話検索や履歴保存を行います。 |
models/gpt-oss-20b-MXFP4.gguf | 大規模言語モデル(GPT-OSS-20B)のファイル。推論用に使用します。 |
models/mxbai-embed-large-v1/ gguf/mxbai-embed-large-v1-f16.gguf | Embeddingモデルファイル。テキストをベクトル化して検索可能にします。 |
ソース全文掲載
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
import requests import json from dotenv import load_dotenv import discord from modules import db_utils import os import re # .env から環境変数を読み込み load_dotenv() TOKEN = os.getenv("DISCORD_BOT_TOKEN") # DiscordのIntent設定(メッセージ本文を受け取るために必要) intents = discord.Intents.default() intents.message_content = True bot = discord.Client(intents=intents) # ------------------------------------------------------------------ # 関数名 :get_embedding # 概要 :8081 (埋め込みモデル) にテキストを投げてベクトルを取得 # ------------------------------------------------------------------ def get_embedding(text: str) -> list[float]: url = "http://localhost:8081/v1/embeddings" headers = {"Content-Type": "application/json"} payload = {"input": text} try: r = requests.post(url, headers=headers, data=json.dumps(payload)) r.raise_for_status() data = r.json() return data["data"][0]["embedding"] # 1024次元ベクトル except Exception as e: print(f"[ERROR] 埋め込み生成失敗: {e}") return [0.0] * 1024 # フォールバック(ゼロベクトル) # ------------------------------------------------------------------ # 関数名 :enforce_japanese # 概要 :日本語以外の不要な英語説明を削除(コマンドやコードは残す) # ------------------------------------------------------------------ def enforce_japanese(text: str) -> str: blocks = text.split("\n\n") # 段落ごとに分ける filtered = [] for block in blocks: # 日本語を含むなら残す if re.search(r'[ぁ-んァ-ン一-龥]', block): filtered.append(block) continue # コードブロック or コマンド行なら残す if block.strip().startswith("```") or "pg_dump" in block or "pg_basebackup" in block: filtered.append(block) continue # それ以外の英語段落は削除 continue return "\n\n".join(filtered) # ------------------------------------------------------------------ # 関数名 :clean_response # 概要 :モデルの応答から余計なトークンを削除して整形 # ------------------------------------------------------------------ def clean_response(text: str) -> str: if not text: return "[ERROR] 応答なし" if "<|channel|>analysis<|message|>" in text: text = text.split("<|channel|>analysis<|message|>")[-1] if "<|channel|>final<|message|>" in text: text = text.split("<|channel|>final<|message|>")[-1] return ( text.replace("<s>", "") .replace("</s>", "") .replace("[/s]", "") .replace("[BOT]", "") .replace("[INST]", "") .replace("[/INST]", "") .replace("[Assistant]", "") .replace("[/Assistant]", "") .replace("[SYS]", "") .replace("[/SYS]", "") .strip() ) # ------------------------------------------------------------------ # 関数名 :sanitize_markdown # 概要 :Discordで崩れるMarkdown記号を置換(コードブロックは残す) # ------------------------------------------------------------------ def sanitize_markdown(text: str) -> str: if not text: return text # コメントの # だけ無害化 text = text.replace("##", "--") text = text.replace("#", "#") return text # ------------------------------------------------------------------ # 関数名 :call_gptoss # 概要 :GPT-OSS (llama-server) の /v1/completions を利用 # ------------------------------------------------------------------ def call_gptoss(prompt: str) -> str: url = "http://127.0.0.1:1234/v1/completions" headers = {"Content-Type": "application/json"} payload = { "model": "openai/gpt-oss-20b", "prompt": prompt, "temperature": 0.3, "max_tokens": 1024, "stream": False # ★非ストリーミング } try: r = requests.post(url, headers=headers, data=json.dumps(payload)) r.raise_for_status() result = r.json() reply = result["choices"][0]["text"] return sanitize_markdown(clean_response(reply)) except Exception as e: return f"[ERROR] GPT-OSS呼び出し失敗: {e}" # ------------------------------------------------------------------ # 関数名 :split_message # 概要 :Discord送信の2000文字制限に対応して分割 # ------------------------------------------------------------------ def split_message(text: str, limit: int = 1900) -> list[str]: return [text[i:i+limit] for i in range(0, len(text), limit)] # ------------------------------------------------------------------ # Bot起動時イベント # ------------------------------------------------------------------ @bot.event async def on_ready(): print(f"Bot起動完了: {bot.user}") # ------------------------------------------------------------------ # 関数名 :chunk_text # 概要 :長文を安全に分割(オーバーラップ付き) # ------------------------------------------------------------------ def chunk_text(text: str, chunk_size: int = 300, overlap: int = 50) -> list[str]: """ テキストをチャンク分割する(RAG用) - chunk_size: 1チャンクあたりの文字数 - overlap : チャンク間の重複文字数 """ chunks = [] start = 0 while start < len(text): end = min(start + chunk_size, len(text)) chunks.append(text[start:end]) start += chunk_size - overlap return chunks # ------------------------------------------------------------------ # 関数名 :count_tokens # 概要 :テキストのトークン数を概算する # 説明 : # OpenAI系モデルをはじめとする多くのLLMでは、 # 平均すると1トークン ≒ 4文字程度で分割される。 # この関数では文字数を4で割った値を返すことで、 # 簡易的なトークン数見積もりを実現する。 # 厳密にトークン数を求める場合は tiktoken 等の # 専用ライブラリを利用する必要がある。 # # 引数 :text(str) - 評価対象のテキスト # 戻り値 :int - 概算したトークン数 # 使用箇所 :類似度検索後の履歴制御処理 # ------------------------------------------------------------------ def count_tokens(text: str) -> int: """ 簡易的なトークン数見積もり関数。 英語・日本語が混ざったテキストの場合、OpenAI系モデルでは 平均すると1トークン ≒ 4文字程度になることが多いため、 文字数を4で割った値をトークン数として概算する。 厳密なトークン数は tiktoken 等の専用ライブラリを利用すべきだが、 過剰入力を避ける用途であればこの概算で十分機能する。 """ # テキスト全体の文字数を取得し、4で割ることで概算トークン数を返す return len(text) // 4 # ------------------------------------------------------------------ # メッセージ受信イベント # ------------------------------------------------------------------ @bot.event async def on_message(message): if message.author.bot: return # メンション時のみ応答 if bot.user not in message.mentions: return user_input = message.content.replace(f"<@{bot.user.id}>", "").strip() if not user_input: return # ====== DB保存処理に必要なID類を先に取得 ====== user_id = db_utils.get_or_create_user(message.author) guild_id = db_utils.get_or_create_guild(message.guild) channel_id = db_utils.get_or_create_channel(message.channel, guild_id) session_id = db_utils.get_or_create_session(channel_id, guild_id) # ====== 「続きを」判定 ====== if any(word in user_input for word in ["続きを", "続き", "続けて", "もっと"]): last_reply = db_utils.get_last_assistant_reply(session_id) if not last_reply: await message.channel.send("直前の応答が見つかりません。") return prompt = f""" あなたは会話のアシスタントです。 以下はあなたが直前に出力した応答です。この続きとして、文脈を自然に繋げて続きを書いてください。 文章は重複せず、前の出力の流れを引き継いでください。 [直前の応答] {last_reply} [続きを生成] ### Assistant: """ reply = call_gptoss(prompt) if not reply or reply.strip() == "": reply = "[ERROR] GPT-OSSから応答を取得できませんでした。" assistant_conv_id = db_utils.insert_conversation( session_id, user_id, "assistant", reply, reply_to=None ) for idx, chunk in enumerate(chunk_text(reply, chunk_size=500, overlap=50)): emb = get_embedding(chunk) db_utils.insert_embedding(assistant_conv_id, idx, emb) for chunk in split_message(reply): await message.channel.send(chunk) return # ====== DB保存処理(ユーザー発言:全文1レコード+チャンクembedding)====== user_conv_id = db_utils.insert_conversation(session_id, user_id, "user", user_input, None) for idx, chunk in enumerate(chunk_text(user_input, chunk_size=500, overlap=50)): emb = get_embedding(chunk) db_utils.insert_embedding(user_conv_id, idx, emb) # ====== 類似履歴検索 ====== results = db_utils.search_conversations(user_input, session_id=session_id, limit=100) context_chunks = [] token_count = 0 max_tokens = 1000 min_score = 0.75 # 類似度のしきい値 for user_msg, assistant_msgs, score in results: if score < min_score: continue # 類似度が低いものはスキップ print(f"DEBUG: score={score:.3f}, user={user_msg[:30]}") pair_text = f"### User: {user_msg}\n" for a in assistant_msgs: pair_text += f"### Assistant: {a}\n" tokens = count_tokens(pair_text) if token_count + tokens > max_tokens: break # 上限を超えたら終了 context_chunks.append(pair_text) token_count += tokens context = "\n".join(context_chunks) # ====== GPT-OSS呼び出し ====== prompt = f"""あなたは会話履歴を必ず参照して一貫性のある応答を返すアシスタントです。 以下のルールを厳格に守ってください。 1. 会話履歴に関連情報がある場合は、それを最優先してください。 2. 会話履歴に関連情報が無い場合は、あなたの知識を使って推測してください。 3. 会話履歴と知識が矛盾した場合は、会話履歴を優先してください。 [会話履歴] {context} [新しい質問] ### User: {user_input} [応答] ### Assistant:""" print("===== PROMPT 実際の送信内容 =====") print(prompt) print("================================") reply = call_gptoss(prompt) if reply.startswith("[ERROR]"): await message.channel.send("内部エラーが発生しました。") return if not reply or reply.strip() == "": reply = "[ERROR] GPT-OSSから応答を取得できませんでした。" assistant_conv_id = db_utils.insert_conversation( session_id, user_id, "assistant", reply, reply_to=user_conv_id ) # ====== Discord返信 ====== for chunk in split_message(reply): await message.channel.send(chunk) # ★ これがないとプログラムは即終了してしまう bot.run(TOKEN) |
重要要素の解説
bot.py 内には検索精度やプロンプト安定性に直結する関数が複数存在します。以下に代表的な処理を抜粋しながら解説します。
chunk_text() によるチャンク分割
テキストをそのままEmbedding化すると長すぎて検索精度が低下するため、chunk_text() で適度な長さに分割します。分割幅とオーバーラップを調整することで、関連文脈を保持したまま効率的に検索できるようになります。
def chunk_text(text: str, chunk_size: int = 300, overlap: int = 50) -> list[str]:
chunks = []
start = 0
while start < len(text):
end = min(start + chunk_size, len(text))
chunks.append(text[start:end])
start += chunk_size - overlap
return chunks
このようにオーバーラップを持たせることで文脈の途切れを防ぎ、精度を安定させます。
prompt 生成部の構成要素
ユーザー入力と検索結果を組み合わせ、AIに渡すプロンプトを構築します。プロンプトの設計によって応答の一貫性が左右されるため、非常に重要な部分です。
prompt = f"""
あなたは会話履歴を必ず参照して一貫性のある応答を返すアシスタントです。
以下のルールを厳格に守ってください。
1. 会話履歴に関連情報がある場合は、それを最優先してください。
2. 会話履歴に関連情報が無い場合は、あなたの知識を使って推測してください。
3. 会話履歴と知識が矛盾した場合は、会話履歴を優先してください。
[会話履歴]
{context}
[新しい質問]
User: {user_input}
[応答]
Assistant:
"""
このように「役割付与」「履歴の挿入」「新しい質問」「回答開始トリガー」という4要素で構成され、安定した回答を得る仕組みになっています。
split_message() による長文分割送信
Discordには1メッセージ2000文字という制限があるため、応答を分割して送信する関数が実装されています。これにより長文でも欠落なくユーザーに届けられます。
def split_message(text: str, limit: int = 1900) -> list[str]:
return [text[i:i+limit] for i in range(0, len(text), limit)]
長文の途切れを防ぎ、運用上の安定性を確保します。
call_gptoss() によるモデル呼び出し
LMStudioで稼働するGPT-OSS-20Bを呼び出し、生成応答を取得する関数です。temperatureやmax_tokensを指定することで応答の多様性や長さを制御できます。
def call_gptoss(prompt: str) -> str:
url = "http://127.0.0.1:1234/v1/completions"
headers = {"Content-Type": "application/json"}
payload = {
"model": "openai/gpt-oss-20b",
"prompt": prompt,
"temperature": 0.3,
"max_tokens": 1024,
"stream": False
}
…(省略)
ここでのパラメータ設定が不適切だと、冗長な回答や途切れた応答が返ってくるため、実運用における調整が求められます。
db_utils.py の全体ソース
db_utils.py は、会話履歴やEmbeddingをデータベースに保存し、pgvectorを利用して類似度検索を行うための重要なモジュールです。検索精度を高めるための閾値設定や、会話ログを適切に保存する仕組みが実装されており、RAG環境全体の安定性を支える役割を果たしています。ここではソースを提示した上で、検索精度に関わる重要な関数を抜粋して解説します。
ソース全文掲載
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
import os import psycopg2 import requests from dotenv import load_dotenv from modules.logger import get_logger load_dotenv(dotenv_path="/opt/gpt-oss/bot/.env") # .env 読み込み load_dotenv() logger = get_logger(__name__) # ------------------------------------------------------------------ # DB接続 # ------------------------------------------------------------------ def get_connection(): print("DEBUG: PG_ADDR=", os.getenv("PG_ADDR")) print("DEBUG: PG_PORT=", os.getenv("PG_PORT")) print("DEBUG: PG_DB=", os.getenv("PG_DB")) print("DEBUG: PG_USER=", os.getenv("PG_USER")) return psycopg2.connect( hostaddr=os.getenv("PG_ADDR"), port=int(os.getenv("PG_PORT")), dbname=os.getenv("PG_DB"), user=os.getenv("PG_USER"), password=os.getenv("PG_PASS") ) # ------------------------------------------------------------------ # 埋め込み生成 # ------------------------------------------------------------------ def get_embedding(text: str): url = "http://127.0.0.1:8081/v1/embeddings" payload = {"model": "mxbai-embed-large-v1-f16", "input": text} try: res = requests.post(url, json=payload) res.raise_for_status() data = res.json() emb = data["data"][0]["embedding"] return emb except Exception as e: logger.error(f"埋め込み生成失敗: {e}") return None # ------------------------------------------------------------------ # users テーブル # ------------------------------------------------------------------ def get_or_create_user(author): conn = get_connection() cur = conn.cursor() discord_id = str(author.id) username = str(author) # discord_id で検索 cur.execute("SELECT user_id FROM users WHERE discord_id = %s", (discord_id,)) row = cur.fetchone() if row: user_id = row[0] else: cur.execute( "INSERT INTO users (discord_id, username) VALUES (%s, %s) RETURNING user_id", (discord_id, username), ) user_id = cur.fetchone()[0] conn.commit() cur.close() conn.close() return user_id # ------------------------------------------------------------------ # guild テーブル # ------------------------------------------------------------------ def get_or_create_guild(guild): # データベース接続を確立 conn = get_connection() try: with conn.cursor() as cur: # 既存の guild_id があるか検索 cur.execute("SELECT guild_id FROM guilds WHERE guild_id = %s", (str(guild.id),)) row = cur.fetchone() if row: # すでに存在する場合はその guild_id を返す return row[0] # 存在しない場合は新規に guild を登録 cur.execute( "INSERT INTO guilds (guild_id, name) VALUES (%s, %s) RETURNING guild_id", (str(guild.id), str(guild.name)) ) # 新規登録した guild_id を取得 new_id = cur.fetchone()[0] # コミットして確定 conn.commit() return new_id finally: # 接続をクローズ conn.close() # ------------------------------------------------------------------ # channels テーブル # ------------------------------------------------------------------ def get_or_create_channel(channel, guild_id): # データベース接続を確立 conn = get_connection() try: with conn.cursor() as cur: # 同じ guild_id 内で channel_id が登録済みか確認 cur.execute( "SELECT channel_id FROM channels WHERE guild_id = %s AND discord_id = %s", (guild_id, str(channel.id)) ) row = cur.fetchone() if row: # すでに存在すれば channel_id を返す return row[0] # 存在しなければ新規に登録 cur.execute( "INSERT INTO channels (guild_id, discord_id, name) VALUES (%s, %s, %s) RETURNING channel_id", (guild_id, str(channel.id), str(channel)) ) # 登録した channel_id を取得 channel_id = cur.fetchone()[0] # コミットして確定 conn.commit() return channel_id finally: # 接続をクローズ conn.close() # ------------------------------------------------------------------ # sessions テーブル # ------------------------------------------------------------------ def get_or_create_session(channel_id, guild_id): # データベース接続を確立 conn = get_connection() cur = conn.cursor() # 既存セッションを channel_id で検索 cur.execute("SELECT session_id FROM sessions WHERE channel_id = %s", (channel_id,)) row = cur.fetchone() if row: # 既存セッションがあればその ID を返す session_id = row[0] else: # セッションが無ければ guild_id とともに新規作成 # started_at は NOW() で現在時刻を登録 cur.execute( "INSERT INTO sessions (channel_id, guild_id, started_at) VALUES (%s, %s, NOW()) RETURNING session_id", (channel_id, guild_id) ) # 作成したセッションIDを取得 session_id = cur.fetchone()[0] # 変更をコミット conn.commit() logger.info(f"セッション登録: channel_id={channel_id}, guild_id={guild_id}, session_id={session_id}") # リソース解放 cur.close() conn.close() # セッションIDを返す return session_id # ------------------------------------------------------------------ # conversations テーブル # ------------------------------------------------------------------ def insert_conversation(session_id, user_id, role, message, reply_to=None): with get_connection() as conn: with conn.cursor() as cur: # assistant の場合は直前の user 発話を reply_to に設定 if role == "assistant" and reply_to is None: cur.execute( """ SELECT conversation_id FROM conversations WHERE session_id = %s AND role = 'user' ORDER BY created_at DESC LIMIT 1 """, (session_id,) ) row = cur.fetchone() if row: reply_to = row[0] # embedding は削除済みなので不要 cur.execute( """ INSERT INTO conversations (session_id, user_id, role, message, reply_to) VALUES (%s, %s, %s, %s, %s) RETURNING conversation_id """, (session_id, user_id, role, message, reply_to) ) return cur.fetchone()[0] # ------------------------------------------------------------------ # 類似会話検索(QAペア対応) # ------------------------------------------------------------------ def search_conversations(query: str, session_id: int, limit: int = 100, min_score: float = 0.75) -> list[tuple[str, list[str], float]]: """ 類似する user 発話を検索し、その応答(複数可)も一緒に返す 戻り値: [(user_message, [assistant_message1, ...], score), ...] """ query_emb = get_embedding(query) # 1024次元 with get_connection() as conn: with conn.cursor() as cur: # 類似度(1 - 距離)を計算してスコアとして返す cur.execute( """ SELECT c.conversation_id, c.message, 1 - (e.embedding <-> %s::vector) AS score FROM conversation_embeddings e JOIN conversations c ON c.conversation_id = e.conversation_id WHERE c.session_id = %s AND c.role = 'user' ORDER BY e.embedding <-> %s::vector LIMIT %s """, (query_emb, session_id, query_emb, limit) ) user_rows = cur.fetchall() results = [] for conv_id, user_msg, score in user_rows: # 閾値チェック if score < min_score: continue # この user に紐づく assistant 応答をすべて取得 cur.execute( """ SELECT message FROM conversations WHERE reply_to = %s AND role = 'assistant' ORDER BY created_at ASC """, (conv_id,) ) assistant_msgs = [row[0] for row in cur.fetchall()] results.append((user_msg, assistant_msgs, float(score))) # DEBUG 出力 print("=== DEBUG: search_conversations results ===") for u, a, s in results: print(f"SCORE={s:.3f} | USER={u[:30]}... | ASSIST={a[:1]}") return results # ------------------------------------------------------------------ # conversation_embeddings テーブル # ------------------------------------------------------------------ def insert_embedding(conversation_id: int, chunk_index: int, embedding: list[float]): with get_connection() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO conversation_embeddings (conversation_id, chunk_index, embedding) VALUES (%s, %s, %s) RETURNING embedding_id """, (conversation_id, chunk_index, embedding) ) return cur.fetchone()[0] # ------------------------------------------------------------------ # 関数名 :get_last_assistant_reply # 概要 :指定されたセッションの直近のAI応答を取得する # 引数 :session_id(int) - セッションID # 戻り値 :str - 直近のAI応答テキスト(存在しない場合は None) # 使用箇所 :on_message(続きを生成する際に利用) # ------------------------------------------------------------------ def get_last_assistant_reply(session_id: int) -> str | None: conn = get_connection() cur = conn.cursor() try: cur.execute(""" SELECT message FROM conversations WHERE session_id = %s AND role = 'assistant' ORDER BY created_at DESC LIMIT 1 """, (session_id,)) row = cur.fetchone() return row[0] if row else None finally: cur.close() conn.close() |
重要要素の解説
db_utils.py には、ベクトル検索や会話履歴の保存を行う複数の関数が含まれています。以下では検索精度やプロンプト設計に直結する主要な処理を解説します。
search_conversations() と類似度検索
ユーザー入力をEmbedding化し、保存済み会話との類似度を計算して上位の文脈を取得する関数です。ここで適切に文脈を返せるかどうかが、AI応答の品質を左右します。
def search_conversations(query: str, session_id: int, limit: int = 100, min_score: float = 0.75):
query_emb = get_embedding(query)
cur.execute("""
SELECT user_message, bot_messages, embedding <-> %s AS score
FROM conversations
WHERE session_id = %s
ORDER BY embedding <-> %s
LIMIT %s
""", (query_emb, session_id, query_emb, limit))
rows = cur.fetchall()
この処理により類似度スコアを基準とした検索が可能となり、RAGの文脈参照が実現します。
min_score による閾値設定
検索結果の中から、関連性の低いものを除外するために閾値を設けています。これによりノイズを含まない精度の高い情報だけを残すことができます。
results = [] for row in rows:
if row[2] < min_score:
continue
results.append(row)
min_score の値を調整することで、情報の網羅性と精度のバランスを最適化できます。
insert_conversation() と会話の保存
ユーザーの発話とAIの応答をデータベースに保存する関数です。これにより次回以降の検索時に文脈が利用でき、連続性のある対話を実現します。
def insert_conversation(session_id: int, user_id: str, role: str, message: str, emb: list[float]):
cur.execute("""
INSERT INTO conversations (session_id, user_id, role, message, embedding)
VALUES (%s, %s, %s, %s, %s)
""", (session_id, user_id, role, message, emb))
conn.commit()
履歴が正しく保存されることで、過去の発話を踏まえた回答が可能になります。
insert_embedding() によるベクトル登録
EmbeddingをベクトルDBに保存する処理です。これにより、後続の検索処理で高精度の類似度検索が可能になります。
def insert_embedding(text: str, emb: list[float]):
cur.execute("""
INSERT INTO embeddings (text, embedding) VALUES (%s, %s)
""", (text, emb))
conn.commit()
ベクトルが正しく登録されなければ検索の土台が崩れるため、RAG環境において不可欠な処理です。
改善の実践例
RAG環境を安定して運用するためには、ソースコードをそのまま利用するだけでは不十分です。検索精度やプロンプト設計をさらに改善するための工夫を取り入れることで、応答の信頼性を大きく向上させることができます。ここでは具体的な改善例を紹介します。
チャンクサイズとオーバーラップの調整
チャンクのサイズ設定が適切でないと、検索結果が不正確になります。大きすぎるチャンクは文脈がぼやけ、小さすぎるチャンクは情報が分断されます。オーバーラップを持たせることで文脈を維持しながら検索できるようになります。
def chunk_text(text: str, chunk_size: int = 300, overlap: int = 50) -> list[str]:
chunks = []
start = 0
while start < len(text):
end = min(start + chunk_size, len(text))
chunks.append(text[start:end])
start += chunk_size - overlap
return chunks
チャンクサイズやオーバーラップを調整することで、文脈の途切れを防ぎながら検索精度を向上させることができます。
類似度スコアのしきい値を調整する工夫
検索で取得するデータの精度は、類似度スコアのしきい値によって大きく変わります。スコアを低く設定するとノイズが増え、高く設定すると関連情報が不足します。運用環境に合わせて適切に設定することが重要です。
results = [] for row in rows:
if row[2] < 0.75:
continue
results.append(row)
このように閾値を設けることで、関連性の低い結果を除外し、安定した検索結果が得られます。
類似度検索のチューニング
RAGの運用において、過去に登録した情報が検索でうまく拾えないケースは珍しくありません。たとえば「Mac mini (M4)」と登録してあっても、「スペックを教えて」と質問すると関連度が低いと判定され、結果として検索から漏れてしまうことがあります。これはpgvectorが単語や表現の近さをベースに類似度を算出する仕組みだからです。
この問題を改善する方法はいくつかあります。まず有効なのは、検索時に利用するLIMITとmin_scoreの調整です。たとえば上位10件だけを返すのではなく、上位100件を対象にした上でスコア閾値を下げれば、多少関連度が低くても情報を拾える確率が高まります。
# ====== 類似履歴検索 ======
results = db_utils.search_conversations(user_input, session_id=session_id, limit=100)
context_chunks = []
token_count = 0
max_tokens = 1000
min_score = 0.75 # 類似度のしきい値
さらに、保存の仕方を工夫することも効果的です。単純なテキスト保存ではなく「質問と回答のペア」として登録すれば、「スペックを教えて」という入力と「マシンスペック」の回答が自然に結びつきます。また、表やリストを扱う場合はJSON形式などの構造化データで保存しておくと、検索の精度が大幅に改善されます。
補足として、LIMITを大きく設定すれば取りこぼしは減りますが、そのまま大量の候補をLLMに渡してしまうと処理が遅くなり、応答の品質も下がる可能性があります。そこで実運用では、まずLIMITを広めに設定して候補を取得し、その中から類似度スコアの閾値を基準に再度絞り込む方法が有効です。最終的にLLMに渡す件数を3〜10件程度に抑えることで、拾える情報量と処理速度のバランスを両立できます。
for user_msg, assistant_msgs, score in results: if score < min_score: continue # 類似度が低いものはスキップ print(f"DEBUG: score={score:.3f}, user={user_msg[:30]}") pair_text = f"### User: {user_msg}\n" for a in assistant_msgs: pair_text += f"### Assistant: {a}\n" tokens = count_tokens(pair_text) if token_count + tokens > max_tokens: break # 上限を超えたら終了 context_chunks.append(pair_text) token_count += tokens
検索結果取得の流れ
- LIMIT=100 → 広く拾う
- if score < min_score: continue → 類似度が低いものを除外
- 必要なら token_count で上限を制御
こうしたチューニングを組み合わせることで、「似ていない言い回しでも情報を拾える」「必要な情報を確実に返す」仕組みに近づけることが可能です。RAGを安定運用するには、類似度検索そのものの設計を意識的に改善していくことが不可欠です。
プロンプト設計の改良ポイント
プロンプトの構成を工夫することで応答の一貫性が高まります。役割付与や履歴の挿入、応答の開始トリガーを設けることで、無駄な回答や矛盾を減らせます。
prompt = f"""
あなたは会話履歴を必ず参照して一貫性のある応答を返すアシスタントです。
以下のルールを守ってください。
1. 履歴があればそれを最優先してください。
2. 履歴が無ければ知識を使って推測してください。
3. 履歴と知識が矛盾した場合は履歴を優先してください。 \
[会話履歴] {context} \
[新しい質問]
### User: {user\_input} \
[応答]
### Assistant:
"""
このようにルールを明確に与えることで、AIの回答が安定し、利用者にとってわかりやすい応答になります。
FAQテンプレートによる不足補完
検索結果が不足する場合に備え、FAQや定型文をプロンプトに追加することで安定性を高めることができます。あらかじめよくある質問を組み込んでおくことで、回答不能を減らすことができます。
faq = """
Q: インデックスを作成するには?
A: CREATE INDEX ... を利用してください。
Q: セッションが切断されるのはなぜ?
A: タイムアウト設定が原因の可能性があります。
"""
FAQを組み込むことで、検索が外れた場合でもユーザーに有益な回答を返すことが可能となり、システムの信頼性が向上します。
まとめ
ここまで、RAG環境における検索精度向上とプロンプト設計の工夫について、具体的なコード例と改善方法を交えて解説しました。検索の安定性とプロンプトの一貫性は、RAGを実用レベルに引き上げるための両輪です。
最後に本記事の要点を整理し、今後の展開について触れます。
検索精度とプロンプト設計の両輪でRAGは活きる
検索精度が低ければ、AIは不正確な文脈を取り込み誤った応答を返します。一方で、プロンプト設計が甘ければ、正しい検索結果を与えても安定した回答にはつながりません。
チャンク分割の工夫やスコア閾値設定といった検索側の改善と、役割付与や履歴の挿入、応答開始トリガーといったプロンプト側の工夫は、どちらも欠かすことができません。両者を組み合わせて初めて、RAG環境は実用的で信頼性のあるシステムとして機能します。
今後の改善ポイントと応用可能性
今後の改善ポイントとしては、より高性能なEmbeddingモデルの導入や、検索スコアを動的に調整する仕組みの実装が考えられます。また、ユーザーの利用傾向に基づいたFAQテンプレートの拡充や、プロンプト設計のさらなる最適化も効果的です。
応用の可能性としては、ドキュメント管理やFAQシステム、カスタマーサポート向けの自動応答など、業務効率化や知識共有の分野に幅広く展開できます。継続的に改善を重ねることで、個人環境においても企業並みの高品質なRAGシステムを構築することが可能になります。