Skip to content

Commit

Permalink
feat(kv-web): implement reconnects and bucket caching
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Nov 8, 2024
1 parent 711da0f commit 6817f9c
Showing 1 changed file with 69 additions and 60 deletions.
129 changes: 69 additions & 60 deletions examples/web/ui/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
}
</style>
<script type="module">
let conn;
let buckets = new Map();

function toHex(v) {
if (!v) return '';
return v.reduce((s, x) => s + x.toString(16).padStart(2, '0'), '')
Expand Down Expand Up @@ -282,15 +285,14 @@
}

async function getBucket() {
if (!transport) {
throw 'transport not connected';
}
if (!conn) throw 'WebTransport not connected';

const obj = getFormValues('#settings');
let identifier = '';
let conn = obj.connection;
if (conn === 'mem') {
let proto = obj.proto;
if (proto === 'mem') {
identifier = '';
} else if (conn === 'nats') {
} else if (proto === 'nats') {
const addr = obj['nats-addr'];
if (!addr) throw 'NATS.io server address must be set';
identifier = 'wrpc+nats://' + addr;
Expand All @@ -300,55 +302,57 @@

const bucket = obj['nats-bucket'];
if (bucket) identifier = identifier + ';' + bucket;
} else if (conn === 'redis') {
} else if (proto === 'redis') {
const url = obj['redis-url'];
if (!url) throw 'Redis URL must be set';
identifier = url;
} else if (conn === 'quic') {
} else if (proto === 'quic') {
const addr = obj['quic-addr'];
if (!addr) throw 'QUIC address must be set';
identifier = 'wrpc+quic://' + addr;

const bucket = obj['quic-bucket'];
if (bucket) identifier = identifier + ';' + bucket;
} else if (conn === 'tcp') {
} else if (proto === 'tcp') {
const addr = obj['tcp-addr'];
if (!addr) throw 'TCP address must be set';
identifier = 'wrpc+tcp://' + addr;

const bucket = obj['tcp-bucket'];
if (bucket) identifier = identifier + ';' + bucket;
} else if (conn === 'unix') {
} else if (proto === 'unix') {
const path = obj['unix-path'];
if (!path) throw 'Unix Domain Socket path must be set';
identifier = 'wrpc+unix://' + path;

const bucket = obj['unix-bucket']
if (bucket) identifier = identifier + ';' + bucket;
} else if (conn === 'web') {
} else if (proto === 'web') {
const addr = obj['web-addr'];
if (!addr) throw 'WebTransport address must be set';
identifier = 'wrpc+web://' + addr;

const bucket = obj['web-bucket'];
if (bucket) identifier = identifier + ';' + bucket;
} else {
throw 'transport not supported yet';
throw 'selected wRPC transport not supported yet';
}

if (identifier.length > 127) throw 'this example currently does not support identifiers longer than 127 bytes - open a PR!';

// TODO: cache the bucket by URL (it should not persist across server restarts)
const bucket = buckets.get(identifier);
if (bucket) return bucket;

dbg.debug('creating `open` stream...');
const stream = await transport.createBidirectionalStream();
const stream = await conn.createBidirectionalStream();
return await wasiKeyvalueStoreOpen(
stream.writable.getWriter(),
stream.readable.getReader(),
identifier,
).then(bucket => {
const bucketName = uuidString(bucket);
dbg.log(`opened bucket ${bucketName}`);
buckets.set(identifier, bucket);
return bucket;
}, err => {
throw 'failed to open bucket: ' + err;
Expand All @@ -360,7 +364,8 @@
const getValue = document.querySelector('#get input[name="get-value"]');
if (getValue) getValue.value = null;

if (!transport) throw 'transport not connected';
if (!conn) throw 'WebTransport not connected';

const bucket = await getBucket();
if (!bucket) throw 'failed to get bucket';

Expand All @@ -371,7 +376,7 @@
if (key.length > 127) throw 'this example currently does not support keys longer than 127 bytes - open a PR!';

dbg.debug('creating `get` stream...');
const stream = await transport.createBidirectionalStream();
const stream = await conn.createBidirectionalStream();
await wasiKeyvalueStoreBucketGet(
stream.writable.getWriter(),
stream.readable.getReader(),
Expand All @@ -392,14 +397,11 @@
}

async function handleSet() {
if (!transport) {
dbg.error('transport not connected')
return
}
if (!conn) throw 'WebTransport not connected';

const bucket = await getBucket();
if (bucket == null) {
return
}
if (!bucket) throw 'failed to get bucket';

const obj = getFormValues('#set');

const key = obj['set-key'];
Expand All @@ -416,7 +418,7 @@
if (valueBuf.length > 127) throw 'this example currently does not support values longer than 127 bytes - open a PR!';

dbg.debug('creating `set` stream...');
const stream = await transport.createBidirectionalStream();
const stream = await conn.createBidirectionalStream();
await wasiKeyvalueStoreBucketSet(
stream.writable.getWriter(),
stream.readable.getReader(),
Expand All @@ -431,36 +433,6 @@
});
}

async function connect() {
const {PORT, CERT_DIGEST} = await import('./consts.js');
let transport;

dbg.log('connecting to wRPC over WebTransport on `' + PORT + '`...');
try {
transport = new WebTransport('https://localhost:' + PORT, {
serverCertificateHashes: [
{
algorithm: 'sha-256',
value: CERT_DIGEST.buffer
}
]
});
} catch (e) {
dbg.error('failed to connect: ' + e);
return null;
}

dbg.debug('waiting for WebTransport readiness...');
try {
await transport.ready;
} catch (e) {
throw 'failed to await WebTransport readiness: ' + e;
return null;
}
dbg.info('WebTransport connection established');
return transport;
}

const dbg = (() => {
const output = document.querySelector('#message-output');
const className = {
Expand Down Expand Up @@ -496,16 +468,16 @@
// @ts-check
function initUI() {
function updateTemplate() {
const option = connectionDropdown?.value ?? 'mem'
const option = protoDropdown?.value ?? 'mem'
const defaultTemplate = document.querySelector('.form-fields[data-option=default]');
const templateOutput = document.querySelector('#template-output');
const template = document.querySelector(`.form-fields[data-option=${option}]`) ?? defaultTemplate;
if (templateOutput && template) templateOutput.innerHTML = template.innerHTML;
}

/** @type {HTMLSelectElement | null} */
const connectionDropdown = document.querySelector('#connection');
connectionDropdown?.addEventListener('change', updateTemplate);
const protoDropdown = document.querySelector('#proto');
protoDropdown?.addEventListener('change', updateTemplate);
updateTemplate();

/** @type {HTMLFormElement | null} */
Expand Down Expand Up @@ -538,8 +510,45 @@

initUI();

let transport = await connect().catch(err => dbg.error('failed to connect to server: ' + err));
// TODO: Reconnect on failure
const {PORT, CERT_DIGEST} = await import('./consts.js');
for (; ;) {
dbg.log('connecting to wRPC over WebTransport on `' + PORT + '`...');
let c;
try {
c = new WebTransport('https://localhost:' + PORT, {
serverCertificateHashes: [
{
algorithm: 'sha-256',
value: CERT_DIGEST.buffer
}
]
});
} catch (err) {
dbg.error('failed to establish WebTransport connection: ' + err);
await new Promise(r => setTimeout(r, 1000));
continue;
}

dbg.debug('waiting for WebTransport readiness...');
try {
await c.ready;
} catch (err) {
dbg.error('failed to await WebTransport readiness: ' + err);
await new Promise(r => setTimeout(r, 1000));
continue;
}

conn = c;
dbg.info('WebTransport connection established');
try {
const {closeCode, reason} = await conn.closed;
dbg.log(`WebTransport connection closed with code '${closeCode}': ${reason}`);
} catch (err) {
dbg.error('WebTransport connection failed: ' + err);
}
conn = null;
}

</script>
</head>

Expand All @@ -550,7 +559,7 @@ <h1 class="title"><code>wasi:keyvalue</code></h1>
</div>
<div class="column is-narrow">
<div class="select">
<select id="connection" form="settings" name="connection">
<select id="proto" form="settings" name="proto">
<option value="mem">In-memory</option>
<option value="redis">Redis</option>
<option value="nats">wRPC/NATS.io</option>
Expand Down

0 comments on commit 6817f9c

Please sign in to comment.