-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementing watch for Java, Mongo 3.6, Fix #146 #145
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A view comments without really knowing the codebase that well - so see it as nit picking in isolation. I'll leave it to @johnoliver to give more meaningful semantic feedback
public static final String OP_DELETE = "delete"; | ||
public static final String OP_INVALIDATE = "invalidate"; | ||
|
||
private List<String> removedFields; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a List<JsonObject>
?
private JsonObject updatedFields; | ||
private JsonObject fullDocument; | ||
private JsonObject resumeToken; | ||
private String operationType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be an enum?
public MongoClientChange(JsonObject json) { | ||
this(); | ||
final JsonArray removedFields = json.getJsonArray("removedFields"); | ||
if (removedFields != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could stream / filter this as opposed to using traditional if / for?
} | ||
|
||
public WatchOptions(JsonObject options) { | ||
resumeAfter = options.getJsonObject("resumeAfter"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some magic strings could be extract here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am following the convention from other DataObject
classes in here, should I make this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case no.
import io.vertx.ext.mongo.BulkWriteOptions; | ||
import io.vertx.ext.mongo.FindOptions; | ||
import io.vertx.ext.mongo.IndexOptions; | ||
import io.vertx.ext.mongo.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer explicit imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
try { | ||
List<Bson> pipelineBson = new ArrayList<>(); | ||
for (int i = 0; i < pipeline.size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use an enhanced for loop here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should actually probably use the JsonObjectBsonAdapter
MongoIterableStream<ChangeStreamDocument<JsonObject>> adaptor = new MongoIterableStream<>(view, context, options.getBatchSize()); | ||
resultHandler.handle(Future.succeededFuture(new MongoClientWatchIterableImpl(adaptor))); | ||
|
||
} catch (Exception unhandledEx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there not CheckedExceptions that can be caught? Or are you worried about the random RuntimeExcpetion like an NPE?
@@ -0,0 +1,9 @@ | |||
package io.vertx.kotlin.ext.mongo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this is generated code? Same goes for the other Kotlin pieces
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
} | ||
|
||
@Test | ||
public void testInsertChangeStream() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe slight name change. We're asserting that our watch operation 'sees' the inset, correct?
(Resolved) |
I'll hand that question to @johnoliver :-) |
a2169a5
to
3684b00
Compare
All the tests now run against a replica set, so tests will be a little slower. Otherwise, my changes seem to pass tests. We probably need a specific test for what that |
6ee43af
to
8106469
Compare
Fixes $setOnInsert no longer silently superseding $set on 3.6. Fixes findBatch creating too many connections. Note: Tests against mongo 3.6 cannot run on Windows with current flapdoodle embed mongo.
543491b
to
45e2fe8
Compare
|
||
@DataObject | ||
public class MongoClientChange { | ||
private static final ThreadLocal<JsonObjectCodec> codec = ThreadLocal.withInitial(() -> new JsonObjectCodec(new JsonObject())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for having a codec per thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, wasn't sure if the codec object was thread safe
// If the ID is inside $set, remove it | ||
// The error that this would cause would start to get enforced in 3.6 (i.e., $setOnInsert no longer silently | ||
// supersedes $set). | ||
if (update.containsKey("$set") && update.getJsonObject("$set").containsKey("_id")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not 100% on this one, I do accept that if someone changes the _id to a different value, then it is almost certainly an error by the user, and the mongo will throw an error. However by removing the _id we may be covering up the downstream mistake, and would get into a situation where it looks like we have successfully saved a record from the users point of view but not with the _id that they asked for.
I am a bit 50/50 on what is better here, encode the _id and allow the user to discover their mistake, or remove it.
also this fixes #144
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests would have to change because three of them fail due to this issue right now. I agree with you, but I'm less certain about the object ID patching code and how to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this again, I think it's actually better to keep this exact line in, because it passes the old test and maintains 100% compatibility with the current users (the issues reported) right now.
} else { | ||
if (options.getFullDocument() != null | ||
&& !options.getFullDocument().equals(WatchOptions.FULL_DOCUMENT_UPDATE_LOOKUP)) { | ||
resultHandler.handle(Future.failedFuture(new IllegalArgumentException("fullDocument must be null or \"updatedLookup\""))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think the error message makes a bit more sense as:
fullDocument option must be either null or "updatedLookup"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change
Context context = vertx.getOrCreateContext(); | ||
if (replacedRootDocument) { | ||
MongoIterableStream<JsonObject> adaptor = new MongoIterableStream<>(view.withDocumentClass(JsonObject.class), context, options.getBatchSize()); | ||
resultHandler.handle(Future.succeededFuture((MongoClientChangeStream<T>)new MongoClientReplacedRootStreamImpl(adaptor))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this casting may show a mistake in how control is handled here, createWatchIterable
could return ChangeStreamIterable<JsonObject>
then let the upstream methods deal with converting it to either a MongoClientChangeStream<JsonObject>
or MongoClientChangeStream<MongoClientChange>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to remove the watchReplaceRoot
method so this won't be necessary. Based on my conversation with the mongo folks, they still need to think about how to allow $redact
, $replaceRoot
etc. in the watch
pipeline. They're probably going to allow the caller to disable the driver's saving of resume tokens.
build(); | ||
exe = MongodStarter.getDefaultInstance().prepare(config); | ||
exe.start(); | ||
final JsonObject replSetInitiateConfig = new JsonObject(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to configure the replSet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, watch
requires a replica set to work.
Or, if you mean there's a way to do that without configuring the replica set, I'd do that instead. But as far as I know, I have to issue this command to get everything going correctly.
@Override | ||
@Fluent | ||
@GenIgnore | ||
default MongoClient watch(String collection, JsonArray pipeline, WatchOptions options, Handler<AsyncResult<MongoClientChangeStream<MongoClientChange>>> resultHandler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know they are unsupported but should still return MongoService
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change.
regarding the
Which may give the impression that it is a blocking method call, but you would hope it is not. |
Based on inspecting the mongo code, the cursor uses "An asynchronous connection to a MongoDB server with non-blocking operations." So that's probably not what they meant. |
Looking more carefully, mongo supports getting a Netty event loop group instead of using nio2. You may consider always doing this for the end user if context is Apparently in the postgresql driver they do this: |
Also this passes tests with the 3.6.0-SNAPSHOT along with 3.5.1 |
can we merge this @karianna I had to set mongo version to 3.4 to get the project building, if we merge this it could be set back to 3.6 |
I would like also to backport to 3.5.2 |
@doctorpangloss Hmm, this is reporting lots of conflicts now, not sure how that came to be. If you can resolve those then @johnoliver and I can take a final look |
I think I'll revisit this again later. I am happy with my fork of good, working code. |
Ready to merge
Also fixes #146, fixes #144
The basics are all in there. This should be ready for review.
Other notes:
changeStreams
wonkiness. I've expanded thewatch
jsdoc to try to address common issues.$project
,$replaceRoot
, and$redact
pipeline operators will glitch out in all versions of the mongo java driver due to https://jira.mongodb.org/browse/JAVA-2828 . The code that supports those operations (watchReplaceRoot
) is written and willthrow new UnsupportedOperationException
until that patch is done. What do you think about this? ShouldMongoClientImpl
inspect the user'spipeline
for usages of those (currently) unsupported operators?