Skip to content

Commit

Permalink
Source: Added configuration to use the document key for the sourceRec…
Browse files Browse the repository at this point in the history
…ord key

This is potentially a breaking change as the newly added configuration
`change.stream.document.key.as.key` defaults to true.

Previously, the resume token was used as the source key, but
that limits the usefulness of tombstones, both for topic compaction
and for downstream implementations.

Not all events relate to documents (e.g. drop collection), so the key falls
back to the resume token for those events.

As such this is considered both an improvement and a bug fix.

Set to false to revert back to the previous behaviour.

KAFKA-360

Co-authored-by: Ross Lawley <[email protected]>
Co-authored-by: Goncalo Pinho <[email protected]>
  • Loading branch information
rozza and gonpinho committed Aug 21, 2023
1 parent 358ce92 commit 00b266a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 6 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

### Improvements
- [KAFKA-274](https://jira.mongodb.org/browse/KAFKA-274) Made Debezium (DBZ) ddl events a noop
- [KAFKA-360](https://jira.mongodb.org/browse/KAFKA-360) Added configuration `change.stream.document.key.as.key`, which defaults to true.
Previously, the resume token was used as the key; however, that limits the usefulness of tombstones and topic compaction. Set to false to revert.

### Bug Fixes
- [KAFKA-378](https://jira.mongodb.org/browse/KAFKA-378) Changed connection uri configuration to password type and for security removed the legacy partition map.

- [KAFKA-360](https://jira.mongodb.org/browse/KAFKA-360) Fixed tombstones on delete by using the `documentKey`, when available, as the record key by default.

## 1.10.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,15 @@ void testSourceEmitsNullValuesOnDelete() {
assertIterableEquals(expectedDocs, actualDocs);

coll.deleteMany(new Document());
getNextResults(task).forEach(s -> assertNull(s.value()));
List<SourceRecord> pollAfterDelete = getNextResults(task);
pollAfterDelete.forEach(s -> assertNull(s.value()));

List<String> documentIds = docs.stream().map(s -> s.get("_id").toString()).collect(toList());
List<String> connectRecordsKeyIds =
pollAfterDelete.stream()
.map(r -> Document.parse(r.key().toString()).get("_id").toString())
.collect(toList());
assertIterableEquals(documentIds, connectRecordsKeyIds);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ public class MongoSourceConfig extends AbstractConfig {
PUBLISH_FULL_DOCUMENT_ONLY_CONFIG,
PUBLISH_FULL_DOCUMENT_ONLY_TOMBSTONE_ON_DELETE_DEFAULT);

/**
 * Name of the boolean source option that selects the change stream event's {@code documentKey}
 * as the source record key.
 */
public static final String DOCUMENT_KEY_AS_KEY_CONFIG = "change.stream.document.key.as.key";
// The option is enabled unless the user explicitly sets it to false.
private static final boolean DOCUMENT_KEY_AS_KEY_DEFAULT = true;
// Short human-readable label for the option.
private static final String DOCUMENT_KEY_AS_KEY_DISPLAY =
"Use the `documentKey` for the source record key";
// Longer description of the option; the default value is interpolated so the text cannot drift
// from DOCUMENT_KEY_AS_KEY_DEFAULT.
private static final String DOCUMENT_KEY_AS_KEY_DOC =
format(
"Use the document key as the source record key. Defaults to: %s",
DOCUMENT_KEY_AS_KEY_DEFAULT);

public static final String FULL_DOCUMENT_BEFORE_CHANGE_CONFIG =
"change.stream.full.document.before.change";
private static final String FULL_DOCUMENT_BEFORE_CHANGE_DISPLAY =
Expand Down Expand Up @@ -1028,6 +1037,17 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
Width.MEDIUM,
PUBLISH_FULL_DOCUMENT_ONLY_TOMBSTONE_ON_DELETE_DISPLAY);

// Register the boolean `change.stream.document.key.as.key` option (medium importance, medium
// display width) at the next position within the current config group.
configDef.define(
DOCUMENT_KEY_AS_KEY_CONFIG,
Type.BOOLEAN,
DOCUMENT_KEY_AS_KEY_DEFAULT,
Importance.MEDIUM,
DOCUMENT_KEY_AS_KEY_DOC,
group,
++orderInGroup,
Width.MEDIUM,
DOCUMENT_KEY_AS_KEY_DISPLAY);

configDef.define(
FULL_DOCUMENT_BEFORE_CHANGE_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class MongoSourceTask extends SourceTask {
static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
private static final String CONNECTOR_TYPE = "source";
public static final String ID_FIELD = "_id";
public static final String DOCUMENT_KEY_FIELD = "documentKey";
static final String COPY_KEY = "copy";
private static final String NS_KEY = "ns";
private static final int UNKNOWN_FIELD_ERROR = 40415;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.mongodb.kafka.connect.source.MongoSourceConfig.BATCH_SIZE_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COLLECTION_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.DATABASE_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.DOCUMENT_KEY_AS_KEY_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG;
Expand All @@ -29,6 +30,7 @@
import static com.mongodb.kafka.connect.source.MongoSourceConfig.StartupConfig.StartupMode.COPY_EXISTING;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.StartupConfig.StartupMode.TIMESTAMP;
import static com.mongodb.kafka.connect.source.MongoSourceTask.COPY_KEY;
import static com.mongodb.kafka.connect.source.MongoSourceTask.DOCUMENT_KEY_FIELD;
import static com.mongodb.kafka.connect.source.MongoSourceTask.ID_FIELD;
import static com.mongodb.kafka.connect.source.MongoSourceTask.LOGGER;
import static com.mongodb.kafka.connect.source.MongoSourceTask.createPartitionMap;
Expand Down Expand Up @@ -247,10 +249,15 @@ private List<SourceRecord> pollInternal() {
statisticsManager.currentStatistics().getMongodbBytesRead().sample(sizeBytes);
}

BsonDocument keyDocument =
sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA
? changeStreamDocument
: new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
// Select the document from which the source record key is produced.
BsonDocument keyDocument;
if (sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
// Schema key output is given the entire change stream document.
keyDocument = changeStreamDocument;
} else if (sourceConfig.getBoolean(DOCUMENT_KEY_AS_KEY_CONFIG)
&& changeStreamDocument.containsKey(DOCUMENT_KEY_FIELD)) {
// Option enabled and the event carries a `documentKey`: key the record by that document.
keyDocument = changeStreamDocument.getDocument(DOCUMENT_KEY_FIELD);
} else {
// Option disabled, or the event has no `documentKey`: fall back to a document holding only
// the event's `_id` (the change stream resume token).
keyDocument = new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
}

createSourceRecord(
keySchemaAndValueProducer,
Expand Down

0 comments on commit 00b266a

Please sign in to comment.