Skip to content

嵌入处理

回忆 RAG 关键步骤:

  1. 文本切割
  2. 嵌入处理
  3. 存储向量数据库

嵌入处理,又称之为向量化操作。核心就是将文本转为向量的形式,从而为下一步做数学运算做准备。

"今天的天气真好,万里无云"
[
0.3297254741191864, 0.7386181354522705, -3.342341899871826,
-0.7811917066574097, -0.08536303788423538, 0.05086381733417511,
... 668 more items
]

该操作一般需要依赖专门做嵌入处理的模型。例如:

import { OpenAIEmbeddings } from '@langchain/openai'
const embeddings = new OpenAIEmbeddings({
apiKey: process.env.OPENAI_API_KEY,
model: 'text-embedding-3-large', // OpenAI 官方提供的专门用于做嵌入的模型
})
const vectors = await embeddings.embedDocuments(
documents.map((doc) => doc.pageContent)
)

在上面的代码中,使用的是 OpenAI 官方提供的专用嵌入模型。而 OpenAIEmbeddings 则是 Embeddings 的子类,关于 Embeddings 这个工具类,后面再来介绍。

快速上手#

课堂练习:使用 nomic-embed-text 模型做嵌入操作

import { TextLoader } from 'langchain/document_loaders/fs/text'
import { RecursiveCharacterTextSplitter } from 'langchain/text_splitter'
const loader = new TextLoader('data/kong.txt')
const docs = await loader.load()
const splitter = new RecursiveCharacterTextSplitter({
chunkSize: 64,
chunkOverlap: 0,
})
const splittedDocs = await splitter.splitDocuments(docs)
// console.log(splittedDocs)
async function getEmbedding(text) {
const res = await fetch('http://localhost:11434/api/embeddings', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
model: 'nomic-embed-text',
prompt: text,
}),
})
const result = await res.json()
return result.embedding
}
const results = []
for (const doc of splittedDocs) {
const embedding = await getEmbedding(doc.pageContent)
results.push({ ...doc, embedding })
}
console.log(results)

不过目前这个例子,在进行嵌入操作时花费的时间是比较久的,这里我们可以做一个计时来测量一下花费的时间。

究其原因,是因为现在在做嵌入操作时,采用的是串行的形式。

所以,接下来一个工作重点,支持并发的嵌入操作。

自定义嵌入类#

要实现并发的嵌入操作,我们可以自己来自定义一个嵌入类

不过在此之前,需要先了解 Embeddings 工具类。

Embeddings 是 LangChain 中抽象出来的嵌入操作基类,不同厂商的向量模型,都通过继承该基类实现暴露统一方法,从而能在向量库、检索器等组件里互换使用。

基类提供两组最核心的方法:

文档地址:https://js.langchain.com/docs/concepts/embedding_models/?utm_source=chatgpt.com

使用并发工具方法:runWithConcurrency

export async function runWithConcurrency(items, worker, maxConcurrency) {
if (!items?.length) return
let i = 0
const workers = []
async function spawn() {
while (i < items.length) {
const idx = i++
await worker(items[idx], idx)
}
}
const n = Math.max(1, Math.min(maxConcurrency, items.length))
for (let k = 0; k < n; k++) workers.push(spawn())
await Promise.allSettled(workers)
}

课堂练习:自定义嵌入类

import { Embeddings } from '@langchain/core/embeddings'
import { runWithConcurrency } from './concurrency.js'
export class NomicEmbeddings extends Embeddings {
constructor(concurrency = 3) {
super()
this.model = 'nomic-embed-text'
this.apiUrl = 'http://localhost:11434/api/embeddings'
this.concurrency = concurrency
}
/**
* 对单个文本做嵌入操作,这是一个内部方法
* @param {*} text 单个文本
*/
async #fetchEmbedding(text) {
const res = await fetch(this.apiUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
model: this.model,
prompt: text,
}),
})
const result = await res.json()
return result.embedding
}
/**
* 对单个文本做嵌入操作
* @param {*} text
*/
async embedQuery(text) {
return await this.#fetchEmbedding(text)
}
/**
* 对一组文本做嵌入操作
* @param {*} documents
*/
async embedDocuments(documents) {
const results = Array.from({ length: documents.length }) // 存放结果的数组
// 添加一个并发的探针
let active = 0 // 并发数
let maxActive = 0 // 最大并发数
const t0 = performance.now()
await runWithConcurrency(
documents,
async (text, idx) => {
// 开始了一个任务,需要对并发数做一个计数
active++
maxActive = Math.max(maxActive, active)
console.log(
`[start] #${idx} +${(performance.now() - t0).toFixed(
0
)}ms active=${active}`
)
try {
results[idx] = await this.#fetchEmbedding(text)
} catch (err) {
results[idx] = err
} finally {
// 任务结束
active--
console.log(
`[end ] #${idx} +${(performance.now() - t0).toFixed(
0
)}ms active=${active}`
)
}
},
this.concurrency
)
return results
}
}

第三方并发库#

关于并发的控制,还可以使用一个第三方库:p-limit

该库是一个极小的工具,用来限制并发执行的 Promise 个数,可以用于 Node.js 和浏览器环境。

基本用法:

import pLimit from "p-limit";
const limit = pLimit(3); // 同时最多跑 3 个
const tasks = urls.map(url =>
limit(() => fetchJson(url));
);
const results = await Promise.all(tasks);

下面是一个快速上手示例,添加并发探针,检测并发的数量:

import pLimit from 'p-limit'
const urls = [
'https://jsonplaceholder.typicode.com/todos/1',
'https://jsonplaceholder.typicode.com/todos/2',
'https://jsonplaceholder.typicode.com/todos/3',
'https://jsonplaceholder.typicode.com/todos/4',
'https://jsonplaceholder.typicode.com/todos/5',
'https://jsonplaceholder.typicode.com/todos/6',
]
// 基础请求函数:拿到 JSON,不是 2xx 则抛错
async function fetchJson(url) {
const res = await fetch(url, { headers: { accept: 'application/json' } })
if (!res.ok) throw new Error(`HTTP ${res.status} for ${url}`)
return res.json()
}
async function main() {
// 同时最多跑 3 个请求
const limit = pLimit(3)
// 并发探针
let active = 0
let maxActive = 0
const t0 = performance.now()
const tasks = urls.map((url, idx) =>
limit(async () => {
// 任务开始,并发探针检测并发数量
active++
if (active > limit.concurrency) {
console.warn(`并发超限: active=${active} > limit=${limit.concurrency}`)
}
maxActive = Math.max(maxActive, active)
console.log(
`[start] #${idx} +${(performance.now() - t0).toFixed(
3
)}ms active=${active}`
)
try {
return await fetchJson(url)
} finally {
// 任务结束
active--
console.log(
`[end ] #${idx} +${(performance.now() - t0).toFixed(
3
)}ms active=${active}`
)
}
})
)
try {
const results = await Promise.all(tasks)
console.log('结果:', results)
} catch (err) {
console.error('至少有一个请求失败:', err)
} finally {
console.log(
`并发观测:maxActive=${maxActive}, limit=${limit.concurrency}, ` +
`activeCount=${limit.activeCount}, pendingCount=${limit.pendingCount}`
)
}
}
main().catch((e) => console.error(e))

课堂练习:使用 p-limit 重构前面自定义的嵌入类。