Using Zookeeper Locks in JRuby

For a while we’ve been using an in-house CLI tool based on the Pidl orchestration framework to run our ETL pipelines in Hadoop. With a small number of pipelines running at any one time we could run this on a single server within the cluster, but with a growing number of pipelines, and the limited resiliency a single server offers, we had to make a few changes.

The original code used a text file in the /tmp directory on the local file system. It was simple to use, simple to debug, and easy to remove stale locks. But it didn’t work across multiple servers. We considered using an NFS share for the locks, but given that we already run a Zookeeper cluster for our existing services, it made sense to use that instead.

def self.lock name, &block
  fn = "/tmp/#{name}.tmp"
  File.open(fn, File::RDWR | File::CREAT) do |f|
    if not f.flock(File::LOCK_EX | File::LOCK_NB)
      raise RuntimeError.new "Cannot acquire lock"
    end

    begin
      yield
    ensure
      File.unlink(fn)
    end
  end
end

Any time a pipeline needed to be run, it would be wrapped in a lock pipeline.name do ... end block. Simples.
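
For example, assuming the helper above lives in a module called Locking and the pipeline object responds to name and run (both names are ours for illustration, not from the real codebase), the call site looks something like this:

Locking.lock pipeline.name do
  # only the process holding the lock gets to run the pipeline
  pipeline.run
end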

There’s a really nice zk ruby gem for high-level interactions with Zookeeper, but unfortunately it doesn’t work with JRuby out of the box. That’s a problem because we lean on JRuby rather heavily to talk to the Hive and HBase Java APIs. After a period of head scratching, we realised that the underlying zookeeper gem does have code to support JRuby, but it needs to be built under JRuby.

git clone https://github.com/zk-ruby/zookeeper.git
cd zookeeper
gem build zookeeper.gemspec

With the zookeeper gem built and uploaded to our internal gem server, the code to take a lock in Zookeeper via the zk gem is fairly simple.
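
For completeness, a Gemfile along these lines pulls the JRuby-built gem from the internal server (the gem server URL here is made up; swap in your own):

source 'https://rubygems.org'

# the zk gem depends on the zookeeper gem; fetch our JRuby-built version
# from the internal gem server rather than rubygems.org
gem 'zookeeper', source: 'https://gems.example.internal'
gem 'zk'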

First we need to find out where the Zookeeper servers are. Thankfully we’ve already got an ini file with this information in it, as we already need it for interacting with HBase.

require 'inifile'
maximus_config = IniFile.load('/etc/maximus.cfg')
zookeeper_servers = maximus_config['hbase']['zookeeper.quorum'] + '/maximus'

The ini file it loads contains something like the following:

[hbase]
zookeeper.quorum=zk01:2181,zk02:2181,zk03:2181

Once we know where the Zookeeper servers are and have appended a chroot so we don’t affect other systems, we can connect to them. The connection string ends up looking like zk01:2181,zk02:2181,zk03:2181/maximus.

zk = ZK.new(zookeeper_servers)

And then we can create a lock and yield to the code we need to run:

pipeline_lock = zk.locker(name)
begin
  if pipeline_lock.lock!
    yield
  else
    raise "Failed to get the lock! #{name}"
  end
ensure
  pipeline_lock.unlock! # We also drop the lock on disconnection from Zookeeper, so this isn't strictly necessary
end

Putting it all together, the final function looks like this:

def self.lock name, &block
  require 'zk'
  require 'inifile'
  maximus_config = IniFile.load('/etc/maximus.cfg')
  zookeeper_servers = maximus_config['hbase']['zookeeper.quorum'] + '/maximus'
  zk = ZK.new(zookeeper_servers)
  pipeline_lock = zk.locker(name)
  begin
    if pipeline_lock.lock!
      yield
    else
      raise "Failed to get the lock! #{name}"
    end
  ensure
    pipeline_lock.unlock! # We also drop the lock on disconnection from Zookeeper, so this isn't strictly necessary
  end
end
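
One thing worth noting: this opens a fresh Zookeeper connection on every call and never explicitly closes it, which is fine for a short-lived CLI process. If you wanted to be tidier, a variant along these lines (a sketch, not the version we actually run) closes the connection once the block has finished:

def self.lock name, &block
  require 'zk'
  require 'inifile'
  maximus_config = IniFile.load('/etc/maximus.cfg')
  zookeeper_servers = maximus_config['hbase']['zookeeper.quorum'] + '/maximus'
  zk = ZK.new(zookeeper_servers)
  pipeline_lock = zk.locker(name)
  begin
    raise "Failed to get the lock! #{name}" unless pipeline_lock.lock!
    yield
  ensure
    pipeline_lock.unlock!
    zk.close! # closing the session also releases the lock's ephemeral znode
  end
end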

OK, but how do we debug this? How do we see locks and delete them? Whilst it’s not as simple as deleting a file in /tmp, it’s also not as easy to accidentally delete a lock. You have to explicitly go looking for them. And the code to do this is very simple.

require 'zk'
zk = ZK.new('localhost:2181/maximus')
zk.find('/_zklocking') {|lock| puts lock }
zk.delete('/_zklocking/Alice/ex0000000000')
zk.delete('/_zklocking/Alice')
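
If you only want the lock names at the top level, or you want to clear out a stale lock and everything beneath it in one go, the zk gem's children and rm_rf calls cover that too ('Alice' is just a placeholder lock name, as above):

require 'zk'
zk = ZK.new('localhost:2181/maximus')
# list the lock nodes without walking the whole tree
zk.children('/_zklocking').each { |lock| puts lock }
# remove a stale lock znode and all of its children in one call
zk.rm_rf('/_zklocking/Alice')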

This does introduce a change of behaviour which may not be immediately obvious. Where the original code will leave the lock file in place if the pipeline fails, the new code drops the lock as soon as it ends - regardless of success or failure. Quite often the fix to failed pipelines has been to manually remove the locks and just rerun the pipeline, so this is actually desired behaviour most of the time.

We can now run our pipelines on any number of servers, and the locks will be available to them all. It enables us to schedule the running of pipelines using multiple Jenkins slaves, but that’s a topic for another day.