原子化文件写入,是属于 原子化操作 的一种。那么问题来了,什么又是原子化操作呢?
原子化操作:英语 Atomic,指的是操作要么全部发生,要么完全不发生,对读者而言,中间状态是不可见的。
原子写入(Atomic Write) 可以保证文件不存在中间半写入状态,与之相对应的是 撕裂写(Torn Write)。例如,代码里看似简单的一次 writeFile,在线上宕机崩溃后,文件可能变成 空文件 或 半截 JSON。
另外,两个人同时对一个文件做写入操作,那么文件最终的状态以谁的为准呢?这就是文件写入操作时的并发问题。
这篇文章,我们就来探讨一下文件写入时的原子化与并发操作问题。
回顾文件写入#
一般一门标准的编程语言,都会提供对文件以及目录相关的操作。比如笔者因为工作原因,曾接触过 C、PHP、Ruby、Java、Go 等语言,这些语言都存在专门处理文件以及目录的模块,而且都是语言内置模块。
Node.js 中同样也提供了 fs 模块,专门用于处理和文件相关的操作。由于我们这篇文章是介绍原子化 写入 操作的相关内容,所以我们先来回顾一下 Node.js 有关写入的操作,其它操作就不展开了。
Node.js 写入的基本语法:
fs.promises.writeFile(path, data, options)- path:要写入的文件路径
- data:要写入的数据
- options:配置对象,可以配置的项目有
- encoding:编码
- mode:新建文件时的权限位,只在文件被创建时生效,如果目标文件已存在,mode 一般不会改动原有权限
- flag:打开标志
- signal:AbortSignal 实例对象,可以用来取消写入操作
回顾一下 flag 各种值,不同的值决定了不同的写入方式:
| Flag | 可读 | 可写 | 创建不存在文件 | 已有文件会被截断 | 仅末尾追加 | 已存在时报错 | 典型用途 |
|---|---|---|---|---|---|---|---|
r | ✅ | ❌ | ❌ | ❌ | ❌ | — | 只读打开;文件必须已存在。 |
rs | ✅ | ❌ | ❌ | ❌ | ❌ | — | 只读并“同步模式/绕过本地缓存”,用于 NFS 等;谨慎使用。 |
r+ | ✅ | ✅ | ❌ | ❌ | ❌ | — | 可读写但不截断;定点覆盖旧内容要小心撕裂写。 |
rs+ | ✅ | ✅ | ❌ | ❌ | ❌ | — | 可读写并绕过本地缓存;非必要不建议用。 |
w | ❌ | ✅ | ✅ | ✅ | ❌ | — | 覆盖写(默认);快照式写入但有截断风险。 |
wx | ❌ | ✅ | ✅ | ✅ | ❌ | ✅ | 仅首次创建并写入;已存在则 EEXIST。 |
w+ | ✅ | ✅ | ✅ | ✅ | ❌ | — | 覆盖写且可读回;生成后立即校验。 |
wx+ | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | 首次创建的读写快照;防误覆盖。 |
a | ❌ | ✅ | ✅ | ❌ | ✅ | — | 追加写。 |
ax | ❌ | ✅ | ✅ | ❌ | ✅ | ✅ | 仅首次创建的追加文件;已存在报错。 |
a+ | ✅ | ✅ | ✅ | ❌ | ✅ | — | 可读又可持续追加(查看历史+继续写)。 |
ax+ | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | 仅首次创建的读写追加。 |
as | ❌ | ✅ | ✅ | ❌ | ✅ | — | 同步模式的追加(创建如无);极少用。 |
as+ | ✅ | ✅ | ✅ | ❌ | ✅ | — | 同步模式的读/追加(创建如无)。 |
通过上面的表格,可以看到光是和写入操作相关的 flag,就有很多种。这里没有必要全部挨着过一遍,咱们就看一下默认的 w 这种 flag。
这种 flag 的行为是:覆盖写。会先将文件截断为 0 长度,然后再执行写入操作。如果文件不存在,那就创建。
所以这也被称之为一种 快照式写入,每次保存都使用新文件内容整体替换旧文件。听上去这种写入方式不会存在文件内容撕裂的问题,但其实不然,因为撕裂分为两类:
- 内容撕裂
- 半截快照
例如 r+ 这种 flag,如果写入长度小于原文件内容长度,那么就会存在 新开头 + 旧尾巴 混在一起的情况,这种就称之为 内容撕裂。而 w 是直接覆盖,不会出现内容层面的撕裂。
但是,w 的覆盖操作,背后对应的具体步骤是先将原有文件截断为 0,然后再写新的内容,因此会存在第二种撕裂情况,也就是 半截快照。例如:
- 刚截断文件,突然电脑宕机,此时得到的就是一个空文件
- 截断后文件还没有写完,电脑宕机,此时得到的就是半截内容
所以,想要真正的实现快照语义,也就是读者要么看到旧版,要么看到新版,还是得用原子替换。
原子写入#
那么原子写入具体如何来做呢?其实背后的原理相当简单,就是在写入的时候,先将内容写入到一份临时的文件里面,待文件写入操作完成时,再使用临时文件来替换原有文件。倘若在写临时文件时突然宕机,对原文件也没有任何影响。
例如:
import { open, rename, mkdir, rm } from 'fs/promises'import path from 'path'
/** * * @param {*} filePath 写入文件的路径 * @param {*} text 写入内容 * @param {*} param2 配置对象 - durable 代表是否刷盘 * durable: true “稳一点”,抗掉电更好,性能稍慢。 * durable: false “快一点”,不主动刷盘,吞吐更高,但极端情况下可能丢最后一次写。 */async function atomicWrite(filePath, text, { durable = true } = {}) { const dir = path.dirname(filePath) const tmp = path.join(dir, `.${path.basename(filePath)}.${Date.now()}.tmp`)
await mkdir(dir, { recursive: true })
const fh = await open(tmp, 'w', 0o600) try { await fh.writeFile(text, 'utf8') if (durable) await fh.sync() // 刷盘:让 tmp 的内容真正落到介质 } finally { await fh.close() }
await rename(tmp, filePath) // 同目录原子替换
if (durable) { const dh = await open(dir, 'r') try { await dh.sync() } finally { await dh.close() } // 刷盘目录项 }}上面的代码,也非常好理解。首先创建一个临时文件 tmp,然后在做文件写入操作的时候,写入到 tmp 文件里面,最后通过 rename 来替换原有的文件。
另外,上面的代码涉及到了刷盘操作,什么是刷盘呢?
这是把还在内存缓存里的文件数据,强制写到持久介质(例如 SSD/HDD)的过程。普通 writeFile/write 返回时,数据通常只是进了内核页缓存,操作系统会在稍后才异步落盘。如果这时机器掉电/崩溃,缓存里的数据可能还没写到盘,导致丢失或半截。fsync/datasync/sync 这类“刷盘”调用会要求 OS 立刻把对应的数据(以及必要的元数据)写到磁盘控制器,尽量保证掉电后仍在。不过缺点就是需要耗费额外的时间。
所以在做原子化操作的时候,更加稳定的一套顺序是:
- 写临时文件
await fh.sync():把内容刷盘rename(tmp, target):原子替换目录项- 打开目录句柄并
await dirHandle.sync():把“重命名这件事”也刷盘
另外,在进行原子替换的时候,Windows 系统下可能会偶发 EPERM/EEXIST:
- POSIX(macOS/Linux):同一分区内
rename(tmp → target)可以原子覆盖已存在的target,要么得到旧文件、要么得到完整新文件。 - Windows/NTFS:
fs.rename对“目标已存在”不总是同样的语义。目标存在、被占用、只读、被安全软件/索引器短暂钩住等情况时,Node 这层会报:EEXIST:目标存在且不允许覆盖EPERM/EBUSY:权限/占用类问题
因此针对 Windows 系统可以做一个兜底处理:
// 之前await rename(tmp, filePath)
// 更新为兜底操作try { await rename(tmp, filePath) // POSIX: 同目录原子替换} catch (e) { // Windows 兜底:目标存在/被占用时可能 EPERM/EEXIST(非严格原子,但实用) if ( process.platform === 'win32' && (e.code === 'EPERM' || e.code === 'EEXIST') ) { await rm(filePath, { force: true }) // 尝试移除旧文件(可能仍因占用/只读失败) await rename(tmp, filePath) // 再次改名 } else { // 其他平台或其他错误:清理 tmp 临时文件后抛出 await rm(tmp, { force: true }).catch(() => {}) // 清理 tmp 临时文件 throw e }}并发问题#
原子写入解决的是 半截文件 的问题,文件写入还存在另一个常见的问题,那就是并发写入。
例如当前文件的状态为 S0:
count: 0A 用户对文件做 +1 操作,B 用户对文件做 +2 操作。当两个写入操作几乎同时发生的时候,意味着 A 和 B 读取到的都是 S0 状态,那么:
A 用户更新文件状态
S0 --> S1count: 0 --> count: 1B 用户更新文件状态
S0 --> S2count: 0 --> count: 2最终文件的状态为 count: 2,这就是典型的 “Last Write Wins” (最后写的是赢家)的问题。
明确了问题之后,接下来是如何解决问题。
我们可以使用一个队列,把 读→改→写(Read-Modify-Write, RMW)这一整段逻辑串起来顺序执行,前一个写完并落盘,后一个才开始读取最新文件。
下面落实到代码:
const fileQueues = new Map()function withFileQueue(filePath, task) { const prev = fileQueues.get(filePath) ?? Promise.resolve() const run = () => task() const next = prev.then(run, run) fileQueues.set( filePath, next.catch(() => {}) ) next.finally(() => { if (fileQueues.get(filePath) === next) { fileQueues.delete(filePath) } }) return next}这段代码实现了“文件操作队列”机制,确保对同一个文件路径的异步操作按顺序执行,避免并发冲突。
下面我们来逐行看一下:
1. 获取之前的任务链
const prev = fileQueues.get(filePath) ?? Promise.resolve()fileQueues是一个Map,以filePath为键,存储着当前正在执行或已排队的针对该文件的 Promise。- 如果该文件已有排队的任务(
prev存在),则新任务将接在它后面。 - 如果该文件没有排队的任务(
prev为undefined),则使用Promise.resolve()作为起点,表示可以立即开始。
2. 定义任务函数
const run = () => task()task是传入的、需要执行的异步操作(通常返回一个 Promise)。run是一个函数,调用它就会执行task。
3. 创建新的任务链
const next = prev.then(run, run)这是关键一步。prev.then(run, run)表示:
- 当
prev成功完成时,执行run(即执行新任务task)。 - 当
prev失败时,也执行run(即仍然执行新任务task)。
这确保了无论前一个任务成功还是失败,下一个任务都会被执行,避免了因前一个任务失败而导致后续所有任务被阻塞的情况。
4. 更新队列状态
fileQueues.set( filePath, next.catch(() => {}))- 将
fileQueues中该filePath对应的值更新为next。 next.catch(() => {})的作用是:捕获next任务自身的错误,但不处理(空函数),防止错误冒泡。
这样做是为了避免因为某个 task 抛出未处理的 Promise rejection 而导致整个程序报错(Uncaught (in promise))。它让队列本身对错误“免疫”,但错误仍然可能在 task 内部被处理或需要外部关注。
5. 清理队列
next.finally(() => { if (fileQueues.get(filePath) === next) { fileQueues.delete(filePath) }})当 next 这个 Promise 最终完成(无论是成功还是失败)时,执行清理:
- 检查
fileQueues中该filePath对应的是否还是next(防止在finally执行前又有新任务加入,导致误删)。 - 如果是,则从
fileQueues中删除该filePath的记录,表示该文件当前没有排队的任务了。
6. 返回新任务的 Promise
return next- 函数返回
next,即代表这个新加入任务的 Promise。 - 调用者可以
.then()或await这个返回值,来获取任务执行的结果或处理错误。
举个例子:
调用 A: withFileQueue(file, runA)
prev = Promise.resolve() // 队列起点(空队列)nextA = prev.then(runA, runA) // 把 A 接到队尾fileQueues[file] = nextA.catch(() => {}) // 存“队尾副本”,吞错防断链调用 B: withFileQueue(file, runB),注意 A 仍在执行中
prev = fileQueues[file] ──► A' // A' 是 nextA.catch(()=>{})nextB = prev.then(runB, runB) // 等 A' 结论后再跑 BfileQueues[file] = nextB.catch(()=>{}) // 队尾推进到 B'调用 C: withFileQueue(file, runC)
prev = fileQueues[file] ──► B'nextC = prev.then(runC, runC) // 等 B' 结论后再跑 CfileQueues[file] = nextC.catch(()=>{}) // 队尾推进到 C'执行时序(保证串行): runA 开始 → runA 结束 ↓ runB 开始 → runB 结束 ↓ runC 开始 → runC 结束完成后:nextC settle → finally 做清理工作,若队尾仍是 C',从 Map 删除该 key-
prev总是“上一次的队尾副本”(next.catch(()=>{})),因此 B 一定在 A 完成后开始,C 一定在 B 完成后开始。 -
then(run, run)表示“无论前一个成功还是失败,都继续往下跑”;即使你以后改动存储逻辑,队列也不易断。
下面是对上述代码的一个调用示例:
import fs from 'fs/promises'
// 上述代码略...
async function updateJSON(file, updater) { return withFileQueue(file, async () => { const oldText = await fs.readFile(file, 'utf8').catch(() => '{}') const old = JSON.parse(oldText) const next = await updater(old) // 在队列任务里“读→改” await fs.writeFile(file + '.tmp', JSON.stringify(next, null, 2)) await fs.rename(file + '.tmp', file) // 原子替换(写) })}
// 三个更新并发发起:A(+1)、B(+2)、C(+3),但是保证串行执行const file = './state.json'await Promise.all([ updateJSON(file, (s) => ({ count: (s.count ?? 0) + 1 })), // A updateJSON(file, (s) => ({ count: (s.count ?? 0) + 2 })), // B updateJSON(file, (s) => ({ count: (s.count ?? 0) + 3 })), // C])效果:
{ "count": 6}完整版#
最后,我们将前面的原子写入以及并发队列做一个整合,代码如下:
import { open, rename, mkdir, readFile } from 'fs/promises'import path from 'path'
// 原有的原子化写入async function atomicWrite(filePath, text, { durable = true } = {}) { const dir = path.dirname(filePath) const tmp = path.join(dir, `.${path.basename(filePath)}.${Date.now()}.tmp`)
await mkdir(dir, { recursive: true })
const fh = await open(tmp, 'w', 0o600) try { await fh.writeFile(text, 'utf8') if (durable) await fh.sync() // 刷盘 } finally { await fh.close() }
try { await rename(tmp, filePath) // POSIX: 同目录原子替换 } catch (e) { // Windows 兜底:目标存在/被占用时可能 EPERM/EEXIST(非严格原子,但实用) if ( process.platform === 'win32' && (e.code === 'EPERM' || e.code === 'EEXIST') ) { await rm(filePath, { force: true }) // 尝试移除旧文件(可能仍因占用/只读失败) await rename(tmp, filePath) // 再次改名 } else { // 其他平台或其他错误:清理 tmp 临时文件后抛出 await rm(tmp, { force: true }).catch(() => {}) // 清理 tmp 临时文件 throw e } }
if (durable) { const dh = await open(dir, 'r') try { await dh.sync() } finally { await dh.close() } // 刷新目录项 }}
// 维护“每个文件路径 → 该文件队列尾Promise”的映射const fileQueues = new Map()
// 串行化对同一 filePath 的写任务function withFileQueue(filePath, task) { const prev = fileQueues.get(filePath) ?? Promise.resolve()
const run = () => task() const next = prev.then(run, run)
// 不让队列因未捕获拒绝而“断链” fileQueues.set( filePath, next.catch(() => {}) )
// 任务完结后可清理 next.finally(() => { if (fileQueues.get(filePath) === next) fileQueues.delete(filePath) })
return next}
async function readJSONSafe(file) { try { return JSON.parse(await readFile(file, 'utf8')) } catch (e) { if (e?.code === 'ENOENT' || e instanceof SyntaxError) return null throw e }}
// 对外暴露的 2 个工具方法export function queuedAtomicWrite(filePath, text, opts) { return withFileQueue(filePath, () => atomicWrite(filePath, text, opts))}
export function queuedUpdateJSON(filePath, updater, opts) { return withFileQueue(filePath, async () => { const oldState = (await readJSONSafe(filePath)) ?? {} const newState = await updater(oldState) // 由调用方决定如何“改” const text = JSON.stringify(newState, null, 2) await atomicWrite(filePath, text, opts) })}下面是一个调用示例:
import { queuedUpdateJSON, queuedAtomicWrite } from './atomic-queued.js'
// 并发 10 次自增:最终 counter 一定是 10await Promise.all( Array.from({ length: 10 }, () => queuedUpdateJSON('./state.json', (s) => ({ counter: (s.counter ?? 0) + 1 })) ))
// 纯文本也可以直接排队写,最终文本内容为 Cawait Promise.all([ queuedAtomicWrite('./note.txt', 'A\n'), queuedAtomicWrite('./note.txt', 'B\n'), queuedAtomicWrite('./note.txt', 'C\n'),])写在最后#
关于文件写入,看起来只是一次 writeFile,但背后要权衡语义、故障与并发。默认的覆盖写是“快照式”但非原子,宕机可能出现空文件或半截内容。
要实现“要么旧、要么完整新”,应采用原子写入(先写同目录临时文件,再 rename),必要时配合 fsync 提升掉电后的可靠性。
并发场景下,仅有原子写还会遇到 Last Write Wins。将“读→改→写”封装进同一个队列任务,确保每次更新都在前一次落盘后再读取最新状态,从而避免丢更新。
可选补充:若存在多进程/多服务同时写同一文件,再叠加锁(锁文件或系统级文件锁)实现跨进程互斥。
好啦,这就是这篇文章的全部内容了,我们下篇文章再见👋