20 Days of Clojure: Day 11

Ok, so now we’ve covered some basic clojure as lisp with immutable datastructures and built-in support for streaming (lazy) interfaces. I was playing around with Erlang right before I found clojure because I was looking for a language with direct support for concurrency, the lispiness and the JVM hosting were the deciding factor for me.

I’m just getting started with clojure’s concurrency support, so you’ll have to bear with me as I might struggle a bit. Yesterday, I showed you a lazy prime number sieve built using the seq interface. Today, I’m going to try to parallelize it. Since I don’t really know what I’m doing, I’ll take you through my thinking.

There are three ways in clojure to deal with memory in threads: dynamic vars, software transactional memory, and agents. I get dynamic vars and agents because I use things like them all the time. Dynamic vars are basically thread-local storage and agents are a kind of ActiveObject [pdf]. Thread local storage isn’t going to work for me since I need to gather answers from my worker threads, not have each work independently (there may be dynamic vars in the solution, but it’s not the way you communicate between threads).

ActiveObject is a pattern that serializes access to an object so that one thing happens to it at a time. Individual calls are put in a queue which is serviced by a thread assigned to the object, and that thread is the only one to touch the object. In clojure, there’s actually a thread pool that services all agents. Now, that I think about it — maybe this will work. I kind of wanted to play with STM’s, but I think I understand this better and I feel I can still make it streamable.

Here’s my naive idea. I’m going to make an agent per prime, then ask them all if a number is divisible by them, then wait for them all and if it is, make a new agent and go on. That seems really dumb unless agents are super-cheap. Let’s see.

I’ll start with something simple:

(defn divisible? [x y] (= (rem x y) 0))

I’ll define my agents to be maps that look like this at the start {:prime 2} and then define this:

(defn check-divisible [agent x]
    (merge agent {:last x :notdivisible (not (divisible? x (:prime agent)))}))

Here’s a quick Repl session so far (no real agents yet)

user=> (check-divisible {:prime 2} 3)
  {:last 3, :prime 2, :notdivisible true}
  user=> (check-divisible {:prime 2} 4)
  {:last 4, :prime 2, :notdivisible false}
  user=> (check-divisible {:prime 2} 5)
  {:last 5, :prime 2, :notdivisible true}

Ok, now I’ll start. Here’s something weird — I just did a CPU snapshot running the sequential prime sieve from yesterday and it looks like this:

 

I don’t know what to make of that — I know purely functional programs are easy to parallelize, but this sieve seems particularly serial in implementation. Anyway, I’ll press on:

Here’s my agent-sieve

(defn agent-sieve [a ints]
    (let [p (next-prime a ints)]
      (lazy-cons p (agent-sieve (cons {:prime p} a) (iterate inc (inc p))))))

I keep stuffing a new agent list into the rest of a lazy-cons — just need to write next-prime:

(defn next-prime [agents ints]
    (let [x (first ints)]
      (check-divisible-by-agents (take-nth 2 agents) x)
      (check-divisible-by-agents (take-nth 2 (drop 1 agents)) x)
      (apply await agents)
      (if (every? (fn [a] (or (not= (:last @a) x) (:notdivisible @a))) agents)
        x
        (next-prime agents (rest ints)))))

Ok, so I’m starting to see problems — for one, I really want to short-circuit if an agent is divisible, but I also want to split up the work. So I decide to make a check-divisible-by-agents that will chain along each agent and stop when it finds an agent that divides into the prime. I start two chains each with half of the agents and wait — this will mean that if one is done, the other won’t know and it will keep going (which is bad). I press on, here’s the chain

(defn check-divisible-by-agents [agents x]
    (if (not= (count agents) 0)
      (! (first agents)
        (fn [a y rest-agents]
          (let [newstate (check-divisible a y)]
            (if (:notdivisible newstate)
              (check-divisible-by-agents rest-agents y))
            newstate))
        x (rest agents))))

So, I run a function on the first agent in the list (with !) that sets the new state to whether it was divisible. If it’s not divisible then I recursively call the check on the rest of the agents.

I’m not getting primes yet — but it’s close. I don’t think await works like I think it does — even though I am chaining ! inside of !, I think that doesn’t count for await. Here’s is next-prime rewritten.

  (defn next-prime [agents ints]
(let [x (first ints)]
(check-divisible-by-agents (take-nth 2 agents) x)
(check-divisible-by-agents (take-nth 2 (drop 1 agents)) x)
(if (every?
(fn [a]
(await a)
(or (not= (:last @a) x)
(:notdivisible @a)))
agents)
x
(next-prime agents (rest ints)))))

Ok, I get primes now

To tell the truth, I can’t believe I got this working at all, but running (time) on the serial and parallel versions give me this:

sequential: “Elapsed time: 9.517 msecs”
parallel: “Elapsed time: 251.025 msecs”

So, obviously, this is not a good way to do this. I don’t know yet if it’s because I’m using too many agents or the extra work I’m doing because I’m not short-circuiting. Anyway, that’s enough for now.

Update: After a night’s rest, I think I know what’s going on with the other thread in the sequential version. My guess is the GC. The only other alternative I can come up with is that under the hoods clojure parallelizes some things I’m using. I started a thread on parallelizing prime sieves on the clojure google group.

(clojure day (March 20) in Northampton, MA is coming soon)