Skip to content

文件写入的原子化与并发操作

原子化文件写入,是属于 原子化操作 的一种。那么问题来了,什么又是原子化操作呢?

原子化操作:英语 Atomic,指的是操作要么全部发生,要么完全不发生,对读者而言,中间状态是不可见的。

原子写入(Atomic Write) 可以保证文件不存在中间半写入状态,与之相对应的是 撕裂写(Torn Write)。例如,代码里看似简单的一次 writeFile,在线上宕机崩溃后,文件可能变成 空文件半截 JSON

另外,两个人同时对一个文件做写入操作,那么文件最终的状态以谁的为准呢?这就是文件写入操作时的并发问题。

这篇文章,我们就来探讨一下文件写入时的原子化与并发操作问题。

回顾文件写入#

一般一门标准的编程语言,都会提供对文件以及目录相关的操作。比如笔者因为工作原因,曾接触过 C、PHP、Ruby、Java、Go 等语言,这些语言都存在专门处理文件以及目录的模块,而且都是语言内置模块。

Node.js 中同样也提供了 fs 模块,专门用于处理和文件相关的操作。由于我们这篇文章是介绍原子化 写入 操作的相关内容,所以我们先来回顾一下 Node.js 有关写入的操作,其它操作就不展开了。

Node.js 写入的基本语法:

fs.promises.writeFile(path, data, options)

回顾一下 flag 各种值,不同的值决定了不同的写入方式:

Flag可读可写创建不存在文件已有文件会被截断仅末尾追加已存在时报错典型用途
r只读打开;文件必须已存在。
rs只读并“同步模式/绕过本地缓存”,用于 NFS 等;谨慎使用。
r+可读写但不截断;定点覆盖旧内容要小心撕裂写。
rs+可读写并绕过本地缓存;非必要不建议用。
w覆盖写(默认);快照式写入但有截断风险。
wx仅首次创建并写入;已存在则 EEXIST
w+覆盖写且可读回;生成后立即校验。
wx+首次创建的读写快照;防误覆盖。
a追加写。
ax仅首次创建的追加文件;已存在报错。
a+可读又可持续追加(查看历史+继续写)。
ax+仅首次创建的读写追加。
as同步模式的追加(创建如无);极少用。
as+同步模式的读/追加(创建如无)。

通过上面的表格,可以看到光是和写入操作相关的 flag,就有很多种。这里没有必要全部挨着过一遍,咱们就看一下默认的 w 这种 flag。

这种 flag 的行为是:覆盖写。会先将文件截断为 0 长度,然后再执行写入操作。如果文件不存在,那就创建。

所以这也被称之为一种 快照式写入,每次保存都使用新文件内容整体替换旧文件。听上去这种写入方式不会存在文件内容撕裂的问题,但其实不然,因为撕裂分为两类:

  1. 内容撕裂
  2. 半截快照

例如 r+ 这种 flag,如果写入长度小于原文件内容长度,那么就会存在 新开头 + 旧尾巴 混在一起的情况,这种就称之为 内容撕裂。而 w 是直接覆盖,不会出现内容层面的撕裂。

但是,w 的覆盖操作,背后对应的具体步骤是先将原有文件截断为 0,然后再写新的内容,因此会存在第二种撕裂情况,也就是 半截快照。例如:

  1. 刚截断文件,突然电脑宕机,此时得到的就是一个空文件
  2. 截断后文件还没有写完,电脑宕机,此时得到的就是半截内容

所以,想要真正的实现快照语义,也就是读者要么看到旧版,要么看到新版,还是得用原子替换。

原子写入#

那么原子写入具体如何来做呢?其实背后的原理相当简单,就是在写入的时候,先将内容写入到一份临时的文件里面,待文件写入操作完成时,再使用临时文件来替换原有文件。倘若在写临时文件时突然宕机,对原文件也没有任何影响。

例如:

import { open, rename, mkdir, rm } from 'fs/promises'
import path from 'path'
/**
*
* @param {*} filePath 写入文件的路径
* @param {*} text 写入内容
* @param {*} param2 配置对象 - durable 代表是否刷盘
* durable: true “稳一点”,抗掉电更好,性能稍慢。
* durable: false “快一点”,不主动刷盘,吞吐更高,但极端情况下可能丢最后一次写。
*/
async function atomicWrite(filePath, text, { durable = true } = {}) {
const dir = path.dirname(filePath)
const tmp = path.join(dir, `.${path.basename(filePath)}.${Date.now()}.tmp`)
await mkdir(dir, { recursive: true })
const fh = await open(tmp, 'w', 0o600)
try {
await fh.writeFile(text, 'utf8')
if (durable) await fh.sync() // 刷盘:让 tmp 的内容真正落到介质
} finally {
await fh.close()
}
await rename(tmp, filePath) // 同目录原子替换
if (durable) {
const dh = await open(dir, 'r')
try {
await dh.sync()
} finally {
await dh.close()
} // 刷盘目录项
}
}

上面的代码,也非常好理解。首先创建一个临时文件 tmp,然后在做文件写入操作的时候,写入到 tmp 文件里面,最后通过 rename 来替换原有的文件。

另外,上面的代码涉及到了刷盘操作,什么是刷盘呢?

这是把还在内存缓存里的文件数据,强制写到持久介质(例如 SSD/HDD)的过程。普通 writeFile/write 返回时,数据通常只是进了内核页缓存,操作系统会在稍后才异步落盘。如果这时机器掉电/崩溃,缓存里的数据可能还没写到盘,导致丢失或半截。fsync/datasync/sync 这类“刷盘”调用会要求 OS 立刻把对应的数据(以及必要的元数据)写到磁盘控制器,尽量保证掉电后仍在。不过缺点就是需要耗费额外的时间。

所以在做原子化操作的时候,更加稳定的一套顺序是:

  1. 写临时文件
  2. await fh.sync():把内容刷盘
  3. rename(tmp, target):原子替换目录项
  4. 打开目录句柄并 await dirHandle.sync():把“重命名这件事”也刷盘

另外,在进行原子替换的时候,Windows 系统下可能会偶发 EPERM/EEXIST

因此针对 Windows 系统可以做一个兜底处理:

// 之前
await rename(tmp, filePath)
// 更新为兜底操作
try {
await rename(tmp, filePath) // POSIX: 同目录原子替换
} catch (e) {
// Windows 兜底:目标存在/被占用时可能 EPERM/EEXIST(非严格原子,但实用)
if (
process.platform === 'win32' &&
(e.code === 'EPERM' || e.code === 'EEXIST')
) {
await rm(filePath, { force: true }) // 尝试移除旧文件(可能仍因占用/只读失败)
await rename(tmp, filePath) // 再次改名
} else {
// 其他平台或其他错误:清理 tmp 临时文件后抛出
await rm(tmp, { force: true }).catch(() => {}) // 清理 tmp 临时文件
throw e
}
}

并发问题#

原子写入解决的是 半截文件 的问题,文件写入还存在另一个常见的问题,那就是并发写入。

例如当前文件的状态为 S0:

count: 0

A 用户对文件做 +1 操作,B 用户对文件做 +2 操作。当两个写入操作几乎同时发生的时候,意味着 A 和 B 读取到的都是 S0 状态,那么:

A 用户更新文件状态

S0 --> S1
count: 0 --> count: 1

B 用户更新文件状态

S0 --> S2
count: 0 --> count: 2

最终文件的状态为 count: 2,这就是典型的 “Last Write Wins” (最后写的是赢家)的问题。

明确了问题之后,接下来是如何解决问题。

我们可以使用一个队列,把 读→改→写(Read-Modify-Write, RMW)这一整段逻辑串起来顺序执行,前一个写完并落盘,后一个才开始读取最新文件。

