Posts Tagged ‘activiti’

Exposing Activiti BPMN Engine events via websockets extending its REST application

The latest release of Activiti (5.15) introduced a new interesting mechanism – support for event handlers. You can now get notified just when something interesting happens inside process engine. (You can read more on events in Activiti HERE).

I instantly thought that would be even more interesting if external applications integrated with Activiti REST could also get easily notified about the process events. Using websockets as a medium for broadcasting these messages seemed quite natural.
In this article I will present step by step how I extended the default REST application to serve events via websockets. I also developed a client test consuming these events to prove the thing is working, it will also be described later on.

Prerequisites

You can git clone or download zip package with sources of this project here:

https://github.com/wrobelm/activiti-websockets

To run it just go to activiti-websockets-server directory and execute

mvn jetty:run

This will build the server application and run it in maven embedded jetty server.

Then to run automated client test go to activiti-websockets-client directory and execute

mvn test

You will need Maven running on JDK 8 to run the test project.

activiti-websockets server project

Broad view on main concepts :

  • The project is based on original Activiti REST application. It is included in the project as maven dependency and extended where needed.
  • The websockets server has been developed using standard Java EE7 web-api libraries. They are compatible with modern webapp containers like Jetty 9 and Tomcat 8.

Let’s go into some more details:

1. How websocket server is hooked to the Activiti REST webapp?

The standard web.xml file has been altered to use a custom ServletContextListener :

...
 <listener>
 <listener-class>pl.mwrobel.activiti.extensions.CustomActivitiServletContextListener</listener-class>
 </listener>
...

This listener extends default ActivitiServletContextListener.

Look at its source code now:

public class CustomActivitiServletContextListener extends ActivitiServletContextListener {
private static final Logger log = LoggerFactory.getLogger(CustomActivitiServletContextListener.class);
@Override
 public void contextInitialized(ServletContextEvent event) {
 super.contextInitialized(event);
 event.getServletContext().setAttribute("activiti-engine", ProcessEngines.getDefaultProcessEngine());
 addProcessEventsEndpoint(event);
 }
public void addProcessEventsEndpoint(ServletContextEvent ce) {
 log.info("Deploying process-engine-events websockets server endpoint");
 ServletContext sc = ce.getServletContext();
final ServerContainer server_container
 = (ServerContainer) ce.getServletContext().getAttribute("javax.websocket.server.ServerContainer");
try {
 ServerEndpointConfig config
 = ServerEndpointConfig.Builder.create(ProcessEngineEventsWebsocket.class,
 "/process-engine-events").build();
 config.getUserProperties().put("servlet.context", sc);
 server_container.addEndpoint(config);
 } catch (DeploymentException e) {
 throw new RuntimeException(e);
 }
 }
}

Two most important lines:

ServerEndpointConfig config
 = ServerEndpointConfig.Builder.create(ProcessEngineEventsWebsocket.class,
 "/process-engine-events").build();

New endpoint will be deployed to path “/process-engine-events“, it is implemented by websocket server class ProcessEngineEventsWebsocket.

event.getServletContext().setAttribute("activiti-engine", ProcessEngines.getDefaultProcessEngine());

Process engine reference is stored in ‘activiti-engine‘ servlet context attribute, it allows to be later acquired in ProcessEngineEventsWebsocket class.

2. Websocket server endpoint

public class ProcessEngineEventsWebsocket extends Endpoint {
private static final Logger log = LoggerFactory.getLogger(ProcessEngineEventsWebsocket.class);
private ServletContext servletContext;
 private ProcessEngine processEngine;
@Override
 public void onOpen(final Session session, EndpointConfig config) {
 log.info("Websockets connection opened");
this.servletContext = (ServletContext) config.getUserProperties().get("servlet.context");
 processEngine = (ProcessEngine) servletContext.getAttribute("activiti-engine");
 processEngine.getRuntimeService().addEventListener(new ActivitiProcessEventsWebsocketBroadcaster(session));
 }
}
processEngine = (ProcessEngine) servletContext.getAttribute("activiti-engine");

When a new websockets session is opened, processEngine is retrieved from the servlet context attribute stored in servlet context listener.

processEngine.getRuntimeService().addEventListener(new ActivitiProcessEventsWebsocketBroadcaster(session));

Registering ActivitiProcessEventsWebsocketBroadcaster as an event listener for all events published by the Activiti engine. Websockets session object is passed to the event handler to allow sending messages to the client.

3. ActivitiProcessEventsWebsocketBroadcaster – Activiti events handler

This class implements ActivitiEventListener interface. Every time new event occurs this method gets called:

@Override
 public void onEvent(ActivitiEvent event) {
 switch (event.getType()) {
 case ACTIVITY_STARTED: {
 broadcastEvent((ActivitiActivityEvent)event);
 break;
 }
 case ACTIVITY_COMPLETED: {
 broadcastEvent((ActivitiActivityEvent)event);
 break;
 }
 }
 }

I decided to publish two selected types of Activiti events. Every time they occur they are broadcasted to the client. Quite self-explanatory.

private void broadcastEvent(ActivitiActivityEvent e) {
 ProcessEventDTO dto = ProcessEventDTO.builder().activityId(e.getActivityId())
 .activityName(e.getActivityId())
 .activityType(e.getType().toString())
 .processId(e.getProcessInstanceId())
 .build();
 log.info("Activiti event received: " + e.getType());
 RemoteEndpoint.Basic remoteEndpoint = session.getBasicRemote();
 try {
 remoteEndpoint.sendText(om.writeValueAsString(dto));
 } catch (IOException ex) {
 throw new RuntimeException(ex);
 }
 }

Here the event properties are wrapped in transport class object (ProcessEventDTO), then serialized to JSON using Jackson ObjectMapper, and send to the client as string. The client endpoint is retrieved from the session object injected in ProcessEngineEventsWebsocket during session opening.

And that’s basically all about publishing Activiti events via websockets.

Example process and websockets client as test case

To demonstrate it working I developed a simple BPMN process which gets auto deployed during start of the REST webapp.
It is done in DemoDataGenerator class substituting the original class from REST webapp. The file ‘event-demo-process.bpmn20.xml‘ is deployed in method initDemoProcessDefinitions(). You can find this process in graphic notation below:

events-process

Notes:
– first-task and second-task just print some info to server log
– timer-catch waits 8 seconds, then process is resumed again

Let’s move to activiti-websockets-client project

I developed a client going automatically through this process in the form of unit test.

The main test method goes as follows:

@Test
 public void processShouldComplete() throws InterruptedException, Exception {
 final ProcessEventsWebSocket ws = con.getProcessEventsWebsocket();
String processInstanceId = createProcessInstance();
ws.addExpectedEventAndWait(15000, ProcessEventDTO.builder()
 .activityName("user-task").processId(processInstanceId).activityType("ACTIVITY_STARTED")
 .build());
completeUserTask(processInstanceId);
 checkIfProcessHasFinished(processInstanceId);
 }

Notes:
– A new process instance is created using Activiti REST services. You can read documentation of these services HERE. I used Jersey client for dealing with HTTP requests and again Jackson for deserialization of JSON objects representing server events.
– In class ProcessEventsWebSocket using eclipse jetty websocket client library and CountDownLatch (you can read more about it HERE) I implemented simple mechanisms to wait for an event with given field values set. For demo purposes every event incoming in onMessage() method is printed to standard output.
– In this particular test we wait for ACTIVITY_STARTED event of ‘user-task‘, then we can push process forward in completeUserTask() method, finally we check if process has finished. It is also achieved using Activiti REST API mentioned before.

The output of the test should be like:

Running pl.mwrobel.activiti.websockets.test.ActivitiWebsocketsIntegrationTest
 2014-04-28 12:54:52.081:INFO::main: Logging initialized @863ms
 http://127.0.0.1:9080/service/runtime/process-instances
 {"processId":"287","activityId":"start-event","activityName":"start-event","activityType":"ACTIVITY_STARTED"}
 {"processId":"287","activityId":"start-event","activityName":"start-event","activityType":"ACTIVITY_COMPLETED"}
 {"processId":"287","activityId":"first-task","activityName":"first-task","activityType":"ACTIVITY_STARTED"}
 {"processId":"287","activityId":"first-task","activityName":"first-task","activityType":"ACTIVITY_COMPLETED"}
 {"processId":"287","activityId":"timer-catch","activityName":"timer-catch","activityType":"ACTIVITY_STARTED"}
 {"processId":"287","activityId":"timer-catch","activityName":"timer-catch","activityType":"ACTIVITY_COMPLETED"}
 {"processId":"287","activityId":"second-task","activityName":"second-task","activityType":"ACTIVITY_STARTED"}
 {"processId":"287","activityId":"second-task","activityName":"second-task","activityType":"ACTIVITY_COMPLETED"}
 {"processId":"287","activityId":"user-task","activityName":"user-task","activityType":"ACTIVITY_STARTED"}
 MATCHED ****ProcessEventDTO(processId=287, activityId=null, activityName=user-task, activityType=ACTIVITY_STARTED, customActivityType=null)
 {"processId":"287","activityId":"user-task","activityName":"user-task","activityType":"ACTIVITY_COMPLETED"}
 {"processId":"287","activityId":"end-event","activityName":"end-event","activityType":"ACTIVITY_STARTED"}
 {"processId":"287","activityId":"end-event","activityName":"end-event","activityType":"ACTIVITY_COMPLETED"}
 closed connection
 Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 9.358 sec

And that is all on Proof of Concept integration of Activiti with websockets server. Hope it helped someone 😉

Postscript:

My thread on Activiti forums about somewhat more complicated example involving signals and problems regarding transactions and events in Activiti:
http://forums.activiti.org/content/possible-bug-eventing-activiti-not-ready-receive-signal-even-after-dispatching

So – in some cases this concept may still need some refining 😉

Resources :

Article on Java EE7 Websockets:
http://www.oracle.com/technetwork/articles/java/jsr356-1937161.html
CountDownLatch:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
Detailed Activiti user guide:
http://www.activiti.org/userguide/
Jackson JSON Processor
http://jackson.codehaus.org/