Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/schema registry in geronimo #2278

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions developer-portal/content/reference/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ in the `5xx` range indicate an error with Fox's platform.
<!-- include(objects/stock_item.apib) -->
<!-- include(objects/shipment.apib) -->
<!-- include(objects/export.apib) -->
<!-- include(objects/cms.apib) -->

<!-- include(public.apib) -->
<!-- include(customers.apib) -->
Expand All @@ -260,3 +261,4 @@ in the `5xx` range indicate an error with Fox's platform.
<!-- include(inventory.apib) -->
<!-- include(transactions.apib) -->
<!-- include(discounts.apib) -->

41 changes: 41 additions & 0 deletions developer-portal/content/reference/objects/cms.apib
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Data Structures

## ContentTypeBase
+ schema: `{ "title": {"type": ["string"], "required": true }}` (required, string) - JSON schema of a content_type
+ name: `BlogPost` (required, string) - Name of a content_type

## ContentTypePayload
- Include ContentTypeBase

## ContentTypeUpdatePayload
+ schema: `{ "another_title": {"type": ["string"], "required": true }}` (optional, string) - JSON schema of a content_type
+ name: `BlogPost` (optional, string) - Name of a content_type

## ContentTypeResponse
- Include ContentTypeBase
+ scope: `1` (required, string) - Scope of an admin who created a given ContentType
+ id: `1` (required, number) - ID of a created ContentType
+ created_by: `4` (required, number) - ID of a user who created a given ContentType
+ versions: `["2017-06-12T03:13:46Z", "2017-06-13T03:12:46Z"]` (optional, string) - Previous versions of ContentType

## EntityBase
+ content_type_id: 1 (required, number) - id of a corresponding content_type
+ storefront: `theperfectgourmet.com` (required, string) - Name of a store on which given Entity has been created
+ content: ` { "title": "foo" } ` (required, string) - JSON content of a Entity

## EntityPayload
- Include EntityBase

## EntityUpdatePayload
+ content_type_id: 1 (optional, number) - id of a corresponding content_type
+ content: ` { "title": "bar" } ` (optional, string) - JSON content of a Entity

## EntityAdminResponse
- Include EntityBase
+ schema_version: `2017-06-12T03:13:46Z` (required, string) - Version of schema Entity was validated against
+ storefront: `theperfectgourmet.com` (required, string) - Name of a store on which given Entity has been created
+ kind: `BlogPost` (required, string) - Type of a created Entity
+ id: `3` (required, number) - ID of a created Entity
+ created_by: `4` (required, number) - ID of a user who created a given Entity
+ versions: `["2017-06-12T03:13:46Z", "2017-06-13T03:12:46Z"]` (optional, string) - Previous versions of Entity

105 changes: 105 additions & 0 deletions developer-portal/content/reference/resources/admin_cms.apib
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
## ContentType [/v1/admin/content_types{id}]

This object represents ContentTypes in CMS.
Each ContentType describes specific Entity.

+ Parameters
+ id: `1` (required, number) - ContentType ID.

+ Model (application/json)
+ Attributes (ContentTypeResponse)


### Retrieve ContentType [GET]

Allows an administrator to retrieve a content_type record, which includes all
information about the content_type.

+ Response 200 (application/json)
+ Attributes (ContentTypeResponse)

### Create New ContentType [POST /v1/admin/content_types]

Allows an administrator to create a new content_types.

+ Request (application/json)
+ Attributes (ContentTypePayload)
+ Response 200 (application/json)
+ Attributes (ContentTypeResponse)

### Update Existing ContentType [PUT]

+ Request (application/json)
+ Attributes(ContentTypeUpdatePayload)
+ Response 200 (application/json)
+ Attributes (ContentTypeResponse)

### Get specific version of ContentType [GET /v1/admin/content_types/{id}/versions?ver={version}]

+ Parameters
+ id: `1` (required, number) - ContentType ID.
+ ver: `2017-06-13T16:14:51.974673Z` (required, string) - Version of a ContentType

+ Response 200 (application/json)
+ Attributes (ContentTypeResponse)

### Restore specific version of ContentType [PUT /v1/admin/content_types/{id}/restore?ver={version}]

+ Parameters
+ id: `1` (required, number) - ContentType ID.
+ ver: `2017-06-13T16:14:51.974673Z` (required, string) - Version of a ContentType

+ Response 200 (application/json)
+ Attributes (ContentTypeResponse)

## Entity [/v1/admin/entities{id}]

This object describes an actual entity of CMS: Blog posts, testimonials, etc.
Each Entity validates against corresponding ContentType's schema before creation.

+ Parameters
+ id: `1` (required, number) - Entity ID.

+ Model (application/json)
+ Attributes (EntityAdminResponse)

### Retrieve Entity [GET]

