PythonでCSVファイルを効率的に読み書きする方法:csv・pandas・polars比較

Pythonのcsvモジュール、pandas、polarsを使ったCSVファイルの読み書き方法を比較。大容量ファイルの処理やエンコーディング問題の対処法も解説。

CSVファイルとは

CSV(Comma-Separated Values)は、カンマ区切りでデータを格納するテキスト形式のファイルです。データの受け渡しや分析の入出力として広く利用されています。Pythonには標準ライブラリのcsvモジュールのほか、pandaspolarsといったサードパーティライブラリがあり、用途や規模に応じて使い分けることが重要です。

1. 標準ライブラリ csvモジュール

1.1 csv.reader:基本的な読み込み

csv.readerはCSVファイルを行ごとにリストとして読み込みます。

import csv

with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.reader(f)
    header = next(reader)  # ヘッダー行を取得
    print(f"カラム: {header}")
    for row in reader:
        print(row)  # ['値1', '値2', '値3']

1.2 csv.writer:基本的な書き込み

import csv

data = [
    ["名前", "年齢", "都市"],
    ["田中", 30, "東京"],
    ["佐藤", 25, "大阪"],
    ["鈴木", 35, "名古屋"],
]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(data)

newline=""を指定しないと、Windows環境で空行が挿入される問題が発生します。

1.3 DictReader / DictWriter:辞書形式での操作

カラム名をキーとした辞書形式で操作できるため、コードの可読性が向上します。

import csv

# 辞書形式で読み込み
with open("data.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(row["名前"], row["年齢"])  # カラム名でアクセス

# 辞書形式で書き込み
fieldnames = ["名前", "年齢", "都市"]
rows = [
    {"名前": "田中", "年齢": 30, "都市": "東京"},
    {"名前": "佐藤", "年齢": 25, "都市": "大阪"},
]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)

1.4 区切り文字やクォートのカスタマイズ

TSV(タブ区切り)やセミコロン区切りにも対応できます。

import csv

# TSVファイルの読み込み
with open("data.tsv", "r", encoding="utf-8") as f:
    reader = csv.reader(f, delimiter="\t")
    for row in reader:
        print(row)

