Multi-threaded Processes in Drupal

Specs

Version
Drupal 6
Tools
Batch Processes
Drush
Multi-threading
Created
28 Feb 2016

Summary

This snippet demonstrates how to do multi-threading in Drupal. Multi-threading is useful when processing large data sets because it lets you run multiple processes concurrently: you break the data set into two or more parts that are processed at the same time, which reduces the total processing time.

This example uses the Background Process module to split a large CSV file across a configurable number of processes, so you can choose the number of processes (via the --threads option) to suit your use case and infrastructure. It uses a custom Drush command that can be run by cron or by your continuous integration server.

Drush Command Example
drush multi-threaded-process-example --threads=4

Code

1. Drush command definition and parent process

MODULE_NAME.drush.inc
The code defines the Drush command, launches the child processes, and monitors their progress.
          
/**
 * Drush command hook: declares the multi-threaded-process-example command.
 *
 * The command (alias "mtpe") needs a fully bootstrapped Drupal site and
 * accepts a --threads option selecting how many child processes to launch.
 *
 * @return array
 *   Drush command definitions keyed by command name.
 */
function MODULE_NAME_drush_command() {
  $items = array();
  $items['multi-threaded-process-example'] = array(
    'description' => "Imports an example dataset.",
    'aliases' => array('mtpe'),
    'options' => array(
      // Previously an empty string, which left the option undocumented in
      // "drush help".
      'threads' => 'Number of concurrent child processes to use for files over 5000 lines (1-4, default 4).',
    ),
    'examples' => array(
      'drush multi-threaded-process-example --threads=4' => 'Import the dataset using four concurrent processes.',
    ),
    'bootstrap' => DRUSH_BOOTSTRAP_DRUPAL_FULL,
  );
  return $items;
}

/**
 * Drush callback for the multi-threaded-process-example command.
 *
 * Follows Drush naming convention of drush_MODULE_NAME_command_name.
 *
 * Splits the lines of a CSV file into contiguous, non-overlapping ranges,
 * starts one Background Process child per range, then polls the {batch} and
 * {background_process} tables once a minute until no child is still running.
 */
function drush_MODULE_NAME_multi_threaded_process_example() {

  $filename = ''; // Use your own logic to determine filename here.

  $total = _MODULE_NAME_multi_threaded_process_example_count_lines($filename);
  if ($total === FALSE) {
    drush_log(t('Unable to read file !filename.', array('!filename' => $filename)), 'error');
    return;
  }

  if ($total <= 5000) {
    // File has 5000 or fewer lines. Just use a single process.
    $threads = 1;
  }
  else {
    $threads = drush_get_option('threads');
    $threads = empty($threads) ? 4 : (int) $threads;

    // Written as a plain range check: the old condition mixed && and ||
    // without parentheses and also rejected a single thread.
    if ($threads < 1 || $threads > 4) {
      drush_log(t('Number of threads can only be between 1 and 4.'), 'error');
      return;
    }
  }

  $processes = _MODULE_NAME_multi_threaded_process_example_build_ranges($total, $threads);
  if (empty($processes)) {
    drush_log(t('File !filename is empty; nothing to process.', array('!filename' => $filename)), 'warning');
    return;
  }

  $job_id = time();

  // The child processes use a variable stored in the DB to communicate with
  // this (parent) process.
  $variable_name = 'MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id;
  variable_set($variable_name, array());

  if (!_MODULE_NAME_multi_threaded_process_example_launch($job_id, $processes, $filename)) {
    variable_del($variable_name);
    return;
  }

  drush_log(t('All processes have started. Waiting 60 seconds to check their status.'), 'success');
  sleep(60);

  _MODULE_NAME_multi_threaded_process_example_monitor($job_id);

  // Remove the coordination variable so finished jobs do not pile up in
  // {variable}.
  variable_del($variable_name);
}

/**
 * Counts the lines in a file without loading it all into memory.
 *
 * Produces the same count as count(file($filename)): a final line without a
 * trailing newline is counted too.
 *
 * @param string $filename
 *   Path of the file to count.
 *
 * @return int|false
 *   Number of lines, or FALSE if the file cannot be read.
 */
function _MODULE_NAME_multi_threaded_process_example_count_lines($filename) {
  if ($filename === '' || !is_readable($filename)) {
    return FALSE;
  }
  $handle = fopen($filename, 'r');
  if ($handle === FALSE) {
    return FALSE;
  }
  $count = 0;
  while (fgets($handle) !== FALSE) {
    $count++;
  }
  fclose($handle);
  return $count;
}

