sync-guide/SKILL.md を確認 : hkob の雑記録 (495)

はじめに

hkob の雑記録の第495回目(連続68日目)は、昨日に引き続きテンプレートの中身を確認していきます。次は skills の中にある sync-guide の SKILL.md を読んでみます。

sync-guide/SKILL.md

name: sync-guide description: Notion Workers の sync を構築するための包括的ガイド。2-sync アーキテクチャ(backfill+delta)、replace モード、ページネーション、整合性バッファ、pacer、削除戦略、よくある落とし穴を扱う。sync 関連作業を検知すると自動ロードされる。 user-invocable: false

昨日と同じように SKILL の概要が書かれています。昨日は具体的な作成方法でしたが、こちらは Sync の原理の解説やミスの例などが書かれているようです。

Sync とは?

sync は、Notion データベースを埋めるための「データ変更」を返す、定期的に実行される execute 関数である。ランタイムは execute をループで呼び出す:

const db = worker.database("myDb", {
  type: "managed",
  initialTitle: "My Data",
  primaryKeyProperty: "ID",
  schema: {
    properties: {
      Name: Schema.title(),
      ID: Schema.richText(),
    },
  },
});

worker.sync("mySync", {
  database: db,
  execute: async (state, { notion }) => ({
    changes: [
      { type: "upsert", key: "1", properties: { Name: Builder.title("Item 1"), ID: Builder.richText("1") } },
    ],
    hasMore: false,
    nextState: undefined,
  }),
});

各呼び出しは { changes, hasMore, nextState } を返す。hasMoretrue の場合、ランタイムは nextState を渡して execute を再実行する。hasMorefalse になるまでこれが続き、1 サイクル(cycle)が完了する。次のサイクルはスケジュール間隔ごとに開始され、直前サイクルの最終 state を引き継ぐ。

Imports:

import { Worker } from "@notionhq/workers";
import * as Builder from "@notionhq/workers/builder";
import * as Schema from "@notionhq/workers/schema";

Sync は外部ツールとの同期をするための仕組みです。一度に大量の同期はできないので、適当な数ごとに execute で実行されます。1回の同期は hasMore が false になるまで、繰り返し実行されます。同期自体もスケジュール間隔ごとに適宜実行されます。

判断フレームワーク

Step 1: アーキテクチャを選ぶ

判断要因は API の能力とデータセット規模。2 段階:

条件 アーキテクチャ
小規模(<1k レコード)または API に変更追跡がない Simple replace sync — 1 本の sync、mode: "replace"
それ以外(updated_at、変更フィード、events 等がある) Backfill + delta pair — 同一 DB に書く 2 本の sync

Simple replace sync: 1 サイクルごとに全件を返す。最終的に hasMore: false を返した時点で「見えなかったレコード」は自動削除される。全件再取得が現実的な小規模データに使う。

Backfill + delta pair: 2 つの sync が 1 つのデータベースを共有する。backfill syncmode: "replace", schedule: "manual")はトリガーされたときに全件再取得する。delta syncmode: "incremental", 高頻度スケジュール)は前回以降の変更のみ取得する。責務が明確に分離され、二相 state machine や backfill→delta 遷移バグを避けられる。

昨日まで解説したように Simple replace sync か Backfill + delta pair を選びます。基本的には外部ツールが情報の変更差分を取れる場合には、後者を選びます。

Step 2: API のページネーションを理解する

多くの API は結果をページングする必要がある。1 回の execute ではおおむね ~100 件程度の changes を返す。1 回で返しすぎると失敗する。

Backfill のページング(全件ロード):

  1. 不透明カーソルトークン — GraphQL の endCursor、Stripe の starting_after
  2. ページ番号 / オフセット?page=N&limit=100
  3. キーセット(timestamp + id)WHERE created_at > X OR (created_at = X AND id > Y)(タイムスタンプでソートされうる可変データに対する最も堅牢な方式)

Delta のページング(変更分のみ、incremental):

  1. タイムスタンプカーソル?updated_since=<cursor>(整合性バッファ付き)
  2. updated_at + id のキーセット — 変更日時に対して同じキーセット方式を適用
  3. イベント/変更ログフィードGET /events?after=<eventId>
  4. 同じ不透明カーソル — API が updated_at ソートを提供する場合、backfill 用カーソルを delta にも流用できる

API では大量のデータを取り扱うことができないので、ページネーションで対応する必要があります。Backfill の場合には全件取り出す必要があるので、具体的な切れ目を指定してページネーションを実行する形になります。Delta の場合には変更差分を取り出すので、あまりページネーションに頼ることは多くないかもしれませんが、それでも100件以上の差分がある場合にはページネーションが必要になります。こちらも変更差分の切れ目を何かのキーとしてページネーションを実行します。

Step 3: 整合性バッファ(Delta Sync)

