
Commit

arunjose1995 committed Mar 26, 2021
2 parents 9e5e0d8 + 1cba90e commit e1cd579
Showing 12 changed files with 2,854 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.idea
*/config.ini
22 changes: 21 additions & 1 deletion README.md
@@ -1 +1,21 @@
# ml-analytics-service

#### Prerequisites
1. Python 3.6.x or above
1. A Python virtual environment to work in

#### Setup

Each folder contains its own config files and programs. Navigate into the respective folder, create a file named `config.ini`, and copy into it the contents of the `config.sample` present in that folder.

Replace the `<>` placeholders in `config.ini` with the appropriate values and save the file.

Activate the virtual environment created earlier by running the following command: `. <PATH-TO-YOUR-VIRTUAL-ENV-NAME>/bin/activate`

Run the following command to install all the dependencies: `pip install -r requirements.txt`
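
Each program loads its `config.ini` at startup with Python's `ConfigParser` (using `ExtendedInterpolation`), as the streaming scripts below do. A minimal sketch of that loading step, handy for checking that your `config.ini` is being picked up and the placeholders were replaced:

```python
# Minimal sketch of how the py_* scripts in this repo read config.ini;
# run it from inside the respective folder (e.g. observations/).
import os
from configparser import ConfigParser, ExtendedInterpolation

config_path = os.path.dirname(os.path.abspath(__file__))
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read(config_path + "/config.ini")

# Print a couple of values to confirm the <> placeholders were filled in.
print(config.get("MONGO", "mongo_url"))
print(config.get("KAFKA", "kafka_url"))
```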

#### Execution

Programs whose names start with `py_*` are general-purpose scripts; run them with `python <PATH-TO-PROGRAM>`.

Programs whose names start with `pyspark_*` should be run with `spark-submit <PATH-TO-PROGRAM>`.
175 changes: 175 additions & 0 deletions observations/config.sample
@@ -0,0 +1,175 @@
[COMMON]

diksha_survey_app_name = <SURVEY_APP_NAME>
diksha_integrated_app_name = <INTEGRATED_APP_NAME>
observation_status_output_dir = <FOLDER_PATH>

[API_HEADERS]

# --------------------------------
# API variables
# --------------------------------

headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'}
client_id = <CLIENT_ID>
grant_type = refresh_token

# --------------------
# Header API variables
# --------------------

content_type = application/json
authorization = <AUTHORIZATION_KEY>
refresh_token = <REFRESH_TOKEN>

# -----------------------
# Channel Id
# -----------------------

channel-id = <CHANNEL_ID>


[URL]

base_url = http://<INTERNAL_IP>:<PORT_NUMBER>
url_getkeyclock = https://<YOUR_APPLICATION_URL>/auth/realms/sunbird/protocol/openid-connect/token

url_entity_related = assessment/api/v1/entities/relatedEntities/
url_entity_relatedapi = http://<INTERNAL_IP>:<PORT_NUMBER>/assessment/api/v1/entities/relatedEntities/

sunbird_api_base_url_ip = http://<INTERNAL_IP>:<PORT_NUMBER>/api
sunbird_api_url_readuser = user/v1/read
sunbird_api_url_syncuser = data/v1/index/sync

url_user_profile_api = assessment/api/v1/userExtension/getProfile/

sunbird_api_url_searchuser = user/v1/search


[MONGO]

# --------------
# Mongo prod url
#---------------

mongo_url = mongodb://<INTERNAL_IP>:<PORT_NUMBER>

# -----------------------
# Mongo database name
# -----------------------

database_name = <DATABASE_NAME>

# -------------------------------------
# Mongo observationSubmission Collection
# -------------------------------------

observation_sub_collec = <OBSERVATION_SUB_COLLECTION_NAME>

# -------------------------
# Mongo solutions Collection
# --------------------------

solutions_collec = <SOLUTION_COLLECTION_NAME>

# ----------------------------
# Mongo observations Collection
# -----------------------------

observations_collec = <OBSERVATION_COLLECTION_NAME>

# ---------------------------
# Mongo entityTypes Collection
# ----------------------------

entity_type_collec = <ENTITY_TYPE_COLLECTION_NAME>

# -------------------------
# Mongo questions Collection
# -------------------------

questions_collec = <QUESTION_COLLECTION_NAME>

# ------------------------
# Mongo criteria Collection
# -----------------------

criteria_collec = <CRITERIA_COLLECTION_NAME>

# -----------------------
# Mongo entities Collection
# -----------------------

entities_collec = <ENTITIES_COLLECTION_NAME>

# -----------------------
# Mongo programs Collection
# -----------------------

programs_collec = <PROGRAM_COLLECTION_NAME>

# -----------------------
# Mongo user_roles Collection
# -----------------------

user_roles_collection = <USERROLES_COLLECTION_NAME>

[DRUID]

urlQuery = http://<INTERNAL_IP>:<PORT_NUMBER>/druid/indexer/v1/supervisor

druid_end_point = http://<INTERNAL_IP>:<PORT_NUMBER>/druid/coordinator/v1/datasources/

druid_batch_end_point = http://<INTERNAL_IP>:<PORT_NUMBER>/druid/indexer/v1/task

observation_status_spec = <OBSERVATION_STATUS_SPEC>

observation_spec = <OBSERVATION_SPEC>

observation_evidence_spec = <OBSERVATION_EVIDENCE_SPEC>

[KAFKA]

kafka_url = <KAFKA_INTERNAL_IP>:<PORT>

kafka_raw_data_topic = <TOPIC_NAME>

kafka_druid_topic = <DRUID_TOPIC_NAME>

kafka_evidence_druid_topic = <EVIDENCE_TOPIC_NAME>

[LOGS]

observation_streaming_success_log_filename = <FOLDER_PATH>/success.log

observation_streaming_error_log_filename = <FOLDER_PATH>/error.log

observation_streaming_evidence_success_log_filename = <FOLDER_PATH>/success.log

observation_streaming_evidence_error_log_filename = <FOLDER_PATH>/error.log

observation_status_success_log_filename = <FOLDER_PATH>/status/success.log

observation_status_error_log_filename = <FOLDER_PATH>/status/error.log

[ELASTICSEARCH]

header = {'Content-Type': 'application/json'}

url_user = http://<INTERNAL_IP>:<PORT_NUMBER>/users/_search/?scroll=1m

user_body = {"size": 10000,"query":{"bool":{"must":[{"match":{"_type":"common"}}]}}}

url_user_scroll = http://<INTERNAL_IP>:<PORT_NUMBER>/_search/scroll

url_entity = http://<INTERNAL_IP>:<PORT_NUMBER>/entities/_search/?scroll=1m

[AZURE]

account_name = <ACCOUNT_NAME>

sas_token = <SAS_TOKEN>

container_name = <CONTAINER_NAME>

blob_path = <BLOB_PATH>
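
The `[API_HEADERS]` and `[URL]` values above are evidently used to fetch an access token from Keycloak (the `/auth/realms/sunbird/protocol/openid-connect/token` endpoint) before calling the other APIs. A minimal sketch of that token-refresh call, assuming the standard Keycloak `refresh_token` grant; the actual helper is not part of this commit:

```python
# Hedged sketch (not in this commit): refresh a Keycloak access token using the
# [API_HEADERS] and [URL] values from config.ini.
import ast
import requests
from configparser import ConfigParser, ExtendedInterpolation

config = ConfigParser(interpolation=ExtendedInterpolation())
config.read("config.ini")

response = requests.post(
    config.get("URL", "url_getkeyclock"),
    headers=ast.literal_eval(config.get("API_HEADERS", "headers_getkeyclock")),
    data={
        "client_id": config.get("API_HEADERS", "client_id"),
        "grant_type": config.get("API_HEADERS", "grant_type"),
        "refresh_token": config.get("API_HEADERS", "refresh_token"),
    },
)
access_token = response.json().get("access_token")
```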
174 changes: 174 additions & 0 deletions observations/py_observation_evidence_streaming.py
@@ -0,0 +1,174 @@
# -----------------------------------------------------------------
# Name : py_observation_evidence_streaming.py
# Author : Shakthieshwari.A
# Description : Extracts the Evidence or Files Attached at each question level
# during the observation submission
# -----------------------------------------------------------------
from pymongo import MongoClient
from bson.objectid import ObjectId
import csv,os
import json
import boto3
import datetime
from datetime import date,time
import requests
import argparse
from kafka import KafkaConsumer
from kafka import KafkaProducer
import dateutil
from dateutil import parser as date_parser
from configparser import ConfigParser,ExtendedInterpolation
import faust
import logging
import logging.handlers
import time
from logging.handlers import TimedRotatingFileHandler

config_path = os.path.dirname(os.path.abspath(__file__))
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read(config_path + "/config.ini")

formatter = logging.Formatter('%(asctime)s - %(levelname)s')

successLogger = logging.getLogger('success log')
successLogger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
successHandler = logging.handlers.\
RotatingFileHandler(config.get('LOGS','observation_streaming_evidence_success_log_filename'))
successBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_streaming_evidence_success_log_filename'),
when="w0",backupCount=1)
successHandler.setFormatter(formatter)
successLogger.addHandler(successHandler)
successLogger.addHandler(successBackuphandler)

errorLogger = logging.getLogger('error log')
errorLogger.setLevel(logging.ERROR)
errorHandler = logging.handlers.\
RotatingFileHandler(config.get('LOGS','observation_streaming_evidence_error_log_filename'))
errorBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_streaming_evidence_error_log_filename'),
when="w0",backupCount=1)
errorHandler.setFormatter(formatter)
errorLogger.addHandler(errorHandler)
errorLogger.addHandler(errorBackuphandler)

try:
    kafka_url = (config.get("KAFKA","kafka_url"))
    app = faust.App('sl_observation_evidences_diksha_faust',broker='kafka://'+kafka_url,value_serializer='raw',
                    web_port=7002,broker_max_poll_records=500)
    rawTopicName = app.topic(config.get("KAFKA","kafka_raw_data_topic"))
    producer = KafkaProducer(bootstrap_servers=[kafka_url])

    # db production
    clientdev = MongoClient(config.get('MONGO','mongo_url'))
    dbdev = clientdev[config.get('MONGO','database_name')]

    observationSubmissionsDevCollec = dbdev[config.get('MONGO','observation_sub_collec')]
    solutionsDevCollec = dbdev[config.get('MONGO','solutions_collec')]
    observationDevCollec = dbdev[config.get('MONGO','observations_collec')]
    entityTypeDevCollec = dbdev[config.get('MONGO','entity_type_collec')]
    questionsDevCollec = dbdev[config.get('MONGO','questions_collec')]
    criteriaDevCollec = dbdev[config.get('MONGO','criteria_collec')]
    entitiesDevCollec = dbdev[config.get('MONGO','entities_collec')]
except Exception as e:
    errorLogger.error(e,exc_info=True)

try:
    def convert(lst):
        return ','.join(lst)
except Exception as e:
    errorLogger.error(e,exc_info=True)

try:
    def evidence_extraction(msg_id):
        for obSub in observationSubmissionsDevCollec.find({'_id':ObjectId(msg_id)}):
            successLogger.debug("Observation Evidence Submission Id : " + str(msg_id))
            # Defaults so a missing key cannot leave these names undefined below.
            completedDate = None
            answersArr = []
            try:
                completedDate = str(datetime.datetime.date(obSub['completedDate'])) + 'T' \
                    + str(datetime.datetime.time(obSub['completedDate'])) + 'Z'
            except KeyError:
                pass
            evidence_sub_count = 0
            try:
                answersArr = [v for v in obSub['answers'].values()]
            except KeyError:
                pass
            # First pass: count every attached file across all answers.
            for ans in answersArr:
                try:
                    if len(ans['fileName']):
                        evidence_sub_count = evidence_sub_count + len(ans['fileName'])
                except KeyError:
                    if len(ans['instanceFileName']):
                        for instance in ans['instanceFileName']:
                            evidence_sub_count = evidence_sub_count + len(instance)
            # Second pass: build one flattened record per answer and push it to Kafka.
            for answer in answersArr:
                observationSubQuestionsObj = {}
                observationSubQuestionsObj['completedDate'] = completedDate
                observationSubQuestionsObj['total_evidences'] = evidence_sub_count
                observationSubQuestionsObj['userName'] = obSub['evidencesStatus'][0]['submissions'][0]['submittedByName']
                observationSubQuestionsObj['userName'] = observationSubQuestionsObj['userName'].replace("null","")
                observationSubQuestionsObj['observationSubmissionId'] = str(obSub['_id'])
                observationSubQuestionsObj['school'] = str(obSub['entityId'])
                observationSubQuestionsObj['schoolExternalId'] = obSub['entityExternalId']
                observationSubQuestionsObj['schoolName'] = obSub['entityInformation']['name']
                observationSubQuestionsObj['entityTypeId'] = str(obSub['entityTypeId'])
                observationSubQuestionsObj['createdBy'] = obSub['createdBy']
                observationSubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId']
                observationSubQuestionsObj['solutionId'] = str(obSub['solutionId'])
                observationSubQuestionsObj['observationId'] = str(obSub['observationId'])
                try:
                    observationSubQuestionsObj['appName'] = obSub["appInformation"]["appName"].lower()
                except KeyError:
                    observationSubQuestionsObj['appName'] = config.get("COMMON","diksha_survey_app_name")
                fileName = []
                fileSourcePath = []
                try:
                    observationSubQuestionsObj['remarks'] = answer['remarks']
                    observationSubQuestionsObj['questionName'] = answer['payload']['question'][0]
                except KeyError:
                    pass
                observationSubQuestionsObj['questionId'] = str(answer['qid'])
                for ques in questionsDevCollec.find({'_id':ObjectId(observationSubQuestionsObj['questionId'])}):
                    observationSubQuestionsObj['questionExternalId'] = ques['externalId']
                observationSubQuestionsObj['questionResponseType'] = answer['responseType']
                evidence = []
                evidenceCount = 0
                try:
                    if answer['fileName']:
                        evidence = answer['fileName']
                        observationSubQuestionsObj['evidence_count'] = len(evidence)
                        evidenceCount = len(evidence)
                except KeyError:
                    if answer['instanceFileName']:
                        for inst in answer['instanceFileName']:
                            evidence.extend(inst)
                        observationSubQuestionsObj['evidence_count'] = len(evidence)
                        evidenceCount = len(evidence)
                for evi in evidence:
                    fileName.append(evi['name'])
                    fileSourcePath.append(evi['sourcePath'])
                observationSubQuestionsObj['fileName'] = convert(fileName)
                observationSubQuestionsObj['fileSourcePath'] = convert(fileSourcePath)
                if evidenceCount > 0:
                    producer.send((config.get("KAFKA","kafka_evidence_druid_topic")), json.dumps(observationSubQuestionsObj)
                                  .encode('utf-8'))
                    producer.flush()
                    successLogger.debug("Send Obj to Kafka")
except Exception as e:
    errorLogger.error(e,exc_info=True)


try:
    @app.agent(rawTopicName)
    async def observationEvidenceFaust(consumer):
        async for msg in consumer:
            msg_val = msg.decode('utf-8')
            msg_data = json.loads(msg_val)
            successLogger.debug("========== START OF OBSERVATION EVIDENCE SUBMISSION ========")
            obj_arr = evidence_extraction(msg_data['_id'])
            successLogger.debug("********* END OF OBSERVATION EVIDENCE SUBMISSION ***********")
except Exception as e:
    errorLogger.error(e,exc_info=True)

if __name__ == '__main__':
    app.main()
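
With faust, `app.main()` hands control to the faust command line, so the worker is typically started as `python py_observation_evidence_streaming.py worker`, in line with the `python <PATH-TO-PROGRAM>` convention in the README. For a quick local check of the agent, one can push a single observation submission id onto the raw topic; the snippet below is a hypothetical smoke test, with placeholders in the same style as `config.sample`:

```python
# Hypothetical smoke test (not part of this commit): publish one submission _id
# to the raw topic so observationEvidenceFaust picks it up. Assumes Kafka and
# MongoDB are reachable and config.ini is filled in.
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["<KAFKA_INTERNAL_IP>:<PORT>"])
producer.send(
    "<TOPIC_NAME>",  # kafka_raw_data_topic from config.ini
    json.dumps({"_id": "<OBSERVATION_SUBMISSION_OBJECT_ID>"}).encode("utf-8"),
)
producer.flush()
```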
