From 7c553901d7ab010ad1cb5bf44f9ca2fdae83909a Mon Sep 17 00:00:00 2001
From: Dave Cramer <davec@fastcrypt.com>
Date: Tue, 1 Oct 2002 00:39:02 +0000
Subject: [PATCH] schema awareness patch provided by Kris Jurka

---
 .../jdbc1/AbstractJdbc1Connection.java        |   43 +-
 .../jdbc1/AbstractJdbc1DatabaseMetaData.java  | 1651 +++++++++++------
 .../jdbc1/AbstractJdbc1Statement.java         |    6 +-
 .../largeobject/LargeObjectManager.java       |   27 +-
 .../jdbc/org/postgresql/test/TestUtil.java    |   39 +-
 .../test/jdbc2/DatabaseMetaDataTest.java      |  396 ++--
 .../postgresql/test/jdbc2/Jdbc2TestSuite.java |   11 +-
 .../test/jdbc2/ServerPreparedStmtTest.java    |    6 +-
 .../jdbc/org/postgresql/util/Serialize.java   |   70 +-
 9 files changed, 1353 insertions(+), 896 deletions(-)

diff --git a/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java b/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java
index bf4ee76bd74..463cfb2cca2 100644
--- a/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java
+++ b/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java
@@ -14,7 +14,7 @@ import org.postgresql.largeobject.LargeObjectManager;
 import org.postgresql.util.*;
 
 
-/* $Header: /cvsroot/pgsql/src/interfaces/jdbc/org/postgresql/jdbc1/Attic/AbstractJdbc1Connection.java,v 1.9 2002/09/11 05:38:44 barry Exp $
+/* $Header: /cvsroot/pgsql/src/interfaces/jdbc/org/postgresql/jdbc1/Attic/AbstractJdbc1Connection.java,v 1.10 2002/10/01 00:39:01 davec Exp $
  * This class defines methods of the jdbc1 specification.  This class is
  * extended by org.postgresql.jdbc2.AbstractJdbc2Connection which adds the jdbc2
  * methods.  The real Connection class (for jdbc1) is org.postgresql.jdbc1.Jdbc1Connection
@@ -1147,6 +1147,11 @@ public abstract class AbstractJdbc1Connection implements org.postgresql.PGConnec
 		return dbVersionNumber;
 	}
 
+	/**
+	 * Is the server we are connected to running at least this version?
+	 * This comparison method will fail whenever a major or minor version
+	 * goes to two digits (10.3.0) or (7.10.1).
+	 */
 	public boolean haveMinimumServerVersion(String ver) throws SQLException
 	{
 		return (getDBVersionNumber().compareTo(ver) >= 0);
@@ -1184,16 +1189,29 @@ public abstract class AbstractJdbc1Connection implements org.postgresql.PGConnec
 		// it's not in the cache, so perform a query, and add the result to the cache
 		if (sqlType == null)
 		{
-			ResultSet result = ExecSQL("select typname from pg_type where oid = " + oid);
-			if (((AbstractJdbc1ResultSet)result).getColumnCount() != 1 || ((AbstractJdbc1ResultSet)result).getTupleCount() != 1)
-				throw new PSQLException("postgresql.unexpected");
-			result.next();
-			String pgType = result.getString(1);
+			String pgType;
+			// The opaque type does not exist in the system catalogs.
+			if (oid == 0) {
+				pgType = "opaque";
+			} else {
+				String sql;
+				if (haveMinimumServerVersion("7.3")) {
+					sql = "SELECT typname FROM pg_catalog.pg_type WHERE oid = " +oid;
+				} else {
+					sql = "SELECT typname FROM pg_type WHERE oid = " +oid;
+				}
+				ResultSet result = ExecSQL(sql);
+				if (((AbstractJdbc1ResultSet)result).getColumnCount() != 1 || ((AbstractJdbc1ResultSet)result).getTupleCount() != 1) {
+					throw new PSQLException("postgresql.unexpected");
+				}
+				result.next();
+				pgType = result.getString(1);
+				result.close();
+			}
 			Integer iOid = new Integer(oid);
-			sqlType = new Integer(getSQLType(result.getString(1)));
+			sqlType = new Integer(getSQLType(pgType));
 			sqlTypeCache.put(iOid, sqlType);
 			pgTypeCache.put(iOid, pgType);
-			result.close();
 		}
 
 		return sqlType.intValue();
@@ -1217,8 +1235,13 @@ public abstract class AbstractJdbc1Connection implements org.postgresql.PGConnec
 			else
 			{
 				// it's not in the cache, so perform a query, and add the result to the cache
-				ResultSet result = ExecSQL("select oid from pg_type where typname='"
-										   + typeName + "'");
+				String sql;
+				if (haveMinimumServerVersion("7.3")) {
+					sql = "SELECT oid FROM pg_catalog.pg_type WHERE typname='" + typeName + "'";
+				} else {
+					sql = "SELECT oid FROM pg_type WHERE typname='" + typeName + "'";
+				}
+				ResultSet result = ExecSQL(sql);
 				if (((AbstractJdbc1ResultSet)result).getColumnCount() != 1 || ((AbstractJdbc1ResultSet)result).getTupleCount() != 1)
 					throw new PSQLException("postgresql.unexpected");
 				result.next();
diff --git a/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1DatabaseMetaData.java b/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1DatabaseMetaData.java
index cdf6e8073ec..8e5638fbcbb 100644
--- a/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1DatabaseMetaData.java
+++ b/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1DatabaseMetaData.java
@@ -27,29 +27,50 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	protected static final int iInt2Oid = 21; // OID for int2
 	protected static final int iInt4Oid = 23; // OID for int4
 	protected static final int VARHDRSZ = 4;	// length for int4
-	protected static int NAME_SIZE = 63;	// length for name datatype
 
-	public AbstractJdbc1DatabaseMetaData(AbstractJdbc1Connection conn)
-	{
-		this.connection = conn;
-		String sql;
-		try {
+	private int NAMEDATALEN = 0;	// length for name datatype
+	private int INDEX_MAX_KEYS = 0; // maximum number of keys in an index.
+
+	protected int getMaxIndexKeys() throws SQLException {
+		if (INDEX_MAX_KEYS == 0) {
+			String from;
+			if (connection.haveMinimumServerVersion("7.3")) {
+				from = "pg_catalog.pg_namespace n, pg_catalog.pg_type t1, pg_catalog.pg_type t2 WHERE t1.typnamespace=n.oid AND n.nspname='pg_catalog' AND ";
+			} else {
+				from = "pg_type t1, pg_type t2 WHERE ";
+			}
+			String sql = "SELECT t1.typlen/t2.typlen FROM "+from+" t1.typelem=t2.oid AND t1.typname='oidvector'";
+			ResultSet rs = connection.createStatement().executeQuery(sql);
+			if (!rs.next()) {
+				throw new PSQLException("postgresql.unexpected");
+			}
+			INDEX_MAX_KEYS = rs.getInt(1);
+			rs.close();
+		}
+		return INDEX_MAX_KEYS;
+	}
+
+	protected int getMaxNameLength() throws SQLException {
+		if (NAMEDATALEN == 0) {
+			String sql;
 			if (connection.haveMinimumServerVersion("7.3")) {
 				sql = "SELECT t.typlen FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n WHERE t.typnamespace=n.oid AND t.typname='name' AND n.nspname='pg_catalog'";
-				NAME_SIZE = 63;
 			} else {
 				sql = "SELECT typlen FROM pg_type WHERE typname='name'";
-				NAME_SIZE = 31;
 			}
 			ResultSet rs = connection.createStatement().executeQuery(sql);
-			if (rs.next()) {
-				NAME_SIZE = rs.getInt("typlen") - 1;
+			if (!rs.next()) {
+				throw new PSQLException("postgresql.unexpected");
 			}
+			NAMEDATALEN = rs.getInt("typlen");
 			rs.close();
-		} catch (SQLException l_se) {
-			// depending on the error the NAME_SIZE value will
-			// be the original or the value set before the query.
 		}
+		return NAMEDATALEN - 1;
+	}
+
+	public AbstractJdbc1DatabaseMetaData(AbstractJdbc1Connection conn)
+	{
+		this.connection = conn;
 	}
 
 	/*
@@ -512,16 +533,16 @@ public abstract class AbstractJdbc1DatabaseMetaData
 
 	/*
 	 * Is "ALTER TABLE" with a drop column supported?
-	 * Peter 10/10/2000 This was set to true, but 7.1devel doesn't support it!
 	 *
 	 * @return true if so
 	 * @exception SQLException if a database access error occurs
 	 */
 	public boolean supportsAlterTableWithDropColumn() throws SQLException
 	{
+		boolean dropColumn = connection.haveMinimumServerVersion("7.3");
 		if (Driver.logDebug)
-			Driver.debug("supportsAlterTableWithDropColumn " + false);
-		return false;
+			Driver.debug("supportsAlterTableWithDropColumn " + dropColumn);
+		return dropColumn;
 	}
 
 	/*
@@ -967,33 +988,35 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	}
 
 	/*
-	 * Can a schema name be used in a data manipulation statement?	Nope.
+	 * Can a schema name be used in a data manipulation statement?
 	 *
 	 * @return true if so
 	 * @exception SQLException if a database access error occurs
 	 */
 	public boolean supportsSchemasInDataManipulation() throws SQLException
 	{
+		boolean schemas = connection.haveMinimumServerVersion("7.3");
 		if (Driver.logDebug)
-			Driver.debug("supportsSchemasInDataManipulation false");
-		return false;
+			Driver.debug("supportsSchemasInDataManipulation "+schemas);
+		return schemas;
 	}
 
 	/*
-	 * Can a schema name be used in a procedure call statement?  Nope.
+	 * Can a schema name be used in a procedure call statement?
 	 *
 	 * @return true if so
 	 * @exception SQLException if a database access error occurs
 	 */
 	public boolean supportsSchemasInProcedureCalls() throws SQLException
 	{
+		boolean schemas = connection.haveMinimumServerVersion("7.3");
 		if (Driver.logDebug)
-			Driver.debug("supportsSchemasInProcedureCalls false");
-		return false;
+			Driver.debug("supportsSchemasInProcedureCalls "+schemas);
+		return schemas;
 	}
 
 	/*
-	 * Can a schema be used in a table definition statement?  Nope.
+	 * Can a schema be used in a table definition statement?
 	 *
 	 * @return true if so
 	 * @exception SQLException if a database access error occurs
@@ -1015,9 +1038,10 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public boolean supportsSchemasInIndexDefinitions() throws SQLException
 	{
+		boolean schemas = connection.haveMinimumServerVersion("7.3");
 		if (Driver.logDebug)
-			Driver.debug("supportsSchemasInIndexDefinitions false");
-		return false;
+			Driver.debug("supportsSchemasInIndexDefinitions "+schemas);
+		return schemas;
 	}
 
 	/*
@@ -1028,9 +1052,10 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException
 	{
+		boolean schemas = connection.haveMinimumServerVersion("7.3");
 		if (Driver.logDebug)
-			Driver.debug("supportsSchemasInPrivilegeDefinitions false");
-		return false;
+			Driver.debug("supportsSchemasInPrivilegeDefinitions "+schemas);
+		return schemas;
 	}
 
 	/*
@@ -1316,7 +1341,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public int getMaxColumnNameLength() throws SQLException
 	{
-		return NAME_SIZE;
+		return getMaxNameLength();
 	}
 
 	/*
@@ -1332,15 +1357,13 @@ public abstract class AbstractJdbc1DatabaseMetaData
 
 	/*
 	 * What's the maximum number of columns allowed in an index?
-	 * 6.0 only allowed one column, but 6.1 introduced multi-column
-	 * indices, so, theoretically, its all of them.
 	 *
 	 * @return max number of columns
 	 * @exception SQLException if a database access error occurs
 	 */
 	public int getMaxColumnsInIndex() throws SQLException
 	{
-		return getMaxColumnsInTable();
+		return getMaxIndexKeys();
 	}
 
 	/*
@@ -1407,7 +1430,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public int getMaxCursorNameLength() throws SQLException
 	{
-		return NAME_SIZE;
+		return getMaxNameLength();
 	}
 
 	/*
@@ -1426,8 +1449,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 
 	public int getMaxSchemaNameLength() throws SQLException
 	{
-		// XXX-Not Implemented
-		return 0;
+		return getMaxNameLength();
 	}
 
 	/*
@@ -1438,13 +1460,12 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public int getMaxProcedureNameLength() throws SQLException
 	{
-		return NAME_SIZE;
+		return getMaxNameLength();
 	}
 
 	public int getMaxCatalogNameLength() throws SQLException
 	{
-		// XXX-Not Implemented
-		return 0;
+		return getMaxNameLength();
 	}
 
 	/*
@@ -1511,7 +1532,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public int getMaxTableNameLength() throws SQLException
 	{
-		return NAME_SIZE;
+		return getMaxNameLength();
 	}
 
 	/*
@@ -1534,7 +1555,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public int getMaxUserNameLength() throws SQLException
 	{
-		return NAME_SIZE;
+		return getMaxNameLength();
 	}
 
 
@@ -1632,14 +1653,33 @@ public abstract class AbstractJdbc1DatabaseMetaData
 
 	/*
 	 * Is a data definition statement within a transaction ignored?
-	 * It seems to be (from experiment in previous method)
 	 *
 	 * @return true if so
 	 * @exception SQLException if a database access error occurs
 	 */
 	public boolean dataDefinitionIgnoredInTransactions() throws SQLException
 	{
-		return true;
+		return false;
+	}
+
+	/**
+	 * Escape single quotes with another single quote.
+	 */
+	protected static String escapeQuotes(String s) {
+		StringBuffer sb = new StringBuffer();
+		int length = s.length();
+		char prevChar = ' ';
+		char prevPrevChar = ' ';
+		for (int i=0; i<length; i++) {
+			char c = s.charAt(i);
+			sb.append(c);
+			if (c == '\'' && (prevChar != '\\' || (prevChar == '\\' && prevPrevChar == '\\'))) {
+				sb.append("'");
+			}
+			prevPrevChar = prevChar;
+			prevChar = c;
+		}
+		return sb.toString();
 	}
 
 	/*
@@ -1676,42 +1716,39 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getProcedures(String catalog, String schemaPattern, String procedureNamePattern) throws SQLException
 	{
-		// the field descriptors for the new ResultSet
-		Field f[] = new Field[8];
-		java.sql.ResultSet r;	// ResultSet for the SQL query that we need to do
-		Vector v = new Vector();		// The new ResultSet tuple stuff
-
-		f[0] = new Field(connection, "PROCEDURE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "PROCEDURE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "PROCEDURE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = f[4] = f[5] = new Field(connection, "reserved", iVarcharOid, NAME_SIZE);	// null;	// reserved, must be null for now
-		f[6] = new Field(connection, "REMARKS", iVarcharOid, 8192);
-		f[7] = new Field(connection, "PROCEDURE_TYPE", iInt2Oid, 2);
-
-		// If the pattern is null, then set it to the default
-		if (procedureNamePattern == null)
-			procedureNamePattern = "%";
-
-		r = connection.ExecSQL("select proname, proretset from pg_proc where proname like '" + procedureNamePattern.toLowerCase() + "' order by proname");
-
-		while (r.next())
-		{
-			byte[][] tuple = new byte[8][0];
-
-			tuple[0] = null;			// Catalog name
-			tuple[1] = null;			// Schema name
-			tuple[2] = r.getBytes(1);		// Procedure name
-			tuple[3] = tuple[4] = tuple[5] = null;	// Reserved
-			tuple[6] = null;			// Remarks
-
-			if (r.getBoolean(2))
-				tuple[7] = Integer.toString(java.sql.DatabaseMetaData.procedureReturnsResult).getBytes();
-			else
-				tuple[7] = Integer.toString(java.sql.DatabaseMetaData.procedureNoResult).getBytes();
-
-			v.addElement(tuple);
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT NULL AS PROCEDURE_CAT, n.nspname AS PROCEDURE_SCHEM, p.proname AS PROCEDURE_NAME, NULL, NULL, NULL, d.description AS REMARKS, "+java.sql.DatabaseMetaData.procedureReturnsResult+" AS PROCEDURE_TYPE "+
+				" FROM pg_catalog.pg_namespace n, pg_catalog.pg_proc p "+
+				" LEFT JOIN pg_catalog.pg_description d ON (p.oid=d.objoid) "+
+				" LEFT JOIN pg_catalog.pg_class c ON (d.classoid=c.oid AND c.relname='pg_proc') "+
+				" LEFT JOIN pg_catalog.pg_namespace pn ON (c.relnamespace=pn.oid AND pn.nspname='pg_catalog') "+
+				" WHERE p.pronamespace=n.oid ";
+				if (schemaPattern != null && !"".equals(schemaPattern)) {
+					sql += " AND n.nspname LIKE '"+escapeQuotes(schemaPattern.toLowerCase())+"' ";
+				}
+				if (procedureNamePattern != null) {
+					sql += " AND p.proname LIKE '"+escapeQuotes(procedureNamePattern.toLowerCase())+"' ";
+				}
+				sql += " ORDER BY PROCEDURE_SCHEM, PROCEDURE_NAME ";
+		} else if (connection.haveMinimumServerVersion("7.1")) {
+			sql = "SELECT NULL AS PROCEDURE_CAT, NULL AS PROCEDURE_SCHEM, p.proname AS PROCEDURE_NAME, NULL, NULL, NULL, d.description AS REMARKS, "+java.sql.DatabaseMetaData.procedureReturnsResult+" AS PROCEDURE_TYPE "+
+				" FROM pg_proc p "+
+				" LEFT JOIN pg_description d ON (p.oid=d.objoid) "+
+				" LEFT JOIN pg_class c ON (d.classoid=c.oid AND c.relname='pg_proc') ";
+				if (procedureNamePattern != null) {
+					sql += " WHERE p.proname LIKE '"+escapeQuotes(procedureNamePattern.toLowerCase())+"' ";
+				}
+				sql += " ORDER BY PROCEDURE_NAME ";
+		} else {
+			sql = "SELECT NULL AS PROCEDURE_CAT, NULL AS PROCEDURE_SCHEM, p.proname AS PROCEDURE_NAME, NULL, NULL, NULL, NULL AS REMARKS, "+java.sql.DatabaseMetaData.procedureReturnsResult+" AS PROCEDURE_TYPE "+
+				" FROM pg_proc p ";
+				if (procedureNamePattern != null) {
+					sql += " WHERE p.proname LIKE '"+escapeQuotes(procedureNamePattern.toLowerCase())+"' ";
+				}
+				sql += " ORDER BY PROCEDURE_NAME ";
 		}
-		return connection.getResultSet(null, f, v, "OK", 1);
+		return connection.createStatement().executeQuery(sql);
 	}
 
 	/*
@@ -1740,7 +1777,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 * <li>procedureColumnResult - result column in ResultSet
 	 * </ul>
 	 * <li><b>DATA_TYPE</b> short => SQL type from java.sql.Types
-	 * <li><b>TYPE_NAME</b> String => SQL type name
+	 * <li><b>TYPE_NAME</b> String => Data source specific type name
 	 * <li><b>PRECISION</b> int => precision
 	 * <li><b>LENGTH</b> int => length in bytes of data
 	 * <li><b>SCALE</b> short => scale
@@ -1752,9 +1789,9 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 * <li><b>REMARKS</b> String => comment describing parameter/column
 	 * </ol>
 	 * @param catalog This is ignored in org.postgresql, advise this is set to null
-	 * @param schemaPattern This is ignored in org.postgresql, advise this is set to null
+	 * @param schemaPattern
 	 * @param procedureNamePattern a procedure name pattern
-	 * @param columnNamePattern a column name pattern
+	 * @param columnNamePattern a column name pattern, this is currently ignored because postgresql does not name procedure parameters.
 	 * @return each row is a stored procedure parameter or column description
 	 * @exception SQLException if a database-access error occurs
 	 * @see #getSearchStringEscape
@@ -1762,32 +1799,123 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	// Implementation note: This is required for Borland's JBuilder to work
 	public java.sql.ResultSet getProcedureColumns(String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) throws SQLException
 	{
-		if (procedureNamePattern == null)
-			procedureNamePattern = "%";
-
-		if (columnNamePattern == null)
-			columnNamePattern = "%";
-
-		// for now, this returns an empty result set.
 		Field f[] = new Field[13];
-		ResultSet r;	// ResultSet for the SQL query that we need to do
 		Vector v = new Vector();		// The new ResultSet tuple stuff
 
-		f[0] = new Field(connection, "PROCEDURE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "PROCEDURE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "PROCEDURE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, NAME_SIZE);
+		f[0] = new Field(connection, "PROCEDURE_CAT", iVarcharOid, getMaxNameLength());
+		f[1] = new Field(connection, "PROCEDURE_SCHEM", iVarcharOid, getMaxNameLength());
+		f[2] = new Field(connection, "PROCEDURE_NAME", iVarcharOid, getMaxNameLength());
+		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, getMaxNameLength());
 		f[4] = new Field(connection, "COLUMN_TYPE", iInt2Oid, 2);
 		f[5] = new Field(connection, "DATA_TYPE", iInt2Oid, 2);
-		f[6] = new Field(connection, "TYPE_NAME", iVarcharOid, NAME_SIZE);
+		f[6] = new Field(connection, "TYPE_NAME", iVarcharOid, getMaxNameLength());
 		f[7] = new Field(connection, "PRECISION", iInt4Oid, 4);
 		f[8] = new Field(connection, "LENGTH", iInt4Oid, 4);
 		f[9] = new Field(connection, "SCALE", iInt2Oid, 2);
 		f[10] = new Field(connection, "RADIX", iInt2Oid, 2);
 		f[11] = new Field(connection, "NULLABLE", iInt2Oid, 2);
-		f[12] = new Field(connection, "REMARKS", iVarcharOid, NAME_SIZE);
+		f[12] = new Field(connection, "REMARKS", iVarcharOid, getMaxNameLength());
+
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT n.nspname,p.proname,p.prorettype,p.proargtypes, t.typtype,t.typrelid "+
+				" FROM pg_catalog.pg_proc p,pg_catalog.pg_namespace n, pg_catalog.pg_type t "+
+				" WHERE p.pronamespace=n.oid AND p.prorettype=t.oid ";
+			if (schemaPattern != null && !"".equals(schemaPattern)) {
+				sql += " AND n.nspname LIKE '"+escapeQuotes(schemaPattern.toLowerCase())+"' ";
+			}
+			if (procedureNamePattern != null) {
+				sql += " AND p.proname LIKE '"+escapeQuotes(procedureNamePattern.toLowerCase())+"' ";
+			}
+			sql += " ORDER BY n.nspname, p.proname ";
+		} else {
+			sql = "SELECT NULL AS nspname,p.proname,p.prorettype,p.proargtypes, t.typtype,t.typrelid "+
+				" FROM pg_proc p,pg_type t "+
+				" WHERE p.prorettype=t.oid ";
+			if (procedureNamePattern != null) {
+				sql += " AND p.proname LIKE '"+escapeQuotes(procedureNamePattern.toLowerCase())+"' ";
+			}
+			sql += " ORDER BY p.proname ";
+		}
+
+		ResultSet rs = connection.createStatement().executeQuery(sql);
+		while (rs.next()) {
+			byte schema[] = rs.getBytes("nspname");
+			byte procedureName[] = rs.getBytes("proname");
+			int returnType = rs.getInt("prorettype");
+			String returnTypeType = rs.getString("typtype");
+			int returnTypeRelid = rs.getInt("typrelid");
+			String strArgTypes = rs.getString("proargtypes");
+			StringTokenizer st = new StringTokenizer(strArgTypes);
+			Vector argTypes = new Vector();
+			while (st.hasMoreTokens()) {
+				argTypes.addElement(new Integer(st.nextToken()));
+			}
+
+			// decide if we are returning a single column result.
+			if (!returnTypeType.equals("c")) {
+				byte[][] tuple = new byte[13][0];
+				tuple[0] = null;
+				tuple[1] = schema;
+				tuple[2] = procedureName;
+				tuple[3] = "returnValue".getBytes();
+				tuple[4] = Integer.toString(java.sql.DatabaseMetaData.procedureColumnReturn).getBytes();
+				tuple[5] = Integer.toString(connection.getSQLType(returnType)).getBytes();
+				tuple[6] = connection.getPGType(returnType).getBytes();
+				tuple[7] = null;
+				tuple[8] = null;
+				tuple[9] = null;
+				tuple[10] = null;
+				tuple[11] = Integer.toString(java.sql.DatabaseMetaData.procedureNullableUnknown).getBytes();
+				tuple[12] = null;
+				v.addElement(tuple);
+			}
+
+			// Add a row for each argument.
+			for (int i=0; i<argTypes.size(); i++) {
+				int argOid = ((Integer)argTypes.elementAt(i)).intValue();
+				byte[][] tuple = new byte[13][0];
+				tuple[0] = null;
+				tuple[1] = schema;
+				tuple[2] = procedureName;
+				tuple[3] = ("$"+(i+1)).getBytes();
+				tuple[4] = Integer.toString(java.sql.DatabaseMetaData.procedureColumnIn).getBytes();
+				tuple[5] = Integer.toString(connection.getSQLType(argOid)).getBytes();
+				tuple[6] = connection.getPGType(argOid).getBytes();
+				tuple[7] = null;
+				tuple[8] = null;
+				tuple[9] = null;
+				tuple[10] = null;
+				tuple[11] = Integer.toString(java.sql.DatabaseMetaData.procedureNullableUnknown).getBytes();
+				tuple[12] = null;
+				v.addElement(tuple);
+			}
 
-		// add query loop here
+			// if we are returning a multi-column result.
+			if (returnTypeType.equals("c")) {
+				String columnsql = "SELECT a.attname,a.atttypid FROM pg_catalog.pg_attribute a WHERE a.attrelid = "+returnTypeRelid+" ORDER BY a.attnum ";
+				ResultSet columnrs = connection.createStatement().executeQuery(columnsql);
+				while (columnrs.next()) {
+					int columnTypeOid = columnrs.getInt("atttypid");
+					byte[][] tuple = new byte[13][0];
+					tuple[0] = null;
+					tuple[1] = schema;
+					tuple[2] = procedureName;
+					tuple[3] = columnrs.getString("attname").getBytes();
+					tuple[4] = Integer.toString(java.sql.DatabaseMetaData.procedureColumnResult).getBytes();
+					tuple[5] = Integer.toString(connection.getSQLType(columnTypeOid)).getBytes();
+					tuple[6] = connection.getPGType(columnTypeOid).getBytes();
+					tuple[7] = null;
+					tuple[8] = null;
+					tuple[9] = null;
+					tuple[10] = null;
+					tuple[11] = Integer.toString(java.sql.DatabaseMetaData.procedureNullableUnknown).getBytes();
+					tuple[12] = null;
+					v.addElement(tuple);
+				}
+			}
+		}
+		rs.close();
 
 		return connection.getResultSet(null, f, v, "OK", 1);
 	}
@@ -1812,12 +1940,14 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 * </ol>
 	 *
 	 * <p>The valid values for the types parameter are:
-	 * "TABLE", "INDEX", "SEQUENCE", "SYSTEM TABLE" and "SYSTEM INDEX"
+	 * "TABLE", "INDEX", "SEQUENCE", "VIEW",
+	 * "SYSTEM TABLE", "SYSTEM INDEX", "SYSTEM VIEW",
+	 * "SYSTEM TOAST TABLE", "SYSTEM TOAST INDEX",
+	 * "TEMPORARY TABLE", and "TEMPORARY VIEW"
 	 *
 	 * @param catalog a catalog name; For org.postgresql, this is ignored, and
 	 * should be set to null
-	 * @param schemaPattern a schema name pattern; For org.postgresql, this is ignored, and
-	 * should be set to null
+	 * @param schemaPattern a schema name pattern
 	 * @param tableNamePattern a table name pattern. For all tables this should be "%"
 	 * @param types a list of table types to include; null returns
 	 * all types
@@ -1826,127 +1956,173 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String types[]) throws SQLException
 	{
-		// Handle default value for types
-		if (types == null)
-			types = defaultTableTypes;
-
-		if (tableNamePattern == null)
-			tableNamePattern = "%";
-
-		// the field descriptors for the new ResultSet
-		Field f[] = new Field[5];
-		java.sql.ResultSet r;	// ResultSet for the SQL query that we need to do
-		Vector v = new Vector();		// The new ResultSet tuple stuff
-
-		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "TABLE_TYPE", iVarcharOid, NAME_SIZE);
-		f[4] = new Field(connection, "REMARKS", iVarcharOid, NAME_SIZE);
-
-		// Now form the query
-		StringBuffer sql = new StringBuffer("select relname,oid,relkind from pg_class where (");
-
-		boolean notFirst = false;
-		for (int i = 0;i < types.length;i++)
-		{
-			for (int j = 0;j < getTableTypes.length;j++)
-				if (getTableTypes[j][0].equals(types[i]))
-				{
-					if (notFirst)
-						sql.append(" or ");
-					sql.append(getTableTypes[j][1]);
-					notFirst = true;
-				}
-		}
-
-		// Added by Stefan Andreasen <stefan@linux.kapow.dk>
-		// Now take the pattern into account
-		sql.append(") and relname like '");
-		sql.append(tableNamePattern.toLowerCase());
-		sql.append("' order by relkind, relname");
-
-		// Now run the query
-		r = connection.ExecSQL(sql.toString());
-
-		while (r.next())
-		{
-			byte[][] tuple = new byte[5][0];
-
-			// Fetch the description for the table (if any)
-			String getDescriptionStatement =
-				connection.haveMinimumServerVersion("7.2") ?
-				"select obj_description(" + r.getInt(2) + ",'pg_class')" :
-				"select description from pg_description where objoid=" + r.getInt(2);
-
-			java.sql.ResultSet dr = connection.ExecSQL(getDescriptionStatement);
-
-			byte remarks[] = null;
-
-			if (((AbstractJdbc1ResultSet)dr).getTupleCount() == 1)
-			{
-				dr.next();
-				remarks = dr.getBytes(1);
+		String select;
+		String orderby;
+		String useSchemas;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			useSchemas = "SCHEMAS";
+			select = "SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, "+
+			" CASE n.nspname LIKE 'pg\\\\_%' "+
+			" WHEN true THEN CASE n.nspname "+
+			"	WHEN 'pg_catalog' THEN CASE c.relkind "+
+			"		WHEN 'r' THEN 'SYSTEM TABLE' "+
+			"		WHEN 'v' THEN 'SYSTEM VIEW' "+
+			"		WHEN 'i' THEN 'SYSTEM INDEX' "+
+			"		ELSE NULL "+
+			"		END "+
+			"	WHEN 'pg_toast' THEN CASE c.relkind "+
+			"		WHEN 'r' THEN 'SYSTEM TOAST TABLE' "+
+			"		WHEN 'i' THEN 'SYSTEM TOAST INDEX' "+
+			"		ELSE NULL "+
+			"		END "+
+			"	ELSE CASE c.relkind "+
+			"		WHEN 'r' THEN 'TEMPORARY TABLE' "+
+			"		WHEN 'i' THEN 'TEMPORARY INDEX' "+
+			"		ELSE NULL "+
+			"		END "+
+			"	END "+
+			" WHEN false THEN CASE c.relkind "+
+			"	WHEN 'r' THEN 'TABLE' "+
+			"	WHEN 'i' THEN 'INDEX' "+
+			"	WHEN 'S' THEN 'SEQUENCE' "+
+			"	WHEN 'v' THEN 'VIEW' "+
+			"	ELSE NULL "+
+			"	END "+
+			" ELSE NULL "+
+			" END "+
+			" AS TABLE_TYPE, d.description AS REMARKS "+
+			" FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c "+
+			" LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid) "+
+			" LEFT JOIN pg_catalog.pg_class dc ON (d.classoid=dc.oid AND dc.relname='pg_class') "+
+			" LEFT JOIN pg_catalog.pg_namespace dn ON (dn.oid=dc.relnamespace AND dn.nspname='pg_catalog') "+
+			" WHERE c.relnamespace = n.oid ";
+			if (schemaPattern != null && !"".equals(schemaPattern)) {
+				select += " AND n.nspname LIKE '"+escapeQuotes(schemaPattern.toLowerCase())+"' ";
 			}
-			dr.close();
-
-			String relKind;
-			switch (r.getBytes(3)[0])
-			{
-				case (byte) 'r':
-					if ( r.getString(1).startsWith("pg_") )
-					{
-						relKind = "SYSTEM TABLE";
-					}
-					else
-					{
-						relKind = "TABLE";
-					}
-					break;
-				case (byte) 'i':
-					relKind = "INDEX";
-					break;
-				case (byte) 'S':
-					relKind = "SEQUENCE";
-					break;
-				case (byte) 'v':
-					relKind = "VIEW";
-					break;
-				default:
-					relKind = null;
+			if (tableNamePattern != null) {
+				select += " AND c.relname LIKE '"+escapeQuotes(tableNamePattern.toLowerCase())+"' ";
 			}
+			orderby = " ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME ";
+		} else {
+			useSchemas = "NOSCHEMAS";
+			String tableType =  ""+
+			" CASE c.relname LIKE 'pg\\\\_%' "+
+			" WHEN true THEN CASE c.relname LIKE 'pg\\\\_toast\\\\_%' "+
+			"	WHEN true THEN CASE c.relkind "+
+			"		WHEN 'r' THEN 'SYSTEM TOAST TABLE' "+
+			"		WHEN 'i' THEN 'SYSTEM TOAST INDEX' "+
+			"		ELSE NULL "+
+			"		END "+
+			"	WHEN false THEN CASE c.relname LIKE 'pg\\\\_temp\\\\_%' "+
+			"		WHEN true THEN CASE c.relkind "+
+			"			WHEN 'r' THEN 'TEMPORARY TABLE' "+
+			"			WHEN 'i' THEN 'TEMPORARY INDEX' "+
+			"			ELSE NULL "+
+			"			END "+
+			"		WHEN false THEN CASE c.relkind "+
+			"			WHEN 'r' THEN 'SYSTEM TABLE' "+
+			"			WHEN 'v' THEN 'SYSTEM VIEW' "+
+			"			WHEN 'i' THEN 'SYSTEM INDEX' "+
+			"			ELSE NULL "+
+			"			END "+
+			"		ELSE NULL "+
+			"		END "+
+			"	ELSE NULL "+
+			"	END "+
+			" WHEN false THEN CASE c.relkind "+
+			"	WHEN 'r' THEN 'TABLE' "+
+			"	WHEN 'i' THEN 'INDEX' "+
+			"	WHEN 'S' THEN 'SEQUENCE' "+
+			"	WHEN 'v' THEN 'VIEW' "+
+			"	ELSE NULL "+
+			"	END "+
+			" ELSE NULL "+
+			" END ";
+			orderby = " ORDER BY TABLE_TYPE,TABLE_NAME ";
+			if (connection.haveMinimumServerVersion("7.1")) {
+				select = "SELECT NULL AS TABLE_CAT, NULL AS TABLE_SCHEM, c.relname AS TABLE_NAME, "+tableType+" AS TABLE_TYPE, d.description AS REMARKS "+
+					" FROM pg_class c "+
+					" LEFT JOIN pg_description d ON (c.oid=d.objoid) "+
+					" LEFT JOIN pg_class dc ON (d.classoid = dc.oid AND dc.relname='pg_class') "+
+					" WHERE true ";
+			} else {
+				select = "SELECT NULL AS TABLE_CAT, NULL AS TABLE_SCHEM, c.relname AS TABLE_NAME, "+tableType+" AS TABLE_TYPE, NULL AS REMARKS "+
+					" FROM pg_class c "+
+					" WHERE true ";
+			}
+		}
 
-			tuple[0] = null;		// Catalog name
-			tuple[1] = null;		// Schema name
-			tuple[2] = r.getBytes(1);	// Table name
-			tuple[3] = (relKind == null) ? null : relKind.getBytes();	// Table type
-			tuple[4] = remarks;		// Remarks
-			v.addElement(tuple);
+		if (types == null) {
+			types = defaultTableTypes;
 		}
-		r.close();
-		return connection.getResultSet(null, f, v, "OK", 1);
+		String sql = select;
+		sql += " AND (false ";
+		for (int i=0; i<types.length; i++) {
+			Hashtable clauses = (Hashtable)tableTypeClauses.get(types[i]);
+			if (clauses != null) {
+				String clause = (String)clauses.get(useSchemas);
+				sql += " OR ( "+clause+" ) ";
+			}
+		}
+		sql += ") ";
+		sql += orderby;
+
+		return connection.createStatement().executeQuery(sql);
+	}
+
+	private static final Hashtable tableTypeClauses;
+	static {
+		tableTypeClauses = new Hashtable();
+		Hashtable ht = new Hashtable();
+		tableTypeClauses.put("TABLE",ht);
+		ht.put("SCHEMAS","c.relkind = 'r' AND n.nspname NOT LIKE 'pg\\\\_%'");
+		ht.put("NOSCHEMAS","c.relkind = 'r' AND c.relname NOT LIKE 'pg\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("VIEW",ht);
+		ht.put("SCHEMAS","c.relkind = 'v' AND n.nspname <> 'pg_catalog'");
+		ht.put("NOSCHEMAS","c.relkind = 'v' AND c.relname NOT LIKE 'pg\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("INDEX",ht);
+		ht.put("SCHEMAS","c.relkind = 'i' AND n.nspname NOT LIKE 'pg\\\\_%'");
+		ht.put("NOSCHEMAS","c.relkind = 'i' AND c.relname NOT LIKE 'pg\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("SEQUENCE",ht);
+		ht.put("SCHEMAS","c.relkind = 'S'");
+		ht.put("NOSCHEMAS","c.relkind = 'S'");
+		ht = new Hashtable();
+		tableTypeClauses.put("SYSTEM TABLE",ht);
+		ht.put("SCHEMAS","c.relkind = 'r' AND n.nspname = 'pg_catalog'");
+		ht.put("NOSCHEMAS","c.relkind = r' AND c.relname LIKE 'pg\\\\_%' AND c.relname NOT LIKE 'pg\\\\_toast\\\\_%' AND c.relname NOT LIKE 'pg\\\\_temp\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("SYSTEM TOAST TABLE",ht);
+		ht.put("SCHEMAS","c.relkind = 'r' AND n.nspname = 'pg_toast'");
+		ht.put("NOSCHEMAS","c.relkind = r' AND c.relname LIKE 'pg\\\\_toast\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("SYSTEM TOAST INDEX",ht);
+		ht.put("SCHEMAS","c.relkind = 'i' AND n.nspname = 'pg_toast'");
+		ht.put("NOSCHEMAS","c.relkind = 'i' AND c.relname LIKE 'pg\\\\_toast\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("SYSTEM VIEW",ht);
+		ht.put("SCHEMAS","c.relkind = 'v' AND n.nspname = 'pg_catalog' ");
+		ht.put("NOSCHEMAS","c.relkind = 'v' AND c.relname LIKE 'pg\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("SYSTEM INDEX",ht);
+		ht.put("SCHEMAS","c.relkind = 'i' AND n.nspname = 'pg_catalog'");
+		ht.put("NOSCHEMAS","c.relkind = 'v' AND c.relname LIKE 'pg\\\\_%' AND c.relname NOT LIKE 'pg\\\\_toast\\\\_%' AND c.relname NOT LIKE 'pg\\\\_temp\\\\_%'");
+		ht = new Hashtable();
+		tableTypeClauses.put("TEMPORARY TABLE",ht);
+		ht.put("SCHEMAS","c.relkind = 'r' AND n.nspname LIKE 'pg\\\\_temp\\\\_%' ");
+		ht.put("NOSCHEMAS","c.relkind = 'r' AND c.relname LIKE 'pg\\\\_temp\\\\_%' ");
+		ht = new Hashtable();
+		tableTypeClauses.put("TEMPORARY INDEX",ht);
+		ht.put("SCHEMAS","c.relkind = 'i' AND n.nspname LIKE 'pg\\\\_temp\\\\_%' ");
+		ht.put("NOSCHEMAS","c.relkind = 'i' AND c.relname LIKE 'pg\\\\_temp\\\\_%' ");
 	}
 
-	// This array contains the valid values for the types argument
-	// in getTables().
-	//
-	// Each supported type consists of it's name, and the sql where
-	// clause to retrieve that value.
-	//
-	// IMPORTANT: the query must be enclosed in ( )
-	private static final String getTableTypes[][] = {
-				{"TABLE", "(relkind='r' and relhasrules='f' and relname !~ '^pg_' and relname !~ '^xinv')"},
-				{"VIEW", "(relkind='v' and relname !~ '^pg_')"},
-				{"INDEX", "(relkind='i' and relname !~ '^pg_')"},
-				{"SEQUENCE", "(relkind='S' and relname !~ '^pg_')"},
-				{"SYSTEM TABLE", "(relkind='r' and relname ~ '^pg_')"},
-				{"SYSTEM INDEX", "(relkind='i' and relname ~ '^pg_')"}
-			};
-
 	// These are the default tables, used when NULL is passed to getTables
 	// The choice of these provide the same behaviour as psql's \d
 	private static final String defaultTableTypes[] = {
-				"TABLE", "VIEW", "INDEX", "SEQUENCE"
+				"TABLE", "VIEW", "INDEX", "SEQUENCE", "TEMPORARY TABLE"
 			};
 
 	/*
@@ -1963,15 +2139,13 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getSchemas() throws SQLException
 	{
-		// We don't use schemas, so we simply return a single schema name "".
-		//
-		Field f[] = new Field[1];
-		Vector v = new Vector();
-		byte[][] tuple = new byte[1][0];
-		f[0] = new Field(connection, "TABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		tuple[0] = "".getBytes();
-		v.addElement(tuple);
-		return connection.getResultSet(null, f, v, "OK", 1);
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT nspname AS TABLE_SCHEM FROM pg_catalog.pg_namespace WHERE nspname <> 'pg_toast' AND nspname NOT LIKE 'pg\\\\_temp\\\\_%' ORDER BY TABLE_SCHEM";
+		} else {
+			sql = "SELECT ''::text AS TABLE_SCHEM ORDER BY TABLE_SCHEM";
+		}
+		return connection.createStatement().executeQuery(sql);
 	}
 
 	/*
@@ -1988,7 +2162,13 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getCatalogs() throws SQLException
 	{
-		return connection.createStatement().executeQuery("select datname as TABLE_CAT from pg_database;");
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT datname AS TABLE_CAT FROM pg_catalog.pg_database ORDER BY TABLE_CAT";
+		} else {
+			sql = "SELECT datname AS TABLE_CAT FROM pg_database ORDER BY TABLE_CAT";
+		}
+		return connection.createStatement().executeQuery(sql);
 	}
 
 	/*
@@ -2007,13 +2187,20 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getTableTypes() throws SQLException
 	{
+		String types[] = new String[tableTypeClauses.size()];
+		Enumeration e = tableTypeClauses.keys();
+		int i=0;
+		while (e.hasMoreElements()) {
+			types[i++] = (String)e.nextElement();}
+		sortStringArray(types);
+
 		Field f[] = new Field[1];
 		Vector v = new Vector();
-		f[0] = new Field(connection, new String("TABLE_TYPE"), iVarcharOid, NAME_SIZE);
-		for (int i = 0;i < getTableTypes.length;i++)
+		f[0] = new Field(connection, new String("TABLE_TYPE"), iVarcharOid, getMaxNameLength());
+		for (i=0; i < types.length; i++)
 		{
 			byte[][] tuple = new byte[1][0];
-			tuple[0] = getTableTypes[i][0].getBytes();
+			tuple[0] = types[i].getBytes();
 			v.addElement(tuple);
 		}
 		return connection.getResultSet(null, f, v, "OK", 1);
@@ -2072,145 +2259,111 @@ public abstract class AbstractJdbc1DatabaseMetaData
 		Vector v = new Vector();		// The new ResultSet tuple stuff
 		Field f[] = new Field[18];		// The field descriptors for the new ResultSet
 
-		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, NAME_SIZE);
+		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, getMaxNameLength());
+		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, getMaxNameLength());
+		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, getMaxNameLength());
+		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, getMaxNameLength());
 		f[4] = new Field(connection, "DATA_TYPE", iInt2Oid, 2);
-		f[5] = new Field(connection, "TYPE_NAME", iVarcharOid, NAME_SIZE);
+		f[5] = new Field(connection, "TYPE_NAME", iVarcharOid, getMaxNameLength());
 		f[6] = new Field(connection, "COLUMN_SIZE", iInt4Oid, 4);
-		f[7] = new Field(connection, "BUFFER_LENGTH", iVarcharOid, NAME_SIZE);
+		f[7] = new Field(connection, "BUFFER_LENGTH", iVarcharOid, getMaxNameLength());
 		f[8] = new Field(connection, "DECIMAL_DIGITS", iInt4Oid, 4);
 		f[9] = new Field(connection, "NUM_PREC_RADIX", iInt4Oid, 4);
 		f[10] = new Field(connection, "NULLABLE", iInt4Oid, 4);
-		f[11] = new Field(connection, "REMARKS", iVarcharOid, NAME_SIZE);
-		f[12] = new Field(connection, "COLUMN_DEF", iVarcharOid, NAME_SIZE);
+		f[11] = new Field(connection, "REMARKS", iVarcharOid, getMaxNameLength());
+		f[12] = new Field(connection, "COLUMN_DEF", iVarcharOid, getMaxNameLength());
 		f[13] = new Field(connection, "SQL_DATA_TYPE", iInt4Oid, 4);
 		f[14] = new Field(connection, "SQL_DATETIME_SUB", iInt4Oid, 4);
-		f[15] = new Field(connection, "CHAR_OCTET_LENGTH", iVarcharOid, NAME_SIZE);
+		f[15] = new Field(connection, "CHAR_OCTET_LENGTH", iVarcharOid, getMaxNameLength());
 		f[16] = new Field(connection, "ORDINAL_POSITION", iInt4Oid, 4);
-		f[17] = new Field(connection, "IS_NULLABLE", iVarcharOid, NAME_SIZE);
-
-		StringBuffer sql = new StringBuffer(512);
-
-		/* Build a >= 7.1 SQL statement to list all columns */
-		sql.append("select " +
-				   (connection.haveMinimumServerVersion("7.2") ? "a.attrelid, " : "a.oid, ") +
-				   "    c.relname, " +
-				   "    a.attname, " +
-				   "    a.atttypid, " +
-				   "    a.attnum, " +
-				   "    a.attnotnull, " +
-				   "    a.attlen, " +
-				   "    a.atttypmod, " +
-				   "    d.adsrc, " +
-				   "    t.typname, " +
-				   /* Use the new col_description in 7.2 or an additional outer join in 7.1 */
-				   (connection.haveMinimumServerVersion("7.2") ? "col_description(a.attrelid, a.attnum) " : "e.description ") +
-				   "from" +
-				   "    (" +
-				   "        (pg_class c inner join pg_attribute a on" +
-				   "            (" +
-				   "                a.attrelid=c.oid");
-
-		if ((tableNamePattern != null) && ! tableNamePattern.equals("%"))
-		{
-			sql.append("                and c.relname like \'" + tableNamePattern.toLowerCase() + "\'");
-		}
+		f[17] = new Field(connection, "IS_NULLABLE", iVarcharOid, getMaxNameLength());
 
-		if ((columnNamePattern != null) && ! columnNamePattern.equals("%"))
-		{
-			sql.append("                and a.attname like \'" + columnNamePattern.toLowerCase() + "\'");
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT n.nspname,c.relname,a.attname,a.atttypid,a.attnotnull,a.atttypmod,a.attlen,a.attnum,def.adsrc,dsc.description "+
+				" FROM pg_catalog.pg_namespace n "+
+				" JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid) "+
+				" JOIN pg_catalog.pg_attribute a ON (a.attrelid=c.oid) "+
+				" LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) "+
+				" LEFT JOIN pg_catalog.pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) "+
+				" LEFT JOIN pg_catalog.pg_class dc ON (dc.oid=dsc.classoid AND dc.relname='pg_class') "+
+				" LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace=dn.oid AND dn.nspname='pg_catalog') "+
+				" WHERE a.attnum > 0 AND NOT a.attisdropped ";
+			if (schemaPattern != null && !"".equals(schemaPattern)) {
+				sql += " AND n.nspname LIKE '"+escapeQuotes(schemaPattern.toLowerCase())+"' ";
+			}
+		} else if (connection.haveMinimumServerVersion("7.1")) {
+			sql = "SELECT NULL::text AS nspname,c.relname,a.attname,a.atttypid,a.attnotnull,a.atttypmod,a.attlen,a.attnum,def.adsrc,dsc.description "+
+				" FROM pg_class c "+
+				" JOIN pg_attribute a ON (a.attrelid=c.oid) "+
+				" LEFT JOIN pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) "+
+				" LEFT JOIN pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) "+
+				" LEFT JOIN pg_class dc ON (dc.oid=dsc.classoid AND dc.relname='pg_class') "+
+				" WHERE a.attnum > 0 ";
+		} else {
+			// if < 7.1 then don't get defaults or descriptions.
+			sql = "SELECT NULL::text AS nspname,c.relname,a.attname,a.atttypid,a.attnotnull,a.atttypmod,a.attlen,a.attnum,NULL AS adsrc,NULL AS description "+
+				" FROM pg_class c, pg_attribute a "+
+				" WHERE a.attrelid=c.oid AND a.attnum > 0 ";
 		}
 
-		sql.append(
-			"                and a.attnum > 0" +
-			"            )" +
-			"        ) inner join pg_type t on" +
-			"            (" +
-			"                t.oid = a.atttypid" +
-			"            )" +
-			"    )" +
-			"    left outer join pg_attrdef d on" +
-			"        (" +
-			"            c.oid = d.adrelid" +
-			"            and a.attnum = d.adnum" +
-			"        ) ");
-
-		if (!connection.haveMinimumServerVersion("7.2"))
-		{
-			/* Only for 7.1 */
-			sql.append(
-				"    left outer join pg_description e on" +
-				"        (" +
-				"            e.objoid = a.oid" +
-				"        ) ");
+		if (tableNamePattern != null && !"".equals(tableNamePattern)) {
+			sql += " AND c.relname LIKE '"+escapeQuotes(tableNamePattern.toLowerCase())+"' ";
 		}
+		if (columnNamePattern != null && !"".equals(columnNamePattern)) {
+			sql += " AND a.attname LIKE '"+escapeQuotes(columnNamePattern.toLowerCase())+"' ";
+		}
+		sql += " ORDER BY nspname,relname,attname ";
 
-		sql.append("order by" +
-				   "    c.relname, a.attnum");
-
-		java.sql.ResultSet r = connection.ExecSQL(sql.toString());
-		while (r.next())
+		ResultSet rs = connection.createStatement().executeQuery(sql);
+		while (rs.next())
 		{
 			byte[][] tuple = new byte[18][0];
+			int typeOid = rs.getInt("atttypid");
 
-			String nullFlag = r.getString(6);
-			String typname = r.getString(10);
-
-			tuple[0] = null;						// Catalog name, not supported
-			tuple[1] = null;						// Schema name, not supported
-			tuple[2] = r.getBytes(2);				// Table name
-			tuple[3] = r.getBytes(3);				// Column name
-			tuple[4] = Integer.toString(connection.getSQLType(typname)).getBytes(); // Data type
-			tuple[5] = typname.getBytes();			// Type name
-
-			// Column size
-			// Looking at the psql source,
-			// I think the length of a varchar as specified when the table was created
-			// should be extracted from atttypmod which contains this length + sizeof(int32)
-			if (typname.equals("bpchar") || typname.equals("varchar"))
+			tuple[0] = null;					// Catalog name, not supported
+			tuple[1] = rs.getBytes("nspname");	// Schema
+			tuple[2] = rs.getBytes("relname");	// Table name
+			tuple[3] = rs.getBytes("attname");	// Column name
+			tuple[4] = Integer.toString(connection.getSQLType(typeOid)).getBytes();
+			String pgType = connection.getPGType(typeOid);
+			tuple[5] = pgType.getBytes();		// Type name
+
+			if (pgType.equals("bpchar") || pgType.equals("varchar"))
 			{
-				int atttypmod = r.getInt(8);
+				int atttypmod = rs.getInt("atttypmod");
 				tuple[6] = Integer.toString(atttypmod != -1 ? atttypmod - VARHDRSZ : 0).getBytes();
 			}
-			else
+			else if (pgType.equals("numeric") || pgType.equals("decimal")) 
 			{
-				tuple[6] = r.getBytes(7);
+				int attypmod = rs.getInt(8) - VARHDRSZ;
+				tuple[6] = Integer.toString( ( attypmod >> 16 ) & 0xffff ).getBytes();
+				tuple[8] = Integer.toString(attypmod & 0xffff).getBytes();
+				tuple[9] = "10".getBytes();
 			}
-
-			tuple[7] = null;						// Buffer length
-			// Decimal digits = scale
-			// From the source (see e.g. backend/utils/adt/format_type.c,
-			// function numeric()) the scale and precision can be calculated
-			// from the typmod value.
-			if (typname.equals("numeric") || typname.equals("decimal"))
-			{
-				int attypmod = r.getInt(8) - VARHDRSZ;
-				tuple[8] =
-					Integer.toString(attypmod & 0xffff).getBytes();
-				tuple[9] =
-					Integer.toString( ( attypmod >> 16 ) & 0xffff ).getBytes();
+			else if (pgType.equals("bit") || pgType.equals("varbit")) {
+				tuple[6] = rs.getBytes("atttypmod");
+				tuple[9] = "2".getBytes();
 			}
-			else
-			{
-				tuple[8] = "0".getBytes();
-				tuple[9] = "10".getBytes();				// Num Prec Radix - assume decimal
+			else {
+				tuple[6] = rs.getBytes("attlen");
+				tuple[9] = "10".getBytes();
 			}
-			tuple[10] = Integer.toString(nullFlag.equals("f") ?
-										 java.sql.DatabaseMetaData.columnNullable :
-										 java.sql.DatabaseMetaData.columnNoNulls).getBytes();	// Nullable
-			tuple[11] = r.getBytes(11);				// Description (if any)
-			tuple[12] = r.getBytes(9);				// Column default
+
+			tuple[7] = null;						// Buffer length
+
+			tuple[10] = Integer.toString(rs.getBoolean("attnotnull") ? java.sql.DatabaseMetaData.columnNoNulls : java.sql.DatabaseMetaData.columnNullable).getBytes();	// Nullable
+			tuple[11] = rs.getBytes("description");				// Description (if any)
+			tuple[12] = rs.getBytes("adsrc");				// Column default
 			tuple[13] = null;						// sql data type (unused)
 			tuple[14] = null;						// sql datetime sub (unused)
 			tuple[15] = tuple[6];					// char octet length
-			tuple[16] = r.getBytes(5);				// ordinal position
-			tuple[17] = (nullFlag.equals("f") ? "YES" : "NO").getBytes();	// Is nullable
+			tuple[16] = rs.getBytes("attnum");		// ordinal position
+			tuple[17] = (rs.getBoolean("attnotnull") ? "NO" : "YES").getBytes();	// Is nullable
 
 			v.addElement(tuple);
 		}
-		r.close();
+		rs.close();
 
 		return connection.getResultSet(null, f, v, "OK", 1);
 	}
@@ -2255,28 +2408,77 @@ public abstract class AbstractJdbc1DatabaseMetaData
 		else
 			columnNamePattern = columnNamePattern.toLowerCase();
 
-		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, NAME_SIZE);
-		f[4] = new Field(connection, "GRANTOR", iVarcharOid, NAME_SIZE);
-		f[5] = new Field(connection, "GRANTEE", iVarcharOid, NAME_SIZE);
-		f[6] = new Field(connection, "PRIVILEGE", iVarcharOid, NAME_SIZE);
-		f[7] = new Field(connection, "IS_GRANTABLE", iVarcharOid, NAME_SIZE);
-
-		// This is taken direct from the psql source
-		java.sql.ResultSet r = connection.ExecSQL("SELECT relname, relacl FROM pg_class, pg_user WHERE ( relkind = 'r' OR relkind = 'i') and relname !~ '^pg_' and relname !~ '^xin[vx][0-9]+' and usesysid = relowner and relname like '" + table.toLowerCase() + "' ORDER BY relname");
-		while (r.next())
-		{
-			byte[][] tuple = new byte[8][0];
-			tuple[0] = tuple[1] = "".getBytes();
-			if (Driver.logDebug)
-				Driver.debug("relname=\"" + r.getString(1) + "\" relacl=\"" + r.getString(2) + "\"");
+		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, getMaxNameLength());
+		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, getMaxNameLength());
+		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, getMaxNameLength());
+		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, getMaxNameLength());
+		f[4] = new Field(connection, "GRANTOR", iVarcharOid, getMaxNameLength());
+		f[5] = new Field(connection, "GRANTEE", iVarcharOid, getMaxNameLength());
+		f[6] = new Field(connection, "PRIVILEGE", iVarcharOid, getMaxNameLength());
+		f[7] = new Field(connection, "IS_GRANTABLE", iVarcharOid, getMaxNameLength());
 
-			// For now, don't add to the result as relacl needs to be processed.
-			//v.addElement(tuple);
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT n.nspname,c.relname,u.usename,c.relacl,a.attname "+
+				" FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c, pg_catalog.pg_user u, pg_catalog.pg_attribute a "+
+				" WHERE c.relnamespace = n.oid "+
+				" AND u.usesysid = c.relowner "+
+				" AND c.oid = a.attrelid "+
+				" AND c.relkind = 'r' "+
+				" AND a.attnum > 0 AND NOT a.attisdropped ";
+			if (schema != null && !"".equals(schema)) {
+				sql += " AND n.nspname = '"+escapeQuotes(schema.toLowerCase())+"' ";
+			}
+		} else {
+			sql = "SELECT NULL::text AS nspname,c.relname,u.usename,c.relacl,a.attname "+
+				"FROM pg_class c, pg_user u,pg_attribute a "+
+				" WHERE u.usesysid = c.relowner "+
+				" AND c.oid = a.attrelid "+
+				" AND a.attnum > 0 "+
+				" AND c.relkind = 'r' ";
 		}
 
+		sql += " AND c.relname = '"+escapeQuotes(table.toLowerCase())+"' ";
+		if (columnNamePattern != null && !"".equals(columnNamePattern)) {
+			sql += " AND a.attname LIKE '"+escapeQuotes(columnNamePattern.toLowerCase())+"' ";
+		}
+		sql += " ORDER BY attname ";
+
+		ResultSet rs = connection.createStatement().executeQuery(sql);
+		while (rs.next()) {
+			byte schemaName[] = rs.getBytes("nspname");
+			byte tableName[] = rs.getBytes("relname");
+			byte column[] = rs.getBytes("attname");
+			String owner = rs.getString("usename");
+			String acl = rs.getString("relacl");
+			Hashtable permissions = parseACL(acl);
+			String permNames[] = new String[permissions.size()];
+			Enumeration e = permissions.keys();
+			int i=0;
+			while (e.hasMoreElements()) {
+				permNames[i++] = (String)e.nextElement();
+			}
+			sortStringArray(permNames);
+			for (i=0; i<permNames.length; i++) {
+				byte[] privilege = permNames[i].getBytes();
+				Vector grantees = (Vector)permissions.get(permNames[i]);
+				for (int j=0; j<grantees.size(); j++) {
+					String grantee = (String)grantees.elementAt(j);
+					String grantable = owner.equals(grantee) ? "YES" : "NO";
+					byte[][] tuple = new byte[8][0];
+					tuple[0] = null;
+					tuple[1] = schemaName;
+					tuple[2] = tableName;
+					tuple[3] = column;
+					tuple[4] = owner.getBytes();
+					tuple[5] = grantee.getBytes();
+					tuple[6] = privilege;
+					tuple[7] = grantable.getBytes();
+					v.addElement(tuple);
+				}
+			}
+		}
+		rs.close();
 		return connection.getResultSet(null, f, v, "OK", 1);
 	}
 
@@ -2295,7 +2497,6 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	*	<LI><B>TABLE_CAT</B> String => table catalog (may be null)
 	*	<LI><B>TABLE_SCHEM</B> String => table schema (may be null)
 	*	<LI><B>TABLE_NAME</B> String => table name
-	*	<LI><B>COLUMN_NAME</B> String => column name
 	*	<LI><B>GRANTOR</B> => grantor of access (may be null)
 	*	<LI><B>GRANTEE</B> String => grantee of access
 	*	<LI><B>PRIVILEGE</B> String => name of access (SELECT,
@@ -2313,37 +2514,179 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	*/
 	public java.sql.ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern) throws SQLException
 	{
-		Field f[] = new Field[8];
+		Field f[] = new Field[7];
 		Vector v = new Vector();
 
-		if (tableNamePattern == null)
-			tableNamePattern = "%";
-
-		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "COLUMN_NAME", iVarcharOid, NAME_SIZE);
-		f[4] = new Field(connection, "GRANTOR", iVarcharOid, NAME_SIZE);
-		f[5] = new Field(connection, "GRANTEE", iVarcharOid, NAME_SIZE);
-		f[6] = new Field(connection, "PRIVILEGE", iVarcharOid, NAME_SIZE);
-		f[7] = new Field(connection, "IS_GRANTABLE", iVarcharOid, NAME_SIZE);
-
-		// This is taken direct from the psql source
-		java.sql.ResultSet r = connection.ExecSQL("SELECT relname, relacl FROM pg_class, pg_user WHERE ( relkind = 'r' OR relkind = 'i') and relname !~ '^pg_' and relname !~ '^xin[vx][0-9]+' and usesysid = relowner and relname like '" + tableNamePattern.toLowerCase() + "' ORDER BY relname");
-		while (r.next())
-		{
-			byte[][] tuple = new byte[8][0];
-			tuple[0] = tuple[1] = "".getBytes();
-			if (Driver.logDebug)
-				Driver.debug("relname=\"" + r.getString(1) + "\" relacl=\"" + r.getString(2) + "\"");
+		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, getMaxNameLength());
+		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, getMaxNameLength());
+		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, getMaxNameLength());
+		f[3] = new Field(connection, "GRANTOR", iVarcharOid, getMaxNameLength());
+		f[4] = new Field(connection, "GRANTEE", iVarcharOid, getMaxNameLength());
+		f[5] = new Field(connection, "PRIVILEGE", iVarcharOid, getMaxNameLength());
+		f[6] = new Field(connection, "IS_GRANTABLE", iVarcharOid, getMaxNameLength());
+
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT n.nspname,c.relname,u.usename,c.relacl "+
+				" FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c, pg_catalog.pg_user u "+
+				" WHERE c.relnamespace = n.oid "+
+				" AND u.usesysid = c.relowner "+
+				" AND c.relkind = 'r' ";
+			if (schemaPattern != null && !"".equals(schemaPattern)) {
+				sql += " AND n.nspname LIKE '"+escapeQuotes(schemaPattern.toLowerCase())+"' ";
+			}
+		} else {
+			sql = "SELECT NULL::text AS nspname,c.relname,u.usename,c.relacl "+
+				"FROM pg_class c, pg_user u "+
+				" WHERE u.usesysid = c.relowner "+
+				" AND c.relkind = 'r' ";
+		}
 
-			// For now, don't add to the result as relacl needs to be processed.
-			//v.addElement(tuple);
+		if (tableNamePattern != null && !"".equals(tableNamePattern)) {
+			sql += " AND c.relname LIKE '"+escapeQuotes(tableNamePattern.toLowerCase())+"' ";
+		}
+		sql += " ORDER BY nspname, relname ";
+
+		ResultSet rs = connection.createStatement().executeQuery(sql);
+		while (rs.next()) {
+			byte schema[] = rs.getBytes("nspname");
+			byte table[] = rs.getBytes("relname");
+			String owner = rs.getString("usename");
+			String acl = rs.getString("relacl");
+			Hashtable permissions = parseACL(acl);
+			String permNames[] = new String[permissions.size()];
+			Enumeration e = permissions.keys();
+			int i=0;
+			while (e.hasMoreElements()) {
+				permNames[i++] = (String)e.nextElement();
+			}
+			sortStringArray(permNames);
+			for (i=0; i<permNames.length; i++) {
+				byte[] privilege = permNames[i].getBytes();
+				Vector grantees = (Vector)permissions.get(permNames[i]);
+				for (int j=0; j<grantees.size(); j++) {
+					String grantee = (String)grantees.elementAt(j);
+					String grantable = owner.equals(grantee) ? "YES" : "NO";
+					byte[][] tuple = new byte[7][0];
+					tuple[0] = null;
+					tuple[1] = schema;
+					tuple[2] = table;
+					tuple[3] = owner.getBytes();
+					tuple[4] = grantee.getBytes();
+					tuple[5] = privilege;
+					tuple[6] = grantable.getBytes();
+					v.addElement(tuple);
+				}
+			}
 		}
