Oplog retention and MongoDB connectors for Kafka

Ivana Salmanić
5 min readJan 8, 2021

--

If you are using MongoDB as a source database for streaming data changes to Kafka, your biggest fear is probably oplog retention. Well, fear no more!

In this post, we will discuss how to resume the source connector when oplog retention deletes the last consumed event.

What is oplog and what is oplog retention?

MongoDB oplog is a common name for a database operation log collection located in system databases under local.oplog.rs. This collection is created only when MongoDB is instantiated as a replica set or sharded cluster. The reason is the way MongoDB secondary replicas track changes on the primary replica in the MongoDB cluster.

Primary replica writes to oplog collection all database operations — insert, update, and delete events but also, system logs.

Secondary replicas read or “tail” primary’s oplog collection and apply these events chronologically on their data.

This way MongoDB replicas stay in sync with each other.

Storing database operational logs for a long time is expensive. So the oplog is configured to delete old events. If oplog retention wasn’t a thing, we could recreate collections data at any point in time.

https://www.confluent.io/blog/tag/mongodb/

Why do connectors rely on oplog?

Currently, the two most popular MongoDB connectors for Kafka are the Debezium connector and MongoDB Official Connector. Both of them depend on the oplog collection because they work similarly to MongoDB replicas.

For example, the Debezium connector works similarly as if it was a secondary replica itself. However, it doesn’t join the MongoDB replica set.

When a new Debezium connector is added, it connects to MongoDB and reads the latest oplog event. This event helps Debezium track the current database state, which will be used later for resuming streaming.

The connector performs a database snapshot (if configured that way), just like when a new MongoDB replica is added to the MongoDB cluster. When the snapshot is done, the Debezium connector continues to read the oplog collection from the memorized event.

Debezium connector produces the latest read event from oplog to Kafka. If something goes wrong, it resumes streaming from that event forward.

MongoDB Official Connector works in a slightly different way. It uses MongoDB’s Change streams feature for tracking data changes. Change streams feature relies on MongoDB replication and reading oplog events under the hood. It’s not so different from the Debezium connector in that way. It only provides a layer of abstraction and some additional features.

So you can see, these connectors need oplog collection to produce change data capture events to Kafka.

But what happens when oplog retention kicks in?

If the connector is down for a long time (this is relative and depends on the oplog retention policy), the latest oplog event connector produced is no longer there. It is deleted from oplog and the connector doesn’t know how to resume streaming.

There are a few configuration options that can help handle these situations:

  1. Repeat snapshot and continue streaming — this can cause a lot of load on architecture, and it can take a lot of time to perform if the database is large
  2. Continue streaming as nothing has happened — we will have a time gap, so data inconsistencies and missing data is very likely
  3. Fail and stop streaming — the problem is pretty obvious

What if I told you there is another option?

https://mongodb.email/issues/55

Partial snapshot

Although this feature isn’t supported on connectors, we can imitate it manually. Sometimes the time spent on the manual partial snapshot isn’t justified but there are cases when this makes sense.

For example, in one of my latest challenges, I had to fix quite a lot of CDC pipelines where the source MongoDB cluster was on GCloud. Collections were huge due to operational tasks, archiving, and oplog drastically grew in size. At one point, it only contained about one hour of operational data.

The database was under high load, and connectors failed due to socket timeout (don’t ask).

None of the above three options were useful:

  • Repeating snapshot would take too much time, and the Google network traffic bill would be huge
  • Time gap could be tolerated only for a short time. Missing and incorrect data had to be fixed fast
  • Fail and stop streaming — self-explanatory

So we went with the time gap and this procedure.

The partial snapshot procedure

It’s not mandatory, but it would be good to have some kind of DateTime property that would change every time we update a document — “changed_at”. Also, tracking deleted documents is impossible in this case. But, if we have an update on the boolean field “deleted” instead of a delete operation we can handle them.

At this time, the connector continued to run like nothing happened, and there is a gap in the data streaming. We should determine the gap’s start time and end time so we can later filter events we are missing.

Pause the connector whose gap we are trying to fix. This isn’t mandatory in some cases, like when we are certain no update/delete events will change data we are trying to fix.

Filter and copy all data created or updated from start to end of the gap and import them in a separate collection. There are many ways to do this, so I’ll leave them to you.

You can use the mongo-export tool with query filtering. Copy only documents with the “changed_at” timestamp in the gap interval and later import them using the mongo-import tool. You can filter on any condition you find suitable, but this one will minimize data duplication.

Create the new connector from the existing one by altering connector properties so a snapshot of the new collection can be made.

When the snapshot is done, you can remove this connector and resume the old one.

And that’s it. The gap in streaming is gone, there is no missing data, and data inconsistencies are fixed.

Conclusion

In this article, we’ve gone through the way MongoDB connectors work and how they depend on MongoDB oplog collection for streaming data to Kafka.

I’ve described how oplog retention affects connectors streaming and what options we have to handle the worst-case scenario.

The idea for a manual partial snapshot isn’t applicable in all cases. Sometimes it demands more work than just letting the snapshot repeat. But it saved my day more than once, so I’m sure you will put it to good use.

--

--