Skip to content
Snippets Groups Projects
Commit 9e4acf98 authored by Alex Barth's avatar Alex Barth
Browse files

#600584 alexb: Use Batch API.

parent 777d6f2a
No related branches found
No related tags found
No related merge requests found
Showing with 2057 additions and 205 deletions
...@@ -3,6 +3,10 @@ ...@@ -3,6 +3,10 @@
Feeds 6.x 1.0 xxxxx xx, xxxx-xx-xx Feeds 6.x 1.0 xxxxx xx, xxxx-xx-xx
---------------------------------- ----------------------------------
- #600584 alexb: Use Batch API. NOTE: third party plugins/extensions
implementing FeedsProcessor::process(), FeedsProcessor::clear() or
FeedsImporter::expire() need to adjust their implementations.
Feeds 6.x 1.0 Alpha 11, 2010-02-10 Feeds 6.x 1.0 Alpha 11, 2010-02-10
---------------------------------- ----------------------------------
......
...@@ -125,6 +125,11 @@ Description: The table used by FeedsDataProcessor to store feed items. Usually a ...@@ -125,6 +125,11 @@ Description: The table used by FeedsDataProcessor to store feed items. Usually a
and the importer's id ($importer_id). This default table name can and the importer's id ($importer_id). This default table name can
be overridden by defining a variable with the same name. be overridden by defining a variable with the same name.
Name: feeds_node_batch_size
Default: 50
The number of nodes the feed node processor creates or deletes in one
page load.
Glossary Glossary
======== ========
......
...@@ -70,6 +70,13 @@ function feeds_schema() { ...@@ -70,6 +70,13 @@ function feeds_schema() {
'not null' => TRUE, 'not null' => TRUE,
'description' => t('Main source resource identifier. E. g. a path or a URL.'), 'description' => t('Main source resource identifier. E. g. a path or a URL.'),
), ),
'batch' => array(
'type' => 'text',
'size' => 'big',
'not null' => FALSE,
'description' => t('Cache for batching.'),
'serialize' => TRUE,
),
), ),
'primary key' => array('id', 'feed_nid'), 'primary key' => array('id', 'feed_nid'),
'indexes' => array( 'indexes' => array(
...@@ -102,27 +109,26 @@ function feeds_schema() { ...@@ -102,27 +109,26 @@ function feeds_schema() {
'default' => '', 'default' => '',
'description' => 'Callback to be invoked.', 'description' => 'Callback to be invoked.',
), ),
'last_scheduled_time' => array( 'last_executed_time' => array(
'type' => 'int', 'type' => 'int',
'unsigned' => FALSE, 'unsigned' => TRUE,
'default' => 0, 'default' => 0,
'not null' => TRUE, 'not null' => TRUE,
'description' => 'Timestamp when this feed was last scheduled to be refreshed.', 'description' => 'Timestamp when a job was last executed.',
), ),
'scheduled' => array( 'scheduled' => array(
'type' => 'int', 'type' => 'int',
'unsigned' => FALSE, 'unsigned' => TRUE,
'size' => 'tiny',
'default' => 0, 'default' => 0,
'not null' => TRUE, 'not null' => TRUE,
'description' => 'Flags whether a feed is scheduled to be refreshed or not.', 'description' => 'Timestamp when a job was scheduled. 0 if a job is currently not scheduled.',
), ),
), ),
'indexes' => array( 'indexes' => array(
'feed_nid' => array('feed_nid'), 'feed_nid' => array('feed_nid'),
'id' => array('id'), 'id' => array('id'),
'id_callback' => array('id', 'callback'), 'id_callback' => array('id', 'callback'),
'last_scheduled_time' => array('last_scheduled_time'), 'last_executed_time' => array('last_executed_time'),
'scheduled' => array('scheduled'), 'scheduled' => array('scheduled'),
), ),
); );
...@@ -315,5 +321,45 @@ function feeds_update_6007() { ...@@ -315,5 +321,45 @@ function feeds_update_6007() {
); );
db_add_field($ret, 'feeds_node_item', 'hash', $spec); db_add_field($ret, 'feeds_node_item', 'hash', $spec);
return $ret;
}
/**
* Add batch field to feeds_source table, adjust feeds_schedule table.
*/
function feeds_update_6008() {
$ret = array();
$spec = array(
'type' => 'text',
'size' => 'big',
'not null' => FALSE,
'description' => t('Cache for batching.'),
'serialize' => TRUE,
);
db_add_field($ret, 'feeds_source', 'batch', $spec);
// Make scheduled flag a timestamp.
$spec = array(
'type' => 'int',
'size' => 'normal',
'unsigned' => TRUE,
'default' => 0,
'not null' => TRUE,
'description' => 'Timestamp when a job was scheduled. 0 if a job is currently not scheduled.',
);
db_change_field($ret, 'feeds_schedule', 'scheduled', 'scheduled', $spec);
// Rename last_scheduled_time to last_executed_time, fix unsigned property.
$spec = array(
'type' => 'int',
'size' => 'normal',
'unsigned' => TRUE,
'default' => 0,
'not null' => TRUE,
'description' => 'Timestamp when a job was last executed.',
);
db_change_field($ret, 'feeds_schedule', 'last_scheduled_time', 'last_executed_time', $spec);
return $ret; return $ret;
} }
\ No newline at end of file
...@@ -283,7 +283,7 @@ function feeds_nodeapi(&$node, $op, $form) { ...@@ -283,7 +283,7 @@ function feeds_nodeapi(&$node, $op, $form) {
// Refresh feed if import on create is selected and suppress_import is // Refresh feed if import on create is selected and suppress_import is
// not set. // not set.
if ($op == 'insert' && feeds_importer($importer_id)->config['import_on_create'] && !isset($node->feeds['suppress_import'])) { if ($op == 'insert' && feeds_importer($importer_id)->config['import_on_create'] && !isset($node->feeds['suppress_import'])) {
$source->import(); feeds_batch_set(t('Importing'), 'import', $importer_id, $node->nid);
} }
// Add import to scheduler. // Add import to scheduler.
feeds_scheduler()->add($importer_id, 'import', $node->nid); feeds_scheduler()->add($importer_id, 'import', $node->nid);
...@@ -369,6 +369,68 @@ function feeds_scheduler_work($feed_info) { ...@@ -369,6 +369,68 @@ function feeds_scheduler_work($feed_info) {
* @} End of "defgroup hooks". * @} End of "defgroup hooks".
*/ */
/**
* @defgroup batch Batch functions.
* @{
*/
/**
 * Registers a single-operation batch that runs feeds_batch().
 *
 * @param $title
 *   Title shown to the user while the batch executes.
 * @param $method
 *   Name of the operation handed on to feeds_batch(). feeds_batch() acts on
 *   'import' and 'clear'.
 * @param $importer_id
 *   Identifier of a FeedsImporter object.
 * @param $feed_nid
 *   If the importer is attached to a content type, the feed node id that
 *   identifies the source to operate on. Defaults to 0.
 */
function feeds_batch_set($title, $method, $importer_id, $feed_nid = 0) {
  // The one and only operation: feeds_batch() with its three leading
  // arguments.
  $operation = array('feeds_batch', array($method, $importer_id, $feed_nid));
  batch_set(array(
    'title' => $title,
    'operations' => array($operation),
    // Empty string for the batch's progress message.
    'progress_message' => '',
  ));
}
/**
 * Batch callback.
 *
 * Runs one pass of the requested operation on a source and stores the value
 * the operation returns in $context['finished'].
 *
 * @param $method
 *   Method to execute on the source; one of 'import' or 'clear'. Any other
 *   value performs no work and leaves the batch marked as finished.
 * @param $importer_id
 *   Identifier of a FeedsImporter object.
 * @param $feed_nid
 *   If importer is attached to content type, feed node id identifying the
 *   source to be imported; 0 otherwise. This argument carries no default
 *   value: it precedes the required $context parameter, so a default could
 *   never be used and PHP 8 reports such a signature as deprecated.
 * @param $context
 *   Batch context, passed by reference. Only the 'finished' key is written.
 */
function feeds_batch($method, $importer_id, $feed_nid, &$context) {
  // Assume completion up front so that an exception or an unsupported method
  // ends the batch instead of leaving 'finished' unset.
  $context['finished'] = 1;
  try {
    switch ($method) {
      case 'import':
        $context['finished'] = feeds_source($importer_id, $feed_nid)->import();
        break;

      case 'clear':
        $context['finished'] = feeds_source($importer_id, $feed_nid)->clear();
        break;
    }
  }
  catch (Exception $e) {
    // Surface the failure to the user; 'finished' keeps its initial value 1.
    drupal_set_message($e->getMessage(), 'error');
  }
}
/**
* @} End of "defgroup batch".
*/
/** /**
* @defgroup utility Utility functions * @defgroup utility Utility functions
* @{ * @{
......
...@@ -83,7 +83,7 @@ function feeds_import_form_submit($form, &$form_state) { ...@@ -83,7 +83,7 @@ function feeds_import_form_submit($form, &$form_state) {
// Refresh feed if import on create is selected. // Refresh feed if import on create is selected.
if ($source->importer->config['import_on_create']) { if ($source->importer->config['import_on_create']) {
$source->import(); feeds_batch_set(t('Importing'), 'import', $form['#importer_id']);
} }
// Add importer to schedule. // Add importer to schedule.
...@@ -107,8 +107,9 @@ function feeds_import_tab_form(&$form_state, $node) { ...@@ -107,8 +107,9 @@ function feeds_import_tab_form(&$form_state, $node) {
/** /**
* Submit handler for feeds_import_tab_form(). * Submit handler for feeds_import_tab_form().
*/ */
function feeds_import_tab_form_submit($form, $form_state) { function feeds_import_tab_form_submit($form, &$form_state) {
feeds_source($form['#importer_id'], $form['#feed_nid'])->import(); $form_state['redirect'] = $form['#redirect'];
feeds_batch_set(t('Importing'), 'import', $form['#importer_id'], $form['#feed_nid']);
} }
/** /**
...@@ -135,5 +136,6 @@ function feeds_delete_tab_form(&$form_state, $importer_id, $node = NULL) { ...@@ -135,5 +136,6 @@ function feeds_delete_tab_form(&$form_state, $importer_id, $node = NULL) {
* Submit handler for feeds_delete_tab_form(). * Submit handler for feeds_delete_tab_form().
*/ */
function feeds_delete_tab_form_submit($form, &$form_state) { function feeds_delete_tab_form_submit($form, &$form_state) {
feeds_source($form['#importer_id'], empty($form['#feed_nid']) ? 0 : $form['#feed_nid'])->clear(); $form_state['redirect'] = $form['#redirect'];
feeds_batch_set(t('Deleting'), 'clear', $form['#importer_id'], empty($form['#feed_nid']) ? 0 : $form['#feed_nid']);
} }
<?php <?php
// $Id$ // $Id$
/**
 * State container for an import or clear batch, used by the FeedsSource
 * class.
 */
class FeedsBatch {
  // Counters are public so they can be read and incremented directly.
  public $total;
  public $created;
  public $updated;
  public $deleted;

  /**
   * Starts every counter at zero.
   */
  public function __construct() {
    $this->total = $this->created = $this->updated = $this->deleted = 0;
  }
}
/** /**
* A FeedsImportBatch is the actual content retrieved from a FeedsSource. On * A FeedsImportBatch is the actual content retrieved from a FeedsSource. On
* import, it is created on the fetching stage and passed through the parsing * import, it is created on the fetching stage and passed through the parsing
...@@ -9,7 +27,7 @@ ...@@ -9,7 +27,7 @@
* @see FeedsSource class * @see FeedsSource class
* @see FeedsFetcher class * @see FeedsFetcher class
*/ */
class FeedsImportBatch { class FeedsImportBatch extends FeedsBatch {
protected $url; protected $url;
protected $file_path; protected $file_path;
...@@ -26,6 +44,7 @@ class FeedsImportBatch { ...@@ -26,6 +44,7 @@ class FeedsImportBatch {
$this->url = $url; $this->url = $url;
$this->file_path = $file_path; $this->file_path = $file_path;
$this->items = array(); $this->items = array();
parent::__construct();
} }
/** /**
...@@ -33,21 +52,17 @@ class FeedsImportBatch { ...@@ -33,21 +52,17 @@ class FeedsImportBatch {
* The raw content of the feed. * The raw content of the feed.
*/ */
public function getRaw() { public function getRaw() {
if (empty($this->raw)) { if ($this->file_path) {
// Prefer file. return file_get_contents(realpath($this->file_path));
if ($this->file_path) { }
$this->raw = file_get_contents(realpath($this->file_path)); elseif ($this->url) {
} feeds_include_library('http_request.inc', 'http_request');
elseif ($this->url) { $result = http_request_get($this->url);
feeds_include_library('http_request.inc', 'http_request'); if ($result->code != 200) {
$result = http_request_get($this->url); throw new Exception(t('Download of @url failed with code !code.', array('@url' => $this->url, '!code' => $result->code)));
if ($result->code != 200) {
throw new Exception(t('Download of @url failed with code !code.', array('@url' => $this->url, '!code' => $result->code)));
}
$this->raw = $result->data;
} }
return $result->data;
} }
return $this->raw;
} }
/** /**
...@@ -133,12 +148,13 @@ class FeedsImportBatch { ...@@ -133,12 +148,13 @@ class FeedsImportBatch {
/** /**
* Set items. * Set items.
* *
* @param $items * @param $items
* An array of the items in the feed. Cannot be NULL. * An array of the items in the feed. Cannot be NULL.
*/ */
public function setItems($items) { public function setItems($items) {
$this->items = $items; $this->items = $items;
$this->total = count($items);
} }
/** /**
......
...@@ -11,6 +11,10 @@ require_once(dirname(__FILE__) .'/FeedsConfigurable.inc'); ...@@ -11,6 +11,10 @@ require_once(dirname(__FILE__) .'/FeedsConfigurable.inc');
require_once(dirname(__FILE__) .'/FeedsSource.inc'); require_once(dirname(__FILE__) .'/FeedsSource.inc');
require_once(dirname(__FILE__) .'/FeedsBatch.inc'); require_once(dirname(__FILE__) .'/FeedsBatch.inc');
// Status of batched operations.
define('FEEDS_BATCH_COMPLETE', 1);
define('FEEDS_BATCH_ACTIVE', 0);
/** /**
* Class defining an importer object. This is the main hub for Feeds module's * Class defining an importer object. This is the main hub for Feeds module's
* functionality. * functionality.
...@@ -60,15 +64,23 @@ class FeedsImporter extends FeedsConfigurable { ...@@ -60,15 +64,23 @@ class FeedsImporter extends FeedsConfigurable {
} }
/** /**
* Remove items older than $time. If $time is not given, processor settings * Remove items older than $time.
* will be used. *
* @param $time
* All items older than FEEDS_REQUEST_TIME - $time will be deleted. If not
* given, internal processor settings will be used.
*
* @return
* FEEDS_BATCH_COMPLETE if complete, a float between 0 and 1 indicating
* progress otherwise.
*/ */
public function expire($time = NULL) { public function expire($time = NULL) {
try { try {
$this->processor->expire($time); return $this->processor->expire($time);
} }
catch (Exception $e) { catch (Exception $e) {
drupal_set_message($e->getMessage(), 'error'); drupal_set_message($e->getMessage(), 'error');
return FEEDS_BATCH_COMPLETE;
} }
} }
...@@ -76,22 +88,25 @@ class FeedsImporter extends FeedsConfigurable { ...@@ -76,22 +88,25 @@ class FeedsImporter extends FeedsConfigurable {
* Callback for scheduler to invoke task. Do not execute if this importer is * Callback for scheduler to invoke task. Do not execute if this importer is
* not persistent at all. * not persistent at all.
* *
* @param $feed_info
* FeedsScheduler feed infor array.
*
* @see FeedsScheduler::work(). * @see FeedsScheduler::work().
*
* @param $job
* Array that is a FeedsScheduler job definition.
*
* @return
* FEEDS_BATCH_COMPLETE if complete, a float between 0 and 1 indicating
* progress otherwise.
*/ */
public function work($feed_info) { public function work($job) {
if ($this->export_type == FEEDS_EXPORT_NONE) { if ($this->export_type == FEEDS_EXPORT_NONE) {
return; return;
} }
switch ($feed_info['callback']) { switch ($job['callback']) {
case 'import': case 'import':
feeds_source($feed_info['importer_id'], $feed_info['feed_nid'])->import(); return feeds_source($job['id'], $job['feed_nid'])->import();
break; break;
case 'expire': case 'expire':
$this->expire(); return $this->expire();
break;
} }
} }
......
...@@ -37,14 +37,14 @@ interface FeedsSchedulerInterface { ...@@ -37,14 +37,14 @@ interface FeedsSchedulerInterface {
public function remove($importer_id, $callback, $feed_nid = 0); public function remove($importer_id, $callback, $feed_nid = 0);
/** /**
* Work off a given feed identified by $feed_info. * Work off a given feed identified by $job.
* *
* @param $feed_info * @param $job
* Array where 'importer_id' key is the id of a FeedsImporter object, * Array where 'id' key is the id of a FeedsImporter object,
* and 'feed_nid' is the feed node id that identifies the * and 'feed_nid' is the feed node id that identifies the
* source of a FeedsSource object. * source of a FeedsSource object.
*/ */
public function work($feed_info); public function work($job);
} }
/** /**
...@@ -87,7 +87,6 @@ class FeedsScheduler implements FeedsSchedulerInterface { ...@@ -87,7 +87,6 @@ class FeedsScheduler implements FeedsSchedulerInterface {
* returns. If drupal_queue is not available, works off tasks. * returns. If drupal_queue is not available, works off tasks.
*/ */
public function cron() { public function cron() {
// Check and set scheduler semaphore, take time. // Check and set scheduler semaphore, take time.
if (variable_get('feeds_scheduler_cron', FALSE)) { if (variable_get('feeds_scheduler_cron', FALSE)) {
watchdog('FeedsScheduler', 'Last cron process did not finish.', array(), WATCHDOG_ERROR); watchdog('FeedsScheduler', 'Last cron process did not finish.', array(), WATCHDOG_ERROR);
...@@ -95,55 +94,26 @@ class FeedsScheduler implements FeedsSchedulerInterface { ...@@ -95,55 +94,26 @@ class FeedsScheduler implements FeedsSchedulerInterface {
variable_set('feeds_scheduler_cron', TRUE); variable_set('feeds_scheduler_cron', TRUE);
$start = time(); $start = time();
// Get feeds configuration, check whether drupal_queue is present and set // Release schedule lock where the lock is older than 1 hour.
// parameters accordingly. db_query('UPDATE {feeds_schedule} SET scheduled = 0 WHERE scheduled < %d', FEEDS_REQUEST_TIME - 3600);
if ($importers = feeds_importer_load_all()) {
if ($use_queue = module_exists('drupal_queue')) {
drupal_queue_include();
$queue = drupal_queue_get(FEEDS_SCHEDULER_QUEUE);
$num = variable_get('feeds_schedule_queue_num', 200);
}
else {
$num = variable_get('feeds_schedule_num', 5);
}
// Iterate over feed configurations, pick $num feeds for each // Iterate over feed importers, pick $num jobs for each of them and
// configuration, push to queue or refresh feeds. // schedule them.
if ($importers = feeds_importer_load_all()) {
$num = $this->queue() ? variable_get('feeds_schedule_queue_num', 200) : variable_get('feeds_schedule_num', 5);
foreach ($importers as $importer) { foreach ($importers as $importer) {
foreach ($importer->getScheduleCallbacks() as $callback) { foreach ($importer->getScheduleCallbacks() as $callback) {
// Check whether jobs are scheduled.
$period = $importer->getSchedulePeriod($callback); $period = $importer->getSchedulePeriod($callback);
if ($period != FEEDS_SCHEDULE_NEVER) { if ($period != FEEDS_SCHEDULE_NEVER) {
$result = db_query_range('SELECT feed_nid, id, callback, last_executed_time FROM {feeds_schedule} WHERE id = "%s" AND callback = "%s" AND scheduled = 0 AND (last_executed_time < %d OR last_executed_time = 0) ORDER BY last_executed_time ASC', $importer->id, $callback, FEEDS_REQUEST_TIME - $period, 0, $num);
// Refresh feeds that have a refresh time older than now minus while ($job = db_fetch_array($result)) {
// refresh period. $this->schedule($job);
$time = FEEDS_REQUEST_TIME - $period; // @todo Add time limit.
$result = db_query_range('SELECT feed_nid, id AS importer_id, callback, last_scheduled_time FROM {feeds_schedule} WHERE id = "%s" AND callback = "%s" AND scheduled = 0 AND (last_scheduled_time < %d OR last_scheduled_time = 0) ORDER BY last_scheduled_time ASC', $importer->id, $callback, $time, 0, $num);
while ($feed_info = db_fetch_array($result)) {
// If drupal_queue is present, add to queue, otherwise work off
// immediately.
if ($use_queue) {
if ($queue->createItem($feed_info)) {
$this->flag($feed_info['importer_id'], $feed_info['callback'], $feed_info['feed_nid']);
}
else {
watchdog('FeedsScheduler', 'Error adding item to queue.', WATCHDOG_ALERT);
}
}
else {
$this->flag($feed_info['importer_id'], $feed_info['callback'], $feed_info['feed_nid']);
$this->work($feed_info);
}
} }
} }
} }
} }
} }
// Unflag and post a message that we're done. // Unflag and post a message that we're done.
variable_set('feeds_scheduler_cron', FALSE); variable_set('feeds_scheduler_cron', FALSE);
watchdog('FeedsScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start))); watchdog('FeedsScheduler', 'Finished processing schedule after !time.', array('!time' => format_interval(time() - $start)));
...@@ -154,16 +124,15 @@ class FeedsScheduler implements FeedsSchedulerInterface { ...@@ -154,16 +124,15 @@ class FeedsScheduler implements FeedsSchedulerInterface {
* *
* Add a feed to the scheduler. * Add a feed to the scheduler.
* *
* @todo Create optional parameter $last_scheduled_time to pass in. Set this * @todo Create optional parameter $last_executed_time to pass in. Set this
* value if a feed is refreshed on creation. * value if a feed is refreshed on creation.
* @todo Create an abstract interface for items that can be added?
*/ */
public function add($importer_id, $callback, $feed_nid = 0) { public function add($importer_id, $callback, $feed_nid = 0) {
$save = array( $save = array(
'id' => $importer_id, 'id' => $importer_id,
'callback' => $callback, 'callback' => $callback,
'feed_nid' => $feed_nid, 'feed_nid' => $feed_nid,
'last_scheduled_time' => 0, 'last_executed_time' => 0,
'scheduled' => 0, // Means NOT scheduled at the moment. 'scheduled' => 0, // Means NOT scheduled at the moment.
); );
drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid'));
...@@ -187,62 +156,86 @@ class FeedsScheduler implements FeedsSchedulerInterface { ...@@ -187,62 +156,86 @@ class FeedsScheduler implements FeedsSchedulerInterface {
* Used as worker callback invoked from feeds_scheduler_refresh() or * Used as worker callback invoked from feeds_scheduler_refresh() or
* if drupal_queue is not enabled, directly from $this->cron(). * if drupal_queue is not enabled, directly from $this->cron().
*/ */
public function work($feed_info) { public function work($job) {
$importer = feeds_importer($feed_info['importer_id']); $importer = feeds_importer($job['id']);
// Remove scheduled flag, if we fail after this we'd like to try again asap.
$this->unflag($feed_info['importer_id'], $feed_info['callback'], $feed_info['feed_nid']);
try { try {
$importer->work($feed_info); if (FEEDS_BATCH_COMPLETE == $importer->work($job)) {
$this->finished($job);
}
} }
catch (Exception $e) { catch (Exception $e) {
watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR); watchdog('FeedsScheduler', $e->getMessage(), array(), WATCHDOG_ERROR);
} }
// Make sure that job is not scheduled after this method has executed.
$this->unschedule($job);
} }
/** /**
* Helper function to flag a feed scheduled. * @return
* Drupal Queue if available, NULL otherwise.
*/
protected function queue() {
  // Without the drupal_queue module there is no queue to hand out.
  if (!module_exists('drupal_queue')) {
    return NULL;
  }
  drupal_queue_include();
  $queue = drupal_queue_get(FEEDS_SCHEDULER_QUEUE);
  return $queue;
}
/**
* Attempt to reserve a job. If successful work it off or - if Drupal Queue is
* available - queue it.
* *
* This function sets the feed's scheduled bit to 1 and updates * The lock/release mechanism makes sure that an item does not get queued
* last_scheduled_time to FEEDS_REQUEST_TIME. * twice. It has a different purpose than the FeedsSource level locking
* which is in place to avoid concurrent import/clear operations on a source.
* *
* @param $id * @param $job
* Id of the importer configuration. * A job array.
* @param $callback
* Callback of the job.
* @param $feed_nid
* Identifier of the feed node.
*/ */
protected function flag($id, $callback, $feed_nid) { protected function schedule($job) {
$save = array( db_query("UPDATE {feeds_schedule} SET scheduled = %d WHERE id = '%s' AND feed_nid = %d AND callback = '%s'", FEEDS_REQUEST_TIME, $job['id'], $job['feed_nid'], $job['callback']);
'id' => $id, if (db_affected_rows()) {
'callback' => $callback, if ($this->queue()) {
'feed_nid' => $feed_nid, if (!$queue->createItem($job)) {
'last_scheduled_time' => FEEDS_REQUEST_TIME, $this->unschedule($job);
'scheduled' => 1, watchdog('FeedsScheduler', 'Error adding item to queue.', WATCHDOG_CRITICAL);
); return;
drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); }
} }
else {
$this->work($job);
}
}
}
/** /**
* Helper function to flag a feed unscheduled. * Remove a job from schedule.
* *
* This function sets the feed's scheduled bit to 0 and thus makes * This function sets the source's scheduled bit to 0 and thus makes
* it eligible for being added to the queue again. * it eligible for being added to the queue again.
* *
* @param $id * @param $job
* Id of the importer configuration. * A job array.
* @param $callback
* Callback of the job.
* @param $feed_nid
* Identifier of the feed node.
*/ */
protected function unflag($id, $callback, $feed_nid) { protected function unschedule($job) {
$save = array( unset($job['last_executed_time']);
'id' => $id, $job = array(
'callback' => $callback,
'feed_nid' => $feed_nid,
'scheduled' => 0, 'scheduled' => 0,
); ) + $job;
drupal_write_record('feeds_schedule', $save, array('id', 'callback', 'feed_nid')); drupal_write_record('feeds_schedule', $job, array('id', 'callback', 'feed_nid'));
}
/**
 * Mark a job as done: clear its scheduled value and stamp the execution time.
 *
 * @param $job
 *   A job array. Its 'id', 'callback' and 'feed_nid' entries are used as the
 *   update keys for the feeds_schedule record.
 */
protected function finished($job) {
  $record = array(
    'scheduled' => 0,
    'last_executed_time' => FEEDS_REQUEST_TIME,
  );
  // Array union keeps the two values set above and only fills in the keys
  // that are missing from $record.
  $record += $job;
  drupal_write_record('feeds_schedule', $record, array('id', 'callback', 'feed_nid'));
}
} }
...@@ -76,6 +76,9 @@ class FeedsSource extends FeedsConfigurable { ...@@ -76,6 +76,9 @@ class FeedsSource extends FeedsConfigurable {
// The FeedsImporter object that this source is expected to be used with. // The FeedsImporter object that this source is expected to be used with.
protected $importer; protected $importer;
// A FeedsBatch object. NULL if there is no active batch.
protected $batch;
/** /**
* Instantiate a unique object per class/id/feed_nid. Don't use * Instantiate a unique object per class/id/feed_nid. Don't use
* directly, use feeds_source() instead. * directly, use feeds_source() instead.
...@@ -102,21 +105,31 @@ class FeedsSource extends FeedsConfigurable { ...@@ -102,21 +105,31 @@ class FeedsSource extends FeedsConfigurable {
/** /**
* Import a feed: execute, fetching, parsing and processing stage. * Import a feed: execute, fetching, parsing and processing stage.
* *
* Lock a source before importing by using FeedsSource::lock(), after
* importing, release with FeedsSource::release().
*
* @todo Iron out and document potential Exceptions. * @todo Iron out and document potential Exceptions.
* @todo Support batching.
* @todo catch exceptions outside of import(), clear() and expire(). * @todo catch exceptions outside of import(), clear() and expire().
*/ */
public function import() { public function import() {
try { try {
$feed = $this->importer->fetcher->fetch($this); if (!$this->batch || !($this->batch instanceof FeedsImportBatch)) {
$this->importer->parser->parse($feed, $this); $this->batch = $this->importer->fetcher->fetch($this);
$this->importer->processor->process($feed, $this); $this->importer->parser->parse($this->batch, $this);
unset($feed); }
$result = $this->importer->processor->process($this->batch, $this);
if ($result == FEEDS_BATCH_COMPLETE) {
unset($this->batch);
module_invoke_all('feeds_after_import', $this->importer, $this);
}
} }
catch (Exception $e) { catch (Exception $e) {
unset($this->batch);
$result = FEEDS_BATCH_ACTIVE;
drupal_set_message($e->getMessage(), 'error'); drupal_set_message($e->getMessage(), 'error');
} }
module_invoke_all('feeds_after_import', $this->importer, $this); $this->save();
return $result;
} }
/** /**
...@@ -126,11 +139,21 @@ class FeedsSource extends FeedsConfigurable { ...@@ -126,11 +139,21 @@ class FeedsSource extends FeedsConfigurable {
try { try {
$this->importer->fetcher->clear($this); $this->importer->fetcher->clear($this);
$this->importer->parser->clear($this); $this->importer->parser->clear($this);
$this->importer->processor->clear($this); if (!$this->batch) {
$this->batch = new FeedsBatch();
}
$result = $this->importer->processor->clear($this->batch, $this);
if ($result == FEEDS_BATCH_COMPLETE) {
unset($this->batch);
}
} }
catch (Exception $e) { catch (Exception $e) {
unset($this->batch);
$result = FEEDS_BATCH_COMPLETE;
drupal_set_message($e->getMessage(), 'error'); drupal_set_message($e->getMessage(), 'error');
} }
$this->save();
return $result;
} }
/** /**
...@@ -149,6 +172,7 @@ class FeedsSource extends FeedsConfigurable { ...@@ -149,6 +172,7 @@ class FeedsSource extends FeedsConfigurable {
'feed_nid' => $this->feed_nid, 'feed_nid' => $this->feed_nid,
'config' => $config, 'config' => $config,
'source' => $source, 'source' => $source,
'batch' => isset($this->batch) ? $this->batch : FALSE,
); );
// Make sure a source record is present at all time, try to update first, // Make sure a source record is present at all time, try to update first,
// then insert. // then insert.
...@@ -164,12 +188,13 @@ class FeedsSource extends FeedsConfigurable { ...@@ -164,12 +188,13 @@ class FeedsSource extends FeedsConfigurable {
* @todo Patch CTools to move constants from export.inc to ctools.module. * @todo Patch CTools to move constants from export.inc to ctools.module.
*/ */
public function load() { public function load() {
if ($config = db_result(db_query('SELECT config FROM {feeds_source} WHERE id = "%s" AND feed_nid = %d', $this->id, $this->feed_nid))) { if ($record = db_fetch_object(db_query('SELECT config, batch FROM {feeds_source} WHERE id = "%s" AND feed_nid = %d', $this->id, $this->feed_nid))) {
// While FeedsSource cannot be exported, we still use CTool's export.inc // While FeedsSource cannot be exported, we still use CTool's export.inc
// export definitions. // export definitions.
ctools_include('export'); ctools_include('export');
$this->export_type = EXPORT_IN_DATABASE; $this->export_type = EXPORT_IN_DATABASE;
$this->config = unserialize($config); $this->config = unserialize($record->config);
$this->batch = unserialize($record->batch);
} }
} }
......
...@@ -50,6 +50,8 @@ class FeedsDataProcessor extends FeedsProcessor { ...@@ -50,6 +50,8 @@ class FeedsDataProcessor extends FeedsProcessor {
else { else {
drupal_set_message(t('There are no new items.')); drupal_set_message(t('There are no new items.'));
} }
return FEEDS_BATCH_COMPLETE;
} }
/** /**
...@@ -57,12 +59,13 @@ class FeedsDataProcessor extends FeedsProcessor { ...@@ -57,12 +59,13 @@ class FeedsDataProcessor extends FeedsProcessor {
* *
* Delete all data records for feed_nid in this table. * Delete all data records for feed_nid in this table.
*/ */
public function clear(FeedsSource $source) { public function clear(FeedsBatch $batch, FeedsSource $source) {
$clause = array( $clause = array(
'feed_nid' => $source->feed_nid, 'feed_nid' => $source->feed_nid,
); );
$num = $this->handler()->delete($clause); $num = $this->handler()->delete($clause);
drupal_set_message(t('Deleted !number items.', array('!number' => $num))); drupal_set_message(t('Deleted !number items.', array('!number' => $num)));
return FEEDS_BATCH_COMPLETE;
} }
/** /**
...@@ -73,7 +76,7 @@ class FeedsDataProcessor extends FeedsProcessor { ...@@ -73,7 +76,7 @@ class FeedsDataProcessor extends FeedsProcessor {
$time = $this->expiryTime(); $time = $this->expiryTime();
} }
if ($time == FEEDS_EXPIRE_NEVER) { if ($time == FEEDS_EXPIRE_NEVER) {
return; return FEEDS_BATCH_COMPLETE;
} }
$clause = array( $clause = array(
'timestamp' => array( 'timestamp' => array(
...@@ -83,6 +86,7 @@ class FeedsDataProcessor extends FeedsProcessor { ...@@ -83,6 +86,7 @@ class FeedsDataProcessor extends FeedsProcessor {
); );
$num = $this->handler()->delete($clause); $num = $this->handler()->delete($clause);
drupal_set_message(t('Expired !number records from !table.', array('!number' => $num, '!table' => $this->tableName()))); drupal_set_message(t('Expired !number records from !table.', array('!number' => $num, '!table' => $this->tableName())));
return FEEDS_BATCH_COMPLETE;
} }
/** /**
......
...@@ -16,10 +16,6 @@ class FeedsFeedNodeProcessor extends FeedsProcessor { ...@@ -16,10 +16,6 @@ class FeedsFeedNodeProcessor extends FeedsProcessor {
* Implementation of FeedsProcessor::process(). * Implementation of FeedsProcessor::process().
*/ */
public function process(FeedsImportBatch $batch, FeedsSource $source) { public function process(FeedsImportBatch $batch, FeedsSource $source) {
// Count number of created and updated nodes.
$created = $updated = 0;
while ($item = $batch->shiftItem()) { while ($item = $batch->shiftItem()) {
// If the target item does not exist OR if update_existing is enabled, // If the target item does not exist OR if update_existing is enabled,
...@@ -39,30 +35,32 @@ class FeedsFeedNodeProcessor extends FeedsProcessor { ...@@ -39,30 +35,32 @@ class FeedsFeedNodeProcessor extends FeedsProcessor {
node_save($node); node_save($node);
if ($nid) { if ($nid) {
$updated++; $batch->updated++;
} }
else { else {
$created++; $batch->created++;
} }
} }
} }
// Set messages. // Set messages.
if ($created) { if ($batch->created) {
drupal_set_message(t('Created !number !type nodes.', array('!number' => $created, '!type' => $this->config['content_type']))); drupal_set_message(t('Created !number !type nodes.', array('!number' => $batch->created, '!type' => $this->config['content_type'])));
} }
elseif ($updated) { elseif ($batch->updated) {
drupal_set_message(t('Updated !number !type nodes.', array('!number' => $updated, '!type' => $this->config['content_type']))); drupal_set_message(t('Updated !number !type nodes.', array('!number' => $batch->updated, '!type' => $this->config['content_type'])));
} }
else { else {
drupal_set_message(t('There is no new content.')); drupal_set_message(t('There is no new content.'));
} }
return FEEDS_BATCH_COMPLETE;
} }
/** /**
* Implementation of FeedsProcessor::clear(). * Implementation of FeedsProcessor::clear().
*/ */
public function clear(FeedsSource $source) { public function clear(FeedsBatch $batch, FeedsSource $source) {
// Do not support deleting imported items as we would have to delete all // Do not support deleting imported items as we would have to delete all
// items of the content type we imported which may contain nodes that a // items of the content type we imported which may contain nodes that a
// user created by hand. // user created by hand.
......
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
* Class definition of FeedsNodeProcessor. * Class definition of FeedsNodeProcessor.
*/ */
// Create or delete FEEDS_NODE_BATCH_SIZE at a time.
define('FEEDS_NODE_BATCH_SIZE', 50);
/** /**
* Creates nodes from feed items. * Creates nodes from feed items.
*/ */
...@@ -16,8 +19,8 @@ class FeedsNodeProcessor extends FeedsProcessor { ...@@ -16,8 +19,8 @@ class FeedsNodeProcessor extends FeedsProcessor {
*/ */
public function process(FeedsImportBatch $batch, FeedsSource $source) { public function process(FeedsImportBatch $batch, FeedsSource $source) {
// Count number of created and updated nodes. // Keep track of processed items in this pass.
$created = $updated = 0; $processed = 0;
while ($item = $batch->shiftItem()) { while ($item = $batch->shiftItem()) {
...@@ -38,10 +41,10 @@ class FeedsNodeProcessor extends FeedsProcessor { ...@@ -38,10 +41,10 @@ class FeedsNodeProcessor extends FeedsProcessor {
// If updating populate nid and vid avoiding an expensive node_load(). // If updating populate nid and vid avoiding an expensive node_load().
$node->nid = $nid; $node->nid = $nid;
$node->vid = db_result(db_query('SELECT vid FROM {node} WHERE nid = %d', $nid)); $node->vid = db_result(db_query('SELECT vid FROM {node} WHERE nid = %d', $nid));
$updated++; $batch->updated++;
} }
else { else {
$created++; $batch->created++;
} }
// Populate and prepare node object. // Populate and prepare node object.
...@@ -69,41 +72,52 @@ class FeedsNodeProcessor extends FeedsProcessor { ...@@ -69,41 +72,52 @@ class FeedsNodeProcessor extends FeedsProcessor {
// Save the node. // Save the node.
node_save($node); node_save($node);
} }
$processed++;
if ($processed >= variable_get('feeds_node_batch_size', FEEDS_NODE_BATCH_SIZE)) {
return (1.0 / ($batch->total + 1)) * ($batch->updated + $batch->created); // Add + 1 to make sure that result is not 1.0 = finished.
}
} }
// Set messages. // Set messages.
if ($created) { if ($batch->created) {
drupal_set_message(t('Created !number !type nodes.', array('!number' => $created, '!type' => node_get_types('name', $this->config['content_type'])))); drupal_set_message(t('Created !number !type nodes.', array('!number' => $batch->created, '!type' => node_get_types('name', $this->config['content_type']))));
} }
elseif ($updated) { elseif ($batch->updated) {
drupal_set_message(t('Updated !number !type nodes.', array('!number' => $updated, '!type' => node_get_types('name', $this->config['content_type'])))); drupal_set_message(t('Updated !number !type nodes.', array('!number' => $batch->updated, '!type' => node_get_types('name', $this->config['content_type']))));
} }
else { else {
drupal_set_message(t('There is no new content.')); drupal_set_message(t('There is no new content.'));
} }
return FEEDS_BATCH_COMPLETE;
} }
/** /**
* Implementation of FeedsProcessor::clear(). * Implementation of FeedsProcessor::clear().
*/ */
public function clear(FeedsSource $source) { public function clear(FeedsBatch $batch, FeedsSource $source) {
// Count number of deleted nodes. if (empty($batch->total)) {
$deleted = 0; $batch->total = db_result(db_query("SELECT COUNT(nid) FROM {feeds_node_item} WHERE feed_nid = %d", $source->feed_nid));
}
$result = db_query('SELECT nid FROM {feeds_node_item} WHERE feed_nid = %d', $source->feed_nid); $result = db_query_range('SELECT nid FROM {feeds_node_item} WHERE feed_nid = %d', $source->feed_nid, 0, variable_get('feeds_node_batch_size', FEEDS_NODE_BATCH_SIZE));
while ($node = db_fetch_object($result)) { while ($node = db_fetch_object($result)) {
_feeds_node_delete($node->nid); _feeds_node_delete($node->nid);
$deleted++; $batch->deleted++;
}
if (db_result(db_query_range('SELECT nid FROM {feeds_node_item} WHERE feed_nid = %d', $source->feed_nid, 0, 1))) {
return (1.0 / ($batch->total + 1)) * $batch->deleted;
} }
// Set message. // Set message.
drupal_get_messages('status'); drupal_get_messages('status');
if ($deleted) { if ($batch->deleted) {
drupal_set_message(t('Deleted !number nodes.', array('!number' => $deleted))); drupal_set_message(t('Deleted !number nodes.', array('!number' => $batch->deleted)));
} }
else { else {
drupal_set_message(t('There is no content to be deleted.')); drupal_set_message(t('There is no content to be deleted.'));
} }
return FEEDS_BATCH_COMPLETE;
} }
/** /**
...@@ -116,14 +130,14 @@ class FeedsNodeProcessor extends FeedsProcessor { ...@@ -116,14 +130,14 @@ class FeedsNodeProcessor extends FeedsProcessor {
if ($time == FEEDS_EXPIRE_NEVER) { if ($time == FEEDS_EXPIRE_NEVER) {
return; return;
} }
// @todo Expires 50 at a time at the moment. $result = db_query_range('SELECT n.nid FROM {node} n JOIN {feeds_node_item} fni ON n.nid = fni.nid WHERE fni.id = "%s" AND n.created < %d', $this->id, FEEDS_REQUEST_TIME - $time, 0, variable_get('feeds_node_batch_size', FEEDS_NODE_BATCH_SIZE));
// Create a way of letting the caller know whether all nodes could be
// deleted. Has to be thought through in a larger context of batch
// processing support for import and expiry.
$result = db_query('SELECT n.nid FROM {node} n JOIN {feeds_node_item} fni ON n.nid = fni.nid WHERE fni.id = "%s" AND n.created < %d', $this->id, FEEDS_REQUEST_TIME - $time, 0, 50);
while ($node = db_fetch_object($result)) { while ($node = db_fetch_object($result)) {
_feeds_node_delete($node->nid); _feeds_node_delete($node->nid);
} }
if (db_result(db_query_range('SELECT n.nid FROM {node} n JOIN {feeds_node_item} fni ON n.nid = fni.nid WHERE fni.id = "%s" AND n.created < %d', $this->id, FEEDS_REQUEST_TIME - $time, 0, 1))) {
return FEEDS_BATCH_ACTIVE;
}
return FEEDS_BATCH_COMPLETE;
} }
/** /**
......
...@@ -14,6 +14,10 @@ abstract class FeedsProcessor extends FeedsPlugin { ...@@ -14,6 +14,10 @@ abstract class FeedsProcessor extends FeedsPlugin {
* The current feed import data passed in from the parsing stage. * The current feed import data passed in from the parsing stage.
* @param FeedsSource $source * @param FeedsSource $source
* Source information about this import. * Source information about this import.
*
* @return
* FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0
* and 0.99* indicating progress otherwise.
*/ */
public abstract function process(FeedsImportBatch $batch, FeedsSource $source); public abstract function process(FeedsImportBatch $batch, FeedsSource $source);
...@@ -21,26 +25,41 @@ abstract class FeedsProcessor extends FeedsPlugin { ...@@ -21,26 +25,41 @@ abstract class FeedsProcessor extends FeedsPlugin {
* Remove all stored results or stored results up to a certain time for this * Remove all stored results or stored results up to a certain time for this
* configuration/this source. * configuration/this source.
* *
* @param FeedsBatch $batch
* A FeedsBatch object for tracking information such as how many
* items have been deleted total between page loads.
* @param FeedsSource $source * @param FeedsSource $source
* Source information for this expiry. Implementers should only delete items * Source information for this expiry. Implementers should only delete items
* pertaining to this source. The preferred way of determining whether an * pertaining to this source. The preferred way of determining whether an
* item pertains to a certain source is by using $source->feed_nid. It is the * item pertains to a certain source is by using $source->feed_nid. It is the
* processor's responsibility to store the feed_nid of an imported item in * processor's responsibility to store the feed_nid of an imported item in
* the processing stage. * the processing stage.
*
* @return
* FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0
* and 0.99* indicating progress otherwise.
*/ */
public abstract function clear(FeedsSource $source); public abstract function clear(FeedsBatch $batch, FeedsSource $source);
/** /**
* Delete feed items older than now - $time. * Delete feed items older than now - $time. Do not invoke expire on a
* processor directly, but use FeedsImporter::expire() instead.
* *
* @see FeedsImporter::expire().
* @see FeedsDataProcessor::expire(). * @see FeedsDataProcessor::expire().
* *
* @param $time * @param $time
* If implemented, all items produced by this configuration that are older * If implemented, all items produced by this configuration that are older
* than FEEDS_REQUEST_TIME - $time * than FEEDS_REQUEST_TIME - $time should be deleted.
* If $time === NULL processor should use internal configuration. * If $time === NULL processor should use internal configuration.
*
* @return
* FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0
* and 0.99* indicating progress otherwise.
*/ */
public function expire($time = NULL) {} public function expire($time = NULL) {
return FEEDS_BATCH_COMPLETE;
}
/** /**
* Execute mapping on an item. * Execute mapping on an item.
......
...@@ -69,16 +69,14 @@ class FeedsTermProcessor extends FeedsProcessor { ...@@ -69,16 +69,14 @@ class FeedsTermProcessor extends FeedsProcessor {
else { else {
drupal_set_message(t('There are no new terms.')); drupal_set_message(t('There are no new terms.'));
} }
return FEEDS_NODE_BATCH_SIZE;
} }
/** /**
* Implement clear. * Implementation of FeedsProcessor::clear().
*
* @param $source
* FeedsSource of this term. FeedsTermProcessor does not heed this
* parameter, it deletes all terms from a vocabulary.
*/ */
public function clear(FeedsSource $source) { public function clear(FeedsBatch $batch, FeedsSource $source) {
$deleted = 0; $deleted = 0;
$result = db_query('SELECT tid FROM {term_data} WHERE vid = %d', $this->config['vocabulary']); $result = db_query('SELECT tid FROM {term_data} WHERE vid = %d', $this->config['vocabulary']);
...@@ -97,6 +95,7 @@ class FeedsTermProcessor extends FeedsProcessor { ...@@ -97,6 +95,7 @@ class FeedsTermProcessor extends FeedsProcessor {
else { else {
drupal_set_message(t('No terms to be deleted.')); drupal_set_message(t('No terms to be deleted.'));
} }
return FEEDS_BATCH_COMPLETE;
} }
/** /**
......
...@@ -63,16 +63,14 @@ class FeedsUserProcessor extends FeedsProcessor { ...@@ -63,16 +63,14 @@ class FeedsUserProcessor extends FeedsProcessor {
else { else {
drupal_set_message(t('There are no new users.')); drupal_set_message(t('There are no new users.'));
} }
return FEEDS_BATCH_COMPLETE;
} }
/** /**
* Implement clear. * Implementation of FeedsProcessor::clear().
*
* @param $source
* FeedsSource of this term. FeedsTermProcessor does not heed this
* parameter, it deletes all terms from a vocabulary.
*/ */
public function clear(FeedsSource $source) { public function clear(FeedsBatch $batch, FeedsSource $source) {
// Do not support deleting users as we have no way of knowing which ones we // Do not support deleting users as we have no way of knowing which ones we
// imported. // imported.
throw new Exception(t('User processor does not support deleting users.')); throw new Exception(t('User processor does not support deleting users.'));
......
...@@ -592,7 +592,6 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -592,7 +592,6 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
*/ */
public function setUp() { public function setUp() {
parent::setUp('feeds', 'feeds_ui', 'ctools'); parent::setUp('feeds', 'feeds_ui', 'ctools');
$this->drupalLogin( $this->drupalLogin(
$this->drupalCreateUser( $this->drupalCreateUser(
array( array(
...@@ -600,13 +599,6 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -600,13 +599,6 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
) )
) )
); );
}
/**
* Test scheduling on cron.
*/
public function testScheduling() {
// Create default configuration.
$this->createFeedConfiguration(); $this->createFeedConfiguration();
$this->addMappings('syndication', $this->addMappings('syndication',
array( array(
...@@ -637,7 +629,12 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -637,7 +629,12 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
), ),
) )
); );
}
/**
* Test scheduling on cron.
*/
public function testScheduling() {
// Create 10 feed nodes. Turn off import on create before doing that. // Create 10 feed nodes. Turn off import on create before doing that.
$edit = array( $edit = array(
'import_on_create' => FALSE, 'import_on_create' => FALSE,
...@@ -656,7 +653,7 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -656,7 +653,7 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
// There should be feeds_schedule_num (= 10) feeds updated now. // There should be feeds_schedule_num (= 10) feeds updated now.
$schedule = array(); $schedule = array();
$count = db_result(db_query('select COUNT(*) from {feeds_schedule} WHERE last_scheduled_time <> 0')); $count = db_result(db_query('select COUNT(*) from {feeds_schedule} WHERE last_executed_time <> 0'));
$this->assertEqual($count, 10, '10 feeds refreshed on cron.'); $this->assertEqual($count, 10, '10 feeds refreshed on cron.');
// There should be 100 story nodes in the database. // There should be 100 story nodes in the database.
...@@ -669,7 +666,7 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -669,7 +666,7 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
// There should be feeds_schedule_num X 2 (= 20) feeds updated now. // There should be feeds_schedule_num X 2 (= 20) feeds updated now.
$schedule = array(); $schedule = array();
$result = db_query('select feed_nid, last_scheduled_time, scheduled from {feeds_schedule} WHERE last_scheduled_time <> 0'); $result = db_query('select feed_nid, last_executed_time, scheduled from {feeds_schedule} WHERE last_executed_time <> 0');
while ($row = db_fetch_object($result)) { while ($row = db_fetch_object($result)) {
$schedule[$row->feed_nid] = $row; $schedule[$row->feed_nid] = $row;
} }
...@@ -691,9 +688,9 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -691,9 +688,9 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
// The import_period setting of the feed configuration is 1800, there // The import_period setting of the feed configuration is 1800, there
// shouldn't be any change to the database now. // shouldn't be any change to the database now.
$equal = TRUE; $equal = TRUE;
$result = db_query('select feed_nid, last_scheduled_time, scheduled from {feeds_schedule} WHERE last_scheduled_time <> 0'); $result = db_query('select feed_nid, last_executed_time, scheduled from {feeds_schedule} WHERE last_executed_time <> 0');
while ($row = db_fetch_object($result)) { while ($row = db_fetch_object($result)) {
$equal = $equal && ($row->last_scheduled_time == $schedule[$row->feed_nid]->last_scheduled_time); $equal = $equal && ($row->last_executed_time == $schedule[$row->feed_nid]->last_executed_time);
} }
$this->assertTrue($equal, 'Schedule did not change.'); $this->assertTrue($equal, 'Schedule did not change.');
...@@ -712,19 +709,18 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -712,19 +709,18 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
$this->assertText('Refresh: as often as possible'); $this->assertText('Refresh: as often as possible');
// Hit cron again, 4 times now. // Hit cron again, 4 times now.
$this->drupalGet($GLOBALS['base_url'] .'/cron.php'); for ($i = 0; $i < 4; $i++) {
$this->drupalGet($GLOBALS['base_url'] .'/cron.php'); $this->drupalGet($GLOBALS['base_url'] .'/cron.php');
$this->drupalGet($GLOBALS['base_url'] .'/cron.php'); }
$this->drupalGet($GLOBALS['base_url'] .'/cron.php');
// Refresh period is set to 'as often as possible'. All scheduled times // Refresh period is set to 'as often as possible'. All scheduled times
// should have changed now. // should have changed now.
// There should not be more nodes than before. // There should not be more nodes than before.
$equal = FALSE; $equal = FALSE;
$output = ''; $output = '';
$result = db_query('select feed_nid, last_scheduled_time, scheduled from {feeds_schedule} WHERE last_scheduled_time <> 0'); $result = db_query('select feed_nid, last_executed_time, scheduled from {feeds_schedule} WHERE last_executed_time <> 0');
while ($row = db_fetch_object($result)) { while ($row = db_fetch_object($result)) {
$equal = $equal || ($row->last_scheduled_time == $schedule[$row->feed_nid]->last_scheduled_time); $equal = $equal || ($row->last_executed_time == $schedule[$row->feed_nid]->last_executed_time);
} }
$this->assertFalse($equal, 'Every feed schedule time changed.'); $this->assertFalse($equal, 'Every feed schedule time changed.');
...@@ -735,6 +731,34 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase { ...@@ -735,6 +731,34 @@ class FeedsSchedulerTestCase extends FeedsWebTestCase {
// @todo Use debug time feature in FeedsScheduler and test behavior in future. // @todo Use debug time feature in FeedsScheduler and test behavior in future.
// @todo How do I call an API function on the test system from the test script? // @todo How do I call an API function on the test system from the test script?
} }
/**
* Test batching on cron.
*/
public function testBatching() {
  // Number of nodes the node processor handles per page load / cron run.
  // Mirrors the default of FEEDS_NODE_BATCH_SIZE (50); the fixture feed
  // many_items.rss2 is asserted below to hold three batches (150 items).
  $batch_size = 50;
  // Query counting all story nodes, reused by every count assertion.
  $count_query = "SELECT COUNT(*) FROM {node} WHERE type = 'story'";

  // Import through the UI and verify that there are 150 nodes total, then
  // delete them again so that cron starts importing from zero nodes.
  $nid = $this->createFeedNode('syndication', $GLOBALS['base_url'] .'/'. drupal_get_path('module', 'feeds') .'/tests/feeds/many_items.rss2');
  $this->assertText('Created 150 Story nodes.');
  $this->drupalPost('node/'. $nid .'/delete-items', array(), 'Delete');
  $this->assertText('Deleted 150 nodes.');

  // Hit cron 3 times. Each run must add exactly one batch of story nodes.
  for ($i = 0; $i < 3; $i++) {
    $this->drupalGet($GLOBALS['base_url'] .'/cron.php');
    $expected = $batch_size * ($i + 1);
    $this->assertEqual($expected, db_result(db_query($count_query)), $expected .' story nodes after cron run '. ($i + 1) .'.');
  }

  // Delete a couple of nodes, then hit cron again. The test expects that they
  // are not replaced, as the minimum update time is 30 minutes.
  $result = db_query_range("SELECT nid FROM {node} WHERE type = 'story'", 0, 2);
  while ($node = db_fetch_object($result)) {
    $this->drupalPost("node/{$node->nid}/delete", array(), 'Delete');
  }
  $remaining = 3 * $batch_size - 2;
  $this->assertEqual($remaining, db_result(db_query($count_query)), $remaining .' story nodes left after deleting two nodes.');
  $this->drupalGet($GLOBALS['base_url'] .'/cron.php');
  $this->assertEqual($remaining, db_result(db_query($count_query)), 'Deleted nodes were not re-imported on cron.');
}
} }
/** /**
......
...@@ -182,9 +182,9 @@ class FeedsWebTestCase extends DrupalWebTestCase { ...@@ -182,9 +182,9 @@ class FeedsWebTestCase extends DrupalWebTestCase {
$this->assertEqual($config['FeedsHTTPFetcher']['source'], $feed_url, t('URL in DB correct.')); $this->assertEqual($config['FeedsHTTPFetcher']['source'], $feed_url, t('URL in DB correct.'));
// Check whether feed got properly added to scheduler. // Check whether feed got properly added to scheduler.
$this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND feed_nid = %d AND callback = "import" AND last_scheduled_time = 0 AND scheduled = 0', $id, $nid))); $this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND feed_nid = %d AND callback = "import" AND last_executed_time = 0 AND scheduled = 0', $id, $nid)));
// There must be only one entry for 'expire' - no matter how many actual feed nodes exist. // There must be only one entry for 'expire' - no matter how many actual feed nodes exist.
$this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND callback = "expire" AND last_scheduled_time = 0 AND scheduled = 0', $id))); $this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND callback = "expire" AND last_executed_time = 0 AND scheduled = 0', $id)));
return $nid; return $nid;
} }
...@@ -248,9 +248,9 @@ class FeedsWebTestCase extends DrupalWebTestCase { ...@@ -248,9 +248,9 @@ class FeedsWebTestCase extends DrupalWebTestCase {
$this->assertEqual($config['FeedsHTTPFetcher']['source'], $feed_url, t('URL in DB correct.')); $this->assertEqual($config['FeedsHTTPFetcher']['source'], $feed_url, t('URL in DB correct.'));
// Check whether feed got properly added to scheduler. // Check whether feed got properly added to scheduler.
$this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND feed_nid = 0 AND callback = "import" AND last_scheduled_time = 0 AND scheduled = 0', $id))); $this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND feed_nid = 0 AND callback = "import" AND last_executed_time = 0 AND scheduled = 0', $id)));
// There must be only one entry for callback 'expire' - no matter what the feed_nid is. // There must be only one entry for callback 'expire' - no matter what the feed_nid is.
$this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND callback = "expire" AND last_scheduled_time = 0 AND scheduled = 0', $id))); $this->assertEqual(1, db_result(db_query('SELECT COUNT(*) FROM {feeds_schedule} WHERE id = "%s" AND callback = "expire" AND last_executed_time = 0 AND scheduled = 0', $id)));
} }
/** /**
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment