Dataflow Bench in C

11 Feb 2016

Neil Conway wrote a little C program in order to answer a question, something like “How much upside might there be, if we switched from Ruby to C for this project?”.

One, writing a little program to answer a question is a cool thing to do and I should do more of it.

Two, it looks pretty comprehensible even if it doesn’t do very much.

So I tried porting it to use D. R. Hanson’s libcii instead of Apache Portable Runtime.

#include <math.h>
#include <stdio.h>
#include <time.h>

#include "table.h"
#include "arena.h"

struct Op;

/* Handler signature shared by every operator: receives the operator being
   invoked and a pointer to the integer flowing through the pipeline. */
typedef void (*op_invoke_func)(struct Op *op, int *t);

/* Common header of every operator. The concrete operator structs embed it as
   their first member so they can be passed around as a struct Op pointer. */
struct Op {
  struct Op *next;        /* operator that follows this one; NULL for a sink */
  op_invoke_func invoke;  /* handler called to push an integer into this op */
};

/* Operator that drops one specific integer value and forwards all others. */
struct PredicateOp {
  struct Op op;  /* must stay first: the code casts between Op and PredicateOp */
  int pred;      /* the value that is filtered out */
};

/* Operator that forwards an integer only when it has no entry in its table. */
struct JoinOp {
  struct Op op;  /* must stay first: the code casts between Op and JoinOp */
  Table_T tbl;   /* table consulted on every invocation */
};

/* One cell of the singly linked list in which a SinkOp collects its input. */
struct ListNode {
  struct ListNode *next;  /* node pushed before this one, or NULL */
  int *t;                 /* the integer that reached the sink */
};

/* Terminal operator: records every integer it receives on a linked list. */
struct SinkOp {
  struct Op op;                /* must stay first: the code casts between Op and SinkOp */
  Arena_T pool;                /* arena the list nodes are allocated from */
  struct ListNode *list_head;  /* most recently received integer; NULL when empty */
};

/* Invoke handlers, declared ahead of the init functions that install them. */
static void pred_op_invoke(struct Op *op, int *t);
static void join_op_invoke(struct Op *op, int *t);
static void sink_op_invoke(struct Op *op, int *t);

/*
 * Create a PredicateOp in arena p.
 *
 * p    - arena the operator is allocated from
 * next - operator that receives the integers this one lets through
 * pred - the value this operator filters out
 *
 * Returns the new operator as a generic struct Op pointer.
 */
static struct Op *pred_op_init(Arena_T p, struct Op *next, int pred) {
  struct PredicateOp *self = Arena_alloc(p, sizeof *self, __FILE__, __LINE__);

  self->pred = pred;
  self->op.invoke = pred_op_invoke;
  self->op.next = next;

  /* op is the first member, so its address is the address of the PredicateOp. */
  return &self->op;
}

static void pred_op_invoke(struct Op *op, int *t) {
  struct PredicateOp* self = (struct PredicateOp*) op;

  if (self->pred != *t) {
    struct Op* next = self->op.next;
    next->invoke(next, t);
  }
}

/* Table comparison callback for keys that point to int: orders two keys by
   the integers they point to, returning <0, 0 or >0. */
static int join_key_cmp(const void *x, const void *y) {
  int a = *(const int *) x;
  int b = *(const int *) y;

  return (a > b) - (a < b);
}

/* Table hash callback for keys that point to int: hashes the pointed-to
   value, so equal integers hash alike wherever they are stored. */
static unsigned join_key_hash(const void *key) {
  return (unsigned) *(const int *) key;
}

/*
 * Create a JoinOp in arena p whose table holds every even integer in
 * [0, 12000).
 *
 * p    - arena the operator and its table keys are allocated from
 * next - operator that receives the integers this one lets through
 *
 * The table is created with value-based compare and hash callbacks. With
 * NULL callbacks the table treats keys as opaque pointers, so a lookup
 * through any other pointer to an equal integer would never match.
 *
 * Returns the new operator as a generic struct Op pointer.
 */
static struct Op *join_op_init(Arena_T p, struct Op *next) {
  enum { KEY_LIMIT = 12000, KEY_STEP = 2 };
  struct JoinOp *result;
  int i;

  result = Arena_alloc(p, sizeof(*result), __FILE__, __LINE__);
  result->op.next = next;
  result->op.invoke = join_op_invoke;
  /* The hint is the number of entries inserted below. */
  result->tbl = Table_new(KEY_LIMIT / KEY_STEP, join_key_cmp, join_key_hash);

  for (i = 0; i < KEY_LIMIT; i += KEY_STEP) {
    int *iptr = Arena_alloc(p, sizeof(*iptr), __FILE__, __LINE__);

    *iptr = i;
    /* The key doubles as the value: a non-NULL value is what lets
       join_op_invoke tell "present" from "absent". */
    Table_put(result->tbl, iptr, iptr);
  }

  return (struct Op *) result;
}

/*
 * Invoke handler for JoinOp: look t up in the operator's table and hand it
 * on to the next operator only when the lookup yields NULL.
 */
