Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Pull Request for Issue #7 #8 ( Bryan + Minja) #9

Merged
merged 28 commits into from
Sep 29, 2018

Conversation

bdieu178
Copy link
Contributor

@bdieu178 bdieu178 commented Jul 22, 2018

#7 #8
@Bad-Science: @huangmj7 and I are testing the WebSocket for appropriate functionality atm.

@bdieu178
Copy link
Contributor Author

@huangmj7 Please run this updated PR again on your computer.

@bdieu178
Copy link
Contributor Author

bdieu178 commented Jul 25, 2018

@Bad-Science @huangmj7 I'm confident I resolved issues #7 & #8. However, I need to format the JSON message. The EventStream class is temporarily printing data to the command line. Right now, I'm making sure to write the data to the websocket.

notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-07-25T01:01:40.510116 #5]  INFO -- : Inline processing of topic course_change with 1 messages took 0.2 ms
notifications_1  | I, [2018-07-25T01:01:40.510164 #5]  INFO -- : 1 messages on course_change topic delegated to CourseConsumer
notifications_1  | {"parser"=>Karafka::Parsers::Json, "partition"=>0, "offset"=>374, "key"=>nil, "create_time"=>2018-07-25 01:01:38 +0000, "receive_time"=>2018-07-25 01:01:40 +00
00, "topic"=>"course_change", "parsed"=>true, "department_id"=>5, "name"=>"Writing And Response", "number"=>4380, "min_credits"=>4, "max_credits"=>4, "created_at"=>"2018-07-25T01
:01:38.737Z", "updated_at"=>"2018-07-25T01:01:38.737Z", "description"=>"", "uuid"=>"cea03174-7f3a-4d8b-b77d-dd51d379bb4a", "tags"=>[]}
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-07-25T01:01:40.510404 #5]  INFO -- : Inline processing of topic course_change with 1 messages took 0.11 ms
notifications_1  | I, [2018-07-25T01:01:40.510433 #5]  INFO -- : 1 messages on course_change topic delegated to CourseConsumer
notifications_1  | {"parser"=>Karafka::Parsers::Json, "partition"=>0, "offset"=>375, "key"=>nil, "create_time"=>2018-07-25 01:01:39 +0000, "receive_time"=>2018-07-25 01:01:40 +00
00, "topic"=>"course_change", "parsed"=>true, "department_id"=>5, "name"=>"Information Design", "number"=>4470, "min_credits"=>4, "max_credits"=>4, "created_at"=>"2018-07-25T01:0
1:39.047Z", "updated_at"=>"2018-07-25T01:01:39.047Z", "description"=>"", "uuid"=>"5337d942-e13e-42a1-b252-32b66186d8d7", "tags"=>[]}
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-07-25T01:01:40.510664 #5]  INFO -- : Inline processing of topic course_change with 1 messages took 0.12 ms
notifications_1  | I, [2018-07-25T01:01:40.510694 #5]  INFO -- : 1 messages on course_change topic delegated to CourseConsumer
notifications_1  | {"parser"=>Karafka::Parsers::Json, "partition"=>0, "offset"=>376, "key"=>nil, "create_time"=>2018-07-25 01:01:39 +0000, "receive_time"=>2018-07-25 01:01:40 +00
00, "topic"=>"course_change", "parsed"=>true, "department_id"=>5, "name"=>"Reality Tv Post Factual Media", "number"=>4530, "min_credits"=>4, "max_credits"=>4, "created_at"=>"2018
-07-25T01:01:39.207Z", "updated_at"=>"2018-07-25T01:01:39.207Z", "description"=>"", "uuid"=>"cefd3a09-a469-4d82-b810-e5efc8ed99e4", "tags"=>[]}
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-07-25T01:01:45.040927 #5]  INFO -- : Inline processing of topic section_change with 1 messages took 0.28 ms
notifications_1  | I, [2018-07-25T01:01:45.040976 #5]  INFO -- : 1 messages on section_change topic delegated to SectionConsumer
notifications_1  | {"parser"=>Karafka::Parsers::Json, "partition"=>0, "offset"=>759, "key"=>nil, "create_time"=>2018-07-25 01:01:44 +0000, "receive_time"=>2018-07-25 01:01:45 +0000, "topic"=>"section_change", "parsed"=>true, "name"=>"03", "crn"=>51429, "course_id"=>367, "seats"=>35, "seats_taken"=>25, "created_at"=>"2018-07-25T01:01:44.287Z", "updated_at"=>"2018-07-25T01:01:44.287Z", "num_periods"=>4, "instructors"=>[], "conflicts"=>[5, 6, 7, 8, 9, 10, 11, 12, 22, 23, 30, 48, 75, 86, 89, 91, 93, 102, 111, 112, 114, 126, 139, 140, 144, 147, 153, 163, 181, 183, 195, 199, 200, 203, 216, 217, 243, 257, 258, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 302, 303, 304, 305, 311, 313, 319, 320, 322, 327, 331, 381, 382, 388, 391, 402, 413, 414, 471, 472, 473, 474, 475, 476, 477, 478, 482, 485, 486, 495, 496, 497, 508, 514, 521, 562, 563, 565, 568, 571, 573, 616, 621, 623, 626, 638, 674, 675, 679, 693, 694, 725, 734, 744, 745, 746, 747, 748, 749, 750, 751, 752, 753], "uuid"=>"eac0b92a-7c31-4dbb-8c71-55f0378da4f4"}
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-07-25T01:01:45.041441 #5]  INFO -- : Inline processing of topic section_change with 1 messages took 0.23 ms
notifications_1  | I, [2018-07-25T01:01:45.041485 #5]  INFO -- : 1 messages on section_change topic delegated to SectionConsumer
notifications_1  | {"parser"=>Karafka::Parsers::Json, "partition"=>0, "offset"=>760, "key"=>nil, "create_time"=>2018-07-25 01:01:44 +0000, "receive_time"=>2018-07-25 01:01:45 +0000, "topic"=>"section_change", "parsed"=>true, "name"=>"04", "crn"=>51430, "course_id"=>367, "seats"=>35, "seats_taken"=>33, "created_at"=>"2018-07-25T01:01:44.531Z", "updated_at"=>"2018-07-25T01:01:44.531Z", "num_periods"=>4, "instructors"=>[], "conflicts"=>[5, 6, 7, 8, 9, 10, 11, 12, 22, 23, 30, 48, 75, 86, 89, 91, 93, 102, 111, 112, 114, 126, 139, 140, 144, 147, 153, 163, 181, 183, 195, 199, 200, 203, 216, 217, 243, 257, 258, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 302, 303, 304, 305, 311, 313, 319, 320, 322, 327, 331, 381, 382, 388, 391, 402, 413, 414, 471, 472, 473, 474, 475, 476, 477, 478, 482, 485, 486, 495, 496, 497, 508, 514, 521, 562, 563, 565, 568, 571, 573, 616, 621, 623, 626, 638, 674, 675, 679, 693, 694, 725, 734, 744, 745, 746, 747, 748, 749, 750, 751, 752, 753], "uuid"=>"519529a9-fe56-40c9-8469-541604cf6045"}
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-07-25T01:01:45.041921 #5]  INFO -- : Inline processing of topic section_change with 1 messages took 0.27 ms
notifications_1  | I, [2018-07-25T01:01:45.041972 #5]  INFO -- : 1 messages on section_change topic delegated to SectionConsumer
notifications_1  | {"parser"=>Karafka::Parsers::Json, "partition"=>0, "offset"=>761, "key"=>nil, "create_time"=>2018-07-25 01:01:45 +0000, "receive_time"=>2018-07-25 01:01:45 +0000, "topic"=>"section_change", "parsed"=>true, "name"=>"05", "crn"=>51431, "course_id"=>367, "seats"=>35, "seats_taken"=>26, "created_at"=>"2018-07-25T01:01:44.767Z", "updated_at"=>"2018-07-25T01:01:44.767Z", "num_periods"=>4, "instructors"=>[], "conflicts"=>[22, 23, 30, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 54, 60, 61, 64, 65, 66, 67, 70, 71, 72,
75, 76, 77, 78, 79, 86, 89, 91, 93, 101, 102, 110, 111, 112, 114, 121, 122, 126, 139, 140, 144, 147, 153, 163, 181, 183, 195, 199, 200, 203, 212, 216, 217, 239, 240, 243, 257, 258, 288, 289, 294, 295, 296, 297, 302, 303, 306, 307, 311, 313, 319, 320, 322, 327, 331, 340, 381, 382, 388, 391, 402, 413, 418, 471, 482, 485, 486, 495, 496, 497, 508, 514, 521,
522, 557, 562, 563, 564, 568, 571, 573, 574, 575, 618, 621, 623, 626, 634, 674, 675, 679, 694, 725, 734, 744, 745, 746, 747, 748, 749, 750, 751, 752, 753], "uuid"=>"bdd54f9b-77a3-42df-93ce-fdc5f4f7b968"}

@bdieu178
Copy link
Contributor Author

bdieu178 commented Jul 25, 2018

Hi @Bad-Science, @huangmj7 pointed out that we can have one method within our Plezi class read-in data from the Kafka Consumer class and use the build in Plezi Method write to send that message to the web-socket connection.


class CourseConsumer < ApplicationConsumer
def consume
if ( Object.const_defined?('EventStream') == false )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this check needed?

puts params #print out the single message received
end
def consume
if ( Object.const_defined?('EventStream') == false )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, not sure why we need this check

if ( Object.const_defined?('EventStream') == false )
puts "EventStream class not initialized"
else
EventStream.on_message(params)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you cannot call on_message, it is called by plezi when a websocket client sends a message to the server, where data is the contents of the message

if ( Object.const_defined?('EventStream') == false )
puts "EventStream class not initialized"
else
EventStream.on_message(params)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot call on_message, it is called internally by plezi only when the websocket server receives a message from the client.

@bdieu178
Copy link
Contributor Author

@Bad-Science I'm unsure whether we need the auto_dispatch setting to true.

@huangmj7
Copy link
Contributor

According to the Plezi.io : "To set up a route to accept websocket connections, our controller must either implement an on_message(data) callback OR be an auto_dispatch enabled controller (more about this powerful feature soon)." ([http://www.plezi.io/docs/websockets#what-are-websockets-skip-if-you-know]).

@bdieu178
Copy link
Contributor Author

bdieu178 commented Aug 7, 2018

Status so far: I'm confident that I have Kafka pushing messages to the iodine server and now I just need to fix some bugs so that the client side can view the messages from kafka pub/sub stream. @Bad-Science @huangmj7

notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-08-07T00:54:51.384357 #6]  INFO -- : Inline processing of topic course_change with 1 messages took 3.84 ms
notifications_1  | I, [2018-08-07T00:54:51.384692 #6]  INFO -- : 1 messages on course_change topic delegated to CourseConsumer
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-08-07T00:54:51.387295 #6]  INFO -- : Inline processing of topic course_change with 1 messages took 2.14 ms
notifications_1  | I, [2018-08-07T00:54:51.387379 #6]  INFO -- : 1 messages on course_change topic delegated to CourseConsumer
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-08-07T00:54:53.451256 #6]  INFO -- : Inline processing of topic course_change with 1 messages took 4.11 ms
notifications_1  | I, [2018-08-07T00:54:53.460466 #6]  INFO -- : 1 messages on course_change topic delegated to CourseConsumer
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-08-07T00:54:54.408911 #6]  INFO -- : Inline processing of topic section_change with 1 messages took 1.63 ms
notifications_1  | I, [2018-08-07T00:54:54.409102 #6]  INFO -- : 1 messages on section_change topic delegated to SectionConsumer
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-08-07T00:54:54.410566 #6]  INFO -- : Inline processing of topic section_change with 1 messages took 0.8 ms
notifications_1  | I, [2018-08-07T00:54:54.410693 #6]  INFO -- : 1 messages on section_change topic delegated to SectionConsumer
notifications_1  | Consumer message sent to websocket
notifications_1  | I, [2018-08-07T00:54:54.412780 #6]  INFO -- : Inline processing of topic section_change with 1 messages took 1.27 ms
notifications_1  | I, [2018-08-07T00:54:54.413418 #6]  INFO -- : 1 messages on section_change topic delegated to SectionConsumer
* Listening on port 4860

Server is running 2 workers X 2 threads, press ^C to stop
Running Plezi version: 0.15.1

Starting up Iodine HTTP Server on port 4860:
 * Ruby v.2.5.1
 * Iodine v.0.4.19 
 * 1048576 max concurrent connections / open files
 * Serving static files from ./public

* 6 is running.
* 14 is running.
172.18.0.1 - - [Tue, 6 Aug 2018 03:14:14 GMT] "GET /notifications HTTP/1.1" 101 -- 300ms
Iodine caught an unprotected exception - LocalJumpError: no block given
/usr/src/app/app/controllers/eventstream.rb:12:in `subscribe'
/usr/src/app/app/controllers/eventstream.rb:12:in `on_open'
Iodine caught an unprotected exception - TypeError: wrong argument type String (expected Hash)
/usr/src/app/app/controllers/eventstream.rb:26:in `publish'
/usr/src/app/app/controllers/eventstream.rb:26:in `on_close'

@bdieu178
Copy link
Contributor Author

bdieu178 commented Aug 7, 2018

@Bad-Science Next TODO:

  • Parse & format Karafka::Params obj according to JSON:API formatting

@Bad-Science Bad-Science merged commit 0808acc into YACS-RCOS:master Sep 29, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants