Mercurial > hg > drupalISMI
diff relation_processor/FeedsProcessor.inc @ 0:124ef8f3b22d
initial
author | Dirk Wintergruen <dwinter@mpiwg-berlin.mpg.de> |
---|---|
date | Fri, 27 Mar 2015 19:21:42 +0100 |
parents | |
children | 1c73c660c2f2 |
line wrap: on
line diff
<?php

/**
 * @file
 * Contains FeedsProcessor and related classes.
 */

// Update mode for existing items.
define('FEEDS_SKIP_EXISTING', 0);
define('FEEDS_REPLACE_EXISTING', 1);
define('FEEDS_UPDATE_EXISTING', 2);

// Default limit for creating items on a page load, not respected by all
// processors.
define('FEEDS_PROCESS_LIMIT', 50);

/**
 * Thrown if a validation fails.
 */
class FeedsValidationException extends Exception {}

/**
 * Thrown if an access check fails.
 */
class FeedsAccessException extends Exception {}

/**
 * Abstract class, defines interface for processors.
 */
abstract class FeedsProcessor extends FeedsPlugin {

  /**
   * Implements FeedsPlugin::pluginType().
   */
  public function pluginType() {
    return 'processor';
  }

  /**
   * @defgroup entity_api_wrapper Entity API wrapper.
   * @{
   */

  /**
   * Entity type this processor operates on.
   */
  public abstract function entityType();

  /**
   * Bundle type this processor operates on.
   *
   * Defaults to the entity type for entities that do not define bundles.
   *
   * @return string|NULL
   *   The bundle type this processor operates on, or NULL if it is undefined.
   */
  public function bundle() {
    return $this->config['bundle'];
  }

  /**
   * Provides a list of bundle options for use in select lists.
   *
   * @return array
   *   A keyed array of bundle => label. Bundles without a label use their
   *   machine name as label.
   */
  public function bundleOptions() {
    $options = array();
    foreach (field_info_bundles($this->entityType()) as $bundle => $info) {
      if (!empty($info['label'])) {
        $options[$bundle] = $info['label'];
      }
      else {
        $options[$bundle] = $bundle;
      }
    }
    return $options;
  }

  /**
   * Create a new entity.
   *
   * @param $source
   *   The feeds source that spawns this entity.
   *
   * @return
   *   A new entity object.
   */
  protected abstract function newEntity(FeedsSource $source);

  /**
   * Load an existing entity.
   *
   * In FEEDS_UPDATE_EXISTING mode the entity is loaded through entity_load().
   * In every other mode only the raw row of the entity's base table is
   * fetched.
   *
   * @param $source
   *   The feeds source that spawns this entity.
   * @param $entity_id
   *   The unique id of the entity that should be loaded.
   *
   * @return
   *   The loaded entity object, or FALSE if it could not be found.
   *
   * @todo We should be able to batch load these, if we found all of the
   *   existing ids first.
   */
  protected function entityLoad(FeedsSource $source, $entity_id) {
    if ($this->config['update_existing'] == FEEDS_UPDATE_EXISTING) {
      $entities = entity_load($this->entityType(), array($entity_id));
      return reset($entities);
    }

    $info = $this->entityInfo();

    $args = array(':entity_id' => $entity_id);

    // Table and key names cannot be bound as placeholders, so escape them.
    $table = db_escape_table($info['base table']);
    $key = db_escape_field($info['entity keys']['id']);

    return db_query("SELECT * FROM {" . $table . "} WHERE $key = :entity_id", $args)->fetchObject();
  }

  /**
   * Validate an entity.
   *
   * The default implementation does nothing.
   *
   * @throws FeedsValidationException $e
   *   If validation fails.
   */
  protected function entityValidate($entity) {}

  /**
   * Access check for saving an entity.
   *
   * The default implementation does nothing.
   *
   * @param $entity
   *   Entity to be saved.
   *
   * @throws FeedsAccessException $e
   *   If the access check fails.
   */
  protected function entitySaveAccess($entity) {}

