Bulk Loading Options for Cassandra

Cassandra's bulk loading interfaces are most useful in two use cases: initial migration or restore from another datastore/cluster and regular ETL. Bulk loading assumes that it's important to you to avoid loading via the thrift interface, for example because you haven't integrated a client library yet or because throughput is critical. 

There are two alternative techniques used for bulk loading into Cassandra: "copy-the-sstables" and sstableloader. Copying the sstables is a filesystem level operation, while sstableloader utilizes Cassandra's internal streaming system. Neither is without disadvantages; the best choice depends on your specific use case. If you are using Counter columnfamilies, neither method has been extensively tested and you are safer writing via thrift.

The key to understanding bulk-loading throughput is that potential throughput depends significantly on the nature of the operation as well as the configuration of source and target clusters and things like number of sstables, sstable size and tolerance to potentially duplicate data. Notably but not significantly, sstableloader in 1.1 is slightly improved over the (freshly re-written) version in 1.0. [1]

Below are good cases for and notable aspects of each strategy.

Copy-the-sstables/"nodetool refresh" can be useful if:

  1. Your target cluster is not running, or if it is running, is not sensitive to latency from bulk loading at "top speed" and associated operations.
  2. You are willing to manually, or have a tool to, de-duplicate sstable names and are willing to figure out where to copy them to in any non copy-all-to-all case. You are willing to run cleanup and/or major compaction understand that some disk space is wasted until you do. [2]
  3. You don't want to deal with the potential failure modes of streaming, which are especially bad in non-LAN deploys including EC2.
  4. You are restoring in a case where RF=N, because you can just copy one node's data to all nodes in the new RF=N cluster and start the cluster without bootstrapping (auto_bootstrap: false in  cassandra.yaml).
  5. The sstables you want to import are a different version than the target cluster currently creates. Example : trying to sstableload -hc- (1.0) sstables into a -hd- (1.1) cluster is reported to not work. [3]
  6. You have your source sstables in something like s3 which can easily parallelize copies to all target nodes. s3<>ec2 is fast and free, close to best case for the inefficiency during copy stage.
  7. You want to increase RF on a running cluster, and are ok with running cleanup and/or major compaction after you do.
  8. You want to restore from a cluster with RF=[x] to a cluster whose RF is the same or smaller and whose size is a multiple of [x]. Example: restoring a 9 node RF=3 cluster to a 3 node RF=3 cluster, you copy 3 source nodes worth of sstables to each target node.

sstableloader/JMX "bulkload" can be useful if:

  1. You have a running target cluster, and want the bulk loading to respect for example streaming throttle limits.
  2. You don't have access to the data directory on your target cluster, and/or JMX to call "refresh" on it.
  3. Your replica placement strategy on the target cluster is so different from the source that the overhead of understanding where to copy sstables to is unacceptable, and/or you don't want to call cleanup on a superset of sstables.
  4. You have limited network bandwidth between the source of sstables and the target(s). In this case, copying a superset of sstables around is especially ineffecient.
  5. Your infrastructure makes it easy to temporarily copy sstables to a set of sstableloader nodes or nodes on which you call "bulkLoad" via JMX. These nodes are either non-cluster-member hosts which are otherwise able to participate in the cluster as a pseudo-member from an access perspective or cluster members with sufficient headroom to bulkload. 
  6. You can tolerate the potential data duplication and/or operational complexity which results from the fragility of streaming. LAN is best case here. A notable difference between "bulkLoad" and sstableloader is that "bulkLoad" does not have sstableloader's "--ignores" option, which means you can't tell it to ignore replica targets on failure. [4]
  7. You understand that, because it uses streaming, streams on a per-sstable basis, and streaming respects a throughput cap, your performance is bounded in terms of ability to parallelize or burst, despite "bulk" loading.

Couchbase rebalance freeze issue

We came across a Couchbase bug during a rebalance while upgrading online to 1.8.1 from 1.8.0.  

Via the UI, we upgraded our first node, re-added it to the cluster, and then set the rebalance off.  It was progressing fine, then stopped around 48% for all nodes.  The tap and disk queues were quiet and there were no servers in pending rebalance.  The upgraded node was able to service requests, but with only a small percentage of the items relative to the other nodes.  The cluster as a whole did not suffer in performance during this issue though there are some spikes in cpu during any rebalance.  

We decided to stop the rebalance, wait a few minutes, then rebalance and we see it is moving again, progressing beyond what it was.  It stopped again, now at 75%. Let sit for 7 mins, then hit Stop Rebalance and Rebalance. Not progressing at all now.

