Tuesday, May 3, 2011

Spring Integration: Hooking web services to a FIFO queue

I'm still struggling with Spring Integration- here's my scenario:

  1. A web service gets a request from client A
  2. web service puts the request onto a queue
  3. A queue consumer processes the messages FIFO and sends a response that gets routed back to the web service
  4. Web services sends response back to client A

There will be multiple web services all feeding messages onto this queue, and I need to ensure that they are truly processed in the order that they're received.

What pieces of Spring Integration do I need to wire together?

From stackoverflow
  • Note sure about Spring Integration but Java 5 has a number of BlockingQueues that can handle FIFO.

    Mike Sickler : Thanks, but I'm looking for a Spring Integration answer, and in particular a solution to the problem of hooking up a web service to a queue
  • The problem ist not spring. I think you will need a queue with elements containing the request and offering a response. But the response need to block until the element is dequed and processed. So the queue element looks like:

    public class BlockingPair {
      private final RequestBodyType request;
      private ResponseBodyType response;
    
      public BlockingPair(RequestBodyType request) {
        this.request = request;
      }
    
      public RequestBodyType getRequest() {
        return request;
      }
    
      public ResponseBodyType getResponse() {
        while (response == null) {
          Thread.currentThread().sleep(10);
        }
        return response;
      }
    
      public void setResponse(ResponseBodyType response) {
        this.response = response;
      }
    }
    

    The webservice enqueing creates the BlockingPair with its request body. Than pushes the BlockingPair element to the queue. Afterwards it creates the response getting the response body from the BlockingPair, but blocks.

    The consumer deques one BlockingPair and sets the response body. From there the webservice continues writing the response.

    You need three beans: webservice, a blocking queue and the consumer. Both webservice and consumer need the queue as bean property.

    The queue and the consumer beans need to be planed in the application context (as initilialized by the ContextLoaderListener). The queue needs a bean id to be references by the webservice (which has its own context, but the application context as a parent so the queue reference can be referenced):

    Part of the web.xml:

    <context-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:/WEB-INF/applicationContext.xml</param-value>
    </context-param>
    
    <listener>
      <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    
    <servlet>
      <servlet-name>service</servlet-name>
      <servlet-class>org.springframework.ws.transport.http.MessageDispatcherServlet</servlet-class>
    </servlet>
    
    <servlet-mapping>
      <servlet-name>service</servlet-name>
      <url-pattern>/*</url-pattern>
    </servlet-mapping>
    

    The applicationContext.xml contains two beans:

    <bean id="queue" class="java.util.concurrent.LinkedBlockingQueue"/>
    
    <bean id="consumer" class="...">
      <property name="queue" ref="queue"/>
    </bean>
    

    The webservice has its own context definition, here service-servlet.xml:

    <bean id="endpoint" class="org.springframework.ws.server.endpoint....PayloadEndpoint">
      <property name="queue" ref="queue"/>
    </bean>
    

    For more information on defining a spring ws endpoint, see the spring tutorial.

    The consumer need to be a background task, so i would prefer quartz, for spring integration see here.

    Mike Sickler : As I say in the question, I'm looking for a Spring Integration solution. Spring Integration has queues and other classes, but I'm not sure how to wire them together.
  • Based on the Javadoc for the QueueChannel here's my attempt at it. This does not address the web service configuration, just the code that would go in the web service back end implementation.

    This is the code that would add something to a queue (your web service).

    public class TheWebService {
    
      // Could also use QueueChannel, or PollableChannel here instead
      // just picked the most general one
      private org.springframework.integration.channel.MessageChannel queue;
    
      public void yourWebServiceMethod(SomeArg arg) {
         SomeObjectToPassThatExtendsMessage passed = someInitialProcessing(arg);
         queue.send(passed);
      }
    }
    

    This is the code that would go in your receiver/processor/dequeue class

    public class TheProcessor {
    
      // Could also use QueueChannel here instead
      // just picked the most general one
      private org.springframework.integration.channel.PollableChannel queue;
    
      // This method needs to be setup to be called by a separate thread.
      // See http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/scheduling/package-summary.html
      // and it's sub-packages.
      public void someProcessingPoller() {
         SomeObjectToPassThatExtendsMessage passed = queue.receive();
         // Do some processing with the passed object.
      }
    
    }
    

    The Spring configuration for this would look something like

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans">
    
      <bean id="webService" class="mypackage.TheWebService">
          <property name="queue" ref="queue" />
      </bean>
    
      <bean id="processor" class="mypackage.TheProcessor ">
          <property name="queue" ref="queue" />
      </bean>
    
      <bean id="queue" class="org.springframework.integration.channel.QueueChannel"/>
    </beans>
    
  • I can't help you with Spring Integration, but perhaps you need to give your architecture a second thought. In ESB systems you usually place a message in a Queue, when you know that the processing of the message will take considerable time or if you aren't certain that the remote end is ready (another reason is to bridge incompatible components). When you add the message to the queue, you immediately return to the requester indicating that the message is received, but not providing the result of the operation. The requester would then need to poll for the result or you could alternatively provide some sort of "push" functionality.

    So, if the processing of the messages in the Queue takes a lot of time, I recommend to modify your architecture. It is not common for a Web Client to wait long times for the reply and many requests could also timeout.

    If on the other hand the processing of the messages is quick and reliable, then using Queue channels is not needed. Have all your messages communicate with a central component (JEE Session Bean, Spring Bean, web service) and implement a queue mechanism yourself. They are already answers covering how you could do this.

0 comments:

Post a Comment