Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the same code path for local and remote syncs #572

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions src/copy.ml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ let rec fingerprintPrefix fspath path offset len accu =
end

let fingerprintPrefixRemotely =
Remote.registerServerCmd
Remote.registerRemoteCmd
"fingerprintSubfile"
Umarshal.(prod3 Fspath.m Path.mlocal Uutil.Filesize.m id id)
Umarshal.(list Fingerprint.m)
Expand Down Expand Up @@ -382,7 +382,7 @@ let loadPropsExtDataLocal (fspath, path, desc) =
| `Global p -> Update.translatePathLocal fspath p in
(Some localPath, Props.loadExtData fspath localPath desc)

let loadPropsExtDataOnServer = Remote.registerServerCmd "propsExtData"
let loadPropsExtDataOnServer = Remote.registerRemoteCmd "propsExtData"
Umarshal.(prod3 Fspath.m mxpath Props.m id id)
Umarshal.(prod2 (option Path.mlocal) Props.mx id id)
(fun connFrom args -> Lwt.return (loadPropsExtDataLocal args))
Expand Down Expand Up @@ -638,7 +638,7 @@ let convV0 = Remote.makeConvV0FunArg
((biOpt, fspathFrom, pathFrom, fileKind), (sizeFrom, id, file_id)))

let compressRemotely =
Remote.registerServerCmd "compress" ~convV0 mcompress Umarshal.unit compress
Remote.registerRemoteCmd "compress" ~convV0 mcompress Umarshal.unit compress

let close_all infd outfd =
Util.convertUnixErrorsToTransient
Expand Down Expand Up @@ -1181,17 +1181,10 @@ let file rootFrom pathFrom rootTo fspathTo pathTo realPathTo
(Fspath.toDebugString fspathTo) (Path.toString pathTo)
(Props.toString desc));
let timer = Trace.startTimer "Transmitting file" in
begin match rootFrom, rootTo with
(Common.Local, fspathFrom), (Common.Local, realFspathTo) ->
localFile
fspathFrom pathFrom fspathTo pathTo realPathTo
update desc (Osx.ressLength ress) (Some id);
paranoidCheck fspathTo pathTo realPathTo desc fp ress
| _ ->
transferFile
rootFrom pathFrom rootTo fspathTo pathTo realPathTo
update desc fp ress id
end >>= fun status ->
transferFile
rootFrom pathFrom rootTo fspathTo pathTo realPathTo
update desc fp ress id
>>= fun status ->
Trace.showTimer timer;
match status with
TransferSucceeded info ->
Expand Down
46 changes: 34 additions & 12 deletions src/remote.ml
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ let connectionIO conn =
let setConnectionVersion conn ver =
conn.version <- ver

let connectionVersion conn = conn.version
let connectionVersion = function
| None -> rpcDefaultVersion
| Some conn -> conn.version

let connEq conn conn' =
conn.inputBuffer.channel = conn'.inputBuffer.channel
Expand Down Expand Up @@ -596,7 +598,12 @@ module ClientConn = struct

end (* module ClientConn *)

let connectionOfRoot root = ClientConn.ofRoot root
let connectionOfRoot root =
(* This is not the same as [ClientConn.ofRootOpt]. We want this function
to fail when the remote connection is not found. *)
match root with
| (Common.Local, _) -> None
| (Common.Remote _ , _) -> Some (ClientConn.ofRoot root)

(****)

Expand Down Expand Up @@ -1141,6 +1148,17 @@ let registerServerCmd name ?(convV0=convV0_id_pair) mArg mRet f =
name (defaultMarshalingFunctions (fst convV0) mArg)
(defaultMarshalingFunctions (snd convV0) mRet) f

(* Same as [registerServerCmd] but returns a function that runs either
the proxy or the local version, depending on whether the call is to
the local host (in this case [conn] is None) or a remote one. *)
let registerRemoteCmd name ?convV0 mArg mRet f =
let serverSide = (fun conn args -> f (Some conn) args) in
let client0 = registerServerCmd name ?convV0 mArg mRet serverSide in
fun conn args ->
match conn with
| None -> f None args
| Some conn -> client0 conn args

(* RegisterHostCmd is a simpler version of registerClientServer [registerServerCmd?].
It is used to create remote procedure calls: the only communication
between the client and server is the sending of arguments from
Expand Down Expand Up @@ -1174,16 +1192,15 @@ let registerRootCmd (cmdName : string)
fun root args -> r root ((snd root), args)

let registerRootCmdWithConnection (cmdName : string)
?(convV0=convV0_id_pair) mArg mRet (cmd : connection -> 'a -> 'b) =
let client0 = registerServerCmd cmdName ~convV0 mArg mRet cmd in
?(convV0=convV0_id_pair) mArg mRet (cmd : connection option -> 'a -> 'b) =
let serverSide = (fun conn args -> cmd (Some conn) args) in
let client0 = registerServerCmd cmdName ~convV0 mArg mRet serverSide in
(* Return a function that runs either the proxy or the local version,
depending on whether the call is to the local host or a remote one *)
fun localRoot remoteRoot args ->
match (fst localRoot) with
| Common.Local -> let conn = ClientConn.ofRoot remoteRoot in
cmd conn args
| _ -> let conn = ClientConn.ofRoot localRoot in
client0 conn args
| Common.Local -> cmd (connectionOfRoot remoteRoot) args
| _ -> client0 (ClientConn.ofRoot localRoot) args

let streamReg = lwtRegionWithConnCleanup 1

Expand All @@ -1199,13 +1216,13 @@ let streamingActivated =
let registerStreamCmd
(cmdName : string)
marshalingFunctionsArgs
(serverSide : connection -> 'a -> unit)
(serverSide : connection option -> 'a -> unit)
=
let cmd =
registerSpecialServerCmd
cmdName marshalingFunctionsArgs
(defaultMarshalingFunctions convV0_id Umarshal.unit)
(fun conn v -> serverSide conn v; Lwt.return ())
(fun conn v -> serverSide (Some conn) v; Lwt.return ())
in
let ping =
registerServerCmd (cmdName ^ "Ping") Umarshal.int Umarshal.unit
Expand All @@ -1227,7 +1244,7 @@ let registerStreamCmd
(* Create a server function and remember it *)
let server conn buf =
let args = unmarshalArgs conn buf in
serverSide conn args
serverSide (Some conn) args
in
serverStreams := Util.StringMap.add cmdName server !serverStreams;
(* Create a client function and return it *)
Expand All @@ -1240,7 +1257,7 @@ let registerStreamCmd
in
dumpIdle conn request
in
fun conn sender ->
let proxy conn sender =
if not (Prefs.read streamingActivated) then
sender (fun v -> cmd conn v)
else begin
Expand All @@ -1259,6 +1276,11 @@ let registerStreamCmd
end else
Lwt.fail e)
end
in
fun conn sender ->
match conn with
| None -> sender (fun v -> Lwt.return (serverSide conn v))
| Some conn -> proxy conn sender

let commandAvailable =
registerRootCmd "commandAvailable" Umarshal.string Umarshal.bool
Expand Down
19 changes: 11 additions & 8 deletions src/remote.mli
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,18 @@ module MsgIdMap : Map.S with type key = msgId
val newMsgId : unit -> msgId

type connection
val connectionVersion : connection -> int
val connectionOfRoot : Common.root -> connection
val connectionVersion : connection option -> int
val connectionOfRoot : Common.root -> connection option
(* [connectionOfRoot] is None for a local root,
Some connection for a remote root,
raises if a remote connection is not found. *)

val registerServerCmd :
val registerRemoteCmd :
string
-> ?convV0: 'a convV0Fun * 'b convV0Fun
-> 'a Umarshal.t -> 'b Umarshal.t
-> (connection -> 'a -> 'b Lwt.t)
-> connection -> 'a -> 'b Lwt.t
-> (connection option -> 'a -> 'b Lwt.t)
-> connection option -> 'a -> 'b Lwt.t
val intSize : int
val encodeInt : int -> Bytearray.t * int * int
val decodeInt : Bytearray.t -> int -> int
Expand All @@ -144,7 +147,7 @@ val registerRootCmdWithConnection :
(* 2.51-compatibility functions for args
and result *)
-> 'a Umarshal.t -> 'b Umarshal.t
-> (connection -> 'a -> 'b Lwt.t) (* local command *)
-> (connection option -> 'a -> 'b Lwt.t) (* local command *)
-> Common.root (* root on which the command is executed *)
-> Common.root (* other root *)
-> 'a (* additional arguments *)
Expand All @@ -157,8 +160,8 @@ val registerStreamCmd :
(connection -> 'a ->
(Bytearray.t * int * int) list -> (Bytearray.t * int * int) list * int) *
(connection -> Bytearray.t -> int -> 'a) ->
(connection -> 'a -> unit) ->
connection -> (('a -> unit Lwt.t) -> 'b Lwt.t) -> 'b Lwt.t
(connection option -> 'a -> unit) ->
connection option -> (('a -> unit Lwt.t) -> 'b Lwt.t) -> 'b Lwt.t

(* Register a function to be run when the connection between client and server
is closed (willingly or unexpectedly). The function should not raise
Expand Down
Loading