  /**
   * Save an entity.
   *
   * @param $entity
   *   Entity to be saved.
   */
  protected abstract function entitySave($entity);

  /**
   * Delete a series of entities.
   *
   * @param $entity_ids
   *   Array of unique identity ids to be deleted.
   */
  protected abstract function entityDeleteMultiple($entity_ids);

  /**
   * Wrap entity_get_info() into a method so that extending classes can override
   * it and add more entity information. Allowed additional keys:
   *
   * 'label plural' ... the plural label of an entity type.
   */
  protected function entityInfo() {
    return entity_get_info($this->entityType());
  }

  /**
   * @}
   */

  /**
   * Process the result of the parsing stage.
   *
   * @param FeedsSource $source
   *   Source information about this import.
   * @param FeedsParserResult $parser_result
   *   The result of the parsing stage.
   */
  public function process(FeedsSource $source, FeedsParserResult $parser_result) {
    $state = $source->state(FEEDS_PROCESS);

    while ($item = $parser_result->shiftItem()) {

      // Reset per item so that the catch block below never logs an undefined
      // variable or the entity of a previously processed item.
      $entity = NULL;

      // Check if this item already exists.
      $entity_id = $this->existingEntityId($source, $parser_result);
      $skip_existing = $this->config['update_existing'] == FEEDS_SKIP_EXISTING;

      module_invoke_all('feeds_before_update', $source, $item, $entity_id);

      // If it exists, and we are not updating, pass onto the next item.
      if ($entity_id && $skip_existing) {
        continue;
      }

      $hash = $this->hash($item);
      $changed = ($hash !== $this->getHash($entity_id));
      $force_update = $this->config['skip_hash_check'];

      // Do not proceed if the item exists, has not changed, and we're not
      // forcing the update.
      if ($entity_id && !$changed && !$force_update) {
        continue;
      }

      try {

        // Load an existing entity.
        if ($entity_id) {
          $entity = $this->entityLoad($source, $entity_id);

          // The feeds_item table is always updated with the info for the most
          // recently processed entity. The only carryover is the entity_id.
          $this->newItemInfo($entity, $source->feed_nid, $hash);
          $entity->feeds_item->entity_id = $entity_id;
          $entity->feeds_item->is_new = FALSE;
        }

        // Build a new entity.
        else {
          $entity = $this->newEntity($source);
          $this->newItemInfo($entity, $source->feed_nid, $hash);
        }

        // Set property and field values.
        $this->map($source, $parser_result, $entity);
        $this->entityValidate($entity);

        // Allow modules to alter the entity before saving.
        module_invoke_all('feeds_presave', $source, $entity, $item, $entity_id);
        if (module_exists('rules')) {
          rules_invoke_event('feeds_import_' . $source->importer()->id, $entity);
        }

        // Enable modules to skip saving at all.
        if (!empty($entity->feeds_item->skip)) {
          continue;
        }

        // This will throw an exception on failure.
        $this->entitySaveAccess($entity);
        $this->entitySave($entity);

        // Allow modules to perform operations using the saved entity data.
        // $entity contains the updated entity after saving.
        module_invoke_all('feeds_after_save', $source, $entity, $item, $entity_id);

        // Track progress.
        if (empty($entity_id)) {
          $state->created++;
        }
        else {
          $state->updated++;
        }
      }

      // Something bad happened, log it.
      catch (Exception $e) {
        $state->failed++;
        drupal_set_message($e->getMessage(), 'warning');
        $message = $this->createLogMessage($e, $entity, $item);
        $source->log('import', $message, array(), WATCHDOG_ERROR);
      }
    }

    // Set messages if we're done.
    if ($source->progressImporting() != FEEDS_BATCH_COMPLETE) {
      return;
    }
    $info = $this->entityInfo();
    $tokens = array(
      '@entity' => strtolower($info['label']),
      '@entities' => strtolower($info['label plural']),
    );
    $messages = array();
    if ($state->created) {
      $messages[] = array(
        'message' => format_plural(
          $state->created,
          'Created @number @entity.',
          'Created @number @entities.',
          array('@number' => $state->created) + $tokens
        ),
      );
    }
    if ($state->updated) {
      $messages[] = array(
        'message' => format_plural(
          $state->updated,
          'Updated @number @entity.',
          'Updated @number @entities.',
          array('@number' => $state->updated) + $tokens
        ),
      );
    }
    if ($state->failed) {
      $messages[] = array(
        'message' => format_plural(
          $state->failed,
          'Failed importing @number @entity.',
          'Failed importing @number @entities.',
          array('@number' => $state->failed) + $tokens
        ),
        'level' => WATCHDOG_ERROR,
      );
    }
    if (empty($messages)) {
      $messages[] = array(
        'message' => t('There are no new @entities.', array('@entities' => strtolower($info['label plural']))),
      );
    }
    foreach ($messages as $message) {
      drupal_set_message($message['message']);
      $source->log('import', $message['message'], array(), isset($message['level']) ? $message['level'] : WATCHDOG_INFO);
    }
  }

