回忆 RAG 关键步骤:
- 文本切割
- 嵌入处理
- 存储向量数据库
嵌入处理,又称之为向量化操作。核心就是将文本转为向量的形式,从而为下一步做数学运算做准备。
"今天的天气真好,万里无云"[ 0.3297254741191864, 0.7386181354522705, -3.342341899871826, -0.7811917066574097, -0.08536303788423538, 0.05086381733417511, ... 668 more items]该操作一般需要依赖专门做嵌入处理的模型。例如:
import { OpenAIEmbeddings } from '@langchain/openai'
const embeddings = new OpenAIEmbeddings({ apiKey: process.env.OPENAI_API_KEY, model: 'text-embedding-3-large', // OpenAI 官方提供的专门用于做嵌入的模型})
const vectors = await embeddings.embedDocuments( documents.map((doc) => doc.pageContent))在上面的代码中,使用的是 OpenAI 官方提供的专用嵌入模型。而 OpenAIEmbeddings 则是 Embeddings 的子类,关于 Embeddings 这个工具类,后面再来介绍。
快速上手#
课堂练习:使用 nomic-embed-text 模型做嵌入操作
import { TextLoader } from 'langchain/document_loaders/fs/text'import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter'
const loader = new TextLoader('data/kong.txt')
const docs = await loader.load()
const splitter = new RecursiveCharacterTextSplitter({ chunkSize: 64, chunkOverlap: 0,})
const splittedDocs = await splitter.splitDocuments(docs)
// console.log(splittedDocs)
async function getEmbedding(text) { const res = await fetch('http://localhost:11434/api/embeddings', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model: 'nomic-embed-text', prompt: text, }), }) const result = await res.json() return result.embedding}
const results = []for (const doc of splittedDocs) { const embedding = await getEmbedding(doc.pageContent) results.push({ ...doc, embedding })}
console.log(results)不过目前这个例子,在进行嵌入操作时花费的时间是比较久的,这里我们可以做一个计时来测量一下花费的时间。
究其原因,是因为现在在做嵌入操作时,采用的是串行的形式。
所以,接下来一个工作重点,支持并发的嵌入操作。
自定义嵌入类#
要实现并发的嵌入操作,我们可以自己来自定义一个嵌入类。
不过在此之前,需要先了解 Embeddings 工具类。
Embeddings 是 LangChain 中抽象出来的嵌入操作基类,不同厂商的向量模型,都通过继承该基类实现暴露统一方法,从而能在向量库、检索器等组件里互换使用。
基类提供两组最核心的方法:
embedDocuments(texts: string[]) => Promise<number[][]>:批量嵌入用于索引的文本(返回二维向量数组)。embedQuery(text: string) => Promise<number[]>:为查询文本生成向量(返回一维向量)。
文档地址:https://js.langchain.com/docs/concepts/embedding_models/?utm_source=chatgpt.com
使用并发工具方法:runWithConcurrency
export async function runWithConcurrency(items, worker, maxConcurrency) { if (!items?.length) return
let i = 0
const workers = []
async function spawn() { while (i < items.length) { const idx = i++ await worker(items[idx], idx) } }
const n = Math.max(1, Math.min(maxConcurrency, items.length))
for (let k = 0; k < n; k++) workers.push(spawn())
await Promise.allSettled(workers)}课堂练习:自定义嵌入类
import { Embeddings } from '@langchain/core/embeddings'import { runWithConcurrency } from './concurrency.js'
export class NomicEmbeddings extends Embeddings { constructor(concurrency = 3) { super() this.model = 'nomic-embed-text' this.apiUrl = 'http://localhost:11434/api/embeddings' this.concurrency = concurrency }
/** * 对单个文本做嵌入操作,这是一个内部方法 * @param {*} text 单个文本 */ async #fetchEmbedding(text) { const res = await fetch(this.apiUrl, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model: this.model, prompt: text, }), }) const result = await res.json() return result.embedding }
/** * 对单个文本做嵌入操作 * @param {*} text */ async embedQuery(text) { return await this.#fetchEmbedding(text) }
/** * 对一组文本做嵌入操作 * @param {*} documents */ async embedDocuments(documents) { const results = Array.from({ length: documents.length }) // 存放结果的数组
// 添加一个并发的探针 let active = 0 // 并发数 let maxActive = 0 // 最大并发数 const t0 = performance.now()
await runWithConcurrency( documents, async (text, idx) => { // 开始了一个任务,需要对并发数做一个计数 active++ maxActive = Math.max(maxActive, active) console.log( `[start] #${idx} +${(performance.now() - t0).toFixed( 0 )}ms active=${active}` )
try { results[idx] = await this.#fetchEmbedding(text) } catch (err) { results[idx] = err } finally { // 任务结束 active-- console.log( `[end ] #${idx} +${(performance.now() - t0).toFixed( 0 )}ms active=${active}` ) } }, this.concurrency )
return results }}第三方并发库#
关于并发的控制,还可以使用一个第三方库:p-limit
该库是一个极小的工具,用来限制并发执行的 Promise 个数,可以用于 Node.js 和浏览器环境。
基本用法:
import pLimit from "p-limit";
const limit = pLimit(3); // 同时最多跑 3 个
const tasks = urls.map(url => limit(() => fetchJson(url)););const results = await Promise.all(tasks);下面是一个快速上手示例,添加并发探针,检测并发的数量:
import pLimit from 'p-limit'
const urls = [ 'https://jsonplaceholder.typicode.com/todos/1', 'https://jsonplaceholder.typicode.com/todos/2', 'https://jsonplaceholder.typicode.com/todos/3', 'https://jsonplaceholder.typicode.com/todos/4', 'https://jsonplaceholder.typicode.com/todos/5', 'https://jsonplaceholder.typicode.com/todos/6',]
// 基础请求函数:拿到 JSON,不是 2xx 则抛错async function fetchJson(url) { const res = await fetch(url, { headers: { accept: 'application/json' } }) if (!res.ok) throw new Error(`HTTP ${res.status} for ${url}`) return res.json()}
async function main() { // 同时最多跑 3 个请求 const limit = pLimit(3)
// 并发探针 let active = 0 let maxActive = 0 const t0 = performance.now()
const tasks = urls.map((url, idx) => limit(async () => { // 任务开始,并发探针检测并发数量 active++ if (active > limit.concurrency) { console.warn(`并发超限: active=${active} > limit=${limit.concurrency}`) } maxActive = Math.max(maxActive, active) console.log( `[start] #${idx} +${(performance.now() - t0).toFixed( 3 )}ms active=${active}` )
try { return await fetchJson(url) } finally { // 任务结束 active-- console.log( `[end ] #${idx} +${(performance.now() - t0).toFixed( 3 )}ms active=${active}` ) } }) )
try { const results = await Promise.all(tasks) console.log('结果:', results) } catch (err) { console.error('至少有一个请求失败:', err) } finally { console.log( `并发观测:maxActive=${maxActive}, limit=${limit.concurrency}, ` + `activeCount=${limit.activeCount}, pendingCount=${limit.pendingCount}` ) }}
main().catch((e) => console.error(e))课堂练习:使用 p-limit 重构前面自定义的嵌入类。