Skip to content
Snippets Groups Projects
FeedsScheduler.inc 7.37 KiB
Newer Older
<?php
// $Id$

/**
 * @file
 * FeedsScheduler class and related.
 */

/**
 * Describe a scheduler.
 */
interface FeedsSchedulerInterface {

  /**
   * Run Drupal cron.
   */
  public function cron();

  /**
   * Add a feed to the schedule.
   *
   * @param $importer_id
   *   Id of a FeedsImporter object.
   * @param $callback
   *   The callback to invoke on importer. Either 'import' or 'expire'.
   * @param $feed_nid
   *   Feed nid that identifies the source for this configuration.
   */
  public function add($importer_id, $callback, $feed_nid = 0);

  /**
   * Remove a feed from the schedule.
   *
   * @param $importer_id
   *   Id of a FeedsImporter object.
   * @param $callback
   *   The callback the schedule entry was added for. Either 'import' or
   *   'expire'.
   * @param $feed_nid
   *   Feed nid that identifies the source for this configuration.
   */
  public function remove($importer_id, $callback, $feed_nid = 0);

  /**
   * Work off a given feed identified by $job.
   *
   * @param $job
   *   Array where 'id' key is the id of a FeedsImporter object,
   *   and 'feed_nid' is the feed node id that identifies the
   *   source of a FeedsSource object.
   */
  public function work($job);
}
/**
 * Implementation of FeedsSchedulerInterface.
 *
 * This scheduler uses the last_scheduled_time paradigm: By storing the time
 * when a particular feed was scheduled to be refreshed last rather than
 * storing when a feed should be _refreshed_ next, we gain two advantages:
 *
 * 1) If a feed's import_period setting changes, it has immediate effects -
 *    without batch updating an existing schedule.
 * 2) The time between refreshes will always be scheduled based on when it
 *    has been scheduled last. Less drift occurs.
 */
class FeedsScheduler implements FeedsSchedulerInterface {

  /**
   * Create a single instance of FeedsScheduler.
   */
  public static function instance() {
    // The scheduler object is kept in a function-level static so that every
    // call after the first one hands back the same object.
    static $instance;
    if (isset($instance)) {
      return $instance;
    }
    // The concrete class is configurable through the 'feeds_scheduler_class'
    // variable; it falls back to this class.
    $scheduler_class = variable_get('feeds_scheduler_class', 'FeedsScheduler');
    $instance = new $scheduler_class();
    return $instance;
  }

  /**
   * Protect constructor.
   */
  protected function __construct() {
    // Intentionally empty. The constructor is protected, so code outside the
    // class hierarchy cannot create a scheduler with "new".
  }

  /**
   * Implementation of FeedsSchedulerInterface::cron().
   *
   * Refreshes scheduled feeds.
   *
   * If drupal_queue is present, only pushes refresh tasks to queue and
   * returns. If drupal_queue is not available, works off tasks.
   */
  public function cron() {
    // Check and set scheduler semaphore, take time.
    if (variable_get('feeds_scheduler_cron', FALSE)) {
      watchdog('FeedsScheduler', 'Last cron process did not finish.', array(), WATCHDOG_ERROR);
    }
    variable_set('feeds_scheduler_cron', TRUE);
    // Remember when this run started so its duration can be logged at the end.
    $start = time();

    // Release schedule lock where the lock is older than 1 hour.
    db_query('UPDATE {feeds_schedule} SET scheduled = 0 WHERE scheduled < %d', FEEDS_REQUEST_TIME - 3600);

    // Iterate over feed importers, pick $num jobs for each of them and
    // schedule them.
    if ($importers = feeds_importer_load_all()) {
      // A larger batch is picked when jobs are only pushed to a queue than
      // when they are worked off directly in this request.
      $num = $this->queue() ? variable_get('feeds_schedule_queue_num', 200) : variable_get('feeds_schedule_num', 5);
      foreach ($importers as $importer) {
        foreach ($importer->getScheduleCallbacks() as $callback) {
          $period = $importer->getSchedulePeriod($callback);
          if ($period != FEEDS_SCHEDULE_NEVER) {
            // Pick jobs that are not currently scheduled and are either due
            // or were never executed, least recently executed first.
            $result = db_query_range("SELECT feed_nid, id, callback, last_executed_time FROM {feeds_schedule} WHERE id = '%s' AND callback = '%s' AND scheduled = 0 AND (last_executed_time < %d OR last_executed_time = 0) ORDER BY last_executed_time ASC", $importer->id, $callback, FEEDS_REQUEST_TIME - $period, 0, $num);
            while ($job = db_fetch_array($result)) {
              $this->schedule($job);
              // @todo Add time limit.
            }
          }
        }
      }
    }

    // Unflag and post a message that we're done.
    variable_set('feeds_scheduler_cron', FALSE);
    watchdog('FeedsScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start)));
  }
  /**
   * Implementation of FeedsSchedulerInterface::add().
   *
   * @todo Create optional parameter $last_executed_time to pass in. Set this
   *   value if a feed is refreshed on creation.
   */
  public function add($importer_id, $callback, $feed_nid = 0) {
    // Build the schedule record. A 'scheduled' value of 0 means the job is
    // NOT scheduled at the moment.
    $record = array();
    $record['id'] = $importer_id;
    $record['callback'] = $callback;
    $record['feed_nid'] = $feed_nid;
    $record['last_executed_time'] = 0;
    $record['scheduled'] = 0;

    // Try to update an existing row identified by importer id, callback and
    // feed nid first.
    $keys = array('id', 'callback', 'feed_nid');
    drupal_write_record('feeds_schedule', $record, $keys);
    if (db_affected_rows()) {
      return;
    }
    // No row was affected by the update, write the record without keys.
    drupal_write_record('feeds_schedule', $record);
  }

  /**
   * Implementation of FeedsSchedulerInterface::remove().
   */
  public function remove($importer_id, $callback, $feed_nid = 0) {
    // String placeholders are wrapped in single quotes: double-quoted string
    // literals are not portable SQL, and single quotes match the query in
    // schedule().
    db_query("DELETE FROM {feeds_schedule} WHERE id = '%s' AND callback = '%s' AND feed_nid = %d", $importer_id, $callback, $feed_nid);
  }

  /**
   * Implementation of FeedsSchedulerInterface::work().
   *
   * Refresh a feed.
   *
   * Used as worker callback invoked from feeds_scheduler_refresh() or
   * if drupal_queue is not enabled, directly from $this->cron().
   */
  public function work($job) {
    $importer = feeds_importer($job['id']);
    try {
      // Only mark the job as finished once the importer reports that the
      // whole batch is complete.
      if (FEEDS_BATCH_COMPLETE == $importer->work($job)) {
        $this->finished($job);
      }
    }
    catch (Exception $e) {
      // Log the failure instead of letting it propagate, so that the job is
      // still released below.
      watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR);
    }
    // Make sure that job is not scheduled after this method has executed.
    $this->unschedule($job);
  }

  /**
   * Return the Drupal Queue that jobs are pushed to, if any.
   *
   * @return
   *   Drupal Queue if available, NULL otherwise.
   */
  protected function queue() {
    // Without the drupal_queue module there is no queue to hand out.
    if (!module_exists('drupal_queue')) {
      return NULL;
    }
    drupal_queue_include();
    return drupal_queue_get(FEEDS_SCHEDULER_QUEUE);
  }

  /**
   * Attempt to reserve a job. If successful work it off or - if Drupal Queue is
   * available - queue it.
   * The lock/release mechanism makes sure that an item does not get queued
   * twice. It has a different purpose than the FeedsSource level locking
   * which is in place to avoid concurrent import/clear operations on a source.
   * @param $job
   *   A job array.
   */
   protected function schedule($job) {
     // Reserve the job by stamping it with the request time. Work continues
     // only if the update affected a row.
     db_query("UPDATE {feeds_schedule} SET scheduled = %d WHERE id = '%s' AND feed_nid = %d AND callback = '%s'", FEEDS_REQUEST_TIME, $job['id'], $job['feed_nid'], $job['callback']);
     if (db_affected_rows()) {
       // Keep the queue object returned by queue(); it is needed below to
       // create the item.
       if ($queue = $this->queue()) {
         if (!$queue->createItem($job)) {
           // Queueing failed: release the reservation again and log it.
           $this->unschedule($job);
           // watchdog() expects the variables array before the severity.
           watchdog('FeedsScheduler', 'Error adding item to queue.', array(), WATCHDOG_CRITICAL);
           return;
         }
       }
       else {
         // No queue available, work off the job right away.
         $this->work($job);
       }
     }
   }

  /**
   * Remove a job from schedule.
   *
   * This function sets the source's scheduled bit to 0 and thus makes
   * it eligible for scheduling again.
   *
   * @param $job
   *   A job array.
   */
  protected function unschedule($job) {
    // Drop last_executed_time from the record so that this write does not
    // touch the stored value (drupal_write_record() only writes fields that
    // are present on the record).
    unset($job['last_executed_time']);
    // Clear the scheduled flag. The array union lets this value take
    // precedence over any 'scheduled' key already present in $job.
    $job = array(
      'scheduled' => 0,
    ) + $job;
    drupal_write_record('feeds_schedule', $job, array('id', 'callback', 'feed_nid'));
  }

  /**
   * Release a job and set its last_executed_time flag.
   *
   * @param $job
   *   A job array.
   */
  protected function finished($job) {
    $job = array(
      'scheduled' => 0,
      'last_executed_time' => FEEDS_REQUEST_TIME,
    ) + $job;
    drupal_write_record('feeds_schedule', $job, array('id', 'callback', 'feed_nid'));