  /**
   * Remove all stored results or stored results up to a certain time for a
   * source.
   *
   * Deletes at most getLimit() entities per call and reports the progress on
   * the FEEDS_PROCESS_CLEAR state of the source.
   *
   * @param FeedsSource $source
   *   Source information for this expiry. Implementers should only delete items
   *   pertaining to this source. The preferred way of determining whether an
   *   item pertains to a certain source is by using $source->feed_nid. It is
   *   the processor's responsibility to store the feed_nid of an imported item
   *   in the processing stage.
   */
  public function clear(FeedsSource $source) {
    $state = $source->state(FEEDS_PROCESS_CLEAR);

    // Build base select statement.
    $info = $this->entityInfo();
    $select = db_select($info['base table'], 'e');
    $select->addField('e', $info['entity keys']['id'], 'entity_id');
    $select->join(
      'feeds_item',
      'fi',
      "e.{$info['entity keys']['id']} = fi.entity_id AND fi.entity_type = '{$this->entityType()}'");
    $select->condition('fi.id', $this->id);
    $select->condition('fi.feed_nid', $source->feed_nid);

    // If there is no total, query it.
    if (!$state->total) {
      $state->total = $select->countQuery()
        ->execute()
        ->fetchField();
    }

    // Delete a batch of entities.
    $entities = $select->range(0, $this->getLimit())->execute();
    $entity_ids = array();
    foreach ($entities as $entity) {
      $entity_ids[$entity->entity_id] = $entity->entity_id;
    }
    $this->entityDeleteMultiple($entity_ids);

    // Report progress, take into account that we may not have deleted as
    // many items as we have counted at first.
    if (count($entity_ids)) {
      $state->deleted += count($entity_ids);
      $state->progress($state->total, $state->deleted);
    }
    else {
      $state->progress($state->total, $state->total);
    }

    // Report results when done.
    if ($source->progressClearing() == FEEDS_BATCH_COMPLETE) {
      if ($state->deleted) {
        $message = format_plural(
          $state->deleted,
          'Deleted @number @entity',
          'Deleted @number @entities',
          array(
            '@number' => $state->deleted,
            '@entity' => strtolower($info['label']),
            '@entities' => strtolower($info['label plural']),
          )
        );
        $source->log('clear', $message, array(), WATCHDOG_INFO);
        drupal_set_message($message);
      }
      else {
        // Lowercase the label like every other message of this class does.
        drupal_set_message(t('There are no @entities to be deleted.', array('@entities' => strtolower($info['label plural']))));
      }
    }
  }

  /**
   * Report number of items that can be processed per call.
   *
   * 0 means 'unlimited'.
   *
   * If a number other than 0 is given, Feeds parsers that support batching
   * will only deliver this limit to the processor.
   *
   * @see FeedsSource::getLimit()
   * @see FeedsCSVParser::parse()
   */
  public function getLimit() {
    return variable_get('feeds_process_limit', FEEDS_PROCESS_LIMIT);
  }

  /**
   * Delete feed items older than now - $time. Do not invoke expire on a
   * processor directly, but use FeedsImporter::expire() instead.
   *
   * The default implementation deletes nothing and reports completion.
   *
   * @see FeedsImporter::expire().
   * @see FeedsDataProcessor::expire().
   *
   * @param $time
   *   If implemented, all items produced by this configuration that are older
   *   than REQUEST_TIME - $time should be deleted.
   *   If $time === NULL processor should use internal configuration.
   *
   * @return
   *   FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0
   *   and 0.99* indicating progress otherwise.
   */
  public function expire($time = NULL) {
    return FEEDS_BATCH_COMPLETE;
  }

  /**
   * Counts the number of items imported by this processor.
   *
   * @param FeedsSource $source
   *   The source whose feed_nid the counted items belong to.
   *
   * @return
   *   The number of matching rows in the feeds_item table.
   */
  public function itemCount(FeedsSource $source) {
    return db_query("SELECT count(*) FROM {feeds_item} WHERE id = :id AND entity_type = :entity_type AND feed_nid = :feed_nid", array(':id' => $this->id, ':entity_type' => $this->entityType(), ':feed_nid' => $source->feed_nid))->fetchField();
  }

  /**
   * Execute mapping on an item.
   *
   * This method encapsulates the central mapping functionality. When an item is
   * processed, it is passed through map() where the properties of $source_item
   * are mapped onto $target_item following the processor's mapping
   * configuration.
   *
   * For each mapping FeedsParser::getSourceElement() is executed to retrieve
   * the source element, then FeedsProcessor::setTargetElement() is invoked
   * to populate the target item properly. Alternatively a
   * hook_x_targets_alter() may have specified a callback for a mapping target
   * in which case the callback is asked to populate the target item instead of
   * FeedsProcessor::setTargetElement().
   *
   * @ingroup mappingapi
   *
   * @see hook_feeds_parser_sources_alter()
   * @see hook_feeds_data_processor_targets_alter()
   * @see hook_feeds_node_processor_targets_alter()
   * @see hook_feeds_term_processor_targets_alter()
   * @see hook_feeds_user_processor_targets_alter()
   */
  protected function map(FeedsSource $source, FeedsParserResult $result, $target_item = NULL) {

    // Static cache $sources and $targets per importer id, as
    // getMappingTargets() may be an expensive method.
    static $sources;
    if (!isset($sources[$this->id])) {
      $sources[$this->id] = feeds_importer($this->id)->parser->getMappingSources();
    }
    static $targets;
    if (!isset($targets[$this->id])) {
      $targets[$this->id] = $this->getMappingTargets();
    }
    $parser = feeds_importer($this->id)->parser;
    // NOTE(review): the fallback is an array, while the code below accesses
    // $target_item as an object. Callers presumably always pass an entity
    // object - confirm before relying on the default.
    if (empty($target_item)) {
      $target_item = array();
    }

    // Many mappers add to existing fields rather than replacing them. Hence we
    // need to clear target elements of each item before mapping in case we are
    // mapping on a prepopulated item such as an existing node.
    foreach ($this->config['mappings'] as $mapping) {
      if (isset($targets[$this->id][$mapping['target']]['real_target'])) {
        unset($target_item->{$targets[$this->id][$mapping['target']]['real_target']});
      }
      elseif (isset($target_item->{$mapping['target']})) {
        unset($target_item->{$mapping['target']});
      }
    }

    /*
    This is where the actual mapping happens: For every mapping we invoke
    the parser's getSourceElement() method to retrieve the value of the source
    element and pass it to the processor's setTargetElement() to stick it
    on the right place of the target item.

    If the mapping specifies a callback method, use the callback instead of
    setTargetElement().
    */
    self::loadMappers();
    foreach ($this->config['mappings'] as $mapping) {
      // Retrieve source element's value from parser.
      if (isset($sources[$this->id][$mapping['source']]) &&
        is_array($sources[$this->id][$mapping['source']]) &&
        isset($sources[$this->id][$mapping['source']]['callback']) &&
        function_exists($sources[$this->id][$mapping['source']]['callback'])) {
        $callback = $sources[$this->id][$mapping['source']]['callback'];
        $value = $callback($source, $result, $mapping['source']);
      }
      else {
        $value = $parser->getSourceElement($source, $result, $mapping['source']);
      }

      // Map the source element's value to the target.
      if (isset($targets[$this->id][$mapping['target']]) &&
        is_array($targets[$this->id][$mapping['target']]) &&
        isset($targets[$this->id][$mapping['target']]['callback']) &&
        function_exists($targets[$this->id][$mapping['target']]['callback'])) {
        $callback = $targets[$this->id][$mapping['target']]['callback'];
        $callback($source, $target_item, $mapping['target'], $value, $mapping);
      }
      else {
        $this->setTargetElement($source, $target_item, $mapping['target'], $value, $mapping);
      }
    }
    return $target_item;
  }

  /**
   * Per default, don't support expiry. If processor supports expiry of imported
   * items, return the time after which items should be removed.
   */
  public function expiryTime() {
    return FEEDS_EXPIRE_NEVER;
  }

  /**
   * Declare default configuration.
   *
   * The bundle defaults to the entity type when the entity type declares no
   * bundle key, and to NULL otherwise.
   */
  public function configDefaults() {
    $info = $this->entityInfo();
    $bundle = NULL;
    if (empty($info['entity keys']['bundle'])) {
      $bundle = $this->entityType();
    }
    return array(
      'mappings' => array(),
      'update_existing' => FEEDS_SKIP_EXISTING,
      'input_format' => NULL,
      'skip_hash_check' => FALSE,
      'bundle' => $bundle,
    );
  }

  /**
   * Overrides parent::configForm().
   */
  public function configForm(&$form_state) {
    $info = $this->entityInfo();
    $form = array();

    if (!empty($info['entity keys']['bundle'])) {
      $form['bundle'] = array(
        '#type' => 'select',
        '#options' => $this->bundleOptions(),
        '#title' => !empty($info['bundle name']) ? $info['bundle name'] : t('Bundle'),
        '#required' => TRUE,
        '#default_value' => $this->bundle(),
      );
    }
    else {
      $form['bundle'] = array(
        '#type' => 'value',
        '#value' => $this->entityType(),
      );
    }

    $tokens = array('@entities' => strtolower($info['label plural']));

    $form['update_existing'] = array(
      '#type' => 'radios',
      '#title' => t('Update existing @entities', $tokens),
      '#description' =>
        t('Existing @entities will be determined using mappings that are a "unique target".', $tokens),
      '#options' => array(
        FEEDS_SKIP_EXISTING => t('Do not update existing @entities', $tokens),
        FEEDS_REPLACE_EXISTING => t('Replace existing @entities', $tokens),
        FEEDS_UPDATE_EXISTING => t('Update existing @entities', $tokens),
      ),
      '#default_value' => $this->config['update_existing'],
    );
    global $user;
    $formats = filter_formats($user);
    // Initialize, so that '#options' below is defined even when no text
    // format is available to the current user.
    $format_options = array();
    foreach ($formats as $format) {
      $format_options[$format->format] = $format->name;
    }
    $form['skip_hash_check'] = array(
      '#type' => 'checkbox',
      '#title' => t('Skip hash check'),
      '#description' => t('Force update of items even if item source data did not change.'),
      '#default_value' => $this->config['skip_hash_check'],
    );
    $form['input_format'] = array(
      '#type' => 'select',
      '#title' => t('Text format'),
      '#description' => t('Select the input format for the body field of the nodes to be created.'),
      '#options' => $format_options,
      '#default_value' => isset($this->config['input_format']) ? $this->config['input_format'] : 'plain_text',
      '#required' => TRUE,
    );

    return $form;
  }

  /**
   * Get mappings.
   *
   * @return array
   *   The configured mappings, or an empty array if none are configured.
   */
  public function getMappings() {
    return isset($this->config['mappings']) ? $this->config['mappings'] : array();
  }

  /**
   * Declare possible mapping targets that this processor exposes.
   *
   * Sets a warning message if no bundle has been selected yet.
   *
   * @ingroup mappingapi
   *
   * @return
   *   An array of mapping targets keyed by target name. Each value is an array
   *   with the keys 'name', 'description' and 'optional_unique'.
   */
  public function getMappingTargets() {

    // The bundle has not been selected.
    if (!$this->bundle()) {
      $info = $this->entityInfo();
      $bundle_name = !empty($info['bundle name']) ? drupal_strtolower($info['bundle name']) : t('bundle');
      $plugin_key = feeds_importer($this->id)->config['processor']['plugin_key'];
      $url = url('admin/structure/feeds/' . $this->id . '/settings/' . $plugin_key);
      drupal_set_message(t('Please <a href="@url">select a @bundle_name</a>.', array('@url' => $url, '@bundle_name' => $bundle_name)), 'warning', FALSE);
    }

    return array(
      'url' => array(
        'name' => t('URL'),
        'description' => t('The external URL of the item. E. g. the feed item URL in the case of a syndication feed. May be unique.'),
        'optional_unique' => TRUE,
      ),
      'guid' => array(
        'name' => t('GUID'),
        'description' => t('The globally unique identifier of the item. E. g. the feed item GUID in the case of a syndication feed. May be unique.'),
        'optional_unique' => TRUE,
      ),
    );
  }

  /**
   * Set a concrete target element. Invoked from FeedsProcessor::map().
   *
   * The 'url' and 'guid' targets are stored on $target_item->feeds_item, every
   * other target is set as a property of $target_item itself.
   *
   * Note that map() passes the mapping configuration as an additional fifth
   * argument which this signature does not declare.
   *
   * @ingroup mappingapi
   */
  public function setTargetElement(FeedsSource $source, $target_item, $target_element, $value) {
    switch ($target_element) {
      case 'url':
      case 'guid':
        $target_item->feeds_item->$target_element = $value;
        break;

      default:
        $target_item->$target_element = $value;
        break;
    }
  }

  /**
   * Retrieve the target entity's existing id if available. Otherwise return 0.
   *
   * Only the unique targets 'url' and 'guid' are looked up here, each one
   * with its own query against the feeds_item table.
   *
   * @ingroup mappingapi
   *
   * @param FeedsSource $source
   *   The source information about this import.
   * @param $result
   *   A FeedsParserResult object.
   *
   * @return
   *   The serial id of an entity if found, 0 otherwise.
   */
  protected function existingEntityId(FeedsSource $source, FeedsParserResult $result) {
    // Iterate through all unique targets and test whether they do already
    // exist in the database.
    foreach ($this->uniqueTargets($source, $result) as $target => $value) {
      if (!in_array($target, array('url', 'guid'), TRUE)) {
        continue;
      }

      // Build a fresh query for every target, so that the condition of one
      // target never restricts the lookup of the next one.
      $entity_id = db_select('feeds_item')
        ->fields('feeds_item', array('entity_id'))
        ->condition('feed_nid', $source->feed_nid)
        ->condition('entity_type', $this->entityType())
        ->condition('id', $source->id)
        ->condition($target, $value)
        ->execute()
        ->fetchField();

      // fetchField() returns FALSE if nothing matched; only a real id ends
      // the search.
      if ($entity_id) {
        // Return with the content id found.
        return $entity_id;
      }
    }
    return 0;
  }

