Location: PHPKode > scripts > Phirehose > phirehose/example/ghetto-queue-consume.php
<?php
/**
 * A simple example of how you could consume (ie: process) statuses collected by the ghetto-queue-collect.
 * 
 * This script in theory supports multi-processing assuming your filesystem supports flock() semantics. If you're not 
 * sure what that means, you probably don't need to worry about it :)
 * 
 * Caveat: I'm not sure if this works properly/at all on windows.
 * 
 * See: http://code.google.com/p/phirehose/wiki/Introduction
 */
class GhettoQueueConsumer
{
  
  /**
   * Member attribs
   */
  protected $queueDir;
  protected $filePattern;
  protected $checkInterval;
  
  /**
   * Construct the consumer and start processing
   */
  public function __construct($queueDir = '/tmp', $filePattern = 'phirehose-ghettoqueue*.queue', $checkInterval = 10)
  {
    $this->queueDir = $queueDir;
    $this->filePattern = $filePattern;
    $this->checkInterval = $checkInterval;
    
    // Sanity checks
    if (!is_dir($queueDir)) {
      throw new ErrorException('Invalid directory: ' . $queueDir);
    }
    
  }
  
  /**
   * Method that actually starts the processing task (never returns).
   */
  public function process() {
    
    // Init some things
    $lastCheck = 0;
    
    // Loop infinitely
    while (TRUE) {
      
      // Get a list of queue files
      $queueFiles = glob($this->queueDir . '/' . $this->filePattern);
      $lastCheck = time();
      
      $this->log('Found ' . count($queueFiles) . ' queue files to process...');
      
      // Iterate over each file (if any)
      foreach ($queueFiles as $queueFile) {
        $this->processQueueFile($queueFile);
      }
      
      // Wait until ready for next check
      $this->log('Sleeping...');
      while (time() - $lastCheck < $this->checkInterval) {
        sleep(1);
      }
      
    } // Infinite loop
    
  } // End process()
  
  /**
   * Processes a queue file and does something with it (example only)
   * @param string $queueFile The queue file
   */
  protected function processQueueFile($queueFile) {
    $this->log('Processing file: ' . $queueFile);
    
    // Open file
    $fp = fopen($queueFile, 'r');
    
    // Check if something has gone wrong, or perhaps the file is just locked by another process
    if (!is_resource($fp)) {
      $this->log('WARN: Unable to open file or file already open: ' . $queueFile . ' - Skipping.');
      return FALSE;
    }
    
    // Lock file
    flock($fp, LOCK_EX);
    
    // Loop over each line (1 line per status)
    $statusCounter = 0;
    while ($rawStatus = fgets($fp, 4096)) {
      $statusCounter ++;
      
      /** **************** NOTE ********************
       * This is the part where you would normally do your processing. If you're extracting/trending information 
       * about the tweets it should happen here, where it doesn't matter so much if things are slow (you will
       * catch up on the next loop).
       */
      $data = json_decode($rawStatus, true);
      if (is_array($data) && isset($data['user']['screen_name'])) {
        $this->log('Decoded tweet: ' . $data['user']['screen_name'] . ': ' . urldecode($data['text']));
      }
      
    } // End while
    
    // Release lock and close
    flock($fp, LOCK_UN);
    fclose($fp);
    
    // All done with this file
    $this->log('Successfully processed ' . $statusCounter . ' tweets from ' . $queueFile . ' - deleting.');
    unlink($queueFile);    
    
  }
  
  /**
   * Basic log function.
   *
   * @see error_log()
   * @param string $messages
   */
  protected function log($message)
  {
    @error_log($message, 0);
  }
    
}

// Construct consumer and start processing
$gqc = new GhettoQueueConsumer();
$gqc->process();
Return current item: Phirehose