Mike Cantrell

2>&1

Spring Batch and Mule

Posted by mcantrell on March 29, 2009

Unfortunately, I still have to deal with flat files quite a bit when dealing with business partners. Flat files aren’t all bad.. they’re easily understood and somewhat effecient to transmit payloads between disperate systems (when compared to XML at least). In the past, I’ve rolled out my own flat file mapping systems but I’ve recently caught a glimpse of Spring Batch and I must say that I like what I see.

For me, Mule and Spring Batch seems like a natural fit for processing data that must be recieved or delivered as a flat file. I created a  couple of Mule transfomers based upon the framework:

FlatFileToBeans converts your flat file to a list of java beans
BeansToFlatFile converts a list of java beans to a flat file

FlatFileToBeans.groovy

public class FlatFileToBeans extends AbstractTransformer {

  private static final def log = LogFactory.getLog(FlatFileToBeans)

  ObjectPool pool

  FlatFileToBeans() {
    super.registerSourceType(byte[].class)
    super.registerSourceType(InputStream.class)
  }

  protected Object doTransform(Object src, String encoding) {
    FlatFileItemReader reader = null
    try {
      reader = (FlatFileItemReader) pool.borrowObject()
      reader.encoding = encoding
      reader.resource = convertToresource(src)
      return transform(reader)
    }
    finally {
      pool.returnObject(reader)
    }
  }

  protected Resource convertToresource(Object src) {
    return src instanceof InputStream ? new InputStreamResource(src) : new  ByteArrayResource(src)
  }

  protected def transform(FlatFileItemReader reader) {
    reader.open(new ExecutionContext())
    def beans = []
    def bean
    while (bean = reader.read()) {
      beans += bean
    }
    return beans
  }

}

BeansToFlatFile.groovy

public class BeansToFlatFile extends AbstractTransformer {
  private static final def log = LogFactory.getLog(BeansToFlatFile)

  ObjectPool pool
  String headers
  String lineDelimeter = "\r\n"

  BeansToFlatFile() {
    super.registerSourceType(List)
  }

  byte[] transform(List domainObjects) {
    LineAggregator aggregator = null
    try {
      aggregator = (LineAggregator)pool.borrowObject()
      StringBuffer sb = new StringBuffer()
      if (headers) {
        sb.append("${headers}${lineDelimeter}")
      }
      domainObjects.each {bean ->
        String line = aggregator.aggregate(bean)
        sb.append("${line}${lineDelimeter}")
      }
      String response = sb.toString()
      if (log.isTraceEnabled()) {
        log.trace("Flat file output data:\n ${response}")
      }
      return response.bytes
    } finally {
      pool.returnObject(aggregator)
    }
  }

  protected Object doTransform(Object src, String encoding) {
    return transform(src)
  }
}

The real magic here is in the wiring (of course). You might also notice that I’m using a commons object pool. Mule transformers are not thread safe (learned this the hard way). Each transformer is created only once per endpoint and is shared for each thread. Orginally, I synchronized my transformer but that’s not really performant. Instead, I created a quick object factory that make suse of the spring bean factory to create objects for the pool:

SpringBeanPoolableObjectFactory.groovy

public class SpringBeanPoolableObjectFactory extends BasePoolableObjectFactory implements BeanFactoryAware {
  String beanName
  BeanFactory beanFactory

  public Object makeObject() {
    if (beanFactory.isSingleton(beanName)) {
      throw new InvalidObjectPoolException("${beanName} is configured as a singleton and cannot be pooled")
    }
    return beanFactory.getBean(beanName)
  }
}

Now, you can wire it all together. Here’s an example of both:

springbatch-config.xml

	<bean id="fooheaders" class="java.lang.String">
		<constructor-arg  value="name, description, comments"/>
	</bean>
	<bean class="com.acme.model.Foo" id="foo" scope="prototype"/>
	<bean id="fooFileItemReader" scope="prototype" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="lineMapper">
			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
				<property name="lineTokenizer">
					<bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
						<property name="names" value="name, description, comments"/>
						<property name="columns" value="1-4,5-12,13-22"/>
					</bean>
				</property>
				<property name="fieldSetMapper">
					<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
						<property name="prototypeBeanName" value="foo"/>
					</bean>
				</property>
			</bean>
		</property>
	</bean>
	<bean id="baseBeanPool" abstract="true" class="org.apache.commons.pool.impl.GenericObjectPool">
		<property name="whenExhaustedAction">
			<util:constant static-field="org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_BLOCK"/>
		</property>
		<property name="maxWait" value="30000"/>
		<property name="maxActive" value="20"/>
	</bean>
	<bean id="fooFileItemReaderPool" class="org.apache.commons.pool.impl.GenericObjectPool"
		  parent="baseBeanPool">
		<property name="factory">
			<bean class="com.sonicdrivein.esb.share.pool.SpringBeanPoolableObjectFactory">
				<property name="beanName" value="fooFileItemReader"/>
			</bean>
		</property>
	</bean>
	<bean id="fooLineAggregator" scope="prototype"
		  class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
		<property name="delimiter" value=","/>
		<property name="fieldExtractor">
			<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
				<property name="names" ref="fooheaders"/>
			</bean>
		</property>
	</bean>
	<bean id="fooLineAggregatorPool" class="org.apache.commons.pool.impl.GenericObjectPool"
		  parent="baseBeanPool">
		<property name="factory">
			<bean class="com.sonicdrivein.esb.share.pool.SpringBeanPoolableObjectFactory">
				<property name="beanName" value="fooLineAggregator"/>
			</bean>
		</property>
	</bean>

mule-config.xml

    <spring:beans>
        <spring:import resource="classpath:springbatch-config.xml"/>
    </spring:beans>
	<custom-transformer name="flatFileToFoo" class="com.sonicdrivein.esb.share.transformer.FlatFileToBeans">
		<spring:property name="pool" ref="fooFileItemReaderPool"/>
	</custom-transformer>
	<custom-transformer name="fooToFlatFile" class="com.sonicdrivein.esb.share.transformer.BeansToFlatFile">
		<spring:property name="pool" ref="fooLineAggregatorPool"/>
		<spring:property name="headers" ref="fooheaders"/>
	</custom-transformer>

I see myself getting a lot of reuse out of these transformers and I hope you do too :)

Posted in Groovy, Mule, Spring | 2 Comments »

Better Living Through Cooking

Posted by mcantrell on November 8, 2008

Nothing resets my brain after a long, hard day better than coming home to cook dinner. It’s like a complete mind shift. Anything that might have been bothering me from the work day is completely faded by the end of the meal.

It has plenty of other benefits as well. You’ll lose weight and save money. After a while, you’ll wonder why you ever paid for someone to cook your food for you!

If you have a stressful job (other than the food industry), I highly recommend cooking as a relaxation technique.

Pro-Tip: buy a 10-12″ iron skillet

Posted in Cooking | Leave a Comment »

JavaMail and Exchange Woes

Posted by mcantrell on September 11, 2008

Recently, I had the displeasure of uncovering a rather nasty compatibility problem between JavaMail (Sun’s reference implementation) and Microsoft Exchange. Surprisingly, both JavaMail and Exchange appear to be somewhat at fault.

If you take a look at the isConnected method in SMTPTransport.java, you’ll see a rather sloppy piece of code:

// NOOP should return 250 on success, however, SIMS 3.2 returns
// 200, so we work around it.
//
// Hotmail doesn't implement the NOOP command at all so assume
// any kind of response means we're still connected.
// That is, any response except 421, which means the server
// is shutting down the connection.
//
if (resp >= 0 && resp != 421) {
	return true;
} else {
	try {
		closeConnection();
	} catch (MessagingException mex) { }	// ignore it
	return false;
}

I can understand trying to work with the lowest common denominator but this is just plain bad. There are a lot of really bad SMTP response codes that I really wouldn’t consider a successful response.

That brings us to Exchange’s half of this problem. Exchange times out clients after 10 minutes by default. The problem is that they return a generic error code (451) when there is a very specific code for things like timeouts (421). See section 3.9, Terminating Sessions and Connections of RFC 2821.

This isn’t normally a problem because most applications close the connection to the server after sending an email. It does become a problem when you want to pool the connections to the SMTP server for performance optimizations. This is how I stumbled upon the problem. Mule pools dispatchers for its connectors. For the Smtp connector, the dispatcher uses the SMTPTransport object’s isConnected() method to determine if it should reconnect before sending the message:

/*
 * Double check that the transport is still connected as some SMTP servers may
 * disconnect idle connections.
 */
if (!transport.isConnected())
{
	EndpointURI uri = endpoint.getEndpointURI();
	if (logger.isInfoEnabled())
	{
		logger.info("Connection closed by remote server. Reconnecting.");
	}
	transport.connect(uri.getHost(), uri.getPort(), uri.getUser(), uri.getPassword());
}

This will ultimately result in a socket write error or the following error:

com.sun.mail.smtp.SMTPSendFailedException: [EOF]
        at com.sun.mail.smtp.SMTPTransport.issueSendCommand(SMTPTransport.java:1388)
        at com.sun.mail.smtp.SMTPTransport.mailFrom(SMTPTransport.java:959)
        at com.sun.mail.smtp.SMTPTransport.sendMessage(SMTPTransport.java:583)
        at org.mule.transport.email.SmtpMessageDispatcher.sendMailMessage(SmtpMessageDispatcher.java:124)
        at org.mule.transport.email.SmtpMessageDispatcher.doDispatch(SmtpMessageDispatcher.java:95)
        at org.mule.transport.AbstractMessageDispatcher$Worker.run(AbstractMessageDispatcher.java:262)
        at org.mule.work.WorkerContext.run(WorkerContext.java:310)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
        at java.lang.Thread.run(Thread.java:613)

Mule has a configurable time to live for dispatchers in the connector’s pool but I could not get it to work for some reason:

    <smtp:connector name="FooSmtpConnector">
        <dispatcher-threading-profile threadTTL="30000"/>
        <default-connector-exception-strategy>
            <jms:outbound-endpoint queue="foo/error"/>
        </default-connector-exception-strategy>
    </smtp:connector>

After much frustration, I finally figured out that it’s relatively simple to explicitly configure what the JavaMail Session object uses for it’s transport. You just need to place a javamail.providers file in your META-INF directory:

protocol=smtp; type=transport; class=com.acme.mail.ExchangeSmtpTransport; vendor=Acme Inc;

Knowing this, I simply extended the existing SMTPTransport and overrode the isConnected method:

package com.acme.mail

import javax.mail.Transport
import javax.mail.Message
import javax.mail.Address
import javax.mail.Session
import javax.mail.URLName
import com.sun.mail.smtp.SMTPTransport
import org.apache.commons.logging.LogFactory

class ExchangeSmtpTransport extends SMTPTransport {

	static private final def log = LogFactory.getLog(ExchangeSmtpTransport)

	public ExchangeSmtpTransport(Session session, URLName urlname) {
		super(session, urlname);
	}

	public boolean isConnected() {
		def connected = super.isConnected()
		if (connected) {
			// still connected: 250 2.0.0 OK
			// client timeout: 451 Timeout waiting for client input
			if (!lastServerResponse?.startsWith("250")) {
				connected = false
				close()
			}
			if (log.isDebugEnabled()) {
				log.debug("Last Server Response: " + lastServerResponse)
			}
		}
		return connected
	}

	public void close() {
		try {
			super.close()
		}
		catch (e) {
			log.error("Error closing connection", e)
		}
	}

}

Posted in Java, Mule | 4 Comments »

Groovy & Dynamic Typing vs. Weak Typing

Posted by mcantrell on August 30, 2008

Silly test incomming:

class DataTypeConversionTests extends GroovyTestCase {
    void testInteger() {
        Integer i = 49
        String s = '1'
        assert s == i
    }
}

Stupid test huh? No way it would pass.. Think again. Try it yourself. Then go lookup the ASCII value of 1.

You see.. it passes because Groovy has no concept of a character literal. In Java, single quotes are normally reserved to represent characters. Groovy allows them to represent a String. See the problem? Groovy does it’s best to compensate by considering all single character Strings as char date types when trying to convert.

Why would you ever do something so silly? It’s not as silly as you might think. Groovy and Grails provide a lot of abstractions. It’s not always obvious what types of data you are dealing with. Dealing with JSON or XML builders for instance. Additionally, Grails hides data binding behind a transparent beanwrapper in Controllers so you don’t see your HTTP parameters getting auto-converted inf the binding process. You can see how you could get lazy here.

If you’ve read Programming Groovy by Venkat Subramaniam (great book and even better speaker), you might take note with chapter 4.3. Venkat asserts that Groovy is both a dynamic and strongly typed language (aka dynamic != weakly typed). You should see an error at runtime if you do some funny business. That is true 99% of the time but it’s clearly not applicable for the scenario mentioned here.

I’m not sure any of this is cool but I’m not sure what else you would do if you allow Strings with single quotes and you also want a character data type.

Posted in Groovy | Leave a Comment »

Extended JMS Configuration with Grails Plugin

Posted by mcantrell on August 25, 2008

Glenn Smith put together a really good blog post the other day regarding the JMS Plugin for Grails. When I started playing around with it, one thing became particularly troublesome to me. It requires one class per destination queue/topic.

Example from the grails plugin page:

class SampleQueueService {
    static expose = ['jms']
    def onMessage = { println "GOT MESSAGE: $it" }
}

I’d much rather have a service with multiple destinations:

class SampleQueueService {
	static jmsMethods = [
			"lookup": [
				concurrentConsumers: 1,
				destinationName: "foo/lookup"
			],
			"save": [
				concurrentConsumers: 1,
				destinationName: "foo/save"
			]
	]

	def lookup(message)  {
		println ">>>> Lookup: ${message}"
	}
	def save(message)  {
		println ">>>> Save: ${message}"
	}
}

If you take a look at what the JMS plugin is doing, it’s really just wrapper around Spring’s MessageListenerAdapter with some dynamic context generation. The meat and potatoes of the plugin is ClosureMessageListenerAdapter which extends MessageListenerAdapter to allow calling closures. The rest of it is just standard Spring config.

I took the concept that the JMS plugin uses and implemented it in my own resources.groovy:

beans = {
	connectionFactory(ActiveMQConnectionFactory) {
		brokerURL = CH.config.jms.brokerUrl
	}
	jmsTemplate(JmsTemplate) {
		connectionFactory = ref(connectionFactory)
	}
	def jmsMethods = GrailsClassUtils.getStaticPropertyValue(SampleQueueService, 'jmsMethods')
	jmsMethods.each {method, jmsOptions ->
		log.info "Registering JMS Listener for SampleQueueService.${method} with config: ${jmsOptions}"
		"SampleQueueService${method}JmsListener"(ClosureMessageListenerAdapter, ref("sampleQueueService")) {
			defaultListenerMethod = method
		}
		"SampleQueueService${method}JmsContainer"(DefaultMessageListenerContainer) {
			autoStartup = true
			concurrentConsumers = jmsOptions.concurrentConsumers ?: 1
			destinationName = jmsOptions.destinationName ?: "foo/${method}"
			connectionFactory = ref(connectionFactory)
			messageListener = ref("SampleQueueService${method}JmsListener")
		}
	}
}

It obviously needs to be a bit smarter. More jms configuration options (durable, topics, etc.). Possibly find all services instead of a single class, etc..

Posted in Grails, JMS | Leave a Comment »

Pencil – Firefox UI Sketching

Posted by mcantrell on August 23, 2008

I saw a really cool add-on for Firefox today called Pencil. It’s highly touted by Mozilla labs folks.

Pencil Homepage

Essentially, it’s a light weight UI prototyping tool. It comes with quite a few usable stencils but it also supports adding custom stencils. More details at the project’s homepage.

Posted in User Interface | Leave a Comment »

Grails Criteria and Data Type Conversions

Posted by mcantrell on August 23, 2008

I find it kind of curious that Grails provides automatic type conversion for the domain classes (for setters and in the constructor) but not in the HibernateCriteriaBuilder. At worst, this makes it difficult to create dynamic, reusable criteria queries that use the values in the parameters. At best, it leads to ugly code when you know the domain class and it’s property class.

Luckily, HibernateCriteriaBuilder exposes the target’s class for us so it makes it easier to do the type conversion ourselves. Maybe someone knows a “Groovier” way to approach this but here is what I’ve come up with:

Controller code:

    def list = {
        def pagingConfig = buildPagingConfig(params)
        def results = Product.createCriteria().list(pagingConfig, buildFilterCriteria)
    }

Some reusable utility code:

    def converter = new ClassPropertyTypeConverter()

    def buildFilterCriteria= {
        def domainClass = delegate.targetClass
        10.times {index ->
            def field = params["filter[${index}].name"]
            def value = params["filter[${index}].value"]
            if (field && value) {
                like(field, "%${converter.convert(domainClass, field, value)}%").ignoreCase()
            }
        }
    }

The delegate variable is a special object available to all closures which contains the object which is calling the closure. In this case, it’s HibernateCriteriaBuilder.

A simple converter:

import org.springframework.beans.SimpleTypeConverter
import org.codehaus.groovy.grails.commons.GrailsClassUtils

class ClassPropertyTypeConverter {
    def converter =  new SimpleTypeConverter()

    def convert(clazz, property, value) {
        def targetClass = GrailsClassUtils.getPropertyType(clazz, property)
        return converter.convertIfNecessary(value, targetClass)
    }
}

It would be really nice for the G2One guys to add these types of conversions automatically. It should be pretty easy.

Posted in Grails, Groovy | 1 Comment »

ExtJS Grids and Grails

Posted by mcantrell on August 22, 2008

I’ve been playing around with ExtJS and Grails lately. I have to say that I’m quite impressed with the quality of ExtJS (I’ve used YUI a lot in the past). There are some really cool community extensions as well. The GridsFilter extension puts filter controls on the grids:

Grid Control

This all ties in to backend (JSON) pagination, filtering, sorting very easily with Grails. I’d hate to do this with POJOs. Here’s an example closure from one of the controllers. It still needs some detection for special types of filters but you get the idea.

    def list = {
        def pagingConfig = [
                max: params.limit ?: 25,
                offset: params.start ?: 0,
                sort: params.sort ?: 'id',
                order: params.dir ?: 'asc'
        ]
        def results = Product.createCriteria().list(pagingConfig) {
            5.times {index ->
                def field = params["filter[${index}][field]"]
                def type = params["filter[${index}][data][type]"]
                def value = params["filter[${index}][data][value]"]
                if (field && value) {
                    like(field, "%${value}% ).ignoreCase()
                }
            }
        }
        def json = [
                metaData: [
                        totalProperty: 'totalCount',
                        root: 'products',
                        id: 'id',
                        fields: [
                                [name: "id", type: "int"],
                                [name: "name"],
                                [name: "description"],
                                [name: "price", type: "float"],
                                [name: "weight", type: "float"]
                        ],
                        sortInfo: [
                                field: pagingConfig.sort,
                                direction: pagingConfig.order
                        ]
                ],
                totalCount: results.totalCount,
                products: results.list
        ]
        render json as JSON
    }

The javascript looks something like:

Avail.ProductGrid = Ext.extend(Ext.grid.GridPanel, {
    constructor: function(url) {
        this.pageSize = 25;
        var filters = new Ext.ux.grid.GridFilters({filters:[
            {type: 'numeric',  dataIndex: 'id'},
            {type: 'string',  dataIndex: 'name'},
            {type: 'string', dataIndex: 'description'},
            {type: 'numeric',  dataIndex: 'price'},
            {type: 'numeric', dataIndex: 'weight'}
        ]});

        var store = new Ext.data.JsonStore({
            url: url,
            remoteSort: true,
            fields: [] // initialized from json metadata
        });
        var pagingBar = new Ext.PagingToolbar({
            pageSize: this.pageSize,
            store: store,
            displayInfo: true,
            displayMsg: 'Displaying products {0} - {1} of {2}',
            emptyMsg: "No products to display",
            plugins: filters
        });
        var config = {
            store: store,
            columns: [
                {id:'id', header: "ID", width: 40, sortable: true, dataIndex: 'id'},
                {id:'name', header: "Name", width: 160, sortable: true, dataIndex: 'name'},
                {id: 'description', header: "Description", width: 75, sortable: true, dataIndex: 'description'},
                {id: 'price', header: "Price", width: 75, sortable: true, renderer: 'usMoney', dataIndex: 'price'},
                {id: 'weight', header: "Weight", width: 75, sortable: true, dataIndex: 'weight'}
            ],
            stripeRows: true,
            autoExpandColumn: 'description',
            height:350,
            width:600,
            title:'Products',
            bbar:pagingBar,
            plugins: filters
        }
        Avail.ProductGrid.superclass.constructor.apply(this, [config])
    },

    onRender:function() {
        this.store.load({params:{start:0, limit:this.pageSize}});
        Avail.ProductGrid.superclass.onRender.apply(this, arguments);
    }
});
Ext.reg('ProductGrid', Avail.ProductGrid);

Posted in Grails, Groovy, Javascript | 2 Comments »