Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add s3 storage artifact route and ui integration of it #2827

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 262 additions & 13 deletions backend/package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"http-errors": "^1.8.0",
"js-yaml": "^4.0.0",
"lodash": "^4.17.21",
"minio": "^7.1.3",
"pino": "^8.11.0",
"prom-client": "^14.0.1",
"ts-node": "^10.9.1"
Expand Down
105 changes: 105 additions & 0 deletions backend/src/routes/api/storage/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { FastifyInstance, FastifyReply } from 'fastify';
import { getObjectSize, getObjectStream, setupMinioClient } from './storageUtils';
import { getDashboardConfig } from '../../../utils/resourceUtils';
import { getDirectCallOptions } from '../../../utils/directCallUtils';
import { getAccessToken } from '../../../utils/directCallUtils';
import { OauthFastifyRequest } from '../../../types';

/** Extracts a printable message from whatever value was thrown (helpers may throw plain strings). */
const toErrorMessage = (err: unknown): string => (err instanceof Error ? err.message : String(err));

/**
 * Query string values reach the handler as strings; convert `peek` to a positive, finite number.
 * Anything else yields `undefined` so the storage helper falls back to its own default limit.
 */
const parsePeek = (raw: unknown): number | undefined => {
  const value = Number(raw);
  return Number.isFinite(value) && value > 0 ? value : undefined;
};

/** The S3 routes are only served when the dashboard config explicitly sets `disableS3Endpoint: false`. */
const isS3EndpointEnabled = (): boolean =>
  getDashboardConfig()?.spec.dashboardConfig.disableS3Endpoint === false;

/**
 * Registers the S3 artifact routes:
 *  - `GET /:namespace/size?key=...` replies with the size of the object stored under `key`.
 *  - `GET /:namespace?key=...&peek=...` streams the object as `text/plain`, optionally limited to
 *    the first `peek` bytes.
 *
 * Both routes reply 404 when the S3 endpoint is disabled, 400 when `key` is missing and 500 when
 * the storage client cannot be created or the object cannot be read.
 */
export default async (fastify: FastifyInstance): Promise<void> => {
  fastify.get(
    '/:namespace/size',
    async (
      request: OauthFastifyRequest<{
        Querystring: { key: string };
        Params: { namespace: string };
      }>,
      reply: FastifyReply,
    ) => {
      try {
        if (!isS3EndpointEnabled()) {
          reply.code(404).send('Not found');
          return reply;
        }

        const { namespace } = request.params;
        const { key } = request.query;
        if (!key) {
          reply.code(400).send('Missing required query parameter: key');
          return reply;
        }

        const requestOptions = await getDirectCallOptions(fastify, request, request.url);
        const token = getAccessToken(requestOptions);

        const { client, bucket } = await setupMinioClient(fastify, token, namespace);

        const size = await getObjectSize({
          client,
          key,
          bucket,
        });

        reply.send(size);
        return reply;
      } catch (err) {
        // pino expects the error object first, then the message
        fastify.log.error(err, 'Unable to get artifact size');
        reply.code(500).send(toErrorMessage(err));
        return reply;
      }
    },
  );

  fastify.get(
    '/:namespace',
    async (
      request: OauthFastifyRequest<{
        Querystring: { key: string; peek?: number };
        Params: { namespace: string };
      }>,
      reply: FastifyReply,
    ) => {
      try {
        if (!isS3EndpointEnabled()) {
          reply.code(404).send('Not found');
          return reply;
        }

        const { namespace } = request.params;
        const { key } = request.query;
        if (!key) {
          reply.code(400).send('Missing required query parameter: key');
          return reply;
        }
        const peek = parsePeek(request.query.peek);

        const requestOptions = await getDirectCallOptions(fastify, request, request.url);
        const token = getAccessToken(requestOptions);

        const { client, bucket } = await setupMinioClient(fastify, token, namespace);

        const stream = await getObjectStream({
          client,
          key,
          bucket,
          peek,
        });

        // Hand the stream to Fastify instead of writing to `reply.raw`: the content type set here
        // is actually sent, backpressure is respected, and a mid-stream failure is handled once by
        // the framework rather than ending the raw response and then attempting a second reply.
        reply.type('text/plain');
        reply.send(stream);
        return reply;
      } catch (err) {
        fastify.log.error(err, 'Unable to stream artifact');
        reply.code(500).send(toErrorMessage(err));
        return reply;
      }
    },
  );
};
185 changes: 185 additions & 0 deletions backend/src/routes/api/storage/storageUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import { Client as MinioClient } from 'minio';
import { DSPipelineKind, KubeFastifyInstance } from '../../../types';
import { Transform, TransformOptions } from 'stream';

