Location: PHPKode > scripts > Shard-Query > shard-query/store_resultset.php
<?php
#THIS JOB INSERTS DATA DIRECTLY INTO the tmp_shard
#This job also uses an unbuffered query
require_once 'Net/Gearman/Job.php';

class Net_Gearman_Job_store_resultset extends Net_Gearman_Job_Common
{
    public function run($arg) {
	$stmt = false;
	if(!$arg) return;
	$arg = (object)$arg;
	$arg->shard = (object)$arg->shard;
	$arg->tmp_shard = (object)$arg->tmp_shard;
	$conn = false;
        $conn = @mysql_connect($arg->shard->host, $arg->shard->user, $arg->shard->password,true);
	if(!$conn) throw new Net_Gearman_Job_Exception(mysql_error());

        $tmp_conn = @mysql_connect($arg->shard->host, $arg->shard->user, $arg->shard->password,true);
	if(!$tmp_conn) throw new Net_Gearman_Job_Exception(mysql_error());

	if(!mysql_select_db($arg->shard->db, $conn)) throw new Net_Gearman_Job_Exception(mysql_error());
	if(!mysql_select_db($arg->shard->db, $tmp_conn)) throw new Net_Gearman_Job_Exception(mysql_error());

	$stmt = mysql_query("select @@max_allowed_packet", $tmp_conn);
	if(!$stmt) throw new Net_Gearman_Job_Exception(mysql_error());
	$row = mysql_fetch_array($stmt);
	$max_len = $row[0] - 4096;
	mysql_free_result($stmt);

	#get the data from the shard using MYSQL_USE_RESULT
    	$stmt = mysql_unbuffered_query($arg->sql, $conn);    
	if(!$stmt) throw new Net_Gearman_Job_Exception(mysql_error());
	$created_sql = false;	
        $sql = "INSERT INTO `{$arg->table_name}` VALUES ";
	$values = "";
	while($row = mysql_fetch_assoc($stmt)) {
		if(!$created_sql)  {
                	$created_sql = "CREATE TABLE IF NOT EXISTS `{$arg->table_name}` (";
	                $col_sql = "";
        	        $cols = array_keys($row);
	                for($i = 0; $i<count($cols);++$i) {

        	                if($cols[$i] == "0") continue;
	                        if($col_sql) $col_sql .= ", ";
	                        $col_sql .= "`{$cols[$i]}` BINARY(255)";
	                }
	                $created_sql .= $col_sql . ") ENGINE=INNODB";
	        
			#echo $created_sql . "\n";
                	if(!mysql_query($created_sql, $tmp_conn)) {
				throw new Net_Gearman_Job_Exception(mysql_error());
			}
		}
        
		if($values) $values .= ",";
		$val_list = "";
		foreach($row as $col => $val) {
			#echo "COL: $col, VAL: $val\n";
			if($col == "0") continue;
			if($val_list) $val_list .= ',';
			$val_list .= "'{$val}'";
		}
		$values .= "({$val_list})";
		#we don't want to exceed max_packet_len
		if(strlen($values) >= $max_len) {
			#echo $sql . "$values\n";
			if(!mysql_query($sql . $values, $tmp_conn)) {
				throw new Net_Gearman_Job_Exception(mysql_error($tmp_conn));
			}
			$values = "";
		}
	}
	#any rows left over?
	if($values) {
		#echo $sql . "$values\n";
		if(!mysql_query($sql . $values, $tmp_conn)) {
			throw new Net_Gearman_Job_Exception(mysql_error($tmp_conn));
		}
	}
	mysql_free_result($stmt);
    	return(array('done' =>true));
    }

}

?>
Return current item: Shard-Query