Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal-layer-protocol #211

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions src/blockstore/fp-envelope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { CID } from "multiformats";
import { encode, decode } from "cborg";
import { Result } from "@adviser/cement"

export interface FPEnvelope<T> {
readonly type: string; // "car" | "file" | "meta" | "wal"
readonly payload: T
}

export interface FPEnvelopeCar extends FPEnvelope<Uint8Array> {
readonly type: "car";
}

export interface FPEnvelopeFile extends FPEnvelope<Uint8Array> {
readonly type: "file";
}

export interface FPMeta {
readonly cid: string;
readonly data: Uint8Array;
readonly parents: string[];
}

export interface FPEnvelopeMeta extends FPEnvelope<FPMeta> {
readonly type: "meta";
}

export interface FPWALCarsOps {
readonly cars: CID[];
}
export interface FPWAL {
// fileOperations: any[]; will be added with connector-fixes
// noLoaderOps: any[]; will be added with connector-fixes
readonly operations: FPWALCarsOps[];
}
export interface FPEnvelopeWAL extends FPEnvelope<FPWAL> {
readonly type: "wal";
}

export function WAL2FPMsg(fpwal: FPWAL): Uint8Array {
return encode({ type: "wal", payload: JSON.parse(JSON.stringify(fpwal)) } as FPEnvelopeWAL);
}

export function FPMsg2WAL(fpmsg: Uint8Array): Result<FPWAL> {
const renv = FPMsgMatch2Envelope(fpmsg, "wal");
if (renv.isErr()) {
return Result.Err(renv.Err());
}
const convertCids = renv.Ok().payload as FPWAL;
for (const op of convertCids.operations) {
const cars = []
for (const strCid of op.cars) {
for (const cidVal of Object.values(strCid)) {
cars.push(CID.parse(cidVal));
}
}
(op as {cars: CID[]}).cars = cars;
}
return Result.Ok(renv.Ok().payload as FPWAL);
}

export function Meta2FPMsg(fpmeta: FPMeta): Uint8Array {
return encode({ type: "meta", payload: fpmeta } as FPEnvelopeMeta);
}

export function FPMsg2Meta(fpmsg: Uint8Array): Result<FPMeta> {
const renv = FPMsgMatch2Envelope(fpmsg, "meta");
if (renv.isErr()) {
return Result.Err(renv.Err());
}
return Result.Ok(renv.Ok().payload as FPMeta);
}

export function Car2FPMsg(fpcar: Uint8Array): Uint8Array {
return encode({ type: "car", payload: fpcar } as FPEnvelopeCar);
}

export function FPMsg2Car(fpmsg: Uint8Array): Result<Uint8Array> {
const renv = FPMsgMatch2Envelope(fpmsg, "car");
if (renv.isErr()) {
return Result.Err(renv.Err());
}
return Result.Ok(renv.Ok().payload as Uint8Array);
}

export function File2FPMsg(fpfile: Uint8Array): Uint8Array {
return encode({ type: "file", payload: fpfile } as FPEnvelopeFile);
}

export function FPMsg2File(fpmsg: Uint8Array): Result<Uint8Array> {
const renv = FPMsgMatch2Envelope(fpmsg, "file");
if (renv.isErr()) {
return Result.Err(renv.Err());
}
return Result.Ok(renv.Ok().payload as Uint8Array);
}

export function FPMsgMatch2Envelope(fpmsg: Uint8Array, ...types: string[]): Result<FPEnvelope<unknown>> {
let env: FPEnvelope<unknown>;
try {
env = decode(fpmsg);
} catch (e) {
return Result.Err(`failed to decode envelope: ${e}`);
}
if (typeof env !== "object") {
return Result.Err(`expected envelope to be an object`);
}
if (typeof env.type !== "string") {
return Result.Err(`expected type to be a string`);
}
if (types.length > 0 && !types.includes(env.type)) {
return Result.Err(`expected type to be ${types}`);
}
// need to check if the payload is a valid WAL
return Result.Ok(env);
}
2 changes: 2 additions & 0 deletions src/blockstore/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export * from "./store-factory.js";
export * from "./gateway.js";
export * from "./fragment-gateway.js";

export * from "./fp-envelope.js"

import { type Connectable } from "./connection-base.js";
export { Connectable };

Expand Down
23 changes: 21 additions & 2 deletions src/blockstore/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { keyedCryptoFactory } from "../runtime/keyed-crypto.js";
import { KeyBag } from "../runtime/key-bag.js";
import { FragmentGateway } from "./fragment-gateway.js";
import { Link } from "multiformats";
import { FPEnvelopeCar, FPEnvelopeFile, FPMsg2Car, FPMsgMatch2Envelope } from "./fp-envelope.js";

function guardVersion(url: URI): Result<URI> {
if (!url.hasParam("version")) {
Expand Down Expand Up @@ -287,7 +288,19 @@ export class DataStoreImpl extends BaseStoreImpl implements DataStore {
if (res.isErr()) {
throw res.Err();
}
return { cid, bytes: res.Ok() };
const rfpenv = FPMsgMatch2Envelope(res.Ok(), "car", "file");
if (rfpenv.isErr()) {
throw this.logger.Error().Err(rfpenv.Err()).Msg("got error from FPMsgMatch2Envelope").AsError();
}
const fpenv = rfpenv.Ok() as FPEnvelopeFile|FPEnvelopeCar
switch (fpenv.type) {
case "car":
return { cid, bytes: fpenv.payload };
case "file":
return { cid, bytes: fpenv.payload };
default:
throw this.logger.Error().Msg("unexpected type").AsError();
}
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -297,7 +310,13 @@ export class DataStoreImpl extends BaseStoreImpl implements DataStore {
if (url.isErr()) {
throw this.logger.Error().Err(url.Err()).Ref("cid", car.cid).Msg("got error from gateway.buildUrl").AsError();
}
const res = await this.gateway.put(url.Ok(), car.bytes);
// without URL changes in super-this branch we
// can distinguish between car and file
const rfpenv = FPMsg2Car(car.bytes);
if (rfpenv.isErr()) {
throw this.logger.Error().Err(rfpenv.Err()).Msg("got error from FPMsg2Car").AsError();
}
const res = await this.gateway.put(url.Ok(), rfpenv.Ok());
if (res.isErr()) {
throw this.logger.Error().Err(res.Err()).Msg("got error from gateway.put").AsError();
}
Expand Down
14 changes: 13 additions & 1 deletion src/runtime/gateways/file/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ensureLogger, exceptionWrapper, isNotFoundError, NotFoundError } from "
import { Gateway, GetResult, TestGateway } from "../../../blockstore/gateway.js";
import { getFileName, getFileSystem, getPath } from "./utils.js";
import { SuperThis, SysFileSystem } from "../../../types.js";
import { FPEnvelopeCar, FPEnvelopeFile, FPMsg2Car, FPMsgMatch2Envelope } from "../../../blockstore/fp-envelope.js";

const versionFiles = new KeyedResolvOnce<string>();

Expand Down Expand Up @@ -77,10 +78,18 @@ export class FileGateway implements Gateway {
}

async put(url: URI, body: Uint8Array): Promise<Result<void>> {
const rbuf = FPMsgMatch2Envelope(body) as Result<FPEnvelopeCar | FPEnvelopeFile>;
if (rbuf.isErr()) {
return Result.Err(rbuf.Err());
}
let payload = body
if (["car", "file"].includes(rbuf.Ok().type)) {
payload = rbuf.Ok().payload
}
return exception2Result(async () => {
const file = await this.getFilePath(url);
this.logger.Debug().Str("url", url.toString()).Str("file", file).Msg("put");
await this.fs.writefile(file, body);
await this.fs.writefile(file, payload);
});
}

Expand All @@ -90,6 +99,9 @@ export class FileGateway implements Gateway {
try {
const res = await this.fs.readfile(file);
this.logger.Debug().Url(url.asURL()).Str("file", file).Msg("get");
if (url.getParam("store") === "data") {
return FPMsg2Car(res);
}
return Result.Ok(new Uint8Array(res));
} catch (e: unknown) {
// this.logger.Error().Err(e).Str("file", file).Msg("get");
Expand Down
56 changes: 56 additions & 0 deletions tests/blockstore/fp-envelope.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { encode } from "cborg";
import { bs, Result } from '@fireproof/core'
import { CID } from "multiformats";

it("unknown bytes", () => {
expect(bs.FPMsgMatch2Envelope(Uint8Array.from([1, 2, 3]), "bla").Err().message).toStrictEqual(
"failed to decode envelope: Error: CBOR decode error: too many terminals, data makes no sense"
);
});

it("unknown type", () => {
expect(bs.FPMsgMatch2Envelope(encode({ type: "blax", payload: 4 }), "bla"))
.toStrictEqual(Result.Err("expected type to be bla"));
})

it("no type", () => {
expect(bs.FPMsgMatch2Envelope(encode({ type: "blax", payload: 4 })))
.toStrictEqual(Result.Ok({ type: "blax", payload: 4 }));
})

it("car type", () => {
expect(bs.FPMsg2Car(bs.Car2FPMsg(Uint8Array.from([1, 2, 3]))).Ok()).toStrictEqual(Uint8Array.from([1, 2, 3]));
})

it("file type", () => {
expect(bs.FPMsg2File(bs.File2FPMsg(Uint8Array.from([1, 2, 3]))).Ok()).toStrictEqual(Uint8Array.from([1, 2, 3]));
})

it("meta type", () => {
const ref = {
cid: "CID",
data: Uint8Array.from([1, 2, 3]),
parents: ["C1", "C2"]
};
expect(bs.FPMsg2Meta(bs.Meta2FPMsg(ref)).Ok()).toStrictEqual(ref);
})

it("wal type", () => {
const ref: bs.FPWAL = {
operations: [
{
cars: [
CID.parse("bag4yvqabciqdzvfxrxfi6feubspyz666zegmp3z5w556mr4ykya2kkdm22r7pyy")
]
},
{
cars: [
CID.parse("bag4yvqabciqd2ul2tw4mdcpvfq2pdqhvnqp2ktuyrtcl3j3gwhxbjzjt62xzeaq")
]
}
]
};
const res = bs.FPMsg2WAL(bs.WAL2FPMsg(ref)).Ok();
expect(res).toStrictEqual(ref);
expect(res.operations[0].cars[0].version).toStrictEqual(1);
})
Loading