alltools.one
JSON
2025-06-02
8 min
alltools.one Team
JSONStreamingPerformanceLarge FilesMemory

大型文件的 JSON 流式处理:无需完整加载即可处理

标准 JSON 解析会将整个文档加载到内存中,构建完整的数据结构,然后才允许你访问。对于 10 MB 的文件,这没问题。但对于 10 GB 的文件,你的进程会耗尽内存并崩溃。流式解析器通过增量处理 JSON 来解决这个问题——在数据到达时即时读取和处理,无需将整个文档保存在内存中。

标准解析的问题

import json

# 这会将整个文件加载到内存中
with open('huge.json') as f:
    data = json.load(f)  # 10 GB 文件 = 10+ GB 的 RAM

# 处理在完全加载后才开始
for item in data['records']:
    process(item)

对于包含 100 万条记录、每条 10 KB 的文件,标准解析需要:

  • 文件大小:~10 GB
  • 解析所需内存:~10 GB(原始字符串)
  • 数据结构所需内存:~15-20 GB(Python 对象比原始 JSON 更大)
  • 总计:~25-30 GB 的 RAM

流式处理可将其降至几兆字节。

流式处理方法

SAX 风格(基于事件)

解析器在遇到 JSON 标记时发出事件:

import ijson

# 逐个处理条目——恒定的内存使用
with open('huge.json', 'rb') as f:
    for record in ijson.items(f, 'records.item'):
        process(record)  # 每条记录被独立解析
        # 之前的记录会被垃圾回收

事件包括:start_mapmap_keyend_mapstart_arrayend_arraystringnumberbooleannull

JSON Lines (JSONL / NDJSON)

一种更简单的方法:每行一个 JSON 对象。每行都是一个完整的、有效的 JSON 文档:

{"id": 1, "name": "Alice", "email": "alice@example.com"}
{"id": 2, "name": "Bob", "email": "bob@example.com"}
{"id": 3, "name": "Charlie", "email": "charlie@example.com"}

处理非常简单——逐行读取:

with open('data.jsonl') as f:
    for line in f:
        record = json.loads(line)
        process(record)

JSON Lines 的优势

  • 每行可独立解析(并行处理)
  • 方便追加(只需添加新行)
  • 兼容标准 Unix 工具(grepwcheadtail
  • 天然适合日志文件和流式数据

分块处理

对于标准 JSON 数组,将处理分成块:

import ijson

def process_in_chunks(filename, chunk_size=1000):
    chunk = []
    with open(filename, 'rb') as f:
        for record in ijson.items(f, 'item'):
            chunk.append(record)
            if len(chunk) >= chunk_size:
                process_batch(chunk)
                chunk = []
    if chunk:
        process_batch(chunk)

各语言实现

Python (ijson)

import ijson

# 从文件流式读取
with open('large.json', 'rb') as f:
    parser = ijson.parse(f)
    for prefix, event, value in parser:
        if prefix == 'records.item.name':
            print(value)

# 从 HTTP 响应流式读取
import urllib.request
response = urllib.request.urlopen('https://api.example.com/data')
for record in ijson.items(response, 'records.item'):
    process(record)

JavaScript (Node.js)

const { createReadStream } = require('fs');
const { parser } = require('stream-json');
const { streamArray } = require('stream-json/streamers/StreamArray');

const pipeline = createReadStream('large.json')
  .pipe(parser())
  .pipe(streamArray());

pipeline.on('data', ({ value }) => {
  process(value);
});

pipeline.on('end', () => {
  console.log('Done processing');
});

命令行 (jq)

# 流模式——逐个处理对象
jq --stream 'select(length == 2) | .[1]' large.json

# 处理 JSON Lines
cat data.jsonl | jq -c 'select(.age > 30)'

# 将数组转换为 JSON Lines
jq -c '.[]' large_array.json > data.jsonl

何时使用流式处理

场景标准解析流式处理
文件小于 100 MB首选没必要
文件 100 MB 到 1 GB取决于 RAM推荐
文件超过 1 GB不可行必须
HTTP 响应(大型)有超时风险边接收边处理
实时数据流不适用必须
简单的一次性读取首选没必要

性能对比

处理 100 万条记录(1 GB 文件):

方法内存使用处理时间复杂度
json.load()3-5 GB15 秒简单
ijson 流式处理50 MB45 秒中等
JSON Lines10 MB12 秒简单
分块处理 (1000)100 MB20 秒中等

流式处理使用的内存要少得多,但 SAX 风格的解析由于事件驱动的开销而速度较慢。JSON Lines 既快速又节省内存,因为每行都是独立的解析操作。

最佳实践

  1. 尽可能使用 JSON Lines:这是最简单的流式格式,兼容标准工具。
  2. 设置缓冲区大小:配置读取缓冲区以获得最佳吞吐量(通常为 64 KB 到 1 MB)。
  3. 批量处理:将数据库插入和 API 调用批量化,而不是逐条处理。
  4. 优雅地处理错误:在流式处理中,一条格式错误的记录不应导致整个管道崩溃。
  5. 监控内存:使用性能分析来验证流式处理确实控制了内存使用。

对于格式化和验证较小的 JSON 文件,我们的 JSON 格式化工具 可以处理最大 100 MB 的文档并提供实时格式化。

常见问题

流式解析器可以使用 JSONPath 吗?

一些流式库支持基于路径的过滤。Python 的 ijson.items 支持在流式处理期间按路径过滤。然而,复杂的 JSONPath 查询(过滤器、跨层级通配符)通常需要将完整文档加载到内存中。关于基于路径的查询,请参阅我们的 JSONPath 指南

如何将大型 JSON 数组转换为 JSON Lines?

对于能放入内存的文件,使用 jq -c '.[]' input.json > output.jsonl。对于真正的大文件,使用流式转换器:用流式解析器读取数组,将每个元素作为一行写出。

相关资源

Published on 2025-06-02
JSON Streaming for Large Files: Process Without Loading All | alltools.one