/** Options accepted by {@link PreviewStream}: regular transform options plus the byte limit. */
export interface PreviewStreamOptions extends TransformOptions {
  /** Maximum number of bytes to forward. A value that is not greater than zero disables the limit. */
  peek: number;
}

/**
 * Transform stream that forwards only the first `peek` bytes written to it.
 *
 * When `peek` is not a positive number the stream behaves like a pass-through. Once the limit is
 * reached every subsequent chunk is consumed and discarded, so the upstream source can still drain
 * and the stream ends normally when the source ends.
 */
export class PreviewStream extends Transform {
  constructor({ peek, ...opts }: PreviewStreamOptions) {
    // `peek > 0` is false for 0, negative numbers, NaN and undefined: all of them mean "no limit".
    const limited = peek > 0;
    // Number of bytes that may still be forwarded downstream.
    let remaining = peek;

    super({
      ...opts,
      transform: (chunk, _encoding, callback) => {
        if (!limited) {
          // acts like passthrough
          callback(undefined, chunk);
          return;
        }
        if (remaining <= 0) {
          // Limit already reached: swallow the chunk. Slicing with a negative end index here would
          // leak the head of the chunk past the limit.
          callback();
          return;
        }
        // `slice` is used (rather than `subarray`) so string chunks keep working when the caller
        // disables string decoding through `opts`.
        const head = chunk.length > remaining ? chunk.slice(0, remaining) : chunk;
        remaining -= head.length;
        callback(undefined, head);
      },
    });
  }
}

/**
 * Looks up the Data Science Pipelines Applications in `namespace`, authenticating the list call
 * with the caller's bearer `token`, and returns the first one found.
 *
 * Rejects with a string message when the list call fails or when the namespace has no application.
 */
export async function getDspa(
  fastify: KubeFastifyInstance,
  token: string,
  namespace: string,
): Promise<DSPipelineKind> {
  // Forward the user's token so the lookup runs with the user's permissions.
  const callOptions = {
    headers: {
      authorization: `Bearer ${token}`,
    },
  };

  const listing = await fastify.kube.customObjectsApi
    .listNamespacedCustomObject(
      'datasciencepipelinesapplications.opendatahub.io',
      'v1alpha1',
      namespace,
      'datasciencepipelinesapplications',
      undefined,
      undefined,
      undefined,
      undefined,
      undefined,
      undefined,
      undefined,
      undefined,
      callOptions,
    )
    .catch((e) => {
      throw `A ${e.statusCode} error occurred when trying to fetch dspa aws storage credentials: ${
        e.response?.body?.message || e?.response?.statusMessage
      }`;
    });

  const listBody = listing?.body as { items: DSPipelineKind[] };
  const applications = listBody?.items;

  if (!applications?.length) {
    throw 'No Data Science Pipeline Application found';
  }

  return applications[0];
}

/**
 * Reads the S3 credentials referenced by the DSPA's external storage configuration.
 *
 * The secret named by `s3CredentialsSecret.secretName` is read from `namespace` with the caller's
 * bearer `token`; the entries named by `s3CredentialsSecret.accessKey` and
 * `s3CredentialsSecret.secretKey` are base64-decoded and returned.
 *
 * @throws Error when external storage credentials are not configured, the secret cannot be read,
 *   or either decoded value is empty.
 */