# クォート処理のカスタマイズ
with open("data.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f, quoting=csv.QUOTE_ALL)  # すべてのフィールドをクォート
    writer.writerow(["名前", "住所, 東京都", "備考"])

2. pandasによるCSV操作

2.1 read_csv:柔軟な読み込み

pandas.read_csvは非常に多くのオプションを備えており、実務で最もよく使われます。

import pandas as pd

# 基本的な読み込み
df = pd.read_csv("data.csv")

# よく使うオプションを指定した読み込み
df = pd.read_csv(
    "data.csv",
    encoding="utf-8",       # エンコーディング指定
    header=0,               # ヘッダー行のインデックス(Noneでヘッダーなし)
    index_col=0,            # インデックスに使用する列
    dtype={"年齢": int, "売上": float},  # 型を明示的に指定
    usecols=["名前", "年齢", "売上"],    # 必要な列のみ読み込み
    na_values=["N/A", "-", ""],          # 欠損値として扱う文字列
    parse_dates=["日付"],   # 日付型に変換する列
    nrows=1000,             # 先頭N行のみ読み込み
)

2.2 to_csv:書き出し

import pandas as pd

df = pd.DataFrame({
    "名前": ["田中", "佐藤", "鈴木"],
    "年齢": [30, 25, 35],
    "都市": ["東京", "大阪", "名古屋"],
})

# 基本的な書き出し
df.to_csv("output.csv", index=False, encoding="utf-8")

# よく使うオプション
df.to_csv(
    "output.csv",
    index=False,            # インデックス列を出力しない
    encoding="utf-8-sig",   # Excelで開く場合はBOM付きUTF-8
    columns=["名前", "都市"],  # 出力する列を指定
    sep="\t",               # タブ区切りで出力
    na_rep="N/A",           # 欠損値の表記
)

2.3 行のフィルタリングと集計

import pandas as pd

df = pd.read_csv("sales.csv")

# 条件によるフィルタリング
tokyo_sales = df[df["都市"] == "東京"]
high_sales = df[df["売上"] > 100000]

# 複合条件
result = df[(df["都市"] == "東京") & (df["年齢"] >= 30)]

# グループ別集計
summary = df.groupby("都市").agg(
    売上合計=("売上", "sum"),
    売上平均=("売上", "mean"),
    件数=("売上", "count"),
).reset_index()

print(summary)

2.4 複数CSVのマージ

import pandas as pd
import glob

# 同じ形式のCSVを結合
files = glob.glob("data/sales_*.csv")
dfs = [pd.read_csv(f) for f in files]
combined = pd.concat(dfs, ignore_index=True)

# 2つのCSVをキーで結合
customers = pd.read_csv("customers.csv")
orders = pd.read_csv("orders.csv")
merged = pd.merge(orders, customers, on="顧客ID", how="left")

3. polarsによるCSV操作

polarsはRust製の高速データフレームライブラリです。pandasと似たAPIを持ちながら、大規模データで大幅に高速に動作します。

import polars as pl

# 読み込み
df = pl.read_csv("data.csv")

# 型指定付き読み込み
df = pl.read_csv(
    "data.csv",
    dtypes={"年齢": pl.Int32, "売上": pl.Float64},
    encoding="utf8",
)

# フィルタリングと集計
result = (
    df.filter(pl.col("都市") == "東京")
    .group_by("カテゴリ")
    .agg(
        pl.col("売上").sum().alias("売上合計"),
        pl.col("売上").mean().alias("売上平均"),
        pl.col("売上").count().alias("件数"),
    )
)

# 書き出し
result.write_csv("output.csv")

遅延評価(Lazy API)

polarsの大きな特徴であるLazy APIを使うと、クエリの最適化が自動的に行われます。

import polars as pl

# 遅延評価でクエリを構築
result = (
    pl.scan_csv("large_data.csv")  # LazyFrameとして読み込み
    .filter(pl.col("売上") > 10000)
    .group_by("都市")
    .agg(pl.col("売上").sum())
    .sort("売上", descending=True)
    .collect()  # ここで初めて実行される
)

scan_csvは必要なデータのみを読み込むため、大容量ファイルでもメモリ効率が良くなります。

4. 大容量CSVの効率的な処理

数GB以上のCSVファイルを処理する場合、メモリに全データを載せられないことがあります。

4.1 pandasのchunksize

import pandas as pd

# チャンク単位で読み込み・処理
chunk_size = 10000
results = []

for chunk in pd.read_csv("large_data.csv", chunksize=chunk_size):
    # 各チャンクを個別に処理
    filtered = chunk[chunk["売上"] > 10000]
    results.append(filtered)

# 結果を結合
final = pd.concat(results, ignore_index=True)

4.2 itertoolsとcsvモジュールによる省メモリ処理

メモリ使用量を最小限に抑えたい場合は、csvモジュールとジェネレータを組み合わせます。

import csv
from itertools import islice

def read_csv_chunks(filepath, chunk_size=10000):
    """CSVファイルをチャンク単位で読み込むジェネレータ"""
    with open(filepath, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        header = next(reader)
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                break
            yield header, chunk

# 使用例:売上の合計を省メモリで計算
total_sales = 0
for header, chunk in read_csv_chunks("large_data.csv"):
    sales_idx = header.index("売上")
    total_sales += sum(float(row[sales_idx]) for row in chunk)

print(f"売上合計: {total_sales:,.0f}")

4.3 polarsのストリーミング処理

import polars as pl

# ストリーミングモードで大容量ファイルを処理
result = (
    pl.scan_csv("very_large_data.csv")
    .filter(pl.col("ステータス") == "完了")
    .group_by("都市")
    .agg(pl.col("売上").sum())
    .collect(streaming=True)  # ストリーミングモードで実行
)

5. エンコーディング問題の対処法

日本語環境では、CSVファイルのエンコーディングに関する問題が頻繁に発生します。

5.1 よくあるエンコーディングと対処

エンコーディング用途Pythonでの指定
UTF-8標準的なエンコーディングencoding="utf-8"
UTF-8 BOMExcel出力encoding="utf-8-sig"
Shift_JISWindows日本語環境encoding="shift_jis"
CP932Shift_JISの拡張encoding="cp932"
EUC-JP旧Unix系システムencoding="euc_jp"

5.2 エンコーディング自動判定

import chardet

def detect_encoding(filepath):
    """ファイルのエンコーディングを自動判定する"""
    with open(filepath, "rb") as f:
        raw_data = f.read(10000)  # 先頭10KBを読み込み
        result = chardet.detect(raw_data)
    return result["encoding"], result["confidence"]

# 使用例
encoding, confidence = detect_encoding("unknown.csv")
print(f"エンコーディング: {encoding} (確信度: {confidence:.2%})")

import pandas as pd
df = pd.read_csv("unknown.csv", encoding=encoding)

5.3 エンコーディングエラーへの対処

import pandas as pd

# エラーを無視して読み込み(データ欠損の可能性あり)
df = pd.read_csv("data.csv", encoding="utf-8", encoding_errors="ignore")

# エラー箇所を置換文字に変換
df = pd.read_csv("data.csv", encoding="utf-8", encoding_errors="replace")

# 複数のエンコーディングを順番に試す
def read_csv_auto(filepath):
    """複数のエンコーディングを試してCSVを読み込む"""
    encodings = ["utf-8", "utf-8-sig", "cp932", "shift_jis", "euc_jp"]
    for enc in encodings:
        try:
            return pd.read_csv(filepath, encoding=enc)
        except (UnicodeDecodeError, UnicodeError):
            continue
    raise ValueError(f"読み込みに失敗しました: {filepath}")

5.4 ExcelでCSVを開くためのBOM付きUTF-8出力

Microsoft ExcelでUTF-8のCSVを正しく表示するには、BOM(Byte Order Mark)を付ける必要があります。

import pandas as pd

df = pd.DataFrame({"名前": ["田中", "佐藤"], "売上": [100, 200]})

# BOM付きUTF-8で書き出し(Excelで文字化けしない)
df.to_csv("for_excel.csv", index=False, encoding="utf-8-sig")

6. 実践例:データの前処理パイプライン

複数の処理を組み合わせた実践的な例を示します。文字列のパターンマッチングには正規表現が便利です。

import pandas as pd
import re

def process_sales_data(input_path, output_path):
    """売上データの前処理パイプライン"""
    # 1. 読み込み(エンコーディング自動対応)
    df = pd.read_csv(input_path, encoding="cp932", parse_dates=["日付"])

    # 2. 不要な列の削除
    df = df.drop(columns=["備考", "更新日時"], errors="ignore")

    # 3. 欠損値の処理
    df["売上"] = df["売上"].fillna(0)
    df["都市"] = df["都市"].fillna("不明")

    # 4. データ型の変換
    df["売上"] = df["売上"].astype(int)

    # 5. 正規表現による電話番号のクレンジング
    df["電話番号"] = df["電話番号"].apply(
        lambda x: re.sub(r"[^\d]", "", str(x)) if pd.notna(x) else ""
    )

    # 6. フィルタリング(売上が0より大きい行のみ)
    df = df[df["売上"] > 0]

    # 7. 集計列の追加
    df["月"] = df["日付"].dt.to_period("M")

    # 8. 書き出し
    df.to_csv(output_path, index=False, encoding="utf-8-sig")
    print(f"処理完了: {len(df)}行を出力しました")

process_sales_data("raw_sales.csv", "cleaned_sales.csv")

繰り返し実行する処理にはデコレータでログ出力やリトライ機能を追加すると便利です。

7. csv・pandas・polars比較

項目csv(標準)pandaspolars
インストール不要(標準)pip install pandaspip install polars
速度遅い中速高速
メモリ効率良い(行単位)普通良い(列指向)
機能の豊富さ最小限非常に豊富豊富
型安全性なし弱い強い
並列処理なしなし自動並列化
遅延評価なしなしあり(scan_csv)
日本語対応手動良好良好
学習コスト低い中程度中程度
推奨用途小規模・単純処理分析・変換全般大規模データ処理

性能比較ベンチマーク

100万行 x 10列のCSVファイルでの読み込み時間の目安です。

import time
import csv
import pandas as pd
import polars as pl

filepath = "benchmark_data.csv"  # 100万行のCSV

# csv.reader
start = time.perf_counter()
with open(filepath, "r") as f:
    reader = csv.reader(f)
    data = list(reader)
csv_time = time.perf_counter() - start

# pandas
start = time.perf_counter()
df_pd = pd.read_csv(filepath)
pandas_time = time.perf_counter() - start

# polars
start = time.perf_counter()
df_pl = pl.read_csv(filepath)
polars_time = time.perf_counter() - start

print(f"csv.reader: {csv_time:.2f}秒")
print(f"pandas:     {pandas_time:.2f}秒")
print(f"polars:     {polars_time:.2f}秒")
# 典型的な結果例:
# csv.reader: 3.50秒
# pandas:     1.20秒
# polars:     0.35秒

8. パフォーマンスTips

大規模CSVを扱う際のパフォーマンス改善テクニックをまとめます。

8.1 必要な列だけ読み込む

import pandas as pd

# 全列読み込み(遅い)
df = pd.read_csv("large.csv")

# 必要な列だけ(速い・省メモリ)
df = pd.read_csv("large.csv", usecols=["名前", "売上"])

8.2 型を明示して読み込む

pandasは型推論に時間がかかるため、大容量ファイルでは型を明示すると高速化できます。

import pandas as pd

dtypes = {
    "ID": "int32",         # int64 → int32でメモリ半減
    "名前": "string",      # objectよりstringが効率的
    "売上": "float32",     # float64 → float32でメモリ半減
    "カテゴリ": "category", # カテゴリ型で大幅にメモリ削減
}

df = pd.read_csv("large.csv", dtype=dtypes)

8.3 Parquetフォーマットへの変換

繰り返し読み込むCSVは、Parquet形式に変換しておくと大幅に高速化されます。

import pandas as pd

# CSVをParquetに変換(初回のみ)
df = pd.read_csv("large.csv")
df.to_parquet("large.parquet", engine="pyarrow")

# 以降はParquetから読み込み(数倍〜数十倍高速)
df = pd.read_parquet("large.parquet")

8.4 大量のCSVを非同期で読み込む

大量のCSVファイルを同時に読み込む場合、非同期処理やマルチプロセスを活用できます。

from concurrent.futures import ProcessPoolExecutor
import pandas as pd
import glob

def process_file(filepath):
    """個別のCSVファイルを処理する関数"""
    df = pd.read_csv(filepath)
    return df[df["売上"] > 10000]

# マルチプロセスで並列処理
files = glob.glob("data/sales_*.csv")
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_file, files))

combined = pd.concat(results, ignore_index=True)

まとめ

CSVファイルの処理は用途に応じてツールを選択することが重要です。

  • 小規模・単純な処理csvモジュール(依存なし)
  • データ分析・変換pandas(最も広く使われている)
  • 大規模データ・高速処理polars(自動並列化・遅延評価)

日本語環境ではエンコーディングの問題が避けられないため、chardetによる自動判定やutf-8-sigの使い分けを習慣づけるとよいでしょう。また、繰り返し読み込むファイルはParquet形式に変換しておくことで、読み込み速度を大幅に改善できます。

関連記事

参考


関連ツール