In this post I will present a simple PubSub system for OCaml using the Continuous Query feature of Oracle Coherence. The code is all on GitHub. For this example, we will have three subscribers interested in different kinds of messages:
- High priority messages
- Info messages
- Messages which mention XYZ in their body
A “real” application might have a much more detailed message structure than just priority, subject and body; however, the code is easy to modify to add new fields. The important thing to understand is that the filters run within the Coherence grid itself: a subscriber only sees messages in which it has registered an interest, and the routing and filtering are entirely offloaded from the client. Filters can be combined, for example with AndFilter, though they are not quite as powerful as the WHERE construct in SQL (e.g. no joins). Coherence-based systems are known to scale to thousands of messages/sec with thousands of connected subscribers, and OCaml is an ideal language for CEP applications such as trading.
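To make the idea of combining filters concrete, here is a minimal sketch in plain OCaml. It is only a local model and not the Coherence API: the filter type, its constructors and the eval function are names invented for this illustration. Filters are modelled as data rather than closures, on the assumption that whatever describes a subscription has to be shipped to the grid and evaluated there.

```ocaml
(* Illustrative model only: none of these names come from Coherence or COH*ML. *)
type message = {msg_id : int; msg_priority : int; msg_subject : string; msg_body : string}

(* A filter is plain data, so in principle it could be sent to a server. *)
type filter =
  | Priority_below of int
  | Subject_is of string
  | Body_contains of string
  | And of filter * filter

(* true if [sub] occurs somewhere in [s] *)
let contains s sub =
  let n = String.length s and k = String.length sub in
  let rec go i = i + k <= n && (String.sub s i k = sub || go (i + 1)) in
  go 0

(* Evaluate a filter against a single message. *)
let rec eval f m =
  match f with
  | Priority_below p -> m.msg_priority < p
  | Subject_is s -> m.msg_subject = s
  | Body_contains s -> contains m.msg_body s
  | And (a, b) -> eval a m && eval b m

let () =
  let m = {msg_id = 1; msg_priority = 1; msg_subject = "INFO"; msg_body = "ZZZXYZ"} in
  (* high priority AND mentions XYZ *)
  let urgent_xyz = And (Priority_below 2, Body_contains "XYZ") in
  Printf.printf "matches: %b\n" (eval urgent_xyz m)
```

There is deliberately no constructor that relates one message to another, which mirrors the "no joins" limitation mentioned above.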
The nature of PubSub is that the publishers and subscribers are fully de-coupled; a publisher does not know if any subscriber is listening, nor does a subscriber know who has sent the message. If reliable delivery or a stronger relationship is required, then a message bus such as Oracle AQ should be used (the tradeoff being lower throughput/higher latency). An analogy is UDP vs TCP. In all cases, so long as the data can be interchanged via a common format (database column types for AQ, POF for Coherence, Apache Thrift, etc), it does not matter what language any of the individual participants is written in. This is the fundamental principle of OPERATION FOOTHOLD.
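The de-coupling can be sketched with a toy in-process broker. This is a hypothetical illustration (the broker type and the create, subscribe and publish functions are invented here); in the real system the Coherence grid plays this role across processes and machines.

```ocaml
(* Hypothetical in-process broker, for illustration only. *)
type 'a broker = { mutable subscribers : (('a -> bool) * ('a -> unit)) list }

let create () = { subscribers = [] }

(* A subscriber registers an interest (a predicate) and a handler. *)
let subscribe broker interested handler =
  broker.subscribers <- (interested, handler) :: broker.subscribers

(* The publisher never sees the subscriber list; a message that nobody
   is interested in is simply dropped, much like a UDP datagram. *)
let publish broker msg =
  List.iter
    (fun (interested, handler) -> if interested msg then handler msg)
    broker.subscribers

let () =
  let b = create () in
  publish b "nobody is listening";   (* dropped: no subscribers yet *)
  subscribe b (fun s -> String.length s > 3) (fun s -> print_endline ("got: " ^ s));
  publish b "hi";                    (* filtered out by the predicate *)
  publish b "hello"                  (* delivered to the handler *)
```

Note that publish returns unit whether or not anything was delivered, so the publisher cannot tell the difference; a reliable transport would need some form of acknowledgement on top.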
The code for the subscriber is as follows.
    open Cohml
    open Callback
    open Printf
    open Log_message
    open Unix

    (* the message record struct *)
    type message = {msg_id : int; msg_priority : int; msg_subject : string; msg_body : string}

    (* the query predicate for the message records we are interested in
       receiving events for. Fields are 0-3 (see definition of message above),
       conditions are 0: less than, 1: equal to, 2: greater than, 3: like,
       types are 0 for int, 1 for string. Search term is a string and we'll
       atoi() it in C if we need to. Existing is if we want anything already
       in the cache before we begin. *)
    type field = | ID | PRIORITY | SUBJECT | BODY
    type field_type = | INT | STRING
    type condition = | LESS_THAN | EQUAL_TO | GREATER_THAN | LIKE
    type query = {field : field; field_type: field_type; condition : condition; search_term : string; existing: bool}

    (* function to register message listener callbacks - insert, update, delete *)
    external coh_msg_listen: coh_ptr -> query -> string -> string -> string -> unit = "caml_coh_addmessagelistener"

    let string_of_message m =
      sprintf "id=%d, priority=%d, subject='%s', body='%s'" m.msg_id m.msg_priority m.msg_subject m.msg_body

    (* callback functions *)
    let _ = register "cbf_msg_insert" (fun k m -> log_message (sprintf "New message: %s" (string_of_message m)))
    let _ = register "cbf_msg_update" (fun k om nm -> log_message (sprintf "Message changed %s/%s" (string_of_message om) (string_of_message nm)))
    let _ = register "cbf_msg_delete" (fun k -> log_message (sprintf "Message %d deleted" k))

    let () =
      let st = (match Array.length Sys.argv with
        |1 -> 1 (* default to subscriber type 1 *)
        |_ -> int_of_string Sys.argv.(1) ) in
      let q = (match st with
        |1 -> { field=PRIORITY; field_type=INT; condition=LESS_THAN; search_term="2"; existing=false }
        |2 -> { field=SUBJECT; field_type=STRING; condition=EQUAL_TO; search_term="INFO"; existing=false }
        |_ -> { field=BODY; field_type=STRING; condition=LIKE; search_term="%XYZ%"; existing=false }) in
      log_message (sprintf "Subscriber starting as type %d" st);
      let c = coh_getcache "message_cache" in
      coh_msg_listen c q "cbf_msg_insert" "cbf_msg_update" "cbf_msg_delete";
      sleep 1000
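As an aside, the meaning of a query record can be spelled out by interpreting it locally in OCaml. The sketch below is not what the C++ side does (there the query is turned into a Coherence filter); the like and matches functions are hypothetical helpers, and the LIKE semantics are an assumption that supports only the % wildcard. It may still be handy for checking a query against sample messages without a running grid.

```ocaml
(* Same types as in the subscriber above. *)
type message = {msg_id : int; msg_priority : int; msg_subject : string; msg_body : string}
type field = ID | PRIORITY | SUBJECT | BODY
type field_type = INT | STRING
type condition = LESS_THAN | EQUAL_TO | GREATER_THAN | LIKE
type query = {field : field; field_type : field_type; condition : condition;
              search_term : string; existing : bool}

(* Assumed LIKE semantics: '%' matches any run of characters, nothing else is special. *)
let like pattern s =
  let pl = String.length pattern and sl = String.length s in
  let rec go pi si =
    if pi = pl then si = sl
    else if pattern.[pi] = '%' then go (pi + 1) si || (si < sl && go pi (si + 1))
    else si < sl && pattern.[pi] = s.[si] && go (pi + 1) (si + 1)
  in
  go 0 0

(* Hypothetical local interpreter for a query; [existing] plays no part here. *)
let matches (q : query) (m : message) =
  match q.field, q.field_type with
  | (ID | PRIORITY), INT ->
    let v = if q.field = ID then m.msg_id else m.msg_priority in
    let t = int_of_string q.search_term in   (* the atoi() step *)
    (match q.condition with
     | LESS_THAN -> v < t
     | EQUAL_TO -> v = t
     | GREATER_THAN -> v > t
     | LIKE -> false)                        (* LIKE is meaningless for ints *)
  | (SUBJECT | BODY), STRING ->
    let v = if q.field = SUBJECT then m.msg_subject else m.msg_body in
    (match q.condition with
     | LESS_THAN -> v < q.search_term
     | EQUAL_TO -> v = q.search_term
     | GREATER_THAN -> v > q.search_term
     | LIKE -> like q.search_term v)
  | _ -> false                               (* field and field_type disagree *)
```

For example, the third subscriber's query (BODY, STRING, LIKE, "%XYZ%") matches any message whose body contains XYZ, padded or not.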
The subscriber builds on the previous example, the new feature being the query struct, which describes the type of message the subscriber is interested in (all three of the example subscribers are in the same source file, selectable on the command line). This is clearly highly specific to the message record type and its C++ representation, the Message class; however, at least the subscription can be defined from OCaml code. To test it, I have written a simple load generator that composes random messages:
    open Cohml
    open Log_message

    type message = {msg_id : int; msg_priority : int; msg_subject : string; msg_body : string}

    external coh_put_message: coh_ptr -> message -> unit = "caml_put_message"

    let print_message m =
      log_message (Printf.sprintf "id=%d, priority=%d, subject='%s', body='%s'" m.msg_id m.msg_priority m.msg_subject m.msg_body)

    let p () = (Random.int 5) + 1
    let subjects = [|"INFO"; "WARN"; "FAIL"; "RECOVER"|]
    let s () = Random.int (Array.length subjects)
    let bodies = [|"ABC"; "DEF"; "GHI"; "JKL"; "SOS"; "XYZ"|]
    let b () = Random.int (Array.length bodies)
    let pad = String.make 40 'Z' (* for Tibco-style 50-byte messages *)

    let () =
      Random.self_init ();
      let x = (match Array.length Sys.argv with |1 -> 10000|_ -> int_of_string Sys.argv.(1)) in
      let c = coh_getcache "message_cache" in
      let t1 = Unix.gettimeofday () in
      for i = 1 to x do
        let m = {msg_id = i; msg_priority = p (); msg_subject = subjects.(s ()); msg_body = pad ^ (bodies.(b ()))} in
        coh_put_message c m;
        print_message m;
        Unix.sleep 1
      done;
      log_message (Printf.sprintf "Sent %d messages in %.3fs" x (Unix.gettimeofday () -. t1))
The sleep function allows the log messages from all the different programs to be correlated. Running it:
And observing all three clients (started already):
A simple test shows that my test system (a 1G Debian VM in VirtualBox on a MacBook Pro) can handle up to 1200 msgs/sec†, making this solution ~12× faster than AQ on the same hardware when accessed from OCaml; the difference, of course, is that AQ is a reliable transport. This may or may not be more important to the application than pure performance.
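For anyone wanting to repeat this kind of measurement, a throughput figure can be obtained along the following lines. This is a sketch under assumptions, not the harness I used: throughput is a hypothetical helper, send stands in for a call such as coh_put_message, and the default clock is Sys.time (CPU time) only to keep the example free of dependencies. For wall-clock figures pass Unix.gettimeofday as the clock, and leave out the per-message logging and sleep.

```ocaml
(* [throughput ~now ~send n] calls [send] n times and returns messages per
   second, or None if the clock did not advance during the run. *)
let throughput ?(now = Sys.time) ~send n =
  let t0 = now () in
  for i = 1 to n do send i done;
  let elapsed = now () -. t0 in
  if elapsed > 0.0 then Some (float_of_int n /. elapsed) else None

let () =
  (* Stand-in for a real publish call: just accumulate the ids. *)
  let sink = ref 0 in
  match throughput ~send:(fun i -> sink := !sink + i) 1_000_000 with
  | Some rate -> Printf.printf "%.0f msgs/sec\n" rate
  | None -> print_endline "run too short to measure"
```

Making the clock a parameter also means the calculation can be checked with a fake clock, independently of how fast the machine happens to be.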
I think this is probably as far as I am going to take COH*ML for now; I need to get a lot more familiar with its native C++ interface, and re-acquainted with C++ in general, before going much further. Nevertheless, this is more than sufficient as a proof-of-concept and will serve as a general framework for further development. I have a much better understanding than when I began of what is and is not feasible with this approach, and the trade-offs involved – it is ideally suited for relatively simple messages on which relatively complex algorithms written in OCaml can be run (then perhaps call back into C/C++ for, say, trade execution).
† when not updating the screen