<?php
/**
* Copyright 2007 Maintainable Software, LLC
* Copyright 2008 The Horde Project (http://www.horde.org/)
*
* @author Mike Naberezny <hide@address.com>
* @author Derek DeVries <hide@address.com>
* @author Chuck Hagenbuch <hide@address.com>
* @license http://opensource.org/licenses/bsd-license.php
* @category Horde
* @package Horde_Db
* @subpackage Adapter
*/
/**
* @author Mike Naberezny <hide@address.com>
* @author Derek DeVries <hide@address.com>
* @author Chuck Hagenbuch <hide@address.com>
* @license http://opensource.org/licenses/bsd-license.php
* @category Horde
* @package Horde_Db
* @subpackage Adapter
*/
class Horde_Db_Adapter_Postgresql_Schema extends Horde_Db_Adapter_Abstract_Schema
{
    /**
     * Cached schema search path (comma-separated schema names).
     *
     * Stays empty until setSchemaSearchPath() is called with a non-empty
     * value or getSchemaSearchPath() loads it from the server.
     *
     * @var string
     */
    protected $_schemaSearchPath = '';


    /*##########################################################################
    # Quoting
    ##########################################################################*/

    /**
     * Quotes column names for use in SQL queries.
     *
     * Embedded double quotes are doubled, then the whole name is wrapped in
     * double quotes.
     *
     * @param string $name  Unquoted identifier.
     *
     * @return string  The quoted identifier.
     */
    public function quoteColumnName($name)
    {
        return '"' . str_replace('"', '""', $name) . '"';
    }

    /**
     * Quotes PostgreSQL-specific data types for SQL input.
     *
     * Without a column the value is handed straight to the parent
     * implementation. With a column, binary, xml, money and bit(-varying)
     * values get their own literal syntax; everything else falls back to the
     * parent implementation.
     *
     * @param mixed  $value   Value to quote.
     * @param object $column  Optional column object; must provide getType()
     *                        and getSqlType().
     *
     * @return string  SQL literal.
     */
    public function quote($value, $column = null)
    {
        if (!$column) {
            return parent::quote($value, $column);
        }

        if (is_string($value) && $column->getType() == 'binary' && method_exists($column, 'stringToBinary')) {
            /*@TODO test blobs/bytea fields with postgres/pdo and figure out how
              this should work */
            return $this->quotedStringPrefix() . "'" . $column->stringToBinary($value) . "'";
        } elseif (is_string($value) && $column->getSqlType() == 'xml') {
            return "xml '" . $this->quoteString($value) . "'";
        } elseif (is_numeric($value) && $column->getSqlType() == 'money') {
            // Not truly string input, so doesn't require (or allow) escape string syntax.
            return "'" . $value . "'";
        } elseif (is_string($value) && substr($column->getSqlType(), 0, 3) == 'bit') {
            // \z (not $) so that a trailing newline cannot slip into the
            // literal, which is emitted without further escaping.
            if (preg_match('/^[01]*\z/', $value)) {
                // Bit-string notation
                return "B'" . $value . "'";
            } elseif (preg_match('/^[0-9A-F]*\z/i', $value)) {
                // Hexadecimal notation
                return "X'" . $value . "'";
            }
        }

        return parent::quote($value, $column);
    }


    /*##########################################################################
    # Schema Statements
    ##########################################################################*/

    /**
     * The db column types for this adapter.
     *
     * @return array  Map of logical type name to either a literal SQL type
     *                string or an array with 'name' and 'limit' keys.
     */
    public function nativeDatabaseTypes()
    {
        return array(
            'primaryKey' => 'serial primary key',
            'string'     => array('name' => 'character varying', 'limit' => 255),
            'text'       => array('name' => 'text',              'limit' => null),
            'integer'    => array('name' => 'integer',           'limit' => null),
            'float'      => array('name' => 'float',             'limit' => null),
            'decimal'    => array('name' => 'decimal',           'limit' => null),
            'datetime'   => array('name' => 'timestamp',         'limit' => null),
            'timestamp'  => array('name' => 'timestamp',         'limit' => null),
            'time'       => array('name' => 'time',              'limit' => null),
            'date'       => array('name' => 'date',              'limit' => null),
            'binary'     => array('name' => 'bytea',             'limit' => null),
            'boolean'    => array('name' => 'boolean',           'limit' => null),
        );
    }

