
Reuse the TCP socket and buffer IO to improve performance #32

Merged
merged 30 commits into main from reuse-connection on Mar 13, 2023

Conversation

fonsp
Member

@fonsp fonsp commented Jan 24, 2023

Before this PR, we set up a new TCP connection for each round trip (a remotecall plus its response), and we suspected that this caused #24. This PR keeps the same functionality as before, but reuses the TCP socket. Often, this meant wrapping things in while true end.

Strangely enough, this did not improve our overhead... it doubled it 😵‍💫

Buffered IO

The real trick was suggested in the Pluto community call today by @habemus-papadum: we need to buffer the IO! See https://juliapluto.github.io/weekly-call-notes/2023/01-24/notes.html

I still want to test whether this PR was even necessary; perhaps buffering the IO is already enough, and the overhead of the TCP handshake is small enough.

New messaging protocol

I also changed the messaging protocol into something new, described in the new messages.md file. The reasons for the changes:

  • Messages now need an ID, to address the response.
  • Just like Distributed, we first send the msg_type, then the msg_id, and only then serialize the msg itself. If deserializing fails (e.g. a type is not defined on the worker), we still have the msg_id, so we can respond with a failure message (see the sketch after this list).
  • (TODO) Just like Distributed, each message is followed by a 10-byte unique boundary. When deserializing a message fails, the catch block can keep reading bytes until it finds the boundary. This allows us to keep using the socket.
  • Using tuples or individual write calls instead of NamedTuples is a tiny performance gain.
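
A minimal sketch of this framing, with illustrative names (MSG_BOUNDARY, send_msg, receive_msg and skip_to_boundary are made up here, not Malt's actual API):

using Serialization

# Illustrative 10-byte boundary; host and worker must agree on the same bytes.
const MSG_BOUNDARY = collect(codeunits("MaltMsgEnd"))

function send_msg(io::IO, msg_type::UInt8, msg_id::UInt64, msg)
    write(io, msg_type)       # 1. type first: the receiver knows how to handle it
    write(io, msg_id)         # 2. id next: even a failed message can be answered
    serialize(io, msg)        # 3. the payload itself
    write(io, MSG_BOUNDARY)   # 4. boundary: lets the reader resync after errors
    flush(io)
end

function receive_msg(io::IO)
    msg_type = read(io, UInt8)
    msg_id = read(io, UInt64)
    try
        msg = deserialize(io)
        read(io, length(MSG_BOUNDARY))  # consume the trailing boundary
        return (msg_type, msg_id, msg)
    catch
        # Deserialization failed (e.g. unknown type): scan forward to the
        # boundary so the socket stays usable, then report the failure.
        skip_to_boundary(io)
        return (msg_type, msg_id, ErrorException("failed to deserialize message"))
    end
end

# Read byte by byte until a sliding window matches the boundary.
function skip_to_boundary(io::IO)
    window = zeros(UInt8, length(MSG_BOUNDARY))
    while window != MSG_BOUNDARY
        popfirst!(window)
        push!(window, read(io, UInt8))
    end
end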

TODO

  • Reuse socket
  • System to keep track of which response belongs to which request
  • remotecall
  • remote_do
  • Implement a message boundary like Distributed
  • interrupt
  • stop, kill
  • Get overhead down to <1ms (thanks @habemus-papadum !!)
  • Fix performance loss when sending large data. This problem appeared when I enabled buffered IO. We might need to buffer the IO in chunks to avoid doing one big allocation, just like https://github.com/JuliaIO/BufferedStreams.jl (actually using that package on the worker process might be tricky because of package environments and relocatability). See the chunked-write sketch after this list.
  • Remove my implementation of BufferedIO. 🥲
  • remote channels
  • I think the last task, interrupt on Windows, is not working right now, because we process all messages synchronously. This means that the worker can only receive the from_host_interrupt message after it has processed all previous work, so there is nothing left to interrupt.
  • Do we need a reconnecting system? I think Distributed does not have it. What happens when you close your laptop/hibernate?
  • Make all tests green
  • What to do with InterruptExceptions from the REPL? They can land in the middle of the read loop and break things. Not sure where to try-catch them and where not.
  • zeros(UInt8, 50_000_000) does not pass our benchmark tests before Julia 1.8: it is 1.5x slower than the Distributed stdlib. Also see #32 (comment)
  • #32 (comment)
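
As for the chunked buffering mentioned above, a rough sketch of the idea (write_chunked and CHUNK_SIZE are assumptions, not actual Malt code):

# Copy a serialized payload to the socket through a fixed-size chunk,
# so we never allocate one buffer proportional to the payload size.
const CHUNK_SIZE = 64 * 1024

function write_chunked(dest::IO, src::IO)
    chunk = Vector{UInt8}(undef, CHUNK_SIZE)
    while !eof(src)
        n = readbytes!(src, chunk, CHUNK_SIZE)
        write(dest, view(chunk, 1:n))
    end
end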

@fonsp fonsp linked an issue Jan 24, 2023 that may be closed by this pull request
@fonsp fonsp changed the title from "Reuse the TCP socket to improve performance" to "Reuse the TCP socket and buffer IO to improve performance" Jan 25, 2023
@fonsp fonsp requested a review from savq January 25, 2023 18:25
@savq
Member

savq commented Jan 25, 2023

wrt. tuple messages:

I think it's a good idea to stop using NamedTuples. These carry symbols in their types, and symbols don't have a fixed size, making them less efficient to serialize.
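
A quick way to see the size difference in the REPL (exact byte counts vary across Julia versions; only the comparison matters):

using Serialization

serialized_length(x) = (io = IOBuffer(); serialize(io, x); length(take!(io)))

serialized_length((1, 2, 3))             # plain Tuple
serialized_length((a = 1, b = 2, c = 3)) # NamedTuple: bigger, the symbols ride along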

The same is true for the header. I think we could use an Enum for the header. That would also let us encode more info in the header (like send_result?). Nvm. Is that what MsgType does?

@fonsp
Member Author

fonsp commented Jan 27, 2023

I made a type BufferedIO, which is a minimal version of the functionality in https://github.com/JuliaIO/BufferedStreams.jl, and after lots of tweaking, I got it to match the speed of Distributed in all test cases 🎉

Then I was wondering how Distributed got this speed, and I found out that Julia has this secret function:

https://github.com/JuliaLang/julia/blob/d77575063d7ecb5e79ee6fc9f3b73242f8282853/base/stream.jl#L1137-L1143

You can call this on your IO to enable this functionality directly with libuv. So my implementation is not necessary. 🙃 It's probably best to use the built-in version because I am not 100% confident in my implementation.

I think it's best to use a @static feature check to see if we can use this function, and fall back to doing nothing. Since this is only a performance optimisation, our tests will catch a regression if a future Julia version changes the API.
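
The pattern could look something like this (enable_write_buffering! is a hypothetical helper name; Base.buffer_writes is internal, hence the guard):

function enable_write_buffering!(io)
    @static if isdefined(Base, :buffer_writes)
        # Internal API: makes libuv coalesce many small writes into one buffer.
        Base.buffer_writes(io)
    end
    # If the internal function disappears in a future Julia version, we fall
    # back to unbuffered writes; the benchmark tests would flag the slowdown.
    return io
end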

@habemus-papadum
Contributor

Hi @fonsp,
I was just going to comment that your BufferedIO looks good -- much better than the temp solution we discussed of allocating an IOBuffer. I think it's also better than PipeBuffer -- it is simpler and a more typical implementation of buffered output, with bounded memory consumption and no allocation per message.


@fonsp
Member Author

fonsp commented Jan 27, 2023

Ooh thanks @habemus-papadum, that's nice to hear! I had a fun time writing it, and it felt good to reach the same performance as Distributed, so I'm not sad to find out that Base.buffer_writes exists :)

What do you think about using Base.buffer_writes instead of BufferedIO, like I did in my previous commit?

@habemus-papadum
Contributor

@fonsp Your last commit, which removes BufferedIO and uses Base.buffer_writes, looks good. (I initially misread the source for Base.buffer_writes and thought it was allocating on each write -- but that would make no sense.)

@habemus-papadum
Contributor

I was also going to suggest Sockets.nagle(socket, false), but that looks like it is already there -- if you are curious what all this is, I can give a quick explanation (or Wikipedia) next session.

Doing all of this in a robust, cross-platform way can be hard. Libuv does a lot of the hard work, but you may end up finding that adding an extra layer like https://nng.nanomsg.org/ helps avoid a lot of complexity and re-inventing the wheel. I don't think it makes sense to worry about this now, but you can always ping me if you get curious and want a high-level description.
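
For context, the socket tuning Distributed applies looks roughly like this (whether Malt also wants quickack is an open choice):

using Sockets

function tune_socket!(sock::TCPSocket)
    Sockets.nagle(sock, false)    # disable Nagle's algorithm: send small packets
                                  # immediately instead of batching them
    Sockets.quickack(sock, true)  # ask for immediate ACKs (effective on Linux)
    return sock
end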

@fonsp fonsp closed this Jan 27, 2023
@fonsp fonsp reopened this Jan 27, 2023
@fonsp
Member Author

fonsp commented Jan 27, 2023

whoops

@fonsp
Member Author

fonsp commented Jan 27, 2023

I also implemented remote channels, and for the implementation, I cheated and looked at the Distributed implementation. Theirs is surprisingly simple: there is no special communication dedicated to channels; it is all built with plain remotecall commands.

The main difference is in "eagerness": in Distributed, channel values are not sent to the host as they come in, but only when they are requested with take!. You can see this like so:

julia> using Distributed

julia> p = Distributed.addprocs(1)
1-element Vector{Int64}:
 2

julia> c = Distributed.RemoteChannel(() -> eval(:(c = Channel{Any}(3))), 2)
RemoteChannel{Channel{Any}}(2, 1, 8)

julia> Distributed.remotecall_eval(Main, 2, :(put!(c, 123)))
123

julia> Distributed.remotecall_eval(Main, 2, :(put!(c, 99)))
99

julia> Distributed.remotecall_eval(Main, 2, :(isready(c)))
true

julia> take!(c)
123

julia> Distributed.remotecall_eval(Main, 2, :(isready(c)))
true

julia> take!(c)
99

julia> Distributed.remotecall_eval(Main, 2, :(isready(c)))
false

This also affects the blocking behaviour: if a channel is full on the worker side, it won't empty on its own, it needs the host to take! an item from it.

julia> Distributed.remotecall_eval(Main, 2, :(put!(c, 1)))
1

julia> Distributed.remotecall_eval(Main, 2, :(put!(c, 2)))
2

julia> Distributed.remotecall_eval(Main, 2, :(put!(c, 3)))
3

julia> Distributed.remotecall_eval(Main, 2, :(put!(c, 4)))
^C^C^C^C^CWARNING: Force throwing a SIGINT
ERROR: InterruptException:

In the Distributed code, you can see that their implementation is similar to mine.

One problem with my implementation might be performance: I used remote_eval_fetch instead of remotecall_fetch, which means a bit more data needs to be sent to the worker, and it adds the overhead of Core.eval. But we can fix this in a future PR after #30.

@fonsp
Member Author

fonsp commented Mar 7, 2023

This PR is almost ready! 🎉 The only thing left is:

zeros(UInt8, 50_000_000) does not pass our benchmark tests before Julia 1.8: it is 1.5x slower than the Distributed stdlib. Also see #32 (comment)

@habemus-papadum or @Pangoraw Maybe you could take a look? My questions are:

  • In the current implementation, why do we have a slowdown compared to the Distributed stdlib?
  • Why is it much worse on the GHA runners for macOS (but not on my macOS computer)?
  • Disabling buffering seems to give a huge speedup for this benchmark, 4x faster than the Distributed stdlib. Is there a way to get the best of both worlds?

@habemus-papadum
Contributor

habemus-papadum commented Mar 7, 2023

@fonsp
fyi, I'm also having trouble recreating the zeros-array regression on my local Mac:

Precompiling project...
  2 dependencies successfully precompiled in 1 seconds. 9 already precompiled.
     Testing Running tests...
Test Summary:     | Pass  Total  Time
Worker management |    2      2  1.2s
Test Summary:        | Pass  Total  Time
Evaluating functions |    2      2  0.6s
Test Summary:          | Pass  Total  Time
Evaluating expressions |    2      2  0.7s
Test Summary:   | Pass  Total  Time
Worker channels |   41     41  0.8s

signal (15): Terminated: 15
in expression starting at /Users/nehal/src/Malt.jl/src/worker.jl:163
fatal: error thrown and no exception handler available.
InterruptException()
_ZNK5dyld311MachOLoaded17findClosestSymbolEyPPKcPy at /usr/lib/dyld (unknown line)
Test Summary: | Pass  Total  Time
Signals       |    2      2  0.4s
Test Summary: | Pass  Total  Time
Exceptions    |   11     11  0.8s
┌ Info: Expr 1
│   ratio = 0.83745638251121
│   t1 = 0.021015375
└   t2 = 0.025094292
┌ Info: Expr 2
│   ratio = 1.002852464877677
│   t1 = 0.011382208
└   t2 = 0.011349833
┌ Info: Expr 3
│   ratio = 1.1188505361538272
│   t1 = 0.012433833
└   t2 = 0.011113042
┌ Info: Expr 4
│   ratio = 1.0159279171046702
│   t1 = 0.021986458
└   t2 = 0.02164175
┌ Info: Expr 5
│   ratio = 0.7646539534814127
│   t1 = 0.000102209
└   t2 = 0.000133667
Test Summary: | Pass  Total   Time
Benchmark     |   11     11  27.6s
┌ Info: Launch benchmark
│   ratio = 0.4352724189643513
│   t1 = 0.569716625
└   t2 = 1.308873708
Test Summary:    | Pass  Total   Time
Benchmark launch |    1      1  30.9s
     Testing Malt tests passed

(Malt) pkg> ^C

julia> versioninfo()
Julia Version 1.8.5
Commit 17cfb8e65ea (2023-01-08 06:45 UTC)
Platform Info:
  OS: macOS (arm64-apple-darwin21.5.0)
  CPU: 10 × Apple M1 Max
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-13.0.1 (ORCJIT, apple-m1)
  Threads: 10 on 8 virtual cores
Environment:
  JULIA_NUM_THREADS = 10

commit 0828bb40172031f7bd469cfbb03d895fd2fed95f (HEAD -> reuse-connection, origin/reuse-connection)
Author: Fons van der Plas <[email protected]>
Date:   Tue Mar 7 13:03:08 2023 +0100

    Reset custom value for BUFFER_SIZE?

@habemus-papadum
Contributor

Actually, I see there is a slowdown, but not as bad as on GHA -- will look closer.

@fonsp
Member Author

fonsp commented Mar 7, 2023

Thanks for investigating, Nehal! If this turns out to be a blocker, we could also try switching to a different IPC method (#32 (comment)) rather than solving this TCP issue, since you said that might be relatively easy to get going.

@habemus-papadum
Contributor

I'll be around at 12 noon EST if the dev call is happening -- changing from TCP sockets to Unix domain sockets should be very quick (something as easy as making the port a unique string per worker rather than a UInt16, and passing that to both listen and connect) -- we could try it together if you like.
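
Roughly, the swap could look like this (the path scheme is invented for illustration; on Windows the string must name a pipe, e.g. \\.\pipe\...):

using Sockets

# TCP today: the worker listens on a UInt16 port.
# server = listen(ip"127.0.0.1", port); sock = connect("127.0.0.1", port)

# Unix domain socket: the "port" becomes a unique path string per worker.
socket_path = joinpath(tempdir(), "malt-worker-$(getpid()).sock")
server = listen(socket_path)   # worker side
sock   = connect(socket_path)  # host side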

@fonsp
Member Author

fonsp commented Mar 7, 2023

Nice! Let's discuss during the dev call

@fonsp
Member Author

fonsp commented Mar 7, 2023

It looks like bc8d408 added the overhead in benchmark 3 :o Let's experiment with reverting it!

@habemus-papadum
Contributor

@habemus-papadum note to myself: try fifo, try removing serialization.

@fonsp
Member Author

fonsp commented Mar 7, 2023

bc8d408 indeed caused the extra overhead in Expr 3. Right now we match Distributed in every benchmark!

@fonsp
Member Author

fonsp commented Mar 7, 2023

On Julia 1.9, we are 4x faster with Malt on Expr 3. It looks like this is because Julia 1.9 made this benchmark 4x slower, compared to Julia 1.8:

Julia 1.8

┌ Info: Expr 1
│   ratio = 0.862758185136152
│   t1 = 0.022380666
└   t2 = 0.025940833
┌ Info: Expr 2
│   ratio = 1.0021676384106104
│   t1 = 0.011789917
└   t2 = 0.011764416
┌ Info: Expr 3
│   ratio = 1.0783677142842505
│   t1 = 0.011366041
└   t2 = 0.010540042
┌ Info: Expr 4
│   ratio = 1.0029839208653089
│   t1 = 0.023025209
└   t2 = 0.022956708
┌ Info: Expr 5
│   ratio = 0.9820832894413427
│   t1 = 0.000111875
└   t2 = 0.000113916
Test Summary: | Pass  Total   Time
Benchmark     |   11     11  29.4s

Julia 1.9

┌ Info: Expr 1
│   ratio = 0.8308254275894384
│   t1 = 0.020205917
└   t2 = 0.024320292
┌ Info: Expr 2
│   ratio = 1.0272232917058837
│   t1 = 0.012090375
└   t2 = 0.011769958
┌ Info: Expr 3
│   ratio = 0.2414768445700603
│   t1 = 0.011983208
└   t2 = 0.049624667
┌ Info: Expr 4
│   ratio = 0.9914541831507528
│   t1 = 0.025499458
└   t2 = 0.02571925
┌ Info: Expr 5
│   ratio = 0.9822328618063112
│   t1 = 0.000112834
└   t2 = 0.000114875
Test Summary: | Pass  Total   Time
Benchmark     |   11     11  29.3s

Reported this in JuliaLang/julia#48938

@fonsp
Member Author

fonsp commented Mar 7, 2023

Now we have #32 (comment) again 😵‍💫

@Pangoraw
Member

Pangoraw commented Mar 7, 2023

bc8d408 indeed caused the extra overhead in Expr 3. Right now we match Distributed in every benchmark!

I don't really understand what could be causing this overhead. serialize(io::IO, a) calls serialize(Serializer(io), a), and the same goes for deserialize(io::IO) 🤔

@fonsp
Member Author

fonsp commented Mar 9, 2023

Yeah, that's weird... maybe it's a more Julian performance issue, due to typing and optimization etc.? bc8d408 also fixed #32 (comment), right? Maybe we can watch how the Julia devs solve JuliaLang/julia#48938 and learn something from that, in case it's related?

@fonsp
Member Author

fonsp commented Mar 13, 2023

I wrote a simple workaround for JuliaLang/julia#48938 🙂: replaced while true with for i in Iterators.countfrom(1).
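
That is (handle_next_message stands in for the actual read-loop body):

# Before: triggers the Julia 1.9 regression (JuliaLang/julia#48938)
while true
    handle_next_message()
end

# After: the same infinite loop, phrased so the regression is avoided
for i in Iterators.countfrom(1)
    handle_next_message()
end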

@fonsp
Member Author

fonsp commented Mar 13, 2023

Awesome! The only thing left is the small slowdown on Expr 3 on Windows nightly... hmmmmmmm, can we just ignore it?

@fonsp fonsp marked this pull request as ready for review March 13, 2023 15:19
@fonsp
Member Author

fonsp commented Mar 13, 2023

Going to merge this! I will leave the last performance issue for a separate PR.

@fonsp fonsp merged commit 1bdd409 into main Mar 13, 2023
@fonsp fonsp deleted the reuse-connection branch March 13, 2023 17:06
@fonsp
Member Author

fonsp commented Mar 13, 2023

Heyy tests passed on main after merging 😌

Pangoraw added a commit that referenced this pull request Mar 15, 2023
this worker just runs in the same process.

Fixes #27.

> **Note**
> This targets the branch for PR #32.
@fonsp fonsp mentioned this pull request Jun 3, 2023
Successfully merging this pull request may close these issues.

Reduce overhead to <1ms