Location: PHPKode > scripts > Phirehose > phirehose/example/ghetto-queue-collect.php
<?php
require_once('../lib/Phirehose.php');
/**
 * Example of using Phirehose to collect tweets to a "ghetto queue" (ie: simple, filesystem based queue). 
 * This is not designed to be a production-ready/scalable collection system but is simple and does not rely on any
 * additional software or PHP modules. 
 * 
 * The idea with the ghetto queue is that a file is opened to collect tweets and is rotated periodically so that it can
 * be processed by a separate process. If you need "live" processing (i.e. realtime auto-responses to tweets) you'd
 * want to keep the rotate interval low - e.g. 5-10 seconds. If you're doing analytics, you should rotate less often
 * (perhaps every hour or so).
 * 
 */
class GhettoQueueCollector extends Phirehose
{
  
  /**
   * Subclass specific constants
   */
  // Prefix of rotated (ready-to-process) queue file names
  const QUEUE_FILE_PREFIX = 'phirehose-ghettoqueue';
  // Name of the file currently being appended to
  const QUEUE_FILE_ACTIVE = '.phirehose-ghettoqueue.current';
  
  /**
   * Member attributes specific to this subclass
   */
  protected $queueDir;        // Directory that holds the active file and the rotated queue files
  protected $rotateInterval;  // Number of seconds that must elapse before the active file is rotated
  protected $streamFile;      // Full path of the active file (set by getStream())
  protected $statusStream;    // Open handle on the active file, or null when no file is open
  protected $lastRotated;     // Unix timestamp of the last rotation (null until the first stream is opened)
  
  /**
   * Overridden constructor to take class-specific parameters
   * 
   * @param string $username
   * @param string $password
   * @param string $queueDir Directory in which queue files are written
   * @param integer $rotateInterval Seconds between rotations of the active file (must be >= 5)
   * @throws Exception If $rotateInterval is below 5
   */
  public function __construct($username, $password, $queueDir = '/tmp', $rotateInterval = 10)
  {
    
    // Sanity check
    if ($rotateInterval < 5) {
      throw new Exception('Rotate interval set too low - Must be >= 5 seconds');
    }
    
    // Set subclass parameters
    $this->queueDir = $queueDir;
    $this->rotateInterval = $rotateInterval;
    
    // Call parent constructor (constructors have no meaningful return value, so nothing is returned here)
    parent::__construct($username, $password, Phirehose::METHOD_FILTER);
  }
  
  /**
   * Enqueue each status by appending it to the active file, rotating the file when the interval has elapsed
   *
   * @param string $status Written to the file exactly as received - no delimiter is added here
   * @throws Exception If the status cannot be written or the active file cannot be opened/rotated
   */
  public function enqueueStatus($status)
  {
    
    // Write the status to the stream (must be via getStream()). A failed write would otherwise silently drop data.
    if (fputs($this->getStream(), $status) === false) {
      throw new Exception('Unable to write status to stream file: ' . $this->streamFile);
    }

    /* Are we due for a file rotate? Note this won't be called if there are no statuses coming through - 
     * This is (probably) a good thing as it means the collector won't needlessly rotate empty files. That said, if
     * you have a very sparse/quiet stream that you need highly regular analytics on, this may not work for you. 
     */
    $now = time();
    if (($now - $this->lastRotated) > $this->rotateInterval) {
      // Mark last rotation time as now
      $this->lastRotated = $now;
      
      // Rotate it
      $this->rotateStreamFile();
    }
    
  }
  
  /**
   * Returns a stream resource for the current file being written/enqueued to, opening it first if necessary
   * 
   * @return resource
   * @throws Exception If the queue directory is not a writable directory or the active file cannot be opened
   */
  private function getStream() 
  {
    // If we have a valid stream, return it
    if (is_resource($this->statusStream)) {
      return $this->statusStream;
    }
    
    // If it's not a valid resource, we need to create one
    if (!is_dir($this->queueDir) || !is_writable($this->queueDir)) {
      throw new Exception('Unable to write to queueDir: ' . $this->queueDir);
    }

    // Construct stream file name, log and open
    $this->streamFile = $this->queueDir . '/' . self::QUEUE_FILE_ACTIVE;
    $this->log('Opening new active status stream: ' . $this->streamFile);
    $this->statusStream = fopen($this->streamFile, 'a'); // Append if present (crash recovery)
    
    // Ok?
    if (!is_resource($this->statusStream)) {
      throw new Exception('Unable to open stream file for writing: ' . $this->streamFile);
    }
    
    // If we don't have a last rotated time, it's effectively now
    if ($this->lastRotated === null) {
      $this->lastRotated = time();
    }
    
    // Looking good, return the resource
    return $this->statusStream;
    
  }
  
  /**
   * Closes the active file and renames it to a timestamped queue file
   *
   * @throws Exception If the active file cannot be renamed to the queue file
   */
  private function rotateStreamFile()
  {
    // Close the stream and drop the handle so that getStream() opens a fresh file next time
    if (is_resource($this->statusStream)) {
      fclose($this->statusStream);
    }
    $this->statusStream = null;
    
    // Create queue file with timestamp so they're both unique and naturally ordered
    $queueFile = $this->queueDir . '/' . self::QUEUE_FILE_PREFIX . '.' . date('Ymd-His') . '.queue';
    
    // Do the rotate. Check rename()'s own result: merely testing that the target exists afterwards would report
    // success if the rename failed but a file with that name was already present.
    if (!rename($this->streamFile, $queueFile)) {
      throw new Exception('Failed to rotate queue file to: ' . $queueFile);
    }
    
    // At this point, all looking good - the next call to getStream() will create a new active file
    $this->log('Successfully rotated active stream to queue file: ' . $queueFile);
  }
  
} // End of class

// Start streaming/collecting: build the collector (default queue dir and rotate interval), set the keywords to
// track, then begin consuming
$collector = new GhettoQueueCollector('username', 'password');
$trackKeywords = array('morning', 'goodnight', 'hello', 'the', 'and');
$collector->setTrack($trackKeywords);
$collector->consume();
// Return current item: Phirehose