/**
 * Splits row numbers 1..$total into contiguous inclusive ranges.
 *
 * Produces at most $threads ranges of ceil($total / $threads) rows each (the
 * last may be shorter). Consecutive ranges never share a row: range k covers
 * k*size+1 .. (k+1)*size.
 *
 * @param int $total
 *   Number of rows to distribute.
 * @param int $threads
 *   Desired number of ranges (values below 1 are treated as 1).
 *
 * @return array
 *   List of arrays with 'start' and 'stop' keys; empty when $total is 0.
 */
function _MODULE_NAME_multi_threaded_process_example_build_ranges($total, $threads) {
  $total = (int) $total;
  $threads = max(1, (int) $threads);
  $ranges = array();
  if ($total < 1) {
    return $ranges;
  }

  $rows_per_process = (int) ceil($total / $threads);
  for ($i = 0; $i < $threads; $i++) {
    $start = $rows_per_process * $i + 1;
    if ($start > $total) {
      break;
    }
    $ranges[] = array(
      'start' => $start,
      'stop' => min($rows_per_process * ($i + 1), $total),
    );
  }
  return $ranges;
}

/**
 * Starts one background child per range.
 *
 * Each child is given a limited number of attempts to register its batch id
 * in the job's shared thread-data variable.
 *
 * @param int $job_id
 *   Job identifier shared with the children.
 * @param array $processes
 *   Ranges from _MODULE_NAME_multi_threaded_process_example_build_ranges().
 * @param string $filename
 *   CSV file passed to each child.
 * @param int $max_attempts
 *   Launch attempts per child before giving up.
 *
 * @return bool
 *   TRUE if every child registered, FALSE if one could not be started.
 */
function _MODULE_NAME_multi_threaded_process_example_launch($job_id, array $processes, $filename, $max_attempts = 3) {
  drush_print(t('Breaking up file and launching !count child processes', array('!count' => count($processes))));

  foreach ($processes as $id => $range) {
    $started = FALSE;

    for ($attempt = 1; !$started && $attempt <= $max_attempts; $attempt++) {
      drush_print(t('Attempting to start process !number', array('!number' => $id)));
      $handle = background_process_start('MODULE_NAME_multi_threaded_process_example_batch_init', $job_id, $id, $range['start'], $range['stop'], $filename);

      drush_print(t('Verifying that the process started okay...'));
      sleep(15); // this assumes the child process first page request takes less than 15 seconds

      $thread_data = MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id);
      if (!empty($thread_data[$id]['id'])) {
        drush_log(t('Process !number was created okay using batch id !batch_id.', array('!number' => $id, '!batch_id' => $thread_data[$id]['id'])), 'success');
        $started = TRUE;
      }
      else {
        drush_log(t('Process !number was not created okay. Deleting the process and trying again.', array('!number' => $id)), 'warning');
        background_process_remove_process($handle);
      }
    }

    if (!$started) {
      // Retrying forever would hang the command if a child can never start.
      drush_log(t('Process !number could not be started after !attempts attempts. Already started processes keep running but are no longer monitored.', array('!number' => $id, '!attempts' => $max_attempts)), 'error');
      return FALSE;
    }
  }

  return TRUE;
}

/**
 * Reports child progress once a minute until no child is still running.
 *
 * A child counts as running while a {background_process} row exists for the
 * handle 'background_batch:<batch id>'.
 *
 * @param int $job_id
 *   Job identifier (also the job's start timestamp).
 */
function _MODULE_NAME_multi_threaded_process_example_monitor($job_id) {
  do {
    $thread_data = MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id);
    if (!is_array($thread_data)) {
      $thread_data = array();
    }

    $output = '';
    $running = 0;
    foreach ($thread_data as $pid => $data) {
      $batch_row = db_fetch_object(db_query('SELECT * FROM {batch} WHERE bid = %d', $data['id']));
      if (empty($batch_row->bid)) {
        continue;
      }

      $batch_data = unserialize($batch_row->batch);
      $sandbox = isset($batch_data['sets'][0]['sandbox']) ? $batch_data['sets'][0]['sandbox'] : array();
      if (!empty($sandbox['progress'])) {
        $output .= t('P!pid: !current/!total  ', array(
          '!pid' => $pid,
          '!current' => $sandbox['progress'] - $sandbox['start'],
          '!total' => $sandbox['stop'] - $sandbox['start'],
        ));
      }

      $handle = 'background_batch:' . $data['id'];
      if (db_result(db_query("SELECT handle FROM {background_process} WHERE handle = '%s'", $handle))) {
        $running++;
      }
    }

    if ($output !== '') {
      drush_print($output);
    }

    $seconds = time() - $job_id;
    $hours = floor($seconds / 3600);
    $mins = floor(($seconds % 3600) / 60);
    drush_print(t('Processes have been running for !hours hours and !mins minutes', array('!hours' => $hours, '!mins' => $mins)));

    if ($running > 0) {
      sleep(60);
    }
  } while ($running > 0);

  drush_log(t('All threads have been closed. Job is complete.'), 'success');
}

