Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: merged gar1t's fork #47

Open
wants to merge 27 commits into
base: 3.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1bfac14
Ignoring all generated html by default
Aug 7, 2011
cdf4408
Queue device in user space (used in examples)
Aug 7, 2011
c0587bb
Utility module (used in examples)
Aug 7, 2011
dc29b01
Coerce rcvmore sockopt to boolean
Aug 7, 2011
1738937
Missing docs reference
Aug 7, 2011
0493c1d
Merge remote-tracking branch 'zeromq/master'
Dec 20, 2011
912ce48
Using zeromq 2.1.10
Dec 20, 2011
c3acc99
zmq record
Mar 8, 2012
74dc962
Helper to work with multipart messages
Mar 8, 2012
e92777c
Merge branch 'master' of git://github.com/zeromq/erlzmq2 into zeromq
Mar 8, 2012
4b20827
Merge branch 'zeromq'
Mar 8, 2012
f63704e
Merged version conflict
Mar 8, 2012
170aa66
Helper for getting msg parts for active socket
Mar 20, 2012
5ae218b
Helper for receiving parts
Mar 23, 2012
6185bc6
Merge branch 'master' of github.com:gar1t/erlzmq2
Mar 23, 2012
597facb
Use zeromq2-x repo
chrisavl Apr 4, 2012
a3c5720
Merge pull request #37 from chrisavl/master
yrashk Apr 4, 2012
063f200
Merge branch 'master' of git://github.com/zeromq/erlzmq2 into zeromq
Apr 4, 2012
34013a3
Merge branch 'zeromq'
Apr 4, 2012
9ddf930
merged gar1t/erlzmq2 (needed for zguide)
rakvat Jan 20, 2013
3e0d100
adapted tests to gar1t's coerced getsockopt
rakvat Jan 20, 2013
7822b05
added test for erlzmq_device with router and dealer
rakvat Jan 20, 2013
fffb10c
subscribe test
rakvat Jan 21, 2013
881859c
fixed active in subscribe test, removed 'active, false' which is default
rakvat Jan 21, 2013
58ef1e1
proxy as an alternative device for non active sockets
rakvat Jan 24, 2013
f9f7282
using proxy in xpubxsub test
rakvat Jan 24, 2013
a9dc7fc
xpubxsub subscription test with proxy
rakvat Jan 24, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
ebin/*.app
ebin/*.beam
test/*.beam
priv/*.so
c_src/*.o
deps
Expand All @@ -8,9 +9,9 @@ perf/*.beam
.eunit
doc/edoc-info
doc/erlang.png
doc/ezmq.html
doc/index.html
doc/overview-summary.html
doc/packages-frame.html
doc/*.html
doc/stylesheet.css
doc/modules-frame.html
*~
*.swa
*.swp
*.swo
1 change: 1 addition & 0 deletions include/erlzmq.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,4 @@
%% Possible option values for {@link erlzmq:setsockopt/3. setsockopt/3}.
-type erlzmq_sockopt_value() :: integer() | iolist().

-record(zmq, {socket, data, flags}).
9 changes: 8 additions & 1 deletion src/erlzmq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,11 @@ setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
{ok, erlzmq_sockopt_value()} |
erlzmq_error().
getsockopt({I, Socket}, Name) when is_integer(I), is_atom(Name) ->
erlzmq_nif:getsockopt(Socket, option_name(Name)).
coerce_sockopt(Name, erlzmq_nif:getsockopt(Socket, option_name(Name))).

coerce_sockopt(rcvmore, {ok, 0}) -> {ok, false};
coerce_sockopt(rcvmore, {ok, 1}) -> {ok, true};
coerce_sockopt(_Other, Val) -> Val.

%% @equiv close(Socket, infinity)
-spec close(Socket :: erlzmq_socket()) ->
Expand Down Expand Up @@ -368,6 +372,9 @@ term(Context, Timeout) ->
end.

%% @doc Returns the 0MQ library version.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_version">zmq_version</a>.</i>
%% @end
-spec version() -> {integer(), integer(), integer()}.

Expand Down
60 changes: 60 additions & 0 deletions src/erlzmq_device.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-module(erlzmq_device).

-export([queue/2]).

-import(proplists, [get_bool/2]).

%%--------------------------------------------------------------------
%% @doc A queue device implemented in Erlang.
%%
%% Frontend and Backend must be sockets in active mode.
%%
%% This function will not return.
%%
%% @spec queue(Frontend, Backend) -> any()
%% Frontend = erlzmq_socket()
%% Backend = erlzmq_socket()
%% @end
%%--------------------------------------------------------------------

queue(Frontend, Backend) ->
receive
{zmq, Frontend, Msg, Flags} ->
Parts = lists:reverse(queue_recv_acc(Frontend, Flags, [Msg])),
queue_send(Backend, Parts),
queue(Frontend, Backend);
{zmq, Backend, Msg, Flags} ->
Parts = lists:reverse(queue_recv_acc(Backend, Flags, [Msg])),
queue_send(Frontend, Parts),
queue(Frontend, Backend);
{shutdown} ->
ok
end.

%%--------------------------------------------------------------------
%% @doc Accumulates messages from Socket.
%% @spec queue_recv_acc(Socket, Flags, Acc0) -> Acc
%% @end
%%--------------------------------------------------------------------

queue_recv_acc(Socket, Flags0, Acc) ->
case get_bool(rcvmore, Flags0) of
true ->
receive
{zmq, Socket, Msg, Flags} ->
queue_recv_acc(Socket, Flags, [Msg|Acc])
end;
false -> Acc
end.

%%--------------------------------------------------------------------
%% @doc Sends a multipart message to Out.
%% @spec queue_send(erlzmq_socket(), Parts) -> ok
%% @end
%%--------------------------------------------------------------------

queue_send(Out, [LastPart]) ->
ok = erlzmq:send(Out, LastPart);
queue_send(Out, [Part|Rest]) ->
ok = erlzmq:send(Out, Part, [sndmore]),
queue_send(Out, Rest).
26 changes: 26 additions & 0 deletions src/erlzmq_parts.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-module(erlzmq_parts).

-export([new/0, part_in/2]).

new() -> [].

part_in({zmq, Socket, Part, [rcvmore]}, Parts) ->
{rcvmore, add_part(Socket, Part, Parts)};
part_in({zmq, Socket, Part, []}, Parts) ->
{SocketParts, NewParts} =
find_socket_parts(Socket, add_part(Socket, Part, Parts)),
{msg, Socket, SocketParts, NewParts}.

add_part(Socket, Part, Parts) ->
[{Socket, Part}|Parts].

find_socket_parts(Socket, Parts) ->
find_socket_parts_acc(Socket, Parts, [], []).

find_socket_parts_acc(_Socket, [], SocketParts, NewParts) ->
{SocketParts, lists:reverse(NewParts)};
find_socket_parts_acc(Socket, [{Socket, Part}|Rest], SocketParts, NewParts) ->
find_socket_parts_acc(Socket, Rest, [Part|SocketParts], NewParts);
find_socket_parts_acc(Socket, [OtherPart|Rest], SocketParts, NewParts) ->
find_socket_parts_acc(Socket, Rest, SocketParts, [OtherPart|NewParts]).

40 changes: 40 additions & 0 deletions src/erlzmq_proxy.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-module(erlzmq_proxy).
-export([create/2]).


%%---------------------------------------------------
%% @doc A proxy implemented in Erlang.
%%
%% Frontend and Backend must be sockets in default,
%% i.e. non-active, mode
%%
%% @spec create(Frontend, Backend) -> any()
%% Frontend = erlzmq_socket()
%% Backend = erlzmq_socket()
%% @end
%%---------------------------------------------------

create(Frontend, Backend) ->
case erlzmq:recv(Backend, [dontwait]) of
{error, eagain} -> ok;
{ok, MsgB} ->
case erlzmq:getsockopt(Backend, rcvmore) of
{ok, true} -> erlzmq:send(Frontend, MsgB, [sndmore]);
{ok, false} -> erlzmq:send(Frontend, MsgB)
end
end,
case erlzmq:recv(Frontend, [dontwait]) of
{error, eagain} -> ok;
{ok, MsgF} ->
case erlzmq:getsockopt(Frontend, rcvmore) of
{ok, true} -> erlzmq:send(Backend, MsgF, [sndmore]);
{ok, false} -> erlzmq:send(Backend, MsgF)
end
end,
receive
shutdown ->
ok
after
0 ->
create(Frontend, Backend)
end.
99 changes: 99 additions & 0 deletions src/erlzmq_util.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
-module(erlzmq_util).

-export([dump/1, recv_parts/1, recv_parts/2]).

%%--------------------------------------------------------------------
%% @doc Reads available messages from Socket, printing them to stdout.
%% @spec dump(Socket) -> any()
%% Socket = erlzmq_socket()
%% @end
%%--------------------------------------------------------------------

dump(Socket) ->
{ok, Msg} = erlzmq:recv(Socket),
io:format("----------------------------------------~n"),
dump_msg(Msg, Socket).

%%--------------------------------------------------------------------
%% @doc Receives message parts for Socket, returning them as a list.
%% @equiv recv_parts(Socket, infinity)
%% @end
%%--------------------------------------------------------------------

recv_parts(Socket) ->
recv_parts(Socket, infinity).

%%--------------------------------------------------------------------
%% @doc Receives message parts for Socket, returning them as a list.
%% @spec recv_parts(Socket, Timeout) -> [binary()]
%% Socket = erlzmq_socket()
%% Timeout = integer() | infinity
%% @end
%%--------------------------------------------------------------------

recv_parts(Socket, Timeout) ->
recv_parts(Socket, Timeout, []).

%%--------------------------------------------------------------------
%% @doc Accumulator for recv_parts/2
%% @spec recv_parts(Socket, Timeout, Acc0) -> Acc
%% @end
%%--------------------------------------------------------------------
recv_parts(Socket, Timeout, Acc) ->
receive
{zmq, Socket, Part, []} ->
lists:reverse([Part|Acc]);
{zmq, Socket, Part, [rcvmore]} ->
recv_parts(Socket, Timeout, [Part|Acc])
after
Timeout -> exit({zmq_recv_timeout, Socket})
end.

%%--------------------------------------------------------------------
%% @doc Print a socket message, including subsequent parts.
%% @spec dump_msg(Msg, Socket) -> ok
%% @end
%%--------------------------------------------------------------------

dump_msg(Msg, Socket) ->
io:format("[~3..0B] ", [size(Msg)]),
Str = binary_to_list(Msg),
case io_lib:printable_list(Str) of
true -> io:format(Str);
false -> io:format(bin_to_hex(Msg))
end,
io:format("~n"),
case erlzmq:getsockopt(Socket, rcvmore) of
{ok, true} ->
{ok, Next} = erlzmq:recv(Socket),
dump_msg(Next, Socket);
{ok, false} ->
ok
end.

%%--------------------------------------------------------------------
%% @doc Convert a binary to a hex string.
%% @spec bin_to_hex(binary()) -> list()
%% @end
%%--------------------------------------------------------------------

bin_to_hex(B) when is_binary(B) ->
lists:flatten(lists:map(fun int_to_hex/1, binary_to_list(B))).

%%--------------------------------------------------------------------
%% @doc Convert an int to a two char hex string.
%% @spec int_to_hex(integer()) -> string()
%% @end
%%--------------------------------------------------------------------

int_to_hex(N) when N < 256 ->
[hex(N div 16), hex(N rem 16)].

%%--------------------------------------------------------------------
%% @doc Converts an integer to a hex char.
%% @spec hex(integer()) -> char()
%% @end
%%--------------------------------------------------------------------

hex(N) when N < 10 -> $0 + N;
hex(N) when N >= 10, N < 16 -> $a + (N - 10).
Loading