Introduction
This is a tutorial for beginners who are interested in CEP (Complex Event Processing) using Esper. We will build a fictional real-time surveillance system which alerts on conditions we will define. You will learn how to use Esper to process real-time events and you may also learn a thing or two about sockets in Java.
The Story
The boys at the station have been tipped off about illegal activities at The Genco Pura Olive Oil Company. They suspect that illegal cargo is about to move through the company. To bust the gig, they have come up with a plan to monitor phone calls to and from certain phone numbers. They have identified the numbers used by Vito and Tom, and they’d like to know whenever either of them makes or receives a call. The scenario could be expanded to record their conversations, but that would require a court’s approval, and it’s just too much work for an Esper tutorial.
Tools of Trade
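This tutorial is about Esper, which should be downloaded and extracted to an accessible location.
The language of choice is Java, so it’s Esper for Java we are after. The code uses the classic com.espertech.esper.client API (EPServiceProvider, EPAdministrator, EPRuntime). Esper 8 and later replaced that API, so pick an earlier release that still ships it.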
Phone numbers are fictitious and taken from this site. http://www.acma.gov.au/WEB/STANDARD..PC/pc=PC_2330
Vito’s number is: 1800 160 401. Yes, he has an 1800 number. He does not mind paying when other people call him.
Tom’s number is: 0491 570 110
IDE: I use Netbeans but it doesn’t really matter. Any IDE or text editor of choice will do.
Design
Our bust-the-bad-guys solution relies on a server component which sends a message for every call being made. These messages are received by a client application which feeds them into the Esper engine. Esper then examines each message and, based on the instructions given to it, alerts the user. The alert mechanism simply displays attention-grabbing text on the console. This is a CEP and Esper tutorial, remember? Do not get carried away.
Below is a pictorial representation of the design.

The Message
The server sends a message to the client whenever a call is made. We can express the message as a POJO (plain old Java object), a form of event that Esper supports. Esper reads the properties used in queries, such as fromNumber, through the getters. The class implements Serializable so that instances can be written to the socket with Java object streams. The message object holds the from and to phone numbers, the time of the call and its duration. For this tutorial we are only interested in the phone numbers.
    package com.thereforesystems.cdrlibrary;

    import java.io.Serializable;
    import java.util.Date;

    public class CallDataRecord implements Serializable {

        String fromNumber;
        String toNumber;
        Date callDateTime;
        int duration;

        public Date getCallDateTime() {
            return callDateTime;
        }

        public String getFromNumber() {
            return fromNumber;
        }

        public String getToNumber() {
            return toNumber;
        }

        public int getDuration() {
            return duration;
        }

        public CallDataRecord(String from, String to, Date time, int dur) {
            fromNumber = from;
            toNumber = to;
            callDateTime = time;
            duration = dur;
        }

        @Override
        public String toString() {
            return "From : " + fromNumber + " To: " + toNumber;
        }
    }
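To see why the message class implements Serializable, here is a minimal sketch that pushes a record through Java object streams in memory, the same mechanism the socket streams use later on. MiniRecord is a hypothetical, trimmed stand-in for CallDataRecord (phone numbers only), and RoundTripDemo is an illustrative name, not part of the tutorial's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class RoundTripDemo {

    // Hypothetical stand-in for CallDataRecord: only the two phone numbers.
    static class MiniRecord implements Serializable {
        private static final long serialVersionUID = 1L;
        final String fromNumber;
        final String toNumber;

        MiniRecord(String from, String to) {
            fromNumber = from;
            toNumber = to;
        }

        @Override
        public String toString() {
            return "From : " + fromNumber + " To: " + toNumber;
        }
    }

    // Writes the record to a byte array and reads a copy back from it.
    static MiniRecord roundTrip(MiniRecord record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(record); // fails with NotSerializableException otherwise
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MiniRecord) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MiniRecord copy = roundTrip(new MiniRecord("1800 160 401", "0491 570 110"));
        System.out.println(copy); // prints "From : 1800 160 401 To: 0491 570 110"
    }
}
```

The copy is a new object with the same field values, which is what the client ends up holding for every call the server reports.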
Server
The server application accepts a socket connection on a port and then starts sending messages to the connected client. It maintains a list of phone numbers and simulates call data records (CDRs in telco lingo) by picking random pairs from that list. This is a stock-standard sockets server which has nothing to do with Esper.
    package com.thereforesystems.cdrserver;

    import com.thereforesystems.cdrlibrary.CallDataRecord;
    import java.io.*;
    import java.net.*;
    import java.util.Calendar;
    import java.util.Random;

    public class CDRServer {

        String phoneNumbers[] = {"0491 570 156", "0491 570 157", "0491 570 158",
            "0491 570 159", "0491 570 110", "1800 160 401", "1800 975 707",
            "1800 975 708", "1800 975 709", "1800 975 710", "1800 975 711",
            "1300 975 707", "1300 975 708", "1300 975 709", "1300 975 710",
            "1300 975 711"};
        ServerSocket providerSocket;
        Socket connection = null;
        ObjectOutputStream out;
        private static Random generator = new Random();

        void run() {
            try {
                providerSocket = new ServerSocket(9999, 10);
                System.out.println("Waiting for connection to CDR Server");
                connection = providerSocket.accept();
                System.out.println("Connection received from "
                        + connection.getInetAddress().getHostName());
                out = new ObjectOutputStream(connection.getOutputStream());
                out.flush();

                // Keep sending until a write fails. Socket.isConnected() stays
                // true after the client goes away, so it cannot end the loop.
                while (true) {
                    // nextInt(n) returns 0..n-1, so every number can be picked.
                    String from = phoneNumbers[generator.nextInt(phoneNumbers.length)];
                    String to = phoneNumbers[generator.nextInt(phoneNumbers.length)];
                    if (!from.equals(to)) {
                        // no need to send when from and to are same numbers
                        int duration = generator.nextInt(10);
                        Calendar calendar = Calendar.getInstance();
                        sendMessage(new CallDataRecord(from, to, calendar.getTime(), duration));
                    }
                }
            } catch (IOException ioException) {
                ioException.printStackTrace();
            } finally {
                // Close each resource on its own so that one failure does not
                // leave the port bound for the next run().
                close(out);
                close(connection);
                close(providerSocket);
            }
        }

        void sendMessage(CallDataRecord cdr) throws IOException {
            out.writeObject(cdr);
            out.flush();
            // Drop the stream's references to records that were already sent.
            out.reset();
            System.out.println("Sending CDR: " + cdr);
        }

        private static void close(Closeable resource) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }

        public static void main(String[] args) {
            CDRServer server = new CDRServer();
            // After a client disconnects, wait for the next one.
            while (true) {
                server.run();
            }
        }
    }
Client
The client application is where the interesting stuff happens. It establishes a socket connection to the server and receives one message for each call.
    requestSocket = new Socket("localhost", 9999);
    System.out.println("Connected to localhost in port 9999");
    in = new ObjectInputStream(requestSocket.getInputStream());

    while (true) {
        try {
            cdr = (CallDataRecord) in.readObject();
            runtime.sendEvent(cdr);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(CallDetector.class.getName())
                    .log(Level.SEVERE, null, ex);
        }
    }
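The read loop above has no exit condition of its own: it ends when readObject throws, which for a cleanly closed connection means an EOFException. The following sketch shows that behaviour over a loopback socket using only the JDK. It is an illustration under assumptions, not the tutorial's client: LoopbackDemo and sendAndReceive are hypothetical names, plain strings stand in for CallDataRecord, and an ephemeral port replaces 9999.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LoopbackDemo {

    // Sends the messages over a loopback socket and returns what the reader got.
    static List<String> sendAndReceive(List<String> messages) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the operating system for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread sender = new Thread(() -> {
                try (Socket connection = server.accept();
                     ObjectOutputStream out =
                             new ObjectOutputStream(connection.getOutputStream())) {
                    for (String message : messages) {
                        out.writeObject(message);
                    }
                    out.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // Leaving the try block closes the stream and the socket.
            });
            sender.start();

            List<String> received = new ArrayList<>();
            try (Socket socket = new Socket(loopback, server.getLocalPort());
                 ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
                while (true) {
                    received.add((String) in.readObject());
                }
            } catch (EOFException endOfStream) {
                // The sender closed the connection: the normal way out of the loop.
            }
            sender.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(Arrays.asList("call 1", "call 2", "call 3")));
    }
}
```

In the tutorial's client the same EOFException would surface through the outer IOException handler, since EOFException is a subclass of IOException.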
You can see in the code above that each CallDataRecord instance is deserialized from the ObjectInputStream and then passed to the sendEvent method of the EPRuntime object. How is this data handled by Esper? We have two classes which are Esper listeners, ready to act when invoked: one listener for calls made “from” a watched number and another for calls made “to” one. For a class to act as a listener in Esper it must implement the UpdateListener interface.
    public static class FromListener implements UpdateListener {

        @Override
        public void update(EventBean[] newData, EventBean[] oldData) {
            System.out.println("Bad guy made a call: " + newData[0].getUnderlying());
        }
    }

    public static class ToListener implements UpdateListener {

        @Override
        public void update(EventBean[] newData, EventBean[] oldData) {
            System.out.println("Bad guy received a call: " + newData[0].getUnderlying());
        }
    }
So what invokes a particular listener? Esper does, once we register the listener with a particular EPL statement. An EPL statement is like an SQL query with the familiar select * from syntax. The condition in parentheses after the event type name is a filter: only CDR events that satisfy it reach the statement. Below are our EPL statements for “from” and “to” calls. Note that after creating each EPStatement instance we register the appropriate listener.
    EPStatement eplFrom =
            admin.createEPL("select * from CDR(fromNumber='1800 160 401' "
                    + "or fromNumber='0491 570 110')");
    eplFrom.addListener(new FromListener());

    EPStatement eplTo =
            admin.createEPL("select * from CDR(toNumber='1800 160 401' "
                    + "or toNumber='0491 570 110')");
    eplTo.addListener(new ToListener());
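To make the two filters concrete, here is a plain-Java sketch of the conditions they express. It does not use Esper and is not how Esper evaluates statements internally; FilterSketch and alertsFor are hypothetical names, and the order of the returned lines is a choice made for this sketch, not a statement about the order in which Esper calls listeners.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FilterSketch {

    // The two watched numbers that appear in both EPL statements.
    static final Set<String> WATCHED =
            new HashSet<>(Arrays.asList("1800 160 401", "0491 570 110"));

    // Returns the alert lines the two filter conditions would lead to for one call.
    static List<String> alertsFor(String fromNumber, String toNumber) {
        String call = "From : " + fromNumber + " To: " + toNumber;
        List<String> alerts = new ArrayList<>();
        // Condition of the "from" statement: fromNumber is one of the watched numbers.
        if (WATCHED.contains(fromNumber)) {
            alerts.add("Bad guy made a call: " + call);
        }
        // Condition of the "to" statement: toNumber is one of the watched numbers.
        if (WATCHED.contains(toNumber)) {
            alerts.add("Bad guy received a call: " + call);
        }
        return alerts;
    }

    public static void main(String[] args) {
        // A call from Vito to Tom satisfies both conditions.
        for (String alert : alertsFor("1800 160 401", "0491 570 110")) {
            System.out.println(alert);
        }
    }
}
```

The two statements are independent, so a call between Vito and Tom matches both and produces two alerts, while a call between two unwatched numbers produces none.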
For it all to work there is a little more setup required. Let’s go through it. First of all we need a Configuration object to which we add an event type. Note that our EPL statements query from CDR. This CDR is defined as an event type in the configuration object and mapped to the CallDataRecord class.
    Configuration config = new Configuration();
    config.addEventType("CDR", CallDataRecord.class.getName());
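The next thing we need is an EPRuntime, which is the Esper runtime object. The same provider also supplies the EPAdministrator (the admin used to create the statements above) through provider.getEPAdministrator(), as the full listing shows.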
    EPServiceProvider provider =
            EPServiceProviderManager.getProvider("CDREngine", config);
    EPRuntime runtime = provider.getEPRuntime();
Finally we run our client and pass it the EPRuntime instance so that the client can send events to the Esper engine.
    CallDetector client = new CallDetector();
    client.run(runtime);
Here is the full code for the client application.
    package com.thereforesystems.calldetector;

    import com.espertech.esper.client.*;
    import com.thereforesystems.cdrlibrary.CallDataRecord;
    import java.io.*;
    import java.net.*;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class CallDetector {

        Socket requestSocket;
        ObjectInputStream in;
        CallDataRecord cdr;

        void run(EPRuntime runtime) {
            try {
                requestSocket = new Socket("localhost", 9999);
                System.out.println("Connected to localhost in port 9999");
                in = new ObjectInputStream(requestSocket.getInputStream());

                // Hand every record received from the server to Esper.
                while (true) {
                    try {
                        cdr = (CallDataRecord) in.readObject();
                        runtime.sendEvent(cdr);
                    } catch (ClassNotFoundException ex) {
                        Logger.getLogger(CallDetector.class.getName())
                                .log(Level.SEVERE, null, ex);
                    }
                }
            } catch (UnknownHostException unknownHost) {
                System.err.println("You are trying to connect to an unknown host!");
            } catch (IOException ioException) {
                ioException.printStackTrace();
            } finally {
                // Either field is still null if the connection attempt failed.
                try {
                    if (in != null) {
                        in.close();
                    }
                    if (requestSocket != null) {
                        requestSocket.close();
                    }
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
        }

        public static class FromListener implements UpdateListener {

            @Override
            public void update(EventBean[] newData, EventBean[] oldData) {
                System.out.println("Bad guy made a call: " + newData[0].getUnderlying());
            }
        }

        public static class ToListener implements UpdateListener {

            @Override
            public void update(EventBean[] newData, EventBean[] oldData) {
                System.out.println("Bad guy received a call: " + newData[0].getUnderlying());
            }
        }

        public static void main(String[] args) {
            // Register CallDataRecord under the name used in the EPL statements.
            Configuration config = new Configuration();
            config.addEventType("CDR", CallDataRecord.class.getName());

            EPServiceProvider provider =
                    EPServiceProviderManager.getProvider("CDREngine", config);
            EPRuntime runtime = provider.getEPRuntime();
            EPAdministrator admin = provider.getEPAdministrator();

            EPStatement eplFrom =
                    admin.createEPL("select * from CDR(fromNumber='1800 160 401' "
                            + "or fromNumber='0491 570 110')");
            eplFrom.addListener(new FromListener());

            EPStatement eplTo =
                    admin.createEPL("select * from CDR(toNumber='1800 160 401' "
                            + "or toNumber='0491 570 110')");
            eplTo.addListener(new ToListener());

            CallDetector client = new CallDetector();
            client.run(runtime);
        }
    }
For the client to build we need to reference the following libraries.

These jars can be found in the root esper folder and in the esper/esper/lib folder.
Output
After running the server and the client for a few seconds we can see that Esper notifies us of calls made to or from Vito’s or Tom’s number. Watch out, guys.

Conclusion
In this tutorial we saw how simple it is to create an application which can process events in real time. The real power of Esper is the raw speed with which it can do complex real-time processing. Esper is also simple to understand and work with. There is a lot more to Esper than this simple tutorial covers. Maybe I will write a bit more in future posts.





