Add a cloud storage for delta sharing #178

Open
wants to merge 4 commits into
base: main
31 changes: 31 additions & 0 deletions README.md
@@ -300,6 +300,37 @@ export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"

Replace `KEY_PATH` with path of the JSON file that contains your service account key.

### Aliyun Object Storage Service

The server uses `hadoop-aliyun` to read from Aliyun OSS; other configuration approaches are described in the [hadoop-aliyun doc](https://hadoop.apache.org/docs/current/hadoop-aliyun/tools/hadoop-aliyun/index.html). To configure access, create a Hadoop configuration file named `core-site.xml` in the server's `conf` directory and add the following content to it:

```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.oss.impl</name>
    <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
  </property>
  <property>
    <name>fs.oss.accessKeyId</name>
    <value>YOUR-ACCESS-KEY</value>
  </property>
  <property>
    <name>fs.oss.accessKeySecret</name>
    <value>YOUR-ACCESS-KEY-SECRET</value>
  </property>
  <property>
    <name>fs.oss.endpoint</name>
    <value>YOUR-ENDPOINT</value>
    <description>Aliyun OSS endpoint to connect to. An up-to-date list is provided
      in the Aliyun OSS Documentation. Example: http://oss-cn-beijing.aliyuncs.com
    </description>
  </property>
</configuration>
```
`YOUR-ACCESS-KEY` is your Aliyun access key ID, `YOUR-ACCESS-KEY-SECRET` is your Aliyun access key secret, and `YOUR-ENDPOINT` is the Aliyun OSS endpoint to connect to.
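
As a rough sanity check before starting the server, the four properties from the snippet above can be verified with a few lines of Scala. This is only a sketch using the JDK's XML parser: `CoreSiteCheck`, `parseProperties`, and `missingKeys` are hypothetical names for illustration and are not part of the server or of `hadoop-aliyun`.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import javax.xml.parsers.DocumentBuilderFactory

// Hypothetical helper, not part of delta-sharing or hadoop-aliyun.
object CoreSiteCheck {
  // Property names copied from the core-site.xml snippet above.
  val requiredKeys: Seq[String] = Seq(
    "fs.oss.impl",
    "fs.oss.accessKeyId",
    "fs.oss.accessKeySecret",
    "fs.oss.endpoint")

  /** Parses Hadoop-style configuration XML into a name -> value map. */
  def parseProperties(xml: String): Map[String, String] = {
    val builder = DocumentBuilderFactory.newInstance().newDocumentBuilder()
    val doc = builder.parse(
      new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength).flatMap { i =>
      val prop = props.item(i).asInstanceOf[org.w3c.dom.Element]
      val names = prop.getElementsByTagName("name")
      val values = prop.getElementsByTagName("value")
      if (names.getLength > 0 && values.getLength > 0) {
        Some(names.item(0).getTextContent.trim -> values.item(0).getTextContent.trim)
      } else {
        None
      }
    }.toMap
  }

  /** Returns the required keys that are absent or have an empty value. */
  def missingKeys(xml: String): Seq[String] = {
    val props = parseProperties(xml)
    requiredKeys.filterNot(key => props.get(key).exists(_.nonEmpty))
  }
}
```

Calling `CoreSiteCheck.missingKeys` with the contents of `core-site.xml` returns an empty sequence when all four properties are set. It does not check that the credentials or endpoint are actually valid.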

## Authorization

The server supports basic authorization with a pre-configured bearer token. You can add the following config to your server yaml file:
5 changes: 5 additions & 0 deletions build.sbt
@@ -101,6 +101,11 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se
ExclusionRule("com.amazonaws", "aws-java-sdk-bundle")
),
"com.amazonaws" % "aws-java-sdk-bundle" % "1.12.189",
"org.apache.hadoop" % "hadoop-aliyun" % "2.10.1" excludeAll(
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module"),
ExclusionRule("com.google.guava", "guava")
),
"org.apache.hadoop" % "hadoop-azure" % "2.10.1" excludeAll(
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module"),
@@ -20,6 +20,8 @@ import java.net.URI
import java.util.Date
import java.util.concurrent.TimeUnit.SECONDS

import com.aliyun.oss.ClientConfiguration
import com.aliyun.oss.OSSClient
import com.amazonaws.HttpMethod
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest
import com.google.cloud.hadoop.gcsio.StorageResourceId
@@ -31,6 +33,8 @@ import com.microsoft.azure.storage.{CloudStorageAccount, SharedAccessProtocols,
import com.microsoft.azure.storage.blob.{SharedAccessBlobPermissions, SharedAccessBlobPolicy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSUtils
import org.apache.hadoop.fs.aliyun.oss.Constants._
import org.apache.hadoop.fs.azure.{AzureNativeFileSystemStore, NativeAzureFileSystem}
import org.apache.hadoop.fs.azurebfs.{AzureBlobFileSystem, AzureBlobFileSystemStore}
import org.apache.hadoop.fs.azurebfs.services.AuthType
@@ -218,3 +222,28 @@ object GCSFileSigner {
(resourceId.getBucketName, resourceId.getObjectName)
}
}

class OSSFileSigner(
    name: URI,
    conf: Configuration,
    preSignedUrlTimeoutSeconds: Long) extends CloudFileSigner {

  private val clientConf = new ClientConfiguration()
  clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY, MAXIMUM_CONNECTIONS_DEFAULT))
  private val endPoint: String = conf.getTrimmed(ENDPOINT_KEY, "")
  private val provider = AliyunOSSUtils.getCredentialsProvider(name, conf)
  private val ossClient = new OSSClient(endPoint, provider, clientConf)

  override def sign(path: Path): String = {
    val absPath = path.toUri
    val bucketName = absPath.getHost
    val objectKey = absPath.getPath.stripPrefix("/")
    val expiration =
      new Date(System.currentTimeMillis() + SECONDS.toMillis(preSignedUrlTimeoutSeconds))
    assert(objectKey.nonEmpty, s"cannot get object key from $path")
    val request = new com.aliyun.oss.model.GeneratePresignedUrlRequest(bucketName, objectKey)
    request.setMethod(com.aliyun.oss.HttpMethod.GET)
    request.setExpiration(expiration)
    ossClient.generatePresignedUrl(request).toString
  }
}
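
The bucket, object key, and expiration logic inside `sign` can be looked at in isolation, without an OSS client. The sketch below mirrors those three steps using only JDK classes. `OssPathParts`, `bucketAndKey`, and `expirationAfter` are hypothetical names introduced for illustration and are not part of this PR, and the URI in the usage note is a made-up example.

```scala
import java.net.URI
import java.util.Date
import java.util.concurrent.TimeUnit.SECONDS

// Hypothetical helper that mirrors the parsing done in OSSFileSigner.sign.
object OssPathParts {
  /** Splits an oss://bucket/key URI into (bucketName, objectKey). */
  def bucketAndKey(uri: URI): (String, String) = {
    // The bucket is the URI host; the key is the path without its leading slash.
    val bucketName = uri.getHost
    val objectKey = uri.getPath.stripPrefix("/")
    require(bucketName != null && bucketName.nonEmpty, s"cannot get bucket from $uri")
    require(objectKey.nonEmpty, s"cannot get object key from $uri")
    (bucketName, objectKey)
  }

  /** Expiration instant: the given time plus the pre-signed URL timeout. */
  def expirationAfter(nowMillis: Long, timeoutSeconds: Long): Date =
    new Date(nowMillis + SECONDS.toMillis(timeoutSeconds))
}
```

For example, `OssPathParts.bucketAndKey(new URI("oss://my-bucket/delta/table/part-0000.parquet"))` yields `("my-bucket", "delta/table/part-0000.parquet")`. Unlike the `assert` in the PR, this sketch uses `require`, so the check stays active even when assertions are elided at compile time.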
@@ -28,13 +28,14 @@ import io.delta.standalone.DeltaLog
import io.delta.standalone.internal.actions.{AddCDCFile, AddFile, RemoveFile}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
import org.apache.hadoop.fs.azure.NativeAzureFileSystem
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem
import org.apache.hadoop.fs.s3a.S3AFileSystem
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructType}
import scala.collection.mutable.ListBuffer

-import io.delta.sharing.server.{model, AbfsFileSigner, GCSFileSigner, S3FileSigner, WasbFileSigner}
+import io.delta.sharing.server.{model, AbfsFileSigner, GCSFileSigner, OSSFileSigner, S3FileSigner, WasbFileSigner}
import io.delta.sharing.server.config.{ServerConfig, TableConfig}

/**
@@ -93,6 +94,8 @@ class DeltaSharedTable(
AbfsFileSigner(abfs, deltaLog.dataPath.toUri, preSignedUrlTimeoutSeconds)
case gc: GoogleHadoopFileSystem =>
new GCSFileSigner(deltaLog.dataPath.toUri, conf, preSignedUrlTimeoutSeconds)
case oss: AliyunOSSFileSystem =>
new OSSFileSigner(deltaLog.dataPath.toUri, conf, preSignedUrlTimeoutSeconds)
case _ =>
throw new IllegalStateException(s"File system ${fs.getClass} is not supported")
}