/**
 * Returns the thread registration data that child processes stored for a job.
 *
 * Queries the {variable} table directly rather than calling variable_get().
 * NOTE(review): presumably this is so the long-running drush parent sees
 * values written by child page requests after its own bootstrap; confirm.
 *
 * @param int $job_id
 *   Job identifier used to build the variable name.
 *
 * @return array
 *   Thread data keyed by thread index (each entry has 'timestamp' and 'id'),
 *   or an empty array if nothing has been stored yet.
 */
function MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id) {
  $name = 'MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id;
  // Braces let Drupal apply the site's table prefix.
  $value = db_result(db_query("SELECT value FROM {variable} WHERE name = '%s'", $name));
  if (empty($value)) {
    return array();
  }
  $data = unserialize($value);
  return is_array($data) ? $data : array();
}
          
          

2. Child process logic and batch processes

MODULE_NAME.module
This code must go in your module file, or somewhere else that is loaded on every page request, because each child process runs in its own page request, separate from the parent process.
          
/**
 * Background Process entry point for a single child process.
 *
 * Defines a batch with one operation that imports rows $start..$stop of
 * $filename, then hands it to Background Batch to run.
 *
 * @param int $job_id
 *   Job identifier shared with the parent process.
 * @param int $thread
 *   Zero-based index of this child process.
 * @param int $start
 *   First row of this child's range.
 * @param int $stop
 *   Last row of this child's range (inclusive).
 * @param string $filename
 *   CSV file to import.
 */
function MODULE_NAME_multi_threaded_process_example_batch_init($job_id, $thread, $start, $stop, $filename) {
  $operation_arguments = array($job_id, $thread, $start, $stop, $filename);
  $operations = array(
    array('MODULE_NAME_multi_threaded_process_example_batch_process', $operation_arguments),
  );

  $definition = array(
    'operations' => $operations,
    'title' => t('Processing Example Job'),
    'init_message' => t('Process is starting.'),
    'progress_message' => t('Processed @current out of @total.'),
    'error_message' => t('Process has encountered an error.'),
    'finished' => 'MODULE_NAME_multi_threaded_process_example_batch_finished',
    // Non-progressive batch, as in the original example.
    // NOTE(review): presumably Background Batch drives execution itself; confirm.
    'progressive' => FALSE,
  );

  batch_set($definition);
  background_batch_process_batch();
}

/**
 * Batch operation that imports one child's range of a pipe-delimited CSV.
 *
 * The Batch API calls this repeatedly until $context['finished'] is 1.
 * - The first call only registers this thread's batch id in the job's shared
 *   variable, so the parent's start-up check sees the child quickly.
 * - Each later call imports up to 100 records and saves the file offset it
 *   reached in the sandbox.
 *
 * Row numbering: progress value $start corresponds to the record right after
 * record number $start. A range starting at 1 therefore never imports the
 * file's first record (NOTE(review): presumably a header row; confirm).
 *
 * @param int $job_id
 *   Job identifier used to name the shared thread-data variable.
 * @param int $thread
 *   Zero-based index of this child process.
 * @param int $start
 *   First row number of this process's range.
 * @param int $stop
 *   Last row number of this process's range (inclusive).
 * @param string $filename
 *   Path of the CSV file to import.
 * @param array $context
 *   Batch API context; persistent state lives in $context['sandbox'].
 */
