-
Notifications
You must be signed in to change notification settings - Fork 178
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add s3 storage artifact route and ui integration of it
chore: Update artifact URL generation logic chore: Update artifact URL generation logic chore: Refactor storage route to handle S3 endpoint disablement chore: Update artifact URL generation logic
- Loading branch information
1 parent
b33db83
commit 09d0651
Showing
34 changed files
with
1,103 additions
and
75 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import { FastifyInstance, FastifyReply } from 'fastify'; | ||
import { getObjectSize, getObjectStream, setupMinioClient } from './storageUtils'; | ||
import { getDashboardConfig } from '../../../utils/resourceUtils'; | ||
import { getDirectCallOptions } from '../../../utils/directCallUtils'; | ||
import { getAccessToken } from '../../../utils/directCallUtils'; | ||
import { OauthFastifyRequest } from '../../../types'; | ||
|
||
export default async (fastify: FastifyInstance): Promise<void> => { | ||
fastify.get( | ||
'/:namespace/size', | ||
async ( | ||
request: OauthFastifyRequest<{ | ||
Querystring: { key: string }; | ||
Params: { namespace: string }; | ||
}>, | ||
reply: FastifyReply, | ||
) => { | ||
try { | ||
const dashConfig = getDashboardConfig(); | ||
if (dashConfig?.spec.dashboardConfig.disableS3Endpoint !== false) { | ||
reply.code(404).send('Not found'); | ||
return reply; | ||
} | ||
|
||
const { namespace } = request.params; | ||
const { key } = request.query; | ||
|
||
const requestOptions = await getDirectCallOptions(fastify, request, request.url); | ||
const token = getAccessToken(requestOptions); | ||
|
||
const { client, bucket } = await setupMinioClient(fastify, token, namespace); | ||
|
||
const size = await getObjectSize({ | ||
client, | ||
key, | ||
bucket, | ||
}); | ||
|
||
reply.send(size); | ||
} catch (err) { | ||
reply.code(500).send(err.message); | ||
return reply; | ||
} | ||
}, | ||
); | ||
|
||
fastify.get( | ||
'/:namespace', | ||
async ( | ||
request: OauthFastifyRequest<{ | ||
Querystring: { key: string; peek?: number }; | ||
Params: { namespace: string }; | ||
}>, | ||
reply: FastifyReply, | ||
) => { | ||
try { | ||
const dashConfig = getDashboardConfig(); | ||
if (dashConfig?.spec.dashboardConfig.disableS3Endpoint !== false) { | ||
reply.code(404).send('Not found'); | ||
return reply; | ||
} | ||
|
||
const { namespace } = request.params; | ||
const { key, peek } = request.query; | ||
|
||
const requestOptions = await getDirectCallOptions(fastify, request, request.url); | ||
const token = getAccessToken(requestOptions); | ||
|
||
const { client, bucket } = await setupMinioClient(fastify, token, namespace); | ||
|
||
const stream = await getObjectStream({ | ||
client, | ||
key, | ||
bucket, | ||
peek, | ||
}); | ||
|
||
reply.type('text/plain'); | ||
|
||
await new Promise<void>((resolve, reject) => { | ||
stream.on('data', (chunk) => { | ||
reply.raw.write(chunk); | ||
}); | ||
|
||
stream.on('end', () => { | ||
reply.raw.end(); | ||
resolve(); | ||
}); | ||
|
||
stream.on('error', (err) => { | ||
fastify.log.error('Stream error:', err); | ||
reply.raw.statusCode = 500; | ||
reply.raw.end(err.message); | ||
reject(err); | ||
}); | ||
}); | ||
|
||
return; | ||
} catch (err) { | ||
reply.code(500).send(err.message); | ||
return reply; | ||
} | ||
}, | ||
); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
import { Client as MinioClient } from 'minio'; | ||
import { DSPipelineKind, KubeFastifyInstance } from '../../../types'; | ||
import { Transform, TransformOptions } from 'stream'; | ||
|
||
export interface PreviewStreamOptions extends TransformOptions { | ||
peek: number; | ||
} | ||
|
||
/** | ||
* Transform stream that only stream the first X number of bytes. | ||
*/ | ||
export class PreviewStream extends Transform { | ||
constructor({ peek, ...opts }: PreviewStreamOptions) { | ||
// acts like passthrough | ||
let transform: TransformOptions['transform'] = (chunk, _encoding, callback) => | ||
callback(undefined, chunk); | ||
// implements preview - peek must be positive number | ||
if (peek && peek > 0) { | ||
let size = 0; | ||
transform = (chunk, _encoding, callback) => { | ||
const delta = peek - size; | ||
size += chunk.length; | ||
if (size >= peek) { | ||
callback(undefined, chunk.slice(0, delta)); | ||
this.resume(); // do not handle any subsequent data | ||
return; | ||
} | ||
callback(undefined, chunk); | ||
}; | ||
} | ||
super({ ...opts, transform }); | ||
} | ||
} | ||
|
||
export async function getDspa( | ||
fastify: KubeFastifyInstance, | ||
token: string, | ||
namespace: string, | ||
): Promise<DSPipelineKind> { | ||
const dspaResponse = await fastify.kube.customObjectsApi | ||
.listNamespacedCustomObject( | ||
'datasciencepipelinesapplications.opendatahub.io', | ||
'v1alpha1', | ||
namespace, | ||
'datasciencepipelinesapplications', | ||
undefined, | ||
undefined, | ||
undefined, | ||
undefined, | ||
undefined, | ||
undefined, | ||
undefined, | ||
undefined, | ||
{ | ||
headers: { | ||
authorization: `Bearer ${token}`, | ||
}, | ||
}, | ||
) | ||
.catch((e) => { | ||
throw `A ${e.statusCode} error occurred when trying to fetch dspa aws storage credentials: ${ | ||
e.response?.body?.message || e?.response?.statusMessage | ||
}`; | ||
}); | ||
|
||
const dspas = ( | ||
dspaResponse?.body as { | ||
items: DSPipelineKind[]; | ||
} | ||
)?.items; | ||
|
||
if (!dspas || !dspas.length) { | ||
throw 'No Data Science Pipeline Application found'; | ||
} | ||
|
||
return dspas[0]; | ||
} | ||
|
||
async function getDspaSecretKeys( | ||
fastify: KubeFastifyInstance, | ||
token: string, | ||
namespace: string, | ||
dspa: DSPipelineKind, | ||
): Promise<{ accessKey: string; secretKey: string }> { | ||
try { | ||
const secret = await fastify.kube.coreV1Api.readNamespacedSecret( | ||
dspa.spec.objectStorage.externalStorage.s3CredentialsSecret.secretName, | ||
namespace, | ||
undefined, | ||
undefined, | ||
undefined, | ||
{ | ||
headers: { | ||
authorization: `Bearer ${token}`, | ||
}, | ||
}, | ||
); | ||
|
||
const accessKey = atob( | ||
secret.body.data[dspa.spec.objectStorage.externalStorage.s3CredentialsSecret.accessKey], | ||
); | ||
const secretKey = atob( | ||
secret.body.data[dspa.spec.objectStorage.externalStorage.s3CredentialsSecret.secretKey], | ||
); | ||
|
||
if (!accessKey || !secretKey) { | ||
throw 'Access key or secret key is empty'; | ||
} | ||
|
||
return { accessKey, secretKey }; | ||
} catch (err) { | ||
console.error('Unable to get dspa secret keys: ', err); | ||
throw new Error('Unable to get dspa secret keys: ' + err); | ||
} | ||
} | ||
|
||
/** | ||
* Create minio client with aws instance profile credentials if needed. | ||
* @param config minio client options where `accessKey` and `secretKey` are optional. | ||
*/ | ||
export async function setupMinioClient( | ||
fastify: KubeFastifyInstance, | ||
token: string, | ||
namespace: string, | ||
): Promise<{ client: MinioClient; bucket: string }> { | ||
try { | ||
const dspa = await getDspa(fastify, token, namespace); | ||
|
||
// check if object storage connection is available | ||
if ( | ||
dspa.status.conditions.find((condition) => condition.type === 'ObjectStoreAvailable') | ||
.status !== 'True' | ||
) { | ||
throw 'Object store is not available'; | ||
} | ||
|
||
const externalStorage = dspa.spec.objectStorage.externalStorage; | ||
if (externalStorage) { | ||
const { region, host: endPoint, bucket } = externalStorage; | ||
const { accessKey, secretKey } = await getDspaSecretKeys(fastify, token, namespace, dspa); | ||
return { | ||
client: new MinioClient({ accessKey, secretKey, endPoint, region }), | ||
bucket, | ||
}; | ||
} | ||
} catch (err) { | ||
console.error('Unable to create minio client: ', err); | ||
throw new Error('Unable to create minio client: ' + err); | ||
} | ||
} | ||
|
||
/** MinioRequestConfig describes the info required to retrieve an artifact. */ | ||
export interface MinioRequestConfig { | ||
bucket: string; | ||
key: string; | ||
client: MinioClient; | ||
peek?: number; | ||
} | ||
|
||
/** | ||
* Returns a stream from an object in a s3 compatible object store (e.g. minio). | ||
* | ||
* @param param.bucket Bucket name to retrieve the object from. | ||
* @param param.key Key of the object to retrieve. | ||
* @param param.client Minio client. | ||
* @param param.peek Number of bytes to preview. | ||
* | ||
*/ | ||
export async function getObjectStream({ | ||
key, | ||
client, | ||
bucket, | ||
peek = 1e8, // 100mb | ||
}: MinioRequestConfig): Promise<Transform> { | ||
const stream = await client.getObject(bucket, key); | ||
return stream.pipe(new PreviewStream({ peek })); | ||
} | ||
|
||
export async function getObjectSize({ bucket, key, client }: MinioRequestConfig): Promise<number> { | ||
const stat = await client.statObject(bucket, key); | ||
return stat.size; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.