Couchbase support pointed to a bug where if there are empty vbuckets, rebalancing can hang.  This is fixed in 2.0.  The work around solution is to populate buckets with a minimum of 2048 short time to live (TTL >= (10 minutes per upgrade + (2 x rebalance_time)) x num_nodes) items so all vbuckets have something in them.  We then populated all buckets successfully and were able to restart the rebalance process which completed fine.


Benchmarking NDB Against InnoDB on a Write-Intensive Workload

Last month, we evaluated Amazon's new SSD offerings with an extensive series of performance benchmarks.

As a followup, we've prepared a second series of benchmarks that specifically explore performance issues with a write-intensive workload on both NDB and InnoDB storage engines.

Download a free PDF of our findings, and as always, we welcome your feedback, comments and questions below.

Cascade Replication and Delayed servers on PostgreSQL 9.2

Postgresql 9.2 will be released with a new replication feature: Cascade Replication. It will improve streaming between wide networks, shortening distances between the servers. For example: a master server in San Francisco, a slave in Chicago and a cascade server in Barcelona.
But it has other benefit, e.g. when you need to add nodes to the cluster, you can to take backups from your slave without directly affecting the master in terms of I/O of traffic while also minimizing the connections to it.
The setup is easy and intuitive. Like from a slave from a master, but with the difference on the primary_conninfo, which will point to the slave server.
Things to consider when cascading replication:
  • One of the 9.0 limitations in replication -cleaned on 9.1- is the problem that when a master is vacuuming a big table that affects a query that is being executed on the slave. That feature can be enabled with the hot_standby_feedback on the slave server and will add more communication between the servers (if you have very slow connection, maybe you want to disable it)
  • If you have synchronous replication it will only affect 1 slave server against the master, the cascade slave won’t be affected by this setup (cascade server cannot be synchronous).
  • The control function pg_xlog_replay_pause (which stops applying the new incoming wal records) affects only the server in which you are executing it. If the server in which you are executing the control function has cascade servers, those servers will continue streaming and applying the records (occasionally you can have a=c but b!=a, besides replication has a->b->c chain), cause that function avoids the replay, not the streaming shipping. If you want to stop the cascading to all the hosts (not only replication, although the streaming), just stop the slave or change the primary_conninfo in the cascade servers. That is useful if you run BI queries on the cascade servers and don’t want to be affected by writes, so you can delay completely the replication until finish your reports.
  • If you want to use archiving, it must be set up from the master server. Archiving is necessary if you want to enlarge the availability in case of a long downtime of a slave. If you want a delayed server or if you want an incremental backup for PITR in case of disaster.
In our test we’ll have 5 servers, that I’ll label with the port number and data directory to avoid confusions. In this case you can run postgres in same port in different hosts, but just for make clear the difference I will use different ports between them:
Master (8888, data_master) Slave(8889 data_slave) Cascade Slave(8890 data_cascade_slave) delayed standby server using omnipitr-restore (8891 data_delayed) Delayed slave hot standby server using a handmade script (8892 data_streaming_delayed).
To take a backup for replication, first create a user with replication grants and add it to pg_hba.conf in the master. From psql, run:

local   replication     repl                            md5
host    replication     repl            md5
host    replication      repl               md5

*NOTE: if you want to setup several slaves in one move, make a backup locally in the master, compress it and then ship it among the slaves. Compress and backup command:
bin/pg_basebackup -Ft -p8888 -Urepl -Ddata_slave -z -Z9

Servers configuration
Master (8888) configuration:
wal_level = hot_standby
wal_keep_segments = 30
archive_command = 'cp %p /opt/pg92beta/archives_master/%f'
local   replication     repl                                md5
host    replication     repl            md5
host    replication      repl               md5
max_wal_senders should be a value -at least- the number of slaves connected to this server (cascade servers doesn’t count, cause they connect to the other slaves). If you don’t know how many slaves you need, you can setup a higher value (like 10 for example). That won’t affect performance and the process of wal sender only will work if another slave is brought up.
wal_keep_segments is optional, but is a very comfortable setup when we need to have a large available segments on the master to update the slaves. If one slave gets into a not-so-long downtime it can get updated using directly the streaming replication, instead the archiving files. Is not a time value, and this is important to keep in mind. Keeping 30 segments maybe for a high load will represent just a few hours of activity and in a low activity server maybe a few days. archive_mode and archive_command are in charge of copy those segments to other destination to be used for incremental backups or update slaves that were in long period of downtime.
Slave (8889) Configuration:
wal_level = hot_standby
wal_keep_segments = 30
hot_standby = on
hot_standby_feedback = on
standby_mode = on
primary_conninfo = 'user=postgres host=localhost port=8888'

