Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kotlin coroutines driver #565

Draft
wants to merge 3 commits into
base: 6.0.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ micronaut-core = { module = 'io.micronaut:micronaut-core-bom', version.ref = 'mi
managed-mongo-core = { module = "org.mongodb:mongodb-driver-core", version.ref = "managed-mongo" }
managed-mongo-driver = { module = "org.mongodb:mongodb-driver-sync", version.ref = "managed-mongo" }
managed-mongo-reactive = { module = "org.mongodb:mongodb-driver-reactivestreams", version.ref = "managed-mongo-reactive" }
managed-mongo-coroutine = {module = "org.mongodb:mongodb-driver-kotlin-coroutine", version.ref = "managed-mongo-reactive"}

micronaut-micrometer = { module = "io.micronaut.micrometer:micronaut-micrometer-bom", version.ref = "micronaut-micrometer" }
micronaut-serde = { module = "io.micronaut.serde:micronaut-serde-bom", version.ref = "micronaut-serde" }
Expand Down
29 changes: 29 additions & 0 deletions mongo-coroutine/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
plugins {
id 'io.micronaut.build.internal.mongodb-module'
}

dependencies {
annotationProcessor mn.micronaut.inject.java
annotationProcessor(mnValidation.micronaut.validation.processor)

api mn.micronaut.aop
api mn.micronaut.runtime
api projects.micronautMongoCore
api(libs.managed.mongo.coroutine) {
exclude group: 'org.mongodb', module: 'mongodb-driver-async'
}
api mnValidation.micronaut.validation

compileOnly mnMicrometer.micronaut.micrometer.core
compileOnly mn.micronaut.inject.java
compileOnly mn.micronaut.management

implementation mn.reactor


testImplementation mn.micronaut.function.web
testImplementation mn.micronaut.inject.groovy
testImplementation mn.micronaut.management
testImplementation mnTest.micronaut.test.spock
testImplementation libs.testcontainers
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.mongo.coroutine;


import com.mongodb.kotlin.client.coroutine.MongoClient;
import io.micronaut.configuration.mongo.core.DefaultMongoConfiguration;
import io.micronaut.configuration.mongo.core.MongoSettings;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Requires;
import io.micronaut.runtime.context.scope.Refreshable;

/**
* Factory for the default {@link MongoClient}. Creates the injectable {@link Primary} bean
*
* @author Graeme Rocher
* @since 1.0
*/
@Requires(beans = DefaultMongoConfiguration.class)
@Factory
public class DefaultCoroutineMongoClientFactory {


/**
* Factory Method for creating a client.
* @param mongoConfiguration mongoConfiguration
* @return mongoClient
*/
@Bean(preDestroy = "close")
@Refreshable(MongoSettings.PREFIX)
@Primary
public MongoClient mongoClient(DefaultMongoConfiguration mongoConfiguration) {
return MongoClient.Factory.create(mongoConfiguration.buildSettings(), null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.mongo.coroutine;

import com.mongodb.kotlin.client.coroutine.MongoClient;
import io.micronaut.configuration.mongo.core.MongoSettings;
import io.micronaut.configuration.mongo.core.NamedMongoConfiguration;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.runtime.context.scope.Refreshable;

/**
* Factory for named {@link MongoClient} instances. Creates the injectable {@link io.micronaut.context.annotation.Primary} bean
*
* @author Graeme Rocher
* @since 1.0
*/
@Factory
public class NamedCoroutineMongoClientFactory {

/**
* Factory name to create a client.
* @param configuration configuration pulled in
* @return mongoClient
*/
@Bean(preDestroy = "close")
@EachBean(NamedMongoConfiguration.class)
@Refreshable(MongoSettings.PREFIX)
public com.mongodb.reactivestreams.client.MongoClient mongoClient(NamedMongoConfiguration configuration) {
return MongoClient.Factory.create(configuration.buildSettings(), null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.mongo.coroutine.condition;

import com.mongodb.kotlin.client.coroutine.MongoClient;
import io.micronaut.configuration.mongo.core.MongoSettings;
import io.micronaut.context.annotation.Requires;

import java.lang.annotation.*;

/**
* A custom requirement for MongoDB.
*
* @author graemerocher
* @since 1.0
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PACKAGE, ElementType.TYPE})
@Requires(classes = MongoClient.class)
@Requires(property = MongoSettings.PREFIX)
public @interface RequiresMongo {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Condition for Mongo.
*
* @author James Kleeh
* @since 1.0
*/
package io.micronaut.configuration.mongo.coroutine.condition;
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.mongo.coroutine.health;

import com.mongodb.kotlin.client.coroutine.MongoClient;
import io.micronaut.context.BeanContext;
import io.micronaut.context.BeanRegistration;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.StringUtils;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.endpoint.health.HealthEndpoint;
import io.micronaut.management.health.aggregator.HealthAggregator;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.micronaut.configuration.mongo.coroutine.health.MongoHealthIndicator.HEALTH_INDICATOR_NAME;

/**
* A {@link HealthIndicator} for MongoDB.
*
* @author graemerocher
* @since 1.0
*/
@Singleton
@Requires(beans = MongoClient.class)
@Requires(beans = HealthEndpoint.class)
@Requires(property = HealthEndpoint.PREFIX + "." + HEALTH_INDICATOR_NAME + ".enabled", notEquals = StringUtils.FALSE)
public class MongoHealthIndicator implements HealthIndicator {
static final String HEALTH_INDICATOR_NAME = "mongodb";

private final BeanContext beanContext;
private final HealthAggregator<?> healthAggregator;
private final MongoClient[] mongoClients;

/**
* @param beanContext beanContext
* @param healthAggregator healthAggregator
* @param mongoClients The mongo clients
*/
public MongoHealthIndicator(BeanContext beanContext, HealthAggregator<?> healthAggregator, MongoClient... mongoClients) {
this.beanContext = beanContext;
this.healthAggregator = healthAggregator;
this.mongoClients = mongoClients;
}

@Override
public Publisher<HealthResult> getResult() {

// List<BeanRegistration<MongoClient>> registrations = getRegisteredConnections();
//
// Flux<HealthResult> healthResults = Flux.fromIterable(registrations)
// .flatMap(this::checkRegisteredMongoClient)
// .onErrorResume(throwable -> Flux.just(buildStatusDown(throwable, HEALTH_INDICATOR_NAME)));
final var status = HealthResult.builder("mongo");
status.status(HealthStatus.UP);

return this.healthAggregator.aggregate(HEALTH_INDICATOR_NAME, Flux.just(status.build()));
}

private List<BeanRegistration<MongoClient>> getRegisteredConnections() {
return Arrays.stream(mongoClients)
.map(beanContext::findBeanRegistration)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

// private Publisher<HealthResult> checkRegisteredMongoClient(BeanRegistration<MongoClient> registration) {
// MongoClient mongoClient = registration.getBean();
// String databaseName = "mongodb (" + registration.getIdentifier().getName() + ")";
//
// Flux<Map<String, String>> databasePings = Flux.from(pingMongo(mongoClient))
// .map(this::getVersionDetails)
// .timeout(Duration.of(10, ChronoUnit.SECONDS))
// .retry(3);
//
// return databasePings.map(detail -> buildStatusUp(databaseName, detail))
// .onErrorResume(throwable -> Flux.just(buildStatusDown(throwable, databaseName)));
// }
//
// private Document pingMongo(MongoClient mongoClient) {
//Flux.
// return mongoClient.getDatabase("admin").runCommandDocumentWithSession(new BsonDocument("buildinfo", new BsonInt64(1)), null, null);
// }
//
// private Map<String, String> getVersionDetails(Document document) {
// String version = document.get("version", String.class);
// if (version == null) {
// throw new IllegalStateException("Mongo version not found");
// }
// return Collections.singletonMap("version", version);
// }

private HealthResult buildStatusUp(String name, Map<String, String> details) {
HealthResult.Builder builder = HealthResult.builder(name);
builder.status(HealthStatus.UP);
builder.details(details);
return builder.build();
}

private HealthResult buildStatusDown(Throwable throwable, String name) {
HealthResult.Builder builder = HealthResult.builder(name);
builder.status(HealthStatus.DOWN);
builder.exception(throwable);
return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* MongoDB Health indicators.
*
* @author James Kleeh
* @since 1.0
*/
package io.micronaut.configuration.mongo.coroutine.health;
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Configuration group for Mongo Coroutines.
*
* @author James Kleeh
* @since 1.0
*/
@Configuration
@RequiresMongo
package io.micronaut.configuration.mongo.coroutine;

import io.micronaut.configuration.mongo.coroutine.condition.RequiresMongo;
import io.micronaut.context.annotation.Configuration;
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright 2017-2020 original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

Args = --initialize-at-run-time=io.micronaut.configuration.mongo.core.test.AbstractMongoProcessFactory
Loading