+		rs.close();
 
 		return connection.getResultSet(null, f, v, "OK", 1);
 	}
 
+	private static void sortStringArray(String s[]) {
+		for (int i=0; i<s.length-1; i++) {
+			for (int j=i+1; j<s.length; j++) {
+				if (s[i].compareTo(s[j]) > 0) {
+					String tmp = s[i];
+					s[i] = s[j];
+					s[j] = tmp;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Parse an String of ACLs into a Vector of ACLs.
+	 */
+	private static Vector parseACLArray(String aclString) {
+		Vector acls = new Vector();
+		if (aclString == null || aclString.length() == 0) {
+			return acls;
+		}
+		boolean inQuotes = false;
+		// start at 1 because of leading "{"
+		int beginIndex = 1;
+		char prevChar = ' ';
+		for (int i=beginIndex; i<aclString.length(); i++) {
+			
+			char c = aclString.charAt(i);
+			if (c == '"' && prevChar != '\\') {
+				inQuotes = !inQuotes;
+			} else if (c == ',' && !inQuotes) {
+				acls.addElement(aclString.substring(beginIndex,i));
+				beginIndex = i+1;
+			}
+			prevChar = c;
+		}
+		// add last element removing the trailing "}"
+		acls.addElement(aclString.substring(beginIndex,aclString.length()-1));
+
+		// Strip out enclosing quotes, if any.
+		for (int i=0; i<acls.size(); i++) {
+			String acl = (String)acls.elementAt(i);
+			if (acl.startsWith("\"") && acl.endsWith("\"")) {
+				acl = acl.substring(1,acl.length()-1);
+				acls.setElementAt(acl,i);
+			}
+		}
+		return acls;
+	}
+
+	/**
+	 * Add the user described by the given acl to the Vectors of users
+	 * with the privileges described by the acl.
+	 */
+	private void addACLPrivileges(String acl, Hashtable privileges) {
+		int equalIndex = acl.lastIndexOf("=");
+		String name = acl.substring(0,equalIndex);
+		if (name.length() == 0) {
+			name = "PUBLIC";
+		}
+		String privs = acl.substring(equalIndex+1);
+		for (int i=0; i<privs.length(); i++) {
+			char c = privs.charAt(i);
+			String sqlpriv;
+			switch (c) {
+				case 'a': sqlpriv = "INSERT"; break;
+				case 'r': sqlpriv = "SELECT"; break;
+				case 'w': sqlpriv = "UPDATE"; break;
+				case 'd': sqlpriv = "DELETE"; break;
+				case 'R': sqlpriv = "RULE"; break;
+				case 'x': sqlpriv = "REFERENCES"; break;
+				case 't': sqlpriv = "TRIGGER"; break;
+				// the folloowing can't be granted to a table, but
+				// we'll keep them for completeness.
+				case 'X': sqlpriv = "EXECUTE"; break;
+				case 'U': sqlpriv = "USAGE"; break;
+				case 'C': sqlpriv = "CREATE"; break;
+				case 'T': sqlpriv = "CREATE TEMP"; break;
+				default: sqlpriv = "UNKNOWN";
+			}
+			Vector usersWithPermission = (Vector)privileges.get(sqlpriv);
+			if (usersWithPermission == null) {
+				usersWithPermission = new Vector();
+				privileges.put(sqlpriv,usersWithPermission);
+			}
+			usersWithPermission.addElement(name);
+		}
+	}
+
+	/**
+	 * Take the a String representing an array of ACLs and return
+	 * a Hashtable mapping the SQL permission name to a Vector of
+	 * usernames who have that permission.
+	 */
+	protected Hashtable parseACL(String aclArray) {
+		Vector acls = parseACLArray(aclArray);
+		Hashtable privileges = new Hashtable();
+		for (int i=0; i<acls.size(); i++) {
+			String acl = (String)acls.elementAt(i);
+			addACLPrivileges(acl,privileges);
+		}
+		return privileges;
+	}
+
 	/*
 	 * Get a description of a table's optimal set of columns that
 	 * uniquely identifies a row. They are ordered by SCOPE.
@@ -2381,20 +2724,56 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	// Implementation note: This is required for Borland's JBuilder to work
 	public java.sql.ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable) throws SQLException
 	{
-		// for now, this returns an empty result set.
 		Field f[] = new Field[8];
-		ResultSet r;	// ResultSet for the SQL query that we need to do
 		Vector v = new Vector();		// The new ResultSet tuple stuff
 
 		f[0] = new Field(connection, "SCOPE", iInt2Oid, 2);
-		f[1] = new Field(connection, "COLUMN_NAME", iVarcharOid, NAME_SIZE);
+		f[1] = new Field(connection, "COLUMN_NAME", iVarcharOid, getMaxNameLength());
 		f[2] = new Field(connection, "DATA_TYPE", iInt2Oid, 2);
-		f[3] = new Field(connection, "TYPE_NAME", iVarcharOid, NAME_SIZE);
+		f[3] = new Field(connection, "TYPE_NAME", iVarcharOid, getMaxNameLength());
 		f[4] = new Field(connection, "COLUMN_SIZE", iInt4Oid, 4);
 		f[5] = new Field(connection, "BUFFER_LENGTH", iInt4Oid, 4);
 		f[6] = new Field(connection, "DECIMAL_DIGITS", iInt2Oid, 2);
 		f[7] = new Field(connection, "PSEUDO_COLUMN", iInt2Oid, 2);
 
+		/* At the moment this simply returns a table's primary key,
+		 * if there is one.  I believe other unique indexes, ctid,
+		 * and oid should also be considered. -KJ
+		 */
+
+		String from;
+		String where = "";
+		if (connection.haveMinimumServerVersion("7.3")) {
+			from = " FROM pg_catalog.pg_namespace n, pg_catalog.pg_class ct, pg_catalog.pg_class ci, pg_catalog.pg_attribute a, pg_catalog.pg_index i ";
+			where = " AND ct.relnamespace = n.oid ";
+			if (schema != null && !"".equals(schema)) {
+				where += " AND n.nspname = '"+escapeQuotes(schema.toLowerCase())+"' ";
+			}
+		} else {
+			from = " FROM pg_class ct, pg_class ci, pg_attribute a, pg_index i ";
+		}
+		String sql = "SELECT a.attname, a.atttypid "+
+			from+
+			" WHERE ct.oid=i.indrelid AND ci.oid=i.indexrelid "+
+			" AND a.attrelid=ci.oid AND i.indisprimary "+
+			" AND ct.relname = '"+escapeQuotes(table.toLowerCase())+"' "+
+			where+
+			" ORDER BY a.attnum ";
+
+		ResultSet rs = connection.createStatement().executeQuery(sql);
+		while (rs.next()) {
+			byte tuple[][] = new byte[8][];
+			int columnTypeOid = rs.getInt("atttypid");
+			tuple[0] = Integer.toString(scope).getBytes();
+			tuple[1] = rs.getBytes("attname");
+			tuple[2] = Integer.toString(connection.getSQLType(columnTypeOid)).getBytes();
+			tuple[3] = connection.getPGType(columnTypeOid).getBytes();
+			tuple[4] = null;
+			tuple[5] = null;
+			tuple[6] = null;
+			tuple[7] = Integer.toString(java.sql.DatabaseMetaData.bestRowNotPseudo).getBytes();
+			v.addElement(tuple);
+		}
 		return connection.getResultSet(null, f, v, "OK", 1);
 	}
 
@@ -2403,8 +2782,6 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 * updated when any value in a row is updated.	They are
 	 * unordered.
 	 *
-	 * This method is currently unimplemented.
-	 *
 	 * <P>Each column description has the following columns:
 	 *	<OL>
 	 *	<LI><B>SCOPE</B> short => is not used
@@ -2430,7 +2807,44 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException
 	{
-		throw org.postgresql.Driver.notImplemented();
+		Field f[] = new Field[8];
+		Vector v = new Vector();		// The new ResultSet tuple stuff
+
+		f[0] = new Field(connection, "SCOPE", iInt2Oid, 2);
+		f[1] = new Field(connection, "COLUMN_NAME", iVarcharOid, getMaxNameLength());
+		f[2] = new Field(connection, "DATA_TYPE", iInt2Oid, 2);
+		f[3] = new Field(connection, "TYPE_NAME", iVarcharOid, getMaxNameLength());
+		f[4] = new Field(connection, "COLUMN_SIZE", iInt4Oid, 4);
+		f[5] = new Field(connection, "BUFFER_LENGTH", iInt4Oid, 4);
+		f[6] = new Field(connection, "DECIMAL_DIGITS", iInt2Oid, 2);
+		f[7] = new Field(connection, "PSEUDO_COLUMN", iInt2Oid, 2);
+
+		byte tuple[][] = new byte[8][0];
+
+		/* Postgresql does not have any column types that are
+		 * automatically updated like some databases' timestamp type.
+		 * We can't tell what rules or triggers might be doing, so we
+		 * are left with the system columns that change on an update.
+		 * An update may change all of the following system columns:
+		 * ctid, xmax, xmin, cmax, and cmin.  Depending on if we are
+		 * in a transaction and wether we roll it back or not the
+		 * only guaranteed change is to ctid. -KJ
+		 */
+
+		tuple[0] = null;
+		tuple[1] = "ctid".getBytes();
+		tuple[2] = Integer.toString(connection.getSQLType("tid")).getBytes();
+		tuple[3] = "tid".getBytes();
+		tuple[4] = null;
+		tuple[5] = null;
+		tuple[6] = null;
+		tuple[7] = Integer.toString(java.sql.DatabaseMetaData.versionColumnPseudo).getBytes();
+		v.addElement(tuple);
+
+		/* Perhaps we should check that the given
+		 * catalog.schema.table actually exists. -KJ
+		 */
+		return connection.getResultSet(null, f, v, "OK", 1);
 	}
 
 	/*
@@ -2455,22 +2869,32 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException
 	{
-		return connection.createStatement().executeQuery("SELECT " +
-				"'' as TABLE_CAT," +
-				"'' AS TABLE_SCHEM," +
-				"bc.relname AS TABLE_NAME," +
-				"a.attname AS COLUMN_NAME," +
-				"a.attnum as KEY_SEQ," +
-				"ic.relname as PK_NAME " +
-				" FROM pg_class bc, pg_class ic, pg_index i, pg_attribute a" +
-				" WHERE bc.relkind = 'r' " +	//	-- not indices
-				"  and upper(bc.relname) = upper('" + table + "')" +
-				"  and i.indrelid = bc.oid" +
-				"  and i.indexrelid = ic.oid" +
-				"  and ic.oid = a.attrelid" +
-				"  and i.indisprimary='t' " +
-				" ORDER BY table_name, pk_name, key_seq"
-														);
+		String select;
+		String from;
+		String where = "";
+		if (connection.haveMinimumServerVersion("7.3")) {
+			select = "SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ";
+			from = " FROM pg_catalog.pg_namespace n, pg_catalog.pg_class ct, pg_catalog.pg_class ci, pg_catalog.pg_attribute a, pg_catalog.pg_index i ";
+			where = " AND ct.relnamespace = n.oid ";
+			if (schema != null && !"".equals(schema)) {
+				where += " AND n.nspname = '"+escapeQuotes(schema.toLowerCase())+"' ";
+			}
+		} else {
+			select = "SELECT NULL AS TABLE_CAT, NULL AS TABLE_SCHEM, ";
+			from = " FROM pg_class ct, pg_class ci, pg_attribute a, pg_index i ";
+		}
+		String sql = select+
+			" ct.relname AS TABLE_NAME, "+
+			" a.attname AS COLUMN_NAME, "+
+			" a.attnum AS KEY_SEQ, "+
+			" ci.relname AS PK_NAME "+
+			from+
+			" WHERE ct.oid=i.indrelid AND ci.oid=i.indexrelid "+
+			" AND a.attrelid=ci.oid AND i.indisprimary "+
+			" AND ct.relname = '"+escapeQuotes(table.toLowerCase())+"' "+
+			where+
+			" ORDER BY table_name, pk_name, key_seq";
+		return connection.createStatement().executeQuery(sql);
 	}
 
 	/*
@@ -2539,80 +2963,112 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 * @throws SQLException
 	 */
 
-	protected java.sql.ResultSet getImportedExportedKeys(String catalog, String schema, String primaryTable, String foreignTable) throws SQLException
+	protected java.sql.ResultSet getImportedExportedKeys(String primaryCatalog, String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException
 	{
 		Field f[] = new Field[14];
 
-		f[0] = new Field(connection, "PKTABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "PKTABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "PKTABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "PKCOLUMN_NAME", iVarcharOid, NAME_SIZE);
-		f[4] = new Field(connection, "FKTABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[5] = new Field(connection, "FKTABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[6] = new Field(connection, "FKTABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[7] = new Field(connection, "FKCOLUMN_NAME", iVarcharOid, NAME_SIZE);
+		f[0] = new Field(connection, "PKTABLE_CAT", iVarcharOid, getMaxNameLength());
+		f[1] = new Field(connection, "PKTABLE_SCHEM", iVarcharOid, getMaxNameLength());
+		f[2] = new Field(connection, "PKTABLE_NAME", iVarcharOid, getMaxNameLength());
+		f[3] = new Field(connection, "PKCOLUMN_NAME", iVarcharOid, getMaxNameLength());
+		f[4] = new Field(connection, "FKTABLE_CAT", iVarcharOid, getMaxNameLength());
+		f[5] = new Field(connection, "FKTABLE_SCHEM", iVarcharOid, getMaxNameLength());
+		f[6] = new Field(connection, "FKTABLE_NAME", iVarcharOid, getMaxNameLength());
+		f[7] = new Field(connection, "FKCOLUMN_NAME", iVarcharOid, getMaxNameLength());
 		f[8] = new Field(connection, "KEY_SEQ", iInt2Oid, 2);
 		f[9] = new Field(connection, "UPDATE_RULE", iInt2Oid, 2);
 		f[10] = new Field(connection, "DELETE_RULE", iInt2Oid, 2);
-		f[11] = new Field(connection, "FK_NAME", iVarcharOid, NAME_SIZE);
-		f[12] = new Field(connection, "PK_NAME", iVarcharOid, NAME_SIZE);
+		f[11] = new Field(connection, "FK_NAME", iVarcharOid, getMaxNameLength());
+		f[12] = new Field(connection, "PK_NAME", iVarcharOid, getMaxNameLength());
 		f[13] = new Field(connection, "DEFERRABILITY", iInt2Oid, 2);
 
-		java.sql.ResultSet rs = connection.ExecSQL(
-									"SELECT distinct "
-									+ "c.relname as prelname, "
-									+ "c2.relname as frelname, "
-									+ "t.tgconstrname, "
-									+ "a.attnum as keyseq, "
-									+ "ic.relname as fkeyname, "
-									+ "t.tgdeferrable, "
-									+ "t.tginitdeferred, "
-									+ "t.tgnargs,t.tgargs, "
-									+ "p1.proname as updaterule, "
-									+ "p2.proname as deleterule "
-									+ "FROM "
-									+ "pg_trigger t, "
-									+ "pg_trigger t1, "
-									+ "pg_class c, "
-									+ "pg_class c2, "
-									+ "pg_class ic, "
-									+ "pg_proc p1, "
-									+ "pg_proc p2, "
-									+ "pg_index i, "
-									+ "pg_attribute a "
-									+ "WHERE "
-									// isolate the update rule
-									+ "(t.tgrelid=c.oid "
-									+ "AND t.tgisconstraint "
-									+ "AND t.tgconstrrelid=c2.oid "
-									+ "AND t.tgfoid=p1.oid "
-									+ "and p1.proname like '%%upd') "
-
-									+ "and "
-									// isolate the delete rule
-									+ "(t1.tgrelid=c.oid "
-									+ "and t1.tgisconstraint "
-									+ "and t1.tgconstrrelid=c2.oid "
-									+ "AND t1.tgfoid=p2.oid "
-									+ "and p2.proname like '%%del') "
-
-									// if we are looking for exported keys then primary table will be used
-									+ ((primaryTable != null) ? "AND c.relname='" + primaryTable + "' " : "")
-
-									// if we are looking for imported keys then the foreign table will be used
-									+ ((foreignTable != null) ? "AND c2.relname='" + foreignTable + "' " : "")
-									+ "AND i.indrelid=c.oid "
-									+ "AND i.indexrelid=ic.oid "
-									+ "AND ic.oid=a.attrelid "
-									+ "AND i.indisprimary "
-									+ "ORDER BY "
-
-									// orderby is as follows getExported, orders by FKTABLE,
-									// getImported orders by PKTABLE
-									// getCrossReference orders by FKTABLE, so this should work for both,
-									// since when getting crossreference, primaryTable will be defined
-
-									+ (primaryTable != null ? "frelname" : "prelname") + ",keyseq");
+
+		String select;
+		String from;
+		String where = "";
+
+		/*
+		 * The addition of the pg_constraint in 7.3 table should have really
+		 * helped us out here, but it comes up just a bit short.
+		 * - The conkey, confkey columns aren't really useful without
+		 *   contrib/array unless we want to issues separate queries.
+		 * - Unique indexes that can support foreign keys are not necessarily
+		 *   added to pg_constraint.  Also multiple unique indexes covering
+		 *   the same keys can be created which make it difficult to determine
+		 *   the PK_NAME field.
+		 */
+
+		if (connection.haveMinimumServerVersion("7.3")) {
+			select = "SELECT DISTINCT n.nspname as pnspname,n2.nspname as fnspname, ";
+			from = " FROM pg_catalog.pg_namespace n, pg_catalog.pg_namespace n2, pg_catalog.pg_trigger t, pg_catalog.pg_trigger t1, pg_catalog.pg_class c, pg_catalog.pg_class c2, pg_catalog.pg_class ic, pg_catalog.pg_proc p1, pg_catalog.pg_proc p2, pg_catalog.pg_index i, pg_catalog.pg_attribute a ";
+			where = " AND c.relnamespace = n.oid AND c2.relnamespace=n2.oid ";
+			if (primarySchema != null && !"".equals(primarySchema)) {
+				where += " AND n.nspname = '"+escapeQuotes(primarySchema.toLowerCase())+"' ";
+			}
+			if (foreignSchema != null && !"".equals(foreignSchema)) {
+				where += " AND n2.nspname = '"+escapeQuotes(foreignSchema.toLowerCase())+"' ";
+			}
+		} else {
+			select = "SELECT DISTINCT NULL::text as pnspname, NULL::text as fnspname, ";
+			from = " FROM pg_trigger t, pg_trigger t1, pg_class c, pg_class c2, pg_class ic, pg_proc p1, pg_proc p2, pg_index i, pg_attribute a ";
+		}
+
+		String sql = select
+			+ "c.relname as prelname, "
+			+ "c2.relname as frelname, "
+			+ "t.tgconstrname, "
+			+ "a.attnum as keyseq, "
+			+ "ic.relname as fkeyname, "
+			+ "t.tgdeferrable, "
+			+ "t.tginitdeferred, "
+			+ "t.tgnargs,t.tgargs, "
+			+ "p1.proname as updaterule, "
+			+ "p2.proname as deleterule "
+			+ from 
+			+ "WHERE "
+			// isolate the update rule
+			+ "(t.tgrelid=c.oid "
+			+ "AND t.tgisconstraint "
+			+ "AND t.tgconstrrelid=c2.oid "
+			+ "AND t.tgfoid=p1.oid "
+			+ "and p1.proname like 'RI\\\\_FKey\\\\_%\\\\_upd') "
+
+			+ "and "
+			// isolate the delete rule
+			+ "(t1.tgrelid=c.oid "
+			+ "and t1.tgisconstraint "
+			+ "and t1.tgconstrrelid=c2.oid "
+			+ "AND t1.tgfoid=p2.oid "
+			+ "and p2.proname like 'RI\\\\_FKey\\\\_%\\\\_del') "
+			+ "AND i.indrelid=c.oid "
+			+ "AND i.indexrelid=ic.oid "
+			+ "AND ic.oid=a.attrelid "
+			+ "AND i.indisprimary "
+			+ where;
+
+		if (primaryTable != null) {
+			sql += "AND c.relname='" + escapeQuotes(primaryTable.toLowerCase()) + "' ";
+		}
+		if (foreignTable != null) {
+			sql += "AND c2.relname='" + escapeQuotes(foreignTable.toLowerCase()) + "' ";
+		}
+
+		sql += "ORDER BY ";
+
+		// orderby is as follows getExported, orders by FKTABLE,
+		// getImported orders by PKTABLE
+		// getCrossReference orders by FKTABLE, so this should work for both,
+		// since when getting crossreference, primaryTable will be defined
+
+		if (primaryTable != null) {
+			sql += "frelname";
+		} else {
+			sql += "prelname";
+		}
+
+		sql += ",keyseq";
+
+		ResultSet rs = connection.createStatement().executeQuery(sql);
 
 		// returns the following columns
 		// and some example data with a table defined as follows
@@ -2632,15 +3088,16 @@ public abstract class AbstractJdbc1DatabaseMetaData
 
 		Vector tuples = new Vector();
 
-
 		while ( rs.next() )
 		{
 			byte tuple[][] = new byte[14][];
 
-			tuple[2] = rs.getBytes(1); //PKTABLE_NAME
-			tuple[6] = rs.getBytes(2); //FKTABLE_NAME
-			String fKeyName = rs.getString(3);
-			String updateRule = rs.getString(10);
+			tuple[1] = rs.getBytes(1); //PKTABLE_SCHEM
+			tuple[5] = rs.getBytes(2); //FKTABLE_SCHEM
+			tuple[2] = rs.getBytes(3); //PKTABLE_NAME
+			tuple[6] = rs.getBytes(4); //FKTABLE_NAME
+			String fKeyName = rs.getString(5);
+			String updateRule = rs.getString(12);
 
 			if (updateRule != null )
 			{
@@ -2665,7 +3122,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 
 			}
 
-			String deleteRule = rs.getString(11);
+			String deleteRule = rs.getString(13);
 
 			if ( deleteRule != null )
 			{
@@ -2683,20 +3140,17 @@ public abstract class AbstractJdbc1DatabaseMetaData
 			}
 
 
+			int keySequence = rs.getInt(6); //KEY_SEQ
+
 			// Parse the tgargs data
 			String fkeyColumn = "";
 			String pkeyColumn = "";
-
-
 			// Note, I am guessing at most of this, but it should be close
 			// if not, please correct
 			// the keys are in pairs and start after the first four arguments
 			// the arguments are seperated by \000
 
-			int keySequence = rs.getInt(4); //KEY_SEQ
-
-			// get the args
-			String targs = rs.getString(9);
+			String targs = rs.getString(11);
 
 			// args look like this
 			//<unnamed>\000ww\000vv\000UNSPECIFIED\000m\000a\000n\000b\000
@@ -2720,14 +3174,14 @@ public abstract class AbstractJdbc1DatabaseMetaData
 			tuple[3] = pkeyColumn.getBytes(); //PKCOLUMN_NAME
 			tuple[7] = fkeyColumn.getBytes(); //FKCOLUMN_NAME
 
-			tuple[8] = rs.getBytes(4); //KEY_SEQ
+			tuple[8] = rs.getBytes(6); //KEY_SEQ
 			tuple[11] = targs.getBytes(); //FK_NAME this will give us a unique name for the foreign key
-			tuple[12] = rs.getBytes(5); //PK_NAME
+			tuple[12] = rs.getBytes(7); //PK_NAME
 
 			// DEFERRABILITY
 			int deferrability = java.sql.DatabaseMetaData.importedKeyNotDeferrable;
-			boolean deferrable = rs.getBoolean(6);
-			boolean initiallyDeferred = rs.getBoolean(7);
+			boolean deferrable = rs.getBoolean(8);
+			boolean initiallyDeferred = rs.getBoolean(9);
 			if (deferrable)
 			{
 				if (initiallyDeferred)
@@ -2796,7 +3250,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException
 	{
-		return getImportedExportedKeys(catalog, schema, null, table);
+		return getImportedExportedKeys(null,null,null,catalog, schema, table);
 	}
 
 	/*
@@ -2854,7 +3308,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException
 	{
-		return getImportedExportedKeys(catalog, schema, table, null);
+		return getImportedExportedKeys(catalog, schema, table, null,null,null);
 	}
 
 	/*
@@ -2915,7 +3369,7 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getCrossReference(String primaryCatalog, String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException
 	{
-		return getImportedExportedKeys(primaryCatalog, primarySchema, primaryTable, foreignTable);
+		return getImportedExportedKeys(primaryCatalog, primarySchema, primaryTable, foreignCatalog, foreignSchema, foreignTable);
 	}
 
 	/*
@@ -2965,64 +3419,67 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	 */
 	public java.sql.ResultSet getTypeInfo() throws SQLException
 	{
-		java.sql.ResultSet rs = connection.ExecSQL("select typname from pg_type");
-		if (rs != null)
-		{
-			Field f[] = new Field[18];
-			ResultSet r;	// ResultSet for the SQL query that we need to do
-			Vector v = new Vector();		// The new ResultSet tuple stuff
-
-			f[0] = new Field(connection, "TYPE_NAME", iVarcharOid, NAME_SIZE);
-			f[1] = new Field(connection, "DATA_TYPE", iInt2Oid, 2);
-			f[2] = new Field(connection, "PRECISION", iInt4Oid, 4);
-			f[3] = new Field(connection, "LITERAL_PREFIX", iVarcharOid, NAME_SIZE);
-			f[4] = new Field(connection, "LITERAL_SUFFIX", iVarcharOid, NAME_SIZE);
-			f[5] = new Field(connection, "CREATE_PARAMS", iVarcharOid, NAME_SIZE);
-			f[6] = new Field(connection, "NULLABLE", iInt2Oid, 2);
-			f[7] = new Field(connection, "CASE_SENSITIVE", iBoolOid, 1);
-			f[8] = new Field(connection, "SEARCHABLE", iInt2Oid, 2);
-			f[9] = new Field(connection, "UNSIGNED_ATTRIBUTE", iBoolOid, 1);
-			f[10] = new Field(connection, "FIXED_PREC_SCALE", iBoolOid, 1);
-			f[11] = new Field(connection, "AUTO_INCREMENT", iBoolOid, 1);
-			f[12] = new Field(connection, "LOCAL_TYPE_NAME", iVarcharOid, NAME_SIZE);
-			f[13] = new Field(connection, "MINIMUM_SCALE", iInt2Oid, 2);
-			f[14] = new Field(connection, "MAXIMUM_SCALE", iInt2Oid, 2);
-			f[15] = new Field(connection, "SQL_DATA_TYPE", iInt4Oid, 4);
-			f[16] = new Field(connection, "SQL_DATETIME_SUB", iInt4Oid, 4);
-			f[17] = new Field(connection, "NUM_PREC_RADIX", iInt4Oid, 4);
-
-			// cache some results, this will keep memory useage down, and speed
-			// things up a little.
-			byte b9[] = "9".getBytes();
-			byte b10[] = "10".getBytes();
-			byte bf[] = "f".getBytes();
-			byte bnn[] = Integer.toString(java.sql.DatabaseMetaData.typeNoNulls).getBytes();
-			byte bts[] = Integer.toString(java.sql.DatabaseMetaData.typeSearchable).getBytes();
-
-			while (rs.next())
-			{
-				byte[][] tuple = new byte[18][];
-				String typname = rs.getString(1);
-				tuple[0] = typname.getBytes();
-				tuple[1] = Integer.toString(connection.getSQLType(typname)).getBytes();
-				tuple[2] = b9;	// for now
-				tuple[6] = bnn; // for now
-				tuple[7] = bf; // false for now - not case sensitive
-				tuple[8] = bts;
-				tuple[9] = bf; // false for now - it's signed
-				tuple[10] = bf; // false for now - must handle money
-				tuple[11] = bf; // false for now - handle autoincrement
-				// 12 - LOCAL_TYPE_NAME is null
-				// 13 & 14 ?
-				// 15 & 16 are unused so we return null
-				tuple[17] = b10; // everything is base 10
-				v.addElement(tuple);
-			}
-			rs.close();
-			return connection.getResultSet(null, f, v, "OK", 1);
+
+		Field f[] = new Field[18];
+		ResultSet r;	// ResultSet for the SQL query that we need to do
+		Vector v = new Vector();		// The new ResultSet tuple stuff
+
+		f[0] = new Field(connection, "TYPE_NAME", iVarcharOid, getMaxNameLength());
+		f[1] = new Field(connection, "DATA_TYPE", iInt2Oid, 2);
+		f[2] = new Field(connection, "PRECISION", iInt4Oid, 4);
+		f[3] = new Field(connection, "LITERAL_PREFIX", iVarcharOid, getMaxNameLength());
+		f[4] = new Field(connection, "LITERAL_SUFFIX", iVarcharOid, getMaxNameLength());
+		f[5] = new Field(connection, "CREATE_PARAMS", iVarcharOid, getMaxNameLength());
+		f[6] = new Field(connection, "NULLABLE", iInt2Oid, 2);
+		f[7] = new Field(connection, "CASE_SENSITIVE", iBoolOid, 1);
+		f[8] = new Field(connection, "SEARCHABLE", iInt2Oid, 2);
+		f[9] = new Field(connection, "UNSIGNED_ATTRIBUTE", iBoolOid, 1);
+		f[10] = new Field(connection, "FIXED_PREC_SCALE", iBoolOid, 1);
+		f[11] = new Field(connection, "AUTO_INCREMENT", iBoolOid, 1);
+		f[12] = new Field(connection, "LOCAL_TYPE_NAME", iVarcharOid, getMaxNameLength());
+		f[13] = new Field(connection, "MINIMUM_SCALE", iInt2Oid, 2);
+		f[14] = new Field(connection, "MAXIMUM_SCALE", iInt2Oid, 2);
+		f[15] = new Field(connection, "SQL_DATA_TYPE", iInt4Oid, 4);
+		f[16] = new Field(connection, "SQL_DATETIME_SUB", iInt4Oid, 4);
+		f[17] = new Field(connection, "NUM_PREC_RADIX", iInt4Oid, 4);
+
+		String sql;
+		if (connection.haveMinimumServerVersion("7.3")) {
+			sql = "SELECT typname FROM pg_catalog.pg_type";
+		} else {
+			sql = "SELECT typname FROM pg_type";
 		}
 
-		throw new PSQLException("postgresql.metadata.unavailable");
+		ResultSet rs = connection.createStatement().executeQuery(sql);
+		// cache some results, this will keep memory useage down, and speed
+		// things up a little.
+		byte b9[] = "9".getBytes();
+		byte b10[] = "10".getBytes();
+		byte bf[] = "f".getBytes();
+		byte bnn[] = Integer.toString(java.sql.DatabaseMetaData.typeNoNulls).getBytes();
+		byte bts[] = Integer.toString(java.sql.DatabaseMetaData.typeSearchable).getBytes();
+
+		while (rs.next())
+		{
+			byte[][] tuple = new byte[18][];
+			String typname = rs.getString(1);
+			tuple[0] = typname.getBytes();
+			tuple[1] = Integer.toString(connection.getSQLType(typname)).getBytes();
+			tuple[2] = b9;	// for now
+			tuple[6] = bnn; // for now
+			tuple[7] = bf; // false for now - not case sensitive
+			tuple[8] = bts;
+			tuple[9] = bf; // false for now - it's signed
+			tuple[10] = bf; // false for now - must handle money
+			tuple[11] = bf; // false for now - handle autoincrement
+			// 12 - LOCAL_TYPE_NAME is null
+			// 13 & 14 ?
+			// 15 & 16 are unused so we return null
+			tuple[17] = b10; // everything is base 10
+			v.addElement(tuple);
+		}
+		rs.close();
+		return connection.getResultSet(null, f, v, "OK", 1);
 	}
 
 	/*
@@ -3078,88 +3535,46 @@ public abstract class AbstractJdbc1DatabaseMetaData
 	// Implementation note: This is required for Borland's JBuilder to work
 	public java.sql.ResultSet getIndexInfo(String catalog, String schema, String tableName, boolean unique, boolean approximate) throws SQLException
 	{
-		Field f[] = new Field[13];
-		java.sql.ResultSet r;	// ResultSet for the SQL query that we need to do
-		Vector v = new Vector();		// The new ResultSet tuple stuff
-
-		f[0] = new Field(connection, "TABLE_CAT", iVarcharOid, NAME_SIZE);
-		f[1] = new Field(connection, "TABLE_SCHEM", iVarcharOid, NAME_SIZE);
-		f[2] = new Field(connection, "TABLE_NAME", iVarcharOid, NAME_SIZE);
-		f[3] = new Field(connection, "NON_UNIQUE", iBoolOid, 1);
-		f[4] = new Field(connection, "INDEX_QUALIFIER", iVarcharOid, NAME_SIZE);
-		f[5] = new Field(connection, "INDEX_NAME", iVarcharOid, NAME_SIZE);
-		f[6] = new Field(connection, "TYPE", iInt2Oid, 2);
-		f[7] = new Field(connection, "ORDINAL_POSITION", iInt2Oid, 2);
-		f[8] = new Field(connection, "COLUMN_NAME", iVarcharOid, NAME_SIZE);
-		f[9] = new Field(connection, "ASC_OR_DESC", iVarcharOid, NAME_SIZE);
-		f[10] = new Field(connection, "CARDINALITY", iInt4Oid, 4);
-		f[11] = new Field(connection, "PAGES", iInt4Oid, 4);
-		f[12] = new Field(connection, "FILTER_CONDITION", iVarcharOid, NAME_SIZE);
-
-		r = connection.ExecSQL("select " +
-							   "c.relname, " +
-							   "x.indisunique, " +
-							   "i.relname, " +
-							   "x.indisclustered, " +
-							   "a.amname, " +
-							   "x.indkey, " +
-							   "c.reltuples, " +
-							   "c.relpages, " +
-							   "x.indexrelid " +
-							   "FROM pg_index x, pg_class c, pg_class i, pg_am a " +
-							   "WHERE ((c.relname = '" + tableName.toLowerCase() + "') " +
-							   " AND (c.oid = x.indrelid) " +
-							   " AND (i.oid = x.indexrelid) " +
-							   " AND (i.relam = a.oid)) " +
-							   "ORDER BY x.indisunique DESC, " +
-							   " x.indisclustered, a.amname, i.relname");
-		while (r.next())
-		{
-			// indkey is an array of column ordinals (integers).  In the JDBC
-			// interface, this has to be separated out into a separate
-			// tuple for each indexed column.  Also, getArray() is not yet
-			// implemented for Postgres JDBC, so we parse by hand.
-			String columnOrdinalString = r.getString(6);
-			StringTokenizer stok = new StringTokenizer(columnOrdinalString);
-			int [] columnOrdinals = new int[stok.countTokens()];
-			int o = 0;
-			while (stok.hasMoreTokens())
-			{
-				columnOrdinals[o++] = Integer.parseInt(stok.nextToken());
-			}
-			java.sql.ResultSet columnNameRS = connection.ExecSQL("select a.attname FROM pg_attribute a WHERE a.attrelid = " + r.getInt(9));
-			for (int i = 0; i < columnOrdinals.length; i++)
-			{
-				byte [] [] tuple = new byte [13] [];
-				tuple[0] = "".getBytes();
-				tuple[1] = "".getBytes();
-				tuple[2] = r.getBytes(1);
-				tuple[3] = r.getBoolean(2) ? "false".getBytes() : "true".getBytes();
-				tuple[4] = null;
-				tuple[5] = r.getBytes(3);
-				tuple[6] = r.getBoolean(4) ?
-						   Integer.toString(java.sql.DatabaseMetaData.tableIndexClustered).getBytes() :
-						   r.getString(5).equals("hash") ?
-						   Integer.toString(java.sql.DatabaseMetaData.tableIndexHashed).getBytes() :
-						   Integer.toString(java.sql.DatabaseMetaData.tableIndexOther).getBytes();
-				tuple[7] = Integer.toString(i + 1).getBytes();
-				if (columnNameRS.next())
-				{
-					tuple[8] = columnNameRS.getBytes(1);
-				}
-				else
-				{
-					tuple[8] = "".getBytes();
-				}
-				tuple[9] = null;  // sort sequence ???
-				tuple[10] = r.getBytes(7);	// inexact
-				tuple[11] = r.getBytes(8);
-				tuple[12] = null;
-				v.addElement(tuple);
+		String select;
+		String from;
+		String where = "";
+		if (connection.haveMinimumServerVersion("7.3")) {
+			select = "SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ";
+			from = " FROM pg_catalog.pg_namespace n, pg_catalog.pg_class ct, pg_catalog.pg_class ci, pg_catalog.pg_index i, pg_catalog.pg_attribute a, pg_catalog.pg_am am ";
+			where = " AND n.oid = ct.relnamespace ";
+			if (schema != null && ! "".equals(schema)) {
+				where += " AND n.nspname = '"+escapeQuotes(schema.toLowerCase())+"' ";
 			}
+		} else {
+			select = "SELECT NULL AS TABLE_CAT, NULL AS TABLE_SCHEM, ";
+			from = " FROM pg_class ct, pg_class ci, pg_index i, pg_attribute a, pg_am am ";
 		}
 
-		return connection.getResultSet(null, f, v, "OK", 1);
+		String sql = select+
+			" ct.relname AS TABLE_NAME, NOT i.indisunique AS NON_UNIQUE, NULL AS INDEX_QUALIFIER, ci.relname AS INDEX_NAME, "+
+			" CASE i.indisclustered "+
+			" WHEN true THEN "+java.sql.DatabaseMetaData.tableIndexClustered+
+			" ELSE CASE am.amname "+
+			"	WHEN 'hash' THEN "+java.sql.DatabaseMetaData.tableIndexHashed+
+			"	ELSE "+java.sql.DatabaseMetaData.tableIndexOther+
+			"	END "+
+			" END AS TYPE, "+
+			" a.attnum AS ORDINAL_POSITION, "+
+			" a.attname AS COLUMN_NAME, "+
+			" NULL AS ASC_OR_DESC, "+
+			" ci.reltuples AS CARDINALITY, "+
+			" ci.relpages AS PAGES, "+
+			" NULL AS FILTER_CONDITION "+
+			from+
+			" WHERE ct.oid=i.indrelid AND ci.oid=i.indexrelid AND a.attrelid=ci.oid AND ci.relam=am.oid "+
+			where+
+			" AND ct.relname = '"+escapeQuotes(tableName.toLowerCase())+"' ";
+
+		if (unique) {
+			sql += " AND i.indisunique ";
+		}
+		sql += " ORDER BY NON_UNIQUE, TYPE, INDEX_NAME ";
+		return connection.createStatement().executeQuery(sql);
 	}
 
 }
diff --git a/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java b/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java
index dc9bafd2bbe..98472bd4833 100644
--- a/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java
+++ b/src/interfaces/jdbc/org/postgresql/jdbc1/AbstractJdbc1Statement.java
@@ -8,7 +8,7 @@ import java.util.Vector;
 import org.postgresql.largeobject.*;
 import org.postgresql.util.*;
 
-/* $Header: /cvsroot/pgsql/src/interfaces/jdbc/org/postgresql/jdbc1/Attic/AbstractJdbc1Statement.java,v 1.10 2002/09/14 03:52:56 barry Exp $
+/* $Header: /cvsroot/pgsql/src/interfaces/jdbc/org/postgresql/jdbc1/Attic/AbstractJdbc1Statement.java,v 1.11 2002/10/01 00:39:02 davec Exp $
  * This class defines methods of the jdbc1 specification.  This class is
  * extended by org.postgresql.jdbc2.AbstractJdbc2Statement which adds the jdbc2
  * methods.  The real Statement class (for jdbc1) is org.postgresql.jdbc1.Jdbc1Statement
@@ -1757,8 +1757,8 @@ public abstract class AbstractJdbc1Statement implements org.postgresql.PGStateme
 	 */
 	private void setSerialize(int parameterIndex, long x, String classname) throws SQLException
 	{
-		// converts . to _, toLowerCase, and ensures length<32
-		String tablename = Serialize.toPostgreSQL( classname );
+		// converts . to _, toLowerCase, and ensures length < max name length
+		String tablename = Serialize.toPostgreSQL((java.sql.Connection)connection, classname );
 		DriverManager.println("setSerialize: setting " + x + "::" + tablename );
 
 		// OID reference to tablerow-type must be cast like:  <oid>::<tablename>
diff --git a/src/interfaces/jdbc/org/postgresql/largeobject/LargeObjectManager.java b/src/interfaces/jdbc/org/postgresql/largeobject/LargeObjectManager.java
index a57c01cd232..b2bbebc949c 100644
--- a/src/interfaces/jdbc/org/postgresql/largeobject/LargeObjectManager.java
+++ b/src/interfaces/jdbc/org/postgresql/largeobject/LargeObjectManager.java
@@ -103,15 +103,24 @@ public class LargeObjectManager
 		//
 		// This is an example of Fastpath.addFunctions();
 		//
-		ResultSet res = conn.createStatement().executeQuery("select proname, oid from pg_proc" +
-						" where proname = 'lo_open'" +
-						"    or proname = 'lo_close'" +
-						"    or proname = 'lo_creat'" +
-						"    or proname = 'lo_unlink'" +
-						"    or proname = 'lo_lseek'" +
-						"    or proname = 'lo_tell'" +
-						"    or proname = 'loread'" +
-						"    or proname = 'lowrite'");
+		String sql;
+		if (conn.getMetaData().supportsSchemasInTableDefinitions()) {
+			sql = "SELECT p.proname,p.oid "+
+				" FROM pg_catalog.pg_proc p, pg_catalog.pg_namespace n "+
+				" WHERE p.pronamespace=n.oid AND n.nspname='pg_catalog' AND ";
+		} else {
+			sql = "SELECT proname,oid FROM pg_proc WHERE ";
+		}
+		sql += " proname = 'lo_open'" +
+			" or proname = 'lo_close'" +
+			" or proname = 'lo_creat'" +
+			" or proname = 'lo_unlink'" +
+			" or proname = 'lo_lseek'" +
+			" or proname = 'lo_tell'" +
+			" or proname = 'loread'" +
+			" or proname = 'lowrite'";
+
+		ResultSet res = conn.createStatement().executeQuery(sql);
 
 		if (res == null)
 			throw new PSQLException("postgresql.lo.init");
diff --git a/src/interfaces/jdbc/org/postgresql/test/TestUtil.java b/src/interfaces/jdbc/org/postgresql/test/TestUtil.java
index fc8dc70deb0..930215ed380 100644
--- a/src/interfaces/jdbc/org/postgresql/test/TestUtil.java
+++ b/src/interfaces/jdbc/org/postgresql/test/TestUtil.java
@@ -110,7 +110,7 @@ public class TestUtil
 			try
 			{
 				String sql = "DROP TABLE " + table;
-				if (con instanceof org.postgresql.jdbc1.AbstractJdbc1Connection && ((org.postgresql.jdbc1.AbstractJdbc1Connection)con).haveMinimumServerVersion("7.3")) {
+				if (haveMinimumServerVersion(con,"7.3")) {
 					sql += " CASCADE ";
 				}
 				stmt.executeUpdate(sql);
@@ -190,4 +190,41 @@ public class TestUtil
 		String s = "0000000000".substring(0, l) + Integer.toString(v);
 		return s.substring(s.length() - l);
 	}
+
+	/**
+	 * Determine if the given connection is connected to a server with
+	 * a version of at least the given version.
+	 * This is convenient because we are working with a java.sql.Connection,
+	 * not an Postgres connection.
+	 */
+	public static boolean haveMinimumServerVersion(Connection con, String version) throws SQLException {
+		if (con instanceof org.postgresql.jdbc1.AbstractJdbc1Connection) {
+			return ((org.postgresql.jdbc1.AbstractJdbc1Connection)con).haveMinimumServerVersion(version);
+		}
+		return false;
+	}
+
+	/**
+	 * Print a ResultSet to System.out.
+	 * This is useful for debugging tests.
+	 */
+	public static void printResultSet(ResultSet rs) throws SQLException {
+		ResultSetMetaData rsmd = rs.getMetaData();
+		for (int i=1; i<=rsmd.getColumnCount(); i++) {
+			if (i != 1) {
+				System.out.print(", ");
+			}
+			System.out.print(rsmd.getColumnName(i));
+		}
+		System.out.println();
+		while (rs.next()) {
+			for (int i=1; i<=rsmd.getColumnCount(); i++) {
+				if (i != 1) {
+					System.out.print(", ");
+				}
+				System.out.print(rs.getString(i));
+			}
+			System.out.println();
+		}
+	}
 }
diff --git a/src/interfaces/jdbc/org/postgresql/test/jdbc2/DatabaseMetaDataTest.java b/src/interfaces/jdbc/org/postgresql/test/jdbc2/DatabaseMetaDataTest.java
index 0d1a962049a..4c46509c7c1 100644
--- a/src/interfaces/jdbc/org/postgresql/test/jdbc2/DatabaseMetaDataTest.java
+++ b/src/interfaces/jdbc/org/postgresql/test/jdbc2/DatabaseMetaDataTest.java
@@ -9,7 +9,7 @@ import java.sql.*;
  *
  * PS: Do you know how difficult it is to type on a train? ;-)
  *
- * $Id: DatabaseMetaDataTest.java,v 1.14 2002/09/11 05:38:45 barry Exp $
+ * $Id: DatabaseMetaDataTest.java,v 1.15 2002/10/01 00:39:02 davec Exp $
  */
 
 public class DatabaseMetaDataTest extends TestCase
@@ -35,10 +35,8 @@ public class DatabaseMetaDataTest extends TestCase
 
 		TestUtil.closeDB( con );
 	}
-	/*
-	 * The spec says this may return null, but we always do!
-	 */
-	public void testGetMetaData()
+
+	public void testTables()
 	{
 		try
 		{
@@ -50,7 +48,8 @@ public class DatabaseMetaDataTest extends TestCase
 			assertTrue( rs.next() );
 			String tableName = rs.getString("TABLE_NAME");
 			assertTrue( tableName.equals("testmetadata") );
-
+			String tableType = rs.getString("TABLE_TYPE");
+			assertTrue( tableType.equals("TABLE") );
 			rs.close();
 
 			rs = dbmd.getColumns("", "", "test%", "%" );
@@ -76,162 +75,6 @@ public class DatabaseMetaDataTest extends TestCase
 		}
 	}
 
-	/*
-	 * Test default capabilities
-	 */
-	public void testCapabilities()
-	{
-		try
-		{
-
-			DatabaseMetaData dbmd = con.getMetaData();
-			assertNotNull(dbmd);
-
-			assertTrue(dbmd.allProceduresAreCallable());
-			assertTrue(dbmd.allTablesAreSelectable()); // not true all the time
-
-			// This should always be false for postgresql (at least for 7.x)
-			assertTrue(!dbmd.isReadOnly());
-
-			// does the backend support this yet? The protocol does...
-			assertTrue(!dbmd.supportsMultipleResultSets());
-
-			// yes, as multiple backends can have transactions open
-			assertTrue(dbmd.supportsMultipleTransactions());
-
-			assertTrue(dbmd.supportsMinimumSQLGrammar());
-			assertTrue(!dbmd.supportsCoreSQLGrammar());
-			assertTrue(!dbmd.supportsExtendedSQLGrammar());
-			if (((org.postgresql.jdbc1.AbstractJdbc1Connection)con).haveMinimumServerVersion("7.3"))
-				assertTrue(dbmd.supportsANSI92EntryLevelSQL());
-			else
-				assertTrue(!dbmd.supportsANSI92EntryLevelSQL());
-			assertTrue(!dbmd.supportsANSI92IntermediateSQL());
-			assertTrue(!dbmd.supportsANSI92FullSQL());
-
-			assertTrue(!dbmd.supportsIntegrityEnhancementFacility());
-
-		}
-		catch (SQLException ex)
-		{
-			fail(ex.getMessage());
-		}
-	}
-
-
-	public void testJoins()
-	{
-		try
-		{
-
-			DatabaseMetaData dbmd = con.getMetaData();
-			assertNotNull(dbmd);
-
-			assertTrue(dbmd.supportsOuterJoins());
-			assertTrue(dbmd.supportsFullOuterJoins());
-			assertTrue(dbmd.supportsLimitedOuterJoins());
-
-		}
-		catch (SQLException ex)
-		{
-			fail(ex.getMessage());
-		}
-	}
-
-	public void testCursors()
-	{
-		try
-		{
-
-			DatabaseMetaData dbmd = con.getMetaData();
-			assertNotNull(dbmd);
-
-			assertTrue(!dbmd.supportsPositionedDelete());
-			assertTrue(!dbmd.supportsPositionedUpdate());
-
-		}
-		catch (SQLException ex)
-		{
-			fail(ex.getMessage());
-		}
-	}
-
-	public void testNulls()
-	{
-		try
-		{
-
-			DatabaseMetaData dbmd = con.getMetaData();
-			assertNotNull(dbmd);
-
-			// We need to type cast the connection to get access to the
-			// PostgreSQL-specific method haveMinimumServerVersion().
-			// This is not available through the java.sql.Connection interface.
-			assertTrue( con instanceof org.postgresql.PGConnection );
-
-			assertTrue(!dbmd.nullsAreSortedAtStart());
-			assertTrue( dbmd.nullsAreSortedAtEnd() !=
-						((org.postgresql.jdbc2.AbstractJdbc2Connection)con).haveMinimumServerVersion("7.2"));
-			assertTrue( dbmd.nullsAreSortedHigh() ==
-						((org.postgresql.jdbc2.AbstractJdbc2Connection)con).haveMinimumServerVersion("7.2"));
-			assertTrue(!dbmd.nullsAreSortedLow());
-
-			assertTrue(dbmd.nullPlusNonNullIsNull());
-
-			assertTrue(dbmd.supportsNonNullableColumns());
-
-		}
-		catch (SQLException ex)
-		{
-			fail(ex.getMessage());
-		}
-	}
-
-	public void testLocalFiles()
-	{
-		try
-		{
-
-			DatabaseMetaData dbmd = con.getMetaData();
-			assertNotNull(dbmd);
-
-			assertTrue(!dbmd.usesLocalFilePerTable());
-			assertTrue(!dbmd.usesLocalFiles());
-
-		}
-		catch (SQLException ex)
-		{
-			fail(ex.getMessage());
-		}
-	}
-
-	public void testIdentifiers()
-	{
-		try
-		{
-
-			DatabaseMetaData dbmd = con.getMetaData();
-			assertNotNull(dbmd);
-
-			assertTrue(!dbmd.supportsMixedCaseIdentifiers()); // always false
-			assertTrue(dbmd.supportsMixedCaseQuotedIdentifiers());	// always true
-
-			assertTrue(!dbmd.storesUpperCaseIdentifiers());   // always false
-			assertTrue(dbmd.storesLowerCaseIdentifiers());	  // always true
-			assertTrue(!dbmd.storesUpperCaseQuotedIdentifiers()); // always false
-			assertTrue(!dbmd.storesLowerCaseQuotedIdentifiers()); // always false
-			assertTrue(!dbmd.storesMixedCaseQuotedIdentifiers()); // always false
-
-			assertTrue(dbmd.getIdentifierQuoteString().equals("\""));
-
-
-		}
-		catch (SQLException ex)
-		{
-			fail(ex.getMessage());
-		}
-	}
-
 	public void testCrossReference()
 	{
 		try
@@ -264,7 +107,7 @@ public class DatabaseMetaDataTest extends TestCase
 				assertTrue( fkColumnName.equals( "m" ) || fkColumnName.equals( "n" ) ) ;
 
 				String fkName = rs.getString( "FK_NAME" );
-				if (((org.postgresql.jdbc1.AbstractJdbc1Connection)con1).haveMinimumServerVersion("7.3")) {
+				if (TestUtil.haveMinimumServerVersion(con1,"7.3")) {
 					assertTrue(fkName.startsWith("$1"));
 				} else {
 					assertTrue( fkName.startsWith( "<unnamed>") );
@@ -354,120 +197,217 @@ public class DatabaseMetaDataTest extends TestCase
 			fail(ex.getMessage());
 		}
 	}
-	public void testTables()
+
+	public void testColumns()
 	{
+		// At the moment just test that no exceptions are thrown KJ
 		try
 		{
-
 			DatabaseMetaData dbmd = con.getMetaData();
 			assertNotNull(dbmd);
-
-			// we can add columns
-			assertTrue(dbmd.supportsAlterTableWithAddColumn());
-
-			// we can't drop columns (yet)
-			assertTrue(!dbmd.supportsAlterTableWithDropColumn());
-
+			ResultSet rs = dbmd.getColumns(null,null,"pg_class",null);
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
-		catch (SQLException ex)
+	}
+
+	public void testColumnPrivileges()
+	{
+		// At the moment just test that no exceptions are thrown KJ
+		try
 		{
-			fail(ex.getMessage());
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getColumnPrivileges(null,null,"pg_statistic",null);
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
 	}
 
-	public void testSelect()
+	public void testTablePrivileges()
 	{
+		// At the moment just test that no exceptions are thrown KJ
 		try
 		{
-
 			DatabaseMetaData dbmd = con.getMetaData();
 			assertNotNull(dbmd);
-
-			// yes we can?: SELECT col a FROM a;
-			assertTrue(dbmd.supportsColumnAliasing());
-
-			// yes we can have expressions in ORDERBY
-			assertTrue(dbmd.supportsExpressionsInOrderBy());
-
-			// Yes, an ORDER BY clause can contain columns that are not in the
-			// SELECT clause.
-			assertTrue(dbmd.supportsOrderByUnrelated());
-
-			assertTrue(dbmd.supportsGroupBy());
-			assertTrue(dbmd.supportsGroupByUnrelated());
-			assertTrue(dbmd.supportsGroupByBeyondSelect()); // needs checking
-
+			ResultSet rs = dbmd.getTablePrivileges(null,null,"grantme");
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
-		catch (SQLException ex)
+	}
+
+	public void testPrimaryKeys()
+	{
+		// At the moment just test that no exceptions are thrown KJ
+		try
 		{
-			fail(ex.getMessage());
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getPrimaryKeys(null,null,"pg_class");
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
 	}
 
-	public void testDBParams()
+	public void testIndexInfo()
 	{
+		// At the moment just test that no exceptions are thrown KJ
 		try
 		{
-
 			DatabaseMetaData dbmd = con.getMetaData();
 			assertNotNull(dbmd);
-
-			assertTrue(dbmd.getURL().equals(TestUtil.getURL()));
-			assertTrue(dbmd.getUserName().equals(TestUtil.getUser()));
-
+			ResultSet rs = dbmd.getIndexInfo(null,null,"pg_class",false,false);
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
-		catch (SQLException ex)
+	}
+
+	public void testTableTypes()
+	{
+		// At the moment just test that no exceptions are thrown KJ
+		try
 		{
-			fail(ex.getMessage());
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getTableTypes();
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
 	}
 
-	public void testDbProductDetails()
+	public void testProcedureColumns()
 	{
+		// At the moment just test that no exceptions are thrown KJ
 		try
 		{
-			assertTrue(con instanceof org.postgresql.PGConnection);
-			org.postgresql.jdbc2.AbstractJdbc2Connection pc = (org.postgresql.jdbc2.AbstractJdbc2Connection) con;
-
 			DatabaseMetaData dbmd = con.getMetaData();
 			assertNotNull(dbmd);
-
-			assertTrue(dbmd.getDatabaseProductName().equals("PostgreSQL"));
-			//The test below doesn't make sense to me, it tests that
-			//the version of the driver = the version of the database it is connected to
-			//since the driver should be backwardly compatible this test is commented out
-			//assertTrue(dbmd.getDatabaseProductVersion().startsWith(
-			//		   Integer.toString(pc.getDriver().getMajorVersion())
-			//		   + "."
-			//		   + Integer.toString(pc.getDriver().getMinorVersion())));
-			assertTrue(dbmd.getDriverName().equals("PostgreSQL Native Driver"));
-
+			ResultSet rs = dbmd.getProcedureColumns(null,null,null,null);
+			rs.close();
+		} catch (SQLException sqle) {
+			sqle.printStackTrace();
+			fail(sqle.getMessage());
 		}
-		catch (SQLException ex)
+	}
+
+	public void testVersionColumns()
+	{
+		// At the moment just test that no exceptions are thrown KJ
+		try
 		{
-			fail(ex.getMessage());
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getVersionColumns(null,null,"pg_class");
+			rs.close();
+		} catch (SQLException sqle) {
+			fail(sqle.getMessage());
 		}
 	}
 
-	public void testDriverVersioning()
+	public void testBestRowIdentifier()
 	{
+		// At the moment just test that no exceptions are thrown KJ
 		try
 		{
-			assertTrue(con instanceof org.postgresql.PGConnection);
-			org.postgresql.jdbc2.AbstractJdbc2Connection pc = (org.postgresql.jdbc2.AbstractJdbc2Connection) con;
-
 			DatabaseMetaData dbmd = con.getMetaData();
 			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getBestRowIdentifier(null,null,"pg_type",dbmd.bestRowSession,false);
+			rs.close();
+		} catch (SQLException sqle) {
+			fail(sqle.getMessage());
+		}
+	}
 
-			assertTrue(dbmd.getDriverVersion().equals(pc.getDriver().getVersion()));
-			assertTrue(dbmd.getDriverMajorVersion() == pc.getDriver().getMajorVersion());
-			assertTrue(dbmd.getDriverMinorVersion() == pc.getDriver().getMinorVersion());
-
+	public void testProcedures()
+	{
+		// At the moment just test that no exceptions are thrown KJ
+		try
+		{
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getProcedures(null,null,null);
+			rs.close();
+		} catch (SQLException sqle) {
+			fail(sqle.getMessage());
+		}
+	}
 
+	public void testCatalogs()
+	{
+		try
+		{
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+			ResultSet rs = dbmd.getCatalogs();
+			boolean foundTemplate0 = false;
+			boolean foundTemplate1 = false;
+			while(rs.next()) {
+				String database = rs.getString("TABLE_CAT");
+				if ("template0".equals(database)) {
+					foundTemplate0 = true;
+				} else if ("template1".equals(database)) {
+					foundTemplate1 = true;
+				}
+			}
+			rs.close();
+			assertTrue(foundTemplate0);
+			assertTrue(foundTemplate1);
+		} catch(SQLException sqle) {
+			fail(sqle.getMessage());
 		}
-		catch (SQLException ex)
+	}
+
+	public void testSchemas()
+	{
+		try
 		{
-			fail(ex.getMessage());
+			DatabaseMetaData dbmd = con.getMetaData();
+			assertNotNull(dbmd);
+
+			ResultSet rs = dbmd.getSchemas();
+			boolean foundPublic = false;
+			boolean foundEmpty = false;
+			boolean foundPGCatalog = false;
+			int count;
+		
+			for(count=0; rs.next(); count++) {
+				String schema = rs.getString("TABLE_SCHEM");
+				if ("public".equals(schema)) {
+					foundPublic = true;
+				} else if ("".equals(schema)) {
+					foundEmpty = true;
+				} else if ("pg_catalog".equals(schema)) {
+					foundPGCatalog = true;
+				}
+			}
+			rs.close();
+			if (TestUtil.haveMinimumServerVersion(con,"7.3")) {
+				assertTrue(count >= 2);
+				assertTrue(foundPublic);
+				assertTrue(foundPGCatalog);
+				assertTrue(!foundEmpty);
+			} else {
+				assertEquals(count,1);
+				assertTrue(foundEmpty);
+				assertTrue(!foundPublic);
+				assertTrue(!foundPGCatalog);
+			}
+		} catch (SQLException sqle) {
+			fail(sqle.getMessage());
 		}
 	}
+
 }
diff --git a/src/interfaces/jdbc/org/postgresql/test/jdbc2/Jdbc2TestSuite.java b/src/interfaces/jdbc/org/postgresql/test/jdbc2/Jdbc2TestSuite.java
index 2a117cecf03..1d0b7c3a36a 100644
--- a/src/interfaces/jdbc/org/postgresql/test/jdbc2/Jdbc2TestSuite.java
+++ b/src/interfaces/jdbc/org/postgresql/test/jdbc2/Jdbc2TestSuite.java
@@ -25,8 +25,7 @@ public class Jdbc2TestSuite extends TestSuite
 		// complexity.
 
 		// ANTTest should be first as it ensures that test parameters are
-		// being sent to the suite. It also initialises the database (if required)
-		// with some simple global tables (will make each testcase use its own later).
+		// being sent to the suite.
 		//
 		suite.addTestSuite(ANTTest.class);
 
@@ -34,6 +33,7 @@ public class Jdbc2TestSuite extends TestSuite
 		suite.addTestSuite(DriverTest.class);
 		suite.addTestSuite(ConnectionTest.class);
 		suite.addTestSuite(DatabaseMetaDataTest.class);
+		suite.addTestSuite(DatabaseMetaDataPropertiesTest.class);
 		suite.addTestSuite(EncodingTest.class);
 
 		// Connectivity/Protocols
@@ -48,13 +48,12 @@ public class Jdbc2TestSuite extends TestSuite
 
 		// PreparedStatement
 
-        // ServerSide Prepared Statements
-        suite.addTestSuite(ServerPreparedStmtTest.class);
+		// ServerSide Prepared Statements
+		suite.addTestSuite(ServerPreparedStmtTest.class);
 
 		// BatchExecute
 		suite.addTestSuite(BatchExecuteTest.class);
 
-		// MetaData
 
 		// Other misc tests, based on previous problems users have had or specific
 		// features some applications require.
@@ -63,6 +62,8 @@ public class Jdbc2TestSuite extends TestSuite
 
 		// Fastpath/LargeObject
 		suite.addTestSuite(BlobTest.class);
+
+		suite.addTestSuite(SerializeTest.class);
 		suite.addTestSuite(UpdateableResultTest.class );
 
 		suite.addTestSuite(CallableStmtTest.class );
diff --git a/src/interfaces/jdbc/org/postgresql/test/jdbc2/ServerPreparedStmtTest.java b/src/interfaces/jdbc/org/postgresql/test/jdbc2/ServerPreparedStmtTest.java
index 7617603857e..d374d226648 100644
--- a/src/interfaces/jdbc/org/postgresql/test/jdbc2/ServerPreparedStmtTest.java
+++ b/src/interfaces/jdbc/org/postgresql/test/jdbc2/ServerPreparedStmtTest.java
@@ -45,7 +45,7 @@ public class ServerPreparedStmtTest extends TestCase
 	{
 		PreparedStatement pstmt = con.prepareStatement("SELECT * FROM testsps WHERE id = 2");
         ((PGStatement)pstmt).setUseServerPrepare(true);
-        if (((org.postgresql.jdbc1.AbstractJdbc1Connection)con).haveMinimumServerVersion("7.3")) {
+        if (TestUtil.haveMinimumServerVersion(con,"7.3")) {
 			assertTrue(((PGStatement)pstmt).isUseServerPrepare());
 		} else {
 			assertTrue(!((PGStatement)pstmt).isUseServerPrepare());
@@ -85,7 +85,7 @@ public class ServerPreparedStmtTest extends TestCase
 	{
 		PreparedStatement pstmt = con.prepareStatement("SELECT * FROM testsps WHERE id = ?");
         ((PGStatement)pstmt).setUseServerPrepare(true);
-        if (((org.postgresql.jdbc1.AbstractJdbc1Connection)con).haveMinimumServerVersion("7.3")) {
+        if (TestUtil.haveMinimumServerVersion(con,"7.3")) {
 			assertTrue(((PGStatement)pstmt).isUseServerPrepare());
 		} else {
 			assertTrue(!((PGStatement)pstmt).isUseServerPrepare());
@@ -126,7 +126,7 @@ public class ServerPreparedStmtTest extends TestCase
 	{
 		PreparedStatement pstmt = con.prepareStatement("SELECT * FROM testsps WHERE id = ? or id = ?");
         ((PGStatement)pstmt).setUseServerPrepare(true);
-        if (((org.postgresql.jdbc1.AbstractJdbc1Connection)con).haveMinimumServerVersion("7.3")) {
+        if (TestUtil.haveMinimumServerVersion(con,"7.3")) {
 			assertTrue(((PGStatement)pstmt).isUseServerPrepare());
 		} else {
 			assertTrue(!((PGStatement)pstmt).isUseServerPrepare());
diff --git a/src/interfaces/jdbc/org/postgresql/util/Serialize.java b/src/interfaces/jdbc/org/postgresql/util/Serialize.java
index 57c77f0b9f9..48d18ef02ee 100644
--- a/src/interfaces/jdbc/org/postgresql/util/Serialize.java
+++ b/src/interfaces/jdbc/org/postgresql/util/Serialize.java
@@ -124,14 +124,14 @@ public class Serialize
 	 * This creates an instance that can be used to serialize or deserialize
 	 * a Java object from a PostgreSQL table.
 	 */
-	public Serialize(Connection c, String type) throws SQLException
+	public Serialize(Connection conn, String type) throws SQLException
 	{
 		try
 		{
-			conn = c;
+			this.conn = conn;
 			if (Driver.logDebug)
 				Driver.debug("Serialize: initializing instance for type: " + type);
-			tableName = toPostgreSQL(type);
+			tableName = toPostgreSQL(conn,type);
 			className = type;
 			ourClass = Class.forName(className);
 		}
@@ -144,7 +144,14 @@ public class Serialize
 
 		// Second check, the type must be a table
 		boolean status = false;
-		ResultSet rs = ((org.postgresql.jdbc1.AbstractJdbc1Connection)conn).ExecSQL("select typname from pg_type,pg_class where typname=relname and typname='" + tableName + "'");
+		String sql;
+		if (conn.getMetaData().supportsSchemasInTableDefinitions()) {
+			sql = "SELECT 1 FROM pg_catalog.pg_type t, pg_catalog.pg_class c WHERE t.typrelid=c.oid AND c.relkind='r' AND t.typname='" + tableName + "' AND pg_table_is_visible(c.oid) ";
+		} else {
+			sql = "SELECT 1 FROM pg_type t, pg_class c WHERE t.typrelid=c.oid AND c.relkind='r' AND t.typname='"+tableName+"'";
+		}
+
+		ResultSet rs = conn.createStatement().executeQuery(sql);
 		if (rs != null)
 		{
 			if (rs.next())
@@ -187,7 +194,7 @@ public class Serialize
 	 * @return Object relating to oid
 	 * @exception SQLException on error
 	 */
-	public Object fetch(int oid) throws SQLException
+	public Object fetch(long oid) throws SQLException
 	{
 		try
 		{
@@ -228,7 +235,7 @@ public class Serialize
 
 			if (Driver.logDebug)
 				Driver.debug("Serialize.fetch: " + sb.toString());
-			ResultSet rs = ((org.postgresql.jdbc1.AbstractJdbc1Connection)conn).ExecSQL(sb.toString());
+			ResultSet rs = conn.createStatement().executeQuery(sb.toString());
 
 			if (rs != null)
 			{
@@ -493,15 +500,22 @@ public class Serialize
 	 * @param o Class to base table on
 	 * @exception SQLException on error
 	 */
-	public static void create(Connection con, Class c) throws SQLException
+	public static void create(Connection conn, Class c) throws SQLException
 	{
 		if (c.isInterface())
 			throw new PSQLException("postgresql.serial.interface");
 
 		// See if the table exists
-		String tableName = toPostgreSQL(c.getName());
+		String tableName = toPostgreSQL(conn,c.getName());
 
-		ResultSet rs = ((org.postgresql.jdbc1.AbstractJdbc1Connection)con).ExecSQL("select relname from pg_class where relname = '" + tableName + "'");
+		String sql;
+		if (conn.getMetaData().supportsSchemasInTableDefinitions()) {
+			sql = "SELECT 1 FROM pg_catalog.pg_class WHERE relkind='r' AND relname='" + tableName + "' AND pg_table_is_visible(oid) ";
+		} else {
+			sql = "SELECT 1 FROM pg_class WHERE relkind='r' AND relname='"+tableName+"'";
+		}
+
+		ResultSet rs = conn.createStatement().executeQuery(sql);
 		if ( rs.next() )
 		{
 			if (Driver.logDebug)
@@ -549,8 +563,8 @@ public class Serialize
 						sb.append(tp[j][1]);
 					else
 					{
-						create(con, type);
-						sb.append(toPostgreSQL(n));
+						create(conn, type);
+						sb.append(toPostgreSQL(conn,n));
 					}
 				}
 			}
@@ -560,7 +574,7 @@ public class Serialize
 		// Now create the table
 		if (Driver.logDebug)
 			Driver.debug("Serialize.create: " + sb );
-		((org.postgresql.jdbc1.AbstractJdbc1Connection)con).ExecSQL(sb.toString());
+		conn.createStatement().executeUpdate(sb.toString());
 	}
 
 	// This is used to translate between Java primitives and PostgreSQL types.
@@ -582,35 +596,53 @@ public class Serialize
 											 {"byte", "int2"}
 										 };
 
-	/*
+	/**
 	 * This converts a Java Class name to a org.postgresql table, by replacing . with
 	 * _<p>
 	 *
 	 * Because of this, a Class name may not have _ in the name.<p>
 	 * Another limitation, is that the entire class name (including packages)
-	 * cannot be longer than 64 characters (a limit forced by PostgreSQL).
+	 * cannot be longer than the maximum table name length.
 	 *
+	 * @param con The database connection
 	 * @param name Class name
 	 * @return PostgreSQL table name
 	 * @exception SQLException on error
+	 * @since 7.3
 	 */
-	public static String toPostgreSQL(String name) throws SQLException
+	public static String toPostgreSQL(Connection con, String name) throws SQLException
 	{
+		DatabaseMetaData dbmd = con.getMetaData();
+		int maxNameLength = dbmd.getMaxTableNameLength();
+		return toPostgreSQL(maxNameLength,name);
+	}
+
+	/**
+	 * Convert a Java Class Name to an org.postgresql table name, by replacing .
+	 * with _ <p>
+	 *
+	 * @deprecated Replaced by toPostgresql(connection, name) in 7.3
+	 */
+	public static String toPostgreSQL(String name) throws SQLException {
+		return toPostgreSQL(31,name);
+	}
+
+	private static String toPostgreSQL(int maxNameLength, String name) throws SQLException {
+
 		name = name.toLowerCase();
 
 		if (name.indexOf("_") > -1)
 			throw new PSQLException("postgresql.serial.underscore");
 
-		// Postgres table names can only be 64 character long.
-		// Reserve 1 char, so allow only up to 63 chars.
+		// Postgres table names can only be so many characters long.
 		// If the full class name with package is too long
 		// then just use the class name. If the class name is
 		// too long throw an exception.
 		//
-		if ( name.length() > 63 )
+		if ( name.length() > maxNameLength )
 		{
 			name = name.substring(name.lastIndexOf(".") + 1);
-			if ( name.length() > 63 )
+			if ( name.length() > maxNameLength )
 				throw new PSQLException("postgresql.serial.namelength", name, new Integer(name.length()));
 		}
 		return name.replace('.', '_');
-- 
GitLab