Make an Atomic "Find Or Insert" Operation With a MongoDB Array

Problem

Recently, I was handed a defect in our code that turned out to be a race condition. We were using MongoDB to track the state of a collection of messages. When all the messages were received, the application would execute logic based on the collective information in them. All the messages carried a unique identifier that correlated them with each other, and we used that identifier as the ID of the tracking document in MongoDB.

This was a multi-node application where any of the service nodes could get any piece of the message puzzle.  Thus, it was possible for two nodes to have two different pieces of the puzzle at the same time.  There was plenty of logic in the code to "prevent" any race conditions.  All worked great in testing.

However, when we got to production with a wild-west type of load, we started seeing problems.

First problem:

We started to see unique key exceptions while inserting a new "tracking" record for coordination. Although a collision seemed impossible, it was happening. In the very small window between checking whether the document existed and creating it if it didn't, one node beat another to the punch. (I know, I know, it seems like a "duh" thing. I get that.)
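To make that window concrete, here is a minimal sketch of the check-then-insert race in plain JavaScript. FakeCollection and simulateCheckThenInsertRace are names made up for this illustration, the Map merely stands in for a collection with a unique key, and the interleaving is forced by hand rather than produced by real concurrency.

```javascript
// In-memory stand-in for a collection with a unique key on myId.
// Hypothetical helper for illustration only; this is not a MongoDB API.
class FakeCollection {
  constructor() {
    this.docs = new Map();
  }
  exists(id) {
    return this.docs.has(id);
  }
  insert(doc) {
    if (this.docs.has(doc.myId)) {
      throw new Error("duplicate key: " + doc.myId);
    }
    this.docs.set(doc.myId, doc);
  }
}

function simulateCheckThenInsertRace() {
  const tracking = new FakeCollection();

  // Step 1: both nodes check before either one has inserted.
  const nodeASawDoc = tracking.exists(1);
  const nodeBSawDoc = tracking.exists(1);

  // Step 2: both nodes act on the answer they got, which is now stale for B.
  const errors = [];
  for (const [node, sawDoc] of [["A", nodeASawDoc], ["B", nodeBSawDoc]]) {
    if (!sawDoc) {
      try {
        tracking.insert({ myId: 1, packetsReceived: [] });
      } catch (e) {
        errors.push(node + ": " + e.message);
      }
    }
  }
  return errors;
}

console.log(simulateCheckThenInsertRace()); // logs one error: "B: duplicate key: 1"
```

Both nodes were told the document did not exist, so both tried to create it, and the slower one hit the unique key.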

Second problem:

In the tracking document, we saved the unique ID of each message so we would know when we had all the pieces. Every message told us how many there were in total. This was sort of like 3 printers spitting out the same Word document, with each page carrying its own identifier of "Page x of n" (e.g., Page 2 of 10). You can then collate the document after all the printers are done.

The problem came in when the last two pieces were delivered to the service, and two different nodes had the last two pieces at the same time.  Without explaining all our internal code, it came down to the fact that neither service node knew that all the pieces were collected.  They both shrugged it off assuming some other message would be the final piece.
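I am not going to show our internal code, so the sketch below is only one hypothetical interleaving that produces this symptom, not what our service actually did. It assumes each node reads the current count, appends its packet, and then decides based on the count it read earlier. The function name and the 8-of-10 starting state are made up for illustration.

```javascript
// Hypothetical flow for illustration; the real service code is not shown here.
function simulateMissedCompletion() {
  // Shared tracking document: 8 of 10 packets have arrived so far.
  const stored = {
    myId: 1,
    packetsReceived: [1, 2, 3, 4, 5, 6, 7, 8],
    totalExpectedPacketCount: 10
  };

  // Both nodes read the count before either one writes.
  const countSeenByA = stored.packetsReceived.length;
  const countSeenByB = stored.packetsReceived.length;

  // Each node appends its own packet to the shared document...
  stored.packetsReceived.push(9);  // node A
  stored.packetsReceived.push(10); // node B

  // ...but decides based on the stale count it read earlier.
  const aThinksDone = countSeenByA + 1 === stored.totalExpectedPacketCount;
  const bThinksDone = countSeenByB + 1 === stored.totalExpectedPacketCount;

  return {
    aThinksDone: aThinksDone,
    bThinksDone: bThinksDone,
    actuallyDone: stored.packetsReceived.length === stored.totalExpectedPacketCount
  };
}

console.log(simulateMissedCompletion());
// { aThinksDone: false, bThinksDone: false, actuallyDone: true }
```

All ten packets are recorded, yet each node believes it delivered packet nine of ten.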


Solution

Many of us know that in SQL, you can wrap a SELECT, an INSERT, and/or an UPDATE in one transaction. That makes the above problem simple to solve. Until recently, MongoDB had no (or only a limited) way to "lock" a document the way SQL locking hints do. And even with the current locking mechanisms in MongoDB, none of them solves the problem above nicely.

But, there is a way!

After Googling and overflowing my stack with StackOverflow, I still hadn't found a solution I was totally happy with. Then I took a step back, looked at everything I had read as one big picture, and the solution presented itself.

The Example

In my example below, we'll pretend that we are receiving a chunked message. The original message I need to process has been chunked up into smaller packets, and I need to collect all the pieces to reassemble it. We will keep track of which packets we have received so far and how many we expect.

This is the document model for this example for keeping track of received pieces:

{ 
    "_id" : ObjectId("59022c5d5c2d32013cd5932c"), 
    "myId" : 1, 
    "packetsReceived" : [
        4,
        6,
        2
    ], 
    "totalExpectedPacketCount" : 10,
    "comment": "Something relevant here."
}

_id: The MongoDB auto-generated ID.
myId: A unique identifier of the message that is to be reassembled after we collect all the packets.
packetsReceived: An array of the packet numbers we have received so far.
totalExpectedPacketCount: How many chunks or packets I expect to receive in total.
comment: Something to keep the humans happy.

FindAndModify

MongoDB has a nice function called findAndModify. It looks for a document that matches your query and modifies it according to your update parameter. You can even tell it to return the updated document by setting the 'new' parameter to true. Note the $set operator: without an update operator, the update document replaces the whole matched document instead of changing just the one field.

db.mycollection.findAndModify({
  query: { myId: 1 },
  update: {
    $set: { comment: "We updated." }
  },
  new: true
})


upsert

Now, what if the document doesn't exist? We can insert it by adding the 'upsert' parameter set to 'true'. The following query adds a document with 'myId: 1' and 'comment: We updated or inserted.' when no document matches. Notice that we don't have to repeat the values from the 'query' block in the 'update' block. When the update uses operators such as $set, the equality fields from the query are set automatically in the insert part of the upsert.

db.mycollection.findAndModify({
  query: { myId: 1 },
  update: {
    $set: { comment: "We updated or inserted." }
  },
  new: true,
  upsert: true
})

$setOnInsert

Now, what if I want a value updated when it exists, but I have several other items that need to be set if the document doesn't exist?  We can use the $setOnInsert operator.  If the document doesn't exist, the '$setOnInsert' values will be used on the insert.  Otherwise, they are ignored.

db.mycollection.findAndModify({
  query: { myId: 1 },
  update: {
    $set: { comment: "We updated or inserted." },
    $setOnInsert: { totalExpectedPacketCount: 10 }
  },
  new: true,
  upsert: true
})


$addToSet

The last query gets us most of the way to our final solution, but it presents us with a snag. We could easily modify the query to insert a new array the first time we get a packet. However, what happens when I get subsequent packets? How do I modify the array without having to read it, change it, and then write it back? That is where the $addToSet operator comes in handy. Instead of us monkeying around with the array, this operator tells MongoDB to add an element to the array only if it isn't already there.

db.mycollection.findAndModify({
  query: { myId: 1 },
  update: {
    $set: { comment: "We updated or inserted." },
    $addToSet: { packetsReceived: 4 },
    $setOnInsert: { totalExpectedPacketCount: 10 } 
  },
  new: true,
  upsert: true
})


Summary

The last query above gives us an atomic "Find or Insert" style query, and it gives us back the latest and greatest document at the time of that operation.
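With the updated document in hand, each node can decide for itself whether it just delivered the final piece. Here is a minimal sketch of that check in plain JavaScript. allPacketsReceived and recordPacket are hypothetical names, and recordPacket is only an in-memory stand-in that mimics the $addToSet and $setOnInsert upsert semantics so the example runs without a database.

```javascript
// Hypothetical helper: decides whether the document returned by
// findAndModify (with new: true) shows every expected packet.
function allPacketsReceived(trackingDoc) {
  const received = trackingDoc.packetsReceived || [];
  return received.length === trackingDoc.totalExpectedPacketCount;
}

// In-memory stand-in for the upsert with $addToSet and $setOnInsert.
// It only mimics the semantics for illustration; it is not MongoDB code.
function recordPacket(store, myId, packetNumber, totalExpected) {
  let doc = store.get(myId);
  if (!doc) {
    // Insert path: the query field plus the $setOnInsert value.
    doc = { myId: myId, packetsReceived: [], totalExpectedPacketCount: totalExpected };
    store.set(myId, doc);
  }
  // $addToSet: append only if the value is not already present.
  if (!doc.packetsReceived.includes(packetNumber)) {
    doc.packetsReceived.push(packetNumber);
  }
  // new: true returns the post-update state; copy it so callers get a snapshot.
  return { ...doc, packetsReceived: [...doc.packetsReceived] };
}

const store = new Map();
const results = [1, 2, 3].map(
  (n) => allPacketsReceived(recordPacket(store, 42, n, 3))
);
console.log(results); // [ false, false, true ]
```

Because MongoDB applies each single-document update atomically, the updates are effectively applied one after another, so only the call that adds the last missing packet gets back a complete array. A redelivered packet that arrives after completion would also see a complete array, so the follow-up logic should tolerate being triggered more than once.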


Bonus - How to accomplish the above query in Java Spring Data

If you are using Spring Data's MongoTemplate, here's a code example that accomplishes the last query above (minus the comment field).

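import org.springframework.data.mongodb.core.FindAndModifyOptions;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;

// Assumes the enclosing class has a MongoTemplate field named mongoTemplate.
public PacketsReceived findOrInsertPacketsReceived(
        final int myId,
        final int currentPacketNumber,
        final int totalPacketCount) {

    final String packetsReceivedKey = "packetsReceived";
    final String totalPacketCountKey = "totalExpectedPacketCount";
    final String myIdKey = "myId";

    // Match the tracking document by the message identifier.
    Query q = new Query(Criteria.where(myIdKey).is(myId));

    // $addToSet records the packet; $setOnInsert applies only on the insert path.
    Update update = new Update()
            .addToSet(packetsReceivedKey, currentPacketNumber)
            .setOnInsert(totalPacketCountKey, totalPacketCount);

    // Return the post-update document and create it if it does not exist.
    FindAndModifyOptions findAndModifyOptions = new FindAndModifyOptions();
    findAndModifyOptions.returnNew(true);
    findAndModifyOptions.upsert(true);
    findAndModifyOptions.remove(false);

    // The entity class must match the method's return type.
    return mongoTemplate.findAndModify(
            q, update, findAndModifyOptions, PacketsReceived.class);
}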
