Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Reactive Client for OpenSearch #108

Closed
naumenkovadym opened this issue Mar 16, 2023 · 12 comments
Closed

[FEATURE] Reactive Client for OpenSearch #108

naumenkovadym opened this issue Mar 16, 2023 · 12 comments
Labels
enhancement New feature or request

Comments

@naumenkovadym
Copy link

naumenkovadym commented Mar 16, 2023

Hi OpenSearch Team,

I see that Reactive classes from Spring Data Elasticsearch are missing, like DefaultReactiveElasticsearchClient:
https://github.com/spring-projects/spring-data-elasticsearch/tree/main/src/main/java/org/springframework/data/elasticsearch/client/erhlc

https://github.com/opensearch-project/spring-data-opensearch/tree/main/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc

How can I use Spring WebFlux with OpenSearch?

Thanks.

@naumenkovadym naumenkovadym added enhancement New feature or request untriaged labels Mar 16, 2023
@naumenkovadym naumenkovadym changed the title [FEATURE] Reactive Client for Opensearch [FEATURE] Reactive Client for OpenSearch Mar 16, 2023
@wbeckler wbeckler removed the untriaged label Apr 3, 2023
@wbeckler
Copy link

wbeckler commented Apr 3, 2023

This sounds like it's worth working on. Feel free to propose an architecture here or to submit a PR.

@j3t
Copy link

j3t commented Apr 6, 2023

How can I use Spring WebFlux with OpenSearch?

You can use the open search async client with Mono.fromFuture but this is just a workaround, for example Mono.fromFuture(client.search(searchRequest, JsonNode.class))

@naumenkovadym
Copy link
Author

Thanks, I managed to do it using the old spring-data-elasticsearch

@waytoharish
Copy link

@naumenkovadym Please can you help me how you were able to implement the same as I am stuck at same place and not able to integrate with Spring WebFlux

@naumenkovadym
Copy link
Author

naumenkovadym commented Aug 8, 2023

hi @waytoharish

Sure, here's an example

plugins {
    id 'java'
    id 'org.springframework.boot' version '2.5.14'
    id 'io.spring.dependency-management' version '1.1.0'
}

group = 'com.example'
version = '1.0.0'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation('org.springframework.boot:spring-boot-starter-data-elasticsearch') {
        exclude group: 'org.elasticsearch'
        exclude group: 'org.elasticsearch.client'
    }
    implementation "org.elasticsearch.client:elasticsearch-rest-high-level-client:7.13.4"

    implementation 'io.netty:netty-all'

    implementation("com.google.guava:guava:31.1-jre")

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'

    implementation 'ca.pjer:logback-awslogs-appender:1.6.0'
    implementation 'ch.qos.logback:logback-classic'
    implementation 'ch.qos.logback:logback-classic'

    implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'

    implementation('com.fasterxml.jackson.core:jackson-core:2.10.0')
    implementation('com.fasterxml.jackson.core:jackson-annotations:2.10.0')
    implementation('com.fasterxml.jackson.core:jackson-databind:2.10.0')

    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation("org.testcontainers:elasticsearch:1.17.6")
    testImplementation 'org.junit.vintage:junit-vintage-engine:5.9.2'
}

Configuration

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

@Configuration
public class OpenSearchConfiguration {

    private static final int DEFAULT_CODEC_MAX_MEMORY_SIZE = 16 * 1024 * 1024;

    //vpc-...
    @Value("${opensearch.host}")
    private String host;

    //443
    @Value("${opensearch.port}")
    private int port;

    //https
    @Value("${opensearch.scheme}")
    private String scheme;

    @Value("${opensearch.user}")
    private String user;

    @Value("${opensearch.pass}")
    private String pass;

    @Bean
    public RestHighLevelClient restHighLevelClient() {

        String httpHostAddress = scheme + "://" + host + ":" + port;
        RestClientBuilder restClientBuilder = RestClient.builder(HttpHost.create(httpHostAddress));

        if (user != null && pass != null && !user.isEmpty() && !pass.isEmpty()) {
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(user, pass)
            );
            restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        }

        return new RestHighLevelClient(restClientBuilder);
    }

    @Bean
    public ReactiveElasticsearchClient reactiveOpensearchClient() {

        String httpHostAddress = host + ":" + port;

        ClientConfiguration.MaybeSecureClientConfigurationBuilder clientConfigurationBuilder =
                ClientConfiguration.builder()
                        .connectedTo(httpHostAddress);

        if (user != null && pass != null && !user.isEmpty() && !pass.isEmpty()) {
            clientConfigurationBuilder = (ClientConfiguration.MaybeSecureClientConfigurationBuilder)
                    clientConfigurationBuilder.usingSsl().withBasicAuth(user, pass);
        }

        return ReactiveRestClients.create(clientConfigurationBuilder.withWebClientConfigurer(webClient -> {
            ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                    .codecs(configurer -> configurer.defaultCodecs()
                            .maxInMemorySize(DEFAULT_CODEC_MAX_MEMORY_SIZE))
                    .build();
            return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
        }).build());
    }


    @Bean
    public ElasticsearchConverter elasticsearchConverter() {
        return new MappingElasticsearchConverter(elasticsearchMappingContext());
    }

    @Bean
    public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
        return new SimpleElasticsearchMappingContext();
    }

    @Bean
    public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
        return new ReactiveElasticsearchTemplate(reactiveOpensearchClient(), elasticsearchConverter());
    }
}

ReactiveElasticsearchOperations should be used in repos

@cunhap
Copy link

cunhap commented Oct 4, 2023

Would really appreciate if this issue could be worked on! It would help us a lot!

@dblock
Copy link
Member

dblock commented Oct 13, 2023

@cunhap care to help?

@BarboraCigankova
Copy link

Any progress with the issue?

@dblock
Copy link
Member

dblock commented Jan 17, 2024

@BarboraCigankova @cunhap I don't think anyone is working on this.

@kalyashov
Copy link

Has there been any activity on this issue?

@reta
Copy link
Collaborator

reta commented Apr 16, 2024

@kalyashov I don't think anyone is working on this.

@reta
Copy link
Collaborator

reta commented May 8, 2024

Duplicate of #23

@reta reta marked this as a duplicate of #23 May 8, 2024
@reta reta closed this as completed May 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

9 participants