ローカルDBとVPS・レンタルサーバーのSQLをAPIで同期する

データベース

開発現場では「ローカル環境のデータベースと本番サーバーのデータベースを常に同期させたい」という要件がよく発生します。本記事では、REST APIを介してローカルのSQLiteやMySQL/PostgreSQLのデータを、VPSやレンタルサーバー上のSQLデータベースと同期する方法を、設計から実装・運用まで体系的に解説します。


1. データ同期の基本概念と設計方針

同期の種類

種類 特徴 主なユースケース
一方向同期(プッシュ) ローカル → サーバーへデータ送信 ログ収集、センサーデータ蓄積
一方向同期(プル) サーバー → ローカルへデータ取得 マスタデータの配布、設定同期
双方向同期 両者の差分を相互に反映 マルチデバイス対応アプリ
増分同期 最後の同期以降の変更のみ転送 大規模DB・通信量削減が必要な場面

競合(コンフリクト)への対処

双方向同期で最も難しい問題は「同じレコードがローカルとサーバー両方で更新された場合」の競合処理です。代表的な戦略:

  • Last Write Wins(最後の更新が勝つ):実装がシンプル。タイムスタンプで判定
  • Server Wins:サーバー側の変更を常に優先。マスタデータ配布に適する
  • Client Wins:クライアント側の変更を常に優先。オフライン編集アプリに適する
  • カスタムマージ:フィールド単位で細かく制御。最も柔軟だが実装コストが高い

💡 推奨:まずは「Last Write Wins + updated_at タイムスタンプ」方式から始めるのが最もバランスが取れています。


2. 必要なデータベース設計

同期を正しく機能させるために、テーブルには以下のカラムを追加します。

-- 同期対応テーブルの基本設計
CREATE TABLE items (
  id          VARCHAR(36)  PRIMARY KEY,  -- UUIDを使用(ローカルとリモートで衝突しない)
  name        VARCHAR(255) NOT NULL,
  value       TEXT,
  created_at  DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at  DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  deleted_at  DATETIME     NULL,         -- 論理削除(物理削除すると差分が追えない)
  sync_status VARCHAR(20)  DEFAULT 'pending',  -- pending / synced / conflict
  version     INT          DEFAULT 1     -- 楽観的ロック用
);

⚠️ 注意:AUTO_INCREMENTではなくUUIDを主キーに使うことで、ローカルとリモートで同じIDが発行されるリスクを回避できます。


3. APIサーバーの実装(Node.js / Express)

プロジェクトセットアップ

mkdir sync-api && cd sync-api
npm init -y
npm install express mysql2 jsonwebtoken uuid dotenv
npm install -D nodemon

差分取得エンドポイント(GET /api/sync/pull)

クライアントは「最後に同期した日時」以降に変更されたレコードだけを取得します。

// routes/sync.js
const express = require('express');
const router = express.Router();

