Shogo's Blog

Jun 22, 2024 - 2 minute read - typescript

TypeScriptで同時実行数を制限しながら並行実行する

タスクがたくさんあって並行実行することを考えます。 何も考えずにすべてのタスクを並行実行すると負荷が高すぎるので、 同時実行数を制限したいことがありました。

ググってみるといくつか実装例が見つかりますが、その多くは配列を受け入れるものです。 AsyncIterator を受け入れるバージョンが欲しいなと思い、 他の人の記事を参考に実装してみました。

IteratorまたはIterableを受け入れる版

いきなりAsyncIterator版を実装するのは大変なので、Iterator版で練習してみました。 以下の関数 limitConcurrency は、タスクのIteratorまたはIterableを受け取って、並行実行します。

async function limitConcurrency<T>(
  iter: Iterator<() => Promise<T>> | Iterable<() => Promise<T>>,
  limit: number
) {
  const iterator = Symbol.iterator in iter ? iter[Symbol.iterator]() : iter;
  async function runNext(): Promise<void> {
    for (;;) {
      const { value: task, done } = iterator.next();
      if (done) {
        return;
      }
      await task();
    }
  }

  try {
    const initialTasks: Promise<void>[] = [];
    for (let i = 0; i < limit; i++) {
      initialTasks.push(runNext());
    }

    await Promise.all(initialTasks);
  } finally {
    iterator.return?.();
  }
}

Promise.all を使うことで、シンプルに書けました。 参考にした例では new Promise を使っていましたが、 async / await だけ使って実現するよう改良しました。 new Promise を使うと、例外が発生したときにスタックトレースが途中で途切れてしまうためです。

以下のようにして使います。

function* taskGenerator(): Generator<() => Promise<void>> {
  for (let i = 0; i < 10; i++) {
    yield async () => {
      console.log(`Task ${i} started`);
      await new Promise((resolve) => setTimeout(resolve, 1000)); // 1秒待つ
      console.log(`Task ${i} finished`);
    };
  }
}

const iterator = taskGenerator();
limitConcurrency(iterator, 2);

AsyncIteratorまたはAsyncIterableを受け入れる版

以下の関数 asyncLimitConcurrencylimitConcurrency の AsyncIterator版です。

async function asyncLimitConcurrency<T>(
  iter: AsyncIterator<() => Promise<T>> | AsyncIterable<() => Promise<T>>,
  limit: number
) {
  const iterator =
    Symbol.asyncIterator in iter ? iter[Symbol.asyncIterator]() : iter;
  async function runNext(): Promise<void> {
    for (;;) {
      const { value: task, done } = await iterator.next();
      if (done) {
        return;
      }
      await task();
    }
  }

  try {
    const initialTasks: Promise<void>[] = [];
    for (let i = 0; i < limit; i++) {
      initialTasks.push(runNext());
    }

    await Promise.all(initialTasks);
  } finally {
    iterator.return?.();
  }
}

以下は使用例です。

async function* asyncTaskGenerator(): AsyncGenerator<() => Promise<void>> {
  for (let i = 0; i < 10; i++) {
    await new Promise((resolve) => setTimeout(resolve, 100)); // 0.1秒待つ

    yield async () => {
      console.log(`Task ${i} started`);
      await new Promise((resolve) => setTimeout(resolve, 1000)); // 1秒待つ
      console.log(`Task ${i} finished`);
    };
  }
}

const asyncIterator = asyncTaskGenerator();
asyncLimitConcurrency(asyncIterator, 2);

想定する用途

  • タスクの一覧のネットワーク経由で取ってくるので、非同期処理が必要
  • たくさんのタスクがあるので、ページネーションが必要

といった用途を想定しています。

たとえば祝日APIを使って祝日の一覧を取得し、 それぞれの祝日に対して時間のかかる処理を行う、というケースを考えてみます。 祝日APIで年をまたいで一覧を取得するにはページネーションが必要です。 祝日の処理には時間がかかるので並行実行したい。 でもあまり並列度が高いと困るので、並列度は制限したい。

全部の要件を一度に満たすのは大変ですが、 asyncLimitConcurrency を使えば以下のように書けます。

async function* holidays(): AsyncGenerator<() => Promise<void>> {
  for (const year of [2024, 2025, 2026]) {
    // API経由で休日の一覧を取得する
    const response = await fetch(`https://holidays-jp.shogo82148.com/${year}`);
    const holidays = await response.json();

    // 各休日に対してなんらかの処理を行う
    for (const holiday of holidays.holidays) {
      yield async () => {
        // 時間のかかる処理がここに入る
        await new Promise((resolve) => setTimeout(resolve, 1000));
        console.log(`${holiday.date} ${holiday.name}`);
      };
    }
    yield async () => {};
  }
}

asyncLimitConcurrency(holidays(), 3);

まとめ

TypeScriptで同時実行数を制限しながら並行実行するプログラムを書いてみました。 ネットワーク経由のAPIを叩くときに重宝するのではと思ってます。

並行の波を制する知恵、
静かな秩序が今ここに、
コードの調和、タスクのリズム、
ラビットは跳ねる、喜びの瞬間🐇✨

by CodeRabbit

参考