Controlling Collection Distribution

Shard tagging is a new feature in MongoDB version 2.2.0. It’s intended to let you force writes to go to a local data center, but it can also be used to pin a collection to a shard or set of shards.

Note: to try this out, you’ll have to use 2.2.0-rc0 or greater.

To play with this feature, first you’ll need to spin up a sharded cluster:

> sharding = new ShardingTest({shards:3,chunksize:1})

This command will start up 3 shards, a config server, and a mongos. It’ll also start spewing out the logs from all the servers into stdout, so I recommend putting this shell aside and using a different one from here on in.

Start up a new shell and connect to the mongos (defaults to port 30999) and create some sharded collections and data to play with:

> // remember, different shell
> conn = new Mongo("localhost:30999")
> db = conn.getDB("villains")
>
> // shard db
> sh.enableSharding("villains")
>
> // shard collections
> sh.shardCollection("villains.joker", {jokes:1});
> sh.shardCollection("villains.two-face", {luck:1});
> sh.shardCollection("villains.poison ivy", {flora:1});
> 
> // add data
> for (var i=0; i<100000; i++) { db.joker.insert({jokes: Math.random(), count: i, time: new Date()}); }
> for (var i=0; i<100000; i++) { db["two-face"].insert({luck: Math.random(), count: i, time: new Date()}); }
> for (var i=0; i<100000; i++) { db["poison ivy"].insert({flora: Math.random(), count: i, time: new Date()}); }

Now we have 3 shards and 3 villains. If you look at where the chunks are, you should see that they’re pretty evenly spread out amongst the shards:

> use config
> db.chunks.find({ns: "villains.joker"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
> db.chunks.find({ns: "villains.two-face"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
> db.chunks.find({ns: "villains.poison ivy"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
Or, as Harley would say, “Puddin’.”

However, villains tend to not play well with others, so we’d like to separate the collections: 1 villain per shard. Our goal:

Shard      Namespace
shard0000  “villains.joker”
shard0001  “villains.two-face”
shard0002  “villains.poison ivy”

To accomplish this, we’ll use tags. A tag describes a property of a shard, any property (they’re very flexible). So, you might tag a shard as “fast” or “slow” or “east coast” or “rackspace”.

In this example, we want to mark a shard as belonging to a certain villain, so we’ll add villains’ nicknames as tags.

> sh.addShardTag("shard0000", "mr. j")
> sh.addShardTag("shard0001", "harv")
> sh.addShardTag("shard0002", "ivy")

This says, “put any chunks tagged ‘mr. j’ on shard0000.”
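
Under the hood, sh.addShardTag just adds the tag to the shard’s document in the config database, so you can check your work there (a quick peek; the exact hosts depend on your setup):

> use config
> db.shards.find()
{ "_id" : "shard0000", "host" : "localhost:30000", "tags" : [ "mr. j" ] }
{ "_id" : "shard0001", "host" : "localhost:30001", "tags" : [ "harv" ] }
{ "_id" : "shard0002", "host" : "localhost:30002", "tags" : [ "ivy" ] }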

The second thing we have to do is to make a rule, “For all chunks created in the villains.joker collection, give them the tag ‘mr. j’.” To do this, we can use the addTagRange helper:

> sh.addTagRange("villains.joker", {jokes:MinKey}, {jokes:MaxKey}, "mr. j")

This says, “Mark every chunk in villains.joker with the ‘mr. j’ tag” (MinKey is negative infinity, MaxKey is positive infinity, so all of the chunks fall in this range).
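
If you’re curious where that rule ends up, it’s just a document in the config database’s tags collection; it should look roughly like this:

> use config
> db.tags.findOne({ns: "villains.joker"})
{
    "_id" : { "ns" : "villains.joker", "min" : { "jokes" : { "$minKey" : 1 } } },
    "ns" : "villains.joker",
    "min" : { "jokes" : { "$minKey" : 1 } },
    "max" : { "jokes" : { "$maxKey" : 1 } },
    "tag" : "mr. j"
}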

Now let’s do the same thing for the other two collections:

> sh.addTagRange("villains.two-face", {luck:MinKey}, {luck:MaxKey}, "harv")
> sh.addTagRange("villains.poison ivy", {flora:MinKey}, {flora:MaxKey}, "ivy")

Now wait a couple of minutes (it takes a little while for it to rebalance) and then look at the chunks for these collections.

> use config
> db.chunks.find({ns: "villains.joker"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
> db.chunks.find({ns: "villains.two-face"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
> db.chunks.find({ns: "villains.poison ivy"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }

Scaling with Tags

Obviously, Two-Face isn’t very happy with this arrangement and immediately requests two servers for his data. We can move the Joker and Poison Ivy’s collections to one shard and expand Harvey’s to two by manipulating tags:

> // move Poison Ivy to shard0000
> sh.addShardTag("shard0000", "ivy")
> sh.removeShardTag("shard0002", "ivy")
>
> // expand Two-Face to shard0002
> sh.addShardTag("shard0002", "harv")

Now if you wait a couple minutes and look at the chunks, you’ll see that Two-Face’s collection is distributed across 2 shards and the other two collections are on shard0000.

> db.chunks.find({ns: "villains.poison ivy"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
{ "shard" : "shard0000" }
> db.chunks.find({ns: "villains.two-face"}, {shard:1, _id:0}).sort({shard:1})
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0001" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
{ "shard" : "shard0002" }
“Bad heads, you get EBS.”

However, this still isn’t quite right for Harvey: he’d like one shard to be good and one to be bad. Let’s say we take advantage of Amazon’s new offering and replace shard0002 with SSDs. Then we divide up the traffic: send 50% of Harvey’s writes to the SSD shard and 50% to the spinning disk shard. First, we’ll add tags to the shards, describing them:

> sh.addShardTag("shard0001", "spinning")
> sh.addShardTag("shard0002", "ssd")

The value of the “luck” field is between 0 and 1, so we want to say, “If luck >= .5, send to the SSD.”

> sh.addTagRange("villains.two-face", {luck:MinKey}, {luck:.5}, "spinning")
> sh.addTagRange("villains.two-face", {luck:.5}, {luck:MaxKey}, "ssd")

Now “bad luck” docs will be written to the slow disk and “good luck” documents will be written to SSD.
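
One side note: there’s no helper for removing a tag range (as of 2.2), but the rules are just documents in config.tags, so you can delete them by hand if you ever need to undo one (a sketch):

> use config
> db.tags.remove({ns: "villains.two-face", tag: "spinning"})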

As we add new servers, we can control what kind of load they get. Tagging gives operators a ton of control over what collections go where.

Finally, I wrote a small script that adds a “home” method to collections to pin them to a single tag. Example usage:

> // load the script
> load("batman.js")
> // put foo on bar
> db.foo.home("bar")
> // put baz on bar
> db.baz.home("bar")
> // move foo to bat
> db.foo.home("bat")
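
For the curious, a minimal home() could look something like the sketch below (this is just an illustration of the idea, not necessarily what batman.js actually does):

// sketch: pin a collection to the shards carrying the given tag
DBCollection.prototype.home = function(tag) {
    var ns = this.getFullName();
    var conf = this.getDB().getSiblingDB("config");

    // build a MinKey..MaxKey range over the collection's shard key
    var coll = conf.collections.findOne({_id: ns});
    var min = {}, max = {};
    for (var field in coll.key) {
        min[field] = MinKey;
        max[field] = MaxKey;
    }

    // replace any existing rules for this namespace, then tag the whole range
    conf.tags.remove({ns: ns});
    sh.addTagRange(ns, min, max, tag);
};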

Enjoy!

19 thoughts on “Controlling Collection Distribution”

  1. Really nice feature, thanks for sharing. A couple questions:

    1. What happens when the field of the record used in tagging changes? Let’s say we do update({}, {$set: {luck: 1}}) – will that move all of the records to the corresponding shard transparently?

    2. When the tags are added / changed, I suppose the chunks are moved while reads and writes are accepted as usual with only the chunk on the move in a given moment being unavailable. Is that correct?

    3. The use case I have in mind is to have shards geo-distributed and use tagging to move data to the DC closest to the customer as necessary. Would that work?

    1. You’re welcome!

      1. The field used by tags is the shard key and you can never change a document’s shard key. If you wanted to do a $set like that you’d have to do: x = findOne({_id:…}); remove({_id:…}); x.luck = 1; save(x);
      2. The chunks are moved via the normal balancing mechanism, so they should be available for read ‘n writes all the time (requests continue to know where the chunk is, next time a balancer runs it does a moveChunk to wherever).

      3. Yes, that’s what this feature is actually intended for!  I just thought this was a cool side effect and couldn’t think of anything particularly interesting to write about the geo-distribution case.

  2. Could this potentially be used for moving around time series data based on the timestamp? In your example with moving data between spinning and SSDs you could have more recent data kept on SSD based shards, then as time goes by have them “fall” off onto spinning disk based servers. One downside I could see is that all the writes would go to one shard.

    1. Yeah, the SSDs would have to be able to handle the write load on their own.

      You’d have to keep changing the tag range, too.  You could have the SSDs tagged “new data” and the spinning disk shards tagged “old data”, then update the tag range once a day to advance the “old data” range a day (or something like that).
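
      Roughly like this, say (a sketch only; the "villains.logs" namespace, the "ts" shard key, and the 30-day cutoff are made-up examples):

      // run once a day (from a cron job or similar) to advance the boundary
      var cutoff = new Date(Date.now() - 30*24*60*60*1000);            // keep the last 30 days on SSD
      db.getSiblingDB("config").tags.remove({ns: "villains.logs"});    // drop the old ranges first
      sh.addTagRange("villains.logs", {ts: MinKey}, {ts: cutoff}, "old data");
      sh.addTagRange("villains.logs", {ts: cutoff}, {ts: MaxKey}, "new data");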

    1. To back up chunks, you should use replicas.  You could add a shard and use a tag range to force “old” chunks to that shard.

  3. Hi,

    very helpful tutorial, thanks. 

    I got two questions so far:

    Is there a way of removing tag ranges?

    A use case:
    I set the tags US, EU and AP to three distinct shards and there are only documents inserted which have a root level key “region” which contains “US”, “EU” or “AP”. What is the easiest way of making sure the documents are stored on the corresponding shard? I thought of settings the tags and adding tag ranges like
    sh.addTagRange(“db.collection”, {region:”U”}, {region:”Z”}, “US”)sh.addTagRange(“db.collection”, {region:”E”}, {region:”U”}, “EU”)sh.addTagRange(“db.collection”, {region:”A”}, {region:”E”}, “AP”)As I understood the sharding key MUST be region as well?

    1. Well, I found the tag range config in config.tags and removed the wrong ones, so the mongos log doesn’t complain about catching exceptions because of a wrong tag range config. 😉

      Nevertheless the setup still doesn’t seem to work.

                
      mongos> db.printShardingStatus()
      --- Sharding Status ---
        sharding version: { "_id" : 1, "version" : 3 }
        shards:
          { "_id" : "ReplSet1", "host" : "ReplSet1/one, two, three", "tags" : [ "US" ] }
          { "_id" : "ReplSet2", "host" : "ReplSet2/one, two, three", "tags" : [ "EU" ] }
          { "_id" : "ReplSet3", "host" : "ReplSet3/one, two, three", "tags" : [ "AP" ] }
        databases:
          { "_id" : "test", "partitioned" : true, "primary" : "ReplSet2" }
            test.abc chunks:
                ReplSet2  1
              { "region" : { $minKey : 1 } } -->> { "region" : { $maxKey : 1 } } on : ReplSet2 Timestamp(1000, 0)
             tag: AP  { "region" : "A" } -->> { "region" : "E" }
             tag: EU  { "region" : "E" } -->> { "region" : "U" }
             tag: US  { "region" : "U" } -->> { "region" : "Z" }

      I added a lot of documents, which all went to shard2/ReplSet2:

      mongos> for (var i=0; i<11000; i++) { db.abc.insert({region: "US", i: i}); }
      mongos> for (var i=0; i<11000; i++) { db.abc.insert({region: "EU", i: i}); }
      mongos> for (var i=0; i<11000; i++) { db.abc.insert({region: "AP", i: i}); }
      mongos> db.abc.find().count()
      33000

      mongos> db.chunks.find({ns: "test.abc"}, {shard:1, _id:0}).sort({shard:1})
      { "shard" : "ReplSet2" }

  4. Hi –

    We are planning to leverage this for archiving records to another shard. 

    Is it possible to use a non-shard-key field along with the shard key for tag rules? For example, based on the status of the document and the age of the document, we want to move the document to a different shard.

    I doubt if we can do the above as your explanation says that chunks are migrated and the migration doesn’t happen at record level. 

    Is there a better approach for this?

    1. You cannot use a non-shard key because then there’s no guarantee that the documents would be in the same chunk.  Could you use the age as part of your shard key?

      1. Even if I used a shard key, my documents could be in different chunks, or they could be a subset of a chunk. In that case you would have to migrate the data to new chunks and put the new chunks on the tagged shard. So why the limitation?

      2. No, not really… say you shard by the field “date_inserted”.  You’ll have chunks like January 14th-15th, January 16th, January 17th-20th, etc.  If you make a tag like {min:{date_inserted: new Date(1970)}, max:{date_inserted: new Date(January 17th)}}, then the chunks with date ranges between 1970 & January 17th can all be moved to whatever shard you specify.  mongos doesn’t even have to look at the data itself, it can just read the config.chunks collection to see what data is in that range.

  5. Can I use this feature where I have a single collection for several customers (each customer’s data identified by customerId) and I want to keep customer-specific data on a specific shard? Also, I have a compound shard key.
