Hacking Cassandra

by Tadas Vilkeliskis, December 28, 2013

At Chartbeat we are thinking about adding probabilistic counters to our infrastructure, HyperLogLog (HLL) in particular. One of the challenges with something like this is making it redundant while keeping performance reasonable. Since HyperLogLog is a relatively new approach to cardinality approximation, there are not many off-the-shelf solutions, so why not try to implement HLL in Cassandra?

During our last hackweek I spent my time hacking Apache Cassandra. My goal was to add a HyperLogLog data type to the database, so I could count billions of unique items with a very small memory footprint and get redundancy for free. The work was not entirely a success, but it wasn’t a failure either. The project was much more complicated than I initially anticipated: there are tons and tons of layers of abstraction, and it takes a significant amount of time to understand how things work. So, in this post I am going to walk you through some Cassandra internals and the changes that are necessary to add a new data type. All code examples refer to Cassandra 2.x. I don’t have a fully working HLL implementation in Cassandra (see the final section), but it was a nice hack.

Interfacing With Cassandra

You can talk to Cassandra in two different ways: Thrift and CQL. Thrift is a legacy interface that provides access to all the basic data types (integers, text, etc.); however, it falls short when it comes to the complex data types introduced in CQL3, such as lists. The reason is that with Thrift the data you get back closely mirrors how it is stored on disk. CQL, on the other hand, gives you an SQL-like syntax for querying data in Cassandra, and in this post I’ll be covering a little bit of CQL3.

Extending The Grammar

We can think of HyperLogLog (HLL) as a set of unique items where we store not the items themselves but an approximation of the set’s cardinality. Our HLL needs at least two operations: add, which adds new items to the set, and get, which retrieves the set’s cardinality. In terms of CQL3 we want to be able to do the following:

CREATE TABLE my_table (id int primary key, hll hyperloglog<text, 6>);
INSERT INTO my_table (id, hll) VALUES(1, ['item1', 'item2', 'item3']);
UPDATE my_table SET hll = hll + ['item4', 'item5'] WHERE id=1;
SELECT hll FROM my_table;
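
The post never shows the HLL itself. As a rough illustration of the add and get operations, here is a minimal standalone sketch (not the author’s code and not part of Cassandra): the top p bits of a 64-bit hash pick one of m = 2^p registers, each register keeps the largest leading-zero rank it has seen, and get applies the standard HyperLogLog estimator with the linear-counting correction for small sets. The class name SimpleHyperLogLog, the [4, 16] precision bounds and the choice of hash (64-bit FNV-1a followed by the MurmurHash3 fmix64 finalizer) are assumptions; any well-mixed 64-bit hash would do.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical standalone HLL sketch: precision p gives m = 2^p one-byte registers. */
public class SimpleHyperLogLog {
    private final int p;
    private final int m;
    private final byte[] registers;

    public SimpleHyperLogLog(int p) {
        if (p < 4 || p > 16)  // assumed bounds
            throw new IllegalArgumentException("precision must be in [4, 16]");
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    // Assumed hash: 64-bit FNV-1a over UTF-8 bytes, then the MurmurHash3 fmix64 finalizer.
    static long hash64(String item) {
        long h = 0xcbf29ce484222325L;
        for (byte b : item.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    /** The "add" operation: update at most one register per item. */
    public void add(String item) {
        long h = hash64(item);
        int idx = (int) (h >>> (64 - p));  // top p bits choose the register
        // rank = 1-based position of the leftmost 1-bit in the remaining 64 - p bits,
        // capped at 64 - p + 1 when those bits are all zero
        int rank = Math.min(Long.numberOfLeadingZeros(h << p), 64 - p) + 1;
        if (rank > registers[idx])
            registers[idx] = (byte) rank;
    }

    /** The "get" operation: estimate how many distinct items were added. */
    public long cardinality() {
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2, -r);
            if (r == 0) zeros++;
        }
        double alpha;  // bias-correction constant from the HyperLogLog paper
        switch (m) {
            case 16: alpha = 0.673; break;
            case 32: alpha = 0.697; break;
            case 64: alpha = 0.709; break;
            default: alpha = 0.7213 / (1 + 1.079 / m);
        }
        double estimate = alpha * m * m / sum;
        // Small-range correction: fall back to linear counting.
        if (estimate <= 2.5 * m && zeros > 0)
            estimate = m * Math.log((double) m / zeros);
        return Math.round(estimate);
    }

    public static void main(String[] args) {
        SimpleHyperLogLog hll = new SimpleHyperLogLog(6);  // like hyperloglog<text, 6>
        for (String s : new String[] {"item1", "item2", "item3", "item4", "item5"})
            hll.add(s);
        hll.add("item1");  // a duplicate leaves the registers unchanged
        System.out.println(hll.cardinality());
    }
}
```

With p = 6, as in hyperloglog<text, 6>, the sketch uses 64 one-byte registers no matter how many items are added, which is the small memory footprint that makes HLL attractive.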

The CQL3 grammar is defined in src/java/org/apache/cassandra/cql3/Cql.g. At build time this file is processed with ANTLR to generate the Java code that the server later uses to parse incoming CQL3 queries. First, we need to add the hyperloglog keyword. This can easily be done by appending K_HYPERLOGLOG: H Y P E R L O G L O G to the case-insensitive keywords section of Cql.g. Next, we need to extend collection_type, since HLL naturally falls into this category (at least that’s what I thought). The resulting rule looks something like this:

collection_type returns [CQL3Type pt]
    @init {
        Term.Raw precision = null;
    }
    : K_MAP '<' t1=comparatorType ',' t2=comparatorType '>'
        { try {
            // if we can't parse either t1 or t2, antlr will "recover" and we may have t1 or t2 null.
            if (t1 != null && t2 != null)
                $pt = CQL3Type.Collection.map(t1, t2);
          } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
    | K_LIST '<' t=comparatorType '>'
        { try { if (t != null) $pt = CQL3Type.Collection.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
    | K_SET '<' t=comparatorType '>'
        { try { if (t != null) $pt = CQL3Type.Collection.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
    | K_HYPERLOGLOG '<' t1=comparatorType ',' t3=intValue { precision=t3; } '>'
        { try { if (t1 != null && t3 != null) $pt = CQL3Type.Collection.hyperloglog(t1, precision); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
    ;

The additions are the @init block, which declares precision, and the K_HYPERLOGLOG alternative; the map, list and set alternatives were already there. These changes allow the parser to match the hyperloglog keyword followed by two terms inside the angle brackets. The first term indicates the type of data we allow to be put into our HLL, whereas the second one specifies the HLL precision (or it could be something else, depending on the implementation). The data type is instantiated with the factory method CQL3Type.Collection.hyperloglog, which we will look at in the next section.
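
The grammar only captures the precision term; nothing in this code validates it yet. A hypothetical helper could check it and derive the sketch parameters: precision p gives m = 2^p registers and a standard error of roughly 1.04 / sqrt(m). The HllPrecision class, the [4, 16] bounds and the use of IllegalArgumentException are all assumptions (inside Cassandra you would throw InvalidRequestException, as the factory method in the next section does).

```java
/**
 * Hypothetical helper: validates the precision term of hyperloglog<type, p>
 * and derives the sketch parameters. The [4, 16] range is an assumption.
 */
public final class HllPrecision {
    public static final int MIN_PRECISION = 4;
    public static final int MAX_PRECISION = 16;

    public final int precision;
    public final int registerCount;     // m = 2^p
    public final double standardError;  // about 1.04 / sqrt(m)

    private HllPrecision(int p) {
        this.precision = p;
        this.registerCount = 1 << p;
        this.standardError = 1.04 / Math.sqrt(registerCount);
    }

    /** Parses the literal text captured by the grammar, e.g. "6". */
    public static HllPrecision parse(String literal) {
        final int p;
        try {
            p = Integer.parseInt(literal.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("hyperloglog precision must be an integer: " + literal, e);
        }
        if (p < MIN_PRECISION || p > MAX_PRECISION)
            throw new IllegalArgumentException(String.format(
                "hyperloglog precision must be between %d and %d, got %d",
                MIN_PRECISION, MAX_PRECISION, p));
        return new HllPrecision(p);
    }

    public static void main(String[] args) {
        HllPrecision hp = HllPrecision.parse("6");
        // prints "m=64, error=0.130": 64 registers, roughly a 13% standard error
        System.out.printf("m=%d, error=%.3f%n", hp.registerCount, hp.standardError);
    }
}
```

The trade-off is linear in memory and inverse-square-root in accuracy: each extra bit of precision doubles the register count but only shrinks the error by a factor of about 1.41.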

Implementing The New Data Type

We’ve made the CQL3 grammar modifications, but before we can build Cassandra we must implement the HLL data type. CQL3 data type helpers are defined in src/java/org/apache/cassandra/cql3/CQL3Type.java. The only modification needed there is to add the hyperloglog factory method to the Collection class:

public static Collection hyperloglog(CQL3Type t, Term.Raw precision) throws InvalidRequestException
{
    if (t.isCollection())
        throw new InvalidRequestException("hyperloglog type cannot contain another collection");
    if (t.isCounter())
        throw new InvalidRequestException("counters are not allowed inside a collection");

    return new Collection(HyperLogLogType.getInstance(t.getType()));
}

This is the function that gets called from the ANTLR-generated code. The precision argument is currently unused, but it could easily be passed on to the HyperLogLogType class. Next, let’s create src/java/org/apache/cassandra/db/marshal/HyperLogLogType.java. The HyperLogLogType class is based on SetType and is imported in CQL3Type.java.

public class HyperLogLogType<T> extends CollectionType<HyperLogLog<T>>
{
    // interning instances
    private static final Map<AbstractType<?>, HyperLogLogType> instances = new HashMap<AbstractType<?>, HyperLogLogType>();

    public final AbstractType<T> elements;
    private final HyperLogLogSerializer<T> serializer;

    public static HyperLogLogType<?> getInstance(TypeParser parser) throws ConfigurationException, SyntaxException
    {
        List<AbstractType<?>> l = parser.getTypeParameters();
        if (l.size() != 1)
            throw new ConfigurationException("HyperLogLogType takes exactly 1 type parameter");

        return getInstance(l.get(0));
    }

    public static synchronized <T> HyperLogLogType<T> getInstance(AbstractType<T> elements)
    {
        HyperLogLogType<T> t = instances.get(elements);
        if (t == null)
        {
            t = new HyperLogLogType<T>(elements);
            instances.put(elements, t);
        }
        return t;
    }

    // some placeholder methods were omitted, but you can look at other
    // types for sample code.

    public HyperLogLogType(AbstractType<T> elements)
    {
        super(Kind.HYPERLOGLOG);
        this.elements = elements;
        this.serializer = HyperLogLogSerializer.getInstance(elements.getSerializer());
    }

    public ByteBuffer serialize(List<Pair<ByteBuffer, Column>> columns)
    {
        // serialize for CQL/thrift transport.
        ...
    }
}

The HyperLogLogType class works like an interning cache, almost a singleton per element type: it keeps track of every HLL type instance, e.g. hyperloglog<text> or hyperloglog<int>, and knows which serializer to use. The serialize method seems to be where data is prepared for transport, so on the read side we would probably want to return the cardinality instead of the raw HLL data (see the operations below).
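
The body of serialize is not shown here. One possible layout, purely an assumption: store a one-byte precision header followed by the 2^p registers, and on the read side decode the registers and ship only the estimate, encoded as an 8-byte big-endian long (the wire encoding of a CQL bigint). HllCodec and its methods are hypothetical names, and the estimate uses the standard HyperLogLog formula with a linear-counting correction.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical on-disk layout for an HLL cell: [precision: 1 byte][registers: 2^p bytes].
 * Not Cassandra's format; a sketch of the "bitmap plus metadata" idea.
 */
public final class HllCodec {
    public static ByteBuffer encode(int precision, byte[] registers) {
        if (registers.length != (1 << precision))
            throw new IllegalArgumentException("expected 2^p registers");
        ByteBuffer buf = ByteBuffer.allocate(1 + registers.length);
        buf.put((byte) precision).put(registers);
        buf.flip();  // ready for reading
        return buf;
    }

    /** Returns a copy of the registers; the header tells us how many to read. */
    public static byte[] decodeRegisters(ByteBuffer cell) {
        ByteBuffer in = cell.duplicate();  // leave the caller's position untouched
        int precision = in.get();
        byte[] registers = new byte[1 << precision];
        in.get(registers);
        return registers;
    }

    /** Read side: return the estimate as an 8-byte big-endian long instead of raw registers. */
    public static ByteBuffer cardinalityForTransport(ByteBuffer cell) {
        byte[] r = decodeRegisters(cell);
        int m = r.length;
        double sum = 0;
        int zeros = 0;
        for (byte reg : r) {
            sum += Math.pow(2, -reg);
            if (reg == 0) zeros++;
        }
        // Bias constant: the large-m formula, only approximate for m <= 64.
        double alpha = 0.7213 / (1 + 1.079 / m);
        double estimate = alpha * m * m / sum;
        if (estimate <= 2.5 * m && zeros > 0)
            estimate = m * Math.log((double) m / zeros);  // linear counting for small sets
        ByteBuffer out = ByteBuffer.allocate(8);  // ByteBuffer defaults to big-endian
        out.putLong(0, Math.round(estimate));
        return out;
    }
}
```

Returning a bigint keeps clients unaware of the register layout, although, as the last section explains, a brand-new column type still needs driver support.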

When a CQL query arrives, it is processed by the QueryProcessor class, which builds a statement instance and executes it. Each data type needs to implement the operations it supports. For example, when we execute an UPDATE statement, the columns being modified must implement a set or add operation. The operations are defined in cql3/Operation.java. For our HLL type we need to add something similar to this:

// For SetValue operation
case HYPERLOGLOG:
    return new HyperLogLogs.Setter(receiver.name, v);
...

// For Addition operation
case HYPERLOGLOG:
    return new HyperLogLogs.Adder(receiver.name, v);
...

// Subtraction
case HYPERLOGLOG:
    throw new InvalidRequestException(String.format("Invalid operation (%s) for hyperloglog column %s", toString(receiver), receiver));
...

// ElementDeletion
case HYPERLOGLOG:
    throw new InvalidRequestException(String.format("Invalid deletion operation for hyperloglog collection column %s", receiver));
...

The SetValue operation gets called when we execute an INSERT statement or something like UPDATE t SET a = new_value. Addition, as you may have guessed, is performed when running something like UPDATE t SET a = a + new_value. In the example above, HyperLogLogs is a class in the cql3 directory that contains HLL helper methods and operations. Most of its code is based on Sets, Lists and Maps.

The way I imagined HLL being implemented in Cassandra was to store the HLL bitmap, along with additional metadata such as precision and error tolerance, in Cassandra. So, for the various HLL operations we want to be able to pull the existing data off disk, instantiate our HLL implementation with it, add the new items and write the new bitmap back to disk. This does not seem very difficult once you understand what type of data your Adder and Setter receive. For example, when executing the UPDATE statement from the grammar section, your add operation will receive an instance of the Literal class defined in Lists.java, and so on.

One tricky part that took me a while to figure out (mostly because I was not paying enough attention while reading some code...) was implementing the Adder. When adding new items to your HLL collection, you need to read in the existing data. This happens magically behind the scenes once your operation class overrides the public boolean requiresRead() method to return true; the existing data is then passed down to the execute method.
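
To make the requiresRead() contract concrete, here is a toy model, not Cassandra’s API: a stand-in table only fetches the stored registers when the operation asks for them. The Setter (SetValue) overwrites the cell and needs no read, while the Adder merges new items into whatever is stored, which is why it must request a read first. The HllOperation interface, ToyTable, the register update and the hash function are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadBeforeWriteDemo {
    static final int P = 6;  // precision: 2^6 = 64 registers

    /** Hypothetical operation contract, loosely modeled on requiresRead(). */
    interface HllOperation {
        boolean requiresRead();
        byte[] execute(byte[] existing, List<String> items);  // existing may be null
    }

    // Assumed hash: 64-bit FNV-1a plus the MurmurHash3 fmix64 finalizer.
    static long hash64(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) { h ^= (b & 0xff); h *= 0x100000001b3L; }
        h ^= h >>> 33; h *= 0xff51afd7ed558ccdL; h ^= h >>> 33; h *= 0xc4ceb9fe1a85ec53L; h ^= h >>> 33;
        return h;
    }

    /** Folds items into a copy of the registers using the usual HLL max-rank update. */
    static byte[] addAll(byte[] registers, List<String> items) {
        byte[] out = registers.clone();
        for (String item : items) {
            long h = hash64(item);
            int idx = (int) (h >>> (64 - P));
            int rank = Math.min(Long.numberOfLeadingZeros(h << P), 64 - P) + 1;
            if (rank > out[idx]) out[idx] = (byte) rank;
        }
        return out;
    }

    /** SetValue: replaces whatever is stored, so no read is needed. */
    static final HllOperation SETTER = new HllOperation() {
        public boolean requiresRead() { return false; }
        public byte[] execute(byte[] existing, List<String> items) { return addAll(new byte[1 << P], items); }
    };

    /** Addition: merges new items into the stored registers, so it asks for a read. */
    static final HllOperation ADDER = new HllOperation() {
        public boolean requiresRead() { return true; }
        public byte[] execute(byte[] existing, List<String> items) {
            return addAll(existing != null ? existing : new byte[1 << P], items);
        }
    };

    /** Stand-in for a table: row key -> HLL registers. */
    static class ToyTable {
        final Map<Integer, byte[]> rows = new HashMap<>();
        int reads = 0;

        void apply(int key, HllOperation op, List<String> items) {
            byte[] existing = null;
            if (op.requiresRead()) {  // read-before-write only when asked
                existing = rows.get(key);
                reads++;
            }
            rows.put(key, op.execute(existing, items));
        }
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable();
        table.apply(1, SETTER, Arrays.asList("item1", "item2", "item3"));  // INSERT ... VALUES(1, [...])
        table.apply(1, ADDER, Arrays.asList("item4", "item5"));            // UPDATE ... SET hll = hll + [...]
        System.out.println("reads performed: " + table.reads);             // prints "reads performed: 1"
    }
}
```

Because each register only ever keeps a maximum, the Adder’s read-modify-write produces the same registers as inserting all five items at once; the cost is an extra read on every UPDATE ... SET hll = hll + [...].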

With all these grammar changes and your custom collection type implementation you should be able to create a new table with a column of the new type and perform reads and writes.

Notes On Debugging

You need JDK 7 to build Cassandra 2.x. I’m not a Java expert, so it was a bit tricky for me to figure out the best way to debug Cassandra. Once Cassandra is built, you can launch it like this:

$ JVM_OPTS="-Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n" ./cassandra

This makes the JVM listen for debugger connections on port 8000, so you can attach to it and explore what the dang is going on inside the Cassandra server:

$ jdb -connect com.sun.jdi.SocketAttach:hostname=localhost,port=8000

I think debugging is the only sane way to understand how different pieces fit together in Cassandra.

And The Winner Is...

There is no winner this time. You can find all my unfinished “work in progress” code on GitHub. One of the reasons I didn’t finish implementing the HLL data type was that every new type requires an identifier in the transport/data layer, so adding a new data type would mean modifying every existing driver to support it. There may be better approaches; adding a new type seemed like the right choice when I started, but it probably wasn’t. Searching Cassandra’s issue tracker for HLL turns up a few results, so I’m hoping that someone will make HLL part of Cassandra in an upcoming version.