How to propagate data source changes to all clients by using the Real-Time Messaging Module

Description

This example will demonstrate how to use the real-time messaging module to propagate data source changes to all clients.  We will create a ListGrid that updates in real-time after anyone makes changes to the backing database.

Article conventions

This article uses $SGWTEE_HOME to refer to the installation folder of Smart GWT Enterprise Edition.

Project set up

Open the real-time-grid-updates project in Eclipse.  There are detailed instructions on how to do this in the README.txt.  We will need three JARs to enable real-time messaging:

# isomorphic_js_parser.jar

# isomorphic_realtime_messaging.jar, and

# messaging.jar

These can be found in the $SGWTEE_HOME/lib/ folder.  To add the JARs to the Eclipse project classpath, right click on the *real-time-grid-updates* project folder in the *Package Explorer* and then go to:  *Build Path* > *Configure Build Path...*

In the *Java Build Path* section, click on the *Libraries* tab, then the *Add Variable...* button.  Click on the variable starting with SGWTEE_HOME then click the *Extend...* button.  Navigate to $SGWTEE_HOME/lib/isomorphic_js_parser.jar and click *Ok*.  Do this for the other two JARs as well.  Finally, click *Ok* to leave the properties dialog.  The JARs should now be included under *Referenced Libraries* in the *Package Explorer*.

MessagingServlet is declared in war/WEB-INF/web.xml:

<!-- Messaging servlet for realtime messaging subsystem -->
<servlet>
    <servlet-name>MessagingServlet</servlet-name>
    <servlet-class>com.isomorphic.messaging.MessagingServlet</servlet-class>
</servlet>
<!-- ... -->
<!-- messaging servlet mapping -->
<servlet-mapping>
    <servlet-name>MessagingServlet</servlet-name>
    <url-pattern>/realtimegridupdates/sc/messaging/*</url-pattern>
</servlet-mapping>

We will be using the employees data source:

<DataSource
    ID="employees"
    serverType="sql"
    serverConstructor="com.smartgwt.sample.server.GridUpdatesDataSource"
    tableName="employeeTable"
    recordName="employee"
    testFileName="employees.data.xml"
    titleField="Name"
>
    <fields>
        <field name="Name"            title="Name"            type="text"     length="128"/>
        <field name="EmployeeId"      title="Employee ID"     type="sequence"  primaryKey="true"
               required="false"       hidden="true"/>
        <field name="ReportsTo"       title="Manager"         type="integer"  required="false"
               foreignKey="employees.EmployeeId"  rootValue="1" detail="true"/>
        <field name="Job"             title="Title"           type="text"     length="128"/>
        <field name="Email"           title="Email"           type="text"     length="128"/>
        <field name="EmployeeType"    title="Employee Type"   type="text"     length="40"/>
        <field name="EmployeeStatus"  title="Status"          type="text"     length="40"/>
        <field name="Salary"          title="Salary"          type="float"/>
        <field name="OrgUnit"         title="Org Unit"        type="text"     length="128"/>
        <field name="Gender"          title="Gender"          type="text"     length="7">
            <valueMap>
                <value>male</value>
                <value>female</value>
            </valueMap>
        </field>
        <field name="MaritalStatus"   title="Marital Status"  type="text"     length="10">
            <valueMap>
                <value>married</value>
                <value>single</value>
            </valueMap>
        </field>
    </fields>
</DataSource>

Server-Side Approach

Because we want to know when any change is made to a data source, we need to be able to intercept all calls that will modify the database.  In each call we can insert our own code to send real-time notifications to active clients.  One way of accomplishing this is to write our own subclass of com.isomorphic.sql.SQLDataSource and override the executeAdd(), executeRemove(), and executeUpdate() methods.  Each method takes a DSRequest argument and returns a DSResponse.  We will invoke the corresponding method of SQLDataSource, so that these operations pass through to the database.  We will not only return the DSResponse generated by SQLDataSource to the one client making the original request, but will also send the same response to all other clients using a helper function, sendToClients().

GridUpdatesDataSource
package com.smartgwt.sample.server;

import com.isomorphic.datasource.DSRequest;
import com.isomorphic.datasource.DSResponse;
import com.isomorphic.sql.SQLDataSource;
import com.smartgwt.client.types.DSOperationType;

public final class GridUpdatesDataSource extends SQLDataSource {

    @Override
    public DSResponse executeAdd(DSRequest req) throws Exception {
        final DSResponse resp = super.executeAdd(req);
        sendToClients(resp, DSOperationType.ADD);
        return resp;
    }

    @Override
    public DSResponse executeRemove(DSRequest req) throws Exception {
        final DSResponse resp = super.executeRemove(req);
        sendToClients(resp, DSOperationType.REMOVE);
        return resp;
    }

    @Override
    public DSResponse executeUpdate(DSRequest req) throws Exception {
        final DSResponse resp = super.executeUpdate(req);
        sendToClients(resp, DSOperationType.UPDATE);
        return resp;
    }

    private void sendToClients(final DSResponse resp, final DSOperationType operationType) {
        // TODO
    }
}

The sendToClients() method is responsible for sending the real-time message containing the data source response and the type of operation, represented by the values of the enum DSOperationType.  To use the real time messaging module we must create an ISCMessageDispatcher instance and invoke its send() method with a channel name and a data object.  The channel name is simply a String to specify the group of clients that should receive the message.  Here we want all clients to receive all messages so we will use a single, common channel name "gridUpdates".  The data object will consist of a boolean flag indicating whether to invalidate the cache, the operation type (add, remove, or update), and an array of the data records.  The real-time messaging module converts this data object into equivalent JSON and sends it to the clients.  The data object contains all of the information needed by each client to update exactly as if they had made the original data source request.

sendToClients()
private void sendToClients(final DSResponse resp, final DSOperationType operationType)
        throws Exception {

    final ISCMessageDispatcher dispatcher = ISCMessageDispatcher.instance();

    // Convert the data source response and the operation type into a form
    // usage by the client-side to construct a corresponding DSResponse and
    // DSRequest.

    final Map data = new HashMap();

    data.put("invalidateCache", resp.getInvalidateCache());
    data.put("operationType", operationType.getValue());
    data.put("records", resp.getRecords());

    dispatcher.send(new ISCMessage("gridUpdates", (Object) data));
}

Client-Side Response

The client-side can register to receive the real-time messages by calling the Messaging.subscribe() function:

package com.smartgwt.sample.client;

import com.google.gwt.core.client.EntryPoint;
import com.google.gwt.core.client.JavaScriptObject;
import com.smartgwt.client.rpc.Messaging;
import com.smartgwt.client.rpc.MessagingCallback;
import com.smartgwt.client.util.SC;
import com.smartgwt.client.widgets.HTMLFlow;

public class RealTimeGridUpdates implements EntryPoint {

    private ListGrid listGrid;
    private DataSource ds;

    public void onModuleLoad() {

        ds = DataSource.get("employees");

        listGrid = new ListGrid();
        listGrid.setDataSource(ds);
        listGrid.setAutoFetchData(true);
        listGrid.draw();

        // ...

        Messaging.subscribe("gridUpdates", new MessagingCallback() {
            public void execute(Object data) {
                updateGrid((JavaScriptObject) data);
            }
        });
    }

    private void updateGrid(final JavaScriptObject dataObj) {
        // ...
    }
}

updateGrid receives the real-time messages from the server in a JavaScriptObject and updates the ListGrid with the changes represented by the message.  It can use the updateCaches() method to update the ListGrid, so it just needs to construct a DSResponse and a DSRequest to use for the arguments.

We first convert the data object into a Java object that we can manipulate.  The data is exactly the same as what we sent from the server-side.  The data is a Map of String to objects.  The value for "invalidateCache" is a Boolean flag.  The value for "operationType" is a String that can be used to construct a DSOperationType.  The value for "records" is a List of key-value maps representing the data records.  The complete code for updateGrid is:

private void updateGrid(final JavaScriptObject dataObj) {

    // Create a fake DSRequest and DSResponse to send to the list grid's underlying
    // data source to force it to update its cache.

    final DSResponse dsResponse = new DSResponse();
    final DSRequest dsRequest = new DSRequest();

    @SuppressWarnings("unchecked")
    final Map<String, ?> data = (Map<String, ?>) convertToJava(dataObj);

    // Determine whether to invalidate cache
    dsResponse.setInvalidateCache((Boolean) data.get("invalidateCache"));

    // Determine the operation type
    final DSOperationType operationType =
            valueOfDSOperationType((String) data.get("operationType"));
    dsRequest.setOperationType(operationType);

    // Construct the record data
    @SuppressWarnings("unchecked")
    List<Map<?, ?>> dataRecords = (List<Map<?, ?>>) data.get("records");
    if (dataRecords != null && !dataRecords.isEmpty()) {

        final int dataRecordsSize = dataRecords.size();

        final Record[] records = new Record[dataRecordsSize];
        int i = 0;
        for (Map<?, ?> dataRecord : dataRecords) {
            records[i++] = new Record(dataRecord);
        }

        dsResponse.setData(records);
    }

    // Update the list grid's cache
    listGrid.getDataSource().updateCaches(dsResponse, dsRequest);
}

private static DSOperationType valueOfDSOperationType(final String value) {
    for (DSOperationType type : DSOperationType.values()) {
        if (type.getValue().equals(value)) {
            return type;
        }
    }
    return null;
}

Testing the Real-Time Updates

Now after recompiling, we can open our web application simultaneously in multiple web browsers to see the effects of the real-time messaging system.  Make changes to the list grid in one web browser and see the changes automatically applied in the others!

Conclusion

We can configure the real-time messaging module in the configuration file src/server.properties.  These are the available configuration options:

* messaging.keepaliveInterval

* messaging.keepaliveReestablishDelay

* messaging.connectTimeout

* messaging.connectionTTL

* messaging.flushBufferSize

* messaging.dispatcherImplementer

* messaging.jms.context

* messaging.jms.jndiPrefix

* messaging.jms.topicConnectionFactory

Information about these options and the real-time messaging module in general is available in the Messaging Quick Reference PDF:  $SGWTEE_HOME/doc/Messaging_QuickRef.pdf.

An Eclipse project containing the full source code can be downloaded from here: real-time-grid-updates.zip