-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add InteractiveSession and SessionManager #2290
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import lombok.Data; | ||
import org.opensearch.sql.spark.client.StartJobRequest; | ||
|
||
@Data | ||
public class CreateSessionRequest { | ||
private final StartJobRequest startJobRequest; | ||
private final String datasourceName; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession; | ||
|
||
import java.util.Optional; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.index.engine.VersionConflictEngineException; | ||
import org.opensearch.sql.spark.client.EMRServerlessClient; | ||
import org.opensearch.sql.spark.execution.statestore.SessionStateStore; | ||
|
||
/** | ||
* Interactive session. | ||
* | ||
* <p>ENTRY_STATE: not_started | ||
*/ | ||
@Getter | ||
@Builder | ||
public class InteractiveSession implements Session { | ||
private static final Logger LOG = LogManager.getLogger(); | ||
|
||
private final SessionId sessionId; | ||
private final SessionStateStore sessionStateStore; | ||
private final EMRServerlessClient serverlessClient; | ||
private final CreateSessionRequest createSessionRequest; | ||
|
||
private SessionModel sessionModel; | ||
|
||
@Override | ||
public void open() { | ||
try { | ||
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest()); | ||
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId(); | ||
|
||
sessionModel = | ||
initInteractiveSession( | ||
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); | ||
sessionStateStore.create(sessionModel); | ||
} catch (VersionConflictEngineException e) { | ||
String errorMsg = "session already exist. " + sessionId; | ||
LOG.error(errorMsg); | ||
throw new IllegalStateException(errorMsg); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId()); | ||
if (model.isEmpty()) { | ||
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId()); | ||
} else { | ||
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId()); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
/** Session define the statement execution context. Each session is binding to one Spark Job. */ | ||
public interface Session { | ||
/** open session. */ | ||
void open(); | ||
|
||
/** close session. */ | ||
void close(); | ||
|
||
SessionModel getSessionModel(); | ||
|
||
SessionId getSessionId(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import lombok.Data; | ||
import org.apache.commons.lang3.RandomStringUtils; | ||
|
||
@Data | ||
public class SessionId { | ||
private final String sessionId; | ||
|
||
public static SessionId newSessionId() { | ||
return new SessionId(RandomStringUtils.random(10, true, true)); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "sessionId=" + sessionId; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; | ||
|
||
import java.util.Optional; | ||
import lombok.RequiredArgsConstructor; | ||
import org.opensearch.sql.spark.client.EMRServerlessClient; | ||
import org.opensearch.sql.spark.execution.statestore.SessionStateStore; | ||
|
||
/** | ||
* Singleton Class | ||
* | ||
* <p>todo. add Session cache and Session sweeper. | ||
*/ | ||
@RequiredArgsConstructor | ||
public class SessionManager { | ||
private final SessionStateStore stateStore; | ||
private final EMRServerlessClient emrServerlessClient; | ||
|
||
public Session createSession(CreateSessionRequest request) { | ||
InteractiveSession session = | ||
InteractiveSession.builder() | ||
.sessionId(newSessionId()) | ||
.sessionStateStore(stateStore) | ||
.serverlessClient(emrServerlessClient) | ||
.createSessionRequest(request) | ||
.build(); | ||
session.open(); | ||
return session; | ||
} | ||
|
||
public Optional<Session> getSession(SessionId sid) { | ||
Optional<SessionModel> model = stateStore.get(sid); | ||
if (model.isPresent()) { | ||
InteractiveSession session = | ||
InteractiveSession.builder() | ||
.sessionId(sid) | ||
.sessionStateStore(stateStore) | ||
.serverlessClient(emrServerlessClient) | ||
.sessionModel(model.get()) | ||
.build(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it ok that the session's createSessionRequest is not set? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. make sense createSessionRequest is open() method parameters. |
||
return Optional.ofNullable(session); | ||
} | ||
return Optional.empty(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; | ||
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; | ||
|
||
import java.io.IOException; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.SneakyThrows; | ||
import org.opensearch.core.xcontent.ToXContentObject; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.core.xcontent.XContentParserUtils; | ||
import org.opensearch.index.seqno.SequenceNumbers; | ||
|
||
/** Session data in flint.ql.sessions index. */ | ||
@Data | ||
@Builder | ||
public class SessionModel implements ToXContentObject { | ||
public static final String VERSION = "version"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the usecase for version here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In case schema upgrade. |
||
public static final String TYPE = "type"; | ||
public static final String SESSION_TYPE = "sessionType"; | ||
public static final String SESSION_ID = "sessionId"; | ||
public static final String SESSION_STATE = "state"; | ||
public static final String DATASOURCE_NAME = "dataSourceName"; | ||
public static final String LAST_UPDATE_TIME = "lastUpdateTime"; | ||
public static final String APPLICATION_ID = "applicationId"; | ||
public static final String JOB_ID = "jobId"; | ||
public static final String ERROR = "error"; | ||
public static final String UNKNOWN = "unknown"; | ||
public static final String SESSION_DOC_TYPE = "session"; | ||
|
||
private final String version; | ||
private final SessionType sessionType; | ||
private final SessionId sessionId; | ||
private final SessionState sessionState; | ||
private final String applicationId; | ||
private final String jobId; | ||
private final String datasourceName; | ||
private final String error; | ||
private final long lastUpdateTime; | ||
|
||
private final long seqNo; | ||
private final long primaryTerm; | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder | ||
.startObject() | ||
.field(VERSION, version) | ||
.field(TYPE, SESSION_DOC_TYPE) | ||
.field(SESSION_TYPE, sessionType.getSessionType()) | ||
.field(SESSION_ID, sessionId.getSessionId()) | ||
.field(SESSION_STATE, sessionState.getSessionState()) | ||
.field(DATASOURCE_NAME, datasourceName) | ||
.field(APPLICATION_ID, applicationId) | ||
.field(JOB_ID, jobId) | ||
.field(LAST_UPDATE_TIME, lastUpdateTime) | ||
.field(ERROR, error) | ||
.endObject(); | ||
return builder; | ||
} | ||
|
||
public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { | ||
return builder() | ||
.version(copy.version) | ||
.sessionType(copy.sessionType) | ||
.sessionId(new SessionId(copy.sessionId.getSessionId())) | ||
.sessionState(copy.sessionState) | ||
.datasourceName(copy.datasourceName) | ||
.seqNo(seqNo) | ||
.primaryTerm(primaryTerm) | ||
.build(); | ||
} | ||
|
||
@SneakyThrows | ||
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { | ||
SessionModelBuilder builder = new SessionModelBuilder(); | ||
XContentParserUtils.ensureExpectedToken( | ||
XContentParser.Token.START_OBJECT, parser.currentToken(), parser); | ||
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { | ||
String fieldName = parser.currentName(); | ||
parser.nextToken(); | ||
switch (fieldName) { | ||
case VERSION: | ||
builder.version(parser.text()); | ||
break; | ||
case SESSION_TYPE: | ||
builder.sessionType(SessionType.fromString(parser.text())); | ||
break; | ||
case SESSION_ID: | ||
builder.sessionId(new SessionId(parser.text())); | ||
break; | ||
case SESSION_STATE: | ||
builder.sessionState(SessionState.fromString(parser.text())); | ||
break; | ||
case DATASOURCE_NAME: | ||
builder.datasourceName(parser.text()); | ||
break; | ||
case ERROR: | ||
builder.error(parser.text()); | ||
break; | ||
case APPLICATION_ID: | ||
builder.applicationId(parser.text()); | ||
break; | ||
case JOB_ID: | ||
builder.jobId(parser.text()); | ||
break; | ||
case LAST_UPDATE_TIME: | ||
builder.lastUpdateTime(parser.longValue()); | ||
break; | ||
case TYPE: | ||
// do nothing. | ||
break; | ||
} | ||
} | ||
builder.seqNo(seqNo); | ||
builder.primaryTerm(primaryTerm); | ||
return builder.build(); | ||
} | ||
|
||
public static SessionModel initInteractiveSession( | ||
String applicationId, String jobId, SessionId sid, String datasourceName) { | ||
return builder() | ||
.version("1.0") | ||
.sessionType(INTERACTIVE) | ||
.sessionId(sid) | ||
.sessionState(NOT_STARTED) | ||
.datasourceName(datasourceName) | ||
.applicationId(applicationId) | ||
.jobId(jobId) | ||
.error(UNKNOWN) | ||
.lastUpdateTime(System.currentTimeMillis()) | ||
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) | ||
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) | ||
dai-chen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.build(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we require datasourceName in asyncQuery Request model?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
datasourceName is write to Session doc in flint.ql.sessions index. it is required for user perform doc level access control.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am actually asking for this api _plugins/_async_query. I think we need it.
Should we perform datasource authorization in both the calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.