Pipes and Filters with Oracle

Thursday, 23rd February 2012

I’ve been working for a while on a cash flow forecasting system for the Asset Management division of a major global bank. I inherited it in a largely unfinished state, after it should already have been delivered, nominally “feature complete” but with a healthy list of bugs that needed fixing!

The architecture of the app is pretty typical of what I’ve found in large financial organisations (either first hand or through friends): a whole suite of stored procedures on the database doing all of the work, with an anaemic web app front-end to present the results to the user. So it goes.

The funds are organised into fund groups, and some of these groups contain extremely large numbers of funds. One of the requirements was that a fund manager should be able to see all of the funds in his fund group at the same time. That obviously meant a very large amount of data, and because the app collected and processed the lot every time the user hit “Go!” (metaphorically speaking of course, there’s no crass “Go!” button - honest!), it often led to delays of over thirty minutes - long enough to time out the user’s session!

Now, the app has to collate data from a number of upstream systems, run it through some precedence rules and end up with the final data showing the current state. This was done live against the trading platform, while the back-office systems would reset the funds’ positions once overnight and then periodically during the day (“twice every fifteen minutes”).

When I took it over, I realised that there was no real need to do this every time the user hits “Go!” and that if I could find a way to pre-process the data to get it into a state that could simply be queried by the Java app, then the users would benefit from extremely quick response times. No more dreaded timeouts.
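
Something as simple as a database job can drive that kind of refresh. As a sketch of the idea (the procedure name and the interval here are purely illustrative, not what the real system used):

begin
  -- Illustrative only: periodically rebuild the pre-processed data
  -- that the front-end queries.
  dbms_scheduler.create_job(
    job_name        => 'refresh_cashflow_snapshot',
    job_type        => 'STORED_PROCEDURE',
    job_action      => 'cashflow_snapshot.refresh_all',
    repeat_interval => 'FREQ=MINUTELY;INTERVAL=15',
    enabled         => true
  );
end;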

Also, a number of other requirements that were coming through called for more processing to be done on the front-end. So I proposed a two-phase solution: phase one would improve the responsiveness of the application for the users (by pre-processing the data as described), and phase two would enrich the capabilities of the front-end Java app so that more of the work could be done up there.

This was agreed, and I set about the work.

Now, you’ve got to understand, there were dozens of stored procs. A few of them were over 2,000 lines each! In total there were over 22,000 lines of SQL that made up the database part of the app. It wasn’t pretty.

The original developers had been under immense pressure to develop something that really wasn’t clear, and they’d been very pragmatic in their approach in order to get it delivered. Unfortunately this resulted in duplicated logic, spread across those procs, with what amounted to minor differences in each one for each business scenario.

Of course, this was extremely brittle. While I was working on getting the bugs fixed for release, I would change one proc to implement it correctly, and that would break another, or it would suddenly cause duplicates to appear and so on. Oh, and I forgot to mention … none of this was tested.

So the first thing I did, when I set about the work, was create a suite of tests. For this I used Cucumber, RSpec and Raimonds Simanovskis’ excellent ruby-plsql-spec gem, and I listed out all of the scenarios I could find in the code and in the docs.

This highlighted that the existing code wasn’t working properly, but I knew that already from the constant stream of bug reports I was receiving now that it was live.

Once I had a decent suite of tests I set about creating the new procedures (well, packages and procedures) that would eventually come to replace the old code.

The design I’d settled on was to process the incoming trades from the trading platform through a series of pipes and filters, applying the different transformations, precedence rules and filtering steps along the way, before merging with the data from the back-office systems (again applying the appropriate precedence rules). The result was a kind of canonical data model representing the final state of the trades and positions, ready to be queried by the front-end.

The concept of pipes and filters is very straightforward. Basically you have processors (the filters) connected by pipes: each processor reads data in from a source, works on it, and publishes it out to the next processor via another pipe. The pipe is simply the mechanism by which one processor sends data to, and receives data from, the others. It can be as simple as STDIN and STDOUT (and in Unix it is! Hence the reason the | symbol is called a pipe!)

However, I’d never seen pipes and filters implemented in Oracle before, and had no idea how to go about doing so.

What I found was not only that the approach is very well supported by Oracle, but that it also appeared to be somewhat novel in practice. At least there was very little information about its practical use as a technique at the time, and there’s still very little about it now. I don’t know, maybe database developers are a secretive lot!

Anyway, as I said, what I wanted was to have a series of processing steps that would operate on the data and allow me to DRY the code by having each business or processing rule in its own filter. I could then combine these (with the pipes) to produce the final output.

For example, the trading platform would provide one row per trade. If the trade was cancelled then the system needed to output a contra row. So I created a cancellation filter that would detect whether the incoming row was cancelled and, if so, output the original row plus a second row with the amount reversed.

To do this I used pipelined table functions to implement the filters. So, I created a record type and a collection type (declared together, at the top of the same package) for the shape of the data in the pipe [1]:

type rec is record (
  trade_id number(10),
  instrument_type varchar2(4),
  amount number(18,2), -- amounts are always positive
  cancelled char(1),
  trade_date date -- etc.
);

type rec_set is table of rec;

This is the shape of the data throughout the entire pipeline. Then there was the filter itself:

function cancellation_filter(inputCursor sys_refcursor)
  return rec_set pipelined deterministic
is
  in_rec rec;
  out_rec rec;
begin
  loop
    fetch inputCursor into in_rec;
    exit when inputCursor%NOTFOUND;

    out_rec := in_rec;

    if in_rec.cancelled = 'Y' then
      -- First output the original trade, as if it wasn't cancelled:
      out_rec.cancelled := 'N';
      pipe row(out_rec);

      -- Now, prepare the cancellation row inverting the amount:
      out_rec.amount := in_rec.amount * -1;
      out_rec.cancelled := 'Y';
    end if;

    -- Output the original row or the prepared cancellation row:
    pipe row(out_rec);
  end loop;

  -- Tidy up and signal that there are no more rows:
  close inputCursor;
  return;
end;
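
A nice side effect of splitting things up like this is that each filter can be exercised on its own. A quick sanity check might look something like this (untested, like the rest of the code here; see the footnote):

select trade_id, amount, cancelled
  from table(cancellation_filter(cursor(
    select * from pending_trade where cancelled = 'Y'
  )));

Every cancelled trade should come back twice: once looking like the original (cancelled = 'N') and once with the amount negated.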

Then to run the data through the pipeline (imagining another filter called fx_filter) I did this:

select * from table(fx_filter(cursor(
  select * from table(cancellation_filter(cursor(
    select * from pending_trade
  )))
)));

The nesting operates inside out, as it must – albeit a bit unsettling on the eye, because you expect it to run in the order you read it!
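
If the nesting bothers you, the same pipeline can be written top-down with a WITH clause feeding the cursor expressions. Another untested sketch (the subquery name is just for illustration):

with after_cancellations as (
  select * from table(cancellation_filter(cursor(
    select * from pending_trade
  )))
)
select * from table(fx_filter(cursor(
  select * from after_cancellations
)));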

And that’s it. Obviously, in the real one there were more filters, but as you can see each filter was very simple. This example isn’t far off what the original actually was.
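
To give a feel for how it hangs together, the types and the filters would all sit in one package spec, something like this (the package name is invented, and fx_filter is the imagined filter from earlier; with the filters inside a package, the calls in the pipeline query become package-qualified, e.g. trade_pipeline.cancellation_filter):

create or replace package trade_pipeline as

  -- The shape of the data flowing through the pipes:
  type rec is record (
    trade_id number(10),
    instrument_type varchar2(4),
    amount number(18,2), -- amounts are always positive
    cancelled char(1),
    trade_date date -- etc.
  );

  type rec_set is table of rec;

  -- The filters, each a pipelined table function:
  function cancellation_filter(inputCursor sys_refcursor)
    return rec_set pipelined deterministic;

  function fx_filter(inputCursor sys_refcursor)
    return rec_set pipelined deterministic;

end trade_pipeline;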

It kept the logic separate and in one place, and it was easy to understand. Performance-wise, too, it was a tremendous success. I ran lots of test queries to see the performance characteristics, and it performed admirably. I ran it over the entire trade history (in the test database, a few million trades) and it did the lot in 22 seconds, far quicker than we needed given the trade volumes we were expecting.
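
And since the whole point of phase one was to pre-process the data rather than crunch it per request, the output of the pipeline just needs to land somewhere the Java app can query directly. Something along these lines (the target table name is made up for the example):

insert into trade_snapshot
select * from table(fx_filter(cursor(
  select * from table(cancellation_filter(cursor(
    select * from pending_trade
  )))
)));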

In the end, I was able to delete about 15,000 lines of SQL from 8 or 9 stored procedures in favour of about 1,200 lines in total, including all the new infrastructure and other features and fixes. The filters amounted to about 200 lines and replaced the functionality of about 12,000 lines of the original code.

It was a good job I wasn’t measured on lines of code!

You can read more about this kind of approach here: improving performance with pipelined table functions.

  1. I actually wrote this article some time after I left the bank, and I didn’t have access to Oracle at the time I wrote it (and wasn’t about to go and install it just for the post!), so I have been unable to test the code. Please use it as an example of the concept and dig into the documentation yourself!