From fd0b053d6736ca24ac5f2c5c9af170f7af61bf4f Mon Sep 17 00:00:00 2001
From: Alex Barth <alex_b@53995.no-reply.drupal.org>
Date: Sun, 3 Oct 2010 15:15:48 +0000
Subject: [PATCH] #927892: Add "Process in background" feature.

---
 CHANGELOG.txt              |   3 +
 feeds.module               | 113 ++++++++---------
 feeds.pages.inc            |  56 +++++---
 includes/FeedsImporter.inc |  16 ++-
 includes/FeedsSource.inc   | 253 ++++++++++++++++++++++++++++++-------
 5 files changed, 315 insertions(+), 126 deletions(-)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 5de2c50a..8b77a863 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -3,6 +3,9 @@
 Feeds 7.x 2.0 XXXXXXXXXXXXXXXXXXX
 ---------------------------------
 
+- #927892 alex_b: Add "Process in background" feature. Allows one-off imports to
+  be processed in the background rather than using Batch API. Useful for very
+  large imports.
 - #929058 alex_b: Report status of source on import and delete forms, track
   last updated time on a source level.
 - #928836: Set progress floating point directly. Note: fetchers and parsers
diff --git a/feeds.module b/feeds.module
index e139bd4a..6511c94c 100644
--- a/feeds.module
+++ b/feeds.module
@@ -49,6 +49,9 @@ function feeds_cron_job_scheduler_info() {
   $info['feeds_source_import'] = array(
     'queue name' => 'feeds_source_import',
   );
+  $info['feeds_source_clear'] = array(
+    'queue name' => 'feeds_source_clear',
+  );
   $info['feeds_importer_expire'] = array(
     'queue name' => 'feeds_importer_expire',
   );
@@ -64,6 +67,10 @@ function feeds_cron_queue_info() {
     'worker callback' => 'feeds_source_import',
     'time' => 15,
   );
+  $queues['feeds_source_clear'] = array(
+    'worker callback' => 'feeds_source_clear',
+    'time' => 15,
+  );
   $queues['feeds_importer_expire'] = array(
     'worker callback' => 'feeds_importer_expire',
     'time' => 15,
@@ -83,7 +90,22 @@ function feeds_source_import($job) {
   catch (Exception $e) {
     watchdog('feeds_source_import()', $e->getMessage(), array(), WATCHDOG_ERROR);
   }
-  $source->schedule();
+  $source->scheduleImport();
+}
+
+/**
+ * Scheduler callback for deleting all items from a source.
+ */
+function feeds_source_clear($job) {
+  $source = feeds_source($job['type'], $job['id']);
+  try {
+    $source->existing()->clear();
+  }
+  catch (FeedsNotExistingException $e) {}
+  catch (Exception $e) {
+    watchdog('feeds_source_clear()', $e->getMessage(), array(), WATCHDOG_ERROR);
+  }
+  $source->scheduleClear();
 }
 
 /**
@@ -98,7 +120,34 @@ function feeds_importer_expire($job) {
   catch (Exception $e) {
     watchdog('feeds_importer_expire()', $e->getMessage(), array(), WATCHDOG_ERROR);
   }
-  $importer->schedule();
+  $importer->scheduleExpire();
+}
+
+/**
+ * Batch API worker callback. Used by FeedsSource::startBatchAPIJob().
+ *
+ * @see FeedsSource::startBatchAPIJob().
+ *
+ * @todo Harmonize Job Scheduler API callbacks with Batch API callbacks?
+ *
+ * @param $method
+ *   Method to execute on importer; one of 'import' or 'clear'.
+ * @param $importer_id
+ *   Identifier of a FeedsImporter object.
+ * @param $feed_nid
+ *   If importer is attached to content type, feed node id identifying the
+ *   source to be imported.
+ * @param $context
+ *   Batch context.
+ */
+function feeds_batch($method, $importer_id, $feed_nid = 0, &$context) {
+  $context['finished'] = FEEDS_BATCH_COMPLETE;
+  try {
+    $context['finished'] = feeds_source($importer_id, $feed_nid)->$method();
+  }
+  catch (Exception $e) {
+    drupal_set_message($e->getMessage(), 'error');
+  }
 }
 
 /**
@@ -417,12 +466,13 @@ function feeds_node_insert($node) {
   _feeds_node_processor_node_insert($node);
   feeds_node_update($node);
   if ($importer_id = feeds_get_importer_id($node->type)) {
+    $source = feeds_source($importer_id, $node->nid);
     // Start import if requested.
     if (feeds_importer($importer_id)->config['import_on_create'] && !isset($node->feeds['suppress_import'])) {
-      feeds_batch_set(t('Importing'), 'import', $importer_id, $node->nid);
+      $source->startImport();
     }
     // Schedule source and importer.
-    feeds_source($importer_id, $node->nid)->schedule();
+    $source->schedule();
     feeds_importer($importer_id)->schedule();
   }
 }
@@ -549,61 +599,6 @@ function feeds_form_alter(&$form, $form_state, $form_id) {
   }
 }
 
-/**
- * @}
- */
-
-/**
- * @defgroup batch Batch functions
- */
-
-/**
- * Batch helper.
- *
- * @param $title
- *   Title to show to user when executing batch.
- * @param $method
- *   Method to execute on importer; one of 'import', 'clear' or 'expire'.
- * @param $importer_id
- *   Identifier of a FeedsImporter object.
- * @param $feed_nid
- *   If importer is attached to content type, feed node id identifying the
- *   source to be imported.
- */
-function feeds_batch_set($title, $method, $importer_id, $feed_nid = 0) {
-  $batch = array(
-    'title' => $title,
-    'operations' => array(
-      array('feeds_batch', array($method, $importer_id, $feed_nid)),
-    ),
-    'progress_message' => '',
-  );
-  batch_set($batch);
-}
-
-/**
- * Batch callback.
- *
- * @param $method
- *   Method to execute on importer; one of 'import' or 'clear'.
- * @param $importer_id
- *   Identifier of a FeedsImporter object.
- * @param $feed_nid
- *   If importer is attached to content type, feed node id identifying the
- *   source to be imported.
- * @param $context
- *   Batch context.
- */
-function feeds_batch($method, $importer_id, $feed_nid = 0, &$context) {
-  $context['finished'] = 1;
-  try {
-    $context['finished'] = feeds_source($importer_id, $feed_nid)->$method();
-  }
-  catch (Exception $e) {
-    drupal_set_message($e->getMessage(), 'error');
-  }
-}
-
 /**
  * @}
  */
diff --git a/feeds.pages.inc b/feeds.pages.inc
index 0ae1de26..ecf8bb63 100644
--- a/feeds.pages.inc
+++ b/feeds.pages.inc
@@ -66,6 +66,12 @@ function feeds_import_form($form, &$form_state, $importer_id) {
     '#type' => 'submit',
     '#value' => t('Import'),
   );
+  $progress = $source->progressImporting();
+  if ($progress !== FEEDS_BATCH_COMPLETE) {
+    $form['submit']['#disabled'] = TRUE;
+    $form['submit']['#value'] =
+      t('Importing (@progress %)', array('@progress' => number_format(100 * $progress, 0)));
+  }
   return $form;
 }
 
@@ -89,7 +95,7 @@ function feeds_import_form_submit($form, &$form_state) {
 
   // Refresh feed if import on create is selected.
   if ($source->importer->config['import_on_create']) {
-    feeds_batch_set(t('Importing'), 'import', $form['#importer_id']);
+    $source->startImport();
   }
 
   // Add to schedule, make sure importer is scheduled, too.
@@ -114,7 +120,14 @@ function feeds_import_tab_form($form, &$form_state, $node) {
     '#tree' => TRUE,
     '#value' => feeds_source_status($source),
   );
-  return confirm_form($form, t('Import all content from source?'), 'node/'. $node->nid, '', t('Import'), t('Cancel'), 'confirm feeds update');
+  $form = confirm_form($form, t('Import all content from source?'), 'node/'. $node->nid, '', t('Import'), t('Cancel'), 'confirm feeds update');
+  $progress = $source->progressImporting();
+  if ($progress !== FEEDS_BATCH_COMPLETE) {
+    $form['actions']['submit']['#disabled'] = TRUE;
+    $form['actions']['submit']['#value'] =
+      t('Importing (@progress %)', array('@progress' => number_format(100 * $progress, 0)));
+  }
+  return $form;
 }
 
 /**
@@ -122,7 +135,7 @@ function feeds_import_tab_form($form, &$form_state, $node) {
  */
 function feeds_import_tab_form_submit($form, &$form_state) {
   $form_state['redirect'] = $form['#redirect'];
-  feeds_batch_set(t('Importing'), 'import', $form['#importer_id'], $form['#feed_nid']);
+  feeds_source($form['#importer_id'], $form['#feed_nid'])->startImport();
 }
 
 /**
@@ -150,7 +163,14 @@ function feeds_delete_tab_form($form, &$form_state, $importer_id, $node = NULL)
     '#tree' => TRUE,
     '#value' => feeds_source_status($source),
   );
-  return confirm_form($form, t('Delete all items from source?'), $form['#redirect'], '', t('Delete'), t('Cancel'), 'confirm feeds update');
+  $form = confirm_form($form, t('Delete all items from source?'), $form['#redirect'], '', t('Delete'), t('Cancel'), 'confirm feeds update');
+  $progress = $source->progressClearing();
+  if ($progress !== FEEDS_BATCH_COMPLETE) {
+    $form['actions']['submit']['#disabled'] = TRUE;
+    $form['actions']['submit']['#value'] =
+      t('Deleting (@progress %)', array('@progress' => number_format(100 * $progress, 0)));
+  }
+  return $form;
 }
 
 /**
@@ -159,7 +179,7 @@ function feeds_delete_tab_form($form, &$form_state, $importer_id, $node = NULL)
 function feeds_delete_tab_form_submit($form, &$form_state) {
   $form_state['redirect'] = $form['#redirect'];
   $feed_nid = empty($form['#feed_nid']) ? 0 : $form['#feed_nid'];
-  feeds_batch_set(t('Deleting'), 'clear', $form['#importer_id'], $feed_nid);
+  feeds_source($form['#importer_id'], $feed_nid)->startClear();
 }
 
 /**
@@ -196,10 +216,8 @@ function feeds_source_status($source) {
   if ($progress_clearing != FEEDS_BATCH_COMPLETE) {
     $v['progress_clearing'] = $progress_clearing;
   }
-  if (empty($v)) {
-    $v['imported'] = $source->imported;
-    $v['count'] = $source->itemCount();
-  }
+  $v['imported'] = $source->imported;
+  $v['count'] = $source->itemCount();
   if (!empty($v)) {
     return theme('feeds_source_status', $v);
   }
@@ -211,15 +229,6 @@ function feeds_source_status($source) {
 function theme_feeds_source_status($v) {
   $output = '<div class="info-box feeds-source-status">';
   $items = array();
-  if ($v['count']) {
-    if ($v['imported']) {
-      $items[] = t('Last import: @ago ago.', array('@ago' => format_interval(REQUEST_TIME - $v['imported'], 1)));
-    }
-    $items[] = t('@count imported items total.', array('@count' => $v['count']));
-  }
-  else {
-    $items[] = t('No imported items.');
-  }
   if ($v['progress_importing']) {
     $progress = number_format(100.0 * $v['progress_importing'], 0);
     $items[] = t('Importing - @progress % complete.', array('@progress' => $progress));
@@ -228,6 +237,17 @@ function theme_feeds_source_status($v) {
     $progress = number_format(100.0 * $v['progress_clearing'], 0);
     $items[] = t('Deleting items - @progress % complete.', array('@progress' => $progress));
   }
+  if (!count($items)) {
+    if ($v['count']) {
+      if ($v['imported']) {
+        $items[] = t('Last import: @ago ago.', array('@ago' => format_interval(REQUEST_TIME - $v['imported'], 1)));
+      }
+      $items[] = t('@count imported items total.', array('@count' => $v['count']));
+    }
+    else {
+      $items[] = t('No imported items.');
+    }
+  }
   $output .= theme('item_list', array('items' => $items));
   $output .= '</div>';
   return $output;
diff --git a/includes/FeedsImporter.inc b/includes/FeedsImporter.inc
index 0fc3902b..b5c487d8 100644
--- a/includes/FeedsImporter.inc
+++ b/includes/FeedsImporter.inc
@@ -74,9 +74,16 @@ class FeedsImporter extends FeedsConfigurable {
   }
 
   /**
-   * Schedule this importer.
+   * Schedule all periodic tasks for this importer.
    */
   public function schedule() {
+    $this->scheduleExpire();
+  }
+
+  /**
+   * Schedule expiry of items.
+   */
+  public function scheduleExpire() {
     $job = array(
       'type' => $this->id,
       'period' => 0,
@@ -232,6 +239,7 @@ class FeedsImporter extends FeedsConfigurable {
       'import_period' => 1800, // Refresh every 30 minutes by default.
       'expire_period' => 3600, // Expire every hour by default, this is a hidden setting.
       'import_on_create' => TRUE, // Import on submission.
+      'process_in_background' => FALSE,
     );
   }
 
@@ -285,6 +293,12 @@ class FeedsImporter extends FeedsConfigurable {
       '#description' => t('Check if import should be started at the moment a standalone form or node form is submitted.'),
       '#default_value' => $config['import_on_create'],
     );
+    $form['process_in_background'] = array(
+      '#type' => 'checkbox',
+      '#title' => t('Process in background'),
+      '#description' => t('For very large imports. If checked, import and delete tasks started from the web UI will be handled by a cron task in the background rather than by the browser. This does not affect periodic imports, they are handled by a cron task in any case.') . $cron_required,
+      '#default_value' => $config['process_in_background'],
+    );
     return $form;
   }
 
diff --git a/includes/FeedsSource.inc b/includes/FeedsSource.inc
index 97837fb2..a886e1de 100644
--- a/includes/FeedsSource.inc
+++ b/includes/FeedsSource.inc
@@ -6,6 +6,11 @@
  * Definition of FeedsSourceInterface and FeedsSource class.
  */
 
+/**
+ * Distinguish exceptions occuring when handling locks.
+ */
+class FeedsLockException extends Exception {}
+
 /**
  * Denote a import or clearing stage. Used for multi page processing.
  */
@@ -213,7 +218,106 @@ class FeedsSource extends FeedsConfigurable {
   }
 
   /**
-   * Import a feed: execute fetching, parsing and processing stage.
+   * Start importing a source.
+   *
+   * This method starts an import job. Depending on the configuration of the
+   * importer of this source, a Batch API job or a background job with Job
+   * Scheduler will be created.
+   *
+   * @throws Exception
+   *   If processing in background is enabled, the first batch chunk of the
+   *   import will be executed on the current page request. This means that this
+   *   method may throw the same exceptions as FeedsSource::import().
+   */
+  public function startImport() {
+    $config = $this->importer->getConfig();
+    if ($config['process_in_background']) {
+      $this->startBackgroundJob('import');
+    }
+    else {
+      $this->startBatchAPIJob(t('Importing'), 'import');
+    }
+  }
+
+  /**
+   * Start deleting all imported items of a source.
+   *
+   * This method starts a clear job. Depending on the configuration of the
+   * importer of this source, a Batch API job or a background job with Job
+   * Scheduler will be created.
+   *
+   * @throws Exception
+   *   If processing in background is enabled, the first batch chunk of the
+   *   clear task will be executed on the current page request. This means that
+   *   this method may throw the same exceptions as FeedsSource::clear().
+   */
+  public function startClear() {
+    $config = $this->importer->getConfig();
+    if ($config['process_in_background']) {
+      $this->startBackgroundJob('clear');
+    }
+    else {
+      $this->startBatchAPIJob(t('Deleting'), 'clear');
+    }
+  }
+
+  /**
+   * Schedule all periodic tasks for this source.
+   */
+  public function schedule() {
+    $this->scheduleImport();
+  }
+
+  /**
+   * Schedule periodic or background import tasks.
+   */
+  public function scheduleImport() {
+    // Check whether any fetcher is overriding the import period.
+    $period = $this->importer->config['import_period'];
+    $fetcher_period = $this->importer->fetcher->importPeriod($this);
+    if (is_numeric($fetcher_period)) {
+      $period = $fetcher_period;
+    }
+    $period = $this->progressImporting() === FEEDS_BATCH_COMPLETE ? $period : 0;
+    $job = array(
+      'type' => $this->id,
+      'id' => $this->feed_nid,
+      // Schedule as soon as possible if a batch is active.
+      'period' => $period,
+      'periodic' => TRUE,
+    );
+    if ($period != FEEDS_SCHEDULE_NEVER) {
+      JobScheduler::get('feeds_source_import')->set($job);
+    }
+    else {
+      JobScheduler::get('feeds_source_import')->remove($job);
+    }
+  }
+
+  /**
+   * Schedule background clearing tasks.
+   */
+  public function scheduleClear() {
+    // Schedule as soon as possible if batch is not complete.
+    if ($this->progressClearing() !== FEEDS_BATCH_COMPLETE) {
+      $job = array(
+        'type' => $this->id,
+        'id' => $this->feed_nid,
+        'period' => 0,
+        'periodic' => TRUE,
+      );
+      JobScheduler::get('feeds_source_clear')->set($job);
+    }
+    else {
+      JobScheduler::get('feeds_source_clear')->remove($job);
+    }
+  }
+
+  /**
+   * Import a source: execute fetching, parsing and processing stage.
+   *
+   * This method only executes the current batch chunk, then returns. If you are
+   * looking to import an entire source, use FeedsSource::startImport() instead.
    *
    * @return
    *   FEEDS_BATCH_COMPLETE if the import process finished. A decimal between
@@ -223,6 +327,7 @@ class FeedsSource extends FeedsConfigurable {
    *   Throws Exception if an error occurs when importing.
    */
   public function import() {
+    $this->acquireLock();
     try {
       // Fetch.
       if (empty($this->fetcher_result) || FEEDS_BATCH_COMPLETE == $this->progressParsing()) {
@@ -237,29 +342,31 @@ class FeedsSource extends FeedsConfigurable {
 
       // Process.
       $this->importer->processor->process($this, $parser_result);
-
-      // Find out whether we are done.
-      $result = $this->progressImporting();
-      if ($result === FEEDS_BATCH_COMPLETE) {
-        module_invoke_all('feeds_after_import', $this);
-        $this->imported = REQUEST_TIME;
-        unset($this->fetcher_result);
-        unset($this->state);
-      }
     }
-    catch (Exception $e) {
-      unset($this->fetcher_result);
-      unset($this->state);
-      $this->save();
-      throw $e;
+    catch (Exception $e) {}
+    $this->releaseLock();
+
+    // Clean up.
+    $result = $this->progressImporting();
+    if ($result == FEEDS_BATCH_COMPLETE || isset($e)) {
+      module_invoke_all('feeds_after_import', $this);
+      $this->imported = REQUEST_TIME;
+      unset($this->fetcher_result, $this->state);
     }
     $this->save();
+    if (isset($e)) {
+      throw $e;
+    }
     return $result;
   }
 
   /**
    * Remove all items from a feed.
    *
+   * This method only executes the current batch chunk, then returns. If you are
+   * looking to delete all items of a source, use FeedsSource::startClear()
+   * instead.
+   *
    * @return
    *   FEEDS_BATCH_COMPLETE if the clearing process finished. A decimal between
    *   0.0 and 0.9 periodic if clearing is still in progress.
@@ -268,22 +375,25 @@ class FeedsSource extends FeedsConfigurable {
    *   Throws Exception if an error occurs when clearing.
    */
   public function clear() {
+    $this->acquireLock();
     try {
       $this->importer->fetcher->clear($this);
       $this->importer->parser->clear($this);
       $this->importer->processor->clear($this);
-      $result = $this->progressClearing();
-      if ($result == FEEDS_BATCH_COMPLETE) {
-        unset($this->state);
-        module_invoke_all('feeds_after_clear', $this);
-      }
     }
-    catch (Exception $e) {
+    catch (Exception $e) {}
+    $this->releaseLock();
+
+    // Clean up.
+    $result = $this->progressClearing();
+    if ($result == FEEDS_BATCH_COMPLETE || isset($e)) {
+      module_invoke_all('feeds_after_clear', $this);
       unset($this->state);
-      $this->save();
-      throw $e;
     }
     $this->save();
+    if (isset($e)) {
+      throw $e;
+    }
     return $result;
   }
 
@@ -342,31 +452,6 @@ class FeedsSource extends FeedsConfigurable {
     return $this->state[$stage];
   }
 
-  /**
-   * Schedule this source.
-   */
-  public function schedule() {
-    // Check whether any fetcher is overriding the import period.
-    $period = $this->importer->config['import_period'];
-    $fetcher_period = $this->importer->fetcher->importPeriod($this);
-    if (is_numeric($fetcher_period)) {
-      $period = $fetcher_period;
-    }
-    $job = array(
-      'type' => $this->id,
-      'id' => $this->feed_nid,
-      // Schedule as soon as possible if a batch is active.
-      'period' => $this->progressImporting() === FEEDS_BATCH_COMPLETE ? $period : 0,
-      'periodic' => TRUE,
-    );
-    if ($job['period'] != FEEDS_SCHEDULE_NEVER) {
-      JobScheduler::get('feeds_source_import')->set($job);
-    }
-    else {
-      JobScheduler::get('feeds_source_import')->remove($job);
-    }
-  }
-
   /**
    * Count items imported by this source.
    */
@@ -539,4 +624,76 @@ class FeedsSource extends FeedsConfigurable {
       }
     }
   }
+
+  /**
+   * Background job helper. Starts a background job using Job Scheduler.
+   *
+   * Execute the first batch chunk of a background job on the current page load,
+   * moves the rest of the job processing to a cron powered background job.
+   *
+   * Executing the first batch chunk is important, otherwise, when a user
+   * submits a source for import or clearing, we will leave her without any
+   * visual indicators of an ongoing job.
+   *
+   * @see FeedsSource::startImport().
+   * @see FeedsSource::startClear().
+   *
+   * @param $method
+   *   Method to execute on importer; one of 'import' or 'clear'.
+   *
+   * @throws Exception $e
+   */
+  protected function startBackgroundJob($method) {
+    if (FEEDS_BATCH_COMPLETE != $this->$method()) {
+      $job = array(
+        'type' => $this->id,
+        'id' => $this->feed_nid,
+        'period' => 0,
+        'periodic' => FALSE,
+      );
+      JobScheduler::get("feeds_source_{$method}")->set($job);
+    }
+  }
+
+  /**
+   * Batch API helper. Starts a Batch API job.
+   *
+   * @see FeedsSource::startImport().
+   * @see FeedsSource::startClear().
+   * @see feeds_batch()
+   *
+   * @param $title
+   *   Title to show to user when executing batch.
+   * @param $method
+   *   Method to execute on importer; one of 'import' or 'clear'.
+   */
+  protected function startBatchAPIJob($title, $method) {
+    $batch = array(
+      'title' => $title,
+      'operations' => array(
+        array('feeds_batch', array($method, $this->id, $this->feed_nid)),
+      ),
+      'progress_message' => '',
+    );
+    batch_set($batch);
+  }
+
+  /**
+   * Acquires a lock for this source.
+   *
+   * @throws FeedsLockException
+   *   If a lock for the requested job could not be acquired.
+   */
+  protected function acquireLock() {
+    if (!lock_acquire("feeds_source_{$this->id}_{$this->feed_nid}", 60.0)) {
+      throw new FeedsLockException(t('Cannot acquire lock for source @id / @feed_nid.', array('@id' => $this->id, '@feed_nid' => $this->feed_nid)));
+    }
+  }
+
+  /**
+   * Releases a lock for this source.
+   */
+  protected function releaseLock() {
+    lock_release("feeds_source_{$this->id}_{$this->feed_nid}");
+  }
 }
-- 
GitLab