// Source: ConcurrentCallbackQueue.js

"use strict";

/*
 * @module @diomeh/concurrent_callback_queue
 * @license MIT
 * Copyright 2024 David Urbina <davidurbina.dev@gmail.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

/**
 * Defines all the possible options that can be set when creating a new queue.
 *
 * @typedef {Object} QueueOptions
 * @property {boolean} autoStart - Indicates if the queue should start execution automatically when a callback is added.
 * @property {number} maxConcurrent - Maximum number of callbacks that can be executed in parallel.
 * @property {Function} onCallbackError - Callback that is executed when an error occurs while executing a callback, receives the error as a parameter.
 * @property {Function} onCallbackSuccess - Callback that is executed after a callback successfully executes.
 * @property {Function} onQueueIdle - Callback that is executed when the queue goes to IDLE state.
 * @property {Function} onQueueBusy - Callback that is executed when the queue goes to BUSY state.
 * @property {Function} onQueueStop - Callback that is executed when the queue stops.
 */

/**
 * Defines all the possible states the queue can be in.
 *
 * @typedef {Object} QueueStates
 * @property {string} IDLE - There are no pending callbacks and the queue will start processing as soon as a callback is added.
 * @property {string} BUSY - Currently processing callbacks up to the maximum concurrent limit.
 * @property {string} STOPPED - Processing has been stopped and no further callbacks will be executed unless the queue is started again.
 */

/**
 * Defines the structure of a callback tuple.
 *
 * @typedef {Object} CallbackTuple
 * @property {Function} callback - The callback function to execute.
 * @property {number} retries - Number of retry attempts in case of an error.
 */

/**
 * Do-nothing placeholder used wherever a lifecycle hook or callback event
 * handler has not been supplied, so call sites never need a type check.
 *
 * @returns {void}
 * @ignore
 */
function noop() {}

/**
 * Baseline configuration applied to every new queue.
 * The object is frozen so the shared defaults cannot be altered at runtime.
 * Every lifecycle hook defaults to the do-nothing `noop` function.
 *
 * @type {QueueOptions}
 *
 * @example
 * const defaultQueueOptions = {
 *   autoStart: true,
 *   maxConcurrent: 10,
 *   onCallbackError: noop,
 *   onCallbackSuccess: noop,
 *   onQueueIdle: noop,
 *   onQueueBusy: noop,
 *   onQueueStop: noop,
 * };
 */
const defaultQueueOptions = Object.freeze({
  autoStart: true,
  maxConcurrent: 10,
  // All hooks share the same default, so generate them from their names
  ...Object.fromEntries(
    [
      "onCallbackError",
      "onCallbackSuccess",
      "onQueueIdle",
      "onQueueBusy",
      "onQueueStop",
    ].map((hookName) => [hookName, noop]),
  ),
});

/**
 * Enumeration of the states a queue can be in.
 * Each key maps to a string equal to its own name, and the object is frozen
 * so the state values cannot be modified.
 *
 * @type {QueueStates}
 * @example
 * const QueueState = {
 *   IDLE: "IDLE",
 *   BUSY: "BUSY",
 *   STOPPED: "STOPPED",
 * };
 */
const QueueState = Object.freeze(
  Object.fromEntries(
    ["IDLE", "BUSY", "STOPPED"].map((stateName) => [stateName, stateName]),
  ),
);

/**
 * A queue implementation that allows for concurrent execution of asynchronous operations.
 *
 * This class provides a way to schedule and manage a queue of callback functions that will be executed concurrently, up to a specified limit.
 * When the maximum number of concurrent callbacks is reached, the queue will wait until one of the running callbacks finishes before starting a new one.
 * The queue can be configured to start automatically when a callback is added, and it can also be stopped and restarted as needed.
 *
 * Scheduling is completion-driven: every time a callback settles, the queue
 * immediately tries to start the next pending one, so no timers are involved.
 *
 * A main use case is to manage asynchronous operations that need to be executed in parallel,
 * such as making multiple API requests or processing a large number of files.
 *
 * @example Basic usage
 * const queue = new ConcurrentCallbackQueue();
 * queue.enqueue(() => fetch('https://httpstat.us/200,400?sleep=2000'));
 *
 * @example Custom options
 * const queue = new ConcurrentCallbackQueue({
 *    autoStart: false,
 *    onCallbackError: (error) => console.error(error),
 * });
 * queue.enqueue(() => {
 *   throw new Error('Error');
 * });
 * queue.start();
 *
 * @example Multiple callbacks
 * const queue = new ConcurrentCallbackQueue();
 * queue.enqueueAll([
 *   () => fetch('https://httpstat.us/200,400?sleep=2000'),
 *   () => fetch('https://httpstat.us/200,400?sleep=2000'),
 * ]);
 *
 * @author David Urbina (davidurbina.dev@gmail.com)
 * @version 0.8.32
 * @since 2023-03-23
 * @tutorial 01 - Basic Usage
 * @tutorial 02 - Advanced Usage
 */