下面落实到代码:

const fileQueues = new Map()
function withFileQueue(filePath, task) {
const prev = fileQueues.get(filePath) ?? Promise.resolve()
const run = () => task()
const next = prev.then(run, run)
fileQueues.set(
filePath,
next.catch(() => {})
)
next.finally(() => {
if (fileQueues.get(filePath) === next) {
fileQueues.delete(filePath)
}
})
return next
}

这段代码实现了“文件操作队列”机制,确保对同一个文件路径的异步操作按顺序执行,避免并发冲突。

下面我们来逐行看一下:

1. 获取之前的任务链

const prev = fileQueues.get(filePath) ?? Promise.resolve()

2. 定义任务函数

const run = () => task()

3. 创建新的任务链

const next = prev.then(run, run)

这是关键一步。prev.then(run, run)表示:

这确保了无论前一个任务成功还是失败,下一个任务都会被执行,避免了因前一个任务失败而导致后续所有任务被阻塞的情况。

4. 更新队列状态

fileQueues.set(
filePath,
next.catch(() => {})
)

这样做是为了避免因为某个 task 抛出未处理的 Promise rejection 而导致整个程序报错(Uncaught (in promise))。它让队列本身对错误“免疫”,但错误仍然可能在 task 内部被处理或需要外部关注。

5. 清理队列

next.finally(() => {
if (fileQueues.get(filePath) === next) {
fileQueues.delete(filePath)
}
})

next 这个 Promise 最终完成(无论是成功还是失败)时,执行清理:

6. 返回新任务的 Promise

return next

举个例子:

调用 A: withFileQueue(file, runA)

prev = Promise.resolve() // 队列起点(空队列)
nextA = prev.then(runA, runA) // 把 A 接到队尾
fileQueues[file] = nextA.catch(() => {}) // 存“队尾副本”,吞错防断链

调用 B: withFileQueue(file, runB),注意 A 仍在执行中

prev = fileQueues[file] ──► A' // A' nextA.catch(()=>{})
nextB = prev.then(runB, runB) // 等 A' 结论后再跑 B
fileQueues[file] = nextB.catch(()=>{}) // 队尾推进到 B'

调用 C: withFileQueue(file, runC)

prev = fileQueues[file] ──► B'
nextC = prev.then(runC, runC) // 等 B' 结论后再跑 C
fileQueues[file] = nextC.catch(()=>{}) // 队尾推进到 C'
执行时序(保证串行):
runA 开始 → runA 结束
runB 开始 → runB 结束
runC 开始 → runC 结束
完成后:
nextC settle → finally 做清理工作,若队尾仍是 C',从 Map 删除该 key

下面是对上述代码的一个调用示例:

import fs from 'fs/promises'
// 上述代码略...
async function updateJSON(file, updater) {
return withFileQueue(file, async () => {
const oldText = await fs.readFile(file, 'utf8').catch(() => '{}')
const old = JSON.parse(oldText)
const next = await updater(old) // 在队列任务里“读→改”
await fs.writeFile(file + '.tmp', JSON.stringify(next, null, 2))
await fs.rename(file + '.tmp', file) // 原子替换(写)
})
}
// 三个更新并发发起:A(+1)、B(+2)、C(+3),但是保证串行执行
const file = './state.json'
await Promise.all([
updateJSON(file, (s) => ({ count: (s.count ?? 0) + 1 })), // A
updateJSON(file, (s) => ({ count: (s.count ?? 0) + 2 })), // B
updateJSON(file, (s) => ({ count: (s.count ?? 0) + 3 })), // C
])

效果:

state.json
{
"count": 6
}

完整版#

最后,我们将前面的原子写入以及并发队列做一个整合,代码如下:

atomic-queued.js
import { open, rename, mkdir, readFile } from 'fs/promises'
import path from 'path'
// 原有的原子化写入
async function atomicWrite(filePath, text, { durable = true } = {}) {
const dir = path.dirname(filePath)
const tmp = path.join(dir, `.${path.basename(filePath)}.${Date.now()}.tmp`)
await mkdir(dir, { recursive: true })
const fh = await open(tmp, 'w', 0o600)
try {
await fh.writeFile(text, 'utf8')
if (durable) await fh.sync() // 刷盘
} finally {
await fh.close()
}
try {
await rename(tmp, filePath) // POSIX: 同目录原子替换
} catch (e) {
// Windows 兜底:目标存在/被占用时可能 EPERM/EEXIST(非严格原子,但实用)
if (
process.platform === 'win32' &&
(e.code === 'EPERM' || e.code === 'EEXIST')
) {
await rm(filePath, { force: true }) // 尝试移除旧文件(可能仍因占用/只读失败)
await rename(tmp, filePath) // 再次改名
} else {
// 其他平台或其他错误:清理 tmp 临时文件后抛出
await rm(tmp, { force: true }).catch(() => {}) // 清理 tmp 临时文件
throw e
}
}
if (durable) {
const dh = await open(dir, 'r')
try {
await dh.sync()
} finally {
await dh.close()
} // 刷新目录项
}
}
// 维护“每个文件路径 → 该文件队列尾Promise”的映射
const fileQueues = new Map()
// 串行化对同一 filePath 的写任务
function withFileQueue(filePath, task) {
const prev = fileQueues.get(filePath) ?? Promise.resolve()
const run = () => task()
const next = prev.then(run, run)
// 不让队列因未捕获拒绝而“断链”
fileQueues.set(
filePath,
next.catch(() => {})
)
// 任务完结后可清理
next.finally(() => {
if (fileQueues.get(filePath) === next) fileQueues.delete(filePath)
})
return next
}
async function readJSONSafe(file) {
try {
return JSON.parse(await readFile(file, 'utf8'))
} catch (e) {
if (e?.code === 'ENOENT' || e instanceof SyntaxError) return null
throw e
}
}
// 对外暴露的 2 个工具方法
export function queuedAtomicWrite(filePath, text, opts) {
return withFileQueue(filePath, () => atomicWrite(filePath, text, opts))
}
export function queuedUpdateJSON(filePath, updater, opts) {
return withFileQueue(filePath, async () => {
const oldState = (await readJSONSafe(filePath)) ?? {}
const newState = await updater(oldState) // 由调用方决定如何“改”
const text = JSON.stringify(newState, null, 2)
await atomicWrite(filePath, text, opts)
})
}

下面是一个调用示例:

import { queuedUpdateJSON, queuedAtomicWrite } from './atomic-queued.js'
// 并发 10 次自增:最终 counter 一定是 10
await Promise.all(
Array.from({ length: 10 }, () =>
queuedUpdateJSON('./state.json', (s) => ({ counter: (s.counter ?? 0) + 1 }))
)
)
// 纯文本也可以直接排队写,最终文本内容为 C
await Promise.all([
queuedAtomicWrite('./note.txt', 'A\n'),
queuedAtomicWrite('./note.txt', 'B\n'),
queuedAtomicWrite('./note.txt', 'C\n'),
])

写在最后#

关于文件写入,看起来只是一次 writeFile,但背后要权衡语义、故障与并发。默认的覆盖写是“快照式”但非原子,宕机可能出现空文件或半截内容。

要实现“要么旧、要么完整新”,应采用原子写入(先写同目录临时文件,再 rename),必要时配合 fsync 提升掉电后的可靠性。

并发场景下,仅有原子写还会遇到 Last Write Wins。将“读→改→写”封装进同一个队列任务,确保每次更新都在前一次落盘后再读取最新状态,从而避免丢更新。

可选补充:若存在多进程/多服务同时写同一文件,再叠加锁(锁文件或系统级文件锁)实现跨进程互斥。

好啦,这就是这篇文章的全部内容了,我们下篇文章再见👋