// GET /api/sync/pull?since=2024-01-01T00:00:00Z&table=items
router.get('/pull', authenticateToken, async (req, res) => {
  const { since, table } = req.query;
  const allowedTables = ['items', 'categories', 'users'];

  if (!allowedTables.includes(table)) {
    return res.status(400).json({ error: 'Invalid table name' });
  }

  try {
    const [rows] = await db.query(
      `SELECT * FROM ?? WHERE updated_at > ? OR deleted_at > ?
       ORDER BY updated_at ASC LIMIT 500`,
      [table, since, since]
    );
    res.json({
      data: rows,
      syncedAt: new Date().toISOString(),
      count: rows.length
    });
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});

データ送信エンドポイント(POST /api/sync/push)

router.post('/push', authenticateToken, async (req, res) => {
  const { table, records } = req.body;
  const results = { success: [], conflict: [], error: [] };

  for (const record of records) {
    try {
      const [existing] = await db.query(
        'SELECT version, updated_at FROM ?? WHERE id = ?',
        [table, record.id]
      );

      if (existing.length > 0) {
        const serverRecord = existing[0];
        // サーバー側が新しければコンフリクト
        if (new Date(serverRecord.updated_at) > new Date(record.updated_at)) {
          results.conflict.push({ id: record.id, serverVersion: serverRecord });
          continue;
        }
      }

      // アップサート(INSERT or UPDATE)
      await db.query(
        `INSERT INTO ?? SET ? ON DUPLICATE KEY UPDATE
         name=VALUES(name), value=VALUES(value),
         updated_at=VALUES(updated_at), version=version+1`,
        [table, record]
      );
      results.success.push(record.id);

    } catch (err) {
      results.error.push({ id: record.id, error: err.message });
    }
  }

  res.json(results);
});

4. ローカルクライアントの実装(Python)

同期エージェント本体

# sync_agent.py
import sqlite3, requests
from datetime import datetime

API_BASE  = 'https://your-server.com/api/sync'
API_TOKEN = 'your-jwt-token'
DB_PATH   = 'local.db'

class SyncAgent:
    def __init__(self):
        self.conn = sqlite3.connect(DB_PATH)
        self.conn.row_factory = sqlite3.Row
        self._ensure_meta_table()

    def _ensure_meta_table(self):
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS sync_meta (
                table_name TEXT PRIMARY KEY,
                last_pull  TEXT DEFAULT '1970-01-01T00:00:00Z',
                last_push  TEXT DEFAULT '1970-01-01T00:00:00Z'
            )''')
        self.conn.commit()

    def pull(self, table: str):
        """サーバーから差分を取得してローカルDBに反映"""
        last_pull = self._get_meta(table, 'last_pull')
        resp = requests.get(
            f'{API_BASE}/pull',
            params={'since': last_pull, 'table': table},
            headers={'Authorization': f'Bearer {API_TOKEN}'}
        )
        resp.raise_for_status()
        data = resp.json()

        cursor = self.conn.cursor()
        for rec in data['data']:
            local = cursor.execute(
                f'SELECT updated_at FROM {table} WHERE id=?', (rec['id'],)
            ).fetchone()
            if not local or local['updated_at'] < rec['updated_at']:
                cols = ', '.join(rec.keys())
                placeholders = ', '.join(['?'] * len(rec))
                updates = ', '.join(f'{k}=excluded.{k}' for k in rec.keys())
                cursor.execute(
                    f'INSERT INTO {table} ({cols}) VALUES ({placeholders})'
                    f' ON CONFLICT(id) DO UPDATE SET {updates}',
                    list(rec.values())
                )
        self._set_meta(table, 'last_pull', data['syncedAt'])
        self.conn.commit()
        print(f'[PULL] {table}: {len(data["data"])}件 取得')

    def push(self, table: str):
        """ローカルの未同期レコードをサーバーへ送信"""
        rows = self.conn.execute(
            f'SELECT * FROM {table} WHERE sync_status = ?', ('pending',)
        ).fetchall()
        if not rows:
            print(f'[PUSH] {table}: 送信するデータなし')
            return

        records = [dict(r) for r in rows]
        resp = requests.post(
            f'{API_BASE}/push',
            json={'table': table, 'records': records},
            headers={'Authorization': f'Bearer {API_TOKEN}'}
        )
        resp.raise_for_status()
        result = resp.json()

        for rid in result['success']:
            self.conn.execute(
                f'UPDATE {table} SET sync_status=? WHERE id=?', ('synced', rid)
            )
        self.conn.commit()
        print(f'[PUSH] {table}: 成功{len(result["success"])}件, コンフリクト{len(result["conflict"])}件')

定期実行スクリプト

# main.py
from sync_agent import SyncAgent
import schedule, time
from datetime import datetime

agent = SyncAgent()
TABLES = ['items', 'categories']

def sync_all():
    print(f'--- 同期開始: {datetime.now()} ---')
    for table in TABLES:
        agent.pull(table)  # まずサーバーから取得
        agent.push(table)  # 次にローカルの変更を送信

# 5分ごとに自動同期
schedule.every(5).minutes.do(sync_all)

sync_all()  # 初回即時実行
while True:
    schedule.run_pending()
    time.sleep(1)

5. セキュリティ設計

JWT認証ミドルウェア

// middleware/auth.js (Node.js)
const jwt = require('jsonwebtoken');

function authenticateToken(req, res, next) {
  const token = req.headers['authorization']?.split(' ')[1];
  if (!token) return res.status(401).json({ error: 'Token required' });

  jwt.verify(token, process.env.JWT_SECRET, (err, user) => {
    if (err) return res.status(403).json({ error: 'Invalid token' });
    req.user = user;
    next();
  });
}

module.exports = { authenticateToken };

セキュリティチェックリスト

  • 通信はHTTPS(TLS)必須。HTTP経由の同期は絶対に避ける
  • SQLインジェクション対策:プリペアドステートメントを必ず使用
  • テーブル名・カラム名のホワイトリスト検証(動的クエリの場合)
  • レート制限(Rate Limiting)を実装してAPI過負荷を防止
  • 送受信データのサイズ制限(例:1リクエスト500件まで)
  • APIキー・JWTシークレットは環境変数で管理。コードにハードコードしない
  • 同期ログを保存してデータの流れを追跡可能にする

🔒 重要:同期APIはデータベースへの直接アクセスと同等の権限を持ちます。認証・認可の実装を最優先してください。


6. 運用と監視

同期ログテーブルの設計

CREATE TABLE sync_log (
  id            INT AUTO_INCREMENT PRIMARY KEY,
  direction     ENUM('push', 'pull') NOT NULL,
  table_name    VARCHAR(64),
  record_count  INT DEFAULT 0,
  conflict_count INT DEFAULT 0,
  error_count   INT DEFAULT 0,
  duration_ms   INT,
  status        ENUM('success', 'partial', 'failed') NOT NULL,
  error_detail  TEXT,
  executed_at   DATETIME DEFAULT CURRENT_TIMESTAMP
);

よくあるトラブルと対処法

症状 原因 対処法
データが重複する 同じレコードを複数回プッシュ UPSERTを使用し冪等性を担保
同期が止まる ネットワークエラーや例外処理漏れ リトライロジックと例外ハンドリングを実装
タイムスタンプがずれる ローカルとサーバーの時計が違う サーバー時刻を正としてNTPで同期
コンフリクトが多発 同じデータを複数端末が更新 ロック戦略の見直し、UUIDで行を特定
同期が遅い データ量が多すぎる ページネーション・LIMIT実装で分割取得

まとめ

  • UUIDを主キーに使い、ローカル・リモートでIDが衝突しないよう設計する
  • updated_at / deleted_at カラムで増分同期と論理削除を実現する
  • APIには必ずJWT等の認証を実装し、HTTPS通信のみ許可する
  • コンフリクト戦略はシンプルな「Last Write Wins」から始め、必要に応じて複雑化する
  • 同期ログを残してトラブル時のデバッグを容易にする
  • エラーが起きても途中で止まらないよう、リトライと部分同期に対応する

今回紹介したパターンはシンプルなREST API + SQLite / MySQLの組み合わせですが、スケールが必要になれば WebSocket によるリアルタイム同期や、専用の同期フレームワーク(PouchDB / CouchDB、ElectricSQL 等)の採用も検討してください。

📚 参考:より高度な同期が必要な場合は、ElectricSQL(PostgreSQL対応)やPowerSync(SQLite対応)といった専用ライブラリも選択肢です。

コメント

タイトルとURLをコピーしました