    /**
     * Returns the maximum identifier length configured on the server
     * (PostgreSQL 8.0 and later), or the default of 63 for older servers.
     *
     * @return integer
     */
    public function tableAliasLength()
    {
        if ($this->postgresqlVersion() >= 80000) {
            return (int)$this->selectValue('SHOW max_identifier_length');
        }

        return 63;
    }

    /**
     * Returns a string of <tt>CREATE TABLE</tt> SQL statement(s) for recreating the
     * entire structure of the database.
     *
     * NOTE: not implemented yet; the body is empty, so this currently
     * returns null.
     *
     * @param string $table
     * @return string
     */
    public function structureDump($table=null)
    {
    }

    /**
     * Creates a new PostgreSQL database.
     *
     * Recognised option keys are 'owner', 'template', 'encoding',
     * 'tablespace' and 'connection_limit'; unknown keys are ignored.
     * 'encoding' defaults to 'utf8'. (Note that MySQL uses 'charset' while
     * PostgreSQL uses 'encoding'.)
     *
     * Option values other than 'connection_limit' are interpolated into the
     * statement as given, so they must come from trusted configuration.
     *
     * Example:
     *   createDatabase('foo_development', array('encoding' => 'unicode'))
     *
     * @param string $name     Database name.
     * @param array  $options  See above.
     *
     * @return mixed  Whatever execute() returns.
     */
    public function createDatabase($name, $options = array())
    {
        $options = array_merge(array('encoding' => 'utf8'), $options);

        $optionString = '';
        foreach ($options as $key => $value) {
            switch ($key) {
            case 'owner':
                $optionString .= " OWNER = '$value'";
                break;
            case 'template':
                $optionString .= " TEMPLATE = $value";
                break;
            case 'encoding':
                $optionString .= " ENCODING = '$value'";
                break;
            case 'tablespace':
                $optionString .= " TABLESPACE = $value";
                // Must not fall through: a tablespace name is not a
                // connection limit.
                break;
            case 'connection_limit':
                $optionString .= ' CONNECTION LIMIT = ' . (int)$value;
                break;
            }
        }

        return $this->execute('CREATE DATABASE ' . $this->quoteTableName($name) . $optionString);
    }

    /**
     * Drops a PostgreSQL database.
     *
     * On PostgreSQL 8.2 and later <tt>DROP DATABASE IF EXISTS</tt> is used.
     * On older servers a plain <tt>DROP DATABASE</tt> is attempted and a
     * Horde_Db_Exception is deliberately swallowed (best effort); in that
     * case null is returned.
     *
     * Example:
     *   dropDatabase('matt_development')
     *
     * @param string $name  Database name.
     *
     * @return mixed  Whatever execute() returns, or null (see above).
     */
    public function dropDatabase($name)
    {
        if ($this->postgresqlVersion() >= 80200) {
            return $this->execute('DROP DATABASE IF EXISTS ' . $this->quoteTableName($name));
        }

        try {
            return $this->execute('DROP DATABASE ' . $this->quoteTableName($name));
        } catch (Horde_Db_Exception $e) {
            /*@TODO logger.warn "#{name} database doesn't exist." if logger */
        }
    }

    /**
     * Returns the current database name.
     *
     * @return string
     */
    public function currentDatabase()
    {
        return $this->selectValue('SELECT current_database()');
    }

    /**
     * Returns the list of all tables in the schema search path.
     *
     * @param string $name  Optional label passed through to selectValues().
     *
     * @return array  Table names.
     */
    public function tables($name = null)
    {
        return $this->selectValues(
            'SELECT tablename FROM pg_tables WHERE schemaname IN ('
            . $this->_quotedSchemaSearchPathList() . ')',
            $name);
    }

    /**
     * Returns the list of all non-primary-key indexes for a table.
     *
     * Only the first ten key columns of an index are inspected (indkey[0]
     * through indkey[9]).
     *
     * @param string $tableName  Table name.
     * @param string $name       Optional label passed through to select().
     *
     * @return array  List of objects with the properties 'table', 'name',
     *                'unique' and 'columns'.
     */
    public function indexes($tableName, $name = null)
    {
        $sql = "
            SELECT distinct i.relname, d.indisunique, a.attname
              FROM pg_class t, pg_class i, pg_index d, pg_attribute a
             WHERE i.relkind = 'i'
               AND d.indexrelid = i.oid
               AND d.indisprimary = 'f'
               AND t.oid = d.indrelid
               AND t.relname = " . $this->quote($tableName) . "
               AND i.relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname IN (" . $this->_quotedSchemaSearchPathList() . ") )
               AND a.attrelid = t.oid
               AND ( d.indkey[0]=a.attnum OR d.indkey[1]=a.attnum
                  OR d.indkey[2]=a.attnum OR d.indkey[3]=a.attnum
                  OR d.indkey[4]=a.attnum OR d.indkey[5]=a.attnum
                  OR d.indkey[6]=a.attnum OR d.indkey[7]=a.attnum
                  OR d.indkey[8]=a.attnum OR d.indkey[9]=a.attnum )
            ORDER BY i.relname";

        $result = $this->select($sql, $name);

        // Rows arrive ordered by index name, one row per indexed column;
        // start a new index entry whenever the name changes.
        $currentIndex = null;
        $indexes = array();
        foreach ($result as $row) {
            if ($currentIndex != $row[0]) {
                $indexes[] = (object)array('table'   => $tableName,
                                           'name'    => $row[0],
                                           'unique'  => $row[1] == 't',
                                           'columns' => array());
                $currentIndex = $row[0];
            }
            $indexes[sizeof($indexes)-1]->columns[] = $row[2];
        }