This is a classic setup of a asynchronous slave server through streaming replication. No delay specified, just replicate as fast as it can. The hot_standby_feedback option, allows to communicate with the master, in case of long running queries on the slave and to avoid vacuums during its execution on the master. Cascade slave (8890) configuration:
standby_mode = on
primary_conninfo = 'user=postgres host=localhost port=8889'
Configuration of this host is like every slave, the difference is that the primary_conninfo will point to the slave server where you have taken the backup.
Delayed Slave (8891) configuration:
restore_command = '/opt/pg92beta/omniti/bin/omnipitr-restore -D /opt/pg92beta/data_delayed -l /opt/pg92beta/omniti-restore.log -s /opt/pg92beta/archives_master -w 300 %f %p'
archive_cleanup_command = '/opt/pg92beta/bin/pg_archivecleanup /opt/pg92beta/archives_master %r'
standby_mode = on
You can download omnipitr-restore at . Some documentation here at OmniPitr with delay.
restore_command is in charge to apply the corresponding segments and the option “-w 300” is the “wait” to apply the delay. The server will be on standby mode, which means that is not accessible to query it.

Streaming delayed hot-standby slave (8892) configuration:
Configuration is almost the same as other slaves, just keep an eye on: postgresql.conf:
#...same configuration as the other slaves...
hot_standby_feedback = off   #not necessary cause the server is delayed

Script for delayed using recovery_target_time:
DELAYED_DATE=$(date "+%Y-%m-%d %H:%M:%S %Z" --date="5 minutes ago")
res=$($CONTROL_COMMAND stop)
sed -i "s/.*recovery_target_time[ |\=].*/recovery_target_time =\'$DELAYED_DATE\'/g" $RECOVERY_FILE_LOCATION
#Taken from the doc:
# If this recovery target is not the desired stopping point,
# then shutdown the server, change the recovery target settings
# to a later target and restart to continue recovery.
res=$($CONTROL_COMMAND start) is just a one line script (--mode=fast is required to stop queries when restarted):
su postgres -c "bin/pg_ctl -D data_streaming_delayed --mode=fast -l logfile_streaming_delayed $1"
This configuration has its cons:
The “delay” frame isn’t exact. Why? You need to cron the script - which has the recovery_target_time change- each x period of time. During that period of time, the delay will increase until you re-execute the script. So, if you have configured a “30 minutes ago” and your script is executed each 5 minutes, your delay will be between 30 and 35 minutes. The scripts executes a stop/start, which is necessary to load the new configuration. DO NOT RUN A RELOAD, because the slave will stop the recovery mode. Due this, you will need to have in mind if you want to make queries in that server.
But its advantages are:
You can query the server. You can just stop the refresh of the recovery_target_time for make your reports against that server. Due that the server replay is paused, you will experience more performance in your queries.

After every start, you will find something like:
LOG:  database system was shut down in recovery at 2012-07-04 11:14:39 UTC
LOG:  entering standby mode
LOG:  redo starts at 0/1E000020
LOG:  consistent recovery state reached at 0/1E010B48
LOG:  database system is ready to accept read only connections
LOG:  recovery stopping before commit of transaction 14024, time 2012-07-04 11:14:18.319328+00
LOG:  recovery has paused
HINT:  Execute pg_xlog_replay_resume() to continue.

Do not execute pg_xlog_replay_resume() if you want to keep recovering. If you execute it, will replay the last records and start as a stand alone server, which isn’t our objective. That function will update until the last record received and will continue doing that until something stops the replay.
Let’s see an example:
root@lapp /opt/pg92beta# bin/psql -p8888 -Upostgres
psql (9.2beta2)
Type "help" for help.
postgres=# insert into check_time values (now());
postgres=# select * from check_time ;
2012-07-04 11:19:47.667041
2012-07-04 11:43:23.794384
(2 rows)
postgres=# \q
root@lapp /opt/pg92beta# ./
root@lapp /opt/pg92beta# tail data_streaming_delayed/pg_log/postgresql-2012-07-04_11
postgresql-2012-07-04_110653.log  postgresql-2012-07-04_110853.log  postgresql-2012-07-04_111440.log  postgresql-2012-07-04_114335.log
postgresql-2012-07-04_110808.log  postgresql-2012-07-04_111315.log  postgresql-2012-07-04_111954.log
root@lapp /opt/pg92beta# tail data_streaming_delayed/pg_log/postgresql-2012-07-04_114335.log
LOG:  database system was shut down in recovery at 2012-07-04 11:43:33 UTC
LOG:  entering standby mode
LOG:  redo starts at 0/1E000020
LOG:  consistent recovery state reached at 0/1E012734
LOG:  database system is ready to accept read only connections
LOG:  recovery stopping before commit of transaction 14027, time 2012-07-04 11:43:23.797137+00
LOG:  recovery has paused
HINT:  Execute pg_xlog_replay_resume() to continue.
root@lapp /opt/pg92beta# bin/psql -p8892 -Upostgres
psql (9.2beta2)
Type "help" for help.
postgres=# select * from check_time ;
2012-07-04 11:19:47.667041
(1 row)
postgres=# \q