static void join_op_invoke(struct Op *op, int *t) {
  struct JoinOp *join = (struct JoinOp *) op;
  struct Op *downstream;

  if (Table_get(join->tbl, t) != NULL) {
    return;
  }

  downstream = join->op.next;
  downstream->invoke(downstream, t);
}

/*
 * Create a SinkOp in arena p. The sink has no successor, starts with an
 * empty list, and remembers p so it can allocate list nodes later.
 *
 * Returns the new operator as a generic struct Op pointer.
 */
static struct Op *sink_op_init(Arena_T p) {
  struct SinkOp *sink = Arena_alloc(p, sizeof *sink, __FILE__, __LINE__);

  sink->list_head = NULL;
  sink->pool = p;
  sink->op.invoke = sink_op_invoke;
  sink->op.next = NULL;

  /* op is the first member, so its address is the address of the SinkOp. */
  return &sink->op;
}

/*
 * Invoke handler for SinkOp: allocate a list node from the sink's arena,
 * point it at t, and push it onto the front of the sink's list.
 */
static void sink_op_invoke(struct Op *op, int *t) {
  struct SinkOp *sink = (struct SinkOp *) op;
  struct ListNode *node = Arena_alloc(sink->pool, sizeof *node, __FILE__, __LINE__);

  node->t = t;
  node->next = sink->list_head;
  sink->list_head = node;
}

/* Store a copy of i in arena p and return a pointer to that copy. */
static int *intdup(Arena_T p, int i) {
  int *slot = Arena_alloc(p, sizeof *slot, __FILE__, __LINE__);

  *slot = i;
  return slot;
}

/*
 * Number of seconds in one nanosecond.
 *
 * Declared with (void) so the compiler checks calls against a real
 * prototype, and written as a literal so no pow() call is made each time a
 * duration is computed.
 */
static double SEC_PER_NS(void) {
  return 1e-9;
}

int main() {
  int i;
  
  for (i = 0; i < 10; i++) {
    Arena_T pool;
    struct Op *op;
    int j;
    struct timespec start_time, finish_time;

    clock_gettime(CLOCK_MONOTONIC, &start_time);

    pool = Arena_new();
    
    op = sink_op_init(pool);
    op = pred_op_init(pool, op, 4);
    op = pred_op_init(pool, op, 6);
    op = join_op_init(pool, op);
    op = pred_op_init(pool, op, 8);
    op = pred_op_init(pool, op, 10);
    op = join_op_init(pool, op);

    for (j = 0; j < 2000000; j++) {
      if (j % 2 == 1) {
        op->invoke(op, intdup(pool, j));
      }
    }
    
    Arena_free(pool);

    clock_gettime(CLOCK_MONOTONIC, &finish_time);

    double duration = finish_time.tv_sec - start_time.tv_sec;
    duration += (finish_time.tv_nsec - start_time.tv_nsec) * SEC_PER_NS();

    printf("Duration: %f seconds\n", duration);
  }

  return 0;
}

I don’t know to what extent I succeeded, or to what extent I destroyed the important properties of the original.

However, in breaking it (by removing Apache Portable Runtime) and getting it to work again (by adding libcii), I think I understand what it’s doing. It builds a pipeline of nodes of a few different types, and then sends a bunch of integers into the pipeline.

I wonder if it would be interesting or valuable to try to devise a sequence of TDD requirements (something like a kata), intended to come up with this design incrementally and in a test-driven way.

Something like this, maybe?

Every Op can be invoked with an int.

A PredicateOp is-a Op.
A PredicateOp has-a pred, which is an integer.
A PredicateOp has-a next, which is an Op.
When a PredicateOp is invoked, it compares the incoming integer argument to its pred,
and if they are different, invokes its next with the incoming integer argument,
and if they are the same, does nothing.

A JoinOp is-a Op.
A JoinOp has-a next, which is an Op.
A JoinOp has-a table, which is a set of integers.
When a JoinOp is invoked, it looks up the incoming integer argument in its table,
and if it is not in the table, invokes its next with the incoming integer argument,
and if it already is in the table, does nothing.

A SinkOp is-a Op.
A SinkOp does not have a next.
A SinkOp has-a pool.
A SinkOp has-a list of integers.
When a SinkOp is invoked, it uses its pool to allocate a new list node,
which points to the incoming integer, and pushes it onto its list.

To benchmark:

First, store the current time.

Second, create this "network" of ops:
A JoinOp, connected to a PredicateOp, connected to a PredicateOp, connected to
a JoinOp, connected to a PredicateOp, connected to a PredicateOp, connected to
a SinkOp.

Third, drive 1 million integers through the network,
making sure that each of them slips through all of the filters
and gets stored in the SinkOp.

Fourth, subtract the start time from the current time to find the duration,
and print it out.

Finally, run the benchmark ten times.

Seems a bit similar to the DeltaBlue benchmark and the Richards benchmark - maybe those would make good katas too?