async function getDspaSecretKeys(
  fastify: KubeFastifyInstance,
  token: string,
  namespace: string,
  dspa: DSPipelineKind,
): Promise<{ accessKey: string; secretKey: string }> {
  try {
    const credentialsRef = dspa.spec.objectStorage.externalStorage?.s3CredentialsSecret;
    if (!credentialsRef) {
      throw new Error('No external storage credentials secret is configured');
    }

    const secret = await fastify.kube.coreV1Api.readNamespacedSecret(
      credentialsRef.secretName,
      namespace,
      undefined,
      undefined,
      undefined,
      {
        headers: {
          authorization: `Bearer ${token}`,
        },
      },
    );

    // Buffer decoding replaces the legacy `atob`: it yields proper UTF-8 text, and a missing entry
    // decodes to an empty string that is reported by the explicit check below.
    const decodeEntry = (entryName: string): string =>
      Buffer.from(secret.body.data?.[entryName] ?? '', 'base64').toString('utf-8');

    const accessKey = decodeEntry(credentialsRef.accessKey);
    const secretKey = decodeEntry(credentialsRef.secretKey);

    if (!accessKey || !secretKey) {
      throw new Error('Access key or secret key is empty');
    }

    return { accessKey, secretKey };
  } catch (err) {
    console.error('Unable to get dspa secret keys: ', err);
    throw new Error(
      'Unable to get dspa secret keys: ' + (err instanceof Error ? err.message : String(err)),
    );
  }
}

/**
 * Creates a minio client for the external object storage configured on the namespace's DSPA.
 *
 * @param fastify Fastify instance providing the Kubernetes API clients.
 * @param token Bearer token of the requesting user, used for every Kubernetes call.
 * @param namespace Namespace whose Data Science Pipelines Application is inspected.
 * @returns The client together with the bucket configured on the DSPA.
 * @throws Error when the DSPA cannot be fetched, its `APIServerReady` / `ObjectStoreAvailable`
 *   conditions are not `True`, no external storage is configured, or the credentials cannot be
 *   read.
 */
export async function setupMinioClient(
  fastify: KubeFastifyInstance,
  token: string,
  namespace: string,
): Promise<{ client: MinioClient; bucket: string }> {
  try {
    const dspa = await getDspa(fastify, token, namespace);

    // check if object storage connection is available
    const hasTrueCondition = (type: string): boolean =>
      !!dspa.status?.conditions?.some((c) => c.type === type && c.status === 'True');
    if (!hasTrueCondition('APIServerReady') || !hasTrueCondition('ObjectStoreAvailable')) {
      throw new Error('Object store is not available');
    }

    const externalStorage = dspa.spec.objectStorage.externalStorage;
    if (!externalStorage) {
      // Fail loudly instead of resolving with `undefined`, which callers would then destructure.
      throw new Error('No external object storage is configured');
    }

    const { region, host: endPoint, bucket, scheme } = externalStorage;
    const { accessKey, secretKey } = await getDspaSecretKeys(fastify, token, namespace, dspa);

    // Honour the configured scheme and port. TLS stays enabled (the client's default) unless the
    // scheme is explicitly plain http, and the port is only passed when it is a usable number.
    const useSSL = String(scheme ?? '').toLowerCase() !== 'http';
    const port = Number(externalStorage.port);
    const portOption = Number.isInteger(port) && port > 0 ? { port } : {};

    return {
      client: new MinioClient({ accessKey, secretKey, endPoint, region, useSSL, ...portOption }),
      bucket,
    };
  } catch (err) {
    console.error('Unable to create minio client: ', err);
    throw new Error(
      'Unable to create minio client: ' + (err instanceof Error ? err.message : String(err)),
    );
  }
}

/** MinioRequestConfig describes the info required to retrieve an artifact. */
export interface MinioRequestConfig {
/** Bucket name to retrieve the object from. */
bucket: string;
/** Key of the object to retrieve. */
key: string;
/** Minio client used to issue the request. */
client: MinioClient;
/** Number of bytes to preview; optional, only read by the streaming helper. */
peek?: number;
}

/**
 * Returns a stream from an object in a s3 compatible object store (e.g. minio).
 *
 * The returned stream never yields more than 100mb. A `peek` that is missing, non-numeric, zero or
 * negative falls back to that maximum instead of disabling the limit.
 *
 * @param param.bucket Bucket name to retrieve the object from.
 * @param param.key Key of the object to retrieve.
 * @param param.client Minio client.
 * @param param.peek Number of bytes to preview.
 * @returns A transform stream emitting at most the requested number of bytes of the object.
 */
export async function getObjectStream({
  key,
  client,
  bucket,
  peek = 1e8, // 100mb
}: MinioRequestConfig): Promise<Transform> {
  const maxPeek = 1e8; // 100mb
  // `peek` may arrive as a query-string value, so coerce before validating.
  const requestedPeek = Number(peek);
  const safePeek =
    Number.isFinite(requestedPeek) && requestedPeek > 0
      ? Math.min(requestedPeek, maxPeek)
      : maxPeek;

  const source = await client.getObject(bucket, key);
  const preview = new PreviewStream({ peek: safePeek });

  // `pipe` does not forward errors: surface source failures on the returned stream so the
  // consumer is not left waiting for an `end` that never comes.
  source.once('error', (err) => preview.destroy(err));
  // Release the object-store connection once the preview is closed (finished or aborted).
  preview.once('close', () => source.destroy());

  return source.pipe(preview);
}

/**
 * Resolves with the `size` reported by the client's `statObject` call for the object stored under
 * `key` in `bucket`.
 */
export async function getObjectSize({ bucket, key, client }: MinioRequestConfig): Promise<number> {
  const { size } = await client.statObject(bucket, key);
  return size;
}
75 changes: 75 additions & 0 deletions backend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export type DashboardConfig = K8sResourceCommon & {
disableModelMesh: boolean;
disableAcceleratorProfiles: boolean;
disablePipelineExperiments: boolean;
disableS3Endpoint: boolean;
disableDistributedWorkloads: boolean;
disableModelRegistry: boolean;
};
Expand Down Expand Up @@ -1014,9 +1015,83 @@ export type K8sCondition = {
lastHeartbeatTime?: string;
};

/** External (S3 compatible) object storage settings of a Data Science Pipelines Application. */
export type DSPipelineExternalStorageKind = {
/** Bucket that holds the stored objects. */
bucket: string;
/** Host of the object store; passed to the minio client as its `endPoint`. */
host: string;
// NOTE(review): typed as the empty-string literal only — presumably should be `string`; confirm against the CRD.
port?: '';
// NOTE(review): presumably 'http' or 'https' — confirm against the CRD.
scheme: string;
/** Region passed to the minio client. */
region: string;
/** Reference to the Kubernetes secret holding the credentials. */
s3CredentialsSecret: {
/** Name of the entry in the secret's data that holds the access key. */
accessKey: string;
/** Name of the entry in the secret's data that holds the secret key. */
secretKey: string;
/** Name of the secret to read. */
secretName: string;
};
};

export type DSPipelineKind = K8sResourceCommon & {
metadata: {
name: string;
namespace: string;
};
spec: {
dspVersion: string;
apiServer?: Partial<{
apiServerImage: string;
artifactImage: string;
artifactScriptConfigMap: Partial<{
key: string;
name: string;
}>;
enableSamplePipeline: boolean;
}>;
database?: Partial<{
externalDB: Partial<{
host: string;
passwordSecret: Partial<{
key: string;
name: string;
}>;
pipelineDBName: string;
port: string;
username: string;
}>;
image: string;
mariaDB: Partial<{
image: string;
passwordSecret: Partial<{
key: string;
name: string;
}>;
pipelineDBName: string;
username: string;
}>;
}>;
mlpipelineUI?: {
configMap?: string;
image: string;
};
persistentAgent?: Partial<{
image: string;
pipelineAPIServerName: string;
}>;
scheduledWorkflow?: Partial<{
image: string;
}>;
objectStorage: Partial<{
externalStorage: DSPipelineExternalStorageKind;
minio: Partial<{
bucket: string;
image: string;
s3CredentialsSecret: Partial<{
accessKey: string;
secretKey: string;
secretName: string;
}>;
}>;
}>;
viewerCRD?: Partial<{
image: string;
}>;
};
status?: {
conditions?: K8sCondition[];
Expand Down
1 change: 1 addition & 0 deletions backend/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export const blankDashboardCR: DashboardConfig = {
disableModelMesh: false,
disableAcceleratorProfiles: false,
disablePipelineExperiments: true,
disableS3Endpoint: true,
disableDistributedWorkloads: false,
disableModelRegistry: true,
},
Expand Down
Loading
Loading