Sunday, December 16, 2012

Using WS-Policy in CXF projects


WS-Policy provides a flexible mechanism for activating desired functionality on the client or service side. This article describes how to define policies in custom CXF projects and how to implement policy-aware interceptors, and it explains some aspects of the internal CXF design regarding WS-Policy.


How to define policies

There are three main ways to define WS-Policy in CXF projects:
  1. WSDL Policy attachment
  2. Spring configuration
  3. Dynamically via message context property
Let us look at each of them in detail.

WSDL Policy attachment
WS-Policies can be attached to and referenced from WSDL elements. The Web Services Policy 1.5 - Attachment standard describes all possible alternatives. WS-Policies can be placed inside the WSDL itself or referenced as external documents. CXF will automatically recognize, read and use policies defined or referenced in the WSDL. A sample of an attached policy is shown below:
<wsdl:definitions name="HelloWorld" targetNamespace="http://apache.org/hello_world_soap_http" ...>
<wsdl:service name="SOAPService">
    <wsdl:port binding="tns:Greeter_SOAPBinding" name="SoapPort">
        <soap:address location="http://localhost:9000/SoapContext/SoapPort"/>
        <wsp:Policy xmlns:wsp="http://www.w3.org/ns/ws-policy">
             <wsam:Addressing xmlns:wsam="http://www.w3.org/2007/02/addressing/metadata">
                 <wsp:Policy/>
              </wsam:Addressing>
         </wsp:Policy>
    </wsdl:port>
</wsdl:service>
</wsdl:definitions>  

Spring configuration
It is possible to define policies directly in the Spring configuration of the client and the service as a jaxws feature. CXF will recognize and use the configured WS-Policies:
Client:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jaxws="http://cxf.apache.org/jaxws"
       xmlns:cxf="http://cxf.apache.org/core"
       xmlns:p="http://cxf.apache.org/policy"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
http://cxf.apache.org/policy http://cxf.apache.org/schemas/policy.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd">
    <jaxws:client id="CRMServiceClient" name="{http://services.talend.org/CRMService}CRMServiceProvider"
            xmlns:serviceNamespace="http://services.talend.org/CRMService"
            serviceClass="org.talend.services.crmservice.CRMService"
            serviceName="serviceNamespace:CRMServiceProvider"
            endpointName="serviceNamespace:CRMServicePort"
            address="${endpoint.prefix}/CRMServiceProvider">
            <jaxws:features>
                <p:policies>
                    <wsp:PolicyReference xmlns:wsp="http://www.w3.org/ns/ws-policy" URI="classpath:/saml.policy"/>
                </p:policies>
            </jaxws:features>
    </jaxws:client>
</beans>
Service:
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jaxws="http://cxf.apache.org/jaxws"
       xmlns:cxf="http://cxf.apache.org/core"
       xmlns:p="http://cxf.apache.org/policy"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
http://cxf.apache.org/core http://cxf.apache.org/schemas/core.xsd
http://cxf.apache.org/policy http://cxf.apache.org/schemas/policy.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd">
    <jaxws:endpoint id="CRMService"
            xmlns:serviceNamespace="http://services.talend.org/CRMService"
            serviceName="serviceNamespace:CRMServiceProvider"
            endpointName="serviceNamespace:CRMServicePort"
            implementor="#CRMServiceBean"
            address="/CRMServiceProvider">
            <jaxws:features>
                <p:policies>
                    <wsp:PolicyReference xmlns:wsp="http://www.w3.org/ns/ws-policy" URI="classpath:/saml.policy"/>
                </p:policies>
            </jaxws:features>
    </jaxws:endpoint>
</beans>
Dynamically through message property
Sometimes policies cannot be configured statically because they are obtained or calculated dynamically for a concrete message (for example, using a Policy Server or Service Registry). For such cases CXF makes it possible to load a policy dynamically and set it as a message context property. This can be done, for example, in a custom interceptor that does the following:
  1. Gets the policy from an external location and builds it for the current message.
  2. Parses the WS-Policy XML using the Neethi library.
  3. Stores the resulting Policy object in the PolicyConstants.POLICY_OVERRIDE message context property.
    It is important that this custom policy interceptor is called before the CXF PolicyInInterceptor or PolicyOutInterceptor. CXF will then automatically recognize the Policy stored in this property and use it with the highest priority.
 I have published a small sample illustrating how to apply a policy dynamically.

Create custom policy assertions and associate interceptors

It is quite easy to define your own policy assertions and associate interceptors with them. The topic is already well described in the CXF documentation at http://cxf.apache.org/docs/developing-assertions.html, so I just list the necessary steps:
  1. Provide an assertion builder class for the custom assertion, implementing the AssertionBuilder<T> interface.
    The type parameter can be Element, XMLStreamReader or OMElement.
    The interface contains two methods: build() and getKnownElements().
    The implementation of build() should construct an Assertion from the incoming type. It can be a PrimitiveAssertion (without attributes or child elements), a NestedPrimitiveAssertion (without attributes but with a nested policy element) or a JaxbAssertion (an assertion described by an XML schema).
    The getKnownElements() method must return the QNames of the assertion elements from which the assertion can be built.
  2. Implement a policy interceptor provider class extending AbstractPolicyInterceptorProvider. The main task of the policy interceptor provider is to declare which interceptors must be activated for a specified policy assertion. Its constructor passes the assertion QNames to the super constructor and adds the corresponding interceptors using the getters:
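import java.util.ArrayList;
import java.util.Collection;
import javax.xml.namespace.QName;
import org.apache.cxf.ws.policy.AbstractPolicyInterceptorProvider;

public class AuthorizationInterceptorProvider extends AbstractPolicyInterceptorProvider {
    private static final long serialVersionUID = -5248428637449096540L;
    // One shared instance per direction; each constant is declared with its own interceptor type
    private static final AuthorizationInInterceptor IN_AUTHZ_INTERCEPTOR = new AuthorizationInInterceptor();
    private static final AuthorizationOutInterceptor OUT_AUTHZ_INTERCEPTOR = new AuthorizationOutInterceptor();
    // QNames of the assertions this provider is responsible for
    private static final Collection<QName> ASSERTION_TYPES;
    static {
        ASSERTION_TYPES = new ArrayList<QName>();
        ASSERTION_TYPES.add(AuthorizationConstants.AUTHORIZATION_ASSERTION);
    }
    public AuthorizationInterceptorProvider() {
        super(ASSERTION_TYPES);
        // Interceptors activated when one of the assertions above is part of the effective policy
        getInInterceptors().add(IN_AUTHZ_INTERCEPTOR);
        getOutInterceptors().add(OUT_AUTHZ_INTERCEPTOR);
    }
}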

The assertion builder and the policy interceptor provider can be registered using the CXF bus extension mechanism: just create a file META-INF/cxf/bus-extensions.txt containing the following:
org.company.AuthorizationInterceptorProvider::true
org.company.AuthorizationAssertionBuilder::true  
The Boolean value at the end specifies the lazy loading strategy.
CXF automatically recognizes the assertion builder and the policy interceptor provider and stores them in the corresponding registries: AssertionBuilderRegistry and PolicyInterceptorProviderRegistry. Since CXF 2.6.0 it is possible to register multiple interceptor providers for a single assertion.

How and where CXF processes policies

As already mentioned, CXF provides two interceptors: org.apache.cxf.ws.policy.PolicyInInterceptor and org.apache.cxf.ws.policy.PolicyOutInterceptor. These interceptors are responsible for loading the policy from the destination, parsing and merging policies, and adding all associated interceptors to the message interceptor chain. The functionality of the policy interceptors is shown in the following figure:


Briefly, the policy interceptors perform the following steps:
  1. Check the message property PolicyConstants.POLICY_OVERRIDE.
  2. If PolicyConstants.POLICY_OVERRIDE contains a policy, it is taken for further processing.
  3. If the property is empty, the policy is requested from the ServiceModel. Here CXF loads policies attached to the WSDL or provided via Spring configuration.
  4. If a policy is found in step 2 or step 3, an EffectivePolicy is created. The appropriate WS-Policies are merged for the current message and built into a Neethi Policy object.
  5. All interceptors registered for the resulting policy assertions are added to the message interceptor chain.
Additionally, CXF verifies that policy assertions are satisfied in PolicyVerificationInInterceptor, PolicyVerificationInFaultInterceptor and PolicyVerificationOutInterceptor. If an assertion is not processed and not satisfied in the corresponding interceptor, then the In interceptors throw a Fault and the Out interceptors write appropriate log messages.
The practical use of WS-Policy is illustrated in the ws_policy and ws_security CXF samples.
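The lookup order in the steps above can be pictured with a small, CXF-free model. This is only a sketch under simplifying assumptions: a policy is reduced to a list of assertion names, the message to a map, and the class, the constant value and the method names are all hypothetical stand-ins rather than CXF API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the policy lookup order; not CXF code. */
class PolicyResolutionSketch {

    // Hypothetical stand-in for the PolicyConstants.POLICY_OVERRIDE property key.
    static final String POLICY_OVERRIDE = "policy.override";

    // Hypothetical registry: assertion name -> interceptor names.
    // Several providers may contribute interceptors for the same assertion.
    static final Map<String, List<String>> PROVIDERS = new HashMap<>();

    static void register(String assertion, String interceptor) {
        PROVIDERS.computeIfAbsent(assertion, k -> new ArrayList<>()).add(interceptor);
    }

    /** Steps 1-3: prefer the policy stored on the message, else use the service model policy. */
    static List<String> effectivePolicy(Map<String, List<String>> message,
                                        List<String> serviceModelPolicy) {
        List<String> override = message.get(POLICY_OVERRIDE);
        return override != null ? override : serviceModelPolicy;
    }

    /** Steps 4-5: collect the interceptors registered for each assertion of the policy. */
    static List<String> interceptorsFor(List<String> policy) {
        List<String> chain = new ArrayList<>();
        for (String assertion : policy) {
            chain.addAll(PROVIDERS.getOrDefault(assertion, Collections.<String>emptyList()));
        }
        return chain;
    }
}
```

In this model, a custom interceptor that runs earlier only has to put a policy under the override key; the statically configured policy is then ignored for that message.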

Friday, November 30, 2012

CXF XSLT Feature

XSLT Feature

 The CXF XSLT Feature is an alternative to the lightweight CXF Transformation Feature.
Most transformation use cases can be implemented using the Transformation Feature: dropping the namespace of outbound messages, qualifying incoming messages, changing namespaces, appending or dropping elements, and converting attributes to elements.

However, this may not be enough for some advanced transformations. If you have to implement a non-trivial transformation that the Transformation Feature does not support, that is the use case for the XSLT Feature, where the full power of XSL is available.
The decision about which feature to use is therefore easy: the CXF Transformation Feature for simple transformations, and the CXF XSLT Feature for advanced transformations not supported by the lightweight variant.

Configuration

The Spring configuration of the XSLT Feature is very simple; it is only necessary to configure the XSL script for the inbound and/or outbound transformation:
<bean id="xsltFeature" class="org.apache.cxf.feature.transform.XSLTFeature">          
      <property name="inXSLTPath" value="requestTransformation.xsl" />          
      <property name="outXSLTPath" value="responseTransformation.xsl" />  
</bean>

The feature can then be added to a client or an endpoint:
<jaxws:client xmlns:customer="http://customerservice.example.com/"
  id="customerService" serviceName="customer:CustomerServiceService"
  endpointName="customer:CustomerServiceEndpoint" address="http://localhost:9091/CustomerServicePort"
  serviceClass="com.example.customerservice.CustomerService">
  <jaxws:features>
     <ref bean="xsltFeature" />
  </jaxws:features>
</jaxws:client>

<jaxws:endpoint xmlns:customer="http://customerservice.example.com/"
  id="CustomerServiceHTTP" address="http://localhost:9090/CustomerServicePort"
  serviceName="customer:CustomerServiceService" endpointName="customer:CustomerServiceEndpoint"
  implementor="com.example.customerservice.server.CustomerServiceImpl">
  <jaxws:features>
     <ref bean="xsltFeature" />
  </jaxws:features>
</jaxws:endpoint>

It is also possible to add XSLT interceptors directly in Java code:
 CustomerServiceService service = new CustomerServiceService();
 CustomerService customerService = service.getCustomerServicePort();
 Client client = ClientProxy.getClient(customerService);
 XSLTOutInterceptor outInterceptor = new XSLTOutInterceptor(Phase.PRE_STREAM, StaxOutInterceptor.class, null,
                                                            XSLT_REQUEST_PATH);
 client.getOutInterceptors().add(outInterceptor);

By default the XSLT interceptors run in the POST_STREAM and PRE_STREAM phases.
In some cases the phase may need to be changed; for example, the inbound transformation may have to be applied after the encrypted payload has been decrypted and its signature checked.
For such transformations to succeed, the XSLTInInterceptor/XSLTOutInterceptor needs to be created with a constructor accepting a 'phase' String parameter.

The XSLT interceptors support the following message contents:
  • InputStream/OutputStream;
  • Reader/Writer;
  • XMLStreamReader/XMLStreamWriter.
Therefore the interceptors can be used in different phases and can also work with the JMS transport using JMS text messages (which produce Reader/Writer message contents).
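To make concrete what applying an XSL script to Reader/Writer content means, here is a minimal sketch using only the standard JAXP API (javax.xml.transform). It is not the implementation of the CXF interceptors; the stylesheet, the element names and the class name are hypothetical and chosen purely for illustration.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

/** Applies a fixed stylesheet to an XML string via Reader/Writer; not CXF code. */
class XsltSketch {

    // Hypothetical stylesheet: identity copy, except <customerName> is renamed to <name>.
    static final String XSL =
        "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
      + "<xsl:output omit-xml-declaration=\"yes\"/>"
      + "<xsl:template match=\"@*|node()\">"
      + "<xsl:copy><xsl:apply-templates select=\"@*|node()\"/></xsl:copy>"
      + "</xsl:template>"
      + "<xsl:template match=\"customerName\">"
      + "<name><xsl:apply-templates select=\"@*|node()\"/></name>"
      + "</xsl:template>"
      + "</xsl:stylesheet>";

    static String transform(String xml) {
        try {
            // Compile the stylesheet and run it over the input document
            Transformer transformer = TransformerFactory.newInstance()
                .newTransformer(new StreamSource(new StringReader(XSL)));
            StringWriter out = new StringWriter();
            transformer.transform(new StreamSource(new StringReader(xml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transform("<customer><customerName>Smith</customerName></customer>"));
    }
}
```

A rename like this one is still within reach of the lightweight Transformation Feature; the XSLT route pays off once the script needs conditions, grouping or restructuring.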
 

Wednesday, September 19, 2012

CXF Performance on Sun Solaris Platform: some numbers

Introduction
I would like to show some performance benchmarks for Apache CXF on the SPARC/Solaris 11 platform. CXF was deployed in an OSGi Karaf container and used Jetty as the web server.
JMS communication used the Apache ActiveMQ broker; database access was based on EclipseLink JPA with Apache Aries managed transactions and an Oracle DB server.


Acknowledgments
Many thanks to my colleagues Marian Kueffner, for compiling and integrating the benchmark results, and Christian Schneider, for supporting me with the tests.

Benchmark Goals
  • Measure maximum CXF service invocation throughput on a particular hardware configuration: SPARC/Solaris 11
  • Identify bottlenecks and areas for tuning and enhancement
Source Code
Source code is available on GitHub: cxf.benchmarks
 
Hardware
SPARC / Solaris 11 SUN-T4-2-FAMILY-0 30351017 SPARC T4-2 512GB – 8 GbE Ports – 4 FC Ports – 4 10GbE *604532* Ports
  • 2x SPARC T4 8-core 2.85 GHz processor
  • 512GB RAM (32x 16GB DDR3 DIMM MODULES)
  • 6x 600GB SAS2 10k HDD
  • 2x Qlogic FC 8Gbps HBA PCIe
  • 2x 10 GbE SFP+ HBA PCIe
  • 1x 1 GbE Quad Gigabit Ethernet HBA PCIe


Software
Deployment: Karaf OSGi (Equinox based) container
Versions:
  • Java: 6 (build 1.6.0_26-b3)
  • CXF: 2.6.0
  • Karaf: 2.2.6
  • Jetty: 7.5.4
  • Aries: 0.3.1
  • ActiveMQ: 5.6.0
  • EclipseLink: 2.3.0

Scenarios
Benchmarks were measured for three main scenarios:
  • (A) JAX-WS based Web Services (SOAP over http)
  • (B) Messaging over JMS/ActiveMQ (SOAP over JMS)
  • (C) Database access via Service (EclipseLink, container managed transaction, Oracle)


 Every scenario was executed with messages of three sizes: small (1 KB), medium (10 KB) and large (1 MB). The service implementation simply echoes the request as the response and, in scenario (C), additionally performs DB access operations. The benchmarks focused on the DOM-based message model, but for some scenarios the service was also tested using stream-oriented message processing. Throughput on the service side and CPU load were measured.


Benchmark results

Scenario A: small messages with DOM model; HTTP


Scenario A: small messages with stream model; HTTP

 

Scenario A: medium messages with DOM model; HTTP

 

Scenario A: large messages with DOM model; HTTP

 

Special Scenario: maximal service load

In this scenario the service was loaded using four clients: three SoapUI clients and one CXF client. The clients were activated sequentially at one-minute intervals. You can see how the service load changes when the next client becomes active.

 

 

Scenario B: small messages with DOM model; JMS

 

Scenario B: medium messages with DOM model; JMS

 

Scenario B: large messages with DOM model; JMS

 

Scenario C: Inserts in Oracle DB using EclipseLink JPA and Aries transactions

 

Scenario C: Selects from Oracle DB using EclipseLink JPA and Aries transactions

 

Conclusion
Generally, CXF shows high performance in the described scenarios on the Sun Solaris platform.

I have some remarks regarding the benchmark results:
  1. A single thread shows fairly low performance, but the system is highly scalable. Perhaps Solaris should be additionally tuned for single-thread performance.
  2. The service (provider) is very scalable. As you can see in Test Case 5, adding more consumers just increases the service throughput. I reached more than 10K messages per second, and that was still not the limit.
  3. The JMS scenario with ActiveMQ shows relatively low performance with the default configuration. The numbers are much higher if persistence is deactivated or at least immediate synchronization is switched off.
  4. Stream-oriented message processing increases throughput by 25-30% even for small messages.

Monday, July 23, 2012

Custom CXF Transport



Abstract
Apache CXF is a Web Services and REST framework designed in a very extensible and flexible way. One very important aspect of the CXF framework is its transports, which are responsible for the physical communication between clients and services. This paper describes how transports are organized in CXF. It consists of two parts: the first gives a general overview of the architecture and design of the CXF transport layer and describes how to create a custom transport and what the use cases for it are. The second part concentrates on the JMS transport and shows how to design scalable CXF applications using JMS.
 
Introduction
Presently the CXF distribution provides transport implementations for the following protocols: HTTP(S), JMS, JBI, and Local [REF-1]. The HTTP(S) and JMS transports support the corresponding protocols and interfaces; the JBI transport provides communication with JBI service engines and binding components; the Local transport is designed for optimized communication between participants in the same JVM. The Apache Camel project additionally provides a Camel transport for CXF [REF-2].
Normally a new custom transport is required for a protocol not yet supported by CXF: UDP or FTP, for example. Of course, this case can be implemented using a Camel-based solution, but if that is not appropriate for any reason, a custom CXF transport is a valid alternative. New CXF transports can also be a solution when legacy ESB participants have to be implemented using the standard JAX-WS interface but should communicate using a high-level protocol based on the old ESB (the JBI transport is an example of such a use case). Let us analyse the CXF transport layer in detail.

CXF Transport Layer

Architecture and Design
The transport functionality is based on two fundamental concepts: conduit and destination. Conduits are responsible for sending a message to recipients, and destinations for receiving a message from the sender. In order to send a response, a destination needs its own back-channel conduit (in case of request-response communication). Conduits and destinations are created by a TransportFactory. CXF selects the correct TransportFactory based on the transport URL. SOAP is also considered a high-level transport and has its own conduit and destination in CXF.
To send a message into a physical channel, the conduit needs access to the message content. Since CXF is streaming-oriented, the normal practice is to use a custom OutputStream that extends CachedOutputStream. The message is written to this custom stream, which gives access to the content in streaming or buffered form, depending on the transport requirements. CachedOutputStream is configured to keep a message in memory only up to a predefined size. If this size is exceeded, the message is swapped to disk.
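The "keep in memory up to a threshold, then swap to disk" behaviour can be sketched with plain java.io classes. The class below is a simplified model written for this article, not CXF's CachedOutputStream; its name, the helper method and the temp-file naming are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Buffers in memory and spills to a temp file once the threshold is exceeded; not CXF code. */
class ThresholdOutputStream extends OutputStream {
    private final int threshold;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream current = memory;
    private File spillFile;
    private long written;

    ThresholdOutputStream(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void write(int b) throws IOException {
        // Switch to the file before writing the first byte that would exceed the threshold
        if (spillFile == null && written + 1 > threshold) {
            spill();
        }
        current.write(b);
        written++;
    }

    private void spill() throws IOException {
        spillFile = File.createTempFile("message", ".tmp");
        spillFile.deleteOnExit();
        FileOutputStream file = new FileOutputStream(spillFile);
        memory.writeTo(file); // copy what has been buffered so far
        memory = null;
        current = file;
    }

    boolean isInMemory() {
        return spillFile == null;
    }

    @Override
    public void close() throws IOException {
        current.close();
    }

    /** Convenience helper for experiments: writes n zero bytes and reports where they ended up. */
    static boolean staysInMemory(int threshold, int n) {
        try (ThresholdOutputStream out = new ThresholdOutputStream(threshold)) {
            out.write(new byte[n]);
            return out.isInMemory();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A transport that needs the whole message before sending (for example, to compute a length header) reads it back from such a buffer; a streaming transport would instead forward the bytes as they arrive.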
A class diagram of TransportFactory, Conduit, Destination and OutputStream is shown below:



How it Works
Interaction between JAX-WS client and service using CXF transport is represented in the following figure:


What happens in the transport layer on the client side and on the service side when a message is sent or received? Let us look at it in detail.

Client Workflow

    • Step 1: JAX-WS client invokes a service, for example in this manner:
              URL wsdlURL = this.getClass().getResource("/HelloWorld.wsdl");
              HelloWorldService service = new HelloWorldService(wsdlURL, SERVICE_NAME);        
              HelloWorld hw = service.getHelloWorldPort();       
              String result = hw.sayHi(TEST_REQUEST);
      
        • Step 2: CXF runtime selects the correct TransportFactory based on some criteria (described below)
        • Step 3: CXF runtime calls the TransportFactory.getConduit() method to obtain the conduit
        • Step 4: CXF runtime invokes Conduit.prepare() and passes the outgoing message as argument
        • Step 5: The conduit sets up its own OutputStream (normally extending CachedOutputStream) as the outgoing message content
        • Step 6: CXF runtime processes the outgoing message, calls the interceptor chain and writes the outgoing message to the conduit's OutputStream. Messaging in CXF is stream-oriented; therefore the message is normally processed and sent not as a single block but as a stream. The last bytes of the message may still be in processing while the first ones have already been sent to the recipient. It is the responsibility of the conduit to decide how to send the message: by streaming, or by collecting the whole message and sending it at once
        • Step 7: When the CXF runtime has completely processed the outgoing message, it invokes the Conduit.close(Message) method. This means that the message has been completely written into the OutputStream. Correspondingly, the OutputStream.doClose() method will be called
        • Step 8: In the doClose() method, the OutputStream class has access to the whole marshalled outgoing message and exchange and will send this message to the service using the corresponding transport protocol. In case of streaming, part of the message may already have been sent to the network at this time, and the conduit just sends the last part and finishes sending the request
        • Step 9: In case of one-way communication, the exchange will be closed. Skip to Step 14
        • Step 10: In case of request-response communication, the conduit will wait for the service response in a synchronous or asynchronous manner
        • Step 11: When the response is received, the conduit creates a new message, sets its context and puts it into the exchange as the incoming message (In-Message). The content of the new message is also available as a stream. Therefore the runtime and business logic can start processing the message even if it has not yet been completely received.
        • Step 12: When a fault is received, the conduit also creates a new Message, sets its context and puts it into the exchange as the in-fault message.
        • Step 13: The conduit notifies the incomingObserver (that is, the ClientImpl object) about the response using an incomingObserver.onMessage() call
        • Step 14: The Conduit.close(Message) method is invoked for the incoming message. Normally the conduit implementation decreases the reference count of the current network connection, potentially closing it if the count is zero.
        • Step 15: JAX-WS client code receives the response in sync or async style
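The core of the client workflow (prepare, write, close, notify the observer) can be condensed into a toy in-JVM model. The sketch below deliberately avoids the CXF interfaces: the Message and LoopbackConduit classes and the echo-style "service" function are hypothetical, and it buffers the whole message rather than streaming it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy model of the prepare/write/close sequence; not the CXF Conduit API. */
class LoopbackConduitSketch {

    /** Hypothetical stand-in for a message: only carries the output stream content. */
    static class Message {
        ByteArrayOutputStream content;
    }

    /** Hypothetical conduit that "sends" the buffered request to a function in the same JVM. */
    static class LoopbackConduit {
        private final Function<String, String> service;
        private Consumer<String> incomingObserver = response -> { };

        LoopbackConduit(Function<String, String> service) {
            this.service = service;
        }

        void setMessageObserver(Consumer<String> observer) {
            this.incomingObserver = observer;
        }

        /** Steps 4-5: the conduit installs its own stream as the message content. */
        void prepare(Message message) {
            message.content = new ByteArrayOutputStream();
        }

        /** Steps 7-13: the message is complete, so send it and hand the response to the observer. */
        void close(Message message) {
            String request = new String(message.content.toByteArray(), StandardCharsets.UTF_8);
            String response = service.apply(request);
            incomingObserver.accept(response);
        }
    }

    /** Plays the role of the runtime: prepares, writes the payload, closes, returns the response. */
    static String invoke(String payload) {
        StringBuilder received = new StringBuilder();
        LoopbackConduit conduit = new LoopbackConduit(request -> "Hello " + request);
        conduit.setMessageObserver(received::append);

        Message out = new Message();
        conduit.prepare(out);
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.content.write(bytes, 0, bytes.length); // step 6: runtime writes the marshalled message
        conduit.close(out);
        return received.toString();
    }
}
```

The point of the model is the division of labour: the runtime only writes to the stream and calls close, while everything protocol-specific happens inside the conduit.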

          Service Workflow
          • Step 1: JAX-WS service is registered, for example in this way:
               HelloWorldImpl serverImpl = new HelloWorldImpl();
               Endpoint.publish("udp://localhost:9000/hello", serverImpl);
          
          • Step 2: CXF runtime selects the correct TransportFactory based on some criteria (described below)
          • Step 3: CXF runtime calls the TransportFactory.getDestination() method to obtain the destination
          • Step 4: As soon as the CXF runtime activates the endpoint (adds a listener, etc.), the Destination.activate() method is automatically invoked
          • Step 5: The implementation of Destination.activate() normally opens network connections and listens for incoming requests
          • Step 6: When a request arrives, the destination creates a message, sets the content and notifies the message observer (that is, the ChainInitiationObserver object) about the request via incomingObserver.onMessage(). The message content is saved as a stream; therefore the runtime and business logic can start processing even a message that has not been completely received. Normally the incoming connection is saved in a correlation map so that it can be retrieved for sending the appropriate response.
          • Step 7: The business service implementation will be called with the request message in stream form. In case of one-way communication the exchange is now finished. In case of request-response, the business implementation either returns a response or throws a fault exception.
          • Step 8: The CXF runtime requests a back-channel conduit from the destination via Destination.getInbuiltBackChannel()
          • Step 9: The back-channel conduit's prepare() method will be called with the response message as argument
          • Step 10: The back-channel conduit sets its own OutputStream as the message content
          • Step 11: CXF runtime processes the response message, calls the interceptor chain and invokes Conduit.close(Message) for the response message.
          • Step 12: Finally, the OutputStream.doClose() method for the response message is invoked
          • Step 13: In the doClose() method the OutputStream class has access to the marshalled response message and will send this message through the network as a response to the client. In case of streaming, part of the message may already have been sent to the network at this time, and the conduit just sends the last part and finishes sending. Normally the incoming connection for the specified protocol is cached and created only if necessary.

          Registration of Transport Factory
          There are two ways to register a transport factory: programmatically or via Spring configuration.
          To register a transport factory programmatically, execute the following code:
          CustomTransportFactory transportFactory = new CustomTransportFactory(); 
          Bus bus = BusFactory.getThreadDefaultBus();     
          DestinationFactoryManagerImpl dfm = bus.getExtension(DestinationFactoryManagerImpl.class);
          dfm.registerDestinationFactory(TRANSPORT_IDENTIFIER, transportFactory); 
          ConduitInitiatorManager extension = bus.getExtension(ConduitInitiatorManager.class);      
          extension.registerConduitInitiator(TRANSPORT_IDENTIFIER, transportFactory);
          TRANSPORT_IDENTIFIER is a unique transport ID (normally in the form "http://apache.org/transports/PROTOCOL_PREFIX").

          For Spring configuration, the following could be used instead:

          <bean class="org.company.cxf.transport.CustomTransportFactory"
            lazy-init="false">
            <property name="transportIds">
             <list>
                  <value>TRANSPORT_IDENTIFIER</value>
             </list>
            </property>
           </bean>

          How CXF chooses the TransportFactory
          The TransportFactory is now registered, and a CXF participant is about to send or receive a message. How does CXF find the correct TransportFactory?

          Selection happens in two steps:

          1. Binding TransportFactory selection
          CXF interprets bindings like SOAP as high-level transports and also chooses an appropriate TransportFactory for them. A TransportFactory provides its list of transport IDs via the method TransportFactory.getTransportIds(). If this list contains the value of the binding transport attribute and the binding namespace defined in the WSDL document, CXF will select this TransportFactory:
          WSDL:
          <wsdl:definitions xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" …>
              …
              …
              <wsdl:binding name="GreeterPortBinding" type="tns:GreeterPortType">
                  <soap:binding style="document"
                      transport="http://schemas.xmlsoap.org/soap/http" />
                  …
              </wsdl:binding>
              <wsdl:service name="GreeterService">
                  <wsdl:port binding="tns:GreeterPortBinding" name="GreeterPort">
                      <transport:address location="LOCATION_URL">

          TransportFactory class:
            …
          public static final List<String> DEFAULT_NAMESPACES = Arrays.asList(
              "http://schemas.xmlsoap.org/soap/",
              "http://schemas.xmlsoap.org/wsdl/soap/",
              "http://schemas.xmlsoap.org/wsdl/soap12/",
              "http://schemas.xmlsoap.org/soap/http/",
              "http://schemas.xmlsoap.org/wsdl/soap/http",
              "http://www.w3.org/2010/soapjms/",
              "http://www.w3.org/2003/05/soap/bindings/HTTP/",
              "http://schemas.xmlsoap.org/soap/http");

          public final List<String> getTransportIds() {
              return DEFAULT_NAMESPACES;
          }

          2. Protocol TransportFactory selection
          Once the binding TransportFactory is found, CXF looks for the protocol TransportFactory responsible for physical network communication. The important method here is TransportFactory.getUriPrefixes(), which returns the list of protocol prefixes supported by this TransportFactory.
          When a CXF client or service tries to communicate using a URL with a specific protocol prefix (http://, https://, jms://, local://), CXF looks into the map of registered transport factories and gets the right one for this prefix. If no TransportFactory for this protocol is found, CXF throws a corresponding exception.
          Client configuration:
          <jaxws:client id="FlightReservationClient"
              xmlns:serviceNamespace="http://www.apache.org/cxf/samples/FlightReservation"
              serviceClass="org.apache.cxf.samples.flightreservation.FlightReservation"
              serviceName="serviceNamespace:FlightReservationService"
              endpointName="serviceNamespace:FlightReservationSOAP"
              address="http://localhost:8040/services/FlightReservationService">
          </jaxws:client>…

          TransportFactory class:
          …
          private static final Set<String> URI_PREFIXES = new HashSet<String>();
          static {
              URI_PREFIXES.add("http://");
              URI_PREFIXES.add("https://");
          }

          public Set<String> getUriPrefixes() {
              return URI_PREFIXES;
          }

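The prefix-based lookup described above can be illustrated with a small stand-alone model. This is only a sketch of the idea, not how CXF implements its factory managers; the class name, the factory names and the registration method are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy registry that picks a transport factory by the URI prefix of the address; not CXF code. */
class TransportLookupSketch {

    // Hypothetical registry: factory name -> URI prefixes the factory declares
    private final Map<String, List<String>> factories = new LinkedHashMap<>();

    void register(String factoryName, String... uriPrefixes) {
        factories.put(factoryName, Arrays.asList(uriPrefixes));
    }

    /** Returns the first registered factory whose prefix matches the address. */
    String findFactory(String address) {
        for (Map.Entry<String, List<String>> entry : factories.entrySet()) {
            for (String prefix : entry.getValue()) {
                if (address.startsWith(prefix)) {
                    return entry.getKey();
                }
            }
        }
        // Mirrors the behaviour described in the text: no factory means an exception
        throw new IllegalArgumentException("No transport factory registered for " + address);
    }
}
```

With a registration such as register("UdpFactory", "udp://"), an address like udp://localhost:9000/hello resolves to the custom factory, while an unregistered scheme fails fast.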
          Conduit and Destination Lifecycle
          Destinations are normally created by the service on startup and released on shutdown. Conduits can be either recreated for each request or cached, based on endpoint information, for the whole client lifetime. Clients can make concurrent calls to endpoints using different protocols and bind them to different conduits.

          Concurrency Aspects
          Conduit and destination objects can be accessed concurrently by multiple threads. Implementations should take care of the thread safety of the class.

          Streaming
          It is strongly recommended not to break streaming in Conduit and Destination implementations if the physical protocol supports it. CXF is completely streaming-oriented, which gives it high performance and scalability.

          How to start
          What is the starting point for understanding the CXF transport layer and implementing your own transport? I would recommend reading the CXF documentation [REF-1] and analysing the source code of existing CXF transports (the Local and JMS ones are more straightforward). They are located in the packages org.apache.cxf.transport.local and org.apache.cxf.transport.jms respectively.
           
          Conclusion
          CXF provides a very flexible and pluggable transport layer. It is possible to configure the standard transport implementations delivered with CXF as well as to implement and integrate a new custom transport. Creating a custom transport in CXF is straightforward.
          Some Apache projects can be easily integrated using CXF transports: Camel makes it possible to bind CXF participants directly to a Camel route; the JBI transport simplifies communication with ServiceMix applications.

          References
          REF-1: CXF transports overview http://cxf.apache.org/docs/transports.html  
          REF-2: Camel CXF transport http://camel.apache.org/camel-transport-for-cxf.html

          Sunday, February 26, 2012

          Scalable CXF applications using JMS transport

          Java Message Service (JMS) is a widespread and popular messaging API. Because JMS is standardized, the same application code can work with different JMS implementations: WS MQ, ActiveMQ, Tibco, Joram, BEA WebLogic, OpenJMS. CXF provides a transport that enables endpoints to use JMS queues and topics.

          Default CXF consumer and producer using JMS
          Implementing a CXF client and service using the JMS transport is trivial.
          Basically, it is enough to configure two things in the WSDL:
          a) specify the JMS transport URI in the binding element;
          b) define the JMS address in the port element.


          WSDL binding and port should look like:


          <wsdl:definitions xmlns:jms="http://cxf.apache.org/transports/jms" ...
              <wsdl:binding name="Greeter_SOAPBinding" type="tns:Greeter">
                  <soap:binding style="document"
                      transport="http://cxf.apache.org/transports/jms" />
                  ...
              </wsdl:binding>
              <wsdl:service name="JMSGreeterService">
                  <wsdl:port binding="tns:Greeter_SOAPBinding" name="GreeterPort">
                      <jms:address destinationStyle="queue"
                          jndiConnectionFactoryName="ConnectionFactory"
                          jndiDestinationName="dynamicQueues/test.cxf.jmstransport.queue">
                          <jms:JMSNamingProperty name="java.naming.factory.initial"
                              value="org.apache.activemq.jndi.ActiveMQInitialContextFactory" />
                          <jms:JMSNamingProperty name="java.naming.provider.url"
                              value="tcp://localhost:61616" />
                      </jms:address>
                  </wsdl:port>
              </wsdl:service> 

          CXF clients and servers implemented in Java or configured with Spring work with this WSDL out of the box (under the hood, CXF selects the correct JMS Conduit and Destination based on the address URL). Details are described in [REF-3].
          CXF also ships the jms_queue and jms_pubsub examples, which illustrate the use of the JMS transport with default settings for ActiveMQ [REF-4].


          Scalability problems
          Unfortunately, the default JMS configuration has two main scalability drawbacks:


          1. It doesn't provide session pooling or a consumer/producer cache (*).
          2. The default JMS message consumer is single threaded. This means that only one thread gets messages from the queue or topic and passes them on for further processing.
          Both aspects are critical for enterprise applications, and implementing them yourself is not an easy task. Is there an easy solution? Yes: Spring JMS functionality and CXF Features. Let's discuss them in detail.

          (*) - Some JMS vendors provide integrated session pooling and a consumer/producer cache in their ConnectionFactory. In this case, using the Spring CachingConnectionFactory is not necessary. Please refer to the vendor documentation to check this.


          Spring JMS functionality
          Spring provides a number of useful classes that help to implement scalable JMS applications. The important ones for us are:
          • org.springframework.jms.connection.CachingConnectionFactory
          • org.springframework.jms.listener.DefaultMessageListenerContainer

          1. CachingConnectionFactory
          CachingConnectionFactory provides the possibility to configure session pooling as well as consumer and producer caching. Below is a sample configuration of CachingConnectionFactory:


          <bean id="cachingConnectionFactory"
           class="org.springframework.jms.connection.CachingConnectionFactory">
              <property name="targetConnectionFactory">
                  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                      <property name="brokerURL" value="tcp://localhost:61616" />
                  </bean>
              </property>
              <property name="sessionCacheSize" value="20" />
              <property name="cacheProducers" value="true" />
              <property name="cacheConsumers" value="true" />
          </bean>

          As you can see, it is possible to set the size of the session pool and to switch on producer and consumer caching.
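
          To give a feel for what a bounded session cache does, here is a small sketch in plain Java. It is not Spring's implementation: SessionCacheSketch and FakeSession are hypothetical names, and the behaviour is modelled on my understanding of the Spring documentation, namely that sessionCacheSize limits the number of idle sessions kept for reuse rather than the number of sessions in use at the same time.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of a bounded session cache; NOT Spring's implementation.
final class SessionCacheSketch {
    // Placeholder for a JMS session, so the sketch needs no JMS library.
    static final class FakeSession {
        boolean closed;
    }

    private final int cacheSize;                       // plays the role of sessionCacheSize
    private final Deque<FakeSession> idle = new ArrayDeque<>();
    private int created;
    private int closed;

    SessionCacheSketch(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    // Reuse an idle session if one is cached, otherwise create a new one.
    synchronized FakeSession borrow() {
        FakeSession cached = idle.pollFirst();
        if (cached != null) {
            return cached;
        }
        created++;
        return new FakeSession();
    }

    // Keep the session for reuse while the cache has room, otherwise really close it.
    synchronized void release(FakeSession session) {
        if (idle.size() < cacheSize) {
            idle.addFirst(session);
        } else {
            session.closed = true;
            closed++;
        }
    }

    synchronized int createdCount() {
        return created;
    }

    synchronized int closedCount() {
        return closed;
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

          With a cache size of 2, borrowing three sessions and releasing them all leaves two sessions cached and one closed; the next two borrow() calls are then served from the cache without creating anything new.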

          2. DefaultMessageListenerContainer
          DefaultMessageListenerContainer enables getting messages from the destination in parallel, using multiple threads.
          Configuration of DefaultMessageListenerContainer looks like:
          
          <bean id="queueContainerListener"
           class="org.springframework.jms.listener.DefaultMessageListenerContainer">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="destinationName" value="Q_WM_OUT" />
              <property name="messageListener" ref="simpleListener" />
              <property name="cacheLevel" value="3" />
              <property name="concurrentConsumers" value="10" />
              <property name="maxConcurrentConsumers" value="50" />
          </bean> 

          It is possible to define here:
          • The initial and maximum number of concurrent consumers. This tells Spring to always start up the initial number of consumers (concurrentConsumers). When a new message is received and maxConcurrentConsumers has not been reached, a new consumer is created to process the message.
          • The cache level (3: cache connections, sessions and consumers; 2: cache connections and sessions; 1: cache connections only).
          • The message listener (a class implementing the MessageListener interface) and the connection factory.
          It is important to be aware of the following points related to concurrent consumers:
          • Normally it makes no sense to increase the number of concurrent consumers for a JMS topic. It leads to concurrent consumption of the same message, which is not desirable.
          • The concurrentConsumers and maxConcurrentConsumers properties can be modified at runtime, for example via JMX.
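
          The effect of several concurrent consumers on a queue can be sketched without any JMS library. The example below is only an analogy, not the way DefaultMessageListenerContainer is implemented: ConcurrentConsumersSketch is a hypothetical class, a BlockingQueue stands in for the JMS queue, and a fixed number of threads stands in for concurrentConsumers. It shows the queue semantics that make parallel consumption safe: every message is handed to exactly one consumer.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy for concurrent queue consumers; NOT Spring or CXF code.
final class ConcurrentConsumersSketch {
    private static final String STOP = "__stop__"; // marker telling a consumer to finish

    // Puts the given number of messages on an in-memory queue, drains it with the given
    // number of consumer threads, and returns {processed messages, duplicate deliveries}.
    static int[] consume(int messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add("msg-" + i);
        }
        for (int i = 0; i < consumers; i++) {
            queue.add(STOP); // one stop marker per consumer, after all real messages
        }

        Set<String> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger duplicates = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(consumers);

        for (int i = 0; i < consumers; i++) {
            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take(); // each message goes to one thread only
                        if (STOP.equals(message)) {
                            break;
                        }
                        if (!seen.add(message)) {
                            duplicates.incrementAndGet();
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }, "consumer-" + i);
            consumer.start();
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return new int[] {processed.get(), duplicates.get()};
    }
}
```

          Calling ConcurrentConsumersSketch.consume(1000, 10) processes all 1000 messages with no duplicates. A topic behaves differently, because every subscriber receives its own copy of each message, which is why adding consumers there leads to the duplicate processing mentioned above.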
          The details of Spring-based configuration are described very well in Bruce Snyder's blog [REF-5].
          You can see that Spring provides a solution for both scalability aspects mentioned above. But how can we use it in CXF?

          CXF JMS Configuration Feature
          As the CXF JMS implementation is based on the Spring JMS classes, the user can benefit from the Spring JMS functionality described above.
          CXF provides a Feature to configure the details of the JMS transport: JMSConfigFeature. A CXF Feature is something that is able to customize a Server, Client, or Bus, typically adding capabilities. In our case we will add a feature to jaxws:endpoint and jaxws:client to tune the JMS transport. Let's see how to configure a CXF client and service using this feature.

          Server configuration
          
          <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
              <property name="targetConnectionFactory">
                  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                      <property name="brokerURL" value="tcp://localhost:61616" />
                  </bean>
              </property>
              <property name="sessionCacheSize" value="20" />
              <property name="cacheConsumers" value="true" />
          </bean>
          <bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
              p:connectionFactory-ref="cachingConnectionFactory"
              p:cacheLevel="3"
              p:concurrentConsumers="16"
              p:maxConcurrentConsumers="16"
              p:targetDestination="Q_HSC"
              p:wrapInSingleConnectionFactory="false" />
          <jaxws:endpoint id="JMSGreeterService" address="jms://"
              implementor="#JMSGreeterServiceImpl">
              <jaxws:features>
                  <bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
                      <property name="jmsConfig" ref="jmsConfig" />
                  </bean>
              </jaxws:features>
          </jaxws:endpoint>

          The jaxws:endpoint configuration contains the JMSConfigFeature. This feature has a property of type JMSConfiguration.
          JMSConfiguration supports all the settings that we have seen in the Spring DefaultMessageListenerContainer: a caching connection factory with a session pool size, the number of concurrent consumers, and the cache level. All settings of JMSConfiguration are described in [REF-6].
          Using this configuration, the server application can be tuned to achieve optimal performance in the target environment.
           
          Client configuration

          
          <bean id="cachingConnectionFactory"
             class="org.springframework.jms.connection.CachingConnectionFactory">
              <property name="targetConnectionFactory">
                  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                      <property name="brokerURL" value="tcp://localhost:61616" />
                  </bean>
              </property>
              <property name="sessionCacheSize" value="20" />
              <property name="cacheProducers" value="true" />
          </bean>
          <bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration"
              p:connectionFactory-ref="cachingConnectionFactory" p:targetDestination="Q_HSC"
              p:cacheLevel="3" p:concurrentConsumers="16"
              p:maxConcurrentConsumers="16" p:wrapInSingleConnectionFactory="false" />
          <jaxws:client id="JMSGreeterService" address="jms://"
              serviceClass="com.sopera.services.tpoc.eventgenerator.EventGenerator">
              <jaxws:features>
                  <bean class="org.apache.cxf.transport.jms.JMSConfigFeature">
                      <property name="jmsConfig" ref="jmsConfig" />
                  </bean>
              </jaxws:features>
          </jaxws:client>


          The client configuration looks very similar to the server one, with one exception: CachingConnectionFactory activates producer caching instead of consumer caching.

          Conclusion
          CXF gives the user the possibility to tune the standard transports. To achieve high scalability of a CXF client and service communicating over JMS, you can use the CXF JMS Configuration Feature. It is not necessary to write a single line of code: just configure and leverage what already exists.
          Using this feature can have a substantial influence on performance in some environments: in one proof of concept I improved CXF service throughput by a factor of 3.6 (from 500 to 1800 msg/sec) just by using a session pool and a multithreaded JMS consumer!
          Reference performance numbers for SOAP over JMS are presented in [REF-7]; you can easily compare them with your own results and tune accordingly.

          References
          REF-3: CXF JMS Transport http://cxf.apache.org/docs/jms-transport.html
          REF-4: cxf-distribution/examples: jms_queue; jms_pubsub
          REF-5: Bruce Snyder's blog http://bsnyderblog.blogspot.de/2010/05/tuning-jms-message-consumption-in.html
          REF-6: CXF JMS Configuration Feature http://cxf.apache.org/docs/using-the-jmsconfigfeature.html
          REF-7: Christian Schneider's blog: CXF performance http://www.liquid-reality.de/pages/viewpage.action?pageId=5865562