For large amounts of data, maybe you want to have more large delays (1 day, 6 hours) due to the spended time to start an instance. Based on some experience, 1 day shuold be find to detect problems or to work with reports.

Palomino Evaluates Amazon’s New High I/O SSD Instances

Amazon Web Services (AWS) recently introduced a new storage layer with SSD as its backend. Because their previous EBS storage has been challenging in the areas of I/O throughput and stability, we were extremely excited to run comparative benchmarks to offer a recommendation for customers considering upgrading to SSD. 

The opportunity to expand I/O throughput horizontally can create a longer runway for sharding (distributing data-sets across multiple MySQL logical clusters in order to reduce write I/O), which can be quite compelling. 

Our extensive tests, described in detail in our Evaluation Report prepared by René Cannao and Laine Campbell (click here for a free PDF), illustrate the potentially enormous gains in throughput, performance and cost for companies scaling MySQL in Amazon. Palomino would strongly recommend that clients consider upgrading to SSD storage. Learn more.

Quick script to get list of all jmx beans

Recently I've needed to get information from running Cassandra processes, in order to determine which parameters to monitor. jconsole can be used for this, however if you're using a host on AWS, or in a bandwidth-limited environment, you might find that it takes way too long, especially if all you want to do for the moment is get a list of all beans and their attributes. jmxterm is a good solution for this - a command line interface to your jmx port. So here's a couple of utility scripts that you can use to get all the beans and each of their attributes using jmxterm. You may need to change $jar, $host, and $port in the perl script to fit your environment, and also maybe change the domains in the bash script. Then you should be able to simply run the bash script to get a list of each bean and its attributes. I found this useful when looking for the correct syntax to set up some nagios monitoring for cassandra. For example, here is a snippet of the output:
Checking bean org.apache.cassandra.internal:type=FlushWriter
  %0   - ActiveCount (int, r)
  %1   - CompletedTasks (long, r)
  %2   - CurrentlyBlockedTasks (int, r)
  %3   - PendingTasks (long, r)
  %4   - TotalBlockedTasks (int, r)
And I was able to figure out the syntax for a nagios check:
check_jmx!$HOSTADDRESS$!7199!-O org.apache.cassandra.internal:type=FlushWriter -A CurrentlyBlockedTask
Hopefully, these scripts might be useful to someone else trying to query cassandra (or any java process that uses jmx).
for domain in ${DOMAINS[@]}
    echo "-------------------"
    echo $domain
    output=$(./ $domain 2>/dev/null | tr ' ' '+' | grep '=')
    for line in $output
      bean=$(echo $line | tr '+' ' ')
      echo "Checking bean $bean"
      ./ $domain $bean 2>/dev/null | grep -v "#" | grep -v "Got domain"
#!/usr/bin/env perl
use strict;
my $jar = "/home/ubuntu/jmxterm-1.0-alpha-4-uber.jar";
my $host = "";
my $port = 7199;
my $domain = shift @ARGV;
my @beans = ();
my $bean;
my $size;
for my $arg (@ARGV) {
  if ($arg =~ /^\w/) {
    push (@beans, $arg);
  } else {
$size = @beans;
$bean = join(' ',@beans) if ($size > 0);
open JMX, "| java -jar $jar -n";
print JMX "open $host:$port\n";
print JMX "domain $domain \n";
if (defined $bean && length $bean > 0) {
  print JMX "bean $bean \n";
  print JMX "info \n";
} else {
  print JMX "beans \n";
print JMX "close\n";
close JMX;

Mystery Solved: Replication lag in InnoDB


While running a backup with XtraBackup against a slave server we noticed that replication was lagging significantly. The root cause wasn't clear, but we noticed that DML statements from replication were just hanging for a long time. Replication wasn't always hanging, but it happened so frequently that a 24 hour backup caused replication to lag 11 hours.

The first hypothesis was that all the writes generated from replication (relay log, bin log, redo log, etc) were generating too high contention on IO while XtraBackup was reading the files from disk. The redo log wasn't hitting 75%, which meant that InnoDB wasn't doing aggressive flushing - some other contention was causing replication to stall.

After various tests, we found that disabling innodb_auto_lru_dump solved the issue. It wasn’t entirely clear what the relation was between the lru dump and replication lag during backup, but it was very easy to reproduce. Enabling lru dump at runtime was immediately causing replication to lag, and disabling it restored replication back to normal.

Also, when innodb_auto_lru_dump was enabled we noticed that from time to time the simple command "SHOW ENGINE INNODB STATUS" was hanging for 2-3 minutes.

To attempt to reproduce the issue outside this production environment, we tried to run various benchmarks using sysbench, with and without auto lru dump. The sbtest table (~20GB on disk) was created using the following command:

sysbench --test=oltp --mysql-table-engine=innodb --mysql-user=root --oltp-table-size=100000000 prepare

The InnoDB settings were:

innodb_buffer_pool_size = 10G

innodb_flush_log_at_trx_commit = 2

innodb_thread_concurrency = 0





The various benchmarks were ran using:

- read-only workload vs read-write workload;

- small buffer pool vs large buffer pool (from 2G to 30G)

- small number of threads vs large number of threads


None of the above benchmarks showed any significant difference with auto lru dump enabled or disabled. Perhaps these workloads were not really reproducing our environment where we were getting issues with auto lru dump. We therefore started a new series of benchmarks with only one thread doing mainly writes - this is the workload we expect in a slave used only for replication and backups.

The workload with sysbench was modified to perform more writes than read, yet the result of the benchmark didn't change a lot - enabling or disabling lru wasn't producing any significant change in performance. The problem with this benchmark was that it was generating too many writes and filling the redo log. InnoDB was then doing aggressive flushing and this was a bottleneck that was hiding any effect caused from the lru dump.

To prevent the redo from filling too quickly, we had to change the workload to read a lot of pages, change the buffer pool from 30G to 4G, and test with always restarting mysqld and with the buffer pool prewarmed with:

select sql_no_cache count(*), sum(length(c)) FROM sbtest where id between 1 and 20000000;

sysbench --num-threads=1 --test=oltp --mysql-user=root --oltp-table-size=100000000 --oltp-index-updates=10 --oltp-non-index-updates=10 --oltp-point-selects=1 --max-requests=1000 run


innodb_auto_lru_dump=0:    transactions: (7.26 per sec.)

innodb_auto_lru_dump=1:    transactions: (6.93 per sec.)


This was not a huge difference, but we finally saw some effect of the auto_lru_dump.

It became apparent that the number of transactions per second in the above benchmark was really low because the number of random reads from disk was the bottleneck. To remove this bottleneck, we removed innodb_flush_method=O_DIRECT (therefore using the default flush method), and then ran the following to load the whole table into the OS cache (not into the buffer pool).


dd if=sbtest/sbtest.ibd of=/dev/null bs=1M


To prevent the redo log from filling up, we also changed the innodb_log_file_size from 128M to 1G.

With these changes - always using a buffer pool of 4G, restarting mysqld before each test ,and prewarming the buffer pool with "select sql_no_cache count(*), sum(length(c)) FROM sbtest where id between 1 and 20000000" - we reran the same test changing the number of requests:

10K transactions:

sysbench --num-threads=1 --test=oltp --mysql-user=root --oltp-table-size=100000000 --oltp-index-updates=10 --oltp-non-index-updates=10 --oltp-point-selects=1 --max-requests=10000 run


innodb_auto_lru_dump=0:    transactions: (243.22 per sec.)

innodb_auto_lru_dump=1:    transactions: (230.62 per sec.)


50K transactions:

sysbench --num-threads=1 --test=oltp --mysql-user=root --oltp-table-size=100000000 --oltp-index-updates=10 --oltp-non-index-updates=10 --oltp-point-selects=1 --max-requests=50000 run


innodb_auto_lru_dump=0:    transactions: (194.31 per sec.)

innodb_auto_lru_dump=1:    transactions: (175.69 per sec.)



With innodb_auto_lru_dump=1 , performance drops by a factor of 5-10% !


After this, we wanted to run a completely different test with no writes, only reads.

innodb_auto_lru_dump didn't show any difference when sysbench was executed with read only workload, and we believe the reason is simply the fact that sysbench wasn't changing too many pages in the buffer pool. The easiest way to change pages in the buffer pool is to perform a full scan of a large table with a small buffer pool. We set innodb_flush_method=O_DIRECT, since otherwise the read from the OS cache was too fast and we couldn't detect any effect of innodb_auto_lru_dump. With innodb_buffer_pool_size=4G, and restarting mysqld after each test, this was the the result of a full table scan:


With innodb_auto_lru_dump=0 :

mysql> select sql_no_cache count(*), sum(length(c)) FROM sbtest;


| count(*)  | sum(length(c)) |


| 100000000 |      145342938 |


1 row in set (3 min 27.22 sec)



With innodb_auto_lru_dump=1 :

mysql> select sql_no_cache count(*), sum(length(c)) FROM sbtest;


| count(*)  | sum(length(c)) |


| 100000000 |      145342938 |


1 row in set (3 min 38.43 sec)


Again, innodb_auto_lru_dump=1 affects performance increasing the execution time by ~5% .

It is also important to note that innodb_auto_lru_dump seems to affect performance only for some specific workload scenarios. In fact, the majority of the benchmarks we ran weren't showing any performance effect caused by innodb_auto_lru_dump.


An overview of Riak


At PalominoDB, we constantly evaluate new technologies and database options for our clients’ environments. NoSQL databases are especially popular right now, and Riak is an increasingly-recommended option for highly available, fault-tolerant scenarios. Moss Gross attended an introductory workshop, and shares his findings here. For more on Riak, please see the Basho wiki.

What is Riak?

Some of the key features of Riak include:

- license is apache2

- key-value store

- masterless

- distributed

- fault-tolerant

- eventually consistent (Given a sufficiently long time, all nodes in the system will agree. Depending on your requirements, this could be a determining factor on whether you should use Riak)

- highly available

- scalable

- each node has one file

- supports map/reduce for spreading out query processing among multiple processes

- The database itself is written in Erlang.  There are currently clients written in Ruby, Java, Javascript, Python and many other languages.


Key Concept Definitions


Node:  One running instance of the Riak binary, which is an Erlang virtual machine.

Cluster:  A group of Riak nodes.  A cluster is almost always in a single datacenter.

Riak Object: One fundamental unit of replication.  An object includes a key, a value, and may include one or more pieces of metadata.

Value:  Since it is a binary blob, it can be any format.  The content-type of the value is given in the object's metadata.

Metadata:  Additional headers attached to a Riak Object.  Types of metadata include content-type, links, secondary indexes

Vector Clock: An opaque indicator that establishes the temporality of a Riak object.  This is used internally by Erlang, and the actual timestamp is not intended to be read by the user from the vector clock.

Bucket: Namespace.  This can be thought of as an analog to a table, however Riak namespaces are unlike tables in any way.  They indicate a group of Riak objects that share a configuration.

Link:  Unidirectional pointer from one Riak object to another.  Since an object can have multiple links, bi-directional behavior is possible.  These links use the HTTP RFC.

Secondary Index:  An index used to tag an object for a fast lookup. This is useful for one-to-many relationships



More Key Concepts


Riak uses consistent hashing, which can be thought of as a ring which maps all the possible Riak objects, the number of which can be up to 2^160.


Partition: Logical Division of the ring.  This corresponds to logical storage units on the disk.   The number of partitions must be a power of two, and in practice should be generally very high.  The default number of partitions is 64, and this is considered a very small number.





To insert data, the basic operation is a PUT request to one of the hosts.  The bucket is indicated in the address, and metadata is added via headers to the request.  For example: curl -v http://host:8091:/buckets/training/keys/my-first-object -X PUT -H "content-type: text/plain" -d "My first key"


To insert data without a key, use a POST request.   To retrieve an object, use GET, and to delete, use DELETE.


Riak doesn't have any inherent locking.  This must be handled in the application layer.





Riak has two configuration files, located in /etc/ by default.


vm.args: identifies the node to itself and other clusters.  The name of the node is of the form 'name@foo', where 'name' is a string and 'foo' can be an ip or a hostname, and it must resolve to a machine, using /etc/hosts or DNS, etc.


app.config:  identifies the ringstate directory and the addresses and ports that the node listens on.





There are four main logs


console.log:  All the INFO, WARN, and ERR messages from the node.

crash.log: crash dumps from the Erlang VM

error.log:  just the ERR messages from the node

run_erl.log: logs the start and stop of the master process




Things to check for proper operation:


Locally from the machine the node is on, you can run the command 'riak-admin status'. This gives one minutes stats for the node by name and cluster status


Cluster Status:

nodename: 'xxxx' (compare to what the rest of the cluster things it should be)

connected_nodes (verify it's what's expected)

ring_members (includes OOC members)

ring_ownership (has numbers that should show a general balance in

indexes across all of the nodes, if one is significantly different,

indicates a problem)


Storage Backend


Riak is very versatile in that you have many choices as to what to use for your storage backend.  Some of the possibilities include memory, Bitcask (a homegrown solution written in Erlang, which keeps just a hashmap of keys in memory), LevelDB (which will allow secondary indexing, uses compaction, and does not require you to store all keys in memory), or a combination.  They recommend actually using different storage engines for different buckets if it seems more appropriate.


Creating backups in Riak is relatively straightforward:


1. stop node

2. copy data directory (if not in memory)

3. start node back up

4. let read repair handle the rest



Monitoring and Security


- There is no built-in security in Riak - it's up to the administrator to add access restrictions via http authentication, firewalls, etc. The commercial product, Riak CS, has ACLs and security built-in however.

- JMX monitoring is built-in and can be enabled in the application configuration - just specify the port.

- MapReduce can be enabled by a setting in app.config.  JavaScript or Erlang can be used.


Querying connection information in MongoDB

As with all other database platforms, daily administration and troubleshooting of MongoDB often begins with analyzing database activity.  It can be helpful to see all connections from within MongoDB, which you can do by running:


where the “true” makes the command include idle connections.  But often there's so many connections that unless you capture the output in a script file or some similar thing, it's kind of useless. 

Thanks to a hint given by Scott Hernandez in the mongodb-user forum, we can use the line

   db.currentOp(true).inprog.forEach(function(o){if( <filter criteria> ) printjson(o)});

to show a subset that is more manageable.  For instance, we might want to show all connections from a particular application server:

   db.currentOp(true).inprog.forEach(function(o){if(o.client.indexOf(“”) != -1 ) printjson(o)});

or from the MongoDB logs we’ll see a particular connectionId and want to know where it came from:

   db.currentOp(true).inprog.forEach(function(o){if(o.connectionId == 8606 ) printjson(o)});

This will then show all the connection info for that connectionId:

   "opid" : 7117752, 
   "active" : false, 
   "lockType" : "read", 
   "waitingForLock" : false, 
   "op" : "query", 
   "ns" : "player_data.player_answerchoice", 
   "query" : { 
   "$query" : {  
   "poll_id" : ObjectId("4f58e08db3e93217c2000008") 
   "client" : "",  
   "desc" : "conn", 
   "threadId" : "0x49e31940", 
   "connectionId" : 8606, 
   "numYields" : 0 

Testing and Analyzing Performance with Benchmarks

Generic benchmark tools can be very useful for testing performance on your system. These benchmark tools normally have a set of predefined workloads, but often they don't match your specific workload in useful ways.

One of the best ways to reproduce your workload is to have a good sense of the application that uses the database and how it manages requests to the database. If this is not an option, it is also possible to analyze traffic and to find the most common queries, and use those to define the most common workload.

You can analyze traffic in many ways, from tcpdump to general log, from binlog (only for DML statements) to slow query log.

Afterwards it is possible to analyze them with pt-query-digest (or the obsolete mk-query-digest) to find the most common and/or heavy queries.

In the system we analyze here, the workload was mainly write intensive and involved just 4 tables:

  • tableA was receiving single-row INSERT statements;
  • for each insert on tableA , on average 200 INSERTs were performed in the other 3 tables, distributed as follows: 100 on tableB, 95 on tableC, 5 on tableD (to be more specific , for each INSERT on tableB there is an INSERT either on tableC or tableD).


The system also receives SELECT statements, but in a very small number and very simple primary key lookup.

To simulate the workload, we generated a simple perl script that spawns a certain number of threads that perform the DML statements, and other threads that perform the SELECT statements.

At regular intervals, the script prints statistics and progress.

The benchmark test was executed in a setup with 2 hosts: one host where the client was running, and another host where the servers were running.

The RDBMS tested were: MariaDB 5.2.3 with TokuDB 5.2.7 and InnoDB, and Percona 5.5.20.

Additionally, Percona 5.5.20 was tested as multiple instances running on the same hosts.


The goal of the first benchmark test was to compare TokuDB against InnoDB for this specific workload.

We executed MariaDB with TokuDB with the following (simple) config file:



We found the performance of InnoDB significantly better compared than TokuDB in this instance, though this test - where the dataset fits almost entirely in memory - does not show the real power of TokuDB, which excels at insertion rate at scale. Because these tables have very few indexes, TokuDB and Fractal tree indexes weren't very efficient. Furthermore, the benchmarks were running on FusionIO, which meant that performance on InnoDB didn't degrade much as on spinning disks. We excluded TokuDB out from the next benchmark tests because they are all cases which are not well-suited for TokuDB’s strengths.

We temporarily abandoned MariaDB, and tested Percona 5.5.20 with the following config file:



We tried various innodb_flush_method attempts, and the graphs show that O_DIRECT performs slightly better than the default fsync(), even if the benchmark shows a weird bootstrap. We also tried ALL_O_DIRECT, which performed badly.


Additionally, we tried innodb_log_block_size=4096 instead of the default 512, but nothing changed: insert rate wasn't affected.


One of the goals of this benchmark was to test if running multiple mysqld instances on the same host performs better than a single mysqld instance.

On this specific hardware, the answer seems to be yes. Configuring 8 mysqld instances with the same config file listed below (but different paths and ports), throughput is significantly higher. Note that innodb_buffer_pool_size was set to 256M to try to stress the IO subsystem.



All the above tests were executed using 36 client connections for writes and 36 client connections for reads.


We then ran a new cycle of tests, but instead of using 36 x 2 connections, we used 80 x 2 (80 for writes and 80 for reads).



With 80 connections, throughput was higher than with 36 connections, but at nearly regular intervals we found performance dropping. This seems independent from the size of the buffer pool.

It is interesting to note that with only one mysqld instance, FusionIO was performing at 4.7k – 4.8k IOPS, while with 8 mysqld instances FusionIO was performing at 27k – 29k IOPS. As expected, with a small buffer pool performance tends to slowly degrade when the data doesn't fit in memory.

We tried various values of innodb_write_io_threads, but this didn't make any difference, since the Redo Log was the most written and not the tablespaces.

To better analyze the throughput, we reduced the sample time to 10 seconds and reran the test:



It is clear that throughput drops from time to time, and for a nearly constant amount of time. While the test was running, we tried to monitor the mysqld instances, but there was no clear indication of why they were stalling. The Redo Log wasn't anywhere close to full and InnoDB wasn't performing aggressive flushing. The amount of data read from disk was pretty low but the amount of data written was spiking. Yet, the writes weren't coming from InnoDB.

The reason for the stalls became apparent when we analyzed the content of /proc/meminfo: the Linux Virtual Memory (VM) subsystem was performing dirty pages flushing!

We changed the dirty_background_ratio from 10 (the default) to 1 , and reran the test.

sysctl -w vm.dirty_background_ratio=1


Throughput is now way more stable, although performance has dropped by 2.8%. It is interesting to note that throughput drops at nearly the same time no matter the value of dirty_background_ratio.

A quick analysis of MySQL source code shows that binlog are synced to disk when closed, therefore the drops in throughput may be caused by the flush of binary logs.

We then raised vm.dirty_background_ratio up to 10 (the default value) and lowered max_binlog_size from 1G to 64M.



Throughput doesn't drop drastically as in the two previous tests, but goes up and down at more regular intervals.

At the end of this test, performance with max_binlog_size=64M is ~4% lower than the initial test with max_binlog_size=1G (in both cases, vm.dirty_background_ratio=10).

The last setup of 8 instances with a 256M buffer pool each and max_binlog_size=64M was then compared with a new setup:  4 instances with a 512M buffer pool each (2GB total in both cases) and max_binlog_size=64M:



An interesting outcome from this last test is that total throughput raised by around 4% (that was originally lost using binlogs of 64M) and that the total number of IOPS dropped to ~16k, leaving room for more IO in case of a different workload.

We then ran a new test using only 2 mysqld instances. It shows what was already easy to guess when running a similar test with only one mysqld instance: a lower number of mysqld instances can't fully utilize IO capacity and therefore has lower throughput.


Conclusions (most of them are as expected) for this specific workload and on this specific hardware:

O_DIRECT performs slightly better than the default fsync for innodb_flush_method .

A high number of clients provides more throughput than a smaller number of clients: not enough tests were performed to find the optimal number of clients.

Throughput reduces when data doesn't fit in the buffer pool.

A high number of mysqld instances running on the same server are able to better utilize the number of IOPS that FusionIO is able to provide (perhaps, it should be a very bad idea to run multiple mysqld instances on the same spinning disk or array)

The sync of binlog during binlog rotation are able to stall the system. Lowering dirty_background_ration or max_binlog_size is able to stabilize the throughput.

Syndicate content
Website by Digital Loom