class ConcurrentCallbackQueue {
  /**
   * List of pending callbacks to execute concurrently.
   *
   * Holds the callback tuples that are waiting to be executed, in FIFO order.
   *
   * @type {Array<CallbackTuple>}
   * @private
   */
  #pending;

  /**
   * Callbacks currently running.
   *
   * Each (retry-wrapped) callback is stored under a unique numeric id taken
   * from {@link ConcurrentCallbackQueue##nextId}.
   *
   * @type {Map<number, Function>}
   * @private
   */
  #running;

  /**
   * Represents the current state of the queue.
   *
   * The possible states are IDLE, BUSY, and STOPPED.
   *
   * @type {string}
   * @see QueueState
   * @see QueueStates
   * @private
   */
  #state;

  /**
   * Number of callbacks currently running.
   *
   * Incremented synchronously when a callback is taken from the pending list
   * and decremented when it settles.
   *
   * @type {number}
   * @private
   */
  #concurrent;

  /**
   * Monotonic counter used to generate unique ids for running callbacks.
   *
   * A counter is used instead of a timestamp because several callbacks can be
   * started within the same millisecond.
   *
   * @type {number}
   * @private
   */
  #nextId;

  /**
   * Queue configuration options
   *
   * @type {QueueOptions}
   * @private
   */
  #options;

  /**
   * Creates a new concurrent callback queue.
   *
   * The initial state is IDLE when `autoStart` is truthy and STOPPED otherwise.
   *
   * @param {QueueOptions} options - Queue configuration options.
   * @class
   * @see QueueOptions
   * @public
   */
  constructor(options = defaultQueueOptions) {
    this.#pending = [];
    this.#running = new Map();
    this.#concurrent = 0;
    this.#nextId = 0;
    this.#initOptions(options);

    // Set the initial state of the queue
    this.#state = this.#options.autoStart
      ? QueueState.IDLE
      : QueueState.STOPPED;
  }

  /**
   * Creates a new concurrent callback queue.
   *
   * @param {QueueOptions} options - Queue configuration options.
   * @returns {ConcurrentCallbackQueue} The new queue instance.
   * @see QueueOptions
   * @static
   * @public
   */
  static create(options = defaultQueueOptions) {
    return new ConcurrentCallbackQueue(options);
  }

  /********************************************/
  /** Internal API ******************************/

  /**
   * Initializes the queue configuration options, merging the defaults with the provided options.
   *
   * Only keys that exist in the default options are kept. Hooks that are not
   * functions are replaced by a noop, and a `maxConcurrent` that is not a
   * number greater than or equal to 1 falls back to the default limit
   * (a limit below 1 would prevent the queue from ever running a callback).
   *
   * @param {QueueOptions|Object} options - Queue configuration options.
   * @returns {void}
   *
   * @private
   */
  #initOptions(options) {
    const merged = {
      ...defaultQueueOptions,
      ...(options || {}),
    };

    const hooks = [
      "onCallbackError",
      "onCallbackSuccess",
      "onQueueIdle",
      "onQueueBusy",
      "onQueueStop",
    ];

    // Copy only the known option keys. Iterating over the defaults' own keys
    // (instead of testing with `in`) also discards keys that merely collide
    // with inherited names such as "toString".
    const normalized = {};
    for (const key of Object.keys(defaultQueueOptions)) {
      normalized[key] = merged[key];
    }

    // Set a noop for unspecified callbacks to avoid checking if they are functions in each call
    for (const hook of hooks) {
      if (typeof normalized[hook] !== "function") {
        normalized[hook] = noop;
      }
    }

    // NaN fails the comparison as well, so it also falls back to the default
    const limit = normalized.maxConcurrent;
    if (typeof limit !== "number" || !(limit >= 1)) {
      normalized.maxConcurrent = defaultQueueOptions.maxConcurrent;
    }

