開発現場では「ローカル環境のデータベースと本番サーバーのデータベースを常に同期させたい」という要件がよく発生します。本記事では、REST APIを介してローカルのSQLiteやMySQL/PostgreSQLのデータを、VPSやレンタルサーバー上のSQLデータベースと同期する方法を、設計から実装・運用まで体系的に解説します。
1. データ同期の基本概念と設計方針
同期の種類
| 種類 | 特徴 | 主なユースケース |
|---|---|---|
| 一方向同期(プッシュ) | ローカル → サーバーへデータ送信 | ログ収集、センサーデータ蓄積 |
| 一方向同期(プル) | サーバー → ローカルへデータ取得 | マスタデータの配布、設定同期 |
| 双方向同期 | 両者の差分を相互に反映 | マルチデバイス対応アプリ |
| 増分同期 | 最後の同期以降の変更のみ転送 | 大規模DB・通信量削減が必要な場面 |
競合(コンフリクト)への対処
双方向同期で最も難しい問題は「同じレコードがローカルとサーバー両方で更新された場合」の競合処理です。代表的な戦略:
- Last Write Wins(最後の更新が勝つ):実装がシンプル。タイムスタンプで判定
- Server Wins:サーバー側の変更を常に優先。マスタデータ配布に適する
- Client Wins:クライアント側の変更を常に優先。オフライン編集アプリに適する
- カスタムマージ:フィールド単位で細かく制御。最も柔軟だが実装コストが高い
💡 推奨:まずは「Last Write Wins + updated_at タイムスタンプ」方式から始めるのが最もバランスが取れています。
2. 必要なデータベース設計
同期を正しく機能させるために、テーブルには以下のカラムを追加します。
-- 同期対応テーブルの基本設計
CREATE TABLE items (
id VARCHAR(36) PRIMARY KEY, -- UUIDを使用(ローカルとリモートで衝突しない)
name VARCHAR(255) NOT NULL,
value TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
deleted_at DATETIME NULL, -- 論理削除(物理削除すると差分が追えない)
sync_status VARCHAR(20) DEFAULT 'pending', -- pending / synced / conflict
version INT DEFAULT 1 -- 楽観的ロック用
);
⚠️ 注意:AUTO_INCREMENTではなくUUIDを主キーに使うことで、ローカルとリモートで同じIDが発行されるリスクを回避できます。
3. APIサーバーの実装(Node.js / Express)
プロジェクトセットアップ
mkdir sync-api && cd sync-api
npm init -y
npm install express mysql2 jsonwebtoken uuid dotenv
npm install -D nodemon
差分取得エンドポイント(GET /api/sync/pull)
クライアントは「最後に同期した日時」以降に変更されたレコードだけを取得します。
// routes/sync.js
const express = require('express');
const router = express.Router();
// GET /api/sync/pull?since=2024-01-01T00:00:00Z&table=items
router.get('/pull', authenticateToken, async (req, res) => {
const { since, table } = req.query;
const allowedTables = ['items', 'categories', 'users'];
if (!allowedTables.includes(table)) {
return res.status(400).json({ error: 'Invalid table name' });
}
try {
const [rows] = await db.query(
`SELECT * FROM ?? WHERE updated_at > ? OR deleted_at > ?
ORDER BY updated_at ASC LIMIT 500`,
[table, since, since]
);
res.json({
data: rows,
syncedAt: new Date().toISOString(),
count: rows.length
});
} catch (err) {
res.status(500).json({ error: err.message });
}
});
データ送信エンドポイント(POST /api/sync/push)
router.post('/push', authenticateToken, async (req, res) => {
const { table, records } = req.body;
const results = { success: [], conflict: [], error: [] };
for (const record of records) {
try {
const [existing] = await db.query(
'SELECT version, updated_at FROM ?? WHERE id = ?',
[table, record.id]
);
if (existing.length > 0) {
const serverRecord = existing[0];
// サーバー側が新しければコンフリクト
if (new Date(serverRecord.updated_at) > new Date(record.updated_at)) {
results.conflict.push({ id: record.id, serverVersion: serverRecord });
continue;
}
}
// アップサート(INSERT or UPDATE)
await db.query(
`INSERT INTO ?? SET ? ON DUPLICATE KEY UPDATE
name=VALUES(name), value=VALUES(value),
updated_at=VALUES(updated_at), version=version+1`,
[table, record]
);
results.success.push(record.id);
} catch (err) {
results.error.push({ id: record.id, error: err.message });
}
}
res.json(results);
});
4. ローカルクライアントの実装(Python)
同期エージェント本体
# sync_agent.py
import sqlite3, requests
from datetime import datetime
API_BASE = 'https://your-server.com/api/sync'
API_TOKEN = 'your-jwt-token'
DB_PATH = 'local.db'
class SyncAgent:
def __init__(self):
self.conn = sqlite3.connect(DB_PATH)
self.conn.row_factory = sqlite3.Row
self._ensure_meta_table()
def _ensure_meta_table(self):
self.conn.execute('''
CREATE TABLE IF NOT EXISTS sync_meta (
table_name TEXT PRIMARY KEY,
last_pull TEXT DEFAULT '1970-01-01T00:00:00Z',
last_push TEXT DEFAULT '1970-01-01T00:00:00Z'
)''')
self.conn.commit()
def pull(self, table: str):
"""サーバーから差分を取得してローカルDBに反映"""
last_pull = self._get_meta(table, 'last_pull')
resp = requests.get(
f'{API_BASE}/pull',
params={'since': last_pull, 'table': table},
headers={'Authorization': f'Bearer {API_TOKEN}'}
)
resp.raise_for_status()
data = resp.json()
cursor = self.conn.cursor()
for rec in data['data']:
local = cursor.execute(
f'SELECT updated_at FROM {table} WHERE id=?', (rec['id'],)
).fetchone()
if not local or local['updated_at'] < rec['updated_at']:
cols = ', '.join(rec.keys())
placeholders = ', '.join(['?'] * len(rec))
updates = ', '.join(f'{k}=excluded.{k}' for k in rec.keys())
cursor.execute(
f'INSERT INTO {table} ({cols}) VALUES ({placeholders})'
f' ON CONFLICT(id) DO UPDATE SET {updates}',
list(rec.values())
)
self._set_meta(table, 'last_pull', data['syncedAt'])
self.conn.commit()
print(f'[PULL] {table}: {len(data["data"])}件 取得')
def push(self, table: str):
"""ローカルの未同期レコードをサーバーへ送信"""
rows = self.conn.execute(
f'SELECT * FROM {table} WHERE sync_status = ?', ('pending',)
).fetchall()
if not rows:
print(f'[PUSH] {table}: 送信するデータなし')
return
records = [dict(r) for r in rows]
resp = requests.post(
f'{API_BASE}/push',
json={'table': table, 'records': records},
headers={'Authorization': f'Bearer {API_TOKEN}'}
)
resp.raise_for_status()
result = resp.json()
for rid in result['success']:
self.conn.execute(
f'UPDATE {table} SET sync_status=? WHERE id=?', ('synced', rid)
)
self.conn.commit()
print(f'[PUSH] {table}: 成功{len(result["success"])}件, コンフリクト{len(result["conflict"])}件')
定期実行スクリプト
# main.py
from sync_agent import SyncAgent
import schedule, time
from datetime import datetime
agent = SyncAgent()
TABLES = ['items', 'categories']
def sync_all():
print(f'--- 同期開始: {datetime.now()} ---')
for table in TABLES:
agent.pull(table) # まずサーバーから取得
agent.push(table) # 次にローカルの変更を送信
# 5分ごとに自動同期
schedule.every(5).minutes.do(sync_all)
sync_all() # 初回即時実行
while True:
schedule.run_pending()
time.sleep(1)
5. セキュリティ設計
JWT認証ミドルウェア
// middleware/auth.js (Node.js)
const jwt = require('jsonwebtoken');
function authenticateToken(req, res, next) {
const token = req.headers['authorization']?.split(' ')[1];
if (!token) return res.status(401).json({ error: 'Token required' });
jwt.verify(token, process.env.JWT_SECRET, (err, user) => {
if (err) return res.status(403).json({ error: 'Invalid token' });
req.user = user;
next();
});
}
module.exports = { authenticateToken };
セキュリティチェックリスト
- 通信はHTTPS(TLS)必須。HTTP経由の同期は絶対に避ける
- SQLインジェクション対策:プリペアドステートメントを必ず使用
- テーブル名・カラム名のホワイトリスト検証(動的クエリの場合)
- レート制限(Rate Limiting)を実装してAPI過負荷を防止
- 送受信データのサイズ制限(例:1リクエスト500件まで)
- APIキー・JWTシークレットは環境変数で管理。コードにハードコードしない
- 同期ログを保存してデータの流れを追跡可能にする
🔒 重要:同期APIはデータベースへの直接アクセスと同等の権限を持ちます。認証・認可の実装を最優先してください。
6. 運用と監視
同期ログテーブルの設計
CREATE TABLE sync_log (
id INT AUTO_INCREMENT PRIMARY KEY,
direction ENUM('push', 'pull') NOT NULL,
table_name VARCHAR(64),
record_count INT DEFAULT 0,
conflict_count INT DEFAULT 0,
error_count INT DEFAULT 0,
duration_ms INT,
status ENUM('success', 'partial', 'failed') NOT NULL,
error_detail TEXT,
executed_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
よくあるトラブルと対処法
| 症状 | 原因 | 対処法 |
|---|---|---|
| データが重複する | 同じレコードを複数回プッシュ | UPSERTを使用し冪等性を担保 |
| 同期が止まる | ネットワークエラーや例外処理漏れ | リトライロジックと例外ハンドリングを実装 |
| タイムスタンプがずれる | ローカルとサーバーの時計が違う | サーバー時刻を正としてNTPで同期 |
| コンフリクトが多発 | 同じデータを複数端末が更新 | ロック戦略の見直し、UUIDで行を特定 |
| 同期が遅い | データ量が多すぎる | ページネーション・LIMIT実装で分割取得 |
まとめ
- UUIDを主キーに使い、ローカル・リモートでIDが衝突しないよう設計する
- updated_at / deleted_at カラムで増分同期と論理削除を実現する
- APIには必ずJWT等の認証を実装し、HTTPS通信のみ許可する
- コンフリクト戦略はシンプルな「Last Write Wins」から始め、必要に応じて複雑化する
- 同期ログを残してトラブル時のデバッグを容易にする
- エラーが起きても途中で止まらないよう、リトライと部分同期に対応する
今回紹介したパターンはシンプルなREST API + SQLite / MySQLの組み合わせですが、スケールが必要になれば WebSocket によるリアルタイム同期や、専用の同期フレームワーク(PouchDB / CouchDB、ElectricSQL 等)の採用も検討してください。
📚 参考:より高度な同期が必要な場合は、ElectricSQL(PostgreSQL対応)やPowerSync(SQLite対応)といった専用ライブラリも選択肢です。


コメント