Saumitra's blog

Search / Analytics / Distributed Systems / Machine Learning / DSLs

How Cassandra Stores Data on Filesystem

In order to get optimal performance from Cassandra, it's important to understand how it stores data on disk. A common problem among new users coming from an RDBMS background is designing their column families (a.k.a. tables) without considering the queries they will serve. Cassandra's CQL interface returns data in tabular format, which can give the illusion that you can query it just like any RDBMS, but that's not the case.

All Cassandra data is persisted in SSTables (Sorted String Tables) inside the data directory. The default location of the data directory is $CASSANDRA_HOME/data/data. You can change it with the data_file_directories setting in conf/cassandra.yaml. On a fresh setup, here's what the data directory looks like:

data/
├── system
├── system_auth
├── system_distributed
├── system_schema
└── system_traces

Each directory in data represents a keyspace. These are internal keyspaces used by Cassandra.

Let's create a new keyspace:

CREATE KEYSPACE ks1 WITH replication={'class':'SimpleStrategy','replication_factor':1};

Since we have not yet created any table, you will not see a directory named ks1 in the data dir yet, but you can check Cassandra's system_schema.keyspaces table:

cqlsh> select * from system_schema.keyspaces where keyspace_name = 'ks1';

 keyspace_name | durable_writes | replication
---------------+----------------+-------------------------------------------------------------------------------------
           ks1 |           True | {'class': 'org.apache.cassandra.locator.SimpleStrategy', 'replication_factor': '1'}

Let's create a table now:

CREATE TABLE user_tracking (
  user_id text,
  action_category text,
  action_id text,
  action_detail text,

  PRIMARY KEY(user_id, action_category, action_id)
);

As soon as you create the table, here's how the data directory will look:

ks1/
└── tb1-ed4784f0b64711e7b18a2f179b6f38f9
    └── backups

It created a directory named <table_name>-<table_id>. Cassandra creates a subdirectory for each table, and all the data for that table is contained in this directory. You can move this directory to a different location in the future by creating a symlink.

You can check for this table in the system_schema.tables table:

cqlsh:ks1> select * from system_schema.tables where keyspace_name = 'ks1';

 keyspace_name | table_name | bloom_filter_fp_chance | caching                                       | comment | compaction                                                                                                                | compression                                                                             | crc_check_chance | dclocal_read_repair_chance | default_time_to_live | extensions | flags        | gc_grace_seconds | id                                   | max_index_interval | memtable_flush_period_in_ms | min_index_interval | read_repair_chance | speculative_retry
---------------+------------+------------------------+-----------------------------------------------+---------+---------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------+----------------------------+----------------------+------------+--------------+------------------+--------------------------------------+--------------------+-----------------------------+--------------------+--------------------+-------------------
           ks1 |        tb1 |                   0.01 | {'keys': 'ALL', 'rows_per_partition': 'NONE'} |         | {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} | {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} |                1 |                        0.1 |                    0 |           {} | {'compound'} |           864000 | ed4784f0-b647-11e7-b18a-2f179b6f38f9 |               2048 |                           0 |                128 |                  0 |      99PERCENTILE

(1 rows)

You will notice the table defaults, like gc_grace_seconds and memtable_flush_period_in_ms, which got applied.

Let's insert some data into this table:

insert into ks1.user_tracking(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a1',  'Logged in from home page');

Now let's check the table's data directory. You will notice that no new file has been created. That's because Cassandra first writes data in memory (to a memtable) and flushes it to disk only after a certain threshold is reached. For durability, it also writes every mutation to an append-only commit log. The commit log is shared across keyspaces. By default its location is $CASSANDRA_HOME/data/commitlog, and it can be changed in conf/cassandra.yaml.
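The write path described above (memtable plus commit log, with a threshold-triggered flush to a sorted, immutable file) can be sketched in a few lines of Python. This is an illustrative toy, not Cassandra's actual implementation; the class name and flush threshold are made up for the example.

```python
import json

class TinyWritePath:
    def __init__(self, flush_threshold=3):
        self.memtable = {}          # in-memory buffer, sorted only on flush
        self.commit_log = []        # append-only log, for durability/replay
        self.sstables = []          # immutable sorted runs on "disk"
        self.flush_threshold = flush_threshold

    def write(self, key, value):
        # Durability first: append to the commit log before touching memory.
        self.commit_log.append(json.dumps({"k": key, "v": value}))
        self.memtable[key] = value
        if len(self.memtable) >= self.flush_threshold:
            self.flush()

    def flush(self):
        # An SSTable is a sorted, immutable snapshot of the memtable.
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable.clear()

wp = TinyWritePath(flush_threshold=2)
wp.write("user1", "Logged in")
wp.write("user2", "Opened dashboard")   # second write triggers a flush
print(wp.sstables)   # [[('user1', 'Logged in'), ('user2', 'Opened dashboard')]]
print(wp.memtable)   # {}
```

Note that the commit log still holds both writes even after the flush; Cassandra similarly keeps commit log segments around until the corresponding memtables are safely on disk.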

To force flushing memtable data to disk as SSTables, use one of the following commands:

bin/nodetool flush                            //flush all tables of all keyspaces
bin/nodetool flush <keyspace>                 //flush all tables of a single keyspace
bin/nodetool flush <keyspace> <table_name>    //flush a single table from a keyspace

After flushing, here's what the contents of the table directory look like. Each flush creates a new SSTable on disk, and for each SSTable the following set of files is created:

user_tracking-49eb78d0b65a11e7b18a2f179b6f38f9/
├── backups
├── mc-5-big-CompressionInfo.db
├── mc-5-big-Data.db
├── mc-5-big-Digest.crc32
├── mc-5-big-Filter.db
├── mc-5-big-Index.db
├── mc-5-big-Statistics.db
├── mc-5-big-Summary.db
└── mc-5-big-TOC.txt

1 directory, 8 files

All files share a common naming convention: <version>-<generation>-big-<component>, with an extension that depends on the component (.db for most).

  • version — an alphabetic string representing the SSTable storage format version, like mc in the example above
  • generation — an index number incremented every time a new SSTable is created for the table, like 5 in the example above
  • component — the type of information stored in the file, like CompressionInfo, Data, Digest, Index, etc.
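As a quick illustration, here is a small Python helper (my own, not part of Cassandra) that splits a component file name into these parts, treating the big segment as the format marker:

```python
import re

def parse_sstable_name(filename):
    # <version>-<generation>-<format>-<component>.<ext>, e.g. mc-5-big-Data.db
    m = re.match(r"^([a-z]+)-(\d+)-(\w+)-(\w+)\.(db|txt|crc32)$", filename)
    if not m:
        raise ValueError(f"not an SSTable component file: {filename}")
    version, generation, fmt, component, _ext = m.groups()
    return {"version": version, "generation": int(generation),
            "format": fmt, "component": component}

print(parse_sstable_name("mc-5-big-Data.db"))
# {'version': 'mc', 'generation': 5, 'format': 'big', 'component': 'Data'}
```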

The table below provides a brief description for each component.

File                         Description
mc-1-big-TOC.txt             A plain-text file listing the components of the SSTable.
mc-1-big-Digest.crc32        A checksum of the data file.
mc-1-big-CompressionInfo.db  Metadata for the compression algorithm, if compression is enabled.
mc-1-big-Statistics.db       Statistical metadata about the SSTable.
mc-1-big-Index.db            The primary index data.
mc-1-big-Summary.db          A sample of the primary index (e.g. index boundaries), intended to be held in memory.
mc-1-big-Filter.db           A Bloom filter used to check whether data possibly exists in this SSTable, minimizing disk access for misses.
mc-1-big-Data.db             The base data itself. All other component files can be regenerated from this file.
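Filter.db deserves a moment: a Bloom filter is a probabilistic structure that can say "definitely not here" without touching the data file. Here is a minimal Python sketch of the idea (the bit-array size and hash choice are arbitrary, and this is not Cassandra's on-disk format):

```python
import hashlib

class TinyBloom:
    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.k = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, key):
        # Derive k bit positions from salted hashes of the key.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.size

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # If any bit is unset, the key was definitely never added.
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

bf = TinyBloom()
empty_says_no = bf.might_contain("user1")  # empty filter: definite miss
bf.add("user1")
print(empty_says_no, bf.might_contain("user1"))  # False True
```

False positives are possible (a tiny fraction of lookups will hit disk unnecessarily, tuned via bloom_filter_fp_chance), but false negatives are not, which is why it is safe to skip the Data file on a miss.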

Let's insert a few more entries into the table and see what the data directory looks like:

insert into ks1.user_tracking(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a1',  'Logged in from home page');

insert into ks1.user_tracking(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a2', 'Logged in from email link');
insert into ks1.user_tracking(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'dashboard', 'a3', 'Opened dashboard link');
insert into ks1.user_tracking(user_id, action_category, action_id,  action_detail) VALUES ('user2', 'auth', 'a4', 'Logged in');

Again run bin/nodetool flush to flush data to SSTables, and check the filesystem:

sam@sam-ub:ks1$ tree user_tracking-49eb78d0b65a11e7b18a2f179b6f38f9/
user_tracking-49eb78d0b65a11e7b18a2f179b6f38f9/
├── backups
├── mc-7-big-CompressionInfo.db
├── mc-7-big-Data.db
├── mc-7-big-Digest.crc32
├── mc-7-big-Filter.db
├── mc-7-big-Index.db
├── mc-7-big-Statistics.db
├── mc-7-big-Summary.db
├── mc-7-big-TOC.txt
├── mc-8-big-CompressionInfo.db
├── mc-8-big-Data.db
├── mc-8-big-Digest.crc32
├── mc-8-big-Filter.db
├── mc-8-big-Index.db
├── mc-8-big-Statistics.db
├── mc-8-big-Summary.db
└── mc-8-big-TOC.txt

1 directory, 16 files

You will notice that a new SSTable with the next generation number has been created. Cassandra periodically merges these SSTables through a process called compaction. You can force compaction by running one of the following commands:

bin/nodetool compact                            //compact SSTables of all keyspaces
bin/nodetool compact <keyspace>                 //compact SSTables of a single keyspace
bin/nodetool compact <keyspace> <table_name>    //compact SSTables of a single table

You will notice that after compaction completes, there is just a single SSTable with a new generation number. Compaction causes a temporary spike in disk space usage and disk I/O while old and new SSTables coexist; as it completes, it frees up the disk space occupied by the old SSTables.
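Conceptually, compaction merges several sorted runs into one, keeping the newest write for each key. A simplified Python sketch (real compaction also handles tombstones, TTLs, and strategy-specific selection of which SSTables to merge):

```python
from heapq import merge

def compact(sstables):
    # Each sstable is a sorted list of (key, write_timestamp, value).
    latest = {}
    for key, ts, value in merge(*sstables):  # k-way merge of sorted runs
        # Keep only the most recent write for each key.
        if key not in latest or ts > latest[key][0]:
            latest[key] = (ts, value)
    # The result is itself a single sorted run: the new SSTable.
    return sorted((k, ts, v) for k, (ts, v) in latest.items())

old = [("user1", 1, "Logged in"), ("user2", 1, "Logged in")]
new = [("user1", 2, "Logged in from email link")]
print(compact([old, new]))
# [('user1', 2, 'Logged in from email link'), ('user2', 1, 'Logged in')]
```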

Now let's see how data is actually stored in an SSTable.

Cassandra comes bundled with a utility called sstabledump, which can be used to see the content of an SSTable in JSON or row format. Here's what you will see:

[
  {
    "partition" : {
      "key" : [ "user2" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 45,
        "clustering" : [ "auth", "a4" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:33:32.772370Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "user1" ],
      "position" : 46
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 104,
        "clustering" : [ "auth", "a1" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:33:32.074848Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in from home page" }
        ]
      },
      {
        "type" : "row",
        "position" : 104,
        "clustering" : [ "auth", "a2" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:33:32.085959Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in from email link" }
        ]
      },
      {
        "type" : "row",
        "position" : 145,
        "clustering" : [ "dashboard", "a3" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:33:32.099739Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Opened dashboard link" }
        ]
      }
    ]
  }
]

Here you will notice two "rows" at the top level. Don't confuse these with RDBMS rows: each of these top-level entries is actually a partition. The number of partitions in an SSTable is determined by your primary key. Here's the key we used:

PRIMARY KEY(user_id, action_category, action_id)

A primary key consists of two parts: the partition key and the clustering columns.

The first part of our key, user_id, is used for partitioning, and the remaining parts, action_category and action_id, are used for clustering.
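The split can be sketched as follows. The rows mirror the inserts above; the grouping code is purely illustrative, not Cassandra's storage engine:

```python
from collections import defaultdict

rows = [
    ("user1", "auth", "a1", "Logged in from home page"),
    ("user1", "auth", "a2", "Logged in from email link"),
    ("user1", "dashboard", "a3", "Opened dashboard link"),
    ("user2", "auth", "a4", "Logged in"),
]

def partitions(rows, partition_cols=1):
    # Group rows by the leading partition-key columns; within each
    # partition, rows are kept sorted by the clustering columns.
    parts = defaultdict(list)
    for row in rows:
        parts[row[:partition_cols]].append(row[partition_cols:])
    return {k: sorted(v) for k, v in parts.items()}

# PRIMARY KEY(user_id, action_category, action_id): partition on user_id only.
print(partitions(rows, partition_cols=1))
# -> 2 partitions: ('user1',) holding 3 rows, ('user2',) holding 1 row

# PRIMARY KEY((user_id, action_category), action_id): composite partition key.
print(partitions(rows, partition_cols=2))
# -> 3 partitions: ('user1','auth'), ('user1','dashboard'), ('user2','auth')
```

The second call previews exactly what the next table definition below does: moving action_category into the partition key splits each user's data across more, smaller partitions.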

To understand this better, let's create a new table with the exact same definition but a changed partition key:

CREATE TABLE user_tracking_new (
  user_id text,
  action_category text,
  action_id text,
  action_detail text,

  PRIMARY KEY((user_id, action_category), action_id)   
);

In the definition above, the partition key is now user_id + action_category, and the clustering key is just action_id. Let's insert the exact same four rows and see how they get stored in the SSTable:

insert into ks1.user_tracking_new(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a1', 'Logged in from home page');
insert into ks1.user_tracking_new(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a2', 'Logged in from email link');
insert into ks1.user_tracking_new(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'dashboard', 'a3', 'Opened dashboard link');
insert into ks1.user_tracking_new(user_id, action_category, action_id,  action_detail) VALUES ('user2', 'auth', 'a4', 'Logged in');
[
  {
    "partition" : {
      "key" : [ "user1", "dashboard" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 67,
        "clustering" : [ "a3" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:32:45.633901Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Opened dashboard link" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "user2", "auth" ],
      "position" : 68
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 118,
        "clustering" : [ "a4" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:32:45.648367Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "user1", "auth" ],
      "position" : 119
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 182,
        "clustering" : [ "a1" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:32:45.614746Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in from home page" }
        ]
      },
      {
        "type" : "row",
        "position" : 182,
        "clustering" : [ "a2" ],
        "liveness_info" : { "tstamp" : "2017-10-24T20:32:45.624710Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in from email link" }
        ]
      }
    ]
  }
]

You will notice that in this example, for each user, each category of data went into a different partition.

The first rule of data modelling in Cassandra is to choose your partition key such that a single query reads only one partition.

Let's run some queries and see the impact of partitioning:

cqlsh:ks1> select * from user_tracking where user_id = 'user1';

 user_id | action_category | action_id | action_detail
---------+-----------------+-----------+---------------------------
   user1 |            auth |        a1 |  Logged in from home page
   user1 |            auth |        a2 | Logged in from email link
   user1 |       dashboard |        a3 |     Opened dashboard link

(3 rows)

Let's run the same kind of query on user_tracking_new:

cqlsh:ks1> select * from user_tracking_new where user_id = 'user2';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Partition key parts: action_category must be restricted as other parts are"

The reason you got the error is that, at a minimum, you need to specify all columns that are part of the partition key. Cassandra needs a way to locate a partition before it can query data inside it. In the first case it worked because user_id alone was the partition key.
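One way to see why: Cassandra locates a partition by hashing the complete partition key to a token (using Murmur3 by default). The sketch below substitutes MD5 for Murmur3, but the point carries over: a partial key hashes to an unrelated token, so it cannot be used to locate anything.

```python
import hashlib

def token(*partition_key_parts):
    # Hash the full partition key to a 64-bit token (MD5 stands in for
    # Cassandra's Murmur3 here; the principle is the same).
    joined = ":".join(partition_key_parts).encode()
    return int.from_bytes(hashlib.md5(joined).digest()[:8], "big")

# The full key ('user1', 'auth') always maps to the same token,
# so the owning node and partition can be found directly.
print(token("user1", "auth") == token("user1", "auth"))  # True

# 'user1' alone hashes to a completely different token than
# ('user1', 'auth'), so it cannot find partitions keyed by the pair.
print(token("user1") == token("user1", "auth"))  # False
```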

That means user_tracking_new can only be used to query a user's data when the category is also known. You might wonder why you would ever prefer it over the first table. The reason is that in the first table, for a user with a huge volume of activity, the partition can grow very large and cause performance issues. Our goal is to keep partitions to a reasonable size so they don't hurt query performance.
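A rough back-of-the-envelope calculation (all numbers here are made-up assumptions for illustration, not Cassandra limits) shows how quickly a single-user partition can grow:

```python
# Hypothetical workload: a busy user generating tracked actions all year.
actions_per_day = 500   # assumed event rate per user
row_bytes = 200         # assumed average size of one stored row
days = 365

single_partition = actions_per_day * row_bytes * days
print(f"{single_partition / 1e6:.1f} MB per user per year")  # 36.5 MB

# Splitting the partition key into (user_id, action_category) spreads
# the same data across one partition per category.
categories = 10  # hypothetical number of distinct action categories
print(f"{single_partition / categories / 1e6:.2f} MB per (user, category)")
```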

Let's try another query:

cqlsh:ks1> select * from user_tracking_new where user_id = 'user1' and action_category = 'auth' and action_detail = 'Logged in from home page';

InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

You will see that it failed to fetch the result. The reason is that action_detail is not part of the primary key, so Cassandra would have to read the partition and filter rows one by one, which can have unpredictable performance, and it warns against that. If we want to force it, we can add ALLOW FILTERING at the end of the query:

cqlsh:ks1> select * from user_tracking_new where user_id = 'user1' and action_category = 'auth' and action_detail = 'Logged in from home page' ALLOW FILTERING;

 user_id | action_category | action_id | action_detail
---------+-----------------+-----------+--------------------------
   user1 |            auth |        a1 | Logged in from home page

(1 rows)

Clustering order

This is another extremely powerful feature of Cassandra: it lets you store records naturally sorted by the value of a particular column. Every time you write to the table, Cassandra figures out where that record belongs within its physical partition and stores it in the order you declared. Storing data in sorted order gives drastic query performance improvements for range queries, which is very significant for time-series data.

CREATE TABLE user_tracking_ordered (
  user_id text,
  action_category text,
  action_id text,
  action_detail text,

  PRIMARY KEY((user_id, action_category), action_id)
) WITH CLUSTERING ORDER BY (action_id ASC);

Now insert the same kind of data into this table:

insert into ks1.user_tracking_ordered(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a1', 'Logged in');
insert into ks1.user_tracking_ordered(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a2', 'Logged in');
insert into ks1.user_tracking_ordered(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a3', 'Logged in');
insert into ks1.user_tracking_ordered(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a4', 'Logged in');
insert into ks1.user_tracking_ordered(user_id, action_category, action_id,  action_detail) VALUES ('user1', 'auth', 'a5', 'Logged in');

If you fetch one row, you will get it in sorted (ascending) order:

cqlsh:ks1> select * from user_tracking_ordered limit 1;

 user_id | action_category | action_id | action_detail
---------+-----------------+-----------+---------------
   user1 |            auth |        a1 |     Logged in

(1 rows)

If you change the clustering order to descending, i.e. WITH CLUSTERING ORDER BY (action_id DESC), and do the same thing, you can see it now returns rows sorted in descending order:

cqlsh:ks1> select * from user_tracking_ordered limit 1;

 user_id | action_category | action_id | action_detail
---------+-----------------+-----------+---------------
   user1 |            auth |        a5 |     Logged in

(1 rows)

You can see the same ordering reflected in the SSTable too:

$ sstabledump mc-*-big-Data.db

[
  {
    "partition" : {
      "key" : [ "user1", "auth" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 50,
        "clustering" : [ "a5" ],
        "liveness_info" : { "tstamp" : "2017-10-25T08:21:43.628921Z" },
        "cells" : [
          { "name" : "action_detail", "value" : "Logged in" }
        ]
      },
      {
        ...
        "clustering" : [ "a4" ],
        ...
      },
      {
        ...
        "clustering" : [ "a3" ],
        ...
      },
      {
        ...
        "clustering" : [ "a2" ],
        ...
      },
      {
          ...
        "clustering" : [ "a1" ],
        ...
      }
    ]
  }
]
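The behaviour above can be sketched as a sort within the partition. This toy layout is illustrative only, not the real on-disk encoding:

```python
# Rows of one partition: (clustering_key, value), inserted in arbitrary order.
rows = [("a1", "Logged in"), ("a3", "Logged in"), ("a5", "Logged in"),
        ("a2", "Logged in"), ("a4", "Logged in")]

def partition_layout(rows, descending=False):
    # CLUSTERING ORDER BY decides the physical sort direction within the
    # partition, so LIMIT 1 and range scans read straight off the front.
    return sorted(rows, key=lambda r: r[0], reverse=descending)

print(partition_layout(rows)[0])                   # ('a1', 'Logged in')  ASC
print(partition_layout(rows, descending=True)[0])  # ('a5', 'Logged in')  DESC
```

Because the order is baked in at write time, reading the newest entry of a DESC-ordered time series costs no sort at query time.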

You will notice that for each column it stores, Cassandra has to maintain additional metadata. You need to understand these overheads too for better capacity planning; I will cover them in a future post.

I hope you now have some idea of why CQL looks like SQL but doesn't behave like it. Play around with different combinations of primary keys and see what the data looks like in the SSTable to better understand the storage layout; it will help you model your data around your queries.