-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* add InteractiveSession and SessionManager * address comments --------- (cherry picked from commit f856cb3) Signed-off-by: Peng Huo <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> (cherry picked from commit 79031a4) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
- Loading branch information
1 parent
b729164
commit 729594f
Showing
15 changed files
with
834 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import lombok.Data; | ||
import org.opensearch.sql.spark.client.StartJobRequest; | ||
|
||
@Data | ||
public class CreateSessionRequest { | ||
private final StartJobRequest startJobRequest; | ||
private final String datasourceName; | ||
} |
61 changes: 61 additions & 0 deletions
61
spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession; | ||
|
||
import java.util.Optional; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.index.engine.VersionConflictEngineException; | ||
import org.opensearch.sql.spark.client.EMRServerlessClient; | ||
import org.opensearch.sql.spark.execution.statestore.SessionStateStore; | ||
|
||
/** | ||
* Interactive session. | ||
* | ||
* <p>ENTRY_STATE: not_started | ||
*/ | ||
@Getter | ||
@Builder | ||
public class InteractiveSession implements Session { | ||
private static final Logger LOG = LogManager.getLogger(); | ||
|
||
private final SessionId sessionId; | ||
private final SessionStateStore sessionStateStore; | ||
private final EMRServerlessClient serverlessClient; | ||
|
||
private SessionModel sessionModel; | ||
|
||
@Override | ||
public void open(CreateSessionRequest createSessionRequest) { | ||
try { | ||
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest()); | ||
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId(); | ||
|
||
sessionModel = | ||
initInteractiveSession( | ||
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); | ||
sessionStateStore.create(sessionModel); | ||
} catch (VersionConflictEngineException e) { | ||
String errorMsg = "session already exist. " + sessionId; | ||
LOG.error(errorMsg); | ||
throw new IllegalStateException(errorMsg); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId()); | ||
if (model.isEmpty()) { | ||
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId()); | ||
} else { | ||
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId()); | ||
} | ||
} | ||
} |
19 changes: 19 additions & 0 deletions
19
spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
/** Session define the statement execution context. Each session is binding to one Spark Job. */ | ||
public interface Session { | ||
/** open session. */ | ||
void open(CreateSessionRequest createSessionRequest); | ||
|
||
/** close session. */ | ||
void close(); | ||
|
||
SessionModel getSessionModel(); | ||
|
||
SessionId getSessionId(); | ||
} |
23 changes: 23 additions & 0 deletions
23
spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import lombok.Data; | ||
import org.apache.commons.lang3.RandomStringUtils; | ||
|
||
@Data | ||
public class SessionId { | ||
private final String sessionId; | ||
|
||
public static SessionId newSessionId() { | ||
return new SessionId(RandomStringUtils.random(10, true, true)); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "sessionId=" + sessionId; | ||
} | ||
} |
50 changes: 50 additions & 0 deletions
50
spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; | ||
|
||
import java.util.Optional; | ||
import lombok.RequiredArgsConstructor; | ||
import org.opensearch.sql.spark.client.EMRServerlessClient; | ||
import org.opensearch.sql.spark.execution.statestore.SessionStateStore; | ||
|
||
/** | ||
* Singleton Class | ||
* | ||
* <p>todo. add Session cache and Session sweeper. | ||
*/ | ||
@RequiredArgsConstructor | ||
public class SessionManager { | ||
private final SessionStateStore stateStore; | ||
private final EMRServerlessClient emrServerlessClient; | ||
|
||
public Session createSession(CreateSessionRequest request) { | ||
InteractiveSession session = | ||
InteractiveSession.builder() | ||
.sessionId(newSessionId()) | ||
.sessionStateStore(stateStore) | ||
.serverlessClient(emrServerlessClient) | ||
.build(); | ||
session.open(request); | ||
return session; | ||
} | ||
|
||
public Optional<Session> getSession(SessionId sid) { | ||
Optional<SessionModel> model = stateStore.get(sid); | ||
if (model.isPresent()) { | ||
InteractiveSession session = | ||
InteractiveSession.builder() | ||
.sessionId(sid) | ||
.sessionStateStore(stateStore) | ||
.serverlessClient(emrServerlessClient) | ||
.sessionModel(model.get()) | ||
.build(); | ||
return Optional.ofNullable(session); | ||
} | ||
return Optional.empty(); | ||
} | ||
} |
143 changes: 143 additions & 0 deletions
143
spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.spark.execution.session; | ||
|
||
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; | ||
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; | ||
|
||
import java.io.IOException; | ||
import lombok.Builder; | ||
import lombok.Data; | ||
import lombok.SneakyThrows; | ||
import org.opensearch.core.xcontent.ToXContentObject; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.core.xcontent.XContentParser; | ||
import org.opensearch.core.xcontent.XContentParserUtils; | ||
import org.opensearch.index.seqno.SequenceNumbers; | ||
|
||
/** Session data in flint.ql.sessions index. */ | ||
@Data | ||
@Builder | ||
public class SessionModel implements ToXContentObject { | ||
public static final String VERSION = "version"; | ||
public static final String TYPE = "type"; | ||
public static final String SESSION_TYPE = "sessionType"; | ||
public static final String SESSION_ID = "sessionId"; | ||
public static final String SESSION_STATE = "state"; | ||
public static final String DATASOURCE_NAME = "dataSourceName"; | ||
public static final String LAST_UPDATE_TIME = "lastUpdateTime"; | ||
public static final String APPLICATION_ID = "applicationId"; | ||
public static final String JOB_ID = "jobId"; | ||
public static final String ERROR = "error"; | ||
public static final String UNKNOWN = "unknown"; | ||
public static final String SESSION_DOC_TYPE = "session"; | ||
|
||
private final String version; | ||
private final SessionType sessionType; | ||
private final SessionId sessionId; | ||
private final SessionState sessionState; | ||
private final String applicationId; | ||
private final String jobId; | ||
private final String datasourceName; | ||
private final String error; | ||
private final long lastUpdateTime; | ||
|
||
private final long seqNo; | ||
private final long primaryTerm; | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder | ||
.startObject() | ||
.field(VERSION, version) | ||
.field(TYPE, SESSION_DOC_TYPE) | ||
.field(SESSION_TYPE, sessionType.getSessionType()) | ||
.field(SESSION_ID, sessionId.getSessionId()) | ||
.field(SESSION_STATE, sessionState.getSessionState()) | ||
.field(DATASOURCE_NAME, datasourceName) | ||
.field(APPLICATION_ID, applicationId) | ||
.field(JOB_ID, jobId) | ||
.field(LAST_UPDATE_TIME, lastUpdateTime) | ||
.field(ERROR, error) | ||
.endObject(); | ||
return builder; | ||
} | ||
|
||
public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { | ||
return builder() | ||
.version(copy.version) | ||
.sessionType(copy.sessionType) | ||
.sessionId(new SessionId(copy.sessionId.getSessionId())) | ||
.sessionState(copy.sessionState) | ||
.datasourceName(copy.datasourceName) | ||
.seqNo(seqNo) | ||
.primaryTerm(primaryTerm) | ||
.build(); | ||
} | ||
|
||
@SneakyThrows | ||
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { | ||
SessionModelBuilder builder = new SessionModelBuilder(); | ||
XContentParserUtils.ensureExpectedToken( | ||
XContentParser.Token.START_OBJECT, parser.currentToken(), parser); | ||
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { | ||
String fieldName = parser.currentName(); | ||
parser.nextToken(); | ||
switch (fieldName) { | ||
case VERSION: | ||
builder.version(parser.text()); | ||
break; | ||
case SESSION_TYPE: | ||
builder.sessionType(SessionType.fromString(parser.text())); | ||
break; | ||
case SESSION_ID: | ||
builder.sessionId(new SessionId(parser.text())); | ||
break; | ||
case SESSION_STATE: | ||
builder.sessionState(SessionState.fromString(parser.text())); | ||
break; | ||
case DATASOURCE_NAME: | ||
builder.datasourceName(parser.text()); | ||
break; | ||
case ERROR: | ||
builder.error(parser.text()); | ||
break; | ||
case APPLICATION_ID: | ||
builder.applicationId(parser.text()); | ||
break; | ||
case JOB_ID: | ||
builder.jobId(parser.text()); | ||
break; | ||
case LAST_UPDATE_TIME: | ||
builder.lastUpdateTime(parser.longValue()); | ||
break; | ||
case TYPE: | ||
// do nothing. | ||
break; | ||
} | ||
} | ||
builder.seqNo(seqNo); | ||
builder.primaryTerm(primaryTerm); | ||
return builder.build(); | ||
} | ||
|
||
public static SessionModel initInteractiveSession( | ||
String applicationId, String jobId, SessionId sid, String datasourceName) { | ||
return builder() | ||
.version("1.0") | ||
.sessionType(INTERACTIVE) | ||
.sessionId(sid) | ||
.sessionState(NOT_STARTED) | ||
.datasourceName(datasourceName) | ||
.applicationId(applicationId) | ||
.jobId(jobId) | ||
.error(UNKNOWN) | ||
.lastUpdateTime(System.currentTimeMillis()) | ||
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) | ||
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) | ||
.build(); | ||
} | ||
} |
Oops, something went wrong.