Filtering logs using logstash

Our application produces a lot of logs. Sometimes we want to extract some specific information from it, for example the timestamps and duration of the REST calls in it or have a specific list of all the errors (possible excluding some common ones).

You can use logstash to do this kind of filtering. In this case, extracting information from one or more log files into specific files.

Let’s start by defining the input files.

input {
 
  file {
    type => "logfile"
    path => [ "/home/joachim/temp/logs/*.log" ]
    sincedb_path => "/dev/null"
  }
 
}

We just specify where the logs we want to process can be found. Note that only changes which occur after logstash is started will be processed.

Our logged items can contain multiple lines so lines need to be combined. The data is also structured, so the fields can be extracted. This uses the format I already explained earlier.

filter {
 
  multiline {
    pattern => "^[\[~]"
    negate => true
    what => "previous"
    # enable_flush is not recommended but assures that the last log statement (multi-line) of the file is also processed
    enable_flush => true 
  }
 
  grok {
    match => [ 
      "message", "~%{NOTSPACE:application} %{TIMESTAMP_ISO8601:timestamp} \[%{DATA:server}\-%{DATA:thread}\] %{LOGLEVEL:severity}\s+%{JAVAFILE:category} \- %{GREEDYDATA:shortmessage}"
    ]
  }

Now extract some additional fields which are not available in each message.

  grok {
    match => [ "message", "Duration: %{NUMBER:duration:float}s" ]
    tag_on_failure => [] 
  }
 
  grok {
    match => [ "message", "ldapid: %{WORD:ldapid}" ]
    tag_on_failure => [] 
  }
 
  grok {
    match => [ "message", "role: %{WORD:role}" ]
    tag_on_failure => [] # deze regel matcht niet altijd
  }
 
  grok {
    match => [ "message", "organisation: %{WORD:organisation}" ]
    tag_on_failure => [] # deze regel matcht niet altijd
  }
 
  grok {
    #match => [ "message", "Service: %{WORD:http_command} %{URI:endpoint}" ]
    match => [ "message", "Service: %{WORD:http_command} %{NOTSPACE:endpoint}" ]
    tag_on_failure => [] 
  }

Specify the date format.

  date {
    # 2013-09-23T11:27:14.177+0200
    match => [
        "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
    ]
  }

Now add some tags for the type of output. These are very practical for determining the output files. This is the last part of the filter section in the configuration file.

  # add tag to type of output
  if [severity] == "ERROR" {
    mutate {
      add_tag => "error"
    }
  }
  if "b.v.a.d.s.util.ProfilingSchedule" in [message] {
    mutate {
      add_tag => "profile"
    }
  }
  if [endpoint] =~ /http/ {
    mutate {
      add_tag => "endpoint"
    }
  }
 
}

Now put the details we want in different files. We have one file which only contains the errors, one file which contains the profiling info and one file which contains a CSV file (space separated) of REST requests.

output {
 
  if "error" in [tags] and ( "huidige_gebruiker" not in [endpoint] ) {
    file 
    {
      path => "dc-errors.txt"
      max_size => 20000000
      message_format => "%{message}"
    }
  }
 
  if "profile" in [tags] {
    file 
    {
      path => "dc-profile.txt"
      max_size => 20000000
      message_format => "%{message}"
    }
  }
 
  if "endpoint" in [tags] {
    file 
    {
      path => "dc-requests.txt"
      max_size => 20000000
      message_format => "%{timestamp} %{http_command} %{endpoint} %{duration}"
    }
  }
 
  #remove this line and uncomment lines below for debugging
  #stdout {
  #  message_format => "%{timestamp}"
  #}
 
}

Logging JVM and OS metrics in a spring application

We have a problem. We have an important application which is running in production, but we do not have access to details about the production system. When we get get reports of slowness, there is no way for us to see what was happening on the server at that time. Was memory low, CPU high, is it a garbage collection problem?