  /**
   * Utility function that iterates over a target array and retrieves all
   * sources that are unique.
   *
   * @param FeedsSource $source
   *   The source information about this import.
   * @param FeedsParserResult $result
   *   The parser result the source values are read from.
   *
   * @return
   *   An array where the keys are target field names and the values are the
   *   elements from the source item mapped to these targets.
   */
  protected function uniqueTargets(FeedsSource $source, FeedsParserResult $result) {
    $parser = feeds_importer($this->id)->parser;
    $targets = array();
    foreach ($this->config['mappings'] as $mapping) {
      if (!empty($mapping['unique'])) {
        // Invoke the parser's getSourceElement to retrieve the value for this
        // mapping's source.
        $targets[$mapping['target']] = $parser->getSourceElement($source, $result, $mapping['source']);
      }
    }
    return $targets;
  }

  /**
   * Adds Feeds specific information on $entity->feeds_item.
   *
   * @param $entity
   *   The entity object to be populated with new item info.
   * @param $feed_nid
   *   The feed nid of the source that produces this entity.
   * @param $hash
   *   The fingerprint of the source item.
   */
  protected function newItemInfo($entity, $feed_nid, $hash = '') {
    $entity->feeds_item = new stdClass();
    $entity->feeds_item->is_new = TRUE;
    $entity->feeds_item->entity_id = 0;
    $entity->feeds_item->entity_type = $this->entityType();
    $entity->feeds_item->id = $this->id;
    $entity->feeds_item->feed_nid = $feed_nid;
    $entity->feeds_item->imported = REQUEST_TIME;
    $entity->feeds_item->hash = $hash;
    $entity->feeds_item->url = '';
    $entity->feeds_item->guid = '';
  }

  /**
   * Loads existing entity information and places it on $entity->feeds_item.
   *
   * @param $entity
   *   The entity object to load item info for. Id key must be present.
   *
   * @return
   *   TRUE if item info could be loaded, FALSE if not.
   */
  protected function loadItemInfo($entity) {
    $entity_info = entity_get_info($this->entityType());
    $key = $entity_info['entity keys']['id'];
    if ($item_info = feeds_item_info_load($this->entityType(), $entity->$key)) {
      $entity->feeds_item = $item_info;
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Create MD5 hash of item and mappings array.
   *
   * Include mappings as a change in mappings may have an effect on the item
   * produced.
   *
   * @return Always returns a hash, even with empty, NULL, FALSE:
   *   Empty arrays return 40cd750bba9870f18aada2478b24840a
   *   Empty/NULL/FALSE strings return d41d8cd98f00b204e9800998ecf8427e
   */
  protected function hash($item) {
    return hash('md5', serialize($item) . serialize($this->config['mappings']));
  }

  /**
   * Retrieves the MD5 hash of $entity_id from the database.
   *
   * @return string
   *   Empty string if no item is found, hash otherwise.
   */
  protected function getHash($entity_id) {

    if ($hash = db_query("SELECT hash FROM {feeds_item} WHERE entity_type = :type AND entity_id = :id", array(':type' => $this->entityType(), ':id' => $entity_id))->fetchField()) {
      // Return with the hash.
      return $hash;
    }
    return '';
  }

  /**
   * Creates a log message for when an exception occurred during import.
   *
   * @param Exception $e
   *   The exception that was thrown during processing the item.
   * @param $entity
   *   The entity object, or NULL if the exception was thrown before an entity
   *   was available.
   * @param $item
   *   The parser result for this entity.
   *
   * @return string
   *   The message to log.
   */
  protected function createLogMessage(Exception $e, $entity, $item) {
    include_once DRUPAL_ROOT . '/includes/utility.inc';
    $message = $e->getMessage();
    $message .= '<h3>Original item</h3>';
    $message .= '<pre>' . drupal_var_export($item) . '</pre>';
    $message .= '<h3>Entity</h3>';
    $message .= '<pre>' . drupal_var_export($entity) . '</pre>';
    return $message;
  }

}

class FeedsProcessorBundleNotDefined extends Exception {}