function MODULE_NAME_multi_threaded_process_example_batch_process($job_id, $thread, $start, $stop, $filename, &$context) {

  // On the first page request, just store the context values so this process
  // can report to the parent quickly. The file is not opened yet.
  if (!isset($context['sandbox']['start'])) {
    $batch =& batch_get();

    // The child processes use a DB-stored variable so the parent process can
    // monitor them.
    // NOTE(review): this read-modify-write is not atomic. Two children
    // registering at the same moment could drop each other's entry. The
    // parent starts children one at a time with a pause in between, which
    // appears to avoid this.
    $current_thread_data = MODULE_NAME_multi_threaded_process_example_get_threads_status($job_id);
    if (!is_array($current_thread_data)) {
      $current_thread_data = array();
    }
    $current_thread_data[$thread] = array(
      'timestamp' => time(),
      'id' => $batch['id'],
    );
    variable_set('MODULE_NAME_multi_threaded_process_example_thread_data_' . $job_id, $current_thread_data);

    $context['sandbox']['progress'] = $start;
    $context['sandbox']['thread'] = $thread;
    $context['sandbox']['start'] = $start;
    $context['sandbox']['stop'] = $stop;
    $context['sandbox']['update'] = 0;
    $context['finished'] = FALSE;
    return;
  }

  // This runs in a page request, not under drush, so log via watchdog().
  $file_stream = is_readable($filename) ? fopen($filename, 'r') : FALSE;
  if ($file_stream === FALSE) {
    watchdog('MODULE_NAME', 'Unable to open %file.', array('%file' => $filename), WATCHDOG_ERROR);
    $context['finished'] = TRUE;
    return;
  }

  if (!empty($context['sandbox']['offset'])) {
    // Resume where the previous request stopped.
    if (fseek($file_stream, $context['sandbox']['offset']) !== 0) {
      watchdog('MODULE_NAME', 'Failed to seek.', array(), WATCHDOG_ERROR);
      fclose($file_stream);
      $context['finished'] = TRUE;
      return;
    }
  }
  else {
    // No saved offset yet: there is no direct way to find a CSV record's
    // byte offset, so read past the first $start records (at least one).
    $to_skip = max(1, (int) $start);
    for ($skipped = 0; $skipped < $to_skip; $skipped++) {
      if (!is_array(fgetcsv($file_stream, 0, '|', '"'))) {
        // The file ends before this range begins: nothing left to import.
        watchdog('MODULE_NAME', 'Row @start is past the end of %file; nothing to process.', array('@start' => $start, '%file' => $filename), WATCHDOG_ERROR);
        fclose($file_stream);
        $context['finished'] = TRUE;
        return;
      }
    }
  }

  // Placeholder mapping from logical field names to CSV column indexes.
  // Replace with the mapping for your file.
  $data_mappings = array('nid' => 0);

  $limit = 100; // number of rows to process per page request
  $processed = 0;

  // Check both limits BEFORE reading a record. Reading first and checking
  // afterwards consumed one record per request that was never imported.
  while ($processed < $limit && $context['sandbox']['progress'] <= $stop) {
    $file_contents = fgetcsv($file_stream, 0, '|', '"');
    if (!is_array($file_contents)) {
      break; // End of file.
    }

    // Count the record before any "continue", so skipped records still
    // advance progress. Otherwise this process would run past $stop into the
    // next process's range.
    $context['sandbox']['progress']++; // total position in file
    $processed++; // position in this request

    // this example updates or creates new nodes based on the NID field in the CSV
    $nid = isset($file_contents[$data_mappings['nid']]) ? $file_contents[$data_mappings['nid']] : NULL;

    if (is_numeric($nid)) {
      // This is an existing record, so load it; skip the row if it is gone.
      $node = node_load($nid, NULL, TRUE);
      if (empty($node->nid)) {
        continue;
      }
    }
    else {
      // This is a new record, so set up a new node.
      $node = new stdClass();
      $node->type = 'example_content_type';
      $node->title = 'Example Title';
      $node->uid = 1;
      // REQUEST_TIME is a Drupal 7 constant; use time() on Drupal 6.
      $node->created = time();
    }

    // use your logic to build node data and fields here

    node_save($node);
  }

  $context['sandbox']['offset'] = ftell($file_stream);
  $eof = feof($file_stream);
  fclose($file_stream);

  // Progress ends one past $stop once the last row has been imported. ">="
  // finished one row early when a request ended exactly at $stop.
  $done = $context['sandbox']['progress'] > $stop;

  $context['finished'] = ($eof || $done) ? 1 : 0;
  $context['success'] = $context['finished'];
}

/**
 * Batch 'finished' callback for a child process.
 *
 * Child processes run in their own page request rather than under drush, so
 * drush_log() may not be defined when this runs. In that case the message
 * goes to watchdog() instead.
 *
 * @param bool $success
 *   Whether the batch completed without error.
 * @param array $results
 *   Batch results (unused).
 * @param array $operations
 *   Operations that remained unprocessed; the first one is reported on error.
 */
function MODULE_NAME_multi_threaded_process_example_batch_finished($success, $results, $operations) {
  if ($success) {
    $message = 'Finished processing files';
    $variables = array();
    $drush_type = 'success';
    $severity = WATCHDOG_NOTICE;
  }
  else {
    // An error occurred.
    // $operations contains the operations that remained unprocessed.
    $error_operation = is_array($operations) ? reset($operations) : FALSE;
    $message = 'An error occurred while processing %error_operation with arguments: @arguments';
    $variables = array(
      '%error_operation' => isset($error_operation[0]) ? $error_operation[0] : '',
      '@arguments' => isset($error_operation[1]) ? print_r($error_operation[1], TRUE) : '',
    );
    $drush_type = 'error';
    $severity = WATCHDOG_ERROR;
  }

  if (function_exists('drush_log')) {
    drush_log(t($message, $variables), $drush_type);
  }
  else {
    watchdog('MODULE_NAME', $message, $variables, $severity);
  }
}
          
          

Comments

Placeholder: I'll extend node comments with ajax and other functionality here.