Allows an administrator to retrieve a single Entity record.

+ Response 200 (application/json)
+ Attributes (EntityAdminResponse)

### Create New Entity [POST /v1/admin/entities]

Allows an administrator to create a new Entity.

+ Request (application/json)
+ Attributes (EntityPayload)
+ Response 200 (application/json)
+ Attributes (EntityAdminResponse)

### Update Existing Entity [PUT]

+ Request (application/json)
+ Attributes(EntityUpdatePayload)
+ Response 200 (application/json)
+ Attributes (EntityAdminResponse)

### Get specific version of Entity [GET /v1/admin/entities/{id}/versions?ver={version}]

+ Parameters
+ id: `1` (required, number) - Entity ID.
+ ver: `2017-06-13T16:14:51.974673Z` (required, string) - Version of a Entity

+ Response 200 (application/json)
+ Attributes (EntityAdminResponse)

### Restore specific version of Entity [PUT /v1/admin/entities/{id}/restore?ver={version}]

+ Parameters
+ id: `1` (required, number) - Entity ID.
+ ver: `2017-06-13T16:14:51.974673Z` (required, string) - Version of a Entity

+ Response 200 (application/json)
+ Attributes (EntityAdminResponse)
4 changes: 2 additions & 2 deletions geronimo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ test:
MIX_ENV=test mix espec

seed:
mix run priv/seeds/clothes_accessories_categories.exs
mix run priv/seeds/clothes_schema.exs
mix run priv/seeds/seeds.exs

####################################################################
# Docker build targets #
Expand Down Expand Up @@ -82,6 +81,7 @@ migrate:

reset:
@make drop-db
dropdb --if-exists $(DB_TEST) -U $(DB_USER)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we have drop-db, shall we create drop-test-db too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually no. We dropping test DB here because in other case we can not drop development DB cause it belongs to geronimo DB user as well.
We creating test DB in reset-test task.

@make drop-user
@make create-user
@make create-db
Expand Down
16 changes: 16 additions & 0 deletions geronimo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ GERONIMO_DB_HOST=localhost
GERONIMO_DB_USER=geronimo
GERONIMO_DB_NAME=geronimo_development
GERONIMO_DB_PASSWORD=''

# kafka
BROKER_HOST=kafka_broker_host
BROKER_PORT=9092
CONSUMER_GROUP=geronimo_kafka_ex
SCHEMA_REGISTRY_IP=schema_registry_ip
SCHEMA_REGISTRY_PORT=8081

# Start kafka worker on application start
START_WORKER=true

#jwt
PUBLIC_KEY=/path/to/public_key.pem
```

Expand Down Expand Up @@ -488,13 +500,16 @@ Params:
|----|----|-----------|---------|
|content_type_id|Integer|Corresponding ContentType id|Yes|
|content|JSON|Content of an Entity|Yes|
|storefront|string|Storefront on which given entity has been created|Yes|


**Request:**
`POST /v1/admin/entities/`

```json
{
"content_type_id": 1,
"storefront": "foo.bar",
"content": {
"title":"Some title foooooo",
"body":"Lorem ipsum",
Expand All @@ -511,6 +526,7 @@ Params:
"updated_at": "2017-06-12T03:15:17Z",
"scope": "1",
"schema_version": "2017-06-12T03:13:46Z",
"storefront": "foo.bar",
"kind": "BlogPost",
"inserted_at": "2017-06-12T03:15:17Z",
"id": 3,
Expand Down
11 changes: 10 additions & 1 deletion geronimo/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,13 @@ config :geronimo, Geronimo.Repo,
database: System.get_env("GERONIMO_DB_NAME"),
hostname: System.get_env("GERONIMO_DB_HOST"),
pool: Ecto.Adapters.SQL.Sandbox,
types: Geronimo.PostgresTypes
types: Geronimo.PostgresTypes

config :exvcr, [
vcr_cassette_library_dir: "spec/fixture/vcr_cassettes",
filter_sensitive_data: [
[pattern: "<PASSWORD>.+</PASSWORD>", placeholder: "PASSWORD_PLACEHOLDER"]
],
filter_url_params: false,
response_headers_blacklist: []
]
3 changes: 2 additions & 1 deletion geronimo/lib/geronimo/api/router/v1/admin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ defmodule Geronimo.Router.V1.Admin do
params do
requires :content_type_id, type: Integer
requires :content, type: Any
requires :storefront, type: String
end

post do
try do
{:ok, content_type} = ContentType.get(params[:content_type_id], conn.assigns[:current_user].scope)
validated = Validation.validate!(params[:content], content_type.schema)

case Entity.create(validated, content_type, conn.assigns[:current_user]) do
case Entity.create(validated, content_type, params[:storefront], conn.assigns[:current_user]) do
{:ok, record} -> respond_with(conn, record)
{:error, err} -> respond_with(conn, err, 400)
end
Expand Down
27 changes: 27 additions & 0 deletions geronimo/lib/geronimo/kafka/pusher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Geronimo.Kafka.Pusher do
@moduledoc """
Implements sync and async pushes to Kafka
"""
require Logger

def push(module, obj) do
kind = apply(module, :table, [])
data = apply(module, :avro_encode!, [obj])
res = KafkaEx.produce("geronimo_#{kind}", 0, data,
key: "#{kind}_#{obj.id}",
worker_name: :geronimo_worker)
Logger.debug "#{Inflex.singularize(kind)} with id #{obj.id} pushed to kafka #{inspect(res)}"
end

def push_async(kind, obj) do
unless Mix.env == :test do
Task.async(fn ->
push(kind, obj)
end)
end
end

def push_async_await(kind, obj) do
push_async(kind, obj) |> Task.await
end
end
44 changes: 23 additions & 21 deletions geronimo/lib/geronimo/kafka/schema_registry_client.ex
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
defmodule Geronimo.Kafka.SchemaRegistryClient do
@moduledoc """
Provides a easy and conveniens functions to get/store schemas in schema registry
"""
use HTTPoison.Base

def get_schema(object, id) do
case get("/subjects/#{object}/versions/#{id}") do
{:ok, %HTTPoison.Response{body: body, headers: _, status_code: 200}} ->
body[:schema]
|> Poison.decode!
|> Utils.atomize
{:ok, %HTTPoison.Response{body: body, headers: _, status_code: _}} -> body
{:error, err} -> err
end
end

def store_schema(schema) do
schema
get("/subjects/#{object}/versions/#{id}")
|> response_body()
end

def process_url(url) do
"http://#{schema_registry_url()}/#{url}"
def store_schema(object, schema) do
post("/subjects/#{object}/versions", Poison.encode!(%{schema: schema}))
|> response_body()
end

def process_response_body(body) do
body
|> Poison.decode!
|> Enum.map(fn({k, v}) -> {String.to_atom(k), v} end)
defp response_body(response) do
case response do
{:ok, %HTTPoison.Response{body: body, headers: _, status_code: 200}} -> {:ok, body}
{:ok, %HTTPoison.Response{body: body, headers: _, status_code: _}} -> {:error, body}
{:error, err} -> {:fail, err.reason}
end
end

def schema_registry_url do
url = Application.fetch_env!(:geronimo, :schema_registry_ip)
def process_url(path) do
ip = Application.fetch_env!(:geronimo, :schema_registry_ip)
port = Application.fetch_env!(:geronimo, :schema_registry_port)
"http://#{url}:#{port}"
"http://#{ip}:#{port}" <> path
end

def process_request_headers(), do: ["Content-Type", "application/vnd.schemaregistry.v1+json"]

def process_response_body(body) do
body
|> Poison.decode!
|> Utils.atomize
end
end
31 changes: 18 additions & 13 deletions geronimo/lib/geronimo/kafka/worker.ex
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
defmodule Geronimo.Kafka.Worker do
@moduledoc """
Starts KafkaEs worker on app start and registers all needed schemas
NB: Add new modules to register_schemas() if needed.
"""

require Logger
alias Geronimo.Kafka.SchemaRegistryClient

def start do
kafka_url = [{Application.fetch_env!(:geronimo, :kafka_host),
Application.fetch_env!(:geronimo, :kafka_port) |> String.to_integer }]
KafkaEx.create_worker(:geronimo_worker, [uris: kafka_url,
consumer_group: Application.fetch_env!(:geronimo, :consumer_group)])
register_schemas()
end

def push(kind, obj) do
KafkaEx.produce("geronimo_#{kind}", 0, Poison.encode!(obj),
key: "#{kind}_#{obj.id}", worker_name: :geronimo_worker)
end
def register_schemas do
Task.async(fn->
modules = [Geronimo.ContentType, Geronimo.Entity]

def push_async(kind, obj) do
unless Mix.env == :test do
Task.async(fn ->
push(kind, obj)
Enum.each(modules, fn(module) ->
key_schema = apply(module, :avro_schema_key, [])
value_schema = apply(module, :avro_schema_value, [])
object = apply(module, :table, [])
{:ok, k_res} = SchemaRegistryClient.store_schema("#{object}-key", key_schema)
{:ok, v_ver} = SchemaRegistryClient.store_schema("#{object}-value", value_schema)
Logger.info "Schemas for #{object} registered. Key: #{inspect(k_res)}, value: #{inspect(v_ver)}"
end)
end
end

def push_async_await(kind, obj) do
push_async(kind, obj) |> Task.await
end) |> Task.await
end
end
Loading