Skip to content

Commit

Permalink
Add s3 storage artifact route and ui integration of it
Browse files Browse the repository at this point in the history
chore: Update artifact URL generation logic

chore: Update artifact URL generation logic

chore: Refactor storage route to handle S3 endpoint disablement

chore: Update artifact URL generation logic
  • Loading branch information
Gkrumbach07 committed Jun 6, 2024
1 parent f7c9979 commit 740b2d4
Show file tree
Hide file tree
Showing 39 changed files with 1,218 additions and 126 deletions.
275 changes: 262 additions & 13 deletions backend/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"http-errors": "^1.8.0",
"js-yaml": "^4.0.0",
"lodash": "^4.17.21",
"minio": "^7.1.3",
"pino": "^8.11.0",
"prom-client": "^14.0.1",
"ts-node": "^10.9.1"
Expand Down
105 changes: 105 additions & 0 deletions backend/src/routes/api/storage/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { FastifyInstance, FastifyReply } from 'fastify';
import { getObjectSize, getObjectStream, setupMinioClient } from './storageUtils';
import { getDashboardConfig } from '../../../utils/resourceUtils';
import { getDirectCallOptions } from '../../../utils/directCallUtils';
import { getAccessToken } from '../../../utils/directCallUtils';
import { OauthFastifyRequest } from '../../../types';

export default async (fastify: FastifyInstance): Promise<void> => {
fastify.get(
'/:namespace/size',
async (
request: OauthFastifyRequest<{
Querystring: { key: string };
Params: { namespace: string };
}>,
reply: FastifyReply,
) => {
try {
const dashConfig = getDashboardConfig();
if (dashConfig?.spec.dashboardConfig.disableS3Endpoint !== false) {
reply.code(404).send('Not found');
return reply;
}

const { namespace } = request.params;
const { key } = request.query;

const requestOptions = await getDirectCallOptions(fastify, request, request.url);
const token = getAccessToken(requestOptions);

const { client, bucket } = await setupMinioClient(fastify, token, namespace);

const size = await getObjectSize({
client,
key,
bucket,
});

reply.send(size);
} catch (err) {
reply.code(500).send(err.message);
return reply;
}
},
);

fastify.get(
'/:namespace',
async (
request: OauthFastifyRequest<{
Querystring: { key: string; peek?: number };
Params: { namespace: string };
}>,
reply: FastifyReply,
) => {
try {
const dashConfig = getDashboardConfig();
if (dashConfig?.spec.dashboardConfig.disableS3Endpoint !== false) {
reply.code(404).send('Not found');
return reply;
}

const { namespace } = request.params;
const { key, peek } = request.query;

const requestOptions = await getDirectCallOptions(fastify, request, request.url);
const token = getAccessToken(requestOptions);

const { client, bucket } = await setupMinioClient(fastify, token, namespace);

const stream = await getObjectStream({
client,
key,
bucket,
peek,
});

reply.type('text/plain');

await new Promise<void>((resolve, reject) => {
stream.on('data', (chunk) => {
reply.raw.write(chunk);
});

stream.on('end', () => {
reply.raw.end();
resolve();
});

stream.on('error', (err) => {
fastify.log.error('Stream error:', err);
reply.raw.statusCode = 500;
reply.raw.end(err.message);
reject(err);
});
});

return;
} catch (err) {
reply.code(500).send(err.message);
return reply;
}
},
);
};
182 changes: 182 additions & 0 deletions backend/src/routes/api/storage/storageUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import { Client as MinioClient } from 'minio';
import { DSPipelineKind, KubeFastifyInstance } from '../../../types';
import { Transform, TransformOptions } from 'stream';

export interface PreviewStreamOptions extends TransformOptions {
peek: number;
}

/**
* Transform stream that only stream the first X number of bytes.
*/
export class PreviewStream extends Transform {
constructor({ peek, ...opts }: PreviewStreamOptions) {
// acts like passthrough
let transform: TransformOptions['transform'] = (chunk, _encoding, callback) =>
callback(undefined, chunk);
// implements preview - peek must be positive number
if (peek && peek > 0) {
let size = 0;
transform = (chunk, _encoding, callback) => {
const delta = peek - size;
size += chunk.length;
if (size >= peek) {
callback(undefined, chunk.slice(0, delta));
this.resume(); // do not handle any subsequent data
return;
}
callback(undefined, chunk);
};
}
super({ ...opts, transform });
}
}

export async function getDspa(
fastify: KubeFastifyInstance,
token: string,
namespace: string,
): Promise<DSPipelineKind> {
const dspaResponse = await fastify.kube.customObjectsApi
.listNamespacedCustomObject(
'datasciencepipelinesapplications.opendatahub.io',
'v1alpha1',
namespace,
'datasciencepipelinesapplications',
undefined,
undefined,
undefined,
undefined,
undefined,
undefined,
undefined,
undefined,
{
headers: {
authorization: `Bearer ${token}`,
},
},
)
.catch((e) => {
throw `A ${e.statusCode} error occurred when trying to fetch dspa aws storage credentials: ${
e.response?.body?.message || e?.response?.statusMessage
}`;
});

const dspas = (
dspaResponse?.body as {
items: DSPipelineKind[];
}
)?.items;

if (!dspas || !dspas.length) {
throw 'No Data Science Pipeline Application found';
}

return dspas[0];
}

async function getDspaSecretKeys(
fastify: KubeFastifyInstance,
token: string,
namespace: string,
dspa: DSPipelineKind,
): Promise<{ accessKey: string; secretKey: string }> {
try {
const secret = await fastify.kube.coreV1Api.readNamespacedSecret(
dspa.spec.objectStorage.externalStorage.s3CredentialsSecret.secretName,
namespace,
undefined,
undefined,
undefined,
{
headers: {
authorization: `Bearer ${token}`,
},
},
);

const accessKey = atob(
secret.body.data[dspa.spec.objectStorage.externalStorage.s3CredentialsSecret.accessKey],
);
const secretKey = atob(
secret.body.data[dspa.spec.objectStorage.externalStorage.s3CredentialsSecret.secretKey],
);

if (!accessKey || !secretKey) {
throw 'Access key or secret key is empty';
}

return { accessKey, secretKey };
} catch (err) {
console.error('Unable to get dspa secret keys: ', err);
throw new Error('Unable to get dspa secret keys: ' + err);
}
}

/**
* Create minio client with aws instance profile credentials if needed.
* @param config minio client options where `accessKey` and `secretKey` are optional.
*/
export async function setupMinioClient(
fastify: KubeFastifyInstance,
token: string,
namespace: string,
): Promise<{ client: MinioClient; bucket: string }> {
try {
const dspa = await getDspa(fastify, token, namespace);

// check if object storage connection is available
if (
dspa.status.conditions.find((condition) => condition.type === 'ObjectStoreAvailable')
.status !== 'True'
) {
throw 'Object store is not available';
}

const externalStorage = dspa.spec.objectStorage.externalStorage;
if (externalStorage) {
const { region, host: endPoint, bucket } = externalStorage;
const { accessKey, secretKey } = await getDspaSecretKeys(fastify, token, namespace, dspa);
return {
client: new MinioClient({ accessKey, secretKey, endPoint, region }),
bucket,
};
}
} catch (err) {
console.error('Unable to create minio client: ', err);
throw new Error('Unable to create minio client: ' + err);
}
}

/** MinioRequestConfig describes the info required to retrieve an artifact. */
export interface MinioRequestConfig {
bucket: string;
key: string;
client: MinioClient;
peek?: number;
}

/**
* Returns a stream from an object in a s3 compatible object store (e.g. minio).
*
* @param param.bucket Bucket name to retrieve the object from.
* @param param.key Key of the object to retrieve.
* @param param.client Minio client.
* @param param.peek Number of bytes to preview.
*
*/
export async function getObjectStream({
key,
client,
bucket,
peek = 1e8, // 100mb
}: MinioRequestConfig): Promise<Transform> {
const stream = await client.getObject(bucket, key);
return stream.pipe(new PreviewStream({ peek }));
}

export async function getObjectSize({ bucket, key, client }: MinioRequestConfig): Promise<number> {
const stat = await client.statObject(bucket, key);
return stat.size;
}
75 changes: 75 additions & 0 deletions backend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export type DashboardConfig = K8sResourceCommon & {
disableModelMesh: boolean;
disableAcceleratorProfiles: boolean;
disablePipelineExperiments: boolean;
disableS3Endpoint: boolean;
disableDistributedWorkloads: boolean;
disableModelRegistry: boolean;
};
Expand Down Expand Up @@ -1014,9 +1015,83 @@ export type K8sCondition = {
lastHeartbeatTime?: string;
};

export type DSPipelineExternalStorageKind = {
bucket: string;
host: string;
port?: '';
scheme: string;
region: string;
s3CredentialsSecret: {
accessKey: string;
secretKey: string;
secretName: string;
};
};

export type DSPipelineKind = K8sResourceCommon & {
metadata: {
name: string;
namespace: string;
};
spec: {
dspVersion: string;
apiServer?: Partial<{
apiServerImage: string;
artifactImage: string;
artifactScriptConfigMap: Partial<{
key: string;
name: string;
}>;
enableSamplePipeline: boolean;
}>;
database?: Partial<{
externalDB: Partial<{
host: string;
passwordSecret: Partial<{
key: string;
name: string;
}>;
pipelineDBName: string;
port: string;
username: string;
}>;
image: string;
mariaDB: Partial<{
image: string;
passwordSecret: Partial<{
key: string;
name: string;
}>;
pipelineDBName: string;
username: string;
}>;
}>;
mlpipelineUI?: {
configMap?: string;
image: string;
};
persistentAgent?: Partial<{
image: string;
pipelineAPIServerName: string;
}>;
scheduledWorkflow?: Partial<{
image: string;
}>;
objectStorage: Partial<{
externalStorage: DSPipelineExternalStorageKind;
minio: Partial<{
bucket: string;
image: string;
s3CredentialsSecret: Partial<{
accessKey: string;
secretKey: string;
secretName: string;
}>;
}>;
}>;
viewerCRD?: Partial<{
image: string;
}>;
};
status?: {
conditions?: K8sCondition[];
Expand Down
1 change: 1 addition & 0 deletions backend/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export const blankDashboardCR: DashboardConfig = {
disableModelMesh: false,
disableAcceleratorProfiles: false,
disablePipelineExperiments: true,
disableS3Endpoint: false,
disableDistributedWorkloads: false,
disableModelRegistry: true,
},
Expand Down
Loading

0 comments on commit 740b2d4

Please sign in to comment.