Thursday, March 9, 2017

Logical Decoding using the JDBC Driver

Logical Decoding


What is Logical Decoding

It's useful to understand what physical replication is in order to understand logical decoding.

Physical replication extends the functionality of recovery mode. Changes are written to the Write Ahead Log (WAL) on disk before they are applied to the actual database files. These files contain enough information to recreate the transaction in the event of a catastrophic shutdown.

In the event of an emergency shutdown (power failure, OOM kill), when the server comes back online it will attempt to apply the outstanding WAL up to the point of the shutdown. This is referred to as recovery mode.

Physical replication takes advantage of this infrastructure built into the server. The standby is started in recovery mode and WAL created by the master is applied to the standby. How that occurs is beyond the scope of this post, but you can read about it here.

The interesting bit here is that we have a mechanism by which to access the changes in the heap without connecting to the database.

There are a few caveats, though, which is where logical decoding comes to the rescue. First, WAL files are binary and their format is not guaranteed to be stable (in other words, it can change from version to version). Second, they contain changes for every database in the server.

Logical decoding changes all of that by:

  1. Providing changes for only one database per slot
  2. Defining an API which facilitates writing an output plugin to output the changes in any format you define.


Concepts of Logical Decoding

Above I mentioned two new concepts: slots and plugins.

A slot is a stream of changes in a database. As previously mentioned logical decoding works on a single database. A slot represents a sequence of changes in that database. There can be more than one slot per database. The slot manages a set of changes sent over a particular stream such as which transaction is currently being streamed and which transaction has been acknowledged.

A plugin is a library which accepts the changes and decodes the changes into a format of your choosing. Plugins need to be compiled and installed before they can be utilized by a slot. 

Creating a slot with JDBC


public void createLogicalReplicationSlot(String slotName, String outputPlugin ) throws InterruptedException, SQLException, TimeoutException
    {
        //drop previous slot
        dropReplicationSlot(connection, slotName);

        try (PreparedStatement preparedStatement =
                     connection.prepareStatement(
                      "SELECT * FROM pg_create_logical_replication_slot(?, ?)") )
        {

            preparedStatement.setString(1, slotName);
            preparedStatement.setString(2, outputPlugin);
            try (ResultSet rs = preparedStatement.executeQuery())
            {
                while (rs.next())
                {
                    System.out.println("Slot Name: " + rs.getString(1));
                    System.out.println("Xlog Position: " + rs.getString(2));
                }
            }

        }
    }

This just calls the function pg_create_logical_replication_slot(slot_name, plugin_name), which has the same effect as issuing the replication protocol command "CREATE_REPLICATION_SLOT slot_name LOGICAL plugin_name" on a replication connection.

The function returns the slot name and the current xlog position.

Note: before we create the slot we check to make sure there isn't an existing slot with the same name that is still in use. If there is, we use pg_terminate_backend() to terminate the backend using it and then drop the slot.



public void dropReplicationSlot(Connection connection, String slotName)
            throws SQLException, InterruptedException, TimeoutException
    {
        try (PreparedStatement preparedStatement = connection.prepareStatement(
                        "select pg_terminate_backend(active_pid) from pg_replication_slots "
                                + "where active = true and slot_name = ?"))
        {
            preparedStatement.setString(1, slotName);
            preparedStatement.execute();
        }

        waitStopReplicationSlot(connection, slotName);

        try (PreparedStatement preparedStatement = connection.prepareStatement("select pg_drop_replication_slot(slot_name) "
                            + "from pg_replication_slots where slot_name = ?")) {
            preparedStatement.setString(1, slotName);
            preparedStatement.execute();
        }
    }



So at this point we have a slot and we know the current xlog location. In order to work with the xlog location, the PostgreSQL driver provides a class which can decode it, called org.postgresql.replication.LogSequenceNumber.

This class can be used to do things like:

 private LogSequenceNumber getCurrentLSN() throws SQLException
    {
        try (Statement st = connection.createStatement())
        {
            try (ResultSet rs = st.executeQuery("select "
                    + (((BaseConnection) connection).haveMinimumServerVersion(ServerVersion.v10)
                    ? "pg_current_wal_lsn()" : "pg_current_xlog_location()"))) {

                if (rs.next()) {
                    String lsn = rs.getString(1);
                    return LogSequenceNumber.valueOf(lsn);
                } else {
                    return LogSequenceNumber.INVALID_LSN;
                }
            }
        }
    }


The class also provides asLong(), equals(), and asString().
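To make the LSN text form less opaque, here is a minimal sketch of how a value such as 16/3002D50 maps to a 64-bit number: two hexadecimal halves, the high 32 bits before the slash and the low 32 bits after it. LsnSketch is a hypothetical stand-alone class written only for illustration. It is not the driver's implementation, and in real code you would simply use LogSequenceNumber.valueOf() and asLong().

```java
final class LsnSketch {
    /** Parses the text form "HI/LO" (two hexadecimal halves) into a 64-bit position. */
    static long parse(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash <= 0 || slash == lsn.length() - 1) {
            throw new IllegalArgumentException("not an LSN: " + lsn);
        }
        long high = Long.parseLong(lsn.substring(0, slash), 16);
        long low = Long.parseLong(lsn.substring(slash + 1), 16);
        // High half occupies the upper 32 bits, low half the lower 32 bits.
        return (high << 32) | low;
    }

    /** Formats a 64-bit position back into the un-padded "HI/LO" text form. */
    static String format(long value) {
        return String.format("%X/%X", value >>> 32, value & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        long position = parse("16/3002D50");
        System.out.println(position + " <-> " + format(position));
    }
}
```

Because the value is just a position in the WAL, two LSNs can be compared numerically once they are converted to long.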


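Before we can open a PGReplicationStream we need to create a connection capable of replication. Replication connections can only use the Simple Query protocol. There are some other requirements as well: the server must be running with wal_level = logical and have max_wal_senders and max_replication_slots set above zero, and the connecting user needs the REPLICATION attribute.
The code looks like: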


Connection openReplicationConnection() throws Exception {
        Properties properties = new Properties();
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        return DriverManager.getConnection("jdbc:postgresql://localhost/test",properties);
    }

In order to read any changes in the database we can just open a PGReplicationStream and read from it: 

 public void receiveChangesOccursBeforStartReplication() throws Exception {
        PGConnection pgConnection = (PGConnection) replicationConnection;

        LogSequenceNumber lsn = getCurrentLSN();

        Statement st = connection.createStatement();
        st.execute("insert into test_logic_table(name) values('previous value')");
        st.close();

        PGReplicationStream stream =
                pgConnection
                        .getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName(SLOT_NAME)
                        .withStartPosition(lsn)
                        .withSlotOption("include-xids", true)
                        .withSlotOption("pretty-print",true)
                        .withSlotOption("skip-empty-xacts", true)
                        .withStatusInterval(20, TimeUnit.SECONDS)
                        .start();
        ByteBuffer buffer;
        while(true)
        {
            buffer = stream.readPending();
            if (buffer == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            System.out.println( toString(buffer));
            //feedback
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        }

    }
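The toString(buffer) helper used in the loop is not shown in the listing. A minimal sketch of what such a helper could look like follows, assuming the output plugin emits text (as test_decoding does) and that the text is UTF-8 encoded. BufferText is a hypothetical class name chosen for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class BufferText {
    /** Decodes the readable bytes of the buffer as UTF-8 text without consuming them. */
    static String toString(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            // Heap buffer: read straight from the backing array, honouring offset and position.
            int offset = buffer.arrayOffset() + buffer.position();
            return new String(buffer.array(), offset, buffer.remaining(), StandardCharsets.UTF_8);
        }
        // Direct or read-only buffer: copy the bytes out through a duplicate view.
        byte[] copy = new byte[buffer.remaining()];
        buffer.duplicate().get(copy);
        return new String(copy, StandardCharsets.UTF_8);
    }
}
```

Reading from position() for remaining() bytes, rather than from the start of the backing array, keeps the helper correct when the buffer is a view into a larger array.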


This will output something like:

BEGIN 3547
table public.test_logic_table: INSERT: pk[integer]:28 name[character varying]:'previous value'
COMMIT 3547


Note that we see the following information:


  • Table name with schema "public.test_logic_table"
  • The command, one of INSERT, UPDATE, DELETE
  • The column name and type pk[integer] and value 28
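As a rough illustration of how a consumer might pick those pieces apart, here is a small sketch that splits a change line of the shape shown above into schema, table, command and the remaining column text. It assumes the text format printed in this post, with unquoted identifiers and only INSERT, UPDATE and DELETE. ChangeLineSketch is a hypothetical name. A production consumer would more likely use an output plugin that emits a structured format.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ChangeLineSketch {
    // Expected shape: "table <schema>.<table>: <COMMAND>: <columns...>"
    private static final Pattern CHANGE = Pattern.compile(
            "^table ([^.\\s]+)\\.([^:\\s]+): (INSERT|UPDATE|DELETE): (.*)$");

    final String schema;
    final String table;
    final String command;
    final String columns;

    private ChangeLineSketch(String schema, String table, String command, String columns) {
        this.schema = schema;
        this.table = table;
        this.command = command;
        this.columns = columns;
    }

    /** Returns the parsed change, or empty for lines such as BEGIN and COMMIT. */
    static Optional<ChangeLineSketch> parse(String line) {
        Matcher m = CHANGE.matcher(line);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new ChangeLineSketch(m.group(1), m.group(2), m.group(3), m.group(4)));
    }

    public static void main(String[] args) {
        String line = "table public.test_logic_table: INSERT: "
                + "pk[integer]:28 name[character varying]:'previous value'";
        parse(line).ifPresent(c ->
                System.out.println(c.command + " on " + c.schema + "." + c.table));
    }
}
```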

So let's break down what the replication stream code does:
  1. Create the replication stream with options; note the status interval. This number is chosen to be shorter than wal_sender_timeout. If the server does not receive a status message from this client within wal_sender_timeout, the server will consider this client to have crashed. A good value for the status interval is wal_sender_timeout/3.
  2. Use readPending() to read; this is non-blocking and returns null when nothing is waiting.
  3. If we have data, do something useful with it. In this case we only print it, but the opportunities for Change Data Capture are endless.
  4. Acknowledge that we have received the data. This bit is important, as it tells the server that it is free to discard the WAL that captured the change.
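The wal_sender_timeout/3 rule of thumb from step 1 can be sketched as a small helper. This is only an illustration: StatusIntervalSketch and its 10 second fallback are assumptions made for this example, and the 60 second input is the server's default wal_sender_timeout. A value of 0 disables the timeout on the server, so in that case the sketch falls back to the hypothetical constant.

```java
import java.time.Duration;

final class StatusIntervalSketch {
    // Hypothetical fallback for wal_sender_timeout = 0 (timeout disabled on the server).
    static final Duration FALLBACK = Duration.ofSeconds(10);

    /** Returns one third of the server's wal_sender_timeout, or FALLBACK if it is disabled. */
    static Duration fromWalSenderTimeout(Duration walSenderTimeout) {
        if (walSenderTimeout.isZero() || walSenderTimeout.isNegative()) {
            return FALLBACK;
        }
        return walSenderTimeout.dividedBy(3);
    }

    public static void main(String[] args) {
        Duration interval = fromWalSenderTimeout(Duration.ofSeconds(60));
        System.out.println("status interval: " + interval.getSeconds() + "s");
    }
}
```

With the default timeout this gives the 20 seconds passed to withStatusInterval in the listing above.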


In summary, logical replication is a game changer: we can now do change data capture without using triggers. We have the ability to audit the database into another database or system without triggers! There are even a few complete database replication solutions which allow writable slaves without triggers.


By way of attribution, I shamelessly ripped off code and documentation from the JDBC documentation.

Code for this can be found at https://github.com/davecramer/LogicalDecode

Thursday, September 17, 2015

PostgreSQL JDBC 1203 Driver released

Mostly some small bug fixes that didn't make it into 1202.

  • fix: Implemented getFunctions
  • fix: changed getProcedureColumns to getFunctionColumns
  • fix: CopyManager fails to copy a file, reading just part of the data #366
Author: Lonny Jacobson
  • add: Added PGTime/PGTimestamp
Author: Patric Bechtel
  • fix: setObject(int parameterIndex, Object x, int targetSqlType) as it will set scale of BigDecimal 'x' to 0 as default, resulting in rounded whole values (!). PR #353 (24312c6)
  • fix: round to correct amount test: add test for BigDecimal rounding behaviour in setObject(index,Object,targetSqlType) and setObject(index,Object,targetSqlType,scale) PR #353 (ff14f62)

Thursday, August 27, 2015

PostgreSQL JDBC Driver Version 9_4_1202 released


Lots of bug fixes and some awesome performance enhancements, including statement caching.

Version 9.4-1201 (2015-02-25)

Author: Alexis Meneses
  • ResultSet positioning methods in some particular cases PR #296 (282536b)
Author: Craig Ringer
  • Disable binary xfer on batches returning generated keys PR #273 (763ae84)
  • Add a new test case demonstrating a bug in returning support PR #273 (4d2b046)
  • Always Describe a query in a batch that returns generated keys PR #273 (a6bd36f)
Author: Dave Cramer
  • chore: fix build.xml to allow releasing to maven PR #262 (34f9361)
  • fix: BlobInputStream ignores constructor parameters #263 PR #273 (c1c6edc)
  • don't reset forceBinary transfer if we setPreparedThreshold (937a11c)
  • Revert "perf: Remove expensive finalize methods from Statement and Connection" PR #293 (a0d3997)
  • updated copyright PR #312 (263375c)
  • Revert "Issue 250 -- Adding setURL/getURL to BaseDataSource.java" PR #312 (a1ac380)
  • fixed mailing list href PR #326 (c3e86a6)
  • increment driver version PR #346 (b8ee75d)
Author: David R. Bild
  • feat: add equality support to PSQLState PR #277 (7698cd9)
  • Improve version checking PR #355 (f7a84db)
Author: Eugene Koontz
  • Add support within "private Object buildArray (PgArrayList input, int index, int count)" for array elements whose type is jsonb PR #349 (d313138)
Author: Jeremy Whiting
  • Added setter method for logging level. The method exactly matches property name in documentation. PR #282 (d9595d1)
  • Added getter method. PR #282 (65759f0)
  • Adding XML catalog to help unit tests not remote entity resolution. PR #284 (cb87067)
  • Added support to locally resolve dtd or entity files. PR #284 (017970d)
  • Disable verbose logging of the catalog resolver. PR #284 (fcc34f5)
Author: Kris Jurka
  • Improve error message for failure to update multicolumn primary key RSs. PR #284 (05ff811)
  • Remove all JDBC3 code as JDK 1.4/1.5 are no longer supported. PR #284 (f9a956b)
  • Add preliminary support for JDBC4.2. PR #284 (bd05fd2)
Author: Lonny Jacobson
  • Added setURL/getURL methods. (fcc8f75)
  • Added a unit test for setURL PR #309 (5fa405b)
Author: Markus KARG
  • perf: use shared PGBoolean instances PR #321 (159fed6)
  • docs: parameter "database" is optional PR #332 (9a9d03f)
  • refactor: binary transfer for setObject(int, Object, int) PR #351 (3ff2129)
Author: Michael Paquier
  • Update entries in lib/.gitignore PR #262 (8cd15a9)
Author: Phillip Ross
  • Fix for issue https://github.com/pgjdbc/pgjdbc/issues/342 - Modifications to PGline to store coefficients and constant value for linear equation representations used by postgresql for native line datatype. PR #343 (0565416)
  • Fix for issue https://github.com/pgjdbc/pgjdbc/issues/342 - Removed extra copyright comments. PR #343 (5f21a18)
  • Fix for issue https://github.com/pgjdbc/pgjdbc/issues/342 - Handle vertical lines. PR #343 (3918b24)
  • Fix for issue https://github.com/pgjdbc/pgjdbc/issues/342 - Added test method for testing PGline PR #343 (1a50585)
  • Fix for issue https://github.com/pgjdbc/pgjdbc/issues/342 - Modifications to PGline test method to only attempt database access if the postgresql version supports it (v9.4+). PR #343 (15eedb5)
Author: Rikard Pavelic
  • feat: Improved composite/array type support and type naming changes. PR #333 (cddcd18)
  • Deadlock after IO exception during copy cancel PR #363 (d535c13)
Author: Sehrope Sarkuni
  • style: clean up newline whitespace PR #273 (1b77b4c)
  • style: clean up whitespace in .travis.yml PR #274 (3ee5bbf)
  • fix: correct incorrect PG database version in .travis.yml matrix PR #274 (74b88c6)
  • style: reorder jdk versions in .travis.yml PR #274 (21289e7)
  • feat: add PostgreSQL 9.4 to .travis.yml matrix PR #274 (9e94f35)
  • feat: add escapeLiteral(...) and escapeIdentifier(...) to PGConnection PR #275 (096241f)
Author: Stephen Nelson
  • Replace for loops with Java 5-style for loops. Replace String.indexOf with String.contains. Replace StringBuffer with StringBuilder. Remove boxing/unboxing of primitives. PR #245 (206a542)
  • feat: Customize default fetchSize for statements PR #287 (093a4bc)
  • feat: Customize default fetchSize for statements PR #287 (519bfe1)
  • perf: Read test only property "org.postgresql.forceBinary" spend many time when creating statements PR #291 (e185a48)
  • perf: Remove expensive finalize method from Statement. Finalize method on Statement is moved to a separate class that is lazily created if user sets "autoCloseUnclosedConnections"="true". This dramatically improves performance of statement instantiation and reduces garbage collection overhead on several widely used JVMs. PR #290 (eb83210)
  • docs: fix misleading statement on "can't use jdk5 features" in README.md PR #298 (5b91aed)
  • feat: implement micro-benchmark module for performance testing PR #297 (48b79a3)
  • feat: add benchmark for Parser.unmarkDoubleQuestion PR #297 (e5a7e4e)
  • feat: improve sql parsing performance PR #301 (fdd9249)
  • perf: Remove AbstractJdbc2Statement.finalize() PR #299 (b3a2f80)
  • test: add test for prepare-fetch-execute performance PR #303 (d23306c)
  • perf: improve performance of preparing statements PR #303 (7c0655b)
  • test: add test for utf8-encoder performance PR #307 (6345ab1)
  • perf: improve performance of UTF-8 encoding PR #307 (f2c175f)
  • perf: skip instantiation of testReturn and functionReturnType for non-callable statements PR #323 (8eacd06)
  • perf: parse SQL to a single string, not a array of fragments PR #319 (4797114)
  • perf: cache parsed statement across .prepareStatement calls PR #319 (5642abc)
  • refactor: cleanup constructors of JDBC4 and JDBC42 connections/statements PR #318 (a4789c0)
  • refactor: use Dequeue<...> instead of raw ArrayList in v3.QueryExecutorImpl PR #314 (787d775)
  • perf: SimpleParameterList.flags int[] -> byte[] PR #325 (f5bceda)
  • perf: cut new byte[1] from QueryExecutorImpl.receiveCommandStatus PR #326 (0ae1968)
  • perf: avoid useBinary(field) check for each sendBind PR #324 (45269b8)
  • refactor: cleanup Parser and NativeQuery after #311 PR #346 (a1029df)
  • refactor: cleanup Parser and CallableQueryKey after #319 PR #346 (5ec7dea)
  • perf: skip caching of very large queries to prevent statement cache pollution PR #346 (126b60c)
  • use current_schema() for Connection#getSchema PR #356 (ffda429)
  • chore: simple script to compose release notes PR #357 (341ff8e)
  • chore: teach release_notes.sh to identify PR ids out of merge commits PR #358 (f3214b1)

Thursday, July 10, 2014

PostgreSQL JDBC Driver version 9_3_1102 released

This is a maintenance release with a few interesting upgrades

Version 9.3-1102 (2014-07-10)

Author: epgrubmair
    fix copyOut close hanging bug #161 from epgrubmair

Author: romank0
    backpatch exception during close of fully read stream from romank0

Author: Christophe Canovas
    Added caching for ResultSetMetaData (complete commit)

Author: Elizabeth Chatman
    NullPointerException in AbstractJdbc2DatabaseMetaData.getUDTs
    setNull, setString, setObject may fail if a specified type cannot be transferred in a binary mode #151
    backpatch fix for changing datestyle before copy

Author: TomonariKatsumata
    binary transfer fixes new feature -1 for forceBinaryTransfer

Author: Sergey Chernov
    connectTimeout property support backpatch

Author: Naoya Anzai
    fix prepared statement ERROR due to EMPTY_QUERY defined as static.

9.4 jars can also be found on the site 

Thursday, April 17, 2014

PostgreSQL JDBC example with Spring Transactions

Using PostgreSQL JDBC with Spring.

This article came about after a user filed an issue on the JDBC list:

"The specific issue is that I’m using SimpleJdbcCall.execute() to call the database and getting back a Jdbc4Array. When I then try to do something like Jdbc4Array.getArray() I get a SQL error that can be tracked down to the Jdbc driver trying to use a connection object which has already been closed by the Spring Framework."

The problem is that once you get the array back, more work has to be done to get the values out of it, and that work needs the connection to still be open. The following code is an example of how to use transactions with Spring and PostgreSQL JDBC.

Simple interface to get a value out of an array

Implementation which does the actual work

Note the @Transactional annotation on line 22. This is required to ensure that the connection is not closed after the first call on line 42; without this annotation Spring would close the connection and the next line, 43, would throw an exception. The constructor on line 27 is required for Spring to create the implementation and inject the datasource defined in the context.

Spring context

SQL to create function and data

Code for this can be found here: SpringTransactionExample

Thanks to Michael Miller for the initial code used in this example.

Tuesday, March 25, 2014

PostgreSQL Driver now under continuous integration

Continuous Integration is all about being able to push our code changes to GitHub and instantly find out if our build fails, or discover if our code changes fail our prepared tests.
For the last few days I have been testing a continuous integration service provided by the good folks at Travis-CI. Best of all, this continuous integration service is free for open source projects!
The build can be found here.

Thursday, February 20, 2014

JDBC driver 9.3 version 1101 released today.


Changes are primarily bug fixes

Changelog can be found  here