Multi-threaded Processes in Drupal

Specs

Version
Drupal 6
Tools
Batch Processes
Drush
Multi-threading
Created
28 Feb 2016

Summary

Este fragmento de código muestra cómo implementar procesamiento multihilo (multi-threading) en Drupal. El multihilo es útil para procesar grandes conjuntos de datos, ya que permite ejecutar varios procesos de forma concurrente. Así, el conjunto de datos puede dividirse en dos o más partes que se procesan al mismo tiempo en procesos independientes, lo que reduce el tiempo total de procesamiento.

En este ejemplo se utiliza el módulo Background Process para dividir un CSV de gran tamaño entre un número variable de procesos. El número de procesos se indica con la opción --threads, de modo que puede ajustarlo según su caso de uso y su infraestructura. Se utiliza un comando Drush personalizado que puede ejecutarse desde cron o desde su herramienta de integración continua.

Drush Command Example
drush multi-threaded-process-example --threads=4

Code

1. Drush command definition and parent process

MODULE_NAME.drush.inc
The code defines the Drush command, launches the child processes, and monitors their progress.
          
/**
 * Implements hook_drush_command().
 *
 * Declares `drush multi-threaded-process-example` (alias `mtpe`). The command
 * needs a fully bootstrapped Drupal site because it uses the database,
 * variables and the Background Process module.
 *
 * @return array
 *   Command definitions keyed by command name.
 */
function MODULE_NAME_drush_command() {
  $items = array();
  $items['multi-threaded-process-example'] = array(
    'description' => 'Processes a large CSV file in several concurrent background processes.',
    'aliases' => array('mtpe'),
    'options' => array(
      'threads' => 'Number of child processes to use for files over 5000 lines (default: 4).',
    ),
    'examples' => array(
      'drush multi-threaded-process-example --threads=4' => 'Process the file with four child processes.',
    ),
    'bootstrap' => DRUSH_BOOTSTRAP_DRUPAL_FULL,
  );
  return $items;
}

/**
 * Drush command callback for `drush multi-threaded-process-example`.
 *
 * Follows the Drush naming convention drush_MODULE_NAME_command_name().
 * Splits the input file into line ranges, starts one background process per
 * range, waits until each child has registered its batch id, then polls the
 * children's batches until none of them is still running.
 *
 * Options:
 *   --threads  Number of child processes (1-4, default 4). Only used when the
 *              file has more than 5000 lines; smaller files use one process.
 */
function drush_MODULE_NAME_multi_threaded_process_example() {

  $filename = ''; // Use your own logic to determine filename here.

  $total = _MODULE_NAME_multi_threaded_process_example_count_lines($filename);
  if ($total === FALSE) {
    drush_log(t('Could not read the file !filename.', array('!filename' => $filename)), 'error');
    return;
  }

  if ($total <= 5000) {
    // File has 5000 lines or fewer. Just use a single process.
    $processes = array(array('start' => 1, 'stop' => $total));
  }
  else {
    $threads = drush_get_option('threads');
    if (empty($threads)) {
      $threads = 4;
    }
    // Option values may arrive as strings; non-numeric input becomes 0 and
    // is rejected by the range check below.
    $threads = (int) $threads;
    if ($threads < 1 || $threads > 4) {
      drush_log(t('Number of threads can only be between 1 and 4.'), 'error');
      return;
    }
    $processes = _MODULE_NAME_multi_threaded_process_example_build_ranges($total, $threads);
  }

  $job_id = time();
  $variable_name = 'MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id;

  // The child processes use a variable stored in the DB to communicate with
  // this (parent) process.
  variable_set($variable_name, array());

  if (_MODULE_NAME_multi_threaded_process_example_launch($job_id, $processes, $filename)) {
    drush_log(t('All processes have started. Waiting 60 seconds to check their status.'), 'success');
    sleep(60);
    _MODULE_NAME_multi_threaded_process_example_monitor($job_id);
  }

  // Do not leave one status variable per run behind in the {variable} table.
  variable_del($variable_name);
}

/**
 * Counts the lines of $filename without loading the whole file into memory.
 *
 * Counts the same lines file() would return; a final line without a trailing
 * newline still counts.
 *
 * @param string $filename
 *   Path of the file to count.
 *
 * @return int|bool
 *   The number of lines, or FALSE when the file cannot be read.
 */
function _MODULE_NAME_multi_threaded_process_example_count_lines($filename) {
  if ($filename === '' || !is_readable($filename)) {
    return FALSE;
  }
  $handle = fopen($filename, 'r');
  if ($handle === FALSE) {
    return FALSE;
  }
  $count = 0;
  while (fgets($handle) !== FALSE) {
    $count++;
  }
  fclose($handle);
  return $count;
}

/**
 * Splits lines 1..$total into $threads consecutive ranges.
 *
 * Each range is array('start' => S, 'stop' => E) and is meant as the
 * half-open interval (S, E]: consecutive ranges share a boundary value, and
 * the monitor reports progress as (progress - start) / (stop - start). The
 * first range starts at 1, so line 1 is not part of any range (presumably a
 * header row; confirm against your file).
 *
 * @param int $total
 *   Number of lines in the file.
 * @param int $threads
 *   Number of ranges to produce (at least 1).
 *
 * @return array
 *   List of ranges, indexed 0..$threads - 1.
 */
function _MODULE_NAME_multi_threaded_process_example_build_ranges($total, $threads) {
  // Round up so the last range always reaches the end of the file.
  $rows_per_process = (int) ceil($total / $threads);

  $processes = array();
  for ($i = 0; $i < $threads; $i++) {
    $processes[] = array(
      'start' => ($i === 0) ? 1 : $rows_per_process * $i,
      'stop' => $rows_per_process * ($i + 1),
    );
  }
  return $processes;
}

/**
 * Starts one background process per range and waits for each to register.
 *
 * A child counts as started once it has written its batch id into the shared
 * status variable. Otherwise its process is removed and started again, up to
 * $max_attempts times.
 *
 * @param int $job_id
 *   Identifier of this run (the parent's start time).
 * @param array $processes
 *   Ranges as returned by the range builder, keyed by thread index.
 * @param string $filename
 *   Path of the file the children process.
 * @param int $max_attempts
 *   How many times to try starting each child before giving up.
 *
 * @return bool
 *   TRUE when every child started, FALSE when one could not be started.
 */
function _MODULE_NAME_multi_threaded_process_example_launch($job_id, array $processes, $filename, $max_attempts = 5) {
  drush_print(t('Breaking up file and launching !count child processes', array('!count' => count($processes))));

  foreach ($processes as $id => $range) {
    $attempt = 0;
    while (TRUE) {
      $attempt++;
      drush_print(t('Attempting to start process !number', array('!number' => $id)));
      $handle = background_process_start('MODULE_NAME_multi_threaded_process_example_batch_init', $job_id, $id, $range['start'], $range['stop'], $filename);

      drush_print(t('Verifying that the process started okay...'));
      sleep(15); // this assumes the child process first page request takes less than 15 seconds

      $current_thread_data = MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id);
      if (isset($current_thread_data[$id]) && !empty($current_thread_data[$id]['id'])) {
        drush_log(t('Process !number was created okay using batch id !batch_id.', array('!number' => $id, '!batch_id' => $current_thread_data[$id]['id'])), 'success');
        break;
      }

      if ($attempt >= $max_attempts) {
        background_process_remove_process($handle);
        drush_log(t('Process !number could not be started after !attempts attempts.', array('!number' => $id, '!attempts' => $attempt)), 'error');
        return FALSE;
      }

      drush_log(t('Process !number was not created okay. Deleting the process and trying again.', array('!number' => $id)), 'warning');
      background_process_remove_process($handle);
    }
  }

  return TRUE;
}

/**
 * Prints the children's progress every 60 seconds until none is still running.
 *
 * @param int $job_id
 *   Identifier of this run (the parent's start time).
 */
function _MODULE_NAME_multi_threaded_process_example_monitor($job_id) {
  while (TRUE) {
    $thread_data = MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id);
    if (!is_array($thread_data)) {
      $thread_data = array();
    }

    $output = '';
    $running = 0;
    foreach ($thread_data as $pid => $data) {
      $batch_row = db_fetch_object(db_query('SELECT * FROM {batch} WHERE bid = %d', $data['id']));
      if (empty($batch_row->bid)) {
        continue;
      }

      $batch_data = unserialize($batch_row->batch);
      if (!empty($batch_data['sets'][0]['sandbox']['progress'])) {
        $sandbox = $batch_data['sets'][0]['sandbox'];
        $output .= t('P!pid: !current/!total  ', array(
          '!pid' => $pid,
          '!current' => $sandbox['progress'] - $sandbox['start'],
          '!total' => $sandbox['stop'] - $sandbox['start'],
        ));
      }

      // The child is treated as still running while its background_batch
      // process handle exists.
      $handle = 'background_batch:' . $data['id'];
      if (db_result(db_query("SELECT handle FROM {background_process} WHERE handle = '%s'", $handle))) {
        $running++;
      }
    }

    if (!empty($output)) {
      drush_print($output);
    }

    $seconds = time() - $job_id;
    $hours = floor($seconds / 3600);
    $mins = floor(($seconds % 3600) / 60);
    drush_print(t('Processes have been running for !hours hours and !mins minutes', array('!hours' => $hours, '!mins' => $mins)));

    if ($running === 0) {
      drush_log(t('All threads have been closed. Job is complete.'), 'success');
      return;
    }
    sleep(60);
  }
}

/**
 * Returns the per-thread status array written by the child processes.
 *
 * Each child stores array('timestamp' => ..., 'id' => <batch id>) under its
 * thread index in the variable
 * 'MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id.
 *
 * The {variable} table is read directly rather than through variable_get(),
 * presumably because variable_get() serves the copy loaded when this request
 * bootstrapped and would miss writes made later by the child processes.
 *
 * NOTE(review): the child batch operation calls this function during
 * ordinary page requests, where MODULE_NAME.drush.inc is not loaded; it
 * should live in MODULE_NAME.module.
 *
 * @param int $job_id
 *   Identifier of the run (the parent's start time).
 *
 * @return array
 *   Thread index => status array; an empty array when nothing is stored.
 */
function MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id) {
  return _MODULE_NAME_multi_threaded_process_example_read_variable('MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id);
}

/**
 * Legacy name of the status reader, kept for backward compatibility.
 *
 * Reads the variable 'mmis_hds_cm_import_hds_cm_import_thread_data_' . $job_id,
 * as before.
 *
 * @param int $job_id
 *   Identifier of the run.
 *
 * @return array
 *   The stored status array; an empty array when nothing is stored.
 */
function mmis_hds_cm_import_hds_cm_import_get_threads_status($job_id) {
  return _MODULE_NAME_multi_threaded_process_example_read_variable('mmis_hds_cm_import_hds_cm_import_thread_data_' . $job_id);
}

/**
 * Reads and unserializes one variable straight from the {variable} table.
 *
 * @param string $name
 *   Variable name.
 *
 * @return array
 *   The stored array, or an empty array when the variable is missing or does
 *   not hold an array.
 */
function _MODULE_NAME_multi_threaded_process_example_read_variable($name) {
  $value = db_result(db_query("SELECT value FROM {variable} WHERE name = '%s'", $name));
  if (empty($value)) {
    return array();
  }
  $value = unserialize($value);
  return is_array($value) ? $value : array();
}
          
          

2. Child process logic and batch processes

MODULE_NAME.module
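This code needs to go in your module file (or somewhere else that is loaded on every page request), because the child processes run in page requests separate from the parent Drush process. The child code also calls MODULE_NAME_multi_threaded_process_example_get_threads_status(), so that function must be available on those page requests as well.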
          
/**
 * Entry point of one child process: queues and runs its batch.
 *
 * Builds a non-progressive batch holding a single operation, which processes
 * this thread's range of $filename. Registers the batch with batch_set() and
 * hands it to background_batch_process_batch().
 *
 * @param int $job_id
 *   Identifier of the run chosen by the parent.
 * @param int $thread
 *   Index of this child process.
 * @param int $start
 *   Start of this thread's range.
 * @param int $stop
 *   End of this thread's range.
 * @param string $filename
 *   Path of the file to process.
 */
function MODULE_NAME_multi_threaded_process_example_batch_init($job_id, $thread, $start, $stop, $filename) {
  $operation_args = array($job_id, $thread, $start, $stop, $filename);

  $batch = array();
  $batch['operations'] = array();
  $batch['title'] = t('Processing Example Job');
  $batch['init_message'] = t('Process is starting.');
  $batch['progress_message'] = t('Processed @current out of @total.');
  $batch['error_message'] = t('Process has encountered an error.');
  $batch['finished'] = 'MODULE_NAME_multi_threaded_process_example_batch_finished';
  $batch['progressive'] = FALSE;
  $batch['operations'][] = array('MODULE_NAME_multi_threaded_process_example_batch_process', $operation_args);

  batch_set($batch);
  background_batch_process_batch();
}

/**
 * Batch API operation that processes one thread's slice of the CSV file.
 *
 * The Batch API calls this repeatedly until $context['finished'] is 1:
 * - The first call only records this thread's batch id in the shared status
 *   variable, so the parent can quickly confirm the child started, and
 *   returns.
 * - Every later call resumes from the byte offset saved in the sandbox and
 *   handles at most $limit rows.
 *
 * The thread handles the half-open range ($start, $stop]: it starts reading
 * after record $start and stops once its counter reaches $stop. With
 * $start = 1 the first record is never processed (presumably a header row;
 * confirm against your file).
 *
 * NOTE(review): records are read with fgetcsv(), which may span several
 * physical lines when quoted fields contain newlines, so record numbers can
 * drift from line numbers.
 *
 * @param int $job_id
 *   Identifier of the run chosen by the parent.
 * @param int $thread
 *   Index of this child process.
 * @param int $start
 *   Record number after which this thread starts reading.
 * @param int $stop
 *   Last record number this thread processes.
 * @param string $filename
 *   Path of the pipe-delimited CSV file.
 * @param array $context
 *   Batch API context, passed by reference.
 */
function MODULE_NAME_multi_threaded_process_example_batch_process($job_id, $thread, $start, $stop, $filename, &$context) {

  // On the first page request, only report to the parent process and return,
  // so the parent can confirm quickly that this child started.
  if (!isset($context['sandbox']['start'])) {
    $batch =& batch_get();

    // The child processes use a database-stored variable to talk to the
    // parent process, which reads it to monitor progress.
    // NOTE(review): read-modify-write of a shared variable; two children
    // registering at the same moment could overwrite each other's entry.
    $current_thread_data = MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id);
    if (!is_array($current_thread_data)) {
      $current_thread_data = array();
    }
    $current_thread_data[$thread] = array(
      'timestamp' => time(),
      'id' => $batch['id'],
    );
    variable_set('MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id, $current_thread_data);

    $context['sandbox']['progress'] = $start;
    $context['sandbox']['thread'] = $thread;
    $context['sandbox']['start'] = $start;
    $context['sandbox']['stop'] = $stop;
    $context['sandbox']['update'] = 0;
    $context['finished'] = FALSE;
    return;
  }

  $file_stream = fopen($filename, 'r');
  if ($file_stream === FALSE) {
    watchdog('MODULE_NAME', 'Could not open %file.', array('%file' => $filename), WATCHDOG_ERROR);
    $context['finished'] = 1;
    return;
  }

  if (!empty($context['sandbox']['offset'])) {
    // Resume where the previous page request stopped.
    if (fseek($file_stream, $context['sandbox']['offset']) !== 0) {
      // This runs inside a background page request, not Drush, so log to
      // watchdog instead of printing.
      watchdog('MODULE_NAME', 'Failed to seek.', array(), WATCHDOG_ERROR);
      fclose($file_stream);
      $context['finished'] = 1;
      return;
    }
  }
  elseif (!_MODULE_NAME_multi_threaded_process_example_skip_records($file_stream, $start)) {
    // The file ends before record $start: nothing left for this thread.
    fclose($file_stream);
    $context['finished'] = 1;
    return;
  }

  $limit = 100; // number of rows to process per page request
  $processed = 0;

  // Check both limits BEFORE reading. Reading first, as a combined while
  // condition would, consumes a row that is never processed whenever the
  // page limit is reached.
  while ($processed < $limit && $context['sandbox']['progress'] < $stop) {
    $row = fgetcsv($file_stream, 0, '|', '"');
    if ($row === FALSE) {
      break;
    }
    // Advance the counters before handling the row, so rows that the helper
    // skips still count toward this thread's range.
    $context['sandbox']['progress']++; // total position in file
    $processed++; // rows handled in this page request

    _MODULE_NAME_multi_threaded_process_example_save_row($row);
  }

  $context['sandbox']['offset'] = ftell($file_stream);
  $eof = feof($file_stream);
  fclose($file_stream);

  $done = ($context['sandbox']['progress'] >= $stop);
  $context['finished'] = ($eof || $done) ? 1 : 0;
  $context['success'] = $context['finished'];
}

/**
 * Advances $handle past the first max(1, $count) CSV records.
 *
 * @param resource $handle
 *   Open file handle positioned at the start of the file.
 * @param int $count
 *   Number of records to skip.
 *
 * @return bool
 *   TRUE when the records were skipped, FALSE when the file ended first.
 */
function _MODULE_NAME_multi_threaded_process_example_skip_records($handle, $count) {
  $line = 1;
  while (fgetcsv($handle, 0, '|', '"') !== FALSE) {
    if ($line >= $count) {
      return TRUE;
    }
    $line++;
  }
  return FALSE;
}

/**
 * Creates or updates the node described by one CSV row.
 *
 * This example updates the node whose NID is in the row, or creates a new node
 * when the NID column is not numeric. Rows that name a non-existent NID are
 * skipped.
 *
 * @param array $row
 *   Fields of one CSV record, as returned by fgetcsv().
 */
function _MODULE_NAME_multi_threaded_process_example_save_row(array $row) {
  // Column positions of the CSV fields used below.
  // NOTE(review): the NID column index is an assumption; adjust it to your
  // file layout.
  $data_mappings = array('nid' => 0);

  $nid = isset($row[$data_mappings['nid']]) ? $row[$data_mappings['nid']] : NULL;

  if (is_numeric($nid)) {
    // This is an existing record, so load it; skip the row if it is gone.
    $node = node_load($nid, NULL, TRUE);
    if (empty($node->nid)) {
      return;
    }
  }
  else {
    // This is a new record, so set up a new node.
    $node = new stdClass();
    $node->type = 'example_content_type';
    $node->title = 'Example Title';
    $node->uid = 1;
    // REQUEST_TIME only exists from Drupal 7 on.
    $node->created = defined('REQUEST_TIME') ? REQUEST_TIME : time();
  }

  // use your logic to build node data and fields here

  node_save($node);
}

/**
 * Batch API "finished" callback for a child's batch.
 *
 * Reports success, or the first operation left unprocessed, through
 * drush_log() when Drush is loaded. Otherwise it logs through watchdog(): the
 * batch runs inside a background process's page request, where Drush
 * functions are presumably not loaded.
 *
 * @param bool $success
 *   Whether the batch completed without errors.
 * @param array $results
 *   Results collected by the operations (unused).
 * @param array $operations
 *   Operations that remained unprocessed when $success is FALSE.
 */
function MODULE_NAME_multi_threaded_process_example_batch_finished($success, $results, $operations) {
  if ($success) {
    $message = 'Finished processing files';
    $variables = array();
    $drush_type = 'success';
    $severity = WATCHDOG_NOTICE;
  }
  else {
    // An error occurred.
    // $operations contains the operations that remained unprocessed.
    $error_operation = reset($operations);
    $message = 'An error occurred while processing %error_operation with arguments: @arguments';
    $variables = array(
      '%error_operation' => isset($error_operation[0]) ? $error_operation[0] : '',
      '@arguments' => print_r(isset($error_operation[1]) ? $error_operation[1] : array(), TRUE),
    );
    $drush_type = 'error';
    $severity = WATCHDOG_ERROR;
  }

  if (function_exists('drush_log')) {
    drush_log(t($message, $variables), $drush_type);
  }
  else {
    watchdog('MODULE_NAME', $message, $variables, $severity);
  }
}
          
          

Comments

Placeholder: I'll extend node comments with ajax and other functionality here.