Skip to content

Commit

Permalink
fixed linting
Browse files Browse the repository at this point in the history
  • Loading branch information
ftisiot committed May 30, 2022
1 parent 21a486d commit c88facb
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 88 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ code/kafkaEndpointConf.py
code/opensearch_sink.json
certs
code/__pycache__
.vscode
212 changes: 124 additions & 88 deletions code/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# app.py
"""Flask app for pizzerias"""

import json
import time
Expand All @@ -8,158 +8,194 @@

TOPIC_NAME = "pizza-orders"
TOPIC_DELIVERY_NAME = "pizza-delivery"
KAFKA_SERVER = HOST +":" +str(PORT)
KAFKA_SERVER = HOST + ":" + str(PORT)
CERTS_FOLDER = "certs"
CONSUMER_GROUP = "pizza-consumers"
CONSUMER_GROUP_DELIVERY = "pizza-consumers"
CONSUMER_GROUP_CALC = "pizza-calculators"

### Definition of a Kafka Producer with SSL authentication and JSON serialization for value and key
# Definition of a Kafka Producer with SSL authentication
# and JSON serialization for value and key

producer = KafkaProducer(
bootstrap_servers=KAFKA_SERVER,
security_protocol="SSL",
ssl_cafile = CERTS_FOLDER+ "/ca.pem",
ssl_certfile = CERTS_FOLDER+ "/service.cert",
ssl_keyfile = CERTS_FOLDER+ "/service.key",
ssl_cafile=CERTS_FOLDER + "/ca.pem",
ssl_certfile=CERTS_FOLDER + "/service.cert",
ssl_keyfile=CERTS_FOLDER + "/service.key",
value_serializer=lambda v: json.dumps(v).encode('ascii'),
key_serializer=lambda v: json.dumps(v).encode('ascii')
)

app = Flask(__name__, template_folder='templates')

### The / page shows a list of available pizzas and other fields to fill to create an order
### which is then written into the pizza-orders Kafka topic
# The / page shows a list of available pizzas
# and other fields to fill to create an order
# which is then written into the pizza-orders Kafka topic


@app.route('/', methods=['GET', 'POST'])
def index():

"""Returning home page for Pizza"""
if request.method == 'POST':
producer.send(
TOPIC_NAME,
key={"caller":request.form.get("caller")},
value=
{
"caller":request.form.get("caller"),
"pizza":request.form.get("pizza"),
"address":request.form.get("address"),
"timestamp": int(time.time())
}
)

producer.flush()
key={"caller": request.form.get("caller")},
value={
"caller": request.form.get("caller"),
"pizza": request.form.get("pizza"),
"address": request.form.get("address"),
"timestamp": int(time.time())
}
)

producer.flush()
elif request.method == 'GET':
return render_template('index.html', form=request.form)

return render_template("index.html")

### is a function allowing the streaming of results back to the source page
# is a function allowing the streaming of results back to the source page


def stream_template(template_name, **context):
"""Enabling streaming back results to app"""
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
return rv
template = app.jinja_env.get_template(template_name)
streaming = template.stream(context)
return streaming

# pizza-makers reads from the pizza-orders topic
# and allows pizzaioli to click on the pizzas they already made.
# Once the click is pushed, the /pizza-ready endpoint is called
# passing the order ID

### pizza-makers reads from the pizza-orders topic
### and allows pizzaioli to click on the pizzas they already made.
### Once the click is pushed, the /pizza-ready endpoint is called passing the order ID

@app.route('/pizza-makers')
def consume():
"""Returning pizza orders"""
consumer = KafkaConsumer(
client_id = "client1",
group_id = CONSUMER_GROUP,
bootstrap_servers = KAFKA_SERVER,
security_protocol = "SSL",
ssl_cafile = CERTS_FOLDER+"/ca.pem",
ssl_certfile = CERTS_FOLDER+"/service.cert",
ssl_keyfile = CERTS_FOLDER+"/service.key",
value_deserializer = lambda v: json.loads(v.decode('ascii')),
key_deserializer = lambda v: json.loads(v.decode('ascii')),
max_poll_records = 10,
client_id="client1",
group_id=CONSUMER_GROUP,
bootstrap_servers=KAFKA_SERVER,
security_protocol="SSL",
ssl_cafile=CERTS_FOLDER+"/ca.pem",
ssl_certfile=CERTS_FOLDER+"/service.cert",
ssl_keyfile=CERTS_FOLDER+"/service.key",
value_deserializer=lambda v: json.loads(v.decode('ascii')),
key_deserializer=lambda v: json.loads(v.decode('ascii')),
max_poll_records=10,
auto_offset_reset='earliest',
session_timeout_ms=6000,
heartbeat_interval_ms=3000
)
consumer.subscribe(topics=[TOPIC_NAME])

def consume_msg():
for message in consumer:
print(message.value)
yield [message.value["timestamp"], message.value["caller"],message.value["pizza"], message.value["address"], 1]

yield [
message.value["timestamp"],
message.value["caller"],
message.value["pizza"],
message.value["address"],
1]

return Response(stream_template('pizza-makers.html', data=consume_msg()))

### /pizza-ready/<id> receives the info about a pizza-order being in ready state from pizzaioli
### and adds it into the pizza-delivery topic
# /pizza-ready/<id> receives the info about a
# pizza-order being in ready state from pizzaioli
# and adds it into the pizza-delivery topic


@app.route('/pizza-ready/<id>', methods=['POST'])
def pizzaReady(id=None):
print(id)
@app.route('/pizza-ready/<my_id>', methods=['POST'])
def pizza_ready(my_id=None):
"""Endpoint to pass ready pizzas"""
print(my_id)
producer.send(
TOPIC_DELIVERY_NAME,
key={"timestamp":id},
key={"timestamp": my_id},
value=request.json
)
)
producer.flush()
return "OK"
return "OK"

# /pizza-calc simulates the billing person,
# reading from the pizza-orders topic
# but with a different consumer group,
# therefore receiving a copy of each message
# without conflicting with pizza-makers

### /pizza-calc simulates the billing person, reading from the pizza-orders topic
### but with a different consumer group, therefore receiving a copy of each message
### without conflicting with pizza-makers

@app.route('/pizza-calc')
def consumeCalc():
consumerCalc = KafkaConsumer(
client_id = "client3",
group_id = CONSUMER_GROUP_CALC,
bootstrap_servers = KAFKA_SERVER,
security_protocol = "SSL",
ssl_cafile = CERTS_FOLDER+"/ca.pem",
ssl_certfile = CERTS_FOLDER+"/service.cert",
ssl_keyfile = CERTS_FOLDER+"/service.key",
value_deserializer = lambda v: json.loads(v.decode('ascii')),
key_deserializer = lambda v: json.loads(v.decode('ascii')),
max_poll_records = 10,
def consume_calc():
"""Returning home page for Pizza calculators"""
consumer_calc = KafkaConsumer(
client_id="client3",
group_id=CONSUMER_GROUP_CALC,
bootstrap_servers=KAFKA_SERVER,
security_protocol="SSL",
ssl_cafile=CERTS_FOLDER+"/ca.pem",
ssl_certfile=CERTS_FOLDER+"/service.cert",
ssl_keyfile=CERTS_FOLDER+"/service.key",
value_deserializer=lambda v: json.loads(v.decode('ascii')),
key_deserializer=lambda v: json.loads(v.decode('ascii')),
max_poll_records=10,
auto_offset_reset='earliest',
session_timeout_ms=6000,
heartbeat_interval_ms=3000
)
consumerCalc.subscribe(topics=[TOPIC_NAME])
consumer_calc.subscribe(topics=[TOPIC_NAME])

def consume_msg():
for message in consumerCalc:
for message in consumer_calc:
print(message.value)
yield [message.value["timestamp"], message.value["caller"],message.value["pizza"], message.value["address"], 1]

return Response(stream_template('pizza-calculators.html', data=consume_msg()))
yield [
message.value["timestamp"],
message.value["caller"],
message.value["pizza"],
message.value["address"],
1]

return Response(
stream_template('pizza-calculators.html', data=consume_msg()))

# /pizza-delivery reads from the topic pizza-delivery
# and display the results of pizza ready for delivery

### /pizza-delivery reads from the topic pizza-delivery
### and display the results of pizza ready for delivery

@app.route('/pizza-delivery')
def consumeDelivery():
consumerDelivery = KafkaConsumer(
client_id = "clientDelivery",
group_id = CONSUMER_GROUP_DELIVERY,
bootstrap_servers = KAFKA_SERVER,
security_protocol = "SSL",
ssl_cafile = CERTS_FOLDER+"/ca.pem",
ssl_certfile = CERTS_FOLDER+"/service.cert",
ssl_keyfile = CERTS_FOLDER+"/service.key",
value_deserializer = lambda v: json.loads(v.decode('ascii')),
key_deserializer = lambda v: json.loads(v.decode('ascii')),
max_poll_records = 10,
def consume_delivery():
"""Returning home page for Pizza delivery"""
consumer_delivery = KafkaConsumer(
client_id="clientDelivery",
group_id=CONSUMER_GROUP_DELIVERY,
bootstrap_servers=KAFKA_SERVER,
security_protocol="SSL",
ssl_cafile=CERTS_FOLDER+"/ca.pem",
ssl_certfile=CERTS_FOLDER+"/service.cert",
ssl_keyfile=CERTS_FOLDER+"/service.key",
value_deserializer=lambda v: json.loads(v.decode('ascii')),
key_deserializer=lambda v: json.loads(v.decode('ascii')),
max_poll_records=10,
auto_offset_reset='earliest',
session_timeout_ms=6000,
heartbeat_interval_ms=3000
)
consumerDelivery.subscribe(topics=[TOPIC_DELIVERY_NAME])
consumer_delivery.subscribe(topics=[TOPIC_DELIVERY_NAME])

def consume_msg_delivery():
for message in consumerDelivery:
for message in consumer_delivery:
print(message.value)
yield [message.key["timestamp"], message.value["caller"], message.value["address"]]

return Response(stream_template('pizza-delivery.html', data=consume_msg_delivery()))
yield [
message.key["timestamp"],
message.value["caller"],
message.value["address"]
]

return Response(
stream_template('pizza-delivery.html', data=consume_msg_delivery()))


if __name__ == "__main__":
app.run(debug=True, port = 5000)
app.run(debug=True, port=5000)

0 comments on commit c88facb

Please sign in to comment.