github linkedin
Dropwizard and Atmosphere
2015-02-20

In this blog post I’m going to write about combining Dropwizard and Atmosphere. Atmosphere is a framework for handling websockets. If you take a look at the samples, it seems like the integration is a no-brainer. I thought so too. Fool.

Atmosphere provides a Jersey integration, Dropwizard also uses Jersey, so everything will be alright. Hah.

First, add org.atmosphere:atmosphere-jersey and org.eclipse.jetty.websocket:websocket-server:9.0.7.v20131107 (use the same Jetty version as in Dropwizard) to your dependencies. Then register the Atmosphere Servlet in Dropwizard:

@Override
public void run(RestwarsConfiguration configuration, Environment environment) throws Exception {
  AtmosphereServlet servlet = new AtmosphereServlet();
  servlet.framework().addInitParameter(ApplicationConfig.ANNOTATION_PACKAGE, WebsocketResource.class.getPackage().getName());
  servlet.framework().addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");

  ServletRegistration.Dynamic registration = environment.servlets().addServlet("atmosphere", servlet);
  registration.addMapping("/websocket/*");
}

Now we can implement our websocket handler:

@Path("/")
@Produces(MediaType.APPLICATION_JSON)
public class WebsocketResource {
    /**
     * Name of the round broadcaster.
     */
    private static final String ROUND_BROADCASTER_NAME = "round";

    @Context
    private BroadcasterFactory broadcasterFactory;

    @Suspend
    @GET
    @Path("/round")
    public SuspendResponse<String> registerForRoundEvent(@Context AtmosphereResource resource) {
        return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(broadcasterFactory.lookup(DefaultBroadcaster.class, ROUND_BROADCASTER_NAME, true))
                .build();
    }
}

In this code, we let Jersey inject the BroadcasterFactory trough the @Context annotation, so we can created a named broadcaster. When a client opens a websocket to http://localhost:8080/websocket/round, registerForRoundEvent will be called and the connection will be held open (@Suspend annotation).

Now we can use the named broadcaster somewhere in the application to send events to the listening client:

public void broadcastRound(BroadcasterFactory broadcasterFactory, long round) {
  broadcasterFactory.lookup(DefaultBroadcaster.class, ROUND_BROADCASTER_NAME, true).broadcast(Long.toString(round));
}

An instance of the BroadcasterFactory can be obtained as follows:

@Override
public void run(RestwarsConfiguration configuration, Environment environment) throws Exception {
  AtmosphereServlet servlet = new AtmosphereServlet();
  // ...
  BroadcasterFactory broadcasterFactory = servlet.framework().getBroadcasterFactory();
}

Now, everything works. Or so I thought. I pushed the code to GitHub, and travis complained that tests are failing. As it turns out, Atmosphere is not compatible with Dropwizard Testing. Or Dropwizard is not compatible with Atmosphere, whatever.

If you use the ResourceTestRule from Dropwizard, which starts an in-memory Jersey, strange errors pop up:

ERROR [2015-02-20 16:08:06,098] com.sun.jersey.spi.inject.Errors: The following errors and warnings have been detected with resource and/or provider classes:
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.AtmosphereProviders$BroadcasterProvider.req
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
  SEVERE: Missing dependency for field: protected javax.servlet.http.HttpServletRequest org.atmosphere.jersey.AtmosphereFilter.servletReq
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
  SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req

com.sun.jersey.spi.inject.Errors$ErrorMessagesException
    at com.sun.jersey.spi.inject.Errors.processErrorMessages(Errors.java:170)
    at com.sun.jersey.spi.inject.Errors.postProcess(Errors.java:136)
    at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:199)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.initiate(WebApplicationImpl.java:795)
    at com.sun.jersey.server.impl.application.WebApplicationImpl.initiate(WebApplicationImpl.java:790)
    at com.sun.jersey.test.framework.spi.container.inmemory.InMemoryTestContainerFactory$InMemoryTestContainer.start(InMemoryTestContainerFactory.java:165)
    at com.sun.jersey.test.framework.JerseyTest.setUp(JerseyTest.java:310)
    at io.dropwizard.testing.junit.ResourceTestRule$1.evaluate(ResourceTestRule.java:149)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

I assume Jersey tries to load the Atmosphere providers through classpath scanning and fails. This error appears even if you don’t use any code from Atmosphere, just by having the atmosphere-jersey in your classpath. Very nice.

My workaround:

Don’t use the atmosphere-jersey module but atmosphere-runtime instead. The drawback is that you have to write a little bit more code. Lets take a look at our new WebsocketResource:

@AtmosphereHandlerService
public class WebsocketResource implements AtmosphereHandler {
    /**
     * Name of the round broadcaster.
     */
    private static final String ROUND_BROADCASTER_NAME = "round";

    @Override
    public void onRequest(AtmosphereResource atmosphereResource) throws IOException {
        BroadcasterFactory broadcasterFactory = atmosphereResource.getAtmosphereConfig().getBroadcasterFactory();
        Broadcaster broadcaster = getBroadcaster(broadcasterFactory);
        atmosphereResource.setBroadcaster(broadcaster);
        atmosphereResource.suspend();
    }

    private boolean isBroadcast(AtmosphereResourceEvent event) {
        return event.getMessage() != null && !event.isCancelled() && !event.isClosedByClient() && !event.isClosedByApplication();
    }

    @Override
    public void onStateChange(AtmosphereResourceEvent event) throws IOException {
        AtmosphereResource resource = event.getResource();

        if (isBroadcast(event)) {
            resource.write(event.getMessage().toString());

            switch (resource.transport()) {
                case WEBSOCKET:
                case STREAMING:
                    resource.getResponse().flushBuffer();
                    break;
                default:
                    resource.resume();
                    break;
            }
        }
    }

    @Override
    public void destroy() {
    }

    private Broadcaster getBroadcaster(BroadcasterFactory broadcasterFactory) {
        return broadcasterFactory.lookup(ROUND_BROADCASTER_NAME, true);
    }
}

First, we annotate our WebsocketResource with @AtmosphereHandlerService and implement Atmosphere’s AtmosphereHandler. onRequest is called when a websocket is opened from the client. Here we assign the named broadcaster and suspend the connection. onStateChange is called, as the name suggests, wenn a broadcast needs to be sent (WTF?) or any other state change (disconnect, timeout, …) happens. If the event is a broadcast, them some logic is triggered: depending on the transport protocol (websocket, long-polling, etc.) we write the broadcast message and flush the buffer or, in case of long-polling, even close the connection.

Now the code works the same way as if we used the Atmosphere-Jersey combination but our tests are still working. Great!

You can see the whole setup in RESTwars.



Back to posts