Thursday, March 9, 2017

Logical Decoding using the JDBC Driver

Logical Decoding


What is Logical Decoding

It's useful to understand what physical replication is in order to understand logical decoding.

Physical replication extends the functionality of recovery mode.  Write Ahead Logs are written to disk before the actual database. These files contain enough information to recreate the transaction in the event of a catastrophic shutdown

In the event of an emergency shutdown (power fail, OOM kill) when the server comes back online it will attempt to apply the outstanding WAL up to the point of the shutdown. This is referred to as recovery mode.

Physical replication takes advantage of this infrastructure built into the server. The standby is started in recovery mode and WAL created by the master are applied to the standby. How that occurs is beyond the scope but you can read about it here .

The interesting bit here is that we have a mechanism by which to access the changes in the heap without connecting to the database.

There are a few caveats though which is where Logical Decoding comes to the rescue. First; WAL's are binary and their format is not guaranteed to be stable (in other words they can change from version to version) and second they contain changes for every database in the server.

Logical decoding changes all of that by

  1. Providing changes for only one database per slot
  2. Defining an API which facilitates writing an output plugin to output the changes in any format you define.


Concepts of Logical Decoding

Above I mentioned two new concepts slots, and plugins

A slot is a stream of changes in a database. As previously mentioned logical decoding works on a single database. A slot represents a sequence of changes in that database. There can be more than one slot per database. The slot manages a set of changes sent over a particular stream such as which transaction is currently being streamed and which transaction has been acknowledged.

A plugin is a library which accepts the changes and decodes the changes into a format of your choosing. Plugins need to be compiled and installed before they can be utilized by a slot. 

Creating a slot with JDBC


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void createLogicalReplicationSlot(String slotName, String outputPlugin ) throws InterruptedException, SQLException, TimeoutException
    {
        //drop previous slot
        dropReplicationSlot(connection, slotName);

        try (PreparedStatement preparedStatement =
                     connection.prepareStatement(
                      "SELECT * FROM pg_create_logical_replication_slot(?, ?)") )
        {

            preparedStatement.setString(1, slotName);
            preparedStatement.setString(2, outputPlugin);
            try (ResultSet rs = preparedStatement.executeQuery())
            {
                while (rs.next())
                {
                    System.out.println("Slot Name: " + rs.getString(1));
                    System.out.println("Xlog Position: " + rs.getString(2));
                }
            }

        }
    }

This just calls the function pg_create_logical_replication_slot(slotname, plugin_name) which is the same as executing the SQL  "CREATE REPLICATION SLOT LOGICAL"

The function returns the slot name and the current xlog_position.

Note: before we create the slot we check to make sure there isn't an existing slot already running. If there is we use pg_terminate_backend() to terminate it.



 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public void dropReplicationSlot(Connection connection, String slotName)
            throws SQLException, InterruptedException, TimeoutException
    {
        try (PreparedStatement preparedStatement = connection.prepareStatement(
                        "select pg_terminate_backend(active_pid) from pg_replication_slots "
                                + "where active = true and slot_name = ?"))
        {
            preparedStatement.setString(1, slotName);
            preparedStatement.execute();
        }

        waitStopReplicationSlot(connection, slotName);

        try (PreparedStatement preparedStatement = connection.prepareStatement("select pg_drop_replication_slot(slot_name) "
                            + "from pg_replication_slots where slot_name = ?")) {
            preparedStatement.setString(1, slotName);
            preparedStatement.execute();
        }
    }



So at this point we have a slot and we know the current xlog_location. In order to read the current xlog location, we provide a class in the postgresql driver which can decode the xlog location called org.postgresql.replication.LogicalSequenceNumber.

This class can be used to do things like 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
 private LogSequenceNumber getCurrentLSN() throws SQLException
    {
        try (Statement st = connection.createStatement())
        {
            try (ResultSet rs = st.executeQuery("select "
                    + (((BaseConnection) connection).haveMinimumServerVersion(ServerVersion.v10)
                    ? "pg_current_wal_location()" : "pg_current_xlog_location()"))) {

                if (rs.next()) {
                    String lsn = rs.getString(1);
                    return LogSequenceNumber.valueOf(lsn);
                } else {
                    return LogSequenceNumber.INVALID_LSN;
                }
            }
        }
    }


The class also provides asLong(), equals(), and asString()


Before we can open a PGReplicationStream we need to create a connection capable of replication. Replication connections can only use the Simple Query protocol, as well as some other requirements.
The code looks like:


1
2
3
4
5
6
7
Connection openReplicationConnection() throws Exception {
        Properties properties = new Properties();
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
        PGProperty.REPLICATION.set(properties, "database");
        PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
        return DriverManager.getConnection("jdbc:postgresql://localhost/test",properties);
    }

In order to read any changes in the database we can just open a PGReplicationStream and read from it: 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 public void receiveChangesOccursBeforStartReplication() throws Exception {
        PGConnection pgConnection = (PGConnection) replicationConnection;

        LogSequenceNumber lsn = getCurrentLSN();

        Statement st = connection.createStatement();
        st.execute("insert into test_logic_table(name) values('previous value')");
        st.close();

        PGReplicationStream stream =
                pgConnection
                        .getReplicationAPI()
                        .replicationStream()
                        .logical()
                        .withSlotName(SLOT_NAME)
                        .withStartPosition(lsn)
                        .withSlotOption("include-xids", true)
                        .withSlotOption("pretty-print",true)
                        .withSlotOption("skip-empty-xacts", true)
                        .withStatusInterval(20, TimeUnit.SECONDS)
                        .start();
        ByteBuffer buffer;
        while(true)
        {
            buffer = stream.readPending();
            if (buffer == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            System.out.println( toString(buffer));
            //feedback
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        }

    }


this will output something like:

BEGIN 3547
table public.test_logic_table: INSERT: pk[integer]:28 name[character varying]:'previous value'
COMMIT 3547


Note we see the following information


  • Table name with schema "public.test_logic_table"
  • The command, one of INSERT, UPDATE, DELETE
  • The column name and type pk[integer] and value 28

So lets break down the above code and what it does:
  1. Create the replication stream with options, note the status interval. This number is chosen to be shorter than the wal_sender_timeout If the server does not receive a ping message from this client within the wal_sender timeout the server will consider this client to have crashed. A good value of statusInterval is wal_sender_timeout/3 
  2. Use readPending to read, this is non-blocking. 
  3. If we have data do something useful with it. In this case nothing useful, but the opportunities for Change Data Capture are endless
  4. Acknowledge that we have received the data. This bit is important as it tells the server that it is free to discard the WAL that captured the change.


In Summary logical replication is a game changer we can now do change data capture without using triggers. We have the ability to audit the database into another database/system without triggers! There are even a few complete database replication solutions which allow write-able slaves without triggers.


By way of attribution I shamelessly ripped off code and documentation from the JDBC documentation

Code for this can be found https://github.com/davecramer/LogicalDecode

No comments: