Tuesday, August 1, 2017

PostgreSQL JDBC 42.1.4 driver released

The PostgreSQL JDBC team is pleased to announce the release of version 42.1.4.

Below are the changes included since 42.1.1.

Version 42.1.4 (2017-08-01)

Notable changes

  • Statements with non-zero fetchSize no longer require a server-side named handle. This might cause issues when combining older PostgreSQL versions (pre-8.4) with a non-zero fetchSize and interleaved ResultSet processing. See issue 869
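
For context, here is a minimal sketch of the kind of statement this change affects: a query executed with a non-zero fetchSize so that rows are fetched from the server in batches. The table name and connection details are hypothetical, and the usual java.sql imports are assumed.

try (Connection con = DriverManager.getConnection("jdbc:postgresql://localhost/test")) {
    con.setAutoCommit(false);                 // cursor-based fetching requires autocommit off
    try (Statement st = con.createStatement()) {
        st.setFetchSize(50);                  // non-zero fetchSize: rows arrive 50 at a time
        try (ResultSet rs = st.executeQuery("SELECT * FROM big_table")) {
            while (rs.next()) {
                // process each row as it streams in
            }
        }
    }
}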

Version 42.1.3 (2017-07-14)

Notable changes
  • fixed NPE in PreparedStatement.executeBatch in case of empty batch (regression since 42.1.2) PR#867

Version 42.1.2 (2017-07-12)

Notable changes
  • Better logic for RETURNING keyword detection. Previously, pgjdbc could be defeated by column names that contain "returning", so it failed to return generated keys because it considered the statement to already have a RETURNING clause PR#824 201daf1d
  • Replication API: fix issue #834 where setting statusIntervalUpdate causes high CPU load PR#835 59236b74
  • perf: use server-prepared statements for batch inserts when prepareThreshold > 0. Note: this lets a batch use server-prepared statements from the first executeBatch() execution (previously the driver waited for prepareThreshold executeBatch() calls) abc3d9d7
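
For illustration, a minimal sketch of the batch-insert pattern this change speeds up; the table name and values are hypothetical.

// With prepareThreshold > 0 (the default), the driver can now switch to a
// server-prepared statement on the very first executeBatch() call.
try (PreparedStatement ps = connection.prepareStatement(
        "INSERT INTO batch_demo(name) VALUES (?)")) {
    for (String name : new String[] {"a", "b", "c"}) {
        ps.setString(1, name);
        ps.addBatch();                // queue one set of parameters
    }
    int[] counts = ps.executeBatch(); // execute all queued inserts as one batch
}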

Saturday, May 6, 2017

PostgreSQL JDBC driver 42.1.1 released

The JDBC development group has released the latest driver with the following notable changes

Notable changes
  • fix: data being truncated in setCharacterStream (bug introduced in 42.0.0) PR#802
  • fix: calculation of lastReceiveLSN for logical replication PR#801
  • fix: make sure org.postgresql.Driver is loaded when accessing through the DataSource interface #768
  • feat: support fetching a REF_CURSOR using getObject PR#809 (see the sketch after this list)
  • note: there is no 42.1.0.jre6 due to an infinity handling bug; fixed in 42.1.1.jre6
  • fix: infinite dates might be corrupted when transferred in binary for certain JREs, for instance 5881610-07-11 instead of infinity
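
As a rough sketch of the REF_CURSOR change above: the server-side function get_people() returning refcursor is hypothetical, and the exact registration details may differ.

// refcursors are only valid inside a transaction, so turn autocommit off first.
connection.setAutoCommit(false);
try (CallableStatement cs = connection.prepareCall("{ ? = call get_people() }")) {
    cs.registerOutParameter(1, Types.REF_CURSOR);
    cs.execute();
    try (ResultSet rs = (ResultSet) cs.getObject(1)) { // fetch the REF_CURSOR as a ResultSet
        while (rs.next()) {
            System.out.println(rs.getString(1));
        }
    }
}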

Friday, April 7, 2017

Trusted Languages in PostgreSQL ... finally with PL/Container

Today PL/Container has been released as open source under the BSD license.

PL/Container runs Python code (and hopefully R code when we can open source it) inside a Docker container. The server side code running inside GPDB communicates with the container using an RPC protocol very similar to the GPDB FE/BE protocol.

Of course there is some overhead, but with complicated enough functions this overhead is overcome by the ability to throw more computing power at the problem.

For example, a complicated function that brute-force computes the prime numbers below 10,000 (approx. 250ms) actually runs faster this way.


Implementing SCRAM in the JDBC driver

PostgreSQL 10.0 has a fancy new authentication mechanism: Salted Challenge Response Authentication Mechanism, or SCRAM for short.

SCRAM will be an alternative to the somewhat controversial MD5 passwords currently used by PostgreSQL; Michael Paquier has more to say on that here.

This post isn't so much about SCRAM itself as about its implementation in the JDBC driver. Last year at PGCon in Ottawa, Alvaro from 8kdata stepped up and volunteered to write the Java implementation for the driver.

We've decided to enable this only in the Java 8+ versions of the driver, for a number of reasons, the most important being that the cryptographic libraries required to implement it are only available in the JDK from version 8 and up.

Also factoring into the decision is the fact that SCRAM is only available in PostgreSQL 10.0+, and we are of the opinion that very few people will upgrade to or use PostgreSQL 10.x without upgrading their JDK as well.

Thursday, March 9, 2017

Logical Decoding using the JDBC Driver

Logical Decoding


What is Logical Decoding

It's useful to understand what physical replication is in order to understand logical decoding.

Physical replication extends the functionality of recovery mode. Write-Ahead Logs (WAL) are written to disk before the actual data files are updated. These files contain enough information to recreate each transaction in the event of a catastrophic shutdown.

In the event of an emergency shutdown (power failure, OOM kill), when the server comes back online it will attempt to apply the outstanding WAL up to the point of the shutdown. This is referred to as recovery mode.

Physical replication takes advantage of this infrastructure built into the server. The standby is started in recovery mode, and WAL created by the master is applied to the standby. How that occurs is beyond the scope of this post, but you can read about it here.

The interesting bit here is that we have a mechanism by which to access the changes in the heap without connecting to the database.

There are a few caveats though, which is where logical decoding comes to the rescue. First, WALs are binary and their format is not guaranteed to be stable (in other words, it can change from version to version); second, they contain changes for every database in the server.

Logical decoding changes all of that by:

  1. Providing changes for only one database per slot
  2. Defining an API which facilitates writing an output plugin to output the changes in any format you define.


Concepts of Logical Decoding

Above I mentioned two new concepts: slots and plugins.

A slot is a stream of changes in a database. As previously mentioned, logical decoding works on a single database, and a slot represents a sequence of changes in that database. There can be more than one slot per database. The slot manages the set of changes sent over a particular stream, such as which transaction is currently being streamed and which transaction has been acknowledged.

A plugin is a library which accepts the changes and decodes them into a format of your choosing. Plugins need to be compiled and installed before they can be utilized by a slot.

Creating a slot with JDBC


public void createLogicalReplicationSlot(String slotName, String outputPlugin ) throws InterruptedException, SQLException, TimeoutException
    {
        //drop previous slot
        dropReplicationSlot(connection, slotName);

        try (PreparedStatement preparedStatement =
                     connection.prepareStatement(
                      "SELECT * FROM pg_create_logical_replication_slot(?, ?)") )
        {

            preparedStatement.setString(1, slotName);
            preparedStatement.setString(2, outputPlugin);
            try (ResultSet rs = preparedStatement.executeQuery())
            {
                while (rs.next())
                {
                    System.out.println("Slot Name: " + rs.getString(1));
                    System.out.println("Xlog Position: " + rs.getString(2));
                }
            }

        }
    }

This just calls the function pg_create_logical_replication_slot(slotname, plugin_name), which is the same as executing the replication protocol command "CREATE_REPLICATION_SLOT slotname LOGICAL plugin_name".

The function returns the slot name and the current xlog_position.

Note: before we create the slot, we check to make sure there isn't an existing slot already running; if there is, we use pg_terminate_backend() to terminate it.



public void dropReplicationSlot(Connection connection, String slotName)
            throws SQLException, InterruptedException, TimeoutException
    {
        try (PreparedStatement preparedStatement = connection.prepareStatement(
                        "select pg_terminate_backend(active_pid) from pg_replication_slots "
                                + "where active = true and slot_name = ?"))
        {
            preparedStatement.setString(1, slotName);
            preparedStatement.execute();
        }

        waitStopReplicationSlot(connection, slotName);

        try (PreparedStatement preparedStatement = connection.prepareStatement("select pg_drop_replication_slot(slot_name) "
                            + "from pg_replication_slots where slot_name = ?")) {
            preparedStatement.setString(1, slotName);
            preparedStatement.execute();
        }
    }



So at this point we have a slot and we know the current xlog_location. In order to read the current xlog location, the PostgreSQL driver provides a class which can decode it: org.postgresql.replication.LogSequenceNumber.

This class can be used to do things like 

 private LogSequenceNumber getCurrentLSN() throws SQLException
    {
        try (Statement st = connection.createStatement())
        {
            try (ResultSet rs = st.executeQuery("select "
                    + (((BaseConnection) connection).haveMinimumServerVersion(ServerVersion.v10)
                    ? "pg_current_wal_location()" : "pg_current_xlog_location()"))) {

                if (rs.next()) {
                    String lsn = rs.getString(1);
                    return LogSequenceNumber.valueOf(lsn);
                } else {
                    return LogSequenceNumber.INVALID_LSN;
                }
            }
        }
    }


The class also provides asLong(), equals(), and asString().
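
For example (the LSN value here is just illustrative):

LogSequenceNumber lsn = LogSequenceNumber.valueOf("0/16D68D8"); // parse the textual X/Y form
long numeric = lsn.asLong();    // numeric form, handy for comparisons
String text = lsn.asString();   // back to the familiar "X/Y" text form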


Before we can open a PGReplicationStream, we need to create a connection capable of replication. Among other requirements, replication connections can only use the simple query protocol.
The code looks like:


Connection openReplicationConnection() throws Exception {
        Properties properties = new Properties();
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        return DriverManager.getConnection("jdbc:postgresql://localhost/test",properties);
    }

In order to read any changes in the database we can just open a PGReplicationStream and read from it: 

 public void receiveChangesOccursBeforStartReplication() throws Exception {
        PGConnection pgConnection = (PGConnection) replicationConnection;

        LogSequenceNumber lsn = getCurrentLSN();

        Statement st = connection.createStatement();
        st.execute("insert into test_logic_table(name) values('previous value')");
        st.close();

        PGReplicationStream stream =
                pgConnection
                        .getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName(SLOT_NAME)
                        .withStartPosition(lsn)
                        .withSlotOption("include-xids", true)
                        .withSlotOption("pretty-print",true)
                        .withSlotOption("skip-empty-xacts", true)
                        .withStatusInterval(20, TimeUnit.SECONDS)
                        .start();
        ByteBuffer buffer;
        while(true)
        {
            buffer = stream.readPending();
            if (buffer == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            // toString(ByteBuffer) is a helper method (not shown here) that decodes the buffer's bytes into a String
            System.out.println(toString(buffer));
            //feedback
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        }

    }


This will output something like:

BEGIN 3547
table public.test_logic_table: INSERT: pk[integer]:28 name[character varying]:'previous value'
COMMIT 3547


Note that we see the following information:


  • Table name with schema "public.test_logic_table"
  • The command, one of INSERT, UPDATE, DELETE
  • The column name and type pk[integer] and value 28

So let's break down the above code and what it does:
  1. Create the replication stream with options; note the status interval. This number is chosen to be shorter than wal_sender_timeout: if the server does not receive a status message from this client within wal_sender_timeout, it will consider the client to have crashed. A good value for statusInterval is wal_sender_timeout/3 (a sketch follows this list).
  2. Use readPending to read; this call is non-blocking.
  3. If we have data, do something useful with it. Here we do nothing useful, but the opportunities for Change Data Capture are endless.
  4. Acknowledge that we have received the data. This bit is important, as it tells the server that it is free to discard the WAL that captured the change.
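
To illustrate point 1, here is a rough sketch of deriving the status interval from the server's wal_sender_timeout; the divide-by-three rule is just the heuristic mentioned above, and the surrounding variables (connection, pgConnection, SLOT_NAME, lsn) are the same ones used in the earlier examples.

// wal_sender_timeout is reported in milliseconds via pg_settings.
long walSenderTimeoutMs = 60_000; // the server default, used if the query returns nothing
try (Statement st = connection.createStatement();
     ResultSet rs = st.executeQuery(
             "SELECT setting FROM pg_settings WHERE name = 'wal_sender_timeout'")) {
    if (rs.next()) {
        walSenderTimeoutMs = Long.parseLong(rs.getString(1));
    }
}

PGReplicationStream stream = pgConnection
        .getReplicationAPI()
        .replicationStream()
        .logical()
        .withSlotName(SLOT_NAME)
        .withStartPosition(lsn)
        // keep the status interval well under wal_sender_timeout so the server
        // never mistakes this client for a crashed one
        .withStatusInterval((int) (walSenderTimeoutMs / 3), TimeUnit.MILLISECONDS)
        .start();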


In summary, logical replication is a game changer: we can now do change data capture without using triggers. We have the ability to audit the database into another database or system without triggers! There are even a few complete database replication solutions which allow writable slaves without triggers.


By way of attribution: I shamelessly ripped off code and documentation from the JDBC documentation.

The code for this post can be found at https://github.com/davecramer/LogicalDecode