Location: PHPKode > scripts > Stream > Stream.class.php
<?php
/**
 * Asyncronous Stream Loader Class
 *
 * @author Alexander Over <hide@address.com>
 */
abstract class Stream
{
  /**
   * @var int $timeout default stream timeout in sec.
   */
  private $timeout = 5;

  /**
   * @var int $readbuffer default readbuffer size in bytes.
   */
  private $readbuffer = 1024;

  /**
   * @var string $protocol default transport protocol
   */
  protected $protocol = 'tcp';

  /**
   * @var int $port connection port
   */
  protected $port;

  /**
   * @var bool $isSSL use tls or not
   */
  protected $isSSL = false;

  /**
   * @var array $request empty array to hold the request data
   */
  protected $request = array();

  const ASYNC_CONNECT = STREAM_CLIENT_ASYNC_CONNECT;
  const CONNECT = STREAM_CLIENT_CONNECT;

  /**
   * magic function to set protected class variables
   */
  final public function __set($name, $value)
  {
    if (property_exists($this, $name))
    {
      $this->$name = $value;
    }
    else
    {
      return false;
    }
  }

  /**
   * Core request handling
   * @param int $maxParallel limit the count of parallel requests. 0 for no limitation
   *
   * @return array $buffer
   */
  protected function doRequest($maxParallel = 0)
  {
    $requestCount = count($this->request);
    if ( $requestCount > 0 )
    {
      if($maxParallel == 0 || $maxParallel >= $requestCount)
      {
        $maxParallel = $requestCount;
      }

      $buffer = $info = array();
      $progressCounter = 0;

      for (;$progressCounter < $requestCount; $progressCounter += $maxParallel)
      {
        $requestQueue = array_slice($this->request, $progressCounter, $maxParallel, true);
        // start adding the requests to the pool
        foreach ($requestQueue as $key => $request)
        {
          $errno = $errstr = false;

          $sockets[$key] = stream_socket_client($this->protocol.'://' . $request['path'], &$errno, &$errstr, $this->timeout, self::CONNECT|self::ASYNC_CONNECT);
          if (!$sockets[$key])
          {
            unset($sockets[$key]);
            continue;
          }
          else
          {
            stream_set_timeout($sockets[$key], $this->timeout);
            if ($this->isSSL)
            {
              stream_socket_enable_crypto($sockets[$key], true, STREAM_CRYPTO_METHOD_SSLv23_CLIENT);
            }
          }
        }

        // we have opened connections, so do some stuff
        while (count($sockets))
        {
          $read = $write = $sockets;
          $e = null;
          $n = stream_select($read, $write, $e, $this->timeout);
          if($n < 1)
          {
            break;
          }

          // iterate and read from sockets ...
          foreach ($read as $r)
          {
            $key = array_search($r, $sockets);
            $info[$key] = socket_get_status($sockets[$key]);
            $buffer[$key] = '';
            while (!feof($sockets[$key]) && !$info[$key]['timed_out'])
            {
              $sRead = fgets($sockets[$key], $this->readbuffer);
              if (strlen($sRead) == 0)
              {
                break;
              }
              $buffer[$key].= $sRead;
              $info[$key] = socket_get_status($sockets[$key]);
            }
            // we have a custom callback, so do something other with result and return only the result of the custom method.
            if (isset($requestQueue[$key]['cust']))
            {
              if (method_exists( $this, $requestQueue[$key]['cust']))
              {
                $result = call_user_func_array( array($this, $requestQueue[$key]['cust']),
                                                array('config' => $requestQueue[$key],
                                                      'data' => $buffer[$key]));
                unset($buffer[$key]);
                $buffer[$key] = $result;
              } else
              {
                throw new Exception('Fatal: Custom Callback: '.$requestQueue[$key]['cust'].' not found!');
              }
            }
            // the socket is done, so we can close him.
            fclose($sockets[$key]);
            unset($sockets[$key]);
          }

          // iterate and write through active sockets ...
          foreach ($write as $w)
          {
            $key = array_search($w, $sockets);
            if (isset($writehistory[$key]))
            {
              continue;
            }
            if (isset($sockets[$key]) && is_resource($sockets[$key]))
            {
              fputs($sockets[$key], $requestQueue[$key]['data']);
            }
            $writehistory[$key] = true;
          }
        }
      }

      return $buffer;
    } else
    {
      throw new Exception('nothing to do');
    }
  }

  /**
   * Clear the interal request handler
   */
  public function clearRequest()
  {
    $this->request = array();
  }
}
Return current item: Stream