Skip to content

Commit

Permalink
Resolve SRV urls (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattijauhiainen authored May 19, 2021
1 parent a9ccb29 commit fc3f3e6
Show file tree
Hide file tree
Showing 9 changed files with 569 additions and 156 deletions.
112 changes: 20 additions & 92 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,97 +1,29 @@
import { assert } from "../deps.ts";
import { Database } from "./database.ts";
import { WireProtocol } from "./protocol/mod.ts";
import {
ConnectOptions,
Credential,
Document,
ListDatabaseInfo,
} from "./types.ts";
import { ConnectOptions, Document, ListDatabaseInfo } from "./types.ts";
import { parse } from "./utils/uri.ts";
import { AuthContext, ScramAuthPlugin, X509AuthPlugin } from "./auth/mod.ts";
import { MongoError } from "./error.ts";
import { Cluster } from "./cluster.ts";
import { assert } from "../deps.ts";

const DENO_DRIVER_VERSION = "0.0.1";

export interface DenoConnectOptions {
hostname: string;
port: number;
certFile?: string;
}

export class MongoClient {
#protocol?: WireProtocol;
#conn?: Deno.Conn;
#cluster?: Cluster;

async connect(
options: ConnectOptions | string,
serverIndex: number = 0,
): Promise<Database> {
try {
if (typeof options === "string") {
options = parse(options);
}
let conn;
const denoConnectOps: DenoConnectOptions = {
hostname: options.servers[serverIndex].host,
port: options.servers[serverIndex].port,
};
if (options.tls) {
if (options.certFile) {
denoConnectOps.certFile = options.certFile;
}
if (options.keyFile) {
if (options.keyFilePassword) {
throw new MongoError(
`Tls keyFilePassword not implemented in Deno driver`,
);
//TODO, need something like const key = decrypt(options.keyFile) ...
}
throw new MongoError(`Tls keyFile not implemented in Deno driver`);
//TODO, need Deno.connectTls with something like key or keyFile option.
}
conn = await Deno.connectTls(denoConnectOps);
} else {
conn = await Deno.connect(denoConnectOps);
}

this.#conn = conn;
this.#protocol = new WireProtocol(conn);

if ((options as ConnectOptions).credential) {
const authContext = new AuthContext(
this.#protocol,
(options as ConnectOptions).credential,
options as ConnectOptions,
);
const mechanism = (options as ConnectOptions).credential!.mechanism;
let authPlugin;
if (mechanism === "SCRAM-SHA-256") {
authPlugin = new ScramAuthPlugin("sha256"); //TODO AJUST sha256
} else if (mechanism === "SCRAM-SHA-1") {
authPlugin = new ScramAuthPlugin("sha1");
} else if (mechanism === "MONGODB-X509") {
authPlugin = new X509AuthPlugin();
} else {
throw new MongoError(
`Auth mechanism not implemented in Deno driver: ${mechanism}`,
);
}
const request = authPlugin.prepare(authContext);
authContext.response = await this.#protocol.commandSingle(
"admin",
request,
);
await authPlugin.auth(authContext);
} else {
await this.#protocol.connect();
}
const parsedOptions = typeof options === "string"
? await parse(options)
: options;
const cluster = new Cluster(parsedOptions);
await cluster.connect();
await cluster.authenticate();
await cluster.updateMaster();
this.#cluster = cluster;
} catch (e) {
if (serverIndex < (options as ConnectOptions).servers.length - 1) {
return await this.connect(options, serverIndex + 1);
} else {
throw new MongoError(`Connection failed: ${e.message || e}`);
}
throw new MongoError(`Connection failed: ${e.message || e}`);
}
return this.database((options as ConnectOptions).db);
}
Expand All @@ -102,11 +34,11 @@ export class MongoClient {
authorizedCollections?: boolean;
comment?: Document;
}): Promise<ListDatabaseInfo[]> {
assert(this.#protocol);
if (!options) {
options = {};
}
const { databases } = await this.#protocol.commandSingle("admin", {
assert(this.#cluster);
const { databases } = await this.#cluster.protocol.commandSingle("admin", {
listDatabases: 1,
...options,
});
Expand All @@ -115,20 +47,16 @@ export class MongoClient {

// TODO: add test cases
async runCommand<T = any>(db: string, body: Document): Promise<T> {
assert(this.#protocol);
return await this.#protocol.commandSingle(db, body);
assert(this.#cluster);
return await this.#cluster.protocol.commandSingle(db, body);
}

database(name: string): Database {
assert(this.#protocol);
return new Database(this.#protocol, name);
assert(this.#cluster);
return new Database(this.#cluster, name);
}

close() {
if (this.#conn) {
Deno.close(this.#conn.rid);
this.#conn = undefined;
this.#protocol = undefined;
}
if (this.#cluster) this.#cluster.close();
}
}
133 changes: 133 additions & 0 deletions src/cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { WireProtocol } from "./protocol/mod.ts";
import { ConnectOptions } from "./types.ts";
import { AuthContext, ScramAuthPlugin, X509AuthPlugin } from "./auth/mod.ts";
import { MongoError } from "./error.ts";
import { assert } from "../deps.ts";
import { Server } from "./types.ts";

export interface DenoConnectOptions {
hostname: string;
port: number;
certFile?: string;
}

export class Cluster {
#options: ConnectOptions;
#connections: Deno.Conn[];
#protocols: WireProtocol[];
#masterIndex: number;

constructor(options: ConnectOptions) {
this.#options = options;
this.#connections = [];
this.#protocols = [];
this.#masterIndex = -1;
}

async connect() {
const options = this.#options;
this.#connections = await Promise.all(
options.servers.map((server) => this.connectToServer(server, options)),
);
}

async connectToServer(server: Server, options: ConnectOptions) {
const denoConnectOps: DenoConnectOptions = {
hostname: server.host,
port: server.port,
};
if (options.tls) {
if (options.certFile) {
denoConnectOps.certFile = options.certFile;
}
if (options.keyFile) {
if (options.keyFilePassword) {
throw new MongoError(
`Tls keyFilePassword not implemented in Deno driver`,
);
//TODO, need something like const key = decrypt(options.keyFile) ...
}
throw new MongoError(`Tls keyFile not implemented in Deno driver`);
//TODO, need Deno.connectTls with something like key or keyFile option.
}
return await Deno.connectTls(denoConnectOps);
} else {
return await Deno.connect(denoConnectOps);
}
}

async authenticate() {
const options = this.#options;
this.#protocols = await Promise.all(
this.#connections.map((conn) => this.authenticateToServer(conn, options)),
);
}

async authenticateToServer(conn: Deno.Conn, options: ConnectOptions) {
const protocol = new WireProtocol(conn);
if (options.credential) {
const authContext = new AuthContext(
protocol,
options.credential,
options,
);
const mechanism = options.credential!.mechanism;
let authPlugin;
if (mechanism === "SCRAM-SHA-256") {
authPlugin = new ScramAuthPlugin("sha256"); //TODO AJUST sha256
} else if (mechanism === "SCRAM-SHA-1") {
authPlugin = new ScramAuthPlugin("sha1");
} else if (mechanism === "MONGODB-X509") {
authPlugin = new X509AuthPlugin();
} else {
throw new MongoError(
`Auth mechanism not implemented in Deno driver: ${mechanism}`,
);
}
const request = authPlugin.prepare(authContext);
authContext.response = await protocol.commandSingle(
"admin", // TODO: Should get the auth db from connectionOptions?
request,
);
await authPlugin.auth(authContext);
} else {
await protocol.connect();
}
return protocol;
}

async updateMaster() {
const results = await Promise.all(this.#protocols.map((protocol) => {
return protocol.commandSingle(
"admin",
{ hello: 1 },
);
}));
const masterIndex = results.findIndex((result) => result.isWritablePrimary);
if (masterIndex === -1) throw new Error(`Could not find a master node`);
this.#masterIndex = masterIndex;
}

private getMaster() {
return {
protocol: this.#protocols[this.#masterIndex],
conn: this.#connections[this.#masterIndex],
};
}

get protocol() {
const protocol = this.getMaster().protocol;
assert(protocol);
return protocol;
}

close() {
this.#connections.forEach((connection) => {
try {
Deno.close(connection.rid);
} catch (error) {
console.error(`Error closing connection: ${error}`);
}
});
}
}
19 changes: 10 additions & 9 deletions src/database.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Collection } from "./collection/mod.ts";
import { CommandCursor, WireProtocol } from "./protocol/mod.ts";
import { CommandCursor } from "./protocol/mod.ts";
import { CreateUserOptions, Document } from "./types.ts";
import { Cluster } from "./cluster.ts";

interface ListCollectionsReponse {
cursor: {
Expand All @@ -22,14 +23,14 @@ export interface ListCollectionsResult {
}

export class Database {
#protocol: WireProtocol;
#cluster: Cluster;

constructor(protocol: WireProtocol, readonly name: string) {
this.#protocol = protocol;
constructor(cluster: Cluster, readonly name: string) {
this.#cluster = cluster;
}

collection<T>(name: string): Collection<T> {
return new Collection(this.#protocol, this.name, name);
return new Collection(this.#cluster.protocol, this.name, name);
}

listCollections(options?: {
Expand All @@ -42,9 +43,9 @@ export class Database {
options = {};
}
return new CommandCursor<ListCollectionsResult>(
this.#protocol,
this.#cluster.protocol,
async () => {
const { cursor } = await this.#protocol.commandSingle<
const { cursor } = await this.#cluster.protocol.commandSingle<
ListCollectionsReponse
>(this.name, {
listCollections: 1,
Expand Down Expand Up @@ -82,7 +83,7 @@ export class Database {
password: string,
options?: CreateUserOptions,
) {
await this.#protocol.commandSingle(this.name, {
await this.#cluster.protocol.commandSingle(this.name, {
createUser: options?.username ?? username,
pwd: options?.password ?? password,
customData: options?.customData,
Expand All @@ -99,7 +100,7 @@ export class Database {
writeConcern?: Document;
comment?: Document;
}) {
await this.#protocol.commandSingle(this.name, {
await this.#cluster.protocol.commandSingle(this.name, {
dropUser: username,
writeConcern: options?.writeConcern,
comment: options?.comment,
Expand Down
Loading

0 comments on commit fc3f3e6

Please sign in to comment.