API は最終整合性であることが多い。直近に書き込まれた/更新されたレコードは、すぐにクエリ結果に現れない場合がある。incremental モードではカーソルが巻き戻らないため、未インデックスのレコードを飛び越えると永久に取りこぼす。カーソルは「今」より 10〜60 秒遅らせる:

const bufferMs = 15_000;
const maxCursor = new Date(Date.now() - bufferMs).toISOString();
const nextCursor = records.length > 0
  ? min(lastRecord.updatedAt, maxCursor)
  : maxCursor;

Delta Sync の場合には日付などで差分を取ることが多いですが、直近に書き込まれたレコードが遅延などで検出されずに取りこぼす可能性があります。これを避けるために10秒から60秒ほど時間を巻き戻して取得する必要があるようです。

Step 4: 削除戦略

  1. Backfill sync(replace): タダで対応できる — 各サイクルで見えなかったレコードは自動削除される。API に delete シグナルがない場合の主要な削除処理になる。
  2. Delta sync(delete API がある): { type: "delete", key } を出す。delete シグナルが別エンドポイント(監査ログ、archived フィルタ等)から来る場合は flip-flop パターンを使う: まず通常の delta ストリームを hasMore: false まで回し、その後 delete ストリームに 1 サイクル切り替えて実行し、また戻す。両方のカーソルは state に独立して保持する。
  3. delete API なし、かつ大規模: backfill(replace)の mark-and-sweep に頼る。手動または低頻度スケジュールで backfill を走らせ、陳腐化レコードを掃除する。

外部ツールに削除のシグナルがある場合には、Delta sync が使えます。ない場合には、小規模であれば全部取得して存在しないものを消すことで対応ができます。ただ大規模の場合には、負荷が高くなるので、低頻度スケジュールで backfill を走らせて、陳腐化したレコードでたまに掃除する(ガベージコレクション)をする形になります。

Replace モード

基本: 全件を取得して全件返し、削除処理はランタイムに任せる。小規模ソースに対する単体 sync、または backfill+delta の backfill 側として使う。

const db = worker.database("records", {
  type: "managed",
  initialTitle: "Records",
  primaryKeyProperty: "ID",
  schema: {
    properties: { Name: Schema.title(), ID: Schema.richText() },
  },
});

const apiPacer = worker.pacer("myApi", {
  allowedRequests: 10,
  intervalMs: 1000,
});

worker.sync("recordsBackfill", {
  database: db,
  mode: "replace",
  schedule: "manual",  // 手動 or 低頻度スケジュールで実行
  execute: async (state) => {
    const page = state?.page ?? 1;
    await apiPacer.wait();
    const { items, totalPages } = await fetchPage(page, 100);
    const hasMore = page < totalPages;
    return {
      changes: items.map((item) => ({
        type: "upsert" as const,
        key: item.id,
        properties: { Name: Builder.title(item.name), ID: Builder.richText(item.id) },
      })),
      hasMore,
      nextState: hasMore ? { page: page + 1 } : undefined,
    };
  },
});

完全動作例は examples/replace-simple.tsexamples/replace-paginated.ts を参照。

Replace の場合や Backfill の場合には、変更も全件取得で差分を確認する形になります。Backfill の場合には、Delta の Incremental モードでの変更取りこぼしの確認程度の利用になるので、非常に低頻度で実施する形になります。examples にサンプルがあるので、そちらを参考にするとよさそうです。

Incremental モード(Delta Sync)

delta sync は前回以降の変更のみを取得する。同一 DB に対する replace-mode backfill sync と組み合わせることで、旧来の「二相 single sync」パターンを置き換える。

// 上で定義した `db` と `apiPacer` を再利用する

worker.sync("recordsDelta", {
  database: db,
  mode: "incremental",
  schedule: "5m",
  execute: async (state: { cursor: string } | undefined) => {
    const cursor = state?.cursor ?? new Date(0).toISOString();
    const bufferTs = new Date(Date.now() - 15_000).toISOString();

    await apiPacer.wait();
    const { items, nextCursor } = await fetchChanges(cursor);
    const done = !nextCursor;

    return {
      changes: items.map(toUpsert),
      hasMore: !done,
      nextState: {
        cursor: done ? min(nextCursor ?? cursor, bufferTs) : nextCursor,
      },
    };
  },
});

要点:

  • delta sync の state は単純(カーソルのみ)。フェーズ判定は不要。
  • backfill(replace)が初回全件ロードと、削除レコードの定期掃除を担う。
  • 両 sync は同じ db ハンドルを使って同じデータベースへ書き込む。
  • pacer は sync 間で共有できる(サーバ側で予算が均等に配分される)。

完全パターンは examples/incremental-basic.ts / examples/incremental-bimodal.ts / examples/incremental-events.ts を参照。

同期の際のレート制限を決める pacer を設定すると、複数の同期が実行された場合にはうまく均等配分されるようです。そのあたりは worker の方で対応してくれるので、コーダーは気にする必要はないようです。

スキーマ参照

Notion データベースの形は Schema 型で定義し、値は Builder で構築する:

Schema 型 Builder 値 メモ
Schema.title() Builder.title("text") 主要表示フィールド。各 schema に必ず 1 つ必要。
Schema.richText() Builder.richText("text") テキスト、ID
Schema.url() Builder.url("https://...") URL
Schema.email() Builder.email("a@b.com") Email
Schema.phoneNumber() Builder.phoneNumber("+1...") 電話番号
Schema.checkbox() Builder.checkbox(true) 真偽値
Schema.file() Builder.file("https://...", "name") ファイル URL + 表示名(任意)
Schema.number() Builder.number(42) 数値。任意のフォーマット: Schema.number("percent")
Schema.date() Builder.date("2024-01-15") 日付(YYYY-MM-DD)。他: Builder.dateTime("2024-01-15T10:30:00Z")Builder.dateRange(start, end)
Schema.select([...]) Builder.select("Option A") 単一 select。options 定義: Schema.select([{ name: "A" }, { name: "B" }])options の name は空文字不可Schema.select([]){ name: "" } は未対応)。
Schema.multiSelect([...]) Builder.multiSelect("A", "B") 複数 select
Schema.status(...) Builder.status("Done") グループ付き status
Schema.people() Builder.people("email@co.com") Email 指定の people
Schema.place() Builder.place({ latitude, longitude }) 地理位置
Schema.relation("databaseKey") [Builder.relation("pk")] 別 managed DB への relation。値は 配列

relation は関連データベースの key を使う。双方向 relation も同様に設定する:

Schema.relation("otherDatabase", { twoWay: true, relatedPropertyName: "Back Link" })

行単位のアイコンとページ本文:

changes: [{
  type: "upsert", key: "1",
  properties: { ... },
  icon: Builder.emojiIcon("🎯"),               // または Builder.notionIcon("rocket", "blue")
  pageContentMarkdown: "## Details\\nSome text", // ページ本文(Markdown)
}]

worker.database の schema では、Notion のデータベースのプロパティ名と型の定義を記述します。その時の型は上に書かれた Schema.xxxx() で指定します。実際に changes で値を設定するには Builder.xxxx() を記述します。

よくあるミス

  1. pacer を使わないexecute 内のすべての API 呼び出しは await apiPacer.wait() を前置する。ないとレート制限で失敗する。
  2. delta に整合性バッファがない — 最終整合性 API で未インデックスのレコードを永久に取りこぼす。
  3. ページングしない — 1 回で changes を返しすぎる。まずは ~100 件程度のバッチから始める。
  4. 大規模データで replace を回す — 変更追跡があるなら、全件 backfill(replace)+ 差分 delta(incremental)の組み合わせにする。
  5. カーソルが進まない — 無限ループ。反復ごとに nextState が変わることを保証する。
  6. 初回実行(state undefined)を忘れる — 1 回目は stateundefinedstate?.cursor ?? null のように扱う。
  7. backfill+delta が同じ DB を共有することを忘れる — 両方とも同じ worker.database() ハンドル、同じ key/properties 形状を使う必要がある。
  8. backfill をトリガーしないschedule: "manual" の backfill は自動実行されない。デプロイ時や定期的に実行し、削除レコードを掃除する。
  9. select の値が空Schema.select()name が空でない option を最低 1 つ要する。Schema.select([]){ name: "" } は未対応。

ここに 9 個のよくあるミスが記述されています。失敗例も説明されているのは助かりますね。

Sync 開発用 CLI コマンド

# Deploy
ntn workers deploy

# Preview(書き込まずにテスト)
ntn workers sync trigger <key> --preview
ntn workers sync trigger <key> --preview --context '<json>'  # ページング継続

# Trigger(本番実行)
ntn workers sync trigger <key>

# Sync 状態確認
ntn workers sync status

# 実行ログ
ntn workers runs list
ntn workers runs list --plain | head -n1 | cut -f1 | xargs -I{} ntn workers runs logs {}

# state リセット(フル再バックフィル)
ntn workers sync state reset <key>

# secrets 管理
ntn workers env set KEY=value
ntn workers env push

他の SKILL にも書かれていますが、よく使う ntn コマンドの説明が書かれています。

API パターン参照

Salesforce / Stripe / HubSpot / GitHub / ServiceNow などの本番 sync から抽出した戦略は、api-pagination-patterns.md を参照。

Salesforce / Stripe / HubSpot / GitHub / ServiceNow などの例が api-pagination-patterns.md に書かれているようなので、こちらも後で確認してみたいと思います。

おわりに

今日は、sync-guide を確認してみました。こちらは手順書ではなく、原理が書かれていました。人間が内容を理解するには、こちらを先に読んだ方が良かったですね。こちらを読んだおかげで昨日までの二つの書類の意味がよくわかってきました。

https://hkob.notion.site/hkob-16dd8e4e98ab807cbe3cf3cc94cdfe0f?pvs=4hkob.notion.site