Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add InteractiveSession and SessionManager #2290

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,47 @@ dependencies {
api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation(platform("org.junit:junit-bom:5.6.2"))

testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
testImplementation 'junit:junit:4.13.1'
testImplementation "org.opensearch.test:framework:${opensearch_version}"

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.platform:junit-platform-launcher") {
because 'allows tests to run from IDEs that bundle older version of launcher'
}
testImplementation("org.opensearch.test:framework:${opensearch_version}")
}

test {
useJUnitPlatform()
useJUnitPlatform {
includeEngines("junit-jupiter")
}
testLogging {
events "failed"
exceptionFormat "full"
}
}
task junit4(type: Test) {
useJUnitPlatform {
includeEngines("junit-vintage")
}
systemProperty 'tests.security.manager', 'false'
testLogging {
events "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
dependsOn test, junit4
executionData test, junit4
reports {
html.enabled true
xml.enabled true
Expand All @@ -78,9 +103,10 @@ jacocoTestReport {
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
dependsOn test, junit4
executionData test, junit4
violationRules {
rule {
element = 'CLASS'
Expand All @@ -92,6 +118,9 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.SessionStateStore',
'org.opensearch.sql.spark.execution.session.SessionModel'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.Data;
import org.opensearch.sql.spark.client.StartJobRequest;

@Data
public class CreateSessionRequest {
private final StartJobRequest startJobRequest;
private final String datasourceName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we require datasourceName in asyncQuery Request model?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datasourceName is write to Session doc in flint.ql.sessions index. it is required for user perform doc level access control.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am actually asking for this api _plugins/_async_query. I think we need it.
Should we perform datasource authorization in both the calls?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. for create session, _plugins/_async_query/sessions, require datasource, address it in separate PR.
  2. we need datasource authZ for both _plugins/_async_query/sessions and _plugins/_async_query API.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession;

import java.util.Optional;
import lombok.Builder;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Interactive session.
*
* <p>ENTRY_STATE: not_started
*/
@Getter
@Builder
public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final SessionStateStore sessionStateStore;
private final EMRServerlessClient serverlessClient;

private SessionModel sessionModel;

@Override
public void open(CreateSessionRequest createSessionRequest) {
try {
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();

sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStateStore.create(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
public void close() {
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId());
if (model.isEmpty()) {
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
void open(CreateSessionRequest createSessionRequest);

/** close session. */
void close();

SessionModel getSessionModel();

SessionId getSessionId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.Data;
import org.apache.commons.lang3.RandomStringUtils;

@Data
public class SessionId {
private final String sessionId;

public static SessionId newSessionId() {
return new SessionId(RandomStringUtils.random(10, true, true));
}

@Override
public String toString() {
return "sessionId=" + sessionId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final SessionStateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId())
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.build();
session.open(request);
return session;
}

public Optional<Session> getSession(SessionId sid) {
Optional<SessionModel> model = stateStore.get(sid);
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok that the session's createSessionRequest is not set?

Copy link
Collaborator Author

@penghuo penghuo Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Optional.ofNullable(session);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel implements ToXContentObject {
public static final String VERSION = "version";
Copy link
Member

@vmmusings vmmusings Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the usecase for version here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case schema upgrade.

public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
public static final String SESSION_ID = "sessionId";
public static final String SESSION_STATE = "state";
public static final String DATASOURCE_NAME = "dataSourceName";
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String APPLICATION_ID = "applicationId";
public static final String JOB_ID = "jobId";
public static final String ERROR = "error";
public static final String UNKNOWN = "unknown";
public static final String SESSION_DOC_TYPE = "session";

private final String version;
private final SessionType sessionType;
private final SessionId sessionId;
private final SessionState sessionState;
private final String applicationId;
private final String jobId;
private final String datasourceName;
private final String error;
private final long lastUpdateTime;

private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(copy.sessionState)
.datasourceName(copy.datasourceName)
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
}

@SneakyThrows

Check warning on line 81 in spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java#L81

Added line #L81 was not covered by tests
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
.version("1.0")
.sessionType(INTERACTIVE)
.sessionId(sid)
.sessionState(NOT_STARTED)
.datasourceName(datasourceName)
.applicationId(applicationId)
.jobId(jobId)
.error(UNKNOWN)
.lastUpdateTime(System.currentTimeMillis())
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
.build();
}
}
Loading
Loading