大型文件的 JSON 流式处理:无需完整加载即可处理
标准 JSON 解析会将整个文档加载到内存中,构建完整的数据结构,然后才允许你访问。对于 10 MB 的文件,这没问题。但对于 10 GB 的文件,你的进程会耗尽内存并崩溃。流式解析器通过增量处理 JSON 来解决这个问题——在数据到达时即时读取和处理,无需将整个文档保存在内存中。
标准解析的问题
import json
# 这会将整个文件加载到内存中
with open('huge.json') as f:
data = json.load(f) # 10 GB 文件 = 10+ GB 的 RAM
# 处理在完全加载后才开始
for item in data['records']:
process(item)
对于包含 100 万条记录、每条 10 KB 的文件,标准解析需要:
- 文件大小:~10 GB
- 解析所需内存:~10 GB(原始字符串)
- 数据结构所需内存:~15-20 GB(Python 对象比原始 JSON 更大)
- 总计:~25-30 GB 的 RAM
流式处理可将其降至几兆字节。
流式处理方法
SAX 风格(基于事件)
解析器在遇到 JSON 标记时发出事件:
import ijson
# 逐个处理条目——恒定的内存使用
with open('huge.json', 'rb') as f:
for record in ijson.items(f, 'records.item'):
process(record) # 每条记录被独立解析
# 之前的记录会被垃圾回收
事件包括:start_map、map_key、end_map、start_array、end_array、string、number、boolean、null。
JSON Lines (JSONL / NDJSON)
一种更简单的方法:每行一个 JSON 对象。每行都是一个完整的、有效的 JSON 文档:
{"id": 1, "name": "Alice", "email": "alice@example.com"}
{"id": 2, "name": "Bob", "email": "bob@example.com"}
{"id": 3, "name": "Charlie", "email": "charlie@example.com"}
处理非常简单——逐行读取:
with open('data.jsonl') as f:
for line in f:
record = json.loads(line)
process(record)
JSON Lines 的优势:
- 每行可独立解析(并行处理)
- 方便追加(只需添加新行)
- 兼容标准 Unix 工具(
grep、wc、head、tail) - 天然适合日志文件和流式数据
分块处理
对于标准 JSON 数组,将处理分成块:
import ijson
def process_in_chunks(filename, chunk_size=1000):
chunk = []
with open(filename, 'rb') as f:
for record in ijson.items(f, 'item'):
chunk.append(record)
if len(chunk) >= chunk_size:
process_batch(chunk)
chunk = []
if chunk:
process_batch(chunk)
各语言实现
Python (ijson)
import ijson
# 从文件流式读取
with open('large.json', 'rb') as f:
parser = ijson.parse(f)
for prefix, event, value in parser:
if prefix == 'records.item.name':
print(value)
# 从 HTTP 响应流式读取
import urllib.request
response = urllib.request.urlopen('https://api.example.com/data')
for record in ijson.items(response, 'records.item'):
process(record)
JavaScript (Node.js)
const { createReadStream } = require('fs');
const { parser } = require('stream-json');
const { streamArray } = require('stream-json/streamers/StreamArray');
const pipeline = createReadStream('large.json')
.pipe(parser())
.pipe(streamArray());
pipeline.on('data', ({ value }) => {
process(value);
});
pipeline.on('end', () => {
console.log('Done processing');
});
命令行 (jq)
# 流模式——逐个处理对象
jq --stream 'select(length == 2) | .[1]' large.json
# 处理 JSON Lines
cat data.jsonl | jq -c 'select(.age > 30)'
# 将数组转换为 JSON Lines
jq -c '.[]' large_array.json > data.jsonl
何时使用流式处理
| 场景 | 标准解析 | 流式处理 |
|---|---|---|
| 文件小于 100 MB | 首选 | 没必要 |
| 文件 100 MB 到 1 GB | 取决于 RAM | 推荐 |
| 文件超过 1 GB | 不可行 | 必须 |
| HTTP 响应(大型) | 有超时风险 | 边接收边处理 |
| 实时数据流 | 不适用 | 必须 |
| 简单的一次性读取 | 首选 | 没必要 |
性能对比
处理 100 万条记录(1 GB 文件):
| 方法 | 内存使用 | 处理时间 | 复杂度 |
|---|---|---|---|
| json.load() | 3-5 GB | 15 秒 | 简单 |
| ijson 流式处理 | 50 MB | 45 秒 | 中等 |
| JSON Lines | 10 MB | 12 秒 | 简单 |
| 分块处理 (1000) | 100 MB | 20 秒 | 中等 |
流式处理使用的内存要少得多,但 SAX 风格的解析由于事件驱动的开销而速度较慢。JSON Lines 既快速又节省内存,因为每行都是独立的解析操作。
最佳实践
- 尽可能使用 JSON Lines:这是最简单的流式格式,兼容标准工具。
- 设置缓冲区大小:配置读取缓冲区以获得最佳吞吐量(通常为 64 KB 到 1 MB)。
- 批量处理:将数据库插入和 API 调用批量化,而不是逐条处理。
- 优雅地处理错误:在流式处理中,一条格式错误的记录不应导致整个管道崩溃。
- 监控内存:使用性能分析来验证流式处理确实控制了内存使用。
对于格式化和验证较小的 JSON 文件,我们的 JSON 格式化工具 可以处理最大 100 MB 的文档并提供实时格式化。
常见问题
流式解析器可以使用 JSONPath 吗?
一些流式库支持基于路径的过滤。Python 的 ijson.items 支持在流式处理期间按路径过滤。然而,复杂的 JSONPath 查询(过滤器、跨层级通配符)通常需要将完整文档加载到内存中。关于基于路径的查询,请参阅我们的 JSONPath 指南。
如何将大型 JSON 数组转换为 JSON Lines?
对于能放入内存的文件,使用 jq -c '.[]' input.json > output.jsonl。对于真正的大文件,使用流式转换器:用流式解析器读取数组,将每个元素作为一行写出。
相关资源
- JSON 格式化工具 — 格式化和验证 JSON 文件
- JSONPath 查询指南 — 高效地从 JSON 中提取数据
- JSON 编辑器技巧 — 处理大型 JSON 文档