    this.#options = normalized;
  }

  /**
   * Sets the state of the queue and triggers the corresponding event.
   *
   * @param {string} state - The new state of the queue.
   * @returns {string} - The previous state of the queue.
   * @private
   */
  #setState(state) {
    const prevState = this.#state;
    this.#state = state;

    // Trigger queue state events as needed
    switch (state) {
      case QueueState.IDLE:
        this.#options.onQueueIdle();
        break;
      case QueueState.BUSY:
        this.#options.onQueueBusy();
        break;
      case QueueState.STOPPED:
        this.#options.onQueueStop();
        break;
    }

    return prevState;
  }

  /**
   * Handles errors that occur during the execution of a callback
   * by forwarding them to the `onCallbackError` hook.
   *
   * @param {Error} error - The error object.
   * @returns {void}
   * @private
   */
  #handleError(error) {
    this.#options.onCallbackError(error);
  }

  /**
   * Validates the `retries` argument shared by enqueue() and enqueueAll().
   *
   * @param {*} retries - Value to validate.
   * @returns {void}
   * @throws {Error} If retries is not a number, is NaN, or is negative.
   * @private
   */
  #assertValidRetries(retries) {
    if (
      typeof retries !== "number" ||
      Number.isNaN(retries) ||
      retries < 0
    ) {
      throw new Error('The "retries" parameter must be a positive number');
    }
  }

  /**
   * Builds a retry mechanism for a callback function.
   *
   * The returned function invokes the callback and, if it throws or rejects,
   * invokes it again until it succeeds or the number of retries is exhausted.
   * Every failure that is followed by a retry is reported to the error hook;
   * the final failure is rethrown to the caller.
   *
   * @param {CallbackTuple} tuple - Tuple containing the callback function and the number of retries.
   * @returns {Function} - The callback function wrapped in a retry mechanism.
   * @private
   */
  #buildRetryCallback(tuple) {
    const callback = tuple?.callback;
    const retries = tuple?.retries || 0;

    return async () => {
      for (let attempt = 0; ; attempt++) {
        try {
          await callback();
          return;
        } catch (error) {
          // Rethrow the error if the number of retries has been reached
          if (attempt >= retries) {
            throw error;
          }

          // Otherwise report it and try again
          this.#handleError(error);
        }
      }
    };
  }

  /**
   * Starts the execution of a single callback tuple and wires its completion
   * back into the scheduler.
   *
   * @param {CallbackTuple} tuple - The callback tuple to execute.
   * @returns {void}
   * @private
   */
  #execute(tuple) {
    const task = this.#buildRetryCallback(tuple);
    const id = this.#nextId++;

    // Reserve the slot synchronously so the concurrency limit holds even when
    // several callbacks are started within the same tick
    this.#concurrent++;
    this.#running.set(id, task);

    // The callback itself is still invoked asynchronously (next microtask).
    // NOTE: if a user-provided hook throws, the resulting rejection is not
    // handled here and surfaces as an unhandled rejection.
    Promise.resolve()
      .then(() => task())
      .then(() => this.#options.onCallbackSuccess())
      .catch((error) => this.#handleError(error))
      .finally(() => {
        this.#concurrent--;
        this.#running.delete(id);

        // A slot has been freed: start the next pending callback, or go IDLE
        // if this was the last running one
        this.#processNext();
      });
  }

  /**
   * Starts as many pending callbacks as the concurrency limit allows and
   * switches the queue to IDLE once nothing is pending or running.
   *
   * Nothing happens unless the queue is BUSY, so a stopped queue neither
   * starts new callbacks nor changes state when running callbacks settle.
   *
   * @returns {void}
   * @private
   */
  #processNext() {
    while (
      this.#state === QueueState.BUSY &&
      this.#concurrent < this.#options.maxConcurrent &&
      this.#pending.length > 0
    ) {
      const tuple = this.dequeue();
      if (!tuple) {
        break;
      }

      this.#execute(tuple);
    }

    // Only transition from BUSY, so the idle hook fires once per drain
    if (
      this.#state === QueueState.BUSY &&
      this.#concurrent === 0 &&
      this.#pending.length === 0
    ) {
      this.#setState(QueueState.IDLE);
    }
  }

  /********************************************/
  /** Public API ******************************/

  /********************************************/
  /** Getters *********************************/

  /**
   * Returns the current state of the queue.
   *
   * @returns {string}
   * @public
   */
  getState() {
    return this.#state;
  }

  /**
   * Returns the number of pending callbacks in the queue.
   *
   * @returns {number}
   * @public
   */
  getPendingCount() {
    return this.#pending.length;
  }

  /**
   * Returns the number of running callbacks in the queue.
   *
   * @returns {number}
   * @public
   */
  getRunningCount() {
    return this.#concurrent;
  }

  /**
   * Returns the current queue configuration options.
   *
   * The returned object is the queue's own options object, not a copy.
   *
   * @returns {QueueOptions}
   * @public
   */
  getOptions() {
    return this.#options;
  }

  /********************************************/
  /** Queue Operations ************************/

  /**
   * Adds a callback to the queue, if autoStart is enabled the queue execution starts.
   * You can specify an optional number of retries in case of an error.
   *
   * @param {Function} callback - The callback function to add to the queue.
   * @param {number} [retries=0] - Number of retry attempts in case of an error (optional).
   * @returns {void}
   * @throws {Error} If the callback is not a function or retries is not a non-negative number.
   *
   * @public
   */
  enqueue(callback, retries = 0) {
    if (typeof callback !== "function") {
      throw new Error(
        'The "callback" parameter must be a function or a promise',
      );
    }

    this.#assertValidRetries(retries);

    this.#pending.push({ callback, retries });
    if (this.#options.autoStart) {
      this.start();
    }
  }

  /**
   * Adds multiple callbacks to the queue, if autoStart is enabled the queue execution starts.
   * You can specify an optional number of retry attempts in case of an error.
   *
   * @param {Array<Function>} callbacks - The array of callback functions to add to the queue.
   * @param {number} [retries=0] - Number of retry attempts in case of an error for all callbacks (optional).
   * @returns {void}
   * @throws {Error} If callbacks is not an array of functions or retries is not a non-negative number.
   *
   * @public
   */
  enqueueAll(callbacks, retries = 0) {
    if (
      !Array.isArray(callbacks) ||
      !callbacks.every((callback) => typeof callback === "function")
    ) {
      throw new Error(
        'The "callbacks" parameter must be an array of functions or promises',
      );
    }

    this.#assertValidRetries(retries);

    // Push one tuple at a time: spreading a very large array into push()
    // can exceed the engine's argument limit
    for (const callback of callbacks) {
      this.#pending.push({ callback, retries });
    }

    if (this.#options.autoStart) {
      this.start();
    }
  }

  /**
   * Removes a pending callback from the queue without stopping execution.
   *
   * @return {CallbackTuple|undefined} Removed callback tuple or undefined if the queue is empty.
   * @public
   */
  dequeue() {
    return this.#pending.shift();
  }

  /**
   * Removes all pending callbacks from the queue without stopping the queue execution.
   *
   * @returns {Array<CallbackTuple>} List of pending callbacks
   * @public
   */
  dequeueAll() {
    const queue = this.#pending;
    this.#pending = [];
    return queue;
  }

  /********************************************/
  /** Queue Control ***************************/

  /**
   * Starts the execution of the queue.
   *
   * If the queue is stopped at any point and then restarted,
   * the execution resumes from the last pending callback.
   * Does nothing when there are no pending callbacks. When the queue is
   * already BUSY the busy hook is not fired again, but free slots are still
   * filled with newly added callbacks.
   *
   * @returns {void}
   * @public
   */
  start() {
    if (this.#pending.length === 0) {
      return;
    }

    if (this.#state !== QueueState.BUSY) {
      this.#setState(QueueState.BUSY);
    }

    this.#processNext();
  }

  /**
   * Stops the execution of the queue, but does not remove pending callbacks.
   *
   * Calling this method will not stop the execution of callbacks that are already being processed,
   * nor will it remove pending callbacks from the queue, so if the queue is restarted,
   * it will resume from the last pending callback.
   * Sets the queue state to STOPPED.
   *
   * @returns {void}
   * @public
   */
  stop() {
    this.#setState(QueueState.STOPPED);
  }

  /**
   * Stops the execution of the queue and removes all pending callbacks from it.
   *
   * @returns {Array<CallbackTuple>} List of pending callbacks
   * @public
   */
  clear() {
    this.stop();
    return this.dequeueAll();
  }
}

// Public CommonJS surface of this module: the queue class plus the frozen
// state enumeration and default options it is configured with.
const publicApi = { ConcurrentCallbackQueue, QueueState, defaultQueueOptions };

module.exports = publicApi;