Skip to content

Commit

Permalink
Pass Watch tests.
Browse files Browse the repository at this point in the history
Fixes $setOnInsert no longer silently superseding $set on 3.6. Fixes findBatch creating too many connections. FIXME: Using temporary vertx-embedded-mongo-db repo until it is updated. Cannot run 3.6 on Windows with currently flapdoodle embed mongo.
  • Loading branch information
doctorpangloss committed Apr 11, 2018
1 parent 2171b36 commit 3684b00
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 80 deletions.
13 changes: 10 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@
<codegen.rxjava.deprecated>true</codegen.rxjava.deprecated>
</properties>

<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
Expand Down Expand Up @@ -85,9 +92,9 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mongo-embedded-db</artifactId>
<scope>test</scope>
<groupId>com.github.hiddenswitch</groupId>
<artifactId>vertx-embedded-mongo-db</artifactId>
<version>3.5.1-mongo-three-six-two-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
Expand Down
2 changes: 1 addition & 1 deletion vertx-mongo-client/src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ An unmodifiable list of upsert data. Each entry has the index of the request tha
|===
^|Name | Type ^| Description
|[[fullDocument]]`fullDocument`|`Json object`|-
|[[operationType]]`operationType`|`String`|-
|[[operationType]]`operationType`|`link:enums.html#MongoClientChangeOperationType[MongoClientChangeOperationType]`|-
|[[removedFields]]`removedFields`|`Array of String`|-
|[[resumeToken]]`resumeToken`|`Json object`|-
|[[updatedFields]]`updatedFields`|`Json object`|-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* A Vert.x service used to interact with MongoDB server instances.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mongo.impl.codec.json.JsonObjectCodec;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.codecs.DecoderContext;