        return $indexes;
    }

    /**
     * Returns the list of all column definitions for a table.
     *
     * The raw definition rows are cached under "tables/<tableName>"; the
     * database is only queried when the cache yields nothing usable.
     *
     * @param string $tableName  Table name.
     * @param string $name       Optional label passed through to the query.
     *
     * @return array  List of Horde_Db_Adapter_Postgresql_Column objects.
     */
    public function columns($tableName, $name = null)
    {
        // check cache
        $rows = @unserialize($this->_cache->get("tables/$tableName"));

        // query to build rows
        if (!$rows) {
            $rows = $this->columnDefinitions($tableName, $name);

            // write cache
            $this->_cache->set("tables/$tableName", serialize($rows));
        }

        // create columns from rows; each row is
        // (name, formatted type, default source, not-null flag)
        $columns = array();
        foreach ($rows as $row) {
            $columns[] = new Horde_Db_Adapter_Postgresql_Column(
                $row[0], $row[2], $row[1], !(boolean)$row[3]);
        }

        return $columns;
    }

    /**
     * Returns the current database encoding format.
     *
     * @return string
     */
    public function encoding()
    {
        // Exact match on purpose: with LIKE, '_' and '%' in a database name
        // would act as wildcards and could select another database's row.
        return $this->selectValue(
            'SELECT pg_encoding_to_char(pg_database.encoding) FROM pg_database
             WHERE pg_database.datname = ' . $this->quote($this->currentDatabase()));
    }

    /**
     * Sets the schema search path to a string of comma-separated schema names.
     * Names beginning with $ have to be quoted (e.g. $user => '$user').
     * See: http://www.postgresql.org/docs/current/static/ddl-schemas.html
     *
     * The value is interpolated into the SET statement as given and is meant
     * to come from configuration, not from manual calls. Empty values are
     * ignored.
     *
     * @param string $schemaCsv  Comma-separated schema names.
     */
    public function setSchemaSearchPath($schemaCsv)
    {
        if ($schemaCsv) {
            $this->execute("SET search_path TO $schemaCsv");
            $this->_schemaSearchPath = $schemaCsv;
        }
    }

    /**
     * Returns the active schema search path, loading it from the server the
     * first time it is needed.
     *
     * @return string
     */
    public function getSchemaSearchPath()
    {
        if (!$this->_schemaSearchPath) {
            $this->_schemaSearchPath = $this->selectValue('SHOW search_path');
        }

        return $this->_schemaSearchPath;
    }

    /**
     * Returns the current client message level.
     *
     * @return string
     */
    public function getClientMinMessages()
    {
        return $this->selectValue('SHOW client_min_messages');
    }

    /**
     * Set the client message level.
     *
     * @param string $level  New client_min_messages value.
     *
     * @return mixed  Whatever execute() returns.
     */
    public function setClientMinMessages($level)
    {
        return $this->execute('SET client_min_messages TO ' . $this->quote($level));
    }

    /**
     * Returns the sequence name for a table's primary key or some other specified key.
     *
     * If no sequence can be discovered, the name is built as
     * "<table>_<key>_seq", where the key is $pk, else the discovered primary
     * key, else 'id'.
     *
     * @param string $tableName  Table name.
     * @param string $pk         Optional key column name.
     *
     * @return string
     */
    public function defaultSequenceName($tableName, $pk = null)
    {
        list($defaultPk, $defaultSeq) = $this->pkAndSequenceFor($tableName);
        if (!$defaultSeq) {
            $defaultSeq = $tableName . '_' . ($pk ? $pk : ($defaultPk ? $defaultPk : 'id')) . '_seq';
        }

        return $defaultSeq;
    }

    /**
     * Resets the sequence of a table's primary key to the maximum value.
     *
     * Missing $pk / $sequence arguments are filled in from
     * pkAndSequenceFor(). Nothing happens if no primary key or no sequence
     * can be determined.
     *
     * @param string $table     Table name.
     * @param string $pk        Optional primary key column.
     * @param string $sequence  Optional sequence name.
     */
    public function resetPkSequence($table, $pk = null, $sequence = null)
    {
        if (!($pk && $sequence)) {
            list($defaultPk, $defaultSequence) = $this->pkAndSequenceFor($table);
            if (!$pk) {
                $pk = $defaultPk;
            }
            if (!$sequence) {
                $sequence = $defaultSequence;
            }
        }

        if ($pk) {
            if ($sequence) {
                $quotedSequence = $this->quoteColumnName($sequence);
                $quotedTable    = $this->quoteTableName($table);
                $quotedPk       = $this->quoteColumnName($pk);

                $sql = "SELECT setval('$quotedSequence', (SELECT COALESCE(MAX($quotedPk)+(SELECT increment_by FROM $quotedSequence), (SELECT min_value FROM $quotedSequence)) FROM $quotedTable), false)";
                $this->selectValue($sql, 'Reset sequence');
            } else {
                /*@TODO logger.warn "$table has primary key $pk with no default sequence" if logger*/
            }
        }
    }

    /**
     * Returns a table's primary key and belonging sequence.
     *
     * @param string $table  Table name.
     *
     * @return array  array(primary key column, sequence name); both entries
     *                are null when nothing could be determined.
     */
    public function pkAndSequenceFor($table)
    {
        // Quote the table name like columnDefinitions() does instead of
        // interpolating it raw into the regclass cast.
        $quotedTable = $this->quote($table);

        // First try looking for a sequence with a dependency on the
        // given table's primary key.
        $sql = "
            SELECT attr.attname, seq.relname
              FROM pg_class      seq,
                   pg_attribute  attr,
                   pg_depend     dep,
                   pg_namespace  name,
                   pg_constraint cons
             WHERE seq.oid           = dep.objid
               AND seq.relkind       = 'S'
               AND attr.attrelid     = dep.refobjid
               AND attr.attnum       = dep.refobjsubid
               AND attr.attrelid     = cons.conrelid
               AND attr.attnum       = cons.conkey[1]
               AND cons.contype      = 'p'
               AND dep.refobjid      = " . $quotedTable . "::regclass";
        $result = $this->selectOne($sql, 'PK and serial sequence');

        if (!$result) {
            // If that fails, try parsing the primary key's default value.
            // Support the 7.x and 8.0 nextval('foo'::text) as well as
            // the 8.1+ nextval('foo'::regclass).
            // NOTE(review): pg_attrdef.adsrc was removed in PostgreSQL 12;
            // pg_get_expr(adbin, adrelid) is the replacement -- confirm
            // which server versions must be supported before switching.
            $sql = "
                SELECT attr.attname,
                  CASE
                    WHEN split_part(def.adsrc, '''', 2) ~ '.' THEN
                      substr(split_part(def.adsrc, '''', 2),
                             strpos(split_part(def.adsrc, '''', 2), '.')+1)
                    ELSE split_part(def.adsrc, '''', 2)
                  END
                  FROM pg_class       t
                  JOIN pg_attribute   attr ON (t.oid = attrelid)
                  JOIN pg_attrdef     def  ON (adrelid = attrelid AND adnum = attnum)
                  JOIN pg_constraint  cons ON (conrelid = adrelid AND adnum = conkey[1])
                 WHERE t.oid = " . $quotedTable . "::regclass
                   AND cons.contype = 'p'
                   AND def.adsrc ~* 'nextval'";
            $result = $this->selectOne($sql, 'PK and custom sequence');
        }

        if (!$result) {
            // Nothing found: return an explicit pair of nulls instead of
            // indexing into an empty result.
            return array(null, null);
        }

        // [primary_key, sequence]
        return array($result[0], $result[1]);
    }

    /**
     * Renames a table.
     *
     * @param string $name     Current table name.
     * @param string $newName  New table name.
     *
     * @return mixed  Whatever execute() returns.
     */
    public function renameTable($name, $newName)
    {
        $this->_clearTableCache($name);

        return $this->execute('ALTER TABLE ' . $this->quoteTableName($name) . ' RENAME TO ' . $this->quoteTableName($newName));
    }

    /**
     * Adds a new column to the named table.
     *
     * Option keys read here: 'limit', 'precision', 'scale', 'default' and
     * 'null'. The column is added first; the default and the NOT NULL
     * constraint are applied afterwards as separate statements.
     *
     * @param string $tableName   Table name.
     * @param string $columnName  Column name.
     * @param string $type        Logical column type.
     * @param array  $options     See above.
     */
    public function addColumn($tableName, $columnName, $type, $options = array())
    {
        $this->_clearTableCache($tableName);

        $limit     = isset($options['limit'])     ? $options['limit']     : null;
        $precision = isset($options['precision']) ? $options['precision'] : null;
        $scale     = isset($options['scale'])     ? $options['scale']     : null;

        // Add the column.
        $this->execute('ALTER TABLE '.$this->quoteTableName($tableName).' ADD COLUMN '.$this->quoteColumnName($columnName).' '.$this->typeToSql($type, $limit, $precision, $scale));

        $default = isset($options['default']) ? $options['default'] : null;
        $notnull = isset($options['null']) && $options['null'] === false;

        // array_key_exists (not isset) so that an explicit null default is
        // still applied.
        if (array_key_exists('default', $options)) {
            $this->changeColumnDefault($tableName, $columnName, $default);
        }
        if ($notnull) {
            $this->changeColumnNull($tableName, $columnName, false, $default);
        }
    }

    /**
     * Changes the column of a table.
     *
     * First tries <tt>ALTER COLUMN ... TYPE</tt>. If that raises a
     * Horde_Db_Exception, falls back to adding a temporary column, copying
     * the data with a CAST, dropping the old column and renaming the
     * temporary one, all inside a transaction.
     *
     * @param string $tableName   Table name.
     * @param string $columnName  Column name.
     * @param string $type        New logical column type.
     * @param array  $options     Keys read: 'limit', 'precision', 'scale',
     *                            'default', 'null'.
     *
     * @throws Horde_Db_Exception  If the column does not exist or the
     *                             fallback conversion fails.
     */
    public function changeColumn($tableName, $columnName, $type, $options = array())
    {
        $this->_clearTableCache($tableName);

        $limit     = isset($options['limit'])     ? $options['limit']     : null;
        $precision = isset($options['precision']) ? $options['precision'] : null;
        $scale     = isset($options['scale'])     ? $options['scale']     : null;

        $quotedTableName = $this->quoteTableName($tableName);

        try {
            $this->execute('ALTER TABLE '.$quotedTableName.' ALTER COLUMN '.$this->quoteColumnName($columnName).' TYPE '.$this->typeToSql($type, $limit, $precision, $scale));
        } catch (Horde_Db_Exception $e) {
            // This is PostgreSQL 7.x, or the old type could not be coerced to
            // the new type, so we have to use a more arcane way of doing it.

            // Booleans can't always be cast to other data types; look up
            // the old type so they can be handled specially. This happens
            // before the transaction starts so that a missing column does
            // not trigger a rollback of a transaction that never began.
            $oldType = null;
            foreach ($this->columns($tableName) as $column) {
                if ($column->getName() == $columnName) {
                    $oldType = $column->getType();
                    break;
                }
            }
            if ($oldType === null) {
                throw new Horde_Db_Exception("$tableName does not have a column '$columnName'");
            }

            $this->beginDbTransaction();
            try {
                $tmpColumnName = $columnName.'_change_tmp';
                $this->addColumn($tableName, $tmpColumnName, $type, $options);

                if ($oldType == 'boolean') {
                    $this->execute('UPDATE '.$quotedTableName.' SET '.$this->quoteColumnName($tmpColumnName).' = CAST(CASE WHEN '.$this->quoteColumnName($columnName).' IS TRUE THEN 1 ELSE 0 END AS '.$this->typeToSql($type, $limit, $precision, $scale).')');
                } else {
                    $this->execute('UPDATE '.$quotedTableName.' SET '.$this->quoteColumnName($tmpColumnName).' = CAST('.$this->quoteColumnName($columnName).' AS '.$this->typeToSql($type, $limit, $precision, $scale).')');
                }

                $this->removeColumn($tableName, $columnName);
                $this->renameColumn($tableName, $tmpColumnName, $columnName);

                $this->commitDbTransaction();
            } catch (Horde_Db_Exception $fallbackError) {
                $this->rollbackDbTransaction();
                throw $fallbackError;
            }
        }

        $default = isset($options['default']) ? $options['default'] : null;

        if (array_key_exists('default', $options)) {
            $this->changeColumnDefault($tableName, $columnName, $default);
        }
        if (array_key_exists('null', $options)) {
            $this->changeColumnNull($tableName, $columnName, $options['null'], $default);
        }
    }

    /**
     * Changes the default value of a table column.
     *
     * @param string $tableName   Table name.
     * @param string $columnName  Column name.
     * @param mixed  $default     New default value.
     *
     * @return mixed  Whatever execute() returns.
     */
    public function changeColumnDefault($tableName, $columnName, $default)
    {
        $this->_clearTableCache($tableName);

        return $this->execute('ALTER TABLE '.$this->quoteTableName($tableName). ' ALTER COLUMN '.$this->quoteColumnName($columnName).' SET DEFAULT '.$this->quote($default));
    }

    /**
     * Sets or drops the NOT NULL constraint of a table column.
     *
     * When the column is made NOT NULL and a non-null default is given,
     * existing NULL values are first replaced by that default so that the
     * constraint can be applied.
     *
     * @param string  $tableName   Table name.
     * @param string  $columnName  Column name.
     * @param boolean $null        True to allow NULL, false to forbid it.
     * @param mixed   $default     Optional value used to backfill NULLs.
     *
     * @return mixed  Whatever execute() returns for the ALTER statement.
     */
    public function changeColumnNull($tableName, $columnName, $null, $default = null)
    {
        $this->_clearTableCache($tableName);

        if (!($null || is_null($default))) {
            $this->execute('UPDATE '.$this->quoteTableName($tableName).' SET '.$this->quoteColumnName($columnName).' = '.$this->quote($default).' WHERE '.$this->quoteColumnName($columnName).' IS NULL');
        }

        return $this->execute('ALTER TABLE '.$this->quoteTableName($tableName).' ALTER '.$this->quoteColumnName($columnName).' '.($null ? 'DROP' : 'SET').' NOT NULL');
    }

    /**
     * Renames a column in a table.
     *
     * @param string $tableName      Table name.
     * @param string $columnName     Current column name.
     * @param string $newColumnName  New column name.
     *
     * @return mixed  Whatever execute() returns.
     */
    public function renameColumn($tableName, $columnName, $newColumnName)
    {
        $this->_clearTableCache($tableName);

        return $this->execute('ALTER TABLE '.$this->quoteTableName($tableName).' RENAME COLUMN '.$this->quoteColumnName($columnName).' TO '.$this->quoteColumnName($newColumnName));
    }

    /**
     * Drops an index from a table.
     *
     * The index name is derived from the table name and options via
     * indexName().
     *
     * @param string $tableName  Table name.
     * @param array  $options    Options passed to indexName().
     *
     * @return mixed  Whatever execute() returns.
     */
    public function removeIndex($tableName, $options = array())
    {
        $this->_clearTableCache($tableName);

        return $this->execute('DROP INDEX '.$this->indexName($tableName, $options));
    }

    /**
     * Maps logical types to PostgreSQL-specific data types.
     *
     * Only 'integer' is handled here, where $limit is a byte size: 1-2 maps
     * to smallint, 3-4 or no limit to integer, 5-8 to bigint. All other
     * types are delegated to the parent implementation.
     *
     * @param string  $type       Logical type.
     * @param integer $limit      Optional size limit.
     * @param integer $precision  Optional precision.
     * @param integer $scale      Optional scale.
     *
     * @return string  SQL type.
     * @throws Horde_Db_Exception  For integer limits outside 1-8.
     */
    public function typeToSql($type, $limit = null, $precision = null, $scale = null)
    {
        if ($type != 'integer') {
            return parent::typeToSql($type, $limit, $precision, $scale);
        }

        switch ($limit) {
        case 1:
        case 2:
            return 'smallint';

        case 3:
        case 4:
        case null:
            return 'integer';

        case 5:
        case 6:
        case 7:
        case 8:
            return 'bigint';

        default:
            throw new Horde_Db_Exception("No integer type has byte size $limit. Use a numeric with precision 0 instead.");
        }
    }

    /**
     * Returns a SELECT DISTINCT clause for a given set of columns and a given ORDER BY clause.
     *
     * PostgreSQL requires the ORDER BY columns in the select list for distinct queries, and
     * requires that the ORDER BY include the distinct column.
     *
     *   distinct("posts.id", "posts.created_at desc")
     *
     * @param string $columns  Column list to be distinct on.
     * @param string $orderBy  Optional ORDER BY clause (without the keyword).
     *
     * @return string
     */
    public function distinct($columns, $orderBy = null)
    {
        if (empty($orderBy)) {
            return "DISTINCT $columns";
        }

        // Construct a clean list of column names from the ORDER BY clause, removing
        // any ASC/DESC modifiers
        $orderColumns = array();
        foreach (preg_split('/\s*,\s*/', $orderBy, -1, PREG_SPLIT_NO_EMPTY) as $orderByClause) {
            $orderColumns[] = current(preg_split('/\s+/', $orderByClause, -1, PREG_SPLIT_NO_EMPTY)) . ' AS alias_' . count($orderColumns);
        }

        // Return a DISTINCT ON() clause that's distinct on the columns we want but includes
        // all the required columns for the ORDER BY to work properly.
        return 'DISTINCT ON ('.$columns.') '.$columns.', '.implode(', ', $orderColumns);
    }

    /**
     * Returns an ORDER BY clause for the passed order option.
     *
     * PostgreSQL does not allow arbitrary ordering when using DISTINCT ON, so we work around this
     * by wrapping the +sql+ string as a sub-select and ordering in that query.
     *
     * The N-th entry of $options['order'] is referenced through the alias
     * alias_N that distinct() adds to the select list; only its DESC
     * modifier is carried over.
     *
     * @param string $sql      Query to wrap.
     * @param array  $options  Reads the 'order' key.
     *
     * @return string
     */
    public function addOrderByForAssociationLimiting($sql, $options)
    {
        if (empty($options['order'])) {
            return $sql;
        }

        $order = array();
        foreach (preg_split('/\s*,\s*/', $options['order'], -1, PREG_SPLIT_NO_EMPTY) as $s) {
            $alias = 'id_list.alias_' . count($order);
            // Order by the alias only; repeating the original column
            // expression after the alias would be invalid SQL.
            $order[] = preg_match('/\bdesc$/i', $s) ? $alias . ' DESC' : $alias;
        }
        $order = implode(', ', $order);

        return "SELECT * FROM ($sql) AS id_list ORDER BY $order";
    }

    /**
     * Returns the list of a table's column names, data types, and default values.
     *
     * The underlying query is roughly:
     *   SELECT column.name, column.type, default.value
     *     FROM column LEFT JOIN default
     *       ON column.table_id = default.table_id
     *      AND column.num = default.column_num
     *    WHERE column.table_id = get_table_id('table_name')
     *      AND column.num > 0
     *      AND NOT column.is_dropped
     *    ORDER BY column.num
     *
     * If the table name is not prefixed with a schema, the database will
     * take the first match from the schema search path.
     *
     * Query implementation notes:
     *  - format_type includes the column size constraint, e.g. varchar(50)
     *  - ::regclass is a function that gives the id for a table name
     *
     * @param string $tableName  Table name.
     * @param string $name       Optional label passed through to selectAll().
     *
     * @return array  Rows of (name, formatted type, default source,
     *                not-null flag).
     */
    public function columnDefinitions($tableName, $name = null)
    {
        // NOTE(review): pg_attrdef.adsrc was removed in PostgreSQL 12;
        // pg_get_expr(d.adbin, d.adrelid) is the replacement -- confirm
        // which server versions must be supported before switching.
        return $this->selectAll('
            SELECT a.attname, format_type(a.atttypid, a.atttypmod), d.adsrc, a.attnotnull
              FROM pg_attribute a LEFT JOIN pg_attrdef d
                ON a.attrelid = d.adrelid AND a.attnum = d.adnum
             WHERE a.attrelid = '.$this->quote($tableName).'::regclass
               AND a.attnum > 0 AND NOT a.attisdropped
             ORDER BY a.attnum', $name);
    }

    /**
     * Returns the version of the connected PostgreSQL server as an integer.
     *
     * Three-part versions "X.Y.Z" yield X*10000 + Y*100 + Z (8.2.5 =>
     * 80205). Two-part versions of PostgreSQL 10 and later ("X.Y") yield
     * X*10000 + Y (12.3 => 120003). Returns 0 if the version cannot be
     * determined.
     *
     * @return integer
     */
    public function postgresqlVersion()
    {
        try {
            $version = $this->selectValue('SELECT version()');
            if (preg_match('/PostgreSQL (\d+)\.(\d+)(?:\.(\d+))?/', $version, $matches)) {
                $major = (int)$matches[1];
                $minor = (int)$matches[2];
                $patch = isset($matches[3]) ? (int)$matches[3] : 0;

                if ($major >= 10 && !isset($matches[3])) {
                    // Since PostgreSQL 10 the second number is the minor
                    // release; there is no third component.
                    return $major * 10000 + $minor;
                }

                return $major * 10000 + $minor * 100 + $patch;
            }
        } catch (Exception $e) {
            // Best effort: an undeterminable version is reported as 0.
        }

        return 0;
    }

    /**
     * Returns the schema search path as a comma-separated list of quoted
     * string literals, ready for use inside an SQL IN (...) list.
     *
     * Whitespace around each entry is trimmed, because the server may
     * report the path with a space after each comma.
     *
     * @return string
     */
    protected function _quotedSchemaSearchPathList()
    {
        $schemas = array();
        foreach (explode(',', $this->getSchemaSearchPath()) as $p) {
            $schemas[] = $this->quote(trim($p));
        }

        return implode(',', $schemas);
    }
}