Putting it all together: PubSub for OCaml with Coherence

In this post I will present a simple PubSub system for OCaml using the Continuous Query feature of Oracle Coherence. The code is all on GitHub. For this example, we will have three subscribers interested in different kinds of messages:

  1. High priority messages
  2. Info messages
  3. Messages which mention XYZ in their body

A “real” application might have a much more detailed message structure than just priority, subject and body, but the code is easy to modify to add new fields. The important thing to understand is that the filters run within the Coherence grid itself: a subscriber only sees messages in which it has registered an interest, and the routing and filtering are entirely offloaded from the client. Filters can be combined, for example with AndFilter, though they are not quite as powerful as the WHERE construct in SQL (e.g. no joins). Coherence-based systems are known to scale to thousands of messages/sec with thousands of connected subscribers, and OCaml is an ideal language for CEP applications such as trading.
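To make the idea of combining filters concrete, here is a minimal pure-OCaml sketch. It is only a local model of the semantics: in the real system the filters are Coherence objects evaluated inside the grid, and the names filter, priority_below, subject_is, and_filter, or_filter and not_filter are made up for this illustration and are not part of COH*ML or Coherence.

```ocaml
(* Same record as used by the subscriber and publisher in this post. *)
type message = {msg_id : int; msg_priority : int; msg_subject : string; msg_body : string}

(* Hypothetical local model: a filter is just a predicate on messages. *)
type filter = message -> bool

(* Two primitive filters, mirroring subscriber types 1 and 2. *)
let priority_below n : filter = fun m -> m.msg_priority < n
let subject_is s : filter = fun m -> m.msg_subject = s

(* Combinators in the spirit of Coherence's AndFilter and friends. *)
let and_filter (f : filter) (g : filter) : filter = fun m -> f m && g m
let or_filter (f : filter) (g : filter) : filter = fun m -> f m || g m
let not_filter (f : filter) : filter = fun m -> not (f m)

(* Example: only high priority failures. *)
let urgent_failures = and_filter (priority_below 2) (subject_is "FAIL")
```

What this cannot express is anything that relates one message to another, which is the "no joins" limitation mentioned above: each filter sees a single message at a time.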

The nature of PubSub is that the publishers and subscribers are fully de-coupled; a publisher does not know if any subscriber is listening, nor does a subscriber know who has sent the message. If reliable delivery or a stronger relationship is required, then a message bus such as Oracle AQ should be used (the tradeoff being lower throughput and higher latency). An analogy is UDP vs TCP. In all cases, so long as the data can be interchanged via a common format (database column types for AQ, POF for Coherence, Apache Thrift, etc), it does not matter what language any of the individual participants is written in. This is the fundamental principle of OPERATION FOOTHOLD.

The code for the subscriber is as follows.

open Cohml
open Callback
open Printf
open Log_message
open Unix

(* the message record struct *)
type message = {msg_id : int;  msg_priority : int; msg_subject : string; msg_body : string}

(* the query predicate for the message records we are interested in receiving
   events for. On the C side the constructors below arrive as integers in
   declaration order: fields are 0-3 (matching the definition of message
   above), conditions are 0: less than, 1: equal to, 2: greater than, 3: like,
   and types are 0 for int, 1 for string. The search term is always a string
   and we'll atoi() it in C if we need to. existing says whether we also want
   anything already in the cache before we begin.
*)
type field = | ID | PRIORITY | SUBJECT | BODY
type field_type = | INT | STRING
type condition = | LESS_THAN | EQUAL_TO | GREATER_THAN | LIKE 
type query = {field : field; field_type: field_type; condition : condition; search_term : string; existing: bool}

(* function to register message listener callbacks - insert, update, delete*)
external coh_msg_listen: coh_ptr -> query -> string -> string -> string -> unit = "caml_coh_addmessagelistener"

let string_of_message m = sprintf "id=%d, priority=%d, subject='%s', body='%s'" m.msg_id m.msg_priority m.msg_subject m.msg_body

(* callback functions *)
let _ = register "cbf_msg_insert" (fun k m -> 
  log_message (sprintf "New message: %s" (string_of_message m)))
let _ = register "cbf_msg_update" (fun k om nm ->
  log_message (sprintf "Message changed %s/%s" (string_of_message om) (string_of_message nm)))
let _ = register "cbf_msg_delete" (fun k ->
  log_message (sprintf "Message %d deleted" k))
  
let () =
  let st = (match Array.length Sys.argv with
    |1 -> 1 (* default to subscriber type 1 *)
    |_ -> int_of_string Sys.argv.(1) ) in
  let q = (match st with
    |1 -> { field=PRIORITY; field_type=INT; condition=LESS_THAN; search_term="2"; existing=false }
    |2 -> { field=SUBJECT; field_type=STRING; condition=EQUAL_TO; search_term="INFO"; existing=false }
    |_ -> { field=BODY; field_type=STRING; condition=LIKE; search_term="%XYZ%"; existing=false }) in
  log_message (sprintf "Subscriber starting as type %d" st);
  let c = coh_getcache "message_cache" in
  coh_msg_listen c q "cbf_msg_insert" "cbf_msg_update" "cbf_msg_delete";
  sleep 1000

It builds on the previous example, the new feature being the query struct, which describes the type of message the subscriber is interested in (all three of the example subscribers are in the same source file, selectable on the command line). This is clearly highly specific to the message record type and its C++ representation, the Message class; however, at least the subscription can be defined from OCaml code. To test it, I have written a simple load generator that composes random messages:
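As a way of reading the query struct, the following sketch interprets it locally in pure OCaml. This is an assumption-laden model, not what COH*ML does: the real matching happens in a Coherence filter built on the C++ side. The helpers contains, like and matches are hypothetical names, and like only understands the "%term%" form used by subscriber 3.

```ocaml
type message = {msg_id : int; msg_priority : int; msg_subject : string; msg_body : string}
type field = ID | PRIORITY | SUBJECT | BODY
type field_type = INT | STRING
type condition = LESS_THAN | EQUAL_TO | GREATER_THAN | LIKE
type query = {field : field; field_type : field_type; condition : condition;
              search_term : string; existing : bool}

(* hypothetical helper: does sub occur anywhere in s? *)
let contains s sub =
  let n = String.length s and k = String.length sub in
  let rec go i = i + k <= n && (String.sub s i k = sub || go (i + 1)) in
  go 0

(* hypothetical helper: supports only "%term%"; anything else is an exact match *)
let like s pattern =
  let n = String.length pattern in
  if n >= 2 && pattern.[0] = '%' && pattern.[n - 1] = '%'
  then contains s (String.sub pattern 1 (n - 2))
  else s = pattern

(* Local interpretation of a query against one message. *)
let matches q m =
  match q.field_type with
  | INT ->
    let v = (match q.field with
      | ID -> m.msg_id
      | PRIORITY -> m.msg_priority
      | SUBJECT | BODY -> invalid_arg "matches: not an int field") in
    let t = int_of_string q.search_term in (* the atoi() step *)
    (match q.condition with
      | LESS_THAN -> v < t
      | EQUAL_TO -> v = t
      | GREATER_THAN -> v > t
      | LIKE -> invalid_arg "matches: LIKE needs a string field")
  | STRING ->
    let v = (match q.field with
      | SUBJECT -> m.msg_subject
      | BODY -> m.msg_body
      | ID | PRIORITY -> invalid_arg "matches: not a string field") in
    (match q.condition with
      | LESS_THAN -> v < q.search_term
      | EQUAL_TO -> v = q.search_term
      | GREATER_THAN -> v > q.search_term
      | LIKE -> like v q.search_term)
```

Under this reading, subscriber 1 (priority less than 2) sees only priority 1 messages, subscriber 2 sees subjects exactly equal to INFO, and subscriber 3 sees any body containing XYZ. The existing flag plays no part in matching; it only concerns what is delivered at registration time.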

open Cohml
open Log_message

type message = {msg_id : int;  msg_priority : int; msg_subject : string; msg_body : string}
external coh_put_message: coh_ptr -> message -> unit = "caml_put_message"
let print_message m = log_message (Printf.sprintf "id=%d, priority=%d, subject='%s', body='%s'" m.msg_id m.msg_priority m.msg_subject m.msg_body) 

let p () = (Random.int 5) + 1 
let subjects = [|"INFO"; "WARN"; "FAIL"; "RECOVER"|] 
let s () = Random.int (Array.length subjects)
let bodies = [|"ABC"; "DEF"; "GHI"; "JKL"; "SOS"; "XYZ"|]
let b () = Random.int (Array.length bodies)
let pad = String.make 40 'Z' (* for Tibco-style 50-byte messages *) 

let () = 
  Random.self_init ();
  let x = (match Array.length Sys.argv with |1 -> 10000|_ -> int_of_string Sys.argv.(1)) in
  let c = coh_getcache "message_cache" in
  let t1 = Unix.gettimeofday () in
  for i = 1 to x do
    let m = {msg_id = i; msg_priority = p (); msg_subject = subjects.(s ()); msg_body = pad ^ (bodies.(b ()))} in
    coh_put_message c m;
    print_message m;
    Unix.sleep 1
  done;
  log_message (Printf.sprintf "Sent %d messages in %.3fs" x (Unix.gettimeofday () -. t1))

The one-second sleep between messages allows the log messages from all the different programs to be correlated. Running it:

[Screenshot: Publisher]

And observing all three clients (started already):

[Screenshot: Subscriber 1 - high priority messages]

[Screenshot: Subscriber 2 - INFO messages]

[Screenshot: Subscriber 3 - Messages mentioning XYZ]

A simple test shows that my test system (a 1G Debian VM in VirtualBox on a MacBook Pro) can handle up to 1200 msgs/sec†, making this solution ~12× faster than AQ on the same hardware when accessed from OCaml; the difference, of course, is that AQ is a reliable transport. That reliability may or may not be more important to the application than pure performance.
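For anyone wanting to repeat the measurement, a rate figure like the one above can be obtained with a small harness along these lines. This is a sketch, not the code used for the numbers in this post: msgs_per_sec and measure are hypothetical names, and the clock and the send function are passed in as arguments so the block does not depend on Unix or on COH*ML.

```ocaml
(* Messages per second from a count and an elapsed wall-clock time in seconds. *)
let msgs_per_sec count elapsed_s =
  if elapsed_s <= 0.0 then invalid_arg "msgs_per_sec: elapsed time must be positive"
  else float_of_int count /. elapsed_s

(* Hypothetical harness: call send n times and report the achieved rate.
   now should return wall-clock seconds; send should publish message number i. *)
let measure ~now ~send n =
  let t0 = now () in
  for i = 1 to n do send i done;
  msgs_per_sec n (now () -. t0)
```

In the load generator, now would be Unix.gettimeofday and send would build the message and call coh_put_message, with the one-second sleep and the print_message call left out of the loop so that only the puts are timed.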

I think this is probably as far as I am going to take COH*ML for now; I need to get a lot more familiar with its native C++ interface, and re-acquainted with C++ in general, before going much further. Nevertheless, this is more than sufficient as a proof-of-concept and will serve as a general framework for further development. I have a much better understanding than when I began of what is and is not feasible with this approach, and the trade-offs involved – it is ideally suited for relatively simple messages on which relatively complex algorithms written in OCaml can be run (then perhaps call back into C/C++ for, say, trade execution).

† when not updating the screen

About Gaius

Jus' a good ol' boy, never meanin' no harm
This entry was posted in C++, Coherence, COHML, Ocaml, Operation Foothold. Bookmark the permalink.
