Skip to content

Commit

Permalink
Removed historical data feature (#178)
Browse files Browse the repository at this point in the history
* updated dockerfile

* updated deps

* revert changes in dockerfile

* fixed deps

* delete lock file

* fixed elixir version

* revert changes from package-lock.json file

* Remove historical publication in pubsub publisher

* removed the test for error

* removed from http worker

* Go for supported version

* revert

---------

Co-authored-by: Jose Bonora Soriano <[email protected]>
  • Loading branch information
awaistkd and jBonoraW authored Sep 18, 2023
1 parent 1d754e8 commit ba0c6d1
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 300 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ To start postoffice bundle with docker:
* `CLEAN_MESSAGES_THRESHOLD` defines from what time you want to keep the historical data from the `sent_messages` table (in seconds)
* `CLEAN_MESSAGES_CRONTAB` defines when the Oban cronjob to clean historical data from the `sent_messages` table should be run. Must be a valid crontab declaration.
* `CLUSTER_NAME` defines cluster name to know the source of historical data in pubsub from different clusters
* `PUBSUB_HISTORICAL_TOPIC_NAME` defines the name of pubsub topic to send historical data. Default `postoffice-sent-messages`
* `ENABLE_HISTORICAL_DATA` defines if you want to send the historical data to Pub/Sub. Default `true`


## Clustering
Postoffice has been developed to be used forming a cluster. We use [libcluster](https://github.com/bitwalker/libcluster) under the hood to create the cluster. You can take a look at its documentation in case you want to tune settings.
Expand Down
2 changes: 0 additions & 2 deletions config/releases.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ config :postoffice, max_bulk_messages: {:system, "MAX_BULK_MESSAGES", default: 3
config :postoffice, clean_messages_threshold: {:system, "CLEAN_MESSAGES_THRESHOLD", default: "7890000"}
config :postoffice, clean_messages_crontab: {:system, "CLEAN_MESSAGES_CRONTAB", default: "0 12 * * 0"}
config :postoffice, cluster_name: {:system, "CLUSTER_NAME", default: "postoffice"}
config :postoffice, pubsub_historical_topic_name: {:system, "PUBSUB_HISTORICAL_TOPIC_NAME", default: "postoffice-sent-messages"}
config :postoffice, enable_historical_data: {:system, "ENABLE_HISTORICAL_DATA", default: true}

config :postoffice, Postoffice.Repo,
username: {:system, "DB_USERNAME", default: "postgres"},
Expand Down
4 changes: 2 additions & 2 deletions docker/Dockerfile.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM elixir
FROM elixir:1.14.1

RUN apt update \
&& apt upgrade -y \
Expand All @@ -24,4 +24,4 @@ RUN mix local.hex --force \

RUN cd assets && npm install

CMD ["mix", "phx.server"]
CMD ["mix", "phx.server"]
29 changes: 0 additions & 29 deletions lib/postoffice/workers/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,6 @@ defmodule Postoffice.Workers.Http do
target: target
)

if is_enable_historical_data() do
historical_pubsub_args = %{
"consumer_id" => consumer_id,
"target" => Application.get_env(:postoffice, :pubsub_historical_topic_name),
"payload" => %{
"consumer_id" => consumer_id,
"target" => target,
"type" => "http",
"message_payload" => Map.get(args, "payload"),
"attributes" => attributes,
},
"attributes" => %{"cluster_name" => Application.get_env(:postoffice, :cluster_name)}
}

impl_pubsub().publish(id, historical_pubsub_args)
end

{:ok, :sent}

{:ok, response} ->
Expand All @@ -72,15 +55,6 @@ defmodule Postoffice.Workers.Http do

Logger.error(error_reason, postoffice_message_id: id)

{:ok, _data} =
HistoricalData.create_failed_messages(%{
message_id: message_id,
consumer_id: consumer_id,
payload: historical_payload,
attributes: attributes,
reason: error_reason
})

{:error, :nosent}

{:error, %HTTPoison.Error{reason: reason}} ->
Expand Down Expand Up @@ -121,7 +95,4 @@ defmodule Postoffice.Workers.Http do
Application.get_env(:postoffice, :pubsub_consumer_impl, Postoffice.Adapters.Pubsub)
end

defp is_enable_historical_data do
Application.get_env(:postoffice, :enable_historical_data, true)
end
end
29 changes: 0 additions & 29 deletions lib/postoffice/workers/pubsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,45 +32,19 @@ defmodule Postoffice.Workers.Pubsub do
} = args
) do
message_id = id || 0
historical_payload = if not is_list(payload), do: [payload], else: payload

case impl().publish(id, args) do
{:ok, _response = %PublishResponse{}} ->
Logger.info("Successfully sent pubsub message",
target: target
)
if is_enable_historical_data() do
historical_pubsub_args = %{
"consumer_id" => consumer_id,
"target" => Application.get_env(:postoffice, :pubsub_historical_topic_name),
"payload" => %{
"consumer_id" => consumer_id,
"target" => target,
"type" => "pubsub",
"message_payload" => Map.get(args, "payload"),
"attributes" => attributes,
},
"attributes" => %{"cluster_name" => Application.get_env(:postoffice, :cluster_name)}
}

impl().publish(id, historical_pubsub_args)
end

{:ok, :sent}

{:error, error} ->
error_reason = "Error trying to process message from PubsubConsumer: #{error}"
Logger.error(error_reason, postoffice_message_id: id)

{:ok, _data} =
HistoricalData.create_failed_messages(%{
message_id: message_id,
consumer_id: consumer_id,
payload: historical_payload,
attributes: attributes,
reason: error_reason
})

{:error, :nosent}
end
end
Expand All @@ -89,7 +63,4 @@ defmodule Postoffice.Workers.Pubsub do
Application.get_env(:postoffice, :pubsub_consumer_impl, Postoffice.Adapters.Pubsub)
end

defp is_enable_historical_data do
Application.get_env(:postoffice, :enable_historical_data, true)
end
end
58 changes: 29 additions & 29 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,36 @@ defmodule Postoffice.MixProject do
# Type `mix help deps` for examples and options.
defp deps do
[
{:phoenix, "~> 1.6.15"},
{:phoenix_pubsub, "~> 2.1"},
{:phoenix_ecto, "~> 4.4"},
{:ecto_sql, "~> 3.7.0"},
{:postgrex, ">= 0.16.0"},
{:phoenix_html, "~> 3.2.0"},
{:phoenix_live_view, "~> 0.18"},
{:phoenix_live_dashboard, "~> 0.7"},
{:phoenix_live_reload, "~> 1.4", only: :dev},
{:gettext, "~> 0.20.0"},
{:jason, "~> 1.4"},
{:plug_cowboy, "~> 2.5.2"},
{:bcrypt_elixir, "~> 3.0"},
{:google_api_pub_sub, "~> 0.36.0"},
{:goth, "~> 1.3.1"},
{:httpoison, "~> 1.8"},
{:mox, "~> 1.0", only: :test},
{:gen_stage, "~> 0.14"},
{:ink, "~> 1.2"},
{:config_tuples, "~> 0.4"},
{:libcluster, "~> 3.3"},
{:swarm, "~> 3.4"},
{:excoveralls, "~> 0.15"},
{:floki, "~> 0.34.0", only: :test},
{:hackney, "~> 1.18.1"},
{:cachex, "~> 3.4"},
{:number, "~> 1.0.3"},
{:phoenix, "1.6.15"},
{:phoenix_pubsub, "2.1.1"},
{:phoenix_ecto, "4.4.0"},
{:ecto_sql, "3.7.2"},
{:postgrex, "0.16.5"},
{:phoenix_html, "3.2.0"},
{:phoenix_live_view, "0.18.3"},
{:phoenix_live_dashboard, "0.7.2"},
{:phoenix_live_reload, "1.4.0", only: :dev},
{:gettext, "0.20.0"},
{:jason, "1.4.0"},
{:plug_cowboy, "2.5.2"},
{:bcrypt_elixir, "3.0.1"},
{:google_api_pub_sub, "0.36.0"},
{:goth, "1.3.1"},
{:httpoison, "1.8.2"},
{:mox, "1.0.0", only: :test},
{:gen_stage, "0.14.3"},
{:ink, "1.2.1"},
{:config_tuples, "0.4.2"},
{:libcluster, "3.3.1"},
{:swarm, "3.4.0"},
{:excoveralls, "0.15.1"},
{:floki, "0.34.0", only: :test},
{:hackney, "1.18.1"},
{:cachex, "3.4.0"},
{:number, "1.0.3"},
{:oban, "2.13.5"},
{:prom_ex, "~> 1.7.1"},
{:scrivener_ecto, "~> 2.7"}
{:prom_ex, "1.7.1"},
{:scrivener_ecto, "2.7.0"}
]
end

Expand Down
107 changes: 0 additions & 107 deletions test/postoffice/http_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,86 +44,9 @@ defmodule Postoffice.HttpWorkerTest do
{:ok, %HTTPoison.Response{status_code: 201}}
end)

expect(PubsubMock, :publish, fn _, _ ->
{:ok, %PublishResponse{}}
end)

assert {:ok, _sent} = perform_job(HttpWorker, args)
end

test "historical data is created when message is sent" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic)

args = %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"payload" => %{"action" => "test"},
"attributes" => %{"hive_id" => "vlc"}
}

expected_args = %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"payload" => %{"action" => "test", "attributes" => %{"hive_id" => "vlc"}},
"attributes" => %{"hive_id" => "vlc"}
}

expect(HttpMock, :publish, fn _id, ^expected_args ->
{:ok, %HTTPoison.Response{status_code: 201}}
end)

expected_pubsub_args = %{
"consumer_id" => publisher.id,
"target" => "postoffice-sent-messages",
"payload" => %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"type" => publisher.type,
"message_payload" => %{"action" => "test", "attributes" => %{"hive_id" => "vlc"}},
"attributes" => %{"hive_id" => "vlc"},
},
"attributes" => %{"cluster_name" => "vlc"}
}

expect(PubsubMock, :publish, fn _id, ^expected_pubsub_args ->
{:ok, %PublishResponse{}}
end)

perform_job(HttpWorker, args)
assert Kernel.length(HistoricalData.list_sent_messages()) == 0

end

test "historical data is not created when enabled_historical_data is false" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic)

Application.put_env(:postoffice, :enable_historical_data, false)

args = %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"payload" => %{"action" => "test"},
"attributes" => %{"hive_id" => "vlc"}
}

expect(HttpMock, :publish, fn _id, _expected_args ->
{:ok, %HTTPoison.Response{status_code: 201}}
end)


expect(PubsubMock, :publish, 0, fn _id, _expected_pubsub_args ->
{:ok, %PublishResponse{}}
end)

perform_job(HttpWorker, args)
assert Kernel.length(HistoricalData.list_sent_messages()) == 0

Application.delete_env(:postoffice, :enable_historical_data)

end

test "message is not send if response code is out of 2xx range" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic)
Expand Down Expand Up @@ -153,32 +76,6 @@ defmodule Postoffice.HttpWorkerTest do
assert {:error, :nosent} = perform_job(HttpWorker, args)
end

test "historical data is created when message response is out of 2xx range" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic)

args = %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"payload" => %{"action" => "test"},
"attributes" => %{"hive_id" => "vlc"}
}

expected_args = %{
"consumer_id" => publisher.id,
"target" => publisher.target,
"payload" => %{"action" => "test", "attributes" => %{"hive_id" => "vlc"}},
"attributes" => %{"hive_id" => "vlc"}
}

expect(HttpMock, :publish, fn _id, ^expected_args ->
{:ok, %HTTPoison.Response{status_code: 302}}
end)

perform_job(HttpWorker, args)
assert Kernel.length(HistoricalData.list_failed_messages()) == 1
end

test "message is not send if there is any error on the request" do
topic = Fixtures.create_topic()
publisher = Fixtures.create_publisher(topic)
Expand Down Expand Up @@ -253,10 +150,6 @@ defmodule Postoffice.HttpWorkerTest do
{:ok, %HTTPoison.Response{status_code: 201}}
end)

expect(PubsubMock, :publish, fn _, _ ->
{:ok, %PublishResponse{}}
end)

assert {:ok, _sent} = perform_job(HttpWorker, args, attempt: 100)
end

Expand Down
Loading

0 comments on commit ba0c6d1

Please sign in to comment.