[Issue-154] Update connector version to 0.13.0-SNAPSHOT (#155)
Signed-off-by: Fan, Yang <[email protected]>
fyang86 authored and crazyzhou committed Oct 20, 2022
1 parent 760336e commit 9fb66ea
Showing 6 changed files with 29 additions and 11 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -38,12 +38,12 @@ that use Pravega as the stream storage and message bus, and Apache Spark for computation

| Spark Version | Pravega Version | Java Version To Build Connector | Java Version To Run Connector | Git Branch |
|---------------|-----------------|---------------------------------|-------------------------------|-----------------------------------------------------------------------------------|
-| 3.1 | 0.12 | Java 11 | Java 8 or 11 | [master](https://github.com/pravega/spark-connectors) |
-| 3.1 | 0.11 | Java 11 | Java 8 or 11 | [r0.11](https://github.com/pravega/spark-connectors/tree/r0.11) |
+| 3.1+ | 0.13 | Java 11 | Java 8 or 11 | [master](https://github.com/pravega/spark-connectors) |
+| 2.4 | 0.13 | Java 8 | Java 8 | [r0.13-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.13-spark2.4) |
+| 3.1+ | 0.12 | Java 11 | Java 8 or 11 | [r0.12](https://github.com/pravega/spark-connectors/tree/r0.12) |
+| 2.4 | 0.12 | Java 8 | Java 8 | [r0.12-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.12-spark2.4) |
+| 3.1+ | 0.11 | Java 11 | Java 8 or 11 | [r0.11](https://github.com/pravega/spark-connectors/tree/r0.11) |
| 2.4 | 0.11 | Java 8 | Java 8 | [r0.11-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.11-spark2.4) |
| 3.1 | 0.10 | Java 11 | Java 8 or 11 | [r0.10](https://github.com/pravega/spark-connectors/tree/r0.10) |
| 3.0 | 0.10 | Java 11 | Java 8 or 11 | [r0.10-spark3.0](https://github.com/pravega/spark-connectors/tree/r0.10-spark3.0) |
| 2.4 | 0.10 | Java 8 | Java 8 | [r0.10-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.10-spark2.4) |
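
For orientation, here is a minimal sketch (not part of this commit) of reading a Pravega stream from Spark Structured Streaming with this connector. The controller URI, scope, and stream names are placeholders, and the `event` column cast follows the connector's documented read schema; adapt them to your deployment.

```scala
// Hypothetical example, for illustration only: stream Pravega events to the
// Spark console sink. Controller URI, scope and stream names are placeholders.
import org.apache.spark.sql.SparkSession

object PravegaReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pravega-read-example").getOrCreate()

    val events = spark.readStream
      .format("pravega")                            // connector's data source short name
      .option("controller", "tcp://localhost:9090") // placeholder Pravega controller URI
      .option("scope", "examples")                  // placeholder scope
      .option("stream", "mystream")                 // placeholder stream
      .load()

    // The connector exposes the raw event payload as bytes; cast it for display.
    events.selectExpr("CAST(event AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```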

## Limitations

4 changes: 2 additions & 2 deletions gradle.properties
@@ -27,8 +27,8 @@ slf4jApiVersion=1.7.25
sparkVersion=2.4.7

# Version and base tags can be overridden at build time.
-connectorVersion=0.12.0-SNAPSHOT
-pravegaVersion=0.12.0-3099.d57bc8b-SNAPSHOT
+connectorVersion=0.13.0-SNAPSHOT
+pravegaVersion=0.13.0-3142.2542966-SNAPSHOT

# flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion'
usePravegaVersionSubModule=false
@@ -38,7 +38,7 @@ class PravegaMicroBatchReader(scopeName: String,
// Resolve start and end stream cuts now.
// We must ensure that getStartOffset and getEndOffset return specific stream cuts, even
// if the caller passes "earliest" or "latest".
-  private lazy val initialStreamInfo = streamManager.getStreamInfo(scopeName, streamName)
+  private lazy val initialStreamInfo = PravegaUtils.getStreamInfo(streamManager, scopeName, streamName)
private val resolvedStartStreamCut = startStreamCut match {
case EarliestStreamCut | UnboundedStreamCut => initialStreamInfo.getHeadStreamCut
case LatestStreamCut => initialStreamInfo.getTailStreamCut
@@ -66,7 +66,7 @@ class PravegaMicroBatchReader(scopeName: String,
override def setOffsetRange(start: ju.Optional[Offset], end: ju.Optional[Offset]): Unit = {
log.info(s"setOffsetRange(${start},${end})")

-    lazy val streamInfo = streamManager.getStreamInfo(scopeName, streamName)
+    lazy val streamInfo = PravegaUtils.getStreamInfo(streamManager, scopeName, streamName)

// The batch will start at the first available stream cut: start, resolvedStartStreamCut
batchStartStreamCut = Option(start.orElse(null))
@@ -169,7 +169,7 @@ class PravegaSourceProvider extends DataSourceV2
(for (streamManager <- managed(StreamManager.create(clientConfig))) yield {
metadataTableName match {
case MetadataTableName.StreamInfo => {
-        val streamInfo = streamManager.getStreamInfo(scopeName, streamName)
+        val streamInfo = PravegaUtils.getStreamInfo(streamManager, scopeName, streamName)
val schema = StructType(Seq(
StructField("head_stream_cut", StringType),
StructField("tail_stream_cut", StringType)
18 changes: 18 additions & 0 deletions src/main/scala/io/pravega/connectors/spark/PravegaUtils.scala
@@ -0,0 +1,18 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.connectors.spark

import io.pravega.client.admin.{StreamInfo, StreamManager}

private object PravegaUtils {
def getStreamInfo(streamManager: StreamManager, scopeName: String, streamName: String): StreamInfo = {
streamManager.fetchStreamInfo(scopeName, streamName).join()
}
}
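
The new PravegaUtils.getStreamInfo helper wraps Pravega's asynchronous StreamManager.fetchStreamInfo call and blocks on the returned future with join(), so call sites keep the synchronous behavior they previously got from streamManager.getStreamInfo. A minimal usage sketch follows (not part of this commit; the controller URI, scope, and stream names are placeholders, and since PravegaUtils is package-private the caller is assumed to live in io.pravega.connectors.spark):

```scala
package io.pravega.connectors.spark

import java.net.URI
import io.pravega.client.ClientConfig
import io.pravega.client.admin.StreamManager

// Hypothetical caller, for illustration only: look up the head and tail stream
// cuts of an existing stream through the new helper, then release the manager.
object StreamInfoExample {
  def main(args: Array[String]): Unit = {
    val clientConfig = ClientConfig.builder()
      .controllerURI(URI.create("tcp://localhost:9090")) // placeholder controller URI
      .build()
    val streamManager = StreamManager.create(clientConfig)
    try {
      val info = PravegaUtils.getStreamInfo(streamManager, "examples", "mystream")
      println(s"head stream cut: ${info.getHeadStreamCut}")
      println(s"tail stream cut: ${info.getTailStreamCut}")
    } finally {
      streamManager.close() // StreamManager holds a controller connection; always close it
    }
  }
}
```

Centralizing the lookup in one helper keeps the three call sites in this commit (micro-batch reader, source provider, and test utilities) in sync as the underlying Pravega client API evolves.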
@@ -96,7 +96,7 @@ class PravegaTestUtils extends Logging {
def getStreamInfo(streamName: String): StreamInfo = {
val streamManager = StreamManager.create(SETUP_UTILS.getControllerUri)
try {
-      streamManager.getStreamInfo(scope, streamName)
+      PravegaUtils.getStreamInfo(streamManager, scope, streamName)
} finally {
streamManager.close()
}
