From 346d079c20b4d9539829738d6227169f5caa3442 Mon Sep 17 00:00:00 2001 From: Santhosh Gandham <31979949+gandham-santhosh@users.noreply.github.com> Date: Tue, 23 Feb 2021 19:21:36 +0530 Subject: [PATCH 01/13] Initial commit --- LICENSE | 21 +++++++++++++++++++++ README.md | 1 + 2 files changed, 22 insertions(+) create mode 100644 LICENSE create mode 100644 README.md diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5f3a684 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Sunbird for Education + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..3018356 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# ml-analytics-service \ No newline at end of file From 5b4ec84af0fb6fe9d99f954bdc73f63ab7c09f3d Mon Sep 17 00:00:00 2001 From: Arunachalam Date: Mon, 8 Mar 2021 20:53:00 +0530 Subject: [PATCH 02/13] Add observation, projects and survey pipeline programs --- .gitignore | 2 + observations/config.sample | 176 +++++ .../py_observation_evidence_streaming.py | 174 +++++ observations/py_observation_streaming.py | 647 ++++++++++++++++++ .../pyspark_observation_status_batch.py | 626 +++++++++++++++++ projects/config.sample | 64 ++ projects/pyspark_project_batch.py | 499 ++++++++++++++ survey/config.sample | 70 ++ survey/py_survey_evidence_streaming.py | 176 +++++ survey/py_survey_streaming.py | 397 +++++++++++ 10 files changed, 2831 insertions(+) create mode 100644 .gitignore create mode 100644 observations/config.sample create mode 100755 observations/py_observation_evidence_streaming.py create mode 100755 observations/py_observation_streaming.py create mode 100644 observations/pyspark_observation_status_batch.py create mode 100644 projects/config.sample create mode 100644 projects/pyspark_project_batch.py create mode 100644 survey/config.sample create mode 100755 survey/py_survey_evidence_streaming.py create mode 100755 survey/py_survey_streaming.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e7e215d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +*/config.ini \ No newline at end of file diff --git a/observations/config.sample b/observations/config.sample new file mode 100644 index 0000000..c2415ef --- /dev/null +++ b/observations/config.sample @@ -0,0 +1,176 @@ +[COMMON] + +diksha_survey_app_name = +diksha_integrated_app_name = +observation_status_output_dir = + +[API_HEADERS] + +# -------------------------------- +# API variables +# -------------------------------- + 
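+# Keycloak access-token refresh parameters (grant_type, client_id, refresh_token)
+# consumed by the streaming and batch jobs below when calling the token endpoint.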
+headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'} +client_id = +grant_type = refresh_token + +# -------------------- +# Header API variables +# -------------------- + +content_type = application/json +authorization = +refresh_token = + +# ----------------------- +# Channel Id +# ----------------------- + +channel-id = + + +[URL] + +base_url = http://: +url_getkeyclock = https:///auth/realms/sunbird/protocol/openid-connect/token + +url_entity_related = assessment/api/v1/entities/relatedEntities/ +url_entity_relatedapi = http://:/assessment/api/v1/entities/relatedEntities/ + + +sunbird_api_base_url_ip = http://:/api +sunbrid_api_url_readuser = user/v1/read +sunbrid_api_url_syncuser = data/v1/index/sync + +url_user_profile_api = assessment/api/v1/userExtension/getProfile/ + +sunbird_api_url_searchuser = user/v1/search + + +[MONGO] + +# -------------- +# Mongo prod url +#--------------- + +mongo_url = mongodb://: + +# ----------------------- +# Mongo database name +# ----------------------- + +database_name = + +# ------------------------------------- +# Mongo observationSubmission Collection +# ------------------------------------- + +observation_sub_collec = + +# ------------------------- +# Mongo solutions Collection +# -------------------------- + +solutions_collec = + +# ---------------------------- +# Mongo observations Collection +# ----------------------------- + +observations_collec = + +# --------------------------- +# Mongo entityTypes Collection +# ---------------------------- + +entity_type_collec = + +# ------------------------- +# Mongo questions Collection +# ------------------------- + +questions_collec = + +# ------------------------ +# Mongo criteria Collection +# ----------------------- + +criteria_collec = + +# ----------------------- +# Mongo entities Collection +# ----------------------- + +entities_collec = + +# ----------------------- +# Mongo programs Collection +# ----------------------- + +programs_collec = + +# ----------------------- +# Mongo user_roles Collection +# ----------------------- + +user_roles_collection = + +[DRUID] + +urlQuery = http://:/druid/indexer/v1/supervisor + +druid_end_point = http://:/druid/coordinator/v1/datasources/ + +druid_batch_end_point = http://:/druid/indexer/v1/task + +observation_status_spec = + +observation_spec = + +observation_evidence_spec = + +[KAFKA] + +kafka_url = : + +kafka_raw_data_topic = + +kafka_druid_topic = + +kafka_evidence_druid_topic = + +[LOGS] + +observation_streaming_success_log_filename = /success.log + +observation_streaming_error_log_filename = /error.log + +observation_streaming_evidence_success_log_filename = /success.log + +observation_streaming_evidence_error_log_filename = /error.log + +observation_status_success_log_filename = /status/success.log + +observation_status_error_log_filename = /status/error.log + +[ELASTICSEARCH] + +header = {'Content-Type': 'application/json'} + +url_user = http://:/users/_search/?scroll=1m + +user_body = {"size": 10000,"query":{"bool":{"must":[{"match":{"_type":"common"}}]}}} + +url_user_scroll = http://:/_search/scroll + +url_entity = http://:/entities/_search/?scroll=1m + +[AZURE] + +account_name = + +sas_token = + +container_name = + +blob_path = \ No newline at end of file diff --git a/observations/py_observation_evidence_streaming.py b/observations/py_observation_evidence_streaming.py new file mode 100755 index 0000000..4f1328f --- /dev/null +++ b/observations/py_observation_evidence_streaming.py @@ -0,0 +1,174 @@ +# 
----------------------------------------------------------------- +# Name : py_observation_evidence_streaming.py +# Author : Shakthieshwari.A +# Description : Extracts the Evidence or Files Attached at each question level +# during the observation submission +# ----------------------------------------------------------------- +from pymongo import MongoClient +from bson.objectid import ObjectId +import csv,os +import json +import boto3 +import datetime +from datetime import date,time +import requests +import argparse +from kafka import KafkaConsumer +from kafka import KafkaProducer +import dateutil +from dateutil import parser as date_parser +from configparser import ConfigParser,ExtendedInterpolation +import faust +import logging +import logging.handlers +import time +from logging.handlers import TimedRotatingFileHandler + +config_path = os.path.dirname(os.path.abspath(__file__)) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path + "/config.ini") + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) + +# Add the log message handler to the logger +successHandler = logging.handlers.\ + RotatingFileHandler(config.get('LOGS','observation_streaming_evidence_success_log_filename')) +successBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_streaming_evidence_success_log_filename'), + when="w0",backupCount=1) +successHandler.setFormatter(formatter) +successLogger.addHandler(successHandler) +successLogger.addHandler(successBackuphandler) + +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorHandler = logging.handlers.\ + RotatingFileHandler(config.get('LOGS','observation_streaming_evidence_error_log_filename')) +errorBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_streaming_evidence_error_log_filename'), + when="w0",backupCount=1) +errorHandler.setFormatter(formatter) +errorLogger.addHandler(errorHandler) +errorLogger.addHandler(errorBackuphandler) + +try: + kafka_url = (config.get("KAFKA","kafka_url")) + app = faust.App('sl_observation_evidences_diksha_faust',broker='kafka://'+kafka_url,value_serializer='raw', + web_port=7002,broker_max_poll_records=500) + rawTopicName = app.topic(config.get("KAFKA","kafka_raw_data_topic")) + producer = KafkaProducer(bootstrap_servers=[kafka_url]) + + #db production + clientdev = MongoClient(config.get('MONGO','mongo_url')) + dbdev = clientdev[config.get('MONGO','database_name')] + + observationSubmissionsDevCollec = dbdev[config.get('MONGO','observation_sub_collec')] + solutionsDevCollec = dbdev[config.get('MONGO','solutions_collec')] + observationDevCollec = dbdev[config.get('MONGO','observations_collec')] + entityTypeDevCollec = dbdev[config.get('MONGO','entity_type_collec')] + questionsDevCollec = dbdev[config.get('MONGO','questions_collec')] + criteriaDevCollec = dbdev[config.get('MONGO','criteria_collec')] + entitiesDevCollec = dbdev[config.get('MONGO','entities_collec')] +except Exception as e: + errorLogger.error(e,exc_info=True) + +try : + def convert(lst): + return ','.join(lst) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def evidence_extraction(msg_id): + for obSub in observationSubmissionsDevCollec.find({'_id':ObjectId(msg_id)}): + successLogger.debug("Observation Evidence Submission Id : " + str(msg_id)) + try: + completedDate = str(datetime.datetime.date(obSub['completedDate'])) + 'T' \ + + 
str(datetime.datetime.time(obSub['completedDate'])) + 'Z' + except KeyError: + pass + evidence_sub_count = 0 + try: + answersArr = [ v for v in obSub['answers'].values()] + except KeyError: + pass + for ans in answersArr: + try: + if len(ans['fileName']): + evidence_sub_count = evidence_sub_count + len(ans['fileName']) + except KeyError: + if len(ans['instanceFileName']): + for instance in ans['instanceFileName']: + evidence_sub_count = evidence_sub_count + len(instance) + for answer in answersArr: + observationSubQuestionsObj = {} + observationSubQuestionsObj['completedDate'] = completedDate + observationSubQuestionsObj['total_evidences'] = evidence_sub_count + observationSubQuestionsObj['userName'] = obSub['evidencesStatus'][0]['submissions'][0]['submittedByName'] + observationSubQuestionsObj['userName'] = observationSubQuestionsObj['userName'].replace("null","") + observationSubQuestionsObj['observationSubmissionId'] = str(obSub['_id']) + observationSubQuestionsObj['school'] = str(obSub['entityId']) + observationSubQuestionsObj['schoolExternalId'] = obSub['entityExternalId'] + observationSubQuestionsObj['schoolName'] = obSub['entityInformation']['name'] + observationSubQuestionsObj['entityTypeId'] = str(obSub['entityTypeId']) + observationSubQuestionsObj['createdBy'] = obSub['createdBy'] + observationSubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] + observationSubQuestionsObj['solutionId'] = str(obSub['solutionId']) + observationSubQuestionsObj['observationId'] = str(obSub['observationId']) + try : + observationSubQuestionsObj['appName'] = obSub["appInformation"]["appName"].lower() + except KeyError : + observationSubQuestionsObj['appName'] = config.get("COMMON","diksha_survey_app_name") + fileName = [] + fileSourcePath = [] + try: + observationSubQuestionsObj['remarks'] = answer['remarks'] + observationSubQuestionsObj['questionName'] = answer['payload']['question'][0] + except KeyError: + pass + observationSubQuestionsObj['questionId'] = str(answer['qid']) + for ques in questionsDevCollec.find({'_id':ObjectId(observationSubQuestionsObj['questionId'])}): + observationSubQuestionsObj['questionExternalId'] = ques['externalId'] + observationSubQuestionsObj['questionResponseType'] = answer['responseType'] + evidence = [] + evidenceCount = 0 + try: + if answer['fileName']: + evidence = answer['fileName'] + observationSubQuestionsObj['evidence_count'] = len(evidence) + evidenceCount = len(evidence) + except KeyError: + if answer['instanceFileName']: + for inst in answer['instanceFileName'] : + evidence.extend(inst) + observationSubQuestionsObj['evidence_count'] = len(evidence) + evidenceCount = len(evidence) + for evi in evidence: + fileName.append(evi['name']) + fileSourcePath.append(evi['sourcePath']) + observationSubQuestionsObj['fileName'] = convert(fileName) + observationSubQuestionsObj['fileSourcePath'] = convert(fileSourcePath) + if evidenceCount > 0: + producer.send((config.get("KAFKA","kafka_evidence_druid_topic")), json.dumps(observationSubQuestionsObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") +except Exception as e: + errorLogger.error(e,exc_info=True) + + +try: + @app.agent(rawTopicName) + async def observationEvidenceFaust(consumer) : + async for msg in consumer : + msg_val = msg.decode('utf-8') + msg_data = json.loads(msg_val) + successLogger.debug("========== START OF OBSERVATION EVIDENCE SUBMISSION ========") + obj_arr = evidence_extraction(msg_data['_id']) + successLogger.debug("********* END OF OBSERVATION EVIDENCE 
SUBMISSION ***********") +except Exception as e: + errorLogger.error(e,exc_info=True) + +if __name__ == '__main__': + app.main() diff --git a/observations/py_observation_streaming.py b/observations/py_observation_streaming.py new file mode 100755 index 0000000..e61ebc7 --- /dev/null +++ b/observations/py_observation_streaming.py @@ -0,0 +1,647 @@ +# ----------------------------------------------------------------- +# Name : sl_py_observation_streaming.py +# Author : Ashwini.E , Shakthieshwari.A +# Description : +# This is streaming program +# Reads the data from Kafka topic process the observation submitted data +# ----------------------------------------------------------------- + +# Program to read data from one kafka topic and produce it to another kafka topic +import faust +from pymongo import MongoClient +from bson.objectid import ObjectId +import csv,os +import json +import boto3 +import datetime +from datetime import date,time +import requests +import argparse +from kafka import KafkaConsumer +from kafka import KafkaProducer +from configparser import ConfigParser,ExtendedInterpolation +import logging +import logging.handlers +import time +from logging.handlers import TimedRotatingFileHandler + +config_path = os.path.dirname(os.path.abspath(__file__)) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path + "/config.ini") + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + + +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) + +# Add the log message handler to the logger +successHandler = logging.handlers.\ + RotatingFileHandler(config.get('LOGS','observation_streaming_success_log_filename')) +successBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_streaming_success_log_filename'), + when="w0",backupCount=1) +successHandler.setFormatter(formatter) +successLogger.addHandler(successHandler) +successLogger.addHandler(successBackuphandler) + +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorHandler = logging.handlers.\ + RotatingFileHandler(config.get('LOGS','observation_streaming_error_log_filename')) +errorBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_streaming_error_log_filename'), + when="w0",backupCount=1) +errorHandler.setFormatter(formatter) +errorLogger.addHandler(errorHandler) +errorLogger.addHandler(errorBackuphandler) + +try: + kafka_url = (config.get("KAFKA","kafka_url")) + #consume the message from kafka topic + app = faust.App('sl_observation_diksha_faust',broker='kafka://'+kafka_url,value_serializer='raw', + web_port=7001,broker_max_poll_records=500) + rawTopicName = app.topic(config.get("KAFKA","kafka_raw_data_topic")) + producer = KafkaProducer(bootstrap_servers=[kafka_url]) + + #db production + clientdev = MongoClient(config.get('MONGO','mongo_url')) + dbdev = clientdev[config.get('MONGO','database_name')] + + observationSubmissionsDevCollec = dbdev[config.get('MONGO','observation_sub_collec')] + solutionsDevCollec = dbdev[config.get('MONGO','solutions_collec')] + observationDevCollec = dbdev[config.get('MONGO','observations_collec')] + entityTypeDevCollec = dbdev[config.get('MONGO','entity_type_collec')] + questionsDevCollec = dbdev[config.get('MONGO','questions_collec')] + criteriaDevCollec = dbdev[config.get('MONGO','criteria_collec')] + entitiesDevCollec = dbdev[config.get('MONGO','entities_collec')] + programsDevCollec = dbdev[config.get('MONGO','programs_collec')] +except Exception as e: + 
errorLogger.error(e,exc_info=True) + +try: + def removeduplicate(it): + seen = [] + for x in it: + if x not in seen: + yield x + seen.append(x) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + #getKeyclock api to generate authentication token + def get_keyclock_accesstoken(): + url_getkeyclock = config.get("URL","url_getkeyclock") + headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'} + body_getkeyclock = {"grant_type":config.get("API_HEADESRS","grant_type"), + "client_id":config.get("API_HEADESRS","client_id"), + "refresh_token":config.get("API_HEADESRS","refresh_token")} + + responsegetkeyclock = requests.post(url_getkeyclock, data=body_getkeyclock,headers=headers_getkeyclock) + if responsegetkeyclock.status_code == 200: + successLogger.debug("getkeyclock api") + return responsegetkeyclock.json() + else: + errorLogger.error("Failure in getkeyclock API") + errorLogger.error(responsegetkeyclock) + errorLogger.error(responsegetkeyclock.text) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def getRelatedEntity(entityId,accessToken): + urlEntityRelated = config.get("URL","base_url") + "/" + config.get("URL","url_entity_related") + str(entityId) + headersEntityRelated ={ + 'Content-Type': config.get("API_HEADERS","content_type"), + 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'X-authenticated-user-token': accessToken, + 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + } + responseEntityRelated = requests.get(urlEntityRelated, headers=headersEntityRelated) + if responseEntityRelated.status_code == 200 : + successLogger.debug("entityRelated api") + return responseEntityRelated.json() + else: + errorLogger.error(" Failure in EntityRelatedApi ") + errorLogger.error(responseEntityRelated) + errorLogger.error(responseEntityRelated.text) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def syncUser(userId,accessToken): + urlSyncUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbrid_api_url_syncuser") + headersSyncUser ={ + 'Content-Type': config.get("API_HEADERS","content_type"), + 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'X-authenticated-user-token': accessToken, + 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + } + body_sync_user = {"params": {},"request": {"objectType": "user","objectIds": [userId]}} + responseSyncUser = requests.post(urlSyncUser, headers=headersSyncUser,data=json.dumps(body_sync_user)) + if responseSyncUser.status_code == 200 : + successLogger.debug("user sync api") + return True + else : + errorLogger.error("user sync api failed") + errorLogger.error(responseSyncUser) + errorLogger.error(responseSyncUser.text) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def readUser(userId,accessToken,userSyncCnt): + queryStringReadUser = "?fields=completeness%2CmissingFields%2ClastLoginTime%2Ctopics%2Corganisations%2Croles%2Clocations%2Cdeclarations" + urlReadUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbrid_api_url_readuser") \ + + "/" + str(userId) + queryStringReadUser + headersReadUser ={ + 'Content-Type': config.get("API_HEADERS","content_type"), + 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'X-authenticated-user-token': accessToken, + 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + } + responseReadUser = requests.get(urlReadUser, headers=headersReadUser) + if responseReadUser.status_code 
== 200 : + successLogger.debug("read user api") + return responseReadUser.json() + else: + errorLogger.error("read user api failed") + errorLogger.error(responseReadUser) + errorLogger.error(responseReadUser.text) + if responseReadUser.status_code == 404 : + responseReadUser = responseReadUser.json() + if responseReadUser["params"]["status"] == "USER_NOT_FOUND": + syncUserStatus = syncUser(userId,accessToken) + if syncUserStatus == True and userSyncCnt == 1: + userSyncCnt = userSyncCnt + 1 + readUser(userId,accessToken,userSyncCnt) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def getUserRoles(userId,accessToken): + urlUserRoles = config.get("URL","base_url") + "/" + config.get("URL","url_user_profile_api") + str(userId) + headersUserRoles ={ + 'Content-Type': config.get("API_HEADESRS","content_type"), + 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'X-authenticated-user-token': accessToken, + 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + } + responseUserRoles = requests.get(urlUserRoles, headers=headersUserRoles) + if responseUserRoles.status_code == 200 : + successLogger.debug("user profile api") + return responseUserRoles.json() + else: + errorLogger.error("user profile api failed") + errorLogger.error(responseUserRoles) + errorLogger.error(responseUserRoles.text) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def obj_creation(msg_id): + data_keyclock = {} + data_keyclock = get_keyclock_accesstoken() + tokenKeyCheck = None + tokenKeyCheck = "access_token" in data_keyclock + if tokenKeyCheck == True : + accessToken= data_keyclock['access_token'] + successLogger.debug("Observation Submission Id : " + str(msg_id)) + cursorMongo = observationSubmissionsDevCollec.find({'_id':ObjectId(msg_id)}, no_cursor_timeout=True) + for obSub in cursorMongo : + observationSubQuestionsArr = [] + completedDate = str(datetime.datetime.date(obSub['completedDate'])) + 'T' + \ + str(datetime.datetime.time(obSub['completedDate'])) + 'Z' + createdAt = str(datetime.datetime.date(obSub['createdAt'])) + 'T' + \ + str(datetime.datetime.time(obSub['createdAt'])) + 'Z' + updatedAt = str(datetime.datetime.date(obSub['updatedAt'])) + 'T' + \ + str(datetime.datetime.time(obSub['updatedAt'])) + 'Z' + evidencesArr = [ v for v in obSub['evidences'].values() ] + evidence_sub_count = 0 + entityId = obSub['entityId'] + + # fetch entity latitude and longitude from the database + entityLatitude = None + entityLongitude = None + for ent in entitiesDevCollec.find({'_id':ObjectId(entityId)}): + try : + if ent['metaInformation']['gpsLocation'] : + gpsLocation = (ent['metaInformation']['gpsLocation']).split(',') + entityLatitude = gpsLocation[0] + entityLongitude = gpsLocation[1] + except KeyError : + entityLatitude = None + entityLongitude = None + pass + userSyncCnt = 1 + # fetch user name from postgres with the help of keycloak id + queryJsonOutput = {} + queryJsonOutput = readUser(obSub["createdBy"],accessToken,userSyncCnt) + if queryJsonOutput : + if queryJsonOutput["result"]["response"]["userName"] : + userRoles = {} + obsAppName = None + try : + obsAppName = obSub["appInformation"]["appName"].lower() + except KeyError : + obsAppName = config.get("COMMON","diksha_survey_app_name") + if obsAppName == config.get("COMMON","diksha_survey_app_name") : + userRoles = getUserRoles(obSub["createdBy"],accessToken) + userRolesArr = [] + if userRoles: + userRoleKeyCheck = "result" in userRoles + if userRoleKeyCheck == True : + try : + if 
len(userRoles["result"]["roles"]) > 0 : + for rol in userRoles["result"]["roles"] : + for ent in rol["entities"]: + userEntityRelated = None + userEntityRelated = getRelatedEntity(ent["_id"],accessToken) + userEntityRelatedResultKeyCheck = None + roleObj = {} + roleObj["role_id"] = rol["_id"] + roleObj["role_externalId"] = rol["code"] + roleObj["role_title"] = rol["title"] + if userEntityRelated: + userEntityRelatedResultKeyCheck = "result" in userEntityRelated + if userEntityRelatedResultKeyCheck == True: + if userEntityRelated["result"]: + if (userEntityRelated["result"]["entityType"] == "district") or \ + (userEntityRelated["result"]["entityType"] == "block") or \ + (userEntityRelated["result"]["entityType"] == "cluster"): + roleObj['user_'+userEntityRelated["result"]["entityType"]+'Name'] = userEntityRelated["result"]["metaInformation"]["name"] + for usrEntityData in userEntityRelated["result"]["relatedEntities"]: + if (usrEntityData['entityType'] == "district") or \ + (usrEntityData['entityType'] == "block") or \ + (usrEntityData['entityType'] == "cluster") : + roleObj['user_'+usrEntityData['entityType']+'Name'] = usrEntityData['metaInformation']['name'] + userRolesArr.append(roleObj) + except KeyError : + pass + userRolesArrUnique = [] + if len(userRolesArr) > 0: + userRolesArrUnique = list(removeduplicate(userRolesArr)) + elif obsAppName == config.get("COMMON","diksha_integrated_app_name") : + userRolesArrUnique = [] + roleObj = {} + roleObj["role_id"] = "integrated_app" + roleObj["role_externalId"] = "integrated_app" + roleObj["role_title"] = queryJsonOutput["result"]["response"]["userSubType"] + try : + for usrLoc in queryJsonOutput["result"]["response"]["userLocations"]: + roleObj['user_'+usrLoc["type"]+'Name'] = usrLoc["name"] + userRolesArrUnique.append(roleObj) + except KeyError : + pass + entityRelated = None + entityRelated = getRelatedEntity(entityId,accessToken) + entityRelatedResultKeyCheck = None + entityRelatedData = None + if entityRelated: + entityRelatedResultKeyCheck = "result" in entityRelated + if entityRelatedResultKeyCheck == True: + entityRelatedData = entityRelated['result'] + + if 'answers' in obSub.keys() : + answersArr = [ v for v in obSub['answers'].values()] + for ans in answersArr: + try: + if len(ans['fileName']): + evidence_sub_count = evidence_sub_count + len(ans['fileName']) + except KeyError: + pass + for ans in answersArr: + def sequenceNumber(externalId,answer): + for solu in solutionsDevCollec.find({'externalId':obSub['solutionExternalId']}): + section = [k for k in solu['sections'].keys()] + #parsing through questionSequencebyecm to get the sequence number + try: + for num in range(len(solu['questionSequenceByEcm'][answer['evidenceMethod']][section[0]])): + if solu['questionSequenceByEcm'][answer['evidenceMethod']][section[0]][num] == externalId: + return num + 1 + except KeyError: + pass + + def creatingObj(answer,quesexternalId,ans_val,instNumber,responseLabel,entityLatitudeCreateObjFn, + entityLongitudeCreateObjFn,usrRolFn): + observationSubQuestionsObj = {} + observationSubQuestionsObj['userName'] = obSub['evidencesStatus'][0]['submissions'][0]['submittedByName'] + observationSubQuestionsObj['userName'] = observationSubQuestionsObj['userName'].replace("null","") + observationSubQuestionsObj['observationSubmissionId'] = str(obSub['_id']) + observationSubQuestionsObj['appName'] = obsAppName + # geo tag validation , question answered within 200 meters of the selected entity + if entityLatitudeCreateObjFn and entityLongitudeCreateObjFn : + 
entityGeoFencing = (entityLatitudeCreateObjFn,entityLongitudeCreateObjFn) + answerGpsLocation = [] + try : + if answer['gpsLocation']: + answerGpsLocation = answer['gpsLocation'].split(',') + answerLatitude = None + answerLongitude = None + answerLatitude = answerGpsLocation[0] + answerLongitude = answerGpsLocation[1] + except KeyError : + answerGpsLocation = [] + pass + + if len(answerGpsLocation) > 0 : + answerGeoFencing = (answerLatitude,answerLongitude) + calcuGeoLocMtrs = (geodesic(entityGeoFencing, answerGeoFencing).km)*1000 + calcuGeoLocMtrsFloat = float(calcuGeoLocMtrs) + + if calcuGeoLocMtrsFloat <= float(200) : + observationSubQuestionsObj['location_validated_with_geotag'] = 'verified' + observationSubQuestionsObj['distance_in_meters'] = int(calcuGeoLocMtrsFloat) + + else : + observationSubQuestionsObj['location_validated_with_geotag'] = 'not verified' + observationSubQuestionsObj['distance_in_meters'] = int(calcuGeoLocMtrsFloat) + + else : + observationSubQuestionsObj['location_validated_with_geotag'] = 'gps location not found for question' + observationSubQuestionsObj['distance_in_meters'] = None + else : + observationSubQuestionsObj['location_validated_with_geotag'] = 'gps location not found for school' + observationSubQuestionsObj['distance_in_meters'] = None + + + observationSubQuestionsObj['entity'] = str(obSub['entityId']) + observationSubQuestionsObj['entityExternalId'] = obSub['entityExternalId'] + observationSubQuestionsObj['entityName'] = obSub['entityInformation']['name'] + + if entityRelatedData : + entityType = entityRelatedData["entityType"] + observationSubQuestionsObj[entityType] = entityRelatedData['_id'] + observationSubQuestionsObj[entityType+'Name'] = entityRelatedData['metaInformation']['name'] + observationSubQuestionsObj[entityType+'ExternalId'] = entityRelatedData['metaInformation']['externalId'] + for entityData in entityRelatedData["relatedEntities"]: + if entityData['entityType']: + entityType = entityData['entityType'] + observationSubQuestionsObj[entityType] = entityData['_id'] + observationSubQuestionsObj[entityType+'Name'] = entityData['metaInformation']['name'] + observationSubQuestionsObj[entityType+'ExternalId'] = entityData['metaInformation']['externalId'] + + observationSubQuestionsObj['entityTypeId'] = str(obSub['entityTypeId']) + try: + observationSubQuestionsObj['schoolTypes'] = obSub['entityInformation']['schoolTypes'] + observationSubQuestionsObj['administrationTypes'] = obSub['entityInformation']['administrationTypes'] + except KeyError: + pass + observationSubQuestionsObj['createdBy'] = obSub['createdBy'] + + try: + if obSub['isAPrivateProgram']: + observationSubQuestionsObj['isAPrivateProgram'] = obSub['isAPrivateProgram'] + else: + observationSubQuestionsObj['isAPrivateProgram'] = False + except KeyError: + observationSubQuestionsObj['isAPrivateProgram'] = False + pass + + try: + observationSubQuestionsObj['programExternalId'] = obSub['programExternalId'] + except KeyError : + observationSubQuestionsObj['programExternalId'] = None + try: + observationSubQuestionsObj['programId'] = str(obSub['programId']) + except KeyError : + observationSubQuestionsObj['programId'] = None + try: + for program in programsDevCollec.find({'externalId':obSub['programExternalId']}): + observationSubQuestionsObj['programName'] = program['name'] + except KeyError : + observationSubQuestionsObj['programName'] = None + + observationSubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] + observationSubQuestionsObj['observationId'] = 
str(obSub['observationId']) + for solu in solutionsDevCollec.find({'externalId':obSub['solutionExternalId']}): + observationSubQuestionsObj['solutionName'] = solu['name'] + section = [k for k in solu['sections'].keys()] + observationSubQuestionsObj['section'] = section[0] + observationSubQuestionsObj['questionSequenceByEcm']= sequenceNumber(quesexternalId,answer) + + try: + if solu['scoringSystem'] == 'pointsBasedScoring': + observationSubQuestionsObj['totalScore'] = obSub['pointsBasedMaxScore'] + observationSubQuestionsObj['scoreAchieved'] = obSub['pointsBasedScoreAchieved'] + observationSubQuestionsObj['totalpercentage'] = obSub['pointsBasedPercentageScore'] + observationSubQuestionsObj['maxScore'] = answer['maxScore'] + observationSubQuestionsObj['minScore'] = answer['scoreAchieved'] + observationSubQuestionsObj['percentageScore'] = answer['percentageScore'] + observationSubQuestionsObj['pointsBasedScoreInParent'] = answer['pointsBasedScoreInParent'] + except KeyError: + pass + + for entTy in entityTypeDevCollec.find({'_id':obSub['entityTypeId']},{'name':1}): + observationSubQuestionsObj['entityType'] = entTy['name'] + for ob in observationDevCollec.find({'_id':obSub['observationId']}): + observationSubQuestionsObj['observationName'] = ob['name'] + observationSubQuestionsObj['questionId'] = str(answer['qid']) + observationSubQuestionsObj['questionAnswer'] = ans_val + observationSubQuestionsObj['questionResponseType'] = answer['responseType'] + if answer['responseType'] == 'number': + if answer['payload']['labels']: + observationSubQuestionsObj['questionResponseLabel_number'] = responseLabel + else: + observationSubQuestionsObj['questionResponseLabel_number'] = '' + if answer['payload']['labels']: + observationSubQuestionsObj['questionResponseLabel'] = responseLabel + else: + observationSubQuestionsObj['questionResponseLabel'] = '' + observationSubQuestionsObj['questionExternalId'] = quesexternalId + observationSubQuestionsObj['questionName'] = answer['payload']['question'][0] + observationSubQuestionsObj['questionECM'] = answer['evidenceMethod'] + observationSubQuestionsObj['criteriaId'] = str(answer['criteriaId']) + for crit in obSub["criteria"] : + if str(answer['criteriaId']) == str(crit["_id"]) : + try: + observationSubQuestionsObj['criteriaLevel'] = crit["score"] + except KeyError : + observationSubQuestionsObj['criteriaLevel'] = '' + try: + observationSubQuestionsObj['criteriaScore'] = crit["scoreAchieved"] + except KeyError : + observationSubQuestionsObj['criteriaScore'] = '' + for crit in criteriaDevCollec.find({'_id':ObjectId(answer['criteriaId'])}): + observationSubQuestionsObj['criteriaExternalId'] = crit['externalId'] + observationSubQuestionsObj['criteriaName'] = crit['name'] + observationSubQuestionsObj['completedDate'] = completedDate + observationSubQuestionsObj['createdAt'] = createdAt + observationSubQuestionsObj['updatedAt'] = updatedAt + observationSubQuestionsObj['remarks'] = answer['remarks'] + if len(answer['fileName']): + multipleFiles = None + fileCnt = 1 + for filedetail in answer['fileName']: + if fileCnt == 1: + multipleFiles = 'https://samikshaprod.blob.core.windows.net/samiksha/' + filedetail['sourcePath'] + fileCnt = fileCnt + 1 + else: + multipleFiles = multipleFiles + ' , ' + 'https://samikshaprod.blob.core.windows.net/samiksha/' + \ + filedetail['sourcePath'] + observationSubQuestionsObj['evidences'] = multipleFiles + observationSubQuestionsObj['evidence_count'] = len(answer['fileName']) + observationSubQuestionsObj['total_evidences'] = 
evidence_sub_count + # to fetch the parent question of matrix + if ans['responseType']=='matrix': + observationSubQuestionsObj['instanceParentQuestion'] = ans['payload']['question'][0] + observationSubQuestionsObj['instanceParentId'] = ans['qid'] + observationSubQuestionsObj['instanceParentResponsetype'] =ans['responseType'] + observationSubQuestionsObj['instanceParentCriteriaId'] =ans['criteriaId'] + for crit in criteriaDevCollec.find({'_id':ObjectId(ans['criteriaId'])}): + observationSubQuestionsObj['instanceParentCriteriaExternalId'] = crit['externalId'] + observationSubQuestionsObj['instanceParentCriteriaName'] = crit['name'] + observationSubQuestionsObj['instanceId'] = instNumber + for ques in questionsDevCollec.find({'_id':ObjectId(ans['qid'])}): + observationSubQuestionsObj['instanceParentExternalId'] = ques['externalId'] + observationSubQuestionsObj['instanceParentEcmSequence']= sequenceNumber(observationSubQuestionsObj['instanceParentExternalId'],answer) + else: + observationSubQuestionsObj['instanceParentQuestion'] = '' + observationSubQuestionsObj['instanceParentId'] = '' + observationSubQuestionsObj['instanceParentResponsetype'] ='' + observationSubQuestionsObj['instanceId'] = instNumber + observationSubQuestionsObj['instanceParentExternalId'] = '' + observationSubQuestionsObj['instanceParentEcmSequence'] = '' + observationSubQuestionsObj['user_id'] = queryJsonOutput["result"]["response"]["userName"] + observationSubQuestionsObj['channel'] = queryJsonOutput["result"]["response"]["rootOrgId"] + observationSubQuestionsObj['parent_channel'] = "SHIKSHALOKAM" + if usrRolFn : + observationSubQuestionsObj = { **usrRolFn , **observationSubQuestionsObj} + observationSubQuestionsObj["submissionNumber"] = obSub["submissionNumber"] + observationSubQuestionsObj["submissionTitle"] = obSub["title"] + + + return observationSubQuestionsObj + # fetching the question details from questions collection + def fetchingQuestiondetails(ansFn,instNumber,entityLatitudeQuesFn,entityLongitudeQuesFn): + for ques in questionsDevCollec.find({'_id':ObjectId(ansFn['qid'])}): + if len(ques['options']) == 0: + try: + if len(ansFn['payload']['labels']) > 0: + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansFn['value'],instNumber, + ansFn['payload']['labels'][0], + entityLatitudeQuesFn,entityLongitudeQuesFn,usrRol) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + else : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansFn['value'],instNumber, + ansFn['payload']['labels'][0], + entityLatitudeQuesFn,entityLongitudeQuesFn,None) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + except KeyError: + pass + else: + labelIndex = 0 + for quesOpt in ques['options']: + try: + if type(ansFn['value']) == str or type(ansFn['value']) == int: + if quesOpt['value'] == ansFn['value'] : + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansFn['value'], + instNumber,ansFn['payload']['labels'][0], + entityLatitudeQuesFn,entityLongitudeQuesFn,usrRol) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + else : + 
finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansFn['value'], + instNumber,ansFn['payload']['labels'][0], + entityLatitudeQuesFn,entityLongitudeQuesFn,None) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + + elif type(ansFn['value']) == list: + for ansArr in ansFn['value']: + if quesOpt['value'] == ansArr: + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansArr,instNumber, + quesOpt['label'], + entityLatitudeQuesFn, + entityLongitudeQuesFn,usrRol) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + else : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansArr,instNumber, + quesOpt['label'], + entityLatitudeQuesFn, + entityLongitudeQuesFn,None) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + labelIndex = labelIndex + 1 + except KeyError: + pass + #to check the value is null ie is not answered + try: + if type(ansFn['value']) == str and ansFn['value'] == '': + if(len(userRolesArrUnique)) > 0: + for usrRol in userRolesArrUnique : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansFn['value'],instNumber,None, + entityLatitudeQuesFn, + entityLongitudeQuesFn,usrRol) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + else : + finalObj = {} + finalObj = creatingObj(ansFn,ques['externalId'],ansFn['value'],instNumber,None, + entityLatitudeQuesFn,entityLongitudeQuesFn,None) + producer.send((config.get("KAFKA","kafka_druid_topic")), json.dumps(finalObj) + .encode('utf-8')) + producer.flush() + successLogger.debug("Send Obj to Kafka") + except KeyError: + pass + + if ans['responseType'] == 'text' or ans['responseType'] == 'radio' \ + or ans['responseType'] == 'multiselect' or ans['responseType'] == 'slider' \ + or ans['responseType'] == 'number' or ans['responseType'] == 'date': + inst_cnt = '' + fetchingQuestiondetails(ans,inst_cnt,entityLatitude,entityLongitude) + elif ans['responseType'] == 'matrix' and len(ans['value'])>0: + inst_cnt =0 + for instances in ans['value']: + inst_cnt = inst_cnt + 1 + for instance in instances.values(): + fetchingQuestiondetails(instance,inst_cnt,entityLatitude,entityLongitude) + + cursorMongo.close() +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + @app.agent(rawTopicName) + async def observationFaust(consumer) : + async for msg in consumer : + msg_val = msg.decode('utf-8') + msg_data = json.loads(msg_val) + successLogger.debug("========== START OF OBSERVATION SUBMISSION ========") + obj_arr = obj_creation(msg_data['_id']) + successLogger.debug("********* END OF OBSERVATION SUBMISSION ***********") +except Exception as e: + errorLogger.error(e,exc_info=True) + +if __name__ == '__main__': + app.main() diff --git a/observations/pyspark_observation_status_batch.py b/observations/pyspark_observation_status_batch.py new file mode 100644 index 0000000..eb2f806 --- /dev/null +++ b/observations/pyspark_observation_status_batch.py @@ -0,0 +1,626 @@ +# ----------------------------------------------------------------- +# Name : pyspark_observation_status_batch.py +# 
Author : Shakthieshwari.A +# Description : Extracts the Status of the observation submissions +# either notStarted / In-Progress / Completed along with the users entity information + +# ----------------------------------------------------------------- +import requests +import json,csv,sys,os,time +import datetime +from datetime import date +from configparser import ConfigParser,ExtendedInterpolation +from pymongo import MongoClient +from bson.objectid import ObjectId +from pyspark.sql import SparkSession +from pyspark.sql.functions import * +import pyspark.sql.functions as F +from pyspark.sql.types import * +from pyspark.sql import Row +from collections import OrderedDict,Counter +from cassandra.cluster import Cluster +from cassandra.query import SimpleStatement,ConsistencyLevel +import databricks.koalas as ks +from azure.storage.blob import BlockBlobService, PublicAccess +from azure.storage.blob import ContentSettings +import logging +import logging.handlers +import time +from logging.handlers import TimedRotatingFileHandler + + +config_path = os.path.dirname(os.path.abspath(__file__)) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path + "/config.ini") + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) + +# Add the log message handler to the logger +successHandler = logging.handlers.RotatingFileHandler(config.get('LOGS','observation_status_success_log_filename')) +successBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_status_success_log_filename'), + when="w0",backupCount=1) +successHandler.setFormatter(formatter) +successLogger.addHandler(successHandler) +successLogger.addHandler(successBackuphandler) + +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorHandler = logging.handlers.RotatingFileHandler(config.get('LOGS','observation_status_error_log_filename')) +errorBackuphandler = TimedRotatingFileHandler(config.get('LOGS','observation_status_error_log_filename'), + when="w0",backupCount=1) +errorHandler.setFormatter(formatter) +errorLogger.addHandler(errorHandler) +errorLogger.addHandler(errorBackuphandler) + +try: + def get_keyclock_accesstoken(): + url_getkeyclock = config.get("URL","url_getkeyclock") + headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'} + body_getkeyclock = {"grant_type":config.get("API_HEADESRS","grant_type"), + "client_id":config.get("API_HEADESRS","client_id"), + "refresh_token":config.get("API_HEADESRS","refresh_token")} + + responsegetkeyclock = requests.post(url_getkeyclock, data=body_getkeyclock,headers=headers_getkeyclock) + if responsegetkeyclock.status_code == 200: + successLogger.debug("getkeyclock api") + return responsegetkeyclock.json() + else: + errorLogger.error("Failure in getkeyclock API") + errorLogger.error(responsegetkeyclock) + errorLogger.error(responsegetkeyclock.text) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def searchUser(accessToken,userId): + queryStringReadUser = "?fields=completeness%2CmissingFields%2ClastLoginTime%2Ctopics%2Corganisations%2Croles%2Clocations%2Cdeclarations" + urlReadUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbird_api_url_readuser") \ + + "/" + str(userId) + queryStringReadUser + headersReadUser ={ + 'Content-Type': config.get("API_HEADERS","content_type"), + 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 
'X-authenticated-user-token': accessToken + } + + try: + responseReadUser = requests.get(urlReadUser, headers=headersReadUser) + if responseReadUser.status_code == 200 : + return responseReadUser.json() + else: + successLogger.debug("Failure in Search User API") + successLogger.debug(responseReadUser.status_code) + successLogger.debug(responseReadUser.json()) + except Exception as e : + errorLogger.error("Search User API Failed") + errorLogger.error(e) + errorLogger.error(e,exc_info=True) +except Exception as e: + errorLogger.error(e,exc_info=True) + +get_keycloak_obj = get_keyclock_accesstoken() + +try: + def removeduplicate(it): + seen = [] + for x in it: + if x not in seen: + yield x + seen.append(x) +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def chunks(l, n): + for i in range(0, len(l), n): + yield l[i:i + n] +except Exception as e: + errorLogger.error(e,exc_info=True) + +try: + def convert_to_row(d: dict) -> Row: + return Row(**OrderedDict(sorted(d.items()))) +except Exception as e: + errorLogger.error(e,exc_info=True) + + +clientProd = MongoClient(config.get('MONGO','mongo_url')) +dbProd = clientProd[config.get('MONGO','database_name')] + +obsSubmissionsCollec = dbProd[config.get('MONGO','observation_submissions_collec')] + +solutionCollec = dbProd[config.get('MONGO','solution_collec')] + +userRolesCollec = dbProd[config.get("MONGO","user_roles_collection")] + +programCollec = dbProd[config.get("MONGO","program_collec")] + +#observation submission dataframe +obs_sub_cursorMongo = obsSubmissionsCollec.aggregate([{"$project": {"_id": {"$toString": "$_id"}, + "entityId":{"$toString": "$entityId"},"status":1, + "entityExternalId":1,"entityInformation":{"name":1}, + "entityType":1,"createdBy":1, + "solutionId":{"$toString": "$solutionId"}, + "solutionExternalId":1,"updatedAt":1, + "programId":{"$toString": "$programId"}, + "programExternalId":1, + "appInformation":{"appName":1} + } + } + ] + ) + +#schema for the observation submission dataframe +obs_sub_schema = StructType([ + StructField('status', StringType(), True), + StructField('entityExternalId', StringType(), True), + StructField('entityId', StringType(), True), + StructField('entityType', StringType(), True), + StructField('createdBy', StringType(), True), + StructField('solutionId', StringType(), True), + StructField('solutionExternalId', StringType(), True), + StructField('programId', StringType(), True), + StructField('programExternalId', StringType(), True), + StructField('_id', StringType(), True), + StructField('updatedAt', TimestampType(), True), + StructField('entityInformtion',StructType([ + StructField('name', StringType(), True) + ])), + StructField('appInformation',StructType([ + StructField('appName', StringType(), True) + ])) + ]) +spark = SparkSession.builder.appName("obs_sub_status").config("spark.driver.memory", "50g")\ + .config("spark.executor.memory","100g")\ + .config("spark.memory.offHeap.enabled",True)\ + .config("spark.memory.offHeap.size","32g").getOrCreate() + +sc=spark.sparkContext + +obs_sub_rdd = spark.sparkContext.parallelize(list(obs_sub_cursorMongo)); +obs_sub_df1 = spark.createDataFrame(obs_sub_rdd,obs_sub_schema); + +obs_sub_df1 = obs_sub_df1.withColumn("date_time", to_timestamp(obs_sub_df1["updatedAt"], 'yyyy-MM-dd HH:mm:ss')) + +obs_sub_df1 = obs_sub_df1.withColumn("date",F.split(obs_sub_df1["date_time"], ' ')[0]) +obs_sub_df1 = obs_sub_df1.withColumn("time",F.split(obs_sub_df1["date_time"], ' ')[1]) + +obs_sub_df1 = obs_sub_df1.withColumn("app_name",\ + 
F.when(obs_sub_df1["appInformation"]["appName"].isNull(), + F.lit(config.get("COMMON","diksha_survey_app_name"))) + .otherwise(lower(obs_sub_df1["appInformation"]["appName"]))) + +obs_sub_df1 = obs_sub_df1.withColumn("timestamp",F.concat(F.col("date"),F.lit("T"),F.col("time"),F.lit(".000Z"))) +obs_sub_df = obs_sub_df1.select("status",obs_sub_df1["entityExternalId"].alias("entity_externalId"), + obs_sub_df1["entityId"].alias("entity_id"), + obs_sub_df1["entityType"].alias("entity_type"), + obs_sub_df1["createdBy"].alias("user_id"), + obs_sub_df1["solutionId"].alias("solution_id"), + obs_sub_df1["solutionExternalId"].alias("solution_externalId"), + obs_sub_df1["_id"].alias("submission_id"), + obs_sub_df1["entityInformation"]["name"].alias("entity_name"), + "timestamp",obs_sub_df1["programId"].alias("program_id"), + obs_sub_df1["programExternalId"].alias("program_externalId"), + obs_sub_df1["app_name"]) +obs_sub_cursorMongo.close() + +#observation solution dataframe +obs_sol_cursorMongo = solutionCollec.aggregate([{"$match":{"type":"observation"}}, + {"$project": {"_id": {"$toString": "$_id"},"name":1}}]) + +#schema for the observation solution dataframe +obs_sol_schema = StructType([ + StructField('name', StringType(), True), + StructField('_id', StringType(), True) +]) + +obs_soln_rdd = spark.sparkContext.parallelize(list(obs_sol_cursorMongo)); +obs_soln_df = spark.createDataFrame(obs_soln_rdd,obs_sol_schema); +obs_sol_cursorMongo.close() + +#match solution id from solution df to submission df to fetch the solution name +obs_sub_soln_df = obs_sub_df.join(obs_soln_df,obs_sub_df.solution_id==obs_soln_df._id,'inner').drop(obs_soln_df["_id"]) +obs_sub_soln_df = obs_sub_soln_df.withColumnRenamed("name","solution_name") + +#observation program dataframe +obs_pgm_cursorMongo = programCollec.aggregate([{"$project": {"_id": {"$toString": "$_id"},"name":1}}]) + +#schema for the observation program dataframe +obs_pgm_schema = StructType([ + StructField('name', StringType(), True), + StructField('_id', StringType(), True) +]) + +obs_pgm_rdd = spark.sparkContext.parallelize(list(obs_pgm_cursorMongo)); +obs_pgm_df = spark.createDataFrame(obs_pgm_rdd,obs_pgm_schema); +obs_pgm_cursorMongo.close() + +#match solution id from solution df to submission df to fetch the solution name +obs_sub_pgm_df = obs_sub_soln_df.join(obs_pgm_df,obs_sub_soln_df.program_id==obs_pgm_df._id,'inner')\ + .drop(obs_pgm_df["_id"]) +obs_sub_pgm_df = obs_sub_pgm_df.withColumnRenamed("name","program_name") + +#user organisation dataframe +obs_sub_soln_userid_df = obs_sub_pgm_df.select("user_id") + +userId_obs_status_df_before = [] +userId_obs_status_df_after = [] +userId_arr = [] +uniqueuserId_arr = [] +userId_obs_status_df_before = obs_sub_soln_userid_df.toJSON().map(lambda j: json.loads(j)).collect() +for uid in userId_obs_status_df_before : + userId_arr.append(uid["user_id"]) + +uniqueuserId_arr = list(removeduplicate(userId_arr)) +userIntegratedAppEntitiesArr = [] +for ch in uniqueuserId_arr : + searchUserObj = {} + searchUserObj = searchUser(get_keycloak_obj["access_token"],ch) + if searchUserObj: + searchResult = False + searchResult = "result" in searchUserObj + if searchResult == True : + searchResponse = False + searchResponse = "response" in searchUserObj["result"] + if searchResponse == True : + userRelatedEntitiesObj = {} + userRoles = None + try : + userRoles = searchUserObj["result"]["response"]["userSubType"] + except KeyError : + userRoles = '' + try : + for usrLoc in 
searchUserObj["result"]["response"]["userLocations"]: + userRelatedEntitiesObj[usrLoc["type"]+'_name'] = usrLoc["name"] + userRelatedEntitiesObj[usrLoc["type"]+'_id'] = usrLoc["id"] + userRelatedEntitiesObj[usrLoc["type"]+'_externalId'] = usrLoc["code"] + userRelatedEntitiesObj["user_id"] = searchUserObj["result"]["response"]["id"] + if userRoles : + userRelatedEntitiesObj["role_id"] = "integrated_app" + userRelatedEntitiesObj["role_externalId"] = "integrated_app" + userRelatedEntitiesObj["role_title"] = userRoles + except KeyError : + pass + if userRelatedEntitiesObj : + userIntegratedAppEntitiesArr.append(userRelatedEntitiesObj) + + for usOg in searchUserObj["result"]["response"]["organisations"]: + searchObj = {} + searchObj["id"] = searchUserObj["result"]["response"]["id"] + searchObj["user_name"] = searchUserObj["result"]["response"]["userName"] + searchObj["first_name"] = searchUserObj["result"]["response"]["firstName"] + searchObj["channel"] = searchUserObj["result"]["response"]["rootOrgId"] + searchObj["parent_channel"] = "SHIKSHALOKAM" + try: + searchObj["organisation_id"] = usOg["organisationId"] + except KeyError : + searchObj["organisation_id"] = None + userId_obs_status_df_after.append(searchObj) + +df_user_org = ks.DataFrame(userId_obs_status_df_after); +df_user_org = df_user_org.to_spark() + +if len(userIntegratedAppEntitiesArr) > 0 : + df_user_rel_entities = ks.DataFrame(userIntegratedAppEntitiesArr) + df_user_rel_entities = df_user_rel_entities.to_spark() + +# roles dataframe from mongodb +roles_cursorMongo = userRolesCollec.aggregate([{"$project": {"_id": {"$toString": "$_id"},"title":1}}]) + +#schema for the observation solution dataframe +roles_schema = StructType([ + StructField('title', StringType(), True), + StructField('_id', StringType(), True) +]) + +roles_rdd = spark.sparkContext.parallelize(list(roles_cursorMongo)); +roles_df = spark.createDataFrame(roles_rdd,roles_schema); + +roles_cursorMongo.close() + +# user roles along with entity from elastic search +userEntityRoleArray = [] + +try: + def elasticSearchJson(userEntityJson) : + for user in userEntityJson : + try: + if len(user["_source"]["data"]["roles"]) > 0 : + for roleObj in user["_source"]["data"]["roles"]: + try: + if len(roleObj["entities"]) > 0: + for ent in roleObj["entities"]: + entObj = {} + entObj["userId"] = user["_source"]["data"]["userId"] + entObj["roleId"] = roleObj["roleId"] + entObj["roleCode"] =roleObj["code"] + entObj["entityId"] = ent + userEntityRoleArray.append(entObj) + else : + entNoObj = {} + entNoObj["userId"] = user["_source"]["data"]["userId"] + entNoObj["roleId"] = roleObj["roleId"] + entNoObj["roleCode"] = roleObj["code"] + entNoObj["entityId"] = None + userEntityRoleArray.append(entNoObj) + except KeyError : + entNoEntObj = {} + entNoEntObj["userId"] = user["_source"]["data"]["userId"] + entNoEntObj["roleId"] = roleObj["roleId"] + entNoEntObj["roleCode"] = roleObj["code"] + entNoEntObj["entityId"] = None + userEntityRoleArray.append(entNoEntObj) + pass + except KeyError : + pass +except Exception as e: + errorLogger.error(e,exc_info=True) + +headers_user = {'Content-Type': 'application/json'} +url_getuserinfo = config.get("ELASTICSEARCH","url_user") +payload_user_elastic = {"size": 10000,"query":{"bool":{"must":[{"match":{"_type":"_doc"}}]}}} +user_response = requests.post(url_getuserinfo , headers = headers_user,data=json.dumps(payload_user_elastic)) +try: + if user_response.status_code == 200: + user_response = user_response.json() + user_data = user_response['hits']['hits'] 
+ elasticSearchJson(user_data) + user_scroll_id = user_response['_scroll_id'] + else: + errorLogger.error(user_response) + errorLogger.error(user_response.text) + errorLogger.error("Failure in getting User Data From Elastic Search") +except KeyError as e: + user_hit = [] + user_scroll_id = None + errorLogger.error("user scroll id error") + +while user_data: + user_scroll_payload = json.dumps({ + 'scroll': '1m', + 'scroll_id': user_scroll_id + }) + user_scroll_api_url = config.get("ELASTICSEARCH","url_user_scroll") + user_scroll_response = requests.post(user_scroll_api_url,headers=headers_user,data = user_scroll_payload) + try: + if user_scroll_response.status_code == 200: + user_scroll_response = user_scroll_response.json() + user_data = user_scroll_response['hits']['hits'] + if len(user_data) > 0 : + elasticSearchJson(user_data) + user_scroll_id = user_scroll_response['_scroll_id'] + + else: + errorLogger.error("Failure in getting User Data From Elastic Search") + except KeyError : + user_entity_data = [] + user_entity_scroll_id = None + +#schema for the observation solution dataframe +user_roles_schema = StructType([ + StructField('roleId', StringType(), True), + StructField('userId', StringType(), True), + StructField('roleCode', StringType(), True), + StructField('entityId', StringType(), True) +]) + +user_roles_rdd = spark.sparkContext.parallelize(list(userEntityRoleArray)); +user_roles_df = spark.createDataFrame(user_roles_rdd,user_roles_schema); + + +# merge user_roles_df and roles_df to get role title +user_roles_title_df = user_roles_df.join(roles_df,user_roles_df.roleId==roles_df._id,'inner').drop(roles_df["_id"]) +user_roles_title_df = user_roles_title_df.select(user_roles_title_df["roleId"].alias("role_id"), + user_roles_title_df["userId"].alias("user_id"), + user_roles_title_df["roleCode"].alias("role_externalId"), + user_roles_title_df["entityId"], + user_roles_title_df["title"].alias("role_title")) + +#entity elastic search dataframe +entityArray = [] + +def entityElasticSearchJson(entityJsonData): + for ent_data in entityJsonData : + for tel in ent_data["_source"]["data"]["telemetry_entities"]: + tel["entity_id"] = ent_data["_source"]["data"]["_id"] + entityArray.append(tel) +headers_entity = {'Content-Type': 'application/json'} +url_getentityinfo = config.get("ELASTICSEARCH","url_entity") +payload_entity_elastic = {"size": 10000,"query":{"bool":{"must":[{"match":{"_type":"_doc"}}]}}} +entity_response = requests.post(url_getentityinfo , headers = headers_entity,data=json.dumps(payload_entity_elastic)) +try: + if entity_response.status_code == 200: + entity_response = entity_response.json() + entity_data = entity_response['hits']['hits'] + entityElasticSearchJson(entity_data) + entity_scroll_id = entity_response['_scroll_id'] + else: + errorLogger.error("Failure in getting Entity Data From Elastic Search") +except KeyError as e: + entity_hit = [] + entity_scroll_id = None + errorLogger.error("entity scroll id error") + +while entity_data: + entity_scroll_payload = json.dumps({ + 'scroll': '1m', + 'scroll_id': entity_scroll_id + }) + entity_scroll_api_url = config.get("ELASTICSEARCH","url_user_scroll") + entity_scroll_response = requests.post(entity_scroll_api_url,headers=headers_entity,data = entity_scroll_payload) + try: + if entity_scroll_response.status_code == 200: + entity_scroll_response = entity_scroll_response.json() + entity_data = entity_scroll_response['hits']['hits'] + if len(entity_data) > 0 : + entityElasticSearchJson(entity_data) + entity_scroll_id = 
entity_scroll_response['_scroll_id'] + + else: + errorLogger.error("Failure in getting Entity Data From Elastic Search") + except KeyError : + entity_entity_data = [] + entity_entity_scroll_id = None + + +entity_df = ks.DataFrame(entityArray); +entity_df = entity_df.to_spark() + +# merge user role title dataframe and entity dataframe +user_entity_info_df = user_roles_title_df.join(entity_df,user_roles_title_df.entityId==entity_df.entity_id,'inner')\ + .drop(user_roles_title_df["entityId"]) + +# merge user entity dataframe and user org dataframe +user_df = df_user_org.join(user_entity_info_df,df_user_org.id==user_entity_info_df.user_id,'left')\ + .drop(user_entity_info_df["user_id"]).drop(user_entity_info_df["entity_id"]) + +user_df_integrated_app = df_user_org.join(df_user_rel_entities,df_user_org.id==df_user_rel_entities.user_id,'left') +user_df_integrated_app = user_df_integrated_app.drop(user_df_integrated_app["user_id"]) + +obs_sub_cursorMongo = [] +obs_sol_cursorMongo = [] +user_org_rows = [] +org_rows = [] +roles_cursorMongo = [] +userEntityRoleArray = [] +entityArray = [] + +obs_sub_df1.cache() +obs_sub_df.cache() +obs_soln_df.cache() +df_user_org.cache() +roles_df.cache() +user_roles_df.cache() +entity_df.cache() +user_entity_info_df.cache() + +# merge user dataframe and observation submission dataframe +obs_sub_status_df_survey = obs_sub_pgm_df\ + .join(user_df,[obs_sub_pgm_df.user_id==user_df.id, + obs_sub_pgm_df.app_name==config.get("COMMON","diksha_survey_app_name")],'inner')\ + .drop(user_df["id"]).drop(user_df["entity_id"]) + +obs_sub_status_df_integrated_app = obs_sub_pgm_df\ + .join(user_df_integrated_app,[obs_sub_pgm_df.user_id==user_df_integrated_app.id, + obs_sub_pgm_df.app_name==config.get("COMMON","diksha_integrated_app_name")],'inner')\ + .drop(user_df_integrated_app["id"]) + +integrated_app_column_list = [] +survey_app_column_list = [] +integrated_app_column_list = obs_sub_status_df_integrated_app.columns +survey_app_column_list = obs_sub_status_df_survey.columns + +missing_col_in_integrated_app_list = [] +missing_col_in_integrated_app_list = list(set(integrated_app_column_list) - set(survey_app_column_list)) +missing_col_in_survey_app_list = [] +missing_col_in_survey_app_list = list(set(survey_app_column_list) - set(integrated_app_column_list)) + +if len(missing_col_in_survey_app_list) : + for inte in missing_col_in_survey_app_list : + obs_sub_status_df_integrated_app = obs_sub_status_df_integrated_app.withColumn(inte, lit(None).cast(StringType())) + +if len(missing_col_in_integrated_app_list) : + for sur in missing_col_in_integrated_app_list : + obs_sub_status_df_survey = obs_sub_status_df_survey.withColumn(sur, lit(None).cast(StringType())) + +final_df = obs_sub_status_df_integrated_app.unionByName(obs_sub_status_df_survey) +final_df = final_df.dropDuplicates() +final_df.coalesce(1).write.format("json").mode("overwrite") \ + .save(config.get("COMMON","observation_status_output_dir")+"/") + +for filename in os.listdir(config.get("COMMON","observation_status_output_dir")+"/"): + if filename.endswith(".json"): + os.rename(config.get("COMMON","observation_status_output_dir")+"/"+filename, + config.get("COMMON","observation_status_output_dir")+"/sl_observation_status.json") +blob_service_client = BlockBlobService(account_name=config.get("AZURE","account_name"), + sas_token=config.get("AZURE","sas_token")) +container_name = config.get("AZURE","container_name") +local_path = config.get("COMMON","observation_status_output_dir") +blob_path = config.get("AZURE","blob_path") 
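# The job has just renamed Spark's part-*.json output to sl_observation_status.json;
# the loop below pushes that file to Azure Blob Storage with the SAS-token client
# created above. A compact sketch of the same step as a reusable helper is shown
# here for clarity; upload_single_report is an illustrative name only and is not
# referenced elsewhere in this job.
def upload_single_report(service, container, blob_dir, local_dir, report_name):
    # Look for the renamed report in the local output directory and upload it
    # to <container>/<blob_dir>/<report_name> via the block-blob client.
    for name in os.listdir(local_dir):
        if name == report_name:
            service.create_blob_from_path(
                container,
                os.path.join(blob_dir, name),
                os.path.join(local_dir, name)
            )
# e.g. upload_single_report(blob_service_client, container_name, blob_path,
#                           local_path, "sl_observation_status.json")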
+ +for files in os.listdir(local_path): + if "sl_observation_status.json" in files: + blob_service_client.create_blob_from_path(container_name,os.path.join(blob_path,files),local_path + "/" + files) + +datasources = ["sl-observation-status"] + +sl_status_spec = config.get("DRUID","sl_observation_status_spec") + +ingestion_specs = [sl_status_spec] + +for i,j in zip(datasources,ingestion_specs): + + druid_end_point = config.get("DRUID","druid_end_point")+i + + druid_batch_end_point = config.get("DRUID","druid_batch_end_point") + + headers = {'Content-Type' : 'application/json'} + + get_timestamp = requests.get(druid_end_point, headers=headers) + + successLogger.debug(get_timestamp) + if get_timestamp.status_code == 200 : + successLogger.debug("Successfully fetched time stamp of the datasource " + i ) + timestamp = get_timestamp.json() + #calculating interval from druid get api + minTime = timestamp["segments"]["minTime"] + maxTime = timestamp["segments"]["maxTime"] + min1 = datetime.datetime.strptime(minTime,"%Y-%m-%dT%H:%M:%S.%fZ") + max1 = datetime.datetime.strptime(maxTime,"%Y-%m-%dT%H:%M:%S.%fZ") + new_format = "%Y-%m-%d" + min1.strftime(new_format) + max1.strftime(new_format) + minmonth = "{:02d}".format(min1.month) + maxmonth = "{:02d}".format(max1.month) + min2 = str(min1.year) + "-" + minmonth + "-" + str(min1.day) + max2 = str(max1.year) + "-" + maxmonth + "-" + str(max1.day) + interval = min2 + "_" + max2 + successLogger.debug(interval) + + time.sleep(50) + + disable_datasource = requests.delete(druid_end_point, headers=headers) + if disable_datasource.status_code == 200: + successLogger.debug("successfully disabled the datasource " + i) + time.sleep(300) + + delete_segments = requests.delete(druid_end_point + "/intervals/" + interval, headers=headers) + if delete_segments.status_code == 200: + successLogger.debug("successfully deleted the segments " + i) + time.sleep(300) + + enable_datasource = requests.get(druid_end_point, headers=headers) + if enable_datasource.status_code == 204: + successLogger.debug("successfully enabled the datasource " + i) + + time.sleep(300) + + start_supervisor = requests.post(druid_batch_end_point,data=j, headers=headers) + successLogger.debug("ingest data") + if start_supervisor.status_code == 200: + successLogger.debug("started the batch ingestion task sucessfully for the datasource " + i) + time.sleep(50) + else: + errorLogger.error("failed to start batch ingestion task" + str(start_supervisor.status_code)) + + else: + errorLogger.error("failed to enable the datasource " + i) + else: + errorLogger.error("failed to delete the segments of the datasource " + i) + else: + errorLogger.error("failed to disable the datasource " + i) + + + + elif get_timestamp.status_code == 204: + start_supervisor = requests.post(druid_batch_end_point,data=j, headers=headers) + if start_supervisor.status_code == 200: + successLogger.debug("started the batch ingestion task sucessfully for the datasource " + i) + time.sleep(50) + else: + errorLogger.error("failed to start batch ingestion task" + str(start_supervisor.status_code)) + errorLogger.error(start_supervisor.json()) + else: + errorLogger.error("failed to get the timestamp of the datasource " + i) diff --git a/projects/config.sample b/projects/config.sample new file mode 100644 index 0000000..0556a0b --- /dev/null +++ b/projects/config.sample @@ -0,0 +1,64 @@ +# -------------------- +[MONGO] +# -------------------- + +url = mongodb://: +db = +collection = + +# -------------------- +[KEYCLOAK] +# -------------------- + 
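# The token endpoint below is called with grant_type=refresh_token to mint a
# short-lived user access token; that token is then sent as the
# X-authenticated-user-token header when the job calls the Sunbird read-user
# API (see the ENDPOINTS section further down).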
+#url = https:///auth/realms/sunbird/protocol/openid-connect/token +url = http:///auth/realms/sunbird/protocol/openid-connect/token +#client_id = +#client_secret = +client_id = +grant_type = +refresh_token = + +# -------------------- +[API] +# -------------------- + +content_type = application/json +authorization = Bearer + +# -------------------- +[ENDPOINTS] +# -------------------- + +read_user = http:///api/user/v3/read +coordinator_v1_ds = coordinator/v1/datasources/ +indexer_v1_task = indexer/v1/task + +# -------------------- +[DRUID] +# -------------------- + +url = http:///druid/ + +# -------------------- +[SPECS] +# -------------------- + +sl_general_unnati_spec = {"type":"index","spec":{"ioConfig":{"type":"index","inputSource":{"type": "azure","uris": ["azure://samiksha/projects/sl_projects.json"]},"inputFormat":{"type":"json"}},"tuningConfig":{"type":"index","partitionsSpec":{"type":"dynamic"}},"dataSchema":{"dataSource":"sl-project","granularitySpec":{"type":"uniform","queryGranularity":"HOUR","rollup":true,"segmentGranularity":"DAY"},"timestampSpec":{"column":"project_updated_date","format":"auto"},"dimensionsSpec":{"dimensions":[]},"metricsSpec":[]}}} + +# -------------------- +[AZURE] +# -------------------- + +account_name = +sas_token = +container_name = +blob_path = + +# -------------------- +[FILE_PATHS] +# -------------------- + +projects_output_dir = +project_success_log_filename = +project_error_log_filename = + diff --git a/projects/pyspark_project_batch.py b/projects/pyspark_project_batch.py new file mode 100644 index 0000000..d7b1cf6 --- /dev/null +++ b/projects/pyspark_project_batch.py @@ -0,0 +1,499 @@ +# ----------------------------------------------------------------- +# Name : pyspark_project_batch.py +# Author : +# Description : +# +# ----------------------------------------------------------------- +import json, sys, time +from configparser import ConfigParser, ExtendedInterpolation +from pymongo import MongoClient +import os +import requests +from pyspark.sql import SparkSession +from pyspark.sql.functions import * +import pyspark.sql.functions as F +from pyspark.sql.types import * +from pyspark.sql import Row +from collections import OrderedDict +import databricks.koalas as ks +from azure.storage.blob import BlockBlobService, PublicAccess +import logging.handlers +from logging.handlers import TimedRotatingFileHandler +import datetime + +config_path = os.path.dirname(os.path.abspath(__file__)) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path + "/config.ini") + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) + +# Add the log message handler to the logger +successHandler = logging.handlers.RotatingFileHandler( + config.get('FILE_PATHS', 'project_success_log_filename') +) +successBackuphandler = TimedRotatingFileHandler( + config.get('FILE_PATHS', 'project_success_log_filename'), + when="w0", + backupCount=1 +) +successHandler.setFormatter(formatter) +successLogger.addHandler(successHandler) +successLogger.addHandler(successBackuphandler) + +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorHandler = logging.handlers.RotatingFileHandler( + config.get('FILE_PATHS', 'project_error_log_filename') +) +errorBackuphandler = TimedRotatingFileHandler( + config.get('FILE_PATHS', 'project_error_log_filename'), + when="w0", + backupCount=1 +) +errorHandler.setFormatter(formatter) 
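# The logger wiring in this script (and in the other jobs in this repository)
# repeats the same pattern: one RotatingFileHandler that writes the live log
# and one TimedRotatingFileHandler (when="w0", backupCount=1) that rolls the
# file over weekly and keeps a single backup. A minimal sketch of the same
# setup as a reusable factory follows; build_rotating_logger is an
# illustrative name and is not used elsewhere in this job.
def build_rotating_logger(name, filename, level=logging.DEBUG):
    log = logging.getLogger(name)
    log.setLevel(level)
    fmt = logging.Formatter('%(asctime)s - %(levelname)s')
    for handler in (
        logging.handlers.RotatingFileHandler(filename),
        TimedRotatingFileHandler(filename, when="w0", backupCount=1)
    ):
        handler.setFormatter(fmt)
        log.addHandler(handler)
    return log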
+errorLogger.addHandler(errorHandler) +errorLogger.addHandler(errorBackuphandler) + +try: + def convert_to_row(d: dict) -> Row: + return Row(**OrderedDict(sorted(d.items()))) +except Exception as e: + errorLogger.error(e, exc_info=True) + +spark = SparkSession.builder.appName("projects").config("spark.driver.memory", "25g").getOrCreate() + +clientProd = MongoClient(config.get('MONGO', 'url')) + +dbProd = clientProd[config.get('MONGO', 'db')] + +projectsCollec = dbProd[config.get('MONGO', 'collection')] + +# getKeyclock api to generate authentication token +try: + def get_keyclock_accesstoken(): + url_getkeyclock = config.get("KEYCLOAK", "url") + headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'} + body_getkeyclock = { + "grant_type": config.get("KEYCLOAK", "grant_type"), + "client_id": config.get("KEYCLOAK", "client_id"), + "refresh_token": config.get("KEYCLOAK", "refresh_token") + } + + responsegetkeyclock = requests.post( + url_getkeyclock, data=body_getkeyclock, headers=headers_getkeyclock + ) + if responsegetkeyclock.status_code == 200: + successLogger.debug("getkeyclock api") + return responsegetkeyclock.json() + else: + errorLogger.error(" Failure in getkeyclock API ") + errorLogger.error(responsegetkeyclock) + errorLogger.error(responsegetkeyclock.text) +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def readUser(userId, accessToken): + queryStringReadUser = "?fields=completeness%2CmissingFields%2ClastLoginTime%2Ctopics%2Corganisations%2Croles%2Clocations%2Cdeclarations" + urlReadUser = config.get("ENDPOINTS", "read_user") + "/" + str(userId) + queryStringReadUser + headersReadUser = { + 'Content-Type': config.get("API", "content_type"), + 'Authorization': config.get("API", "authorization"), + 'X-authenticated-user-token': accessToken + } + responseReadUser = requests.get(urlReadUser, headers=headersReadUser) + if responseReadUser.status_code == 200: + successLogger.debug("read user api") + responseReadUser = responseReadUser.json() + return responseReadUser + else: + errorLogger.error("read user api failed") + errorLogger.error(responseReadUser) + errorLogger.error(responseReadUser.text) +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def removeduplicate(it): + seen = [] + for x in it: + if x not in seen: + yield x + seen.append(x) +except Exception as e: + errorLogger.error(e, exc_info=True) + +spark = SparkSession.builder.appName("projects").config( + "spark.driver.memory", "50g" +).config("spark.executor.memory", "100g").config( + "spark.memory.offHeap.enabled", True +).config("spark.memory.offHeap.size", "32g").getOrCreate() + +sc = spark.sparkContext + +projects_cursorMongo = projectsCollec.aggregate( + [ + { + "$project": { + "_id": {"$toString": "$_id"}, + "projectTemplateId": {"$toString": "$projectTemplateId"}, + "solutionInformation": {"name": 1}, "title": 1, + "programId": {"$toString": "$programId"}, + "programInformation": {"name": 1}, + "metaInformation": {"duration": 1}, "syncedAt": 1, + "updatedAt": 1, "isDeleted": 1, "categories": 1, + "tasks": 1, "status": 1, "userId": 1, "description": 1, + "createdAt": 1 + } + } + ] +) + +projects_schema = StructType( + [ + StructField('_id', StringType(), True), + StructField('projectTemplateId', StringType(), True), + StructField( + 'solutionInformation', StructType( + [StructField('name', StringType(), True)] + ) + ), + StructField('title', StringType(), True), + StructField('programId', StringType(), True), + StructField( + 'programInformation', 
StructType( + [StructField('name', StringType(), True)] + ) + ), + StructField( + 'metaInformation', StructType( + [StructField('duration', StringType(), True)] + ) + ), + StructField('updatedAt', TimestampType(), True), + StructField('syncedAt', TimestampType(), True), + StructField('isDeleted', BooleanType(), True), + StructField('status', StringType(), True), + StructField('userId', StringType(), True), + StructField('description', StringType(), True), + StructField('createdAt', TimestampType(), True), + StructField( + 'categories', ArrayType( + StructType( + [StructField('name', StringType(), True)] + ) + ), True + ), + StructField( + 'tasks', ArrayType( + StructType( + [ + StructField('_id', StringType(), True), + StructField('name', StringType(), True), + StructField('assignee', StringType(), True), + StructField( + 'attachments', ArrayType( + StructType([StructField('sourcePath', StringType(), True)]) + ) + ), + StructField('startDate', StringType(), True), + StructField('endDate', StringType(), True), + StructField('syncedAt', TimestampType(), True), + StructField('status', StringType(), True), + StructField('children', ArrayType( + StructType([ + StructField('_id', StringType(), True), + StructField('name', StringType(), True), + StructField('startDate', StringType(), True), + StructField('endDate', StringType(), True), + StructField('syncedAt', TimestampType(), True), + StructField('status', StringType(), True) + ]) + )), + ] + ) + ), True + ) + ] +) + +projects_rdd = spark.sparkContext.parallelize(list(projects_cursorMongo)) +projects_df = spark.createDataFrame(projects_rdd, projects_schema) + +projects_df = projects_df.withColumn( + "project_created_type", + F.when( + projects_df["projectTemplateId"].isNotNull() == True, "project imported from library" + ).otherwise("user created project") +) + +projects_df = projects_df.withColumn( + "project_title", F.when( + projects_df["solutionInformation"]["name"].isNotNull() == True, + projects_df["solutionInformation"]["name"] + ).otherwise(projects_df["title"]) +) + +projects_df = projects_df.withColumn( + "date_time", to_timestamp(projects_df["updatedAt"], 'yyyy-MM-dd HH:mm:ss') +) + +projects_df = projects_df.withColumn("date", F.split(projects_df["date_time"], ' ')[0]) +projects_df = projects_df.withColumn("time", F.split(projects_df["date_time"], ' ')[1]) + +projects_df = projects_df.withColumn( + "project_updated_date", F.concat( + F.col("date"), F.lit("T"), F.col("time"), F.lit(".000Z") + ) +) + +projects_df = projects_df.withColumn( + "deleted_flag", F.when( + (projects_df["isDeleted"].isNotNull() == True) & + (projects_df["isDeleted"] == True), "true" + ).when( + (projects_df["isDeleted"].isNotNull() == True) & + (projects_df["isDeleted"] == False), "false" + ).otherwise("false") +) + +projects_df = projects_df.withColumn("exploded_categories", F.explode_outer(F.col("categories"))) + +projects_df = projects_df.withColumn("parent_channel", F.lit("SHIKSHALOKAM")) + +projects_df = projects_df.withColumn("exploded_tasks", F.explode_outer(F.col("tasks"))) + +projects_df = projects_df.withColumn( + "exploded_tasks_attachments", F.explode_outer(projects_df["exploded_tasks"]["attachments"]) +) + +projects_df = projects_df.withColumn( + "task_evidence_status", F.when( + projects_df["exploded_tasks_attachments"]["sourcePath"].isNotNull() == True, True + ).otherwise(False) +) + +projects_df = projects_df.withColumn( + "task_evidence", F.when( + projects_df["exploded_tasks_attachments"]["sourcePath"].isNotNull() == True, + F.concat( + 
F.lit("https://samikshaprod.blob.core.windows.net/samiksha/"), + projects_df["exploded_tasks_attachments"]["sourcePath"] + ) + ) +) + +projects_df = projects_df.withColumn("exploded_sub_tasks", F.explode_outer(projects_df["exploded_tasks"]["children"])) + +projects_df_cols = projects_df.select( + projects_df["_id"].alias("project_id"), projects_df["project_created_type"], + projects_df["project_title"], + projects_df["title"].alias("project_title_editable"), + projects_df["programId"].alias("program_id"), + projects_df["programInformation"]["name"].alias("program_name"), + projects_df["metaInformation"]["duration"].alias("project_duration"), + projects_df["syncedAt"].alias("project_last_sync"), + projects_df["project_updated_date"], projects_df["deleted_flag"], + projects_df["exploded_categories"]["name"].alias("area_of_improvement"), + projects_df["status"].alias("status_of_project"), + projects_df["userId"].alias("createdBy"), + projects_df["description"].alias("project_goal"), projects_df["parent_channel"], + projects_df["createdAt"].alias("project_created_date"), + projects_df["exploded_tasks"]["_id"].alias("task_id"), + projects_df["exploded_tasks"]["name"].alias("tasks"), + projects_df["exploded_tasks"]["assignee"].alias("task_assigned_to"), + projects_df["exploded_tasks"]["startDate"].alias("task_start_date"), + projects_df["exploded_tasks"]["endDate"].alias("task_end_date"), + projects_df["exploded_tasks"]["syncedAt"].alias("tasks_date"), + projects_df["exploded_tasks"]["status"].alias("tasks_status"), + projects_df["task_evidence"], projects_df["task_evidence_status"], + projects_df["exploded_sub_tasks"]["_id"].alias("sub_task_id"), + projects_df["exploded_sub_tasks"]["name"].alias("sub_task"), + projects_df["exploded_sub_tasks"]["status"].alias("sub_task_status"), + projects_df["exploded_sub_tasks"]["syncedAt"].alias("sub_task_date"), + projects_df["exploded_sub_tasks"]["startDate"].alias("sub_task_start_date"), + projects_df["exploded_sub_tasks"]["endDate"].alias("sub_task_end_date") +) + +projects_df_cols = projects_df_cols.dropDuplicates() + +projects_userid_df = projects_df_cols.select("createdBy") + +userId_projects_df_after = [] +userId_arr = [] +uniqueuserId_arr = [] +userId_projects_df_before = projects_userid_df.toJSON().map(lambda j: json.loads(j)).collect() +for uid in userId_projects_df_before: + userId_arr.append(uid["createdBy"]) +uniqueuserId_arr = list(removeduplicate(userId_arr)) + +get_keycloak_obj = get_keyclock_accesstoken() + +user_info_arr = [] +entitiesArr = [] +for usr in uniqueuserId_arr: + readUserObj = {} + readUserObj = readUser(usr, get_keycloak_obj["access_token"]) + if readUserObj: + readResult = False + readResult = "result" in readUserObj + if readResult == True: + readResponse = False + readResponse = "response" in readUserObj["result"] + if readResponse == True: + userEntitiesArr = [] + userObj = {} + try: + if len(readUserObj["result"]["response"]["userLocations"]) > 0: + for usrLoc in readUserObj["result"]["response"]["userLocations"]: + userObj[usrLoc["type"] + '_name'] = usrLoc["name"] + userObj[usrLoc["type"] + '_id'] = usrLoc["id"] + userObj[usrLoc["type"] + '_externalId'] = usrLoc["code"] + except KeyError: + pass + userEntitiesArr = list(userObj.keys()) + entitiesArr.extend(userEntitiesArr) + userObj["id"] = readUserObj["result"]["response"]["id"] + userObj["user_id"] = readUserObj["result"]["response"]["userName"] + userObj["user_full_name"] = readUserObj["result"]["response"]["firstName"] + userObj["channel"] = 
readUserObj["result"]["response"]["rootOrgId"] + userRoles = None + try: + userRoles = readUserObj["result"]["response"]["userSubType"] + except KeyError: + userRoles = '' + try: + if userRoles: + userObj["designation"] = userRoles + except KeyError: + pass + user_info_arr.append(userObj) + +user_df = ks.DataFrame(user_info_arr) +user_df = user_df.to_spark() + +final_projects_df = projects_df_cols.join( + user_df, projects_df_cols["createdBy"] == user_df["id"], "inner" +).drop(user_df["id"]) + +final_projects_df = final_projects_df.dropDuplicates() + +final_projects_df.coalesce(1).write.format("json").mode("overwrite").save( + config.get("FILE_PATHS", "projects_output_dir") + "/" +) + +for filename in os.listdir(config.get("FILE_PATHS", "projects_output_dir") + "/"): + if filename.endswith(".json"): + os.rename( + config.get("FILE_PATHS", "projects_output_dir") + "/" + filename, + config.get("FILE_PATHS", "projects_output_dir") + "/sl_projects.json" + ) + +blob_service_client = BlockBlobService( + account_name=config.get("AZURE", "account_name"), + sas_token=config.get("AZURE", "sas_token") +) +container_name = config.get("AZURE", "container_name") +local_path = config.get("FILE_PATHS", "projects_output_dir") +blob_path = config.get("AZURE", "blob_path") + +for files in os.listdir(local_path): + if "sl_projects.json" in files: + blob_service_client.create_blob_from_path( + container_name, + os.path.join(blob_path, files), + local_path + "/" + files + ) + +os.remove(config.get("FILE_PATHS", "projects_output_dir") + "/sl_projects.json") + +dimensionsArr = [] +dimensionsArr = list(set(entitiesArr)) + +submissionReportColumnNamesArr = [ + 'user_id', 'user_full_name', 'project_title', 'project_goal', 'project_created_date', + 'project_last_sync', 'area_of_improvement', 'status_of_project', 'tasks', + 'tasks_date', 'tasks_status', 'sub_task', 'sub_task_status', 'sub_task_date', + "task_start_date", "task_end_date", "sub_task_start_date", "sub_task_end_date", + "designation", "deleted_flag", "task_evidence", "task_evidence_status", "project_id", + "task_id", "sub_task_id", "project_created_type", "task_assigned_to", 'channel', + 'parent_channel', 'program_id', 'program_name', 'project_updated_date', 'createdBy', + 'project_title_editable', 'project_duration' +] + +dimensionsArr.extend(submissionReportColumnNamesArr) + +datasources = ["sl-project"] +payload = json.loads(config.get("SPECS", "sl_general_unnati_spec")) +payload["spec"]["dataSchema"]["dimensionsSpec"]["dimensions"] = dimensionsArr +ingestion_specs = [json.dumps(payload)] + +for i, j in zip(datasources, ingestion_specs): + + druid_end_point = config.get("DRUID", "url") + config.get("ENDPOINTS", "coordinator_v1_ds") + i + + druid_batch_end_point = config.get("DRUID", "url") + config.get("ENDPOINTS", "indexer_v1_task") + + headers = {'Content-Type': 'application/json'} + + get_timestamp = requests.get(druid_end_point, headers=headers) + + if get_timestamp.status_code == 200: + successLogger.debug("Successfully fetched time stamp of the datasource " + i) + timestamp = get_timestamp.json() + # calculating interval from druid get api + minTime = timestamp["segments"]["minTime"] + maxTime = timestamp["segments"]["maxTime"] + min1 = datetime.datetime.strptime(minTime, "%Y-%m-%dT%H:%M:%S.%fZ") + max1 = datetime.datetime.strptime(maxTime, "%Y-%m-%dT%H:%M:%S.%fZ") + new_format = "%Y-%m-%d" + min1.strftime(new_format) + max1.strftime(new_format) + minmonth = "{:02d}".format(min1.month) + maxmonth = "{:02d}".format(max1.month) + min2 = 
str(min1.year) + "-" + minmonth + "-" + str(min1.day) + max2 = str(max1.year) + "-" + maxmonth + "-" + str(max1.day) + interval = min2 + "_" + max2 + time.sleep(50) + + disable_datasource = requests.delete(druid_end_point, headers=headers) + if disable_datasource.status_code == 200: + successLogger.debug("successfully disabled the datasource " + i) + time.sleep(300) + + delete_segments = requests.delete(druid_end_point + "/intervals/" + interval, headers=headers) + if delete_segments.status_code == 200: + successLogger.debug("successfully deleted the segments " + i) + time.sleep(300) + + enable_datasource = requests.get(druid_end_point, headers=headers) + if enable_datasource.status_code == 204: + successLogger.debug("successfully enabled the datasource " + i) + + time.sleep(300) + + start_supervisor = requests.post(druid_batch_end_point, data=j, headers=headers) + successLogger.debug("ingest data") + if start_supervisor.status_code == 200: + successLogger.debug("started the batch ingestion task sucessfully for the datasource " + i) + time.sleep(50) + else: + errorLogger.error("failed to start batch ingestion task" + str(start_supervisor.status_code)) + else: + errorLogger.error("failed to enable the datasource " + i) + else: + errorLogger.error("failed to delete the segments of the datasource " + i) + else: + errorLogger.error("failed to disable the datasource " + i) + + elif get_timestamp.status_code == 204: + start_supervisor = requests.post(druid_batch_end_point, data=j, headers=headers) + if start_supervisor.status_code == 200: + successLogger.debug("started the batch ingestion task sucessfully for the datasource " + i) + time.sleep(50) + else: + errorLogger.error(start_supervisor.text) + errorLogger.error("failed to start batch ingestion task" + str(start_supervisor.status_code)) diff --git a/survey/config.sample b/survey/config.sample new file mode 100644 index 0000000..2967e61 --- /dev/null +++ b/survey/config.sample @@ -0,0 +1,70 @@ +# -------------------- +[KEYCLOAK] +# -------------------- + +url = http:///auth/realms/sunbird/protocol/openid-connect/token +grant_type = refresh_token +client_id = android +refresh_token = + +# -------------------- +[SUNBIRD] +# -------------------- + +base_url_ip = http:///api + +# -------------------- +[ENDPOINT] +# -------------------- + +read_user = user/v1/read + +# -------------------- +[STORAGE] +# -------------------- + +base_url = https:/// + +# -------------------- +[COMMON] +# -------------------- + +parent_channel = +content_type = application/json +authorization = +diksha_survey_app_name = + +# -------------------- +[MONGO] +# -------------------- + +url = mongodb://:27017 +db = + +solutionsCollec = solutions +entityTypeCollec = entityTypes +questionsCollec = questions +criteriaCollec = criteria +entitiesCollec = entities +programsCollec = programs +surveysCollec = surveys +surveySubmissionsCollec = surveySubmissions + +# -------------------- +[KAFKA] +# -------------------- + +url = :9092 +raw_data_topic = +druid_topic = +evidence_druid_topic = +dev_topic = + +# -------------------- +[LOG_FILE] +# -------------------- + +survey_streaming_success_log_filename = +survey_streaming_error_log_filename = +survey_evidence_streaming_success_log_filename = +survey_evidence_streaming_error_log_filename = diff --git a/survey/py_survey_evidence_streaming.py b/survey/py_survey_evidence_streaming.py new file mode 100755 index 0000000..868d107 --- /dev/null +++ b/survey/py_survey_evidence_streaming.py @@ -0,0 +1,176 @@ +# 
----------------------------------------------------------------- +# Name : py_survey_evidence_streaming.py +# Author : +# Description : +# +# ----------------------------------------------------------------- +from pymongo import MongoClient +from bson.objectid import ObjectId +import csv, os +import json +import datetime +from kafka import KafkaProducer +from configparser import ConfigParser, ExtendedInterpolation +import faust +import logging.handlers +from logging.handlers import TimedRotatingFileHandler + +config_path = os.path.dirname(os.path.abspath(__file__)) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path + "/config.ini") + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) + +# Add the log message handler to the logger +successHandler = logging.handlers.RotatingFileHandler( + config.get('LOG_FILE', 'survey_evidence_streaming_success_log_filename') +) +successBackuphandler = TimedRotatingFileHandler( + config.get('LOG_FILE', 'survey_evidence_streaming_success_log_filename'), + when="w0", + backupCount=1 +) +successHandler.setFormatter(formatter) +successLogger.addHandler(successHandler) +successLogger.addHandler(successBackuphandler) + +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorHandler = logging.handlers.RotatingFileHandler( + config.get('LOG_FILE', 'survey_evidence_streaming_error_log_filename') +) +errorBackuphandler = TimedRotatingFileHandler( + config.get('LOG_FILE', 'survey_evidence_streaming_error_log_filename'), + when="w0", + backupCount=1 +) +errorHandler.setFormatter(formatter) +errorLogger.addHandler(errorHandler) +errorLogger.addHandler(errorBackuphandler) + +try: + app = faust.App( + 'sl_py_survey_evidence_prod', + broker='kafka://' + config.get("KAFKA", "url"), + value_serializer='raw', + web_port=7005 + ) + + kafka_url = (config.get("KAFKA", "url")) + producer = KafkaProducer(bootstrap_servers=[kafka_url]) + + # db production + clientqa = MongoClient(config.get('MONGO', 'url')) + dbqa = clientqa[config.get('MONGO', 'db')] + + surveySubmissionsCollec = dbqa[config.get('MONGO', 'surveySubmissionsCollec')] + solutionsDevCollec = dbqa[config.get('MONGO', 'solutionsCollec')] + surveysCollec = dbqa[config.get('MONGO', 'surveysCollec')] + entityTypeDevCollec = dbqa[config.get('MONGO', 'entityTypeCollec')] + questionsDevCollec = dbqa[config.get('MONGO', 'questionsCollec')] + criteriaDevCollec = dbqa[config.get('MONGO', 'criteriaCollec')] + entitiesDevCollec = dbqa[config.get('MONGO', 'entitiesCollec')] +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def convert(lst): + return ','.join(lst) +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def evidence_extraction(msg_id): + for obSub in surveySubmissionsCollec.find({'_id': ObjectId(msg_id)}): + successLogger.debug("Survey Evidence Submission Id : " + str(msg_id)) + try: + completedDate = str(datetime.datetime.date(obSub['completedDate'])) + 'T' + \ + str(datetime.datetime.time(obSub['completedDate'])) + 'Z' + except KeyError: + pass + evidence_sub_count = 0 + try: + answersArr = [v for v in obSub['answers'].values()] + except KeyError: + pass + for ans in answersArr: + try: + if len(ans['fileName']): + evidence_sub_count = evidence_sub_count + len(ans['fileName']) + except KeyError: + if len(ans['instanceFileName']): + for instance in ans['instanceFileName']: + evidence_sub_count = 
evidence_sub_count + len(instance) + for answer in answersArr: + surveySubQuestionsObj = {} + surveySubQuestionsObj['completedDate'] = completedDate + surveySubQuestionsObj['total_evidences'] = evidence_sub_count + surveySubQuestionsObj['userName'] = obSub['evidencesStatus'][0]['submissions'][0]['submittedByName'] + surveySubQuestionsObj['userName'] = surveySubQuestionsObj['userName'].replace("null", "") + surveySubQuestionsObj['surveySubmissionId'] = str(obSub['_id']) + surveySubQuestionsObj['createdBy'] = obSub['createdBy'] + surveySubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] + surveySubQuestionsObj['solutionId'] = str(obSub['solutionId']) + surveySubQuestionsObj['surveyId'] = str(obSub['surveyId']) + + fileName = [] + fileSourcePath = [] + try: + surveySubQuestionsObj['remarks'] = answer['remarks'] + surveySubQuestionsObj['questionName'] = answer['payload']['question'][0] + except KeyError: + pass + surveySubQuestionsObj['questionId'] = str(answer['qid']) + for ques in questionsDevCollec.find({'_id': ObjectId(surveySubQuestionsObj['questionId'])}): + surveySubQuestionsObj['questionExternalId'] = ques['externalId'] + surveySubQuestionsObj['questionResponseType'] = answer['responseType'] + try: + surveySubQuestionsObj['appName'] = obSub["appInformation"]["appName"].lower() + except KeyError: + surveySubQuestionsObj['appName'] = config.get("COMMON", "diksha_survey_app_name") + evidence = [] + evidenceCount = 0 + try: + if answer['fileName']: + evidence = answer['fileName'] + surveySubQuestionsObj['evidence_count'] = len(evidence) + evidenceCount = len(evidence) + except KeyError: + if answer['instanceFileName']: + for inst in answer['instanceFileName']: + evidence.extend(inst) + surveySubQuestionsObj['evidence_count'] = len(evidence) + evidenceCount = len(evidence) + for evi in evidence: + fileName.append(evi['name']) + fileSourcePath.append(evi['sourcePath']) + surveySubQuestionsObj['fileName'] = convert(fileName) + surveySubQuestionsObj['fileSourcePath'] = convert(fileSourcePath) + if evidenceCount > 0: + producer.send( + (config.get("KAFKA", "evidence_druid_topic")), + json.dumps(surveySubQuestionsObj).encode('utf-8') + ) + producer.flush() + successLogger.debug("Send Obj to Kafka") +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + # loop the consumer messages and produce it to another topic + @app.agent(config.get("KAFKA", "dev_topic")) + async def survey_Faust(consumer): + async for msg in consumer: + msg_val = msg.decode('utf-8') + msg_data = json.loads(msg_val) + successLogger.debug("========== START OF SURVEY EVIDENCE SUBMISSION ========") + evidence_extraction(msg_data['_id']) + successLogger.debug("********* END OF SURVEY EVIDENCE SUBMISSION ***********") +except Exception as e: + errorLogger.error(e, exc_info=True) + +if __name__ == '__main__': + app.main() diff --git a/survey/py_survey_streaming.py b/survey/py_survey_streaming.py new file mode 100755 index 0000000..9670a24 --- /dev/null +++ b/survey/py_survey_streaming.py @@ -0,0 +1,397 @@ +# ----------------------------------------------------------------- +# Name : py_survey_streaming.py +# Author : +# Description : Program to read data from one kafka topic and produce it +# to another kafka topic +# ----------------------------------------------------------------- +import datetime +import json +import logging.handlers +import os +from configparser import ConfigParser, ExtendedInterpolation +from logging.handlers import TimedRotatingFileHandler + +import faust +import requests 
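# Pipeline summary: a Faust agent subscribes to the raw survey-submission
# topic, loads the submission from MongoDB by its _id, flattens every answer
# into a question-level record (enriched with solution, program, criteria and
# user details), and produces each record to the Druid Kafka topic.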
+from bson.objectid import ObjectId +from kafka import KafkaProducer +from pymongo import MongoClient + +config_path = os.path.dirname(os.path.abspath(__file__)) +config = ConfigParser(interpolation=ExtendedInterpolation()) +config.read(config_path + "/config.ini") + +formatter = logging.Formatter('%(asctime)s - %(levelname)s') + +successLogger = logging.getLogger('success log') +successLogger.setLevel(logging.DEBUG) + +# Add the log message handler to the logger +successHandler = logging.handlers.RotatingFileHandler( + config.get('LOG_FILE', 'survey_streaming_success_log_filename') +) +successBackuphandler = TimedRotatingFileHandler( + config.get('LOG_FILE', 'survey_streaming_success_log_filename'), + when="w0", + backupCount=1 +) +successHandler.setFormatter(formatter) +successLogger.addHandler(successHandler) +successLogger.addHandler(successBackuphandler) + +errorLogger = logging.getLogger('error log') +errorLogger.setLevel(logging.ERROR) +errorHandler = logging.handlers.RotatingFileHandler( + config.get('LOG_FILE', 'survey_streaming_error_log_filename') +) +errorBackuphandler = TimedRotatingFileHandler( + config.get('LOG_FILE', 'survey_streaming_error_log_filename'), + when="w0", + backupCount=1 +) +errorHandler.setFormatter(formatter) +errorLogger.addHandler(errorHandler) +errorLogger.addHandler(errorBackuphandler) + +try: + kafka_url = (config.get("KAFKA", "url")) + + app = faust.App( + 'sl_survey_prod_faust', + broker='kafka://' + kafka_url, + value_serializer='raw', + web_port=7004, + broker_max_poll_records=500 + ) + rawTopicName = app.topic(config.get("KAFKA", "raw_data_topic")) + producer = KafkaProducer(bootstrap_servers=[config.get("KAFKA", "url")]) + # db production + clientqa = MongoClient(config.get('MONGO', 'url')) + dbqa = clientqa[config.get('MONGO', 'db')] + + surveySubmissionsQACollec = dbqa[config.get('MONGO', 'surveySubmissionsCollec')] + solutionsQACollec = dbqa[config.get('MONGO', 'solutionsCollec')] + surveyQACollec = dbqa[config.get('MONGO', 'surveysCollec')] + entityTypeQACollec = dbqa[config.get('MONGO', 'entityTypeCollec')] + questionsQACollec = dbqa[config.get('MONGO', 'questionsCollec')] + criteriaQACollec = dbqa[config.get('MONGO', 'criteriaCollec')] + entitiesQACollec = dbqa[config.get('MONGO', 'entitiesCollec')] + programsQACollec = dbqa[config.get('MONGO', 'programsCollec')] +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def get_keycloak_access_token(): + url = config.get("KEYCLOAK", "url") + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + body = { + "grant_type": config.get("KEYCLOAK", "grant_type"), + "client_id": config.get("KEYCLOAK", "client_id"), + "refresh_token": config.get("KEYCLOAK", "refresh_token") + } + response = requests.post( + url, data=body, headers=headers + ) + if response.status_code == 200: + successLogger.debug("getKeycloak api") + return response.json() + else: + errorLogger.error("Failure in getKeycloak API") + errorLogger.error(response) + errorLogger.error(response.text) + +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def read_user(user_id, access_token): + urlReadUser = config.get("SUNBIRD", "base_url_ip") + "/" + \ + config.get("ENDPOINT", "read_user") + "/" + \ + str(user_id) + headersReadUser = { + 'Content-Type': config.get("COMMON", "content_type"), + 'Authorization': "Bearer " + config.get("COMMON", "authorization"), + 'X-authenticated-user-token': access_token, + 'X-Channel-id': config.get("COMMON", "channel-id") + } + responseReadUser = 
requests.get(urlReadUser, headers=headersReadUser) + if responseReadUser.status_code == 200: + successLogger.debug("read user api") + return responseReadUser.json() + else: + errorLogger.error("Failure in read user api") + errorLogger.error(responseReadUser) + errorLogger.error(responseReadUser.text) +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + def obj_creation(msg_id): + data_keycloak = get_keycloak_access_token() + tokenKeyCheck = "access_token" in data_keycloak + if tokenKeyCheck: + accessToken = data_keycloak['access_token'] + successLogger.debug("Survey Submission Id : " + str(msg_id)) + cursorMongo = surveySubmissionsQACollec.find({'_id': ObjectId(msg_id)}, no_cursor_timeout=True) + for obSub in cursorMongo: + completedDate = str(datetime.datetime.date(obSub['completedDate'])) + 'T' + str( + datetime.datetime.time(obSub['completedDate'])) + 'Z' + createdAt = str(datetime.datetime.date(obSub['createdAt'])) + 'T' + str( + datetime.datetime.time(obSub['createdAt'])) + 'Z' + updatedAt = str(datetime.datetime.date(obSub['updatedAt'])) + 'T' + str( + datetime.datetime.time(obSub['updatedAt'])) + 'Z' + evidencesArr = [v for v in obSub['evidences'].values()] + evidence_sub_count = 0 + + # fetch user name from postgres with the help of keycloak id + queryJsonOutput = {} + queryJsonOutput = read_user(obSub["createdBy"], accessToken) + if queryJsonOutput["result"]["response"]["userName"]: + if 'answers' in obSub.keys(): + answersArr = [v for v in obSub['answers'].values()] + for ans in answersArr: + try: + if len(ans['fileName']): + evidence_sub_count = evidence_sub_count + len(ans['fileName']) + except KeyError: + pass + for ans in answersArr: + def sequence_number(external_id, answer): + for sol in solutionsQACollec.find({'externalId': obSub['solutionExternalId']}): + section = [k for k in sol['sections'].keys()] + # parsing through questionSequencebyecm to get the sequence number + try: + for num in range(len( + sol['questionSequenceByEcm'][answer['evidenceMethod']][section[0]])): + if sol['questionSequenceByEcm'][answer['evidenceMethod']][section[0]][num] == external_id: + return num + 1 + except KeyError: + pass + + def creatingObj(answer, quesexternalId, ans_val, instNumber, responseLabel): + surveySubQuestionsObj = { + 'userName': obSub['evidencesStatus'][0]['submissions'][0]['submittedByName'] + } + surveySubQuestionsObj['userName'] = surveySubQuestionsObj['userName'].replace("null", + "") + try: + surveySubQuestionsObj['appName'] = obSub["appInformation"]["appName"].lower() + except KeyError: + surveySubQuestionsObj['appName'] = config.get("COMMON", "diksha_survey_app_name") + surveySubQuestionsObj['surveySubmissionId'] = str(obSub['_id']) + + surveySubQuestionsObj['createdBy'] = obSub['createdBy'] + + try: + if obSub['isAPrivateProgram']: + surveySubQuestionsObj['isAPrivateProgram'] = obSub['isAPrivateProgram'] + else: + surveySubQuestionsObj['isAPrivateProgram'] = False + except KeyError: + surveySubQuestionsObj['isAPrivateProgram'] = False + pass + + try: + surveySubQuestionsObj['programExternalId'] = obSub['programExternalId'] + except KeyError: + surveySubQuestionsObj['programExternalId'] = None + try: + surveySubQuestionsObj['programId'] = str(obSub['programId']) + except KeyError: + surveySubQuestionsObj['programId'] = None + try: + for program in programsQACollec.find({'externalId': obSub['programExternalId']}): + surveySubQuestionsObj['programName'] = program['name'] + except KeyError: + surveySubQuestionsObj['programName'] = None + + 
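                        # Optional submission fields (isAPrivateProgram and the
                        # program id, external id and name) are read defensively:
                        # each try/except KeyError above falls back to None or
                        # False so a missing key never drops the whole record.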
surveySubQuestionsObj['solutionExternalId'] = obSub['solutionExternalId'] + surveySubQuestionsObj['surveyId'] = str(obSub['surveyId']) + for solu in solutionsQACollec.find({'externalId': obSub['solutionExternalId']}): + surveySubQuestionsObj['solutionId'] = str(solu["_id"]) + surveySubQuestionsObj['solutionName'] = solu['name'] + section = [k for k in solu['sections'].keys()] + surveySubQuestionsObj['section'] = section[0] + surveySubQuestionsObj['questionSequenceByEcm'] = sequence_number( + quesexternalId, answer + ) + try: + if solu['scoringSystem'] == 'pointsBasedScoring': + surveySubQuestionsObj['totalScore'] = obSub['pointsBasedMaxScore'] + surveySubQuestionsObj['scoreAchieved'] = obSub['pointsBasedScoreAchieved'] + surveySubQuestionsObj['totalpercentage'] = obSub[ + 'pointsBasedPercentageScore'] + surveySubQuestionsObj['maxScore'] = answer['maxScore'] + surveySubQuestionsObj['minScore'] = answer['scoreAchieved'] + surveySubQuestionsObj['percentageScore'] = answer['percentageScore'] + surveySubQuestionsObj['pointsBasedScoreInParent'] = answer[ + 'pointsBasedScoreInParent'] + except KeyError: + pass + + for ob in surveyQACollec.find({'_id': obSub['surveyId']}): + surveySubQuestionsObj['surveyName'] = ob['name'] + surveySubQuestionsObj['questionId'] = str(answer['qid']) + surveySubQuestionsObj['questionAnswer'] = ans_val + surveySubQuestionsObj['questionResponseType'] = answer['responseType'] + if answer['responseType'] == 'number': + if answer['payload']['labels']: + surveySubQuestionsObj['questionResponseLabel_number'] = responseLabel + else: + surveySubQuestionsObj['questionResponseLabel_number'] = '' + if answer['payload']['labels']: + surveySubQuestionsObj['questionResponseLabel'] = responseLabel + else: + surveySubQuestionsObj['questionResponseLabel'] = '' + surveySubQuestionsObj['questionExternalId'] = quesexternalId + surveySubQuestionsObj['questionName'] = answer['payload']['question'][0] + surveySubQuestionsObj['questionECM'] = answer['evidenceMethod'] + surveySubQuestionsObj['criteriaId'] = str(answer['criteriaId']) + for crit in criteriaQACollec.find({'_id': ObjectId(answer['criteriaId'])}): + surveySubQuestionsObj['criteriaExternalId'] = crit['externalId'] + surveySubQuestionsObj['criteriaName'] = crit['name'] + surveySubQuestionsObj['completedDate'] = completedDate + surveySubQuestionsObj['createdAt'] = createdAt + surveySubQuestionsObj['updatedAt'] = updatedAt + surveySubQuestionsObj['remarks'] = answer['remarks'] + if len(answer['fileName']): + multipleFiles = None + fileCnt = 1 + for filedetail in answer['fileName']: + if fileCnt == 1: + multipleFiles = config.get('STORAGE', 'base_url') + filedetail['sourcePath'] + fileCnt = fileCnt + 1 + else: + multipleFiles = multipleFiles + ' , ' + config.get('STORAGE', 'base_url') + \ + filedetail['sourcePath'] + surveySubQuestionsObj['evidences'] = multipleFiles + surveySubQuestionsObj['evidence_count'] = len(answer['fileName']) + surveySubQuestionsObj['total_evidences'] = evidence_sub_count + # to fetch the parent question of matrix + if ans['responseType'] == 'matrix': + surveySubQuestionsObj['instanceParentQuestion'] = ans['payload']['question'][0] + surveySubQuestionsObj['instanceParentId'] = ans['qid'] + surveySubQuestionsObj['instanceParentResponsetype'] = ans['responseType'] + surveySubQuestionsObj['instanceParentCriteriaId'] = ans['criteriaId'] + for crit in criteriaQACollec.find({'_id': ObjectId(ans['criteriaId'])}): + surveySubQuestionsObj['instanceParentCriteriaExternalId'] = crit['externalId'] + 
surveySubQuestionsObj['instanceParentCriteriaName'] = crit['name'] + surveySubQuestionsObj['instanceId'] = instNumber + for ques in questionsQACollec.find({'_id': ObjectId(ans['qid'])}): + surveySubQuestionsObj['instanceParentExternalId'] = ques['externalId'] + surveySubQuestionsObj['instanceParentEcmSequence'] = sequence_number( + observationSubQuestionsObj['instanceParentExternalId'], answer + ) + else: + surveySubQuestionsObj['instanceParentQuestion'] = '' + surveySubQuestionsObj['instanceParentId'] = '' + surveySubQuestionsObj['instanceParentResponsetype'] = '' + surveySubQuestionsObj['instanceId'] = instNumber + surveySubQuestionsObj['instanceParentExternalId'] = '' + surveySubQuestionsObj['instanceParentEcmSequence'] = '' + surveySubQuestionsObj['user_id'] = queryJsonOutput["result"]["response"]["userName"] + surveySubQuestionsObj['channel'] = queryJsonOutput["result"]["response"]["rootOrgId"] + surveySubQuestionsObj['parent_channel'] = config.get('COMMON', 'parent_channel') + return surveySubQuestionsObj + + # fetching the question details from questions collection + def fetching_question_details(ansFn, instNumber): + for ques in questionsQACollec.find({'_id': ObjectId(ansFn['qid'])}): + # surveySubQuestionsArr.append('t') + if len(ques['options']) == 0: + try: + if len(ansFn['payload']['labels']) > 0: + finalObj = {} + finalObj = creatingObj( + ansFn, ques['externalId'], ansFn['value'], + instNumber, ansFn['payload']['labels'][0] + ) + producer.send( + (config.get("KAFKA", "druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + successLogger.debug("Send Obj to Kafka") + except KeyError: + pass + else: + labelIndex = 0 + for quesOpt in ques['options']: + try: + if type(ansFn['value']) == str or type(ansFn['value']) == int: + if quesOpt['value'] == ansFn['value']: + finalObj = {} + finalObj = creatingObj( + ansFn, ques['externalId'], + ansFn['value'], instNumber, + ansFn['payload']['labels'][0] + ) + producer.send( + (config.get("KAFKA", "druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + successLogger.debug("Send Obj to Kafka") + elif type(ansFn['value']) == list: + for ansArr in ansFn['value']: + if quesOpt['value'] == ansArr: + finalObj = {} + finalObj = creatingObj( + ansFn, ques['externalId'], ansArr, + instNumber, quesOpt['label'] + ) + producer.send( + (config.get("KAFKA", "druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + successLogger.debug("Send Obj to Kafka") + except KeyError: + pass + # to check the value is null ie is not answered + try: + if type(ansFn['value']) == str and ansFn['value'] == '': + finalObj = {} + finalObj = creatingObj( + ansFn, ques['externalId'], ansFn['value'], + instNumber, None + ) + producer.send( + (config.get("KAFKA", "druid_topic")), + json.dumps(finalObj).encode('utf-8') + ) + producer.flush() + successLogger.debug("Send Obj to Kafka") + except KeyError: + pass + + if ans['responseType'] == 'text' or ans['responseType'] == 'radio' or \ + ans['responseType'] == 'multiselect' or ans['responseType'] == 'slider' or \ + ans['responseType'] == 'number' or ans['responseType'] == 'date': + inst_cnt = '' + fetching_question_details(ans, inst_cnt) + elif ans['responseType'] == 'matrix' and len(ans['value']) > 0: + inst_cnt = 0 + for instances in ans['value']: + inst_cnt = inst_cnt + 1 + for instance in instances.values(): + fetching_question_details(instance, inst_cnt) + + cursorMongo.close() +except Exception as e: + errorLogger.error(e, exc_info=True) + +try: + 
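    # The agent below consumes the raw submissions topic; each Kafka message
    # carries the MongoDB _id of a survey submission, which is handed to
    # obj_creation to be flattened and re-published to the Druid topic.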
@app.agent(rawTopicName) + async def surveyFaust(consumer): + async for msg in consumer: + msg_val = msg.decode('utf-8') + msg_data = json.loads(msg_val) + successLogger.debug("========== START OF SURVEY SUBMISSION ========") + obj_creation(msg_data['_id']) + successLogger.debug("********* END OF SURVEY SUBMISSION ***********") +except Exception as e: + errorLogger.error(e, exc_info=True) + +if __name__ == '__main__': + app.main() From 4dcb38622ae9ee49a7d200654d6da4935948ae34 Mon Sep 17 00:00:00 2001 From: Arunachalam Date: Tue, 9 Mar 2021 20:57:39 +0530 Subject: [PATCH 03/13] Update readme and add requirments.txt file --- README.md | 22 +++++++++++++++++++++- requirements.txt | 1 + 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 requirements.txt diff --git a/README.md b/README.md index 3018356..6c21360 100644 --- a/README.md +++ b/README.md @@ -1 +1,21 @@ -# ml-analytics-service \ No newline at end of file +# ml-analytics-service + +#### Prerequisites +1. Python3.6.x or above +1. Python virtual environment to work with + +#### Setup + +Each folder consists of its own config files and programs. Navigate into respective folder and create a file with the name `config.ini` and copy the contents from `config.sample` present in the respective folders. + +Replace the `<>` with appropriate values in the `config.ini` and then save. + +Activate the virtual environment created earlier by running the following command `. /bin/activate` + +Run the following command to install all the dependencies `pip install -r requirements.txt` + +#### Execution + +For programs that start with `py_*` are a general ones. So use `python ` + +For programs that start with `pyspark_*` use `spark-submit ` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c1e7995 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +Add the pip modules in here \ No newline at end of file From 294a285c6aa6e3c55c24ea231c655fa735c2ecec Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Sun, 21 Mar 2021 16:38:08 +0530 Subject: [PATCH 04/13] Update pyspark_observation_status_batch.py --- observations/pyspark_observation_status_batch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/observations/pyspark_observation_status_batch.py b/observations/pyspark_observation_status_batch.py index eb2f806..85b6a5b 100644 --- a/observations/pyspark_observation_status_batch.py +++ b/observations/pyspark_observation_status_batch.py @@ -59,9 +59,9 @@ def get_keyclock_accesstoken(): url_getkeyclock = config.get("URL","url_getkeyclock") headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'} - body_getkeyclock = {"grant_type":config.get("API_HEADESRS","grant_type"), - "client_id":config.get("API_HEADESRS","client_id"), - "refresh_token":config.get("API_HEADESRS","refresh_token")} + body_getkeyclock = {"grant_type":config.get("API_HEADERS","grant_type"), + "client_id":config.get("API_HEADERS","client_id"), + "refresh_token":config.get("API_HEADERS","refresh_token")} responsegetkeyclock = requests.post(url_getkeyclock, data=body_getkeyclock,headers=headers_getkeyclock) if responsegetkeyclock.status_code == 200: @@ -81,7 +81,7 @@ def searchUser(accessToken,userId): + "/" + str(userId) + queryStringReadUser headersReadUser ={ 'Content-Type': config.get("API_HEADERS","content_type"), - 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'Authorization': "Bearer "+ 
config.get("API_HEADERS","authorization"), 'X-authenticated-user-token': accessToken } From 91cec8bc85c3b058ae13258ca711f6d5a9961114 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Sun, 21 Mar 2021 16:47:56 +0530 Subject: [PATCH 05/13] Update py_observation_streaming.py --- observations/py_observation_streaming.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/observations/py_observation_streaming.py b/observations/py_observation_streaming.py index e61ebc7..24c0d97 100755 --- a/observations/py_observation_streaming.py +++ b/observations/py_observation_streaming.py @@ -92,9 +92,9 @@ def removeduplicate(it): def get_keyclock_accesstoken(): url_getkeyclock = config.get("URL","url_getkeyclock") headers_getkeyclock = {'Content-Type': 'application/x-www-form-urlencoded'} - body_getkeyclock = {"grant_type":config.get("API_HEADESRS","grant_type"), - "client_id":config.get("API_HEADESRS","client_id"), - "refresh_token":config.get("API_HEADESRS","refresh_token")} + body_getkeyclock = {"grant_type":config.get("API_HEADERS","grant_type"), + "client_id":config.get("API_HEADERS","client_id"), + "refresh_token":config.get("API_HEADERS","refresh_token")} responsegetkeyclock = requests.post(url_getkeyclock, data=body_getkeyclock,headers=headers_getkeyclock) if responsegetkeyclock.status_code == 200: @@ -112,9 +112,9 @@ def getRelatedEntity(entityId,accessToken): urlEntityRelated = config.get("URL","base_url") + "/" + config.get("URL","url_entity_related") + str(entityId) headersEntityRelated ={ 'Content-Type': config.get("API_HEADERS","content_type"), - 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'Authorization': "Bearer "+ config.get("API_HEADERS","authorization"), 'X-authenticated-user-token': accessToken, - 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + 'X-Channel-id' : config.get("API_HEADERS","channel-id") } responseEntityRelated = requests.get(urlEntityRelated, headers=headersEntityRelated) if responseEntityRelated.status_code == 200 : @@ -132,9 +132,9 @@ def syncUser(userId,accessToken): urlSyncUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbrid_api_url_syncuser") headersSyncUser ={ 'Content-Type': config.get("API_HEADERS","content_type"), - 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'Authorization': "Bearer "+ config.get("API_HEADERS","authorization"), 'X-authenticated-user-token': accessToken, - 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + 'X-Channel-id' : config.get("API_HEADERS","channel-id") } body_sync_user = {"params": {},"request": {"objectType": "user","objectIds": [userId]}} responseSyncUser = requests.post(urlSyncUser, headers=headersSyncUser,data=json.dumps(body_sync_user)) @@ -155,9 +155,9 @@ def readUser(userId,accessToken,userSyncCnt): + "/" + str(userId) + queryStringReadUser headersReadUser ={ 'Content-Type': config.get("API_HEADERS","content_type"), - 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'Authorization': "Bearer "+ config.get("API_HEADERS","authorization"), 'X-authenticated-user-token': accessToken, - 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + 'X-Channel-id' : config.get("API_HEADERS","channel-id") } responseReadUser = requests.get(urlReadUser, headers=headersReadUser) if responseReadUser.status_code == 200 : @@ -181,10 +181,10 @@ def readUser(userId,accessToken,userSyncCnt): def getUserRoles(userId,accessToken): 
urlUserRoles = config.get("URL","base_url") + "/" + config.get("URL","url_user_profile_api") + str(userId) headersUserRoles ={ - 'Content-Type': config.get("API_HEADESRS","content_type"), - 'Authorization': "Bearer "+ config.get("API_HEADESRS","authorization"), + 'Content-Type': config.get("API_HEADERS","content_type"), + 'Authorization': "Bearer "+ config.get("API_HEADERS","authorization"), 'X-authenticated-user-token': accessToken, - 'X-Channel-id' : config.get("API_HEADESRS","channel-id") + 'X-Channel-id' : config.get("API_HEADERS","channel-id") } responseUserRoles = requests.get(urlUserRoles, headers=headersUserRoles) if responseUserRoles.status_code == 200 : From 0beefefdf8ea6e49395aa762876344c23a4bb4a7 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Mon, 22 Mar 2021 15:23:03 +0530 Subject: [PATCH 06/13] Update pyspark_observation_status_batch.py --- observations/pyspark_observation_status_batch.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/observations/pyspark_observation_status_batch.py b/observations/pyspark_observation_status_batch.py index 85b6a5b..1723cc5 100644 --- a/observations/pyspark_observation_status_batch.py +++ b/observations/pyspark_observation_status_batch.py @@ -129,13 +129,13 @@ def convert_to_row(d: dict) -> Row: clientProd = MongoClient(config.get('MONGO','mongo_url')) dbProd = clientProd[config.get('MONGO','database_name')] -obsSubmissionsCollec = dbProd[config.get('MONGO','observation_submissions_collec')] +obsSubmissionsCollec = dbProd[config.get('MONGO','observation_sub_collec')] -solutionCollec = dbProd[config.get('MONGO','solution_collec')] +solutionCollec = dbProd[config.get('MONGO','solutions_collec')] userRolesCollec = dbProd[config.get("MONGO","user_roles_collection")] -programCollec = dbProd[config.get("MONGO","program_collec")] +programCollec = dbProd[config.get("MONGO","programs_collec")] #observation submission dataframe obs_sub_cursorMongo = obsSubmissionsCollec.aggregate([{"$project": {"_id": {"$toString": "$_id"}, @@ -165,7 +165,7 @@ def convert_to_row(d: dict) -> Row: StructField('programExternalId', StringType(), True), StructField('_id', StringType(), True), StructField('updatedAt', TimestampType(), True), - StructField('entityInformtion',StructType([ + StructField('entityInformation',StructType([ StructField('name', StringType(), True) ])), StructField('appInformation',StructType([ @@ -546,7 +546,7 @@ def entityElasticSearchJson(entityJsonData): datasources = ["sl-observation-status"] -sl_status_spec = config.get("DRUID","sl_observation_status_spec") +sl_status_spec = config.get("DRUID","observation_status_spec") ingestion_specs = [sl_status_spec] From 7953f768083b1abad0e6e0187f4429627beeafa6 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Mon, 22 Mar 2021 15:25:32 +0530 Subject: [PATCH 07/13] Update config.sample --- observations/config.sample | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/observations/config.sample b/observations/config.sample index c2415ef..87a8609 100644 --- a/observations/config.sample +++ b/observations/config.sample @@ -37,10 +37,9 @@ url_getkeyclock = https:///auth/realms/sunbird/protocol/op url_entity_related = assessment/api/v1/entities/relatedEntities/ url_entity_relatedapi = http://:/assessment/api/v1/entities/relatedEntities/ - sunbird_api_base_url_ip = http://:/api -sunbrid_api_url_readuser = user/v1/read -sunbrid_api_url_syncuser = 
data/v1/index/sync +sunbird_api_url_readuser = user/v1/read +sunbird_api_url_syncuser = data/v1/index/sync url_user_profile_api = assessment/api/v1/userExtension/getProfile/ @@ -173,4 +172,4 @@ sas_token = container_name = -blob_path = \ No newline at end of file +blob_path = From e46820ff9fb1fd3c8cffc906d1cb548974ac74d2 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Mon, 22 Mar 2021 17:07:53 +0530 Subject: [PATCH 08/13] Update py_survey_streaming.py --- survey/py_survey_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/survey/py_survey_streaming.py b/survey/py_survey_streaming.py index 9670a24..78fbcec 100755 --- a/survey/py_survey_streaming.py +++ b/survey/py_survey_streaming.py @@ -112,7 +112,7 @@ def read_user(user_id, access_token): 'Content-Type': config.get("COMMON", "content_type"), 'Authorization': "Bearer " + config.get("COMMON", "authorization"), 'X-authenticated-user-token': access_token, - 'X-Channel-id': config.get("COMMON", "channel-id") + 'X-Channel-id': config.get("COMMON", "parent_channel") } responseReadUser = requests.get(urlReadUser, headers=headersReadUser) if responseReadUser.status_code == 200: From 33ec1de48f6ade975e81c24d13c5013ca49fc90a Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Mon, 22 Mar 2021 17:26:19 +0530 Subject: [PATCH 09/13] Update py_survey_evidence_streaming.py --- survey/py_survey_evidence_streaming.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/survey/py_survey_evidence_streaming.py b/survey/py_survey_evidence_streaming.py index 868d107..7b5c7d4 100755 --- a/survey/py_survey_evidence_streaming.py +++ b/survey/py_survey_evidence_streaming.py @@ -51,6 +51,18 @@ errorLogger.addHandler(errorHandler) errorLogger.addHandler(errorBackuphandler) +# db production +clientqa = MongoClient(config.get('MONGO', 'url')) +dbqa = clientqa[config.get('MONGO', 'db')] + +surveySubmissionsCollec = dbqa[config.get('MONGO', 'surveySubmissionsCollec')] +solutionsDevCollec = dbqa[config.get('MONGO', 'solutionsCollec')] +surveysCollec = dbqa[config.get('MONGO', 'surveysCollec')] +entityTypeDevCollec = dbqa[config.get('MONGO', 'entityTypeCollec')] +questionsDevCollec = dbqa[config.get('MONGO', 'questionsCollec')] +criteriaDevCollec = dbqa[config.get('MONGO', 'criteriaCollec')] +entitiesDevCollec = dbqa[config.get('MONGO', 'entitiesCollec')] + try: app = faust.App( 'sl_py_survey_evidence_prod', @@ -61,18 +73,6 @@ kafka_url = (config.get("KAFKA", "url")) producer = KafkaProducer(bootstrap_servers=[kafka_url]) - - # db production - clientqa = MongoClient(config.get('MONGO', 'url')) - dbqa = clientqa[config.get('MONGO', 'db')] - - surveySubmissionsCollec = dbqa[config.get('MONGO', 'surveySubmissionsCollec')] - solutionsDevCollec = dbqa[config.get('MONGO', 'solutionsCollec')] - surveysCollec = dbqa[config.get('MONGO', 'surveysCollec')] - entityTypeDevCollec = dbqa[config.get('MONGO', 'entityTypeCollec')] - questionsDevCollec = dbqa[config.get('MONGO', 'questionsCollec')] - criteriaDevCollec = dbqa[config.get('MONGO', 'criteriaCollec')] - entitiesDevCollec = dbqa[config.get('MONGO', 'entitiesCollec')] except Exception as e: errorLogger.error(e, exc_info=True) From 197b5cc41130f62c307d482c7d5bc1b4091b5726 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Tue, 23 Mar 2021 11:15:24 +0530 Subject: [PATCH 10/13] Update py_observation_streaming.py 
--- observations/py_observation_streaming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/observations/py_observation_streaming.py b/observations/py_observation_streaming.py index 24c0d97..1abb054 100755 --- a/observations/py_observation_streaming.py +++ b/observations/py_observation_streaming.py @@ -129,7 +129,7 @@ def getRelatedEntity(entityId,accessToken): try: def syncUser(userId,accessToken): - urlSyncUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbrid_api_url_syncuser") + urlSyncUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbird_api_url_syncuser") headersSyncUser ={ 'Content-Type': config.get("API_HEADERS","content_type"), 'Authorization': "Bearer "+ config.get("API_HEADERS","authorization"), @@ -151,7 +151,7 @@ def syncUser(userId,accessToken): try: def readUser(userId,accessToken,userSyncCnt): queryStringReadUser = "?fields=completeness%2CmissingFields%2ClastLoginTime%2Ctopics%2Corganisations%2Croles%2Clocations%2Cdeclarations" - urlReadUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbrid_api_url_readuser") \ + urlReadUser = config.get("URL","sunbird_api_base_url_ip") + "/" + config.get("URL","sunbird_api_url_readuser") \ + "/" + str(userId) + queryStringReadUser headersReadUser ={ 'Content-Type': config.get("API_HEADERS","content_type"), From 7cd90db0381da234a65b3e875bc42ec2e6ba1401 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Fri, 26 Mar 2021 09:30:03 +0530 Subject: [PATCH 11/13] Update config.sample --- survey/config.sample | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/survey/config.sample b/survey/config.sample index 2967e61..a2aaab3 100644 --- a/survey/config.sample +++ b/survey/config.sample @@ -17,7 +17,7 @@ base_url_ip = http:///api [ENDPOINT] # -------------------- -read_user = user/v1/read +read_user = user/v3/read # -------------------- [STORAGE] From f7f23fd004ef4945164783eec24d6c443900052a Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Fri, 26 Mar 2021 09:32:37 +0530 Subject: [PATCH 12/13] Update py_survey_streaming.py --- survey/py_survey_streaming.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/survey/py_survey_streaming.py b/survey/py_survey_streaming.py index 78fbcec..e5e28ba 100755 --- a/survey/py_survey_streaming.py +++ b/survey/py_survey_streaming.py @@ -105,9 +105,10 @@ def get_keycloak_access_token(): try: def read_user(user_id, access_token): + queryStringReadUser = "?fields=completeness%2CmissingFields%2ClastLoginTime%2Ctopics%2Corganisations%2Croles%2Clocations%2Cdeclarations" urlReadUser = config.get("SUNBIRD", "base_url_ip") + "/" + \ config.get("ENDPOINT", "read_user") + "/" + \ - str(user_id) + str(user_id) + queryStringReadUser headersReadUser = { 'Content-Type': config.get("COMMON", "content_type"), 'Authorization': "Bearer " + config.get("COMMON", "authorization"), From 1cba90e9ef1cf6a15bd06949872ff014e1c04556 Mon Sep 17 00:00:00 2001 From: Arunachalam E <58763977+arunachalam-nd@users.noreply.github.com> Date: Fri, 26 Mar 2021 09:33:59 +0530 Subject: [PATCH 13/13] Update py_survey_streaming.py --- survey/py_survey_streaming.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/survey/py_survey_streaming.py b/survey/py_survey_streaming.py index e5e28ba..e3c02dc 100755 --- a/survey/py_survey_streaming.py +++ 
b/survey/py_survey_streaming.py @@ -53,6 +53,19 @@ errorLogger.addHandler(errorHandler) errorLogger.addHandler(errorBackuphandler) +# db production +clientqa = MongoClient(config.get('MONGO', 'url')) +dbqa = clientqa[config.get('MONGO', 'db')] + +surveySubmissionsQACollec = dbqa[config.get('MONGO', 'surveySubmissionsCollec')] +solutionsQACollec = dbqa[config.get('MONGO', 'solutionsCollec')] +surveyQACollec = dbqa[config.get('MONGO', 'surveysCollec')] +entityTypeQACollec = dbqa[config.get('MONGO', 'entityTypeCollec')] +questionsQACollec = dbqa[config.get('MONGO', 'questionsCollec')] +criteriaQACollec = dbqa[config.get('MONGO', 'criteriaCollec')] +entitiesQACollec = dbqa[config.get('MONGO', 'entitiesCollec')] +programsQACollec = dbqa[config.get('MONGO', 'programsCollec')] + try: kafka_url = (config.get("KAFKA", "url")) @@ -65,18 +78,6 @@ ) rawTopicName = app.topic(config.get("KAFKA", "raw_data_topic")) producer = KafkaProducer(bootstrap_servers=[config.get("KAFKA", "url")]) - # db production - clientqa = MongoClient(config.get('MONGO', 'url')) - dbqa = clientqa[config.get('MONGO', 'db')] - - surveySubmissionsQACollec = dbqa[config.get('MONGO', 'surveySubmissionsCollec')] - solutionsQACollec = dbqa[config.get('MONGO', 'solutionsCollec')] - surveyQACollec = dbqa[config.get('MONGO', 'surveysCollec')] - entityTypeQACollec = dbqa[config.get('MONGO', 'entityTypeCollec')] - questionsQACollec = dbqa[config.get('MONGO', 'questionsCollec')] - criteriaQACollec = dbqa[config.get('MONGO', 'criteriaCollec')] - entitiesQACollec = dbqa[config.get('MONGO', 'entitiesCollec')] - programsQACollec = dbqa[config.get('MONGO', 'programsCollec')] except Exception as e: errorLogger.error(e, exc_info=True)
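
A short illustrative sketch of the module layout that patches 09 and 13 converge on: the MongoClient and collection handles are created at module level, outside the try/except, so any exception raised while reading the [MONGO] config or constructing the client is no longer caught and swallowed by errorLogger, while the Faust app and Kafka producer wiring stay inside the guarded block. The snippet below is a simplified sketch under those assumptions only — the config keys mirror the ones visible in these diffs, the app id is a placeholder, and the two collections shown stand in for the full list — it is not the actual production script.

import logging
from configparser import ConfigParser

import faust
from kafka import KafkaProducer
from pymongo import MongoClient

config = ConfigParser()
config.read("config.ini")

errorLogger = logging.getLogger("py_survey_streaming.error")

# Mongo handles live at module level, outside the try/except below,
# so failures here are no longer routed through errorLogger.
clientqa = MongoClient(config.get("MONGO", "url"))
dbqa = clientqa[config.get("MONGO", "db")]
surveySubmissionsQACollec = dbqa[config.get("MONGO", "surveySubmissionsCollec")]
solutionsQACollec = dbqa[config.get("MONGO", "solutionsCollec")]

try:
    # Kafka/Faust wiring stays inside try/except and is reported via
    # errorLogger, matching the structure the patches leave in place.
    kafka_url = config.get("KAFKA", "url")
    app = faust.App(
        "sl_py_survey_prod",  # placeholder app id, not the real one
        broker="kafka://" + kafka_url,
    )
    rawTopicName = app.topic(config.get("KAFKA", "raw_data_topic"))
    producer = KafkaProducer(bootstrap_servers=[kafka_url])
except Exception as e:
    errorLogger.error(e, exc_info=True)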