Expand All @@ -15,17 +16,11 @@
public class MongoClientChange {
private static final ThreadLocal<JsonObjectCodec> codec = ThreadLocal.withInitial(() -> new JsonObjectCodec(new JsonObject()));

public static final String OP_INSERT = "insert";
public static final String OP_UPDATE = "update";
public static final String OP_REPLACE = "replace";
public static final String OP_DELETE = "delete";
public static final String OP_INVALIDATE = "invalidate";

private List<String> removedFields;
private JsonObject updatedFields;
private JsonObject fullDocument;
private JsonObject resumeToken;
private String operationType;
private MongoClientChangeOperationType operationType;

public MongoClientChange() {
removedFields = new ArrayList<>();
Expand All @@ -51,7 +46,7 @@ public MongoClientChange(JsonObject json) {
this.updatedFields = json.getJsonObject("updatedFields");
this.fullDocument = json.getJsonObject("fullDocument");
this.resumeToken = json.getJsonObject("resumeToken");
this.operationType = json.getString("operationType");
this.operationType = MongoClientChangeOperationType.valueOf(json.getString("operationType"));
}

public JsonObject toJson() {
Expand All @@ -73,11 +68,20 @@ public JsonObject toJson() {
}

public MongoClientChange(ChangeStreamDocument<JsonObject> obj) {
removedFields = obj.getUpdateDescription().getRemovedFields();
updatedFields = codec.get().decode(new BsonDocumentReader(obj.getUpdateDescription().getUpdatedFields()), DecoderContext.builder().build());
if (obj.getUpdateDescription() != null) {
removedFields = obj.getUpdateDescription().getRemovedFields();
final BsonDocument updatedFields = obj.getUpdateDescription().getUpdatedFields();
if (updatedFields != null) {
this.updatedFields = codec.get().decode(new BsonDocumentReader(updatedFields), DecoderContext.builder().build());
}
}

fullDocument = obj.getFullDocument();
resumeToken = codec.get().decode(new BsonDocumentReader(obj.getResumeToken()), DecoderContext.builder().build());
operationType = obj.getOperationType().toString();
final BsonDocument resumeToken = obj.getResumeToken();
if (resumeToken != null) {
this.resumeToken = codec.get().decode(new BsonDocumentReader(resumeToken), DecoderContext.builder().build());
}
operationType = MongoClientChangeOperationType.valueOf(obj.getOperationType().name());
}

public MongoClientChange removedFields(List<String> removedFields) {
Expand All @@ -100,7 +104,7 @@ public MongoClientChange resumeToken(JsonObject resumeToken) {
return this;
}

public MongoClientChange operationType(String operationType) {
public MongoClientChange operationType(MongoClientChangeOperationType operationType) {
this.operationType = operationType;
return this;
}
Expand All @@ -121,7 +125,7 @@ public JsonObject getResumeToken() {
return resumeToken;
}

public String getOperationType() {
public MongoClientChangeOperationType getOperationType() {
return operationType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.vertx.ext.mongo;

public enum MongoClientChangeOperationType {
INSERT,
UPDATE,
REPLACE,
DELETE,
INVALIDATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,21 @@ private JsonObject generateIdIfNeeded(JsonObject query, JsonObject update, Updat
JsonObject setId = update.getJsonObject("$setOnInsert", new JsonObject());
String id;

//This seems odd, but if you filter based on _id, mongo expects the generated _id to match
// This seems odd, but if you filter based on _id, mongo expects the generated _id to match
if (query.containsKey(ID_FIELD)) {
id = query.getString(ID_FIELD);
} else {
id = JsonObjectCodec.generateHexObjectId();
}
setId.put(ID_FIELD, id);
update.put("$setOnInsert", setId);

// If the ID is inside $set, remove it
// The error that this would cause would start to get enforced in 3.6 (i.e., $setOnInsert no longer silently
// supersedes $set).
if (update.containsKey("$set") && update.getJsonObject("$set").containsKey("_id")) {
update.getJsonObject("$set").remove("_id");
}
}
return update;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoIterable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.core.streams.ReadStream;
Expand All @@ -12,20 +13,22 @@
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class MongoIterableStream<T> implements ReadStream<T> {
protected final Context context;
protected final MongoIterable<T> mongoIterable;
protected final int batchSize;
// All the following fields are guarded by this instance
private AtomicBoolean readInProgress = new AtomicBoolean(false);
private AsyncBatchCursor<T> batchCursor;
private Deque<T> queue;
private Handler<T> dataHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private boolean paused;
private boolean readInProgress;
private boolean closed;

public MongoIterableStream(MongoIterable<T> mongoIterable, Context context, int batchSize) {
Expand All @@ -42,7 +45,7 @@ public synchronized MongoIterableStream exceptionHandler(Handler<Throwable> hand
}

// Always called from a synchronized method or block
private void checkClosed() {
private synchronized void checkClosed() {
if (closed) {
throw new IllegalArgumentException("Stream is closed");
}
Expand Down Expand Up @@ -82,7 +85,7 @@ public synchronized MongoIterableStream handler(Handler<T> handler) {
}

// Always called from a synchronized method or block
private boolean canRead() {
private synchronized boolean canRead() {
return !paused && !closed;
}

Expand All @@ -105,29 +108,28 @@ public synchronized MongoIterableStream resume() {
return this;
}

// Always called from a synchronized method or block
private synchronized void doRead() {
if (readInProgress) {
private void doRead() {
// This is essentially a semaphore
if (!readInProgress.compareAndSet(false, true)) {
return;
}
readInProgress = true;
if (queue == null) {
queue = new ArrayDeque<>(batchSize);
queue = new ConcurrentLinkedDeque<>();
}
if (!queue.isEmpty()) {
context.runOnContext(v -> emitQueued());
return;
}
context.<List<T>>executeBlocking(fut -> {
batchCursor.next((result, t) -> {
if (t != null) {
fut.fail(t);
} else {
fut.complete(result == null ? Collections.emptyList() : result);
}
});
}, false, ar -> {
synchronized (this) {

batchCursor.next((result, t) -> {
final Future<List<T>> ar;
if (t != null) {
ar = Future.failedFuture(t);
} else {
ar = Future.succeededFuture(result == null ? Collections.emptyList() : result);
}

context.runOnContext(v -> {
if (ar.succeeded()) {
queue.addAll(ar.result());
if (queue.isEmpty()) {
Expand All @@ -142,23 +144,21 @@ private synchronized void doRead() {
close();
handleException(ar.cause());
}
}
});
});
}

// Always called from a synchronized method or block
private void handleException(Throwable cause) {
private synchronized void handleException(Throwable cause) {
if (exceptionHandler != null) {
exceptionHandler.handle(cause);
}
}

// Always called from a synchronized method or block
private synchronized void emitQueued() {
private void emitQueued() {
while (!queue.isEmpty() && canRead()) {
dataHandler.handle(queue.remove());
}
readInProgress = false;
readInProgress.set(false);
if (canRead()) {
doRead();
}
Expand All @@ -171,7 +171,7 @@ public synchronized MongoIterableStream endHandler(Handler<Void> handler) {
}

// Always called from a synchronized method or block
void close() {
synchronized void close() {
closed = true;
AtomicReference<AsyncBatchCursor> cursorRef = new AtomicReference<>();
context.executeBlocking(fut -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vertx.kotlin.ext.mongo

import io.vertx.ext.mongo.MongoClientChange
import io.vertx.ext.mongo.MongoClientChangeOperationType

fun MongoClientChange(
): MongoClientChange = io.vertx.ext.mongo.MongoClientChange().apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DistinctTest extends MongoTestBase {

Expand All @@ -35,7 +36,7 @@ public void setUp() throws Exception {
mongoClient = MongoClient.createNonShared(vertx, config);
CountDownLatch latch = new CountDownLatch(1);
dropCollections(mongoClient, latch);
awaitLatch(latch);
longAwaitLatch(latch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.mongodb.async.client.MongoDatabase;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.mongo.impl.codec.json.JsonObjectCodec;
import io.vertx.test.core.Repeat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -28,7 +29,7 @@ public void setUp() throws Exception {
mongoClient = MongoClient.createNonShared(vertx, config);
CountDownLatch latch = new CountDownLatch(1);
dropCollections(mongoClient, latch);
awaitLatch(latch);
longAwaitLatch(latch);


actualMongo = MongoClients.create("mongodb://localhost:27018");
Expand Down Expand Up @@ -58,7 +59,7 @@ public void testFindBatch() throws Exception {
.handler(result -> foos.add(result.getString("foo")));
}));
}));
awaitLatch(latch);
longAwaitLatch(latch);
assertEquals(numDocs, foos.size());
assertEquals("bar0", foos.get(0));
assertEquals("bar999", foos.get(numDocs - 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ public void testRunCommandWithBody() throws Exception {

JsonObject command = new JsonObject()
.put("aggregate", "collection_name")
.put("cursor", new JsonObject().put("batchSize", 1))
.put("pipeline", new JsonArray());

mongoClient.runCommand("aggregate", command, onSuccess(resultObj -> {
JsonArray resArr = resultObj.getJsonArray("result");
JsonArray resArr = resultObj.getJsonObject("cursor").getJsonArray("firstBatch");
assertNotNull(resArr);
assertEquals(0, resArr.size());
testComplete();
Expand Down Expand Up @@ -708,7 +709,7 @@ public void testCountWithQuery() throws Exception {
}));
}

awaitLatch(latch);
longAwaitLatch(latch);

JsonObject query = new JsonObject().put("flag", true);
mongoClient.count(collection, query, onSuccess(count -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void setUp() throws Exception {
mongoClient = MongoClient.createNonShared(vertx, config);
CountDownLatch latch = new CountDownLatch(1);
dropCollections(mongoClient, latch);
awaitLatch(latch);
longAwaitLatch(latch);
}

@Override
Expand Down
Loading

0 comments on commit 3684b00

Please sign in to comment.