We basically wanted to get some insights added in our logs. Fortunately using metrics and metrics-spring this is easy using a configuration file like the following:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:metrics="http://www.ryantenney.com/schema/metrics"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.ryantenney.com/schema/metrics
           http://www.ryantenney.com/schema/metrics/metrics-3.0.xsd">
 
    <!-- Registry should be defined in only one context XML file -->
    <metrics:metric-registry id="metrics" />
 
    <!-- (Optional) Registry should be defined in only one context XML file -->
    <metrics:reporter type="slf4j" metric-registry="metrics" period="1m" />
 
    <!-- The registry defines the data to capture. Metrics in this example require the metrics-jvm jar. -->
    <metrics:register metric-registry="metrics">
        <bean metrics:name="jvm.gc" class="com.codahale.metrics.jvm.GarbageCollectorMetricSet" />
        <bean metrics:name="jvm.memory" class="com.codahale.metrics.jvm.MemoryUsageGaugeSet" />
        <bean metrics:name="jvm.thread-states" class="com.codahale.metrics.jvm.ThreadStatesGaugeSet" />
        <bean metrics:name="os" class="be.vlaanderen.awv.dc.util.metrics.OperatingSystemGaugeSet" />
    </metrics:register>
 
</beans>

metrics-spring by default contains gauges to register the garbage collector behaviour, the memory usage and thread states (including info deadlocks). Is also contains a gauge for the number of file descriptors, but wanted to track more information including

  • committed virtual memory size
  • total swap space (size)
  • free swap space (size)
  • process CPU time
  • total physical memory (size)
  • free physical memory (size)
  • max file descriptor count
  • free file descriptor count
  • system CPU load (average for last minute)
  • process CPU load (average for last minute)

This can be done using the following class:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
 
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
 
/**
 * A set of gauges for operating system settings.
 */
public class OperatingSystemGaugeSet implements MetricSet {
 
    private final OperatingSystemMXBean mxBean;
    private final Optional<Method> committedVirtualMemorySize;
    private final Optional<Method> totalSwapSpaceSize;
    private final Optional<Method> freeSwapSpaceSize;
    private final Optional<Method> processCpuTime;
    private final Optional<Method> freePhysicalMemorySize;
    private final Optional<Method> totalPhysicalMemorySize;
    private final Optional<Method> openFileDescriptorCount;
    private final Optional<Method> maxFileDescriptorCount;
    private final Optional<Method> systemCpuLoad;
    private final Optional<Method> processCpuLoad;
 
    /**
     * Creates new gauges using the platform OS bean.
     */
    public OperatingSystemGaugeSet() {
        this(ManagementFactory.getOperatingSystemMXBean());
    }
 
    /**
     * Creates a new gauges using the given OS bean.
     *
     * @param mxBean an {@link OperatingSystemMXBean}
     */
    public OperatingSystemGaugeSet(OperatingSystemMXBean mxBean) {
        this.mxBean = mxBean;
 
        committedVirtualMemorySize = getMethod("getCommittedVirtualMemorySize");
        totalSwapSpaceSize = getMethod("getTotalSwapSpaceSize");
        freeSwapSpaceSize = getMethod("getFreeSwapSpaceSize");
        processCpuTime = getMethod("getProcessCpuTime");
        freePhysicalMemorySize = getMethod("getFreePhysicalMemorySize");
        totalPhysicalMemorySize = getMethod("getTotalPhysicalMemorySize");
        openFileDescriptorCount = getMethod("getOpenFileDescriptorCount");
        maxFileDescriptorCount = getMethod("getMaxFileDescriptorCount");
        systemCpuLoad = getMethod("getSystemCpuLoad");
        processCpuLoad = getMethod("getProcessCpuLoad");
    }
 
 
    @Override
    public Map<String, Metric> getMetrics() {
        final Map<String, Metric> gauges = new HashMap<>();
 
        gauges.put("committedVirtualMemorySize", (Gauge<Long>) () -> invokeLong(committedVirtualMemorySize));
        gauges.put("totalSwapSpaceSize", (Gauge<Long>) () -> invokeLong(totalSwapSpaceSize));
        gauges.put("freeSwapSpaceSize", (Gauge<Long>) () -> invokeLong(freeSwapSpaceSize));
        gauges.put("processCpuTime", (Gauge<Long>) () -> invokeLong(processCpuTime));
        gauges.put("freePhysicalMemorySize", (Gauge<Long>) () -> invokeLong(freePhysicalMemorySize));
        gauges.put("totalPhysicalMemorySize", (Gauge<Long>) () -> invokeLong(totalPhysicalMemorySize));
        gauges.put("fd.usage", (Gauge<Double>) () -> invokeRatio(openFileDescriptorCount, maxFileDescriptorCount));
        gauges.put("systemCpuLoad", (Gauge<Double>) () -> invokeDouble(systemCpuLoad));
        gauges.put("processCpuLoad", (Gauge<Double>) () -> invokeDouble(processCpuLoad));
 
        return gauges;
    }
 
    private Optional<Method> getMethod(String name) {
        try {
            final Method method = mxBean.getClass().getDeclaredMethod(name);
            method.setAccessible(true);
            return Optional.of(method);
        } catch (NoSuchMethodException e) {
            return Optional.empty();
        }
    }
 
    private long invokeLong(Optional<Method> method) {
        if (method.isPresent()) {
            try {
                return (long) method.get().invoke(mxBean);
            } catch (IllegalAccessException | InvocationTargetException ite) {
                return 0L;
            }
        }
        return 0L;
    }
 
    private double invokeDouble(Optional<Method> method) {
        if (method.isPresent()) {
            try {
                return (double) method.get().invoke(mxBean);
            } catch (IllegalAccessException | InvocationTargetException ite) {
                return 0.0;
            }
        }
        return 0.0;
    }
 
    private double invokeRatio(Optional<Method> numeratorMethod, Optional<Method> denominatorMethod) {
        if (numeratorMethod.isPresent() && denominatorMethod.isPresent()) {
            try {
                long numerator = (long) numeratorMethod.get().invoke(mxBean);
                long denominator = (long) denominatorMethod.get().invoke(mxBean);
                if (0 ==  denominator) {
                    return Double.NaN;
                }
                return 1.0 * numerator / denominator;
            } catch (IllegalAccessException | InvocationTargetException ite) {
                return Double.NaN;
            }
        }
        return Double.NaN;
    }
 
}

Done.

With this configuration, we get some info in the logs every minute, looking something like this:

~dc-local 2015-02-19T21:48:08.496+0100 [0.0.0.0-metrics-logger-reporter-thread-1] INFO  metrics - type=GAUGE, name=jvm.fd.usage, value=0.12158203125
~dc-local 2015-02-19T21:48:08.496+0100 [0.0.0.0-metrics-logger-reporter-thread-1] INFO  metrics - type=GAUGE, name=jvm.gc.PS-MarkSweep.count, value=6
~dc-local 2015-02-19T21:48:08.496+0100 [0.0.0.0-metrics-logger-reporter-thread-1] INFO  metrics - type=GAUGE, name=jvm.gc.PS-MarkSweep.time, value=1388
...

This is overly verbode in my opinion, so I made a custom logger which is logs everything on one line. You could say this is less readable, but I would use logstash anyway to filter the logs into something more practical.

In the XML file at the top, I want to use a reporter named “compact-slf4j”. Lets register a new reporter using the SPI, in a file named META-INF/services/com.ryantenney.metrics.spring.reporter.ReporterElementParser put

be.vlaanderen.awv.dc.util.metrics.CompactSlf4jReporterElementParser

We need to provide an element parser which parses the configuration:

import com.ryantenney.metrics.spring.reporter.AbstractReporterElementParser;
 
/**
 * Reporter for metrics-spring which logs more compact, all in one line instead of one line for each metric.
 */
public class CompactSlf4jReporterElementParser extends AbstractReporterElementParser {
 
    private static final String FILTER_REF = "filter-ref";
    private static final String FILTER_PATTERN = "filter";
 
    @Override
    public String getType() {
        return "compact-slf4j";
    }
 
    @Override
    protected Class<?> getBeanClass() {
        return CompactSlf4jReporterFactoryBean.class;
    }
 
    @Override
    protected void validate(ValidationContext c) {
        c.require(CompactSlf4jReporterFactoryBean.PERIOD, DURATION_STRING_REGEX, "Period is required and must be in the form '\\d+(ns|us|ms|s|m|h|d)'");
        c.optional(CompactSlf4jReporterFactoryBean.MARKER);
        c.optional(CompactSlf4jReporterFactoryBean.LOGGER);
        c.optional(CompactSlf4jReporterFactoryBean.RATE_UNIT, TIMEUNIT_STRING_REGEX, "Rate unit must be one of the enum constants from java.util.concurrent.TimeUnit");
        c.optional(CompactSlf4jReporterFactoryBean.DURATION_UNIT, TIMEUNIT_STRING_REGEX, "Duration unit must be one of the enum constants from java.util.concurrent.TimeUnit");
        c.optional(FILTER_PATTERN);
        c.optional(FILTER_REF);
        if (c.has(FILTER_PATTERN) && c.has(FILTER_REF)) {
            c.reject(FILTER_REF, "Reporter element must not specify both the 'filter' and 'filter-ref' attributes");
        }
        c.rejectUnmatchedProperties();
    }
 
}

This uses a factory bean which is defined as:

import com.ryantenney.metrics.spring.reporter.AbstractScheduledReporterFactoryBean;
import org.slf4j.LoggerFactory;
import org.slf4j.MarkerFactory;
 
import java.util.concurrent.TimeUnit;
 
/**
 * Slf4JReporterFactoryBean.
 */
public class CompactSlf4jReporterFactoryBean extends AbstractScheduledReporterFactoryBean<CompactSlf4jReporter> {
 
    /** Period attribute. */
    public static final String PERIOD = "period";
    /** Duration unit. */
    public static final String DURATION_UNIT = "duration-unit";
    /** Rate unit. */
    public static final String RATE_UNIT = "rate-unit";
    /** Marker. */
    public static final String MARKER = "marker";
    /** Logger. */
    public static final String LOGGER = "logger";
 
    @Override
    public Class<CompactSlf4jReporter> getObjectType() {
        return CompactSlf4jReporter.class;
    }
 
    @Override
    protected CompactSlf4jReporter createInstance() {
        final CompactSlf4jReporter.Builder reporter = CompactSlf4jReporter.forRegistry(getMetricRegistry());
        if (hasProperty(DURATION_UNIT)) {
            reporter.convertDurationsTo(getProperty(DURATION_UNIT, TimeUnit.class));
        }
        if (hasProperty(RATE_UNIT)) {
            reporter.convertRatesTo(getProperty(RATE_UNIT, TimeUnit.class));
        }
        reporter.filter(getMetricFilter());
        if (hasProperty(MARKER)) {
            reporter.markWith(MarkerFactory.getMarker(getProperty(MARKER)));
        }
        if (hasProperty(LOGGER)) {
            reporter.outputTo(LoggerFactory.getLogger(getProperty(LOGGER)));
        }
        return reporter.build();
    }
 
    @Override
    protected long getPeriod() {
        return convertDurationString(getProperty(PERIOD));
    }
 
}

The actual work in done in the reporter itself:

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
 
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
 
/**
 * A reporter class for logging metrics values to a SLF4J {@link Logger} periodically, similar to
 * {@link com.codahale.metrics.ConsoleReporter} or {@link com.codahale.metrics.CsvReporter}, but using the SLF4J framework instead. It also
 * supports specifying a {@link Marker} instance that can be used by custom appenders and filters
 * for the bound logging toolkit to further process metrics reports.
 */
public final class CompactSlf4jReporter extends ScheduledReporter {
 
    private final Logger logger;
    private final Marker marker;
 
    /**
     * Returns a new {@link Builder} for {@link CompactSlf4jReporter}.
     *
     * @param registry the registry to report
     * @return a {@link Builder} instance for a {@link CompactSlf4jReporter}
     */
    public static Builder forRegistry(MetricRegistry registry) {
        return new Builder(registry);
    }
 
    private CompactSlf4jReporter(MetricRegistry registry,
            Logger logger,
            Marker marker,
            TimeUnit rateUnit,
            TimeUnit durationUnit,
            MetricFilter filter) {
        super(registry, "logger-reporter", filter, rateUnit, durationUnit);
        this.logger = logger;
        this.marker = marker;
    }
 
    @Override
    public void report(SortedMap<String, Gauge> gauges,
            SortedMap<String, Counter> counters,
            SortedMap<String, Histogram> histograms,
            SortedMap<String, Meter> meters,
            SortedMap<String, Timer> timers) {
        StringBuilder data = new StringBuilder();
        for (Entry<String, Gauge> entry : gauges.entrySet()) {
            addGauge(data, entry.getKey(), entry.getValue());
        }
 
        for (Entry<String, Counter> entry : counters.entrySet()) {
            addCounter(data, entry.getKey(), entry.getValue());
        }
 
        for (Entry<String, Histogram> entry : histograms.entrySet()) {
            addHistogram(data, entry.getKey(), entry.getValue());
        }
 
        for (Entry<String, Meter> entry : meters.entrySet()) {
            addMeter(data, entry.getKey(), entry.getValue());
        }
 
        for (Entry<String, Timer> entry : timers.entrySet()) {
            addTimer(data, entry.getKey(), entry.getValue());
        }
        logger.info(marker, data.toString());
    }
 
    private void addTimer(StringBuilder data, String name, Timer timer) {
        final Snapshot snapshot = timer.getSnapshot();
        data.append(" type=timer.").append(name).append(":");
        data.append(" count=").append(timer.getCount());
        data.append(", min=").append(convertDuration(snapshot.getMin()));
        data.append(", max=").append(convertDuration(snapshot.getMax()));
        data.append(", mean=").append(convertDuration(snapshot.getMean()));
        data.append(", stdDev=").append(convertDuration(snapshot.getStdDev()));
        data.append(", median=").append(convertDuration(snapshot.getMedian()));
        data.append(", p75=").append(convertDuration(snapshot.get75thPercentile()));
        data.append(", p95=").append(convertDuration(snapshot.get95thPercentile()));
        data.append(", p98=").append(convertDuration(snapshot.get98thPercentile()));
        data.append(", p99=").append(convertDuration(snapshot.get99thPercentile()));
        data.append(", 999=").append(convertDuration(snapshot.get999thPercentile()));
        data.append(", mean_rate=").append(convertRate(timer.getMeanRate()));
        data.append(", m1=").append(convertRate(timer.getMeanRate()));
        data.append(", m5=").append(convertRate(timer.getMeanRate()));
        data.append(", m15=").append(convertRate(timer.getMeanRate()));
        data.append(", rate_unit=").append(getRateUnit());
        data.append(", duration_unit=").append(getDurationUnit());
    }
 
    private void addMeter(StringBuilder data, String name, Meter meter) {
        data.append(" type=meter.").append(name).append(":");
        data.append(" count=").append(meter.getCount());
        data.append(", mean_rate=").append(convertRate(meter.getMeanRate()));
        data.append(", m1=").append(convertRate(meter.getOneMinuteRate()));
        data.append(", m5=").append(convertRate(meter.getFiveMinuteRate()));
        data.append(", m15=").append(convertRate(meter.getFifteenMinuteRate()));
        data.append(", rate_unit=").append(getRateUnit());
    }
 
    private void addHistogram(StringBuilder data, String name, Histogram histogram) {
        final Snapshot snapshot = histogram.getSnapshot();
        data.append(" type=histogram.").append(name).append(":");
        data.append(" count=").append(histogram.getCount());
        data.append(", min=").append(snapshot.getMin());
        data.append(", max=").append(snapshot.getMax());
        data.append(", mean=").append(snapshot.getMean());
        data.append(", stdDev=").append(snapshot.getStdDev());
        data.append(", median=").append(snapshot.getMedian());
        data.append(", p75=").append(snapshot.get75thPercentile());
        data.append(", p95=").append(snapshot.get95thPercentile());
        data.append(", p98=").append(snapshot.get98thPercentile());
        data.append(", p99=").append(snapshot.get99thPercentile());
        data.append(", 999=").append(snapshot.get999thPercentile());
    }
 
    private void addCounter(StringBuilder data, String name, Counter counter) {
        data.append(" counter.").append(name).append(": ").append(counter.getCount());
    }
 
    private void addGauge(StringBuilder data, String name, Gauge gauge) {
        data.append(" gauge.").append(name).append(": ").append(gauge.getValue());
    }
 
    @Override
    protected String getRateUnit() {
        return "events/" + super.getRateUnit();
    }
 
    /**
     * A builder for {@link com.codahale.metrics.CsvReporter} instances. Defaults to logging to {@code metrics}, not
     * using a marker, converting rates to events/second, converting durations to milliseconds, and
     * not filtering metrics.
     */
    public static final class Builder {
        private final MetricRegistry registry;
        private Logger logger;
        private Marker marker;
        private TimeUnit rateUnit;
        private TimeUnit durationUnit;
        private MetricFilter filter;
 
        private Builder(MetricRegistry registry) {
            this.registry = registry;
            this.logger = LoggerFactory.getLogger("metrics");
            this.marker = null;
            this.rateUnit = TimeUnit.SECONDS;
            this.durationUnit = TimeUnit.MILLISECONDS;
            this.filter = MetricFilter.ALL;
        }
 
        /**
         * Log metrics to the given logger.
         *
         * @param logger an SLF4J {@link Logger}
         * @return {@code this}
         */
        public Builder outputTo(Logger logger) {
            this.logger = logger;
            return this;
        }
 
        /**
         * Mark all logged metrics with the given marker.
         *
         * @param marker an SLF4J {@link Marker}
         * @return {@code this}
         */
        public Builder markWith(Marker marker) {
            this.marker = marker;
            return this;
        }
 
        /**
         * Convert rates to the given time unit.
         *
         * @param rateUnit a unit of time
         * @return {@code this}
         */
        public Builder convertRatesTo(TimeUnit rateUnit) {
            this.rateUnit = rateUnit;
            return this;
        }
 
        /**
         * Convert durations to the given time unit.
         *
         * @param durationUnit a unit of time
         * @return {@code this}
         */
        public Builder convertDurationsTo(TimeUnit durationUnit) {
            this.durationUnit = durationUnit;
            return this;
        }
 
        /**
         * Only report metrics which match the given filter.
         *
         * @param filter a {@link MetricFilter}
         * @return {@code this}
         */
        public Builder filter(MetricFilter filter) {
            this.filter = filter;
            return this;
        }
 
        /**
         * Builds a {@link CompactSlf4jReporter} with the given properties.
         *
         * @return a {@link CompactSlf4jReporter}
         */
        public CompactSlf4jReporter build() {
            return new CompactSlf4jReporter(registry, logger, marker, rateUnit, durationUnit, filter);
        }
    }
 
}

Asynchronous handling using JMS in a Spring application

We needed to asynchronous handling in our application, so we decided to use JMS to achieve this. We had one requirement, the asynchronous operation should run after the “current” transaction but only when the transaction succeeded. That is why we chose to use JMS instead of just dispatching to a different thread or using something like the LMAX disruptor.

The application was already using the Spring framework and uses Hibernate to access the database and a Hibernate transaction manager.

To introduce JMS, we need to add a JMS provider. In our case, we are using ActiveMQ.

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
 
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>5.10.0</version>
</dependency>

Now we can create a message queue with the name “myEventQueue”. This is done using the following configuration.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
 
    <!--  Embedded ActiveMQ Broker -->
    <amq:broker id="broker" useJmx="false" persistent="false">
        <amq:transportConnectors>
            <amq:transportConnector uri="tcp://localhost:0" />
        </amq:transportConnectors>
    </amq:broker>
 
    <!--  ActiveMQ Destinations  -->
    <amq:queue id="myEventQueue" physicalName="my.pkg.myEventQueue" ></amq:queue>
 
    <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
    <amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost" />
 
    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsFactory" />
    </bean>
 
    <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
          depends-on="broker"
          p:targetConnectionFactory-ref="jmsFactory" />
 
    <jms:annotation-driven/>
 
    <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
        <property name="concurrency" value="3-10"/>
    </bean>
 
    <bean id="jmsProducerTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="jmsConnectionFactory"
          p:sessionTransacted="true" />
 
</beans>

It assures that an embedded JMS server is setup (the “broker”) and it creates a JMS transactino manager. This setup makes the sending of messages transactional. When the transaction in which the event is sent is rolled back then the JMS message will not be actually sent. Note that receipt of the message is not trasnactional in this setup. So if the transaction which processes the message fails then the message is lost. This can be solved, but we haven’t done that yet.

Send a message to the queue can be done using code like the following:

@Component
public class EventProducer {
 
    @Autowired
    private JmsTemplate template;
 
    public void sendEvent(String eventParameter) {
        try {
            template.send("my.pkg.myEventQueue", session -> {
                    TextMessage message = session.createTextMessage("myEventQueue");
                    message.setStringProperty("parameter", eventParameter);
                    return message;
                });
        } catch (JmsException je) {
            // ... handle exception
        }
    }
 
}

To listen to events, you can create a listener component like this

@Component
public class EventListener implements MessageListener {
 
    @Autowired
    private WhateverHelper whateverHelper;
 
    @Override
    @Transactional
    @JmsListener(destination = "my.pkg.myEventQueue")
    public void onMessage(Message message) {
        try {
            String parameter = message.getStringProperty("parameter");
 
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                String msg = tm.getText(); // can be used for logging the event
 
                whateverHelper.doSomething(parameter);
            }
        } catch (JMSException e) {
            // ... handle exception
        }
    }
 
}