
Architecture Document


##EventShop Architecture

EventShop is used for Personalized Situation Detection and Control by combining heterogeneous web streams.

Data streams generated by social media, sensor networks, the Internet of Things, and the digitalization of transactions provide an abundance of data which, when analysed, reveals patterns that can be used to detect and notify when a particular situation occurs.

###Frameworks used: Apache Kafka, MongoDB, Akka

###Kafka

Data collected in real time from multiple web streams is huge and needs to be partitioned and spread over a cluster of machines to handle the bulk load. There should also be support for a cluster of coordinated consumers accessing this data. Kafka has been chosen to solve this problem, for the following reasons:

- Can handle high throughput.
- Scalability: distributed in nature, so horizontal scaling is easy.
- Reliability through replication.
- Kafka's data retention policy makes it possible to re-generate Emages if required.
- The publish/subscribe model helps integrate real-time (streaming) and batch (time-based) processing on a single data source.
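
For illustration, here is a minimal sketch of how one collected observation could be published to a per-data-source Kafka topic with the standard Java producer API. The broker address, the topic name "ds3" and the field values are assumptions for this sketch, not EventShop's actual producer code.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SttProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One hypothetical STT record published to the topic of data source "ds3".
            String stt = "{\"theme\":\"pm25\",\"timestamp\":\"2016-01-05T10:00:00Z\","
                       + "\"location\":{\"lat\":\"33.64\",\"lon\":\"-117.84\"},"
                       + "\"value\":\"42\",\"raw_data\":\"...\"}";
            producer.send(new ProducerRecord<>("ds3", stt));
        }
    }
}
```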

###MongoDB

All the data collected by EventShop is stored in a database so that historical data analysis can be offered to the end user. MongoDB is chosen as this database for the following reasons:

- Can handle a very high rate of load.
- Highly reliable through replica sets.
- Supports scaling of data through sharding.
- Supports geospatial queries: finding relevant data for specific locations is fast and accurate, which is an important feature of EventShop.
- Supports indexing on any attribute: moving to a multi-dimensional model with flexible rule-based queries requires indexing on any attribute of the document.
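
As a sketch of why these properties matter, the snippet below (MongoDB Java driver) creates a geospatial index plus an index on an arbitrary attribute and runs a proximity query. The database/collection names and the assumption that locations are stored as GeoJSON points are illustrative; the actual EventShop schema keeps lat/lon sub-fields, so the index type may differ.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.geojson.Point;
import com.mongodb.client.model.geojson.Position;
import org.bson.Document;

public class GeoQuerySketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("eventshop").getCollection("ds3_stt");   // assumed names

            // Geospatial index on the location field (assuming GeoJSON points) ...
            coll.createIndex(Indexes.geo2dsphere("location"));
            // ... and an ordinary index on any other attribute, e.g. the timestamp.
            coll.createIndex(Indexes.ascending("timestamp"));

            // All points within roughly 10 km of a coordinate (GeoJSON uses lon, lat order).
            Point center = new Point(new Position(-117.84, 33.64));
            for (Document d : coll.find(Filters.near("location", center, 10_000.0, 0.0))) {
                System.out.println(d.toJson());
            }
        }
    }
}
```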

###Data Ingestion Layer

Two types of Data Sources are currently supported, namely CSV data and Twitter data.

Data available on the Internet comes in different formats and has to be unified into a common format. The solution is to convert it into the STT (Space-Time-Theme) format.

The following data format is used currently:

{ "theme" : "",
  "timestamp" : "",
  "location" : { "lat" : "", "lon" : "" },
  "value" : "",
  "raw_data" : "" }

The raw data is also stored for any future reference.

###a. CSV Data Source

Data is read from the individual Kafka topics, converted to STT format and inserted into MongoDB.

Among the supported data formats, the value field is set to 1 for Twitter data (each record represents one tweet) and to the value specified in the file for CSV data.
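
A minimal sketch of this step, assuming a per-data-source topic named "ds3", a CSV layout of timestamp,lat,lon,value, and local Kafka/MongoDB instances; the real ingestion code may differ.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.bson.Document;

public class CsvToSttSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "csv-stt");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        MongoCollection<Document> coll = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("eventshop").getCollection("ds3_stt");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ds3"));          // per-data-source topic
            while (true) {                                                  // runs until the process is stopped
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    String[] f = r.value().split(",");                      // timestamp,lat,lon,value
                    Document stt = new Document("theme", "pm25")
                            .append("timestamp", f[0])
                            .append("location", new Document("lat", Double.parseDouble(f[1]))
                                                      .append("lon", Double.parseDouble(f[2])))
                            .append("value", Double.parseDouble(f[3]))      // CSV carries the value itself
                            .append("raw_data", r.value());                 // raw record kept for reference
                    coll.insertOne(stt);
                }
            }
        }
    }
}
```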

###b. Twitter Data source

Two Twitter APIs are used:

Twitter Streaming API - The Twitter Streaming API generally returns only 40% of the tweets, and of those only 1% are geo-tagged, so the number of tweets collected after filtering by geolocation is very small. However, the Streaming API provides real-time data, which is useful for real-time analytics. It is also limited to 400 hashtags that you can search for.

Twitter Search API - The Twitter Search API searches for tweets back in time. It is rate limited on the number of calls you make per day and returns approximately 7 to 10 days of tweets, with a maximum of 1000 tweets.

So a combination of both is used in the system to get more tweets.

The Twitter Streaming API is started with the bag of words of all the enabled data sources. All the tweets are collected in a Kafka topic called "tweets".

Each tweet is then analysed to determine which data source it belongs to, and the individual per-data-source topics are populated accordingly. The tweets are then checked against the geolocation specified in the Data Source; if they match, the data is converted to the STT format and inserted into MongoDB.
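
A minimal sketch of the collection step using Twitter4J (the wiki does not name the client library, so this is an assumption) together with the Kafka producer API; the bag of words and broker address are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class TweetCollectorSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Bag of words collected from all enabled data sources (values are hypothetical).
        String[] bagOfWords = {"#California", "wildfire", "PM2.5"};

        TwitterStream stream = new TwitterStreamFactory().getInstance();   // credentials from twitter4j.properties
        stream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                // Every matching tweet goes into the shared "tweets" topic; routing to the
                // per-data-source topics and geo filtering happen in a later processing step.
                producer.send(new ProducerRecord<>("tweets", status.getText()));
            }
        });
        stream.filter(new FilterQuery().track(bagOfWords));
    }
}
```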

###II. Stream Processing Engine

Once the data sources are created, different spatio-temporal operators such as min, max, average, sum, majority, split uniform, and linear_interpolation are applied to the collected data.
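
The following self-contained sketch illustrates the idea behind one such operator: STT points falling into the same grid cell of the bounding box are combined with an aggregation function (here sum) to form one Emage frame. Class and method names are illustrative only.

```java
public class EmageSketch {

    /** One STT observation reduced to what the operator needs. */
    static class SttPoint {
        final double lat, lon, value;
        SttPoint(double lat, double lon, double value) { this.lat = lat; this.lon = lon; this.value = value; }
    }

    /** Aggregate points into a spatial grid using the "sum" operator. */
    static double[][] sumEmage(Iterable<SttPoint> points,
                               double swLat, double swLon, double neLat, double neLon,
                               double latUnit, double lonUnit) {
        int rows = (int) Math.ceil((neLat - swLat) / latUnit);
        int cols = (int) Math.ceil((neLon - swLon) / lonUnit);
        double[][] grid = new double[rows][cols];
        for (SttPoint p : points) {
            int r = (int) ((p.lat - swLat) / latUnit);   // grid row of this observation
            int c = (int) ((p.lon - swLon) / lonUnit);   // grid column of this observation
            if (r >= 0 && r < rows && c >= 0 && c < cols) {
                grid[r][c] += p.value;                   // "sum"; min/max/avg/majority follow the same pattern
            }
        }
        return grid;
    }
}
```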

###III. Query Processing Framework

A query plan tree is constructed to perform the different operations on the generated Emage. The query tree is parsed to check whether it can be executed asynchronously, and events are generated to perform each query operation. Each query operation is an Actor which receives the generated events and processes them. A master actor parses the query tree and manages the messages sent to the different operator actors separately. Each actor creates the C file and executes it, which in turn creates the query Emage.
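
A minimal Akka (Java API) sketch of this master/operator arrangement; the message types, operator names and reply protocol are assumptions made for illustration, and the C-file generation step is only hinted at in a comment.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class QueryActorsSketch {

    /** One operation of the parsed query plan (hypothetical shape). */
    static class OperatorTask {
        final String operator;
        OperatorTask(String operator) { this.operator = operator; }
    }

    /** Reply sent back to the master once an operator has produced its Emage. */
    static class OperatorDone {
        final String operator;
        OperatorDone(String operator) { this.operator = operator; }
    }

    /** Each query operator runs as its own actor and processes the tasks it receives. */
    static class OperatorActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(OperatorTask.class, task -> {
                        // In EventShop this step generates and executes a C file that
                        // produces the operator's output Emage; here we just acknowledge.
                        getSender().tell(new OperatorDone(task.operator), getSelf());
                    })
                    .build();
        }
    }

    /** The master actor parses the query plan and dispatches one task per operator. */
    static class MasterActor extends AbstractActor {
        private final ActorRef operator = getContext().actorOf(Props.create(OperatorActor.class));
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, queryPlanJson ->
                            operator.tell(new OperatorTask("Displacement"), getSelf()))
                    .match(OperatorDone.class, done ->
                            System.out.println("operator finished: " + done.operator))
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("eventshop-query");
        ActorRef master = system.actorOf(Props.create(MasterActor.class), "master");
        master.tell("{\"qID\":1}", ActorRef.noSender());   // hand the (abbreviated) query plan to the master
    }
}
```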

###Configuring a query

###Rest services:-

1- Create a query

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/queryService/createQuery  
   Returns: id  
   Data Format:  {  
                  "query_creator_id": 78,  
                  "query_name":"Test112",  
                  "query_esql": "[{\"qID\":1,\"patternType\":\"tpchar\",\"dataSources\":[\"ds3\",\"ds4\"],\"tcTimeWindow\":\"65\",\"tmplCharOperator\":\"Displacement\",\"queryName\":\"IP1_TC1\",\"timeWindow\":300,\"latitudeUnit\":2,\"longitudeUnit\":2,\"queryStatus\":\"S\",\"qryCreatorId\":\"78\",\"boundingBox\":\"24,-125,50,-66\",\"timeRange\":[1,1]}]",  
                  "latitude_unit": 0.2,  
                  "longitude_unit": 0.2,  
                  "boundingbox": "24,-125,50,-66"  
                   }   

2- Delete a query

   Type: DELETE  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/queryService/deleteQuery  
   Data Format:  {  
                      "ID": 3  
                   }  

3- Enable a query

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/queryService/enableQuery  
   Data Format:  {  
                      "ID": 3  
                   }  

4- Disable a query

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/queryService/disableQuery  
   Data Format:  {  
                     "ID": 3  
                   }  
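
As a usage illustration, the create-query service listed above could be called from Java 11's built-in HTTP client as follows; the payload is an abbreviated version of the example data format (the query_esql string is truncated here), so this is a sketch rather than a guaranteed-valid request.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateQuerySketch {
    public static void main(String[] args) throws Exception {
        // Abbreviated payload following the "Create a query" data format above.
        String body = "{ \"query_creator_id\": 78, \"query_name\": \"Test112\", "
                    + "\"query_esql\": \"[]\", \"latitude_unit\": 0.2, "
                    + "\"longitude_unit\": 0.2, \"boundingbox\": \"24,-125,50,-66\" }";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8085/eventshoplinux/rest/queryService/createQuery"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("created query id: " + response.body());   // the service returns the new id
    }
}
```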

###Configuring a Data Source

A Data Source specifies where to collect data from, the format of the incoming data, and whether any transformations need to be applied to the data. Data collection can be scheduled to run periodically, and all the collected data is stored in the database for future analysis.

The following are the fields in a Data Source

- Theme - The theme of the Data Source.
- Name - A descriptive name for the Data Source.
- URL - The URL of the File, REST or Twitter endpoint.
- Data Syntax - How the collected data has to be transformed and stored in the DB. The field names and field types are configured as JSON. Example: "timestamp":"DATETIME","theme":"STRING","value":"NUMBER","loc":{"lon":"NUMBER","lat":"NUMBER"}. The supported field types are NUMBER, STRING and DATETIME.
- Data Format - The format of the endpoint: Rest, File, Twitter.
- Type of Data Format - The data format of the incoming data: Csv, Xml, Json, Visual.
- Extra Configurations - A JSON object containing other configuration specific to a Data Source. The JSON currently supports the following options:

{ "datasource_type" : "point",
  "spatial_wrapper" : "sum",
  "isList" : true,
  "rootElement" : "",
  "tokenizeElement" : "list",
  "lat_path" : "coord.lat",
  "lon_path" : "coord.lon",
  "val_path" : "wind.speed",
  "date_time_path" : "dt",
  "date_time_format" : "Long" }

"datasource_type" -
"spatial_wrapper" - sum, max, min, avg, count, majority, most_freq
isList - true (optional field)
rootElement - Used in the case json, xml to specify the root path to parse through
tokenizeElement - Used in the case of json, xml to tokenize the list of elements
*_path - Data to be extracted for the fields specified in the data syntax. Have to be the exact same name
*_grok - Any grok expression can be specified for the corresponding *_path.
Note: If the data is of type DATETIME, you can specify the format of the DateTime in this json as *_format.

- Time Window - Time frame to search the data.
- Synchronize at nth sec - Interval at which reading of data from the source is scheduled.
- Search Time Type - Past or Future; used when searching the data in the DB.
- Unit of Latitude - The unit of measure for the grid latitude.
- Unit of Longitude - The unit of measure for the grid longitude.
- SW Latitude - The south-west latitude of the grid.
- SW Longitude - The south-west longitude of the grid.
- NE Latitude - The north-east latitude of the grid.
- NE Longitude - The north-east longitude of the grid.
- Generate Emage - Checkbox to create the Emage if the data is geo-specific.

###Rest services for Data Source:-

1. Create a Data Source

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/dataSourceService/createDataSource  
   Data Format:    

{
"Name": "Test",
"Theme": "Test",
"Url": "www.twitter.com",
"Format": "file",
"User_Id": 78,
"Time_Window": 300000,
"Latitude_Unit": 0.2,
"Longitude_Unit": 0.2,
"boundingbox": "28.565225,-126.826172,45.729191,-110.830078",
"Sync_Time": 300000,
"Wrapper_Name": "csvField",
"Wrapper_Key_Value": "{ "datasource_type": "point", "spatial_wrapper": "sum", "isList": true, "rootElement": "rss/channel", "tokenizeElement": "item", "lat_path": "/item/description/text()", "lat_grok": ": %{NUMBER:lat}°%{GREEDYDATA:waste1}%{NUMBER:lon}"N%{GREEDYDATA:waste2}%{WORD:status} at index %{NUMBER:index}", "lon_path": "/item/description/text()", "lon_grok": ": %{NUMBER:lat}°%{GREEDYDATA:waste1}%{NUMBER:lon}"N%{GREEDYDATA:waste2}%{WORD:status} at index %{NUMBER:index}", "val_path": "/item/description/text()", "val_grok": ": %{NUMBER:lat}°%{GREEDYDATA:waste1}%{NUMBER:lon}"N%{GREEDYDATA:waste2}%{WORD:status} at index %{NUMBER:index}", "date_time_path": "/item/pubDate/text()", "date_time_grok": "%{WORD:garbage}, %{GREEDYDATA:date_time} +%{NUMBER:garbage1}" }",
"Bag_Of_Words": "#California"
}

2. Delete a Data Source

   Type: DELETE  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/dataSourceService/deleteDataSource  
   Data Format: { "ID": 3 }  

3. Enable a Data Source

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/dataSourceService/enableDataSource  
   Data Format: { "ID": 3 }  

4. Disable a Data Source

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/dataSourceService/disableDataSource  
   Data Format: { "ID": 3 }  

###IV. Rules Engine

The rules engine is used to query the data stored by a Data Source. The implementation is MongoDB-specific but can be adapted to any other kind of database.

The idea of the rules engine is to query the data stored in the DB. The fields required in the output should be specified in the Extract Fields. Select the field you want to query on, select the query operator and specify the parameters as comma-separated values. The query operators offered depend on the data type specified in the Data Syntax when configuring the Data Source.

The following query operators are supported for the corresponding data types:

| Data Type | Operator | Parameters |
|-----------|----------|------------|
| NUMBER | > | Double number |
| NUMBER | < | Double number |
| NUMBER | = | Double number |
| NUMBER | != | Double number |
| STRING | regex | String regex |
| STRING | equal | String |
| LOCATION | radius | Double lon, Double lat, Double radius |
| LOCATION | address | String |
| LOCATION | coordinates | Bottom lon, Bottom lat, Top lon, Top lat (all Double values) |
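
To make the mapping concrete, here is a sketch of how the example rule used in the REST calls below (value > 50, with extract fields location.lat, location.lon, value) might translate into a MongoDB query with the Java driver; the database and collection names are assumptions.

```java
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import org.bson.Document;
import org.bson.conversions.Bson;

public class RuleToMongoSketch {
    public static void main(String[] args) {
        MongoCollection<Document> coll = MongoClients.create("mongodb://localhost:27017")
                .getDatabase("eventshop").getCollection("ds3_stt");   // assumed collection name

        // Rule: dataField "value", operator ">", parameter "50"  ->  { value: { $gt: 50 } }
        Bson filter = Filters.gt("value", 50);

        // extractFields "location.lat,location.lon,value" become a projection.
        Bson projection = Projections.include("location.lat", "location.lon", "value");

        for (Document d : coll.find(filter).projection(projection)) {
            System.out.println(d.toJson());
        }
    }
}
```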

###Configuring a rule

Rest services:-

1- Create a rule

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/rulewebservice/rule  
   Data Format:  {  
                     "source": "ds3",  
                     "rules": [  
                         {  
                             "dataField": "value",  
                             "ruleOperator": ">",  
                             "ruleParameters": "50"  
                         }  
                     ],  
                     "extractFields": "location.lat,location.lon,value,"  
                 }  

2- Create and Enable a Rule

   Type: POST  
   Consumes: MediaType.APPLICATION_JSON  
   Path: http://localhost:8085/eventshoplinux/rest/rulewebservice/createAndEnableRule  
   Data Format:  {  
                       "source": "ds3",  
                      "rules": [  
                          {  
                               "dataField": "value",  
                              "ruleOperator": ">",  
                              "ruleParameters": "50"  
                          }  
                      ],  
                      "extractFields": "location.lat,location.lon,value,"  
                  }  

3- Enable a rule

   Type: POST  
   Path: http://localhost:8085/eventshoplinux/rest/rulewebservice/enableRule/{id}  

4- Get a rule

   Type: GET  
   Path: http://localhost:8085/eventshoplinux/rest/rulewebservice/rule/{id}  

###V. Alerts Framework

The Alerts Framework is used to set up notifications for abnormal conditions in the raw or transformed data, for example temperature above 45°C. It is a REST service providing three features: Create Alert, Enable Alert and Disable Alert. Alerts can be created on either a Query or a Data Source; however, the alert region does not have to match the region of the data source or query and can be restricted to a smaller region. Example: the data source or query could cover the entire US region, but an alert can be configured for only a particular state.

For a configured region all the alerts would be collected and sent to a configured rest endpoint.
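
A rough sketch of that delivery step: a value from the alert source is checked against the configured thresholds and, if it falls in the alert range, a result is POSTed to the configured resultEndpoint. The payload shape and the hard-coded values are assumptions for illustration only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AlertDeliverySketch {
    public static void main(String[] args) throws Exception {
        double observed = 87.0;                      // hypothetical value read from the alert source (e.g. "Q3")
        double alertMin = 40, alertMax = 100;        // thresholds from the alert definition

        if (observed >= alertMin && observed <= alertMax) {
            // Illustrative payload; the framework decides the exact fields it sends.
            String alert = "{ \"alertName\": \"NoSolutionAlert\", \"value\": " + observed
                         + ", \"coordinate\": \"33.64,-117.84\" }";
            HttpRequest post = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8085/eventshoplinux/rest/alertwebservice/resultAlert/"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(alert))
                    .build();
            HttpResponse<String> resp = HttpClient.newHttpClient()
                    .send(post, HttpResponse.BodyHandlers.ofString());
            System.out.println("alert delivered, status " + resp.statusCode());
        }
    }
}
```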

###Configuring an Alert

  1. Create Alert
    Endpoint: http://localhost:8085/eventshoplinux/rest/alertwebservice/registerAlert/
    Http Method : POST
    Request Body:

Without Solution
{"alertName":"NoSolutionAlert",
"theme" : "theme",
"alertSource" : "Q3",
"alertMin":40,
"alertMax":100,
"email":"[email protected]",
"resultEndpoint": "http://localhost:8085/eventshoplinux/rest/alertwebservice/resultAlert/"}

With Solution
{"alertName":"test123",
"alertType":"solution",
"theme" : "theme",
"alertSource" : "Q3",
"safeSource":"ds3",
"alertMin":40,
"alertMax":100,
"safeMin":0,
"safeMax":20,
"email":"[email protected]",
"resultEndpoint": "http://localhost:8085/eventshoplinux/rest/alertwebservice/resultAlert/"
}

Without Solution for a particular region
{"alertName":"NoSolutionAlert",
"theme" : "theme",
"alertSource" : "Q3",
"alertMin":40,
"alertMax":100,
"email":"[email protected]",
"resultEndpoint": "http://localhost:8085/eventshoplinux/rest/alertwebservice/resultAlert/",
"alertMessage":"High PM2.5 found in {$coordinate} and address is {$geoAddress} and solution found in {$solutionCoordinate} and address is {$solutionAddress}.",
"loc":{"lat":32.11,"lng":-116.11"},
"radius":"100"
}

With Solution for a particular region
{"alertName":"test123",
"alertType":"solution",
"theme" : "theme",
"alertSource" : "Q3",
"safeSource":"ds3",
"alertMin":40,
"alertMax":100,
"safeMin":0,
"safeMax":20,
"email":"[email protected]",
"resultEndpoint": "http://localhost:8085/eventshoplinux/rest/alertwebservice/resultAlert/",
"alertMessage":"High PM2.5 found in {$coordinate} and address is {$geoAddress} and solution found in {$solutionCoordinate} and address is {$solutionAddress}.",
"loc":{"lat":32.11,"lng":-116.11"},
"radius":"100"
}

NOTE:- The Alert Source has to be the query id prefixed with Q (example "Q31") or the data source id prefixed with ds (example "ds3").
In the "alertMessage" field, the constant characters of the message are limited to 50 characters. Each variable field should be given exactly as shown in the example message above (a sketch of the substitution is given at the end of this section). The respective meanings of the variable fields are as follows:-

  1. {$coordinate} :- Latitude and longitude of the alerted locations.

  2. {$geoAddress} :- Geographical address of the alerted locations.

  3. {$solutionCoordinate} :- Latitude and longitude of the safe locations.

  4. {$solutionAddress} :- Geographical address of the safe locations.

  2. Enable Alert
    To enable an alert
    Endpoint : http://localhost:8085/eventshoplinux/rest/alertwebservice/enableAlert/
    Http Method : POST
    Request:
    {"alertID":7}

  3. Disable Alert
    To disable an alert
    Endpoint : http://localhost:8085/eventshoplinux/rest/alertwebservice/disableAlert/
    Http Method : POST
    Request:
    {"alertID":7}
