From 0fa149c256e934218c188808d48980376e0e73d9 Mon Sep 17 00:00:00 2001 From: linsen Date: Tue, 15 Oct 2019 03:29:22 -0400 Subject: [PATCH] Restructure. --- .gitignore | 40 +++- LICENSE | 201 ------------------ README.md | 104 --------- dub.json | 55 +---- examples/chatroom/chatroom.d | 31 --- examples/chatroom/chatroom.proto | 28 --- examples/chatroom/client/client.d | 129 ----------- examples/chatroom/command.d | 12 -- examples/chatroom/server/server.d | 101 --------- examples/client/dub.json | 12 ++ examples/client/source/client.d | 51 +++++ .../client/source/command/SayHelloCommand.d | 26 +++ examples/client/source/common/Commands.d | 6 + .../source/common}/helloworld.d | 2 +- examples/{helloworld => }/helloworld.proto | 0 examples/helloworld/bootstrap.d | 75 ------- examples/helloworld/command.d | 11 - examples/server/dub.json | 12 ++ .../server/source/command/HttpGetRequest.d | 26 +++ .../server/source/command/SayHelloCommand.d | 30 +++ examples/server/source/common/Commands.d | 6 + examples/server/source/common/helloworld.d | 19 ++ examples/server/source/server.d | 22 ++ source/hunt/imf/Command.d | 10 + source/hunt/imf/ConnectBase.d | 45 ++++ source/hunt/imf/ConnectionEventBaseHandler.d | 40 ++++ source/hunt/imf/ConnectionManager.d | 102 +++++++++ source/hunt/imf/EvBuffer.d | 76 +++++++ source/hunt/imf/GatewayApplication.d | 76 +++++++ source/hunt/imf/MessageBuffer.d | 49 +++++ source/hunt/imf/ParserBase.d | 168 +++++++++++++++ source/hunt/imf/Router.d | 30 +++ source/hunt/imf/clients/GatewayClient.d | 25 +++ source/hunt/imf/clients/GatewayHttpClient.d | 80 +++++++ source/hunt/imf/clients/GatewayTcpClient.d | 68 ++++++ .../hunt/imf/clients/GatewayWebSocketClient.d | 127 +++++++++++ source/hunt/imf/core/application.d | 83 -------- source/hunt/imf/core/dispatcher.d | 47 ---- source/hunt/imf/core/routing.d | 103 --------- source/hunt/imf/core/task.d | 98 --------- source/hunt/imf/io/client.d | 80 ------- source/hunt/imf/io/clientext.d | 90 -------- source/hunt/imf/io/context.d | 93 -------- source/hunt/imf/io/server.d | 72 ------- source/hunt/imf/package.d | 16 -- source/hunt/imf/protocol/Protocol.d | 20 ++ source/hunt/imf/protocol/http/HttpCodec.d | 31 +++ .../hunt/imf/protocol/http/HttpConnection.d | 53 +++++ .../http/HttpConnectionEventHandler.d | 91 ++++++++ source/hunt/imf/protocol/http/HttpDecoder.d | 35 +++ source/hunt/imf/protocol/http/HttpEncoder.d | 99 +++++++++ source/hunt/imf/protocol/http/HttpProtocol.d | 62 ++++++ source/hunt/imf/protocol/packet.d | 49 ----- source/hunt/imf/protocol/parser.d | 80 ------- .../imf/protocol/protobuf/ProtobufCodec.d | 29 +++ .../imf/protocol/protobuf/ProtobufDecoder.d | 38 ++++ .../imf/protocol/protobuf/ProtobufEncoder.d | 38 ++++ .../imf/protocol/protobuf/ProtobufProtocol.d | 77 +++++++ .../protocol/protobuf/ProtobufTcpConnection.d | 59 +++++ .../protobuf/TcpConnectionEventHandler.d | 88 ++++++++ source/hunt/imf/protocol/websocket/WsCodec.d | 35 +++ .../imf/protocol/websocket/WsConnection.d | 58 +++++ .../websocket/WsConnectionEventHandler.d | 100 +++++++++ .../hunt/imf/protocol/websocket/WsProtocol.d | 106 +++++++++ source/hunt/imf/utils/element.d | 58 ----- source/hunt/imf/utils/room.d | 105 --------- source/hunt/imf/utils/singleton.d | 23 -- 67 files changed, 2170 insertions(+), 1741 deletions(-) delete mode 100644 LICENSE delete mode 100644 README.md delete mode 100644 examples/chatroom/chatroom.d delete mode 100644 examples/chatroom/chatroom.proto delete mode 100644 examples/chatroom/client/client.d delete mode 100644 examples/chatroom/command.d delete mode 100644 examples/chatroom/server/server.d create mode 100644 examples/client/dub.json create mode 100644 examples/client/source/client.d create mode 100644 examples/client/source/command/SayHelloCommand.d create mode 100644 examples/client/source/common/Commands.d rename examples/{helloworld => client/source/common}/helloworld.d (91%) rename examples/{helloworld => }/helloworld.proto (100%) delete mode 100644 examples/helloworld/bootstrap.d delete mode 100644 examples/helloworld/command.d create mode 100644 examples/server/dub.json create mode 100644 examples/server/source/command/HttpGetRequest.d create mode 100644 examples/server/source/command/SayHelloCommand.d create mode 100644 examples/server/source/common/Commands.d create mode 100644 examples/server/source/common/helloworld.d create mode 100644 examples/server/source/server.d create mode 100644 source/hunt/imf/Command.d create mode 100644 source/hunt/imf/ConnectBase.d create mode 100644 source/hunt/imf/ConnectionEventBaseHandler.d create mode 100644 source/hunt/imf/ConnectionManager.d create mode 100644 source/hunt/imf/EvBuffer.d create mode 100644 source/hunt/imf/GatewayApplication.d create mode 100644 source/hunt/imf/MessageBuffer.d create mode 100644 source/hunt/imf/ParserBase.d create mode 100644 source/hunt/imf/Router.d create mode 100644 source/hunt/imf/clients/GatewayClient.d create mode 100644 source/hunt/imf/clients/GatewayHttpClient.d create mode 100644 source/hunt/imf/clients/GatewayTcpClient.d create mode 100644 source/hunt/imf/clients/GatewayWebSocketClient.d delete mode 100644 source/hunt/imf/core/application.d delete mode 100644 source/hunt/imf/core/dispatcher.d delete mode 100644 source/hunt/imf/core/routing.d delete mode 100644 source/hunt/imf/core/task.d delete mode 100644 source/hunt/imf/io/client.d delete mode 100644 source/hunt/imf/io/clientext.d delete mode 100644 source/hunt/imf/io/context.d delete mode 100644 source/hunt/imf/io/server.d delete mode 100644 source/hunt/imf/package.d create mode 100644 source/hunt/imf/protocol/Protocol.d create mode 100644 source/hunt/imf/protocol/http/HttpCodec.d create mode 100644 source/hunt/imf/protocol/http/HttpConnection.d create mode 100644 source/hunt/imf/protocol/http/HttpConnectionEventHandler.d create mode 100644 source/hunt/imf/protocol/http/HttpDecoder.d create mode 100644 source/hunt/imf/protocol/http/HttpEncoder.d create mode 100644 source/hunt/imf/protocol/http/HttpProtocol.d delete mode 100644 source/hunt/imf/protocol/packet.d delete mode 100644 source/hunt/imf/protocol/parser.d create mode 100644 source/hunt/imf/protocol/protobuf/ProtobufCodec.d create mode 100644 source/hunt/imf/protocol/protobuf/ProtobufDecoder.d create mode 100644 source/hunt/imf/protocol/protobuf/ProtobufEncoder.d create mode 100644 source/hunt/imf/protocol/protobuf/ProtobufProtocol.d create mode 100644 source/hunt/imf/protocol/protobuf/ProtobufTcpConnection.d create mode 100644 source/hunt/imf/protocol/protobuf/TcpConnectionEventHandler.d create mode 100644 source/hunt/imf/protocol/websocket/WsCodec.d create mode 100644 source/hunt/imf/protocol/websocket/WsConnection.d create mode 100644 source/hunt/imf/protocol/websocket/WsConnectionEventHandler.d create mode 100644 source/hunt/imf/protocol/websocket/WsProtocol.d delete mode 100644 source/hunt/imf/utils/element.d delete mode 100644 source/hunt/imf/utils/room.d delete mode 100644 source/hunt/imf/utils/singleton.d diff --git a/.gitignore b/.gitignore index c06e1a1..3f3c0c9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,34 @@ -/.dub -/chatclient -/chatserver -/dub.selections.json -/helloworld -/libhunt-imf.a +# Visual Studio Code +.vscode/ +.idea/ +hunt-fw.iml + +# Compiled Object files +*.o +*.obj + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Compiled Static libraries +*.a +*.lib + +# Executables +*.exe + +# DUB +.dub +dub.*.json +docs.json +__dummy.html +docs/ + +# Code coverage +*.lst + +# Examples +examples/client/client +examples/server/server diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 261eeb9..0000000 --- a/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/README.md b/README.md deleted file mode 100644 index 908390c..0000000 --- a/README.md +++ /dev/null @@ -1,104 +0,0 @@ -# hunt-imf -## Introduction -hunt-imf is a tcp-based instant messaging framework and can be used for Push service, Chat service, Game service, etc. -* serializes messages using [protobuf](https://github.com/protobuf) -* uses binary protocol (type & length) -* implements message distribution, message routing, and network IO -* ensure that all messages in the same context are serialized - -## Depends -* [protobuf](https://github.com/dcarp/protobuf-d) -* [hunt-net](https://github.com/huntlabs/hunt-net) - -## Tools -* `protoc` *apt-get install protoc-compile* -* [protoc-gen-d](https://github.com/dcarp/protobuf-d/tree/master/protoc_gen_d) *plugin generates D code for proto3 .proto files only* - -## examples -* `helloworld` *An executable program containing the server and the client* -* `chatroom` *two executable programs include chatclient and chatserver* - -## build -* `hunt-imf` *dub build * -* `helloworld` *dub build -c=helloworld* -* `chatclient` *dub build -c=chatclient* -* `chatserver` *dub build -c=chatserver* - -## Quick start -### Proto -* define a `.proto` file named `helloworld.proto`: -```proto -syntax = "proto3"; -package helloworld; - -// The request message containing the user's name. -message HelloRequest { - string name = 1; -} - -// The response message containing the greetings -message HelloReply { - string message = 1; -} - ``` -* using `protoc` and `protoc-gen-d` compiles `helloworld.proto` to `hellowrold.d`: -```shell -./protoc --plugin="protoc-gen-d" --d_out=~/example/ -I~/example/hellowrold ~/hellworld.proto -``` - -### COMMAND -define a `dlang` source file named command.d: - ```D - enum COMMAND - { - HELO_REQ = 1001, - HELO_RES = 1002, - } - ``` - - -### Controller -* define a server side control class: -```D -class ServerController -{ - mixin MakeRouter; - - @route(COMMAND.HELO_REQ) - void hello(HelloRequest request) - { - auto reply = new HelloReply(); - reply.message = "hello " ~ request.name; - sendMessage(context , Command.HELO_RES , reply); - - } -} -``` -* define a client size control class: -```D -class ClientController -{ - mixin MakeRouter; - - @route(Command.HELO_RES) - void hello(HelloReply reply) - { - writeln(reply.message); - } -} -``` - - -### Bootstrap -```D - auto app = new Application(); - - auto server = app.createServer("127.0.0.1" , 3003); - auto client = app.createClient("127.0.0.1" , 3003); - client.setOpenHandler((Context context){ - auto hello = new HelloRequest(); - hello.name = "world"; - context.sendMessage(Command.Q_HELO , hello); - }); - app.run(); -``` diff --git a/dub.json b/dub.json index 04d9cc2..db31cf2 100644 --- a/dub.json +++ b/dub.json @@ -1,47 +1,12 @@ { - "name": "hunt-imf", - "description": "A tcp-server framework.", - "copyright": "Copyright (c) 2017-2019, HuntLabs", - "homepage": "http://www.huntlabs.net", - "license": "Apache-2.0", - "dependencies": { - "protobuf": "~>0.5.0", - "hunt-net": "~>0.2.0" - }, - "configurations":[ - { - "name" : "hunt-imf", - "targetType": "library" - }, - { - "targetName": "helloworld", - "name": "helloworld", - "targetType": "executable", - "sourceFiles": [ - "./examples/helloworld/bootstrap.d", - "./examples/helloworld/command.d", - "./examples/helloworld/helloworld.d" - ] - }, - { - "targetName": "chatclient", - "name": "chatclient", - "targetType": "executable", - "sourceFiles": [ - "./examples/chatroom/chatroom.d", - "./examples/chatroom/command.d", - "./examples/chatroom/client/client.d" - ] - }, - { - "targetName": "chatserver", - "name": "chatserver", - "targetType": "executable", - "sourceFiles": [ - "./examples/chatroom/chatroom.d", - "./examples/chatroom/command.d", - "./examples/chatroom/server/server.d" - ] - } - ] + "name": "hunt-imf", + "description": "A tcp-server framework.", + "copyright": "Copyright (c) 2017-2019, HuntLabs", + "homepage": "http://www.huntlabs.net", + "license": "Apache-2.0", + "dependencies": { + "protobuf": "~>0.5.5", + "hunt-http": "~>0.4.0-beta.5", + "hunt-net": "~>0.4.2" + } } diff --git a/examples/chatroom/chatroom.d b/examples/chatroom/chatroom.d deleted file mode 100644 index cbfe4ea..0000000 --- a/examples/chatroom/chatroom.d +++ /dev/null @@ -1,31 +0,0 @@ -// Generated by the protocol buffer compiler. DO NOT EDIT! -// source: chatroom.proto - -module chatroom.chatroom; - -import google.protobuf; - -enum protocVersion = 3006001; - -class Login -{ - @Proto(1) string name = protoDefaultValue!string; -} - -class Msg -{ - @Proto(1) string name = protoDefaultValue!string; - @Proto(2) string message = protoDefaultValue!string; -} - -class LoginReply -{ - @Proto(1) LoginState status = protoDefaultValue!LoginState; - @Proto(2) string name = protoDefaultValue!string; - - enum LoginState - { - OK = 0, - FAIL = 1, - } -} diff --git a/examples/chatroom/chatroom.proto b/examples/chatroom/chatroom.proto deleted file mode 100644 index cdaf42d..0000000 --- a/examples/chatroom/chatroom.proto +++ /dev/null @@ -1,28 +0,0 @@ - - -syntax = "proto3"; - - - -package chatroom; - - -message Login{ - string name = 1; -} - -message Msg{ - string name = 1; - string message = 2; -} - -message LoginReply { - enum LoginState - { - OK = 0; - FAIL = 1; //用户名重复 - } - LoginState status = 1; - string name = 2; -} - diff --git a/examples/chatroom/client/client.d b/examples/chatroom/client/client.d deleted file mode 100644 index c9b5169..0000000 --- a/examples/chatroom/client/client.d +++ /dev/null @@ -1,129 +0,0 @@ -module chatroom.client.client; - -import hunt.imf; - -import chatroom.command; -import chatroom.chatroom; - -import std.stdio; -import std.string; - -import hunt.concurrency.thread; - -class ClientInfo { - __gshared string name; - __gshared Context context; -} - -class ClientService { - mixin MakeRouter; - - @route(Command.MESSAGE) - void recv(Msg message) { - writeln(message.name, " : ", message.message); - } - - @route(Command.R_LOGIN) - void mylogin(LoginReply login) { - ClientInfo.name = login.name; - if (login.status == LoginReply.LoginState.OK) - writeln("you have logined!"); - else - writeln("username repeated! change username relogin please!"); - } - - @route(Command.LOGIN) - void otherlogin(Login login) { - writeln(login.name, " login"); - } - - @route(Command.LOGOUT) - void otherlogout(Login login) { - writeln(login.name, " logout"); - } - -} - -//////////////////////////////////------///////////////////////////// - -void showHelp() { - writeln("clientchat:"); - writeln("1 login username"); - writeln("2 send message"); - writeln("3 help"); - writeln("4 quit"); -} - -void showPromt() { - write(">>>>"); -} - -void showError() { - writeln("input error"); -} - -string login(string name) { - auto login = new Login(); - login.name = name; - sendMessage(ClientInfo.context, Command.Q_LOGIN, login); - return name; -} - -void send(string m) { - auto msg = new Msg(); - msg.name = ClientInfo.name; - msg.message = m; - sendMessage(ClientInfo.context, Command.MESSAGE, msg); -} - -int main() { - string c; - showHelp(); - showPromt(); - - auto app = new Application(); - auto client = app.createClient("127.0.0.1", 3003); - client.setOpenHandler((Context context) { - writeln("connected to server!"); - ClientInfo.context = context; - }); - app.run(50); - - while ((c = strip(readln())) != "quit") { - string[] params = c.split(" "); - if (params.length == 0) { - showPromt(); - continue; - } - - switch (params[0]) { - case "help": - showHelp(); - break; - case "login": - if (params.length < 2) { - showError(); - continue; - } - login(params[1]); - break; - case "send": - if (params.length < 2 || ClientInfo.name == string.init) { - showError(); - continue; - } - send(params[1]); - break; - default: - showHelp(); - break; - - } - showPromt(); - } - - app.stop(); - // thread_joinAll(); - - return 0; -} diff --git a/examples/chatroom/command.d b/examples/chatroom/command.d deleted file mode 100644 index b00b607..0000000 --- a/examples/chatroom/command.d +++ /dev/null @@ -1,12 +0,0 @@ -module chatroom.command; - - -enum Command -{ - Q_LOGIN = 1001, - R_LOGIN = 1002, - MESSAGE = 1003, - LOGIN = 1004, - LOGOUT = 1005 -} - diff --git a/examples/chatroom/server/server.d b/examples/chatroom/server/server.d deleted file mode 100644 index e87f42f..0000000 --- a/examples/chatroom/server/server.d +++ /dev/null @@ -1,101 +0,0 @@ -module chatroom.server.server; - -import hunt.imf; - -import chatroom.command; -import chatroom.chatroom; - -import std.stdio; -import std.string; - - - - -class UserInfo : Element -{ - this(Context context , string username) - { - super(context); - this.username = username; - } - string username; -} - -alias ChatRoom = Singleton!(Room!(string,UserInfo)); - -class ChatService -{ - mixin MakeRouter; - - @route(Command.MESSAGE) - void recv(Msg message) - { - ChatRoom.instance.broadCast(Command.MESSAGE , message); - } - - @route(Command.Q_LOGIN) - void mylogin(Login login) - { - ChatRoom.instance.findEx(login.name , - (UserInfo info){ - if(info is null) - { - auto user = new UserInfo(context , login.name); - ChatRoom.instance.add(login.name, user); - - /// notify onlines this one login , except this one. - ChatRoom.instance.broadCast(Command.LOGIN , login , login.name); - - /// rely to this one login suc. - auto reply = new LoginReply(); - reply.status = LoginReply.LoginState.OK; - reply.name = login.name; - context.sendMessage(Command.R_LOGIN , reply); - - /// set context bind - context.setAttachment(user); - - writeln(login.name , " login"); - } - else - { - auto reply = new LoginReply(); - reply.name = login.name; - reply.status = LoginReply.LoginState.FAIL; - sendMessage(context , Command.R_LOGIN , reply); - } - }); - - - } - -} - - -int main() -{ - auto app = new Application(); - - auto server = app.createServer("0.0.0.0" , 3003); - server.setCloseHandler((Context context){ - auto user = cast(UserInfo)context.getAttachment(); - if( user !is null) - { - /// clear attach - context.setAttachment(null); - - auto login = new Login(); - login.name = user.username; - - /// remove from chatroom - ChatRoom.instance.remove(login.name); - - /// notify to all users - ChatRoom.instance.broadCast(Command.LOGOUT,login); - - writeln(user.username ~ " logout"); - } - }); - app.run(); - return 0; -} \ No newline at end of file diff --git a/examples/client/dub.json b/examples/client/dub.json new file mode 100644 index 0000000..ab5d5f3 --- /dev/null +++ b/examples/client/dub.json @@ -0,0 +1,12 @@ +{ + "authors": [ + "linsen" + ], + "description": "A minimal D application.", + "license": "proprietary", + "name": "client", + "targetType": "executable", + "dependencies":{ + "hunt-imf" : {"path": "../../"} + } +} \ No newline at end of file diff --git a/examples/client/source/client.d b/examples/client/source/client.d new file mode 100644 index 0000000..6db6ce5 --- /dev/null +++ b/examples/client/source/client.d @@ -0,0 +1,51 @@ +import std.stdio; + +import hunt.imf.protocol.Protocol; +import hunt.imf.protocol.protobuf.ProtobufProtocol; +import common.Commands; +import common.helloworld; +import google.protobuf; +import std.array; +import hunt.imf.clients.GatewayClient; +import hunt.imf.clients.GatewayWebSocketClient; +import hunt.imf.protocol.websocket.WsProtocol; +import hunt.imf.clients.GatewayHttpClient; +import hunt.imf.protocol.http.HttpProtocol; +import hunt.imf.clients.GatewayTcpClient; +import hunt.imf.ParserBase; +import core.thread; +import hunt.logging; +void main() +{ + auto req = new HelloRequest (); + req.name = "1234567890abcdefjhijklmnopqrstuvwxyz"; + + WsProtocol ws = new WsProtocol("127.0.0.1",18181); + GatewayWebSocketClient wsclient = new GatewayWebSocketClient(ws); + wsclient.connect(); + + wsclient.sendMsg(Commands.SayHelloReq,req); + +//------------------------------------------------------------------- + + ProtobufProtocol tcp = new ProtobufProtocol("127.0.0.1",12001); + GatewayTcpClient tcpclient = new GatewayTcpClient(tcp); + tcpclient.connect(); + + tcpclient.sendMsg(Commands.SayHelloReq,req); + +//---------------------------------------------------------------- + HttpProtocol http = new HttpProtocol("127.0.0.1",18080); + GatewayHttpClient httpclient = new GatewayHttpClient(http); + httpclient.connect(); + + HttpContent content; + content.path = "/test"; + content.parameters["a"] = "123"; + content.parameters["b"] = "456"; + content.method = "POST"; + content.body = "1234567890abcdefjhijklmnopqrstuvwxyz"; + httpclient.sendMsg(content); + + getchar(); +} diff --git a/examples/client/source/command/SayHelloCommand.d b/examples/client/source/command/SayHelloCommand.d new file mode 100644 index 0000000..0647875 --- /dev/null +++ b/examples/client/source/command/SayHelloCommand.d @@ -0,0 +1,26 @@ +module command.SayHelloCommand; + +import common.helloworld; +import hunt.imf.Command; +import hunt.net; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import google.protobuf; +import hunt.imf.Router; +import common.Commands; +import std.array; +import std.stdio; + +class SayHelloCommand : Command +{ + void execute (ConnectBase connection,MessageBuffer msg) + { + auto resp = new HelloReply(); + msg.message.fromProtobuf!HelloReply(resp); + writefln("%s",resp.message); + } +} + +shared static this () { + Router.instance().registerProcessHandler!SayHelloCommand(Commands.SayHelloResp); +} \ No newline at end of file diff --git a/examples/client/source/common/Commands.d b/examples/client/source/common/Commands.d new file mode 100644 index 0000000..5b3b825 --- /dev/null +++ b/examples/client/source/common/Commands.d @@ -0,0 +1,6 @@ +module common.Commands; + +enum Commands { + SayHelloReq = 49, + SayHelloResp = 51 +} \ No newline at end of file diff --git a/examples/helloworld/helloworld.d b/examples/client/source/common/helloworld.d similarity index 91% rename from examples/helloworld/helloworld.d rename to examples/client/source/common/helloworld.d index b67e97c..ff7c025 100644 --- a/examples/helloworld/helloworld.d +++ b/examples/client/source/common/helloworld.d @@ -1,7 +1,7 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! // source: examples/protos/helloworld.proto -module helloworld.helloworld; +module common.helloworld; import google.protobuf; diff --git a/examples/helloworld/helloworld.proto b/examples/helloworld.proto similarity index 100% rename from examples/helloworld/helloworld.proto rename to examples/helloworld.proto diff --git a/examples/helloworld/bootstrap.d b/examples/helloworld/bootstrap.d deleted file mode 100644 index 0380168..0000000 --- a/examples/helloworld/bootstrap.d +++ /dev/null @@ -1,75 +0,0 @@ - -import hunt.imf; - -import helloworld.command; -import helloworld.helloworld; - -import std.stdio; - -import hunt.concurrency.thread; - -class ServerService -{ - mixin MakeRouter; - - @route(Command.Q_HELO) - void hello(HelloRequest request) - { - writeln("server hello " , getTid()); - auto reply = new HelloReply(); - reply.message = "hello " ~ request.name; - sendMessage(context , Command.R_HELO , reply); - - } - - - @route(Command.Q_HEART) - void heart() - { - writeln("server heart" , getTid()); - sendMessage(context , Command.Q_HEART); - } -} - -@namespace("client") -class ClientService -{ - mixin MakeRouter; - - @route(Command.R_HELO) - void hello(HelloReply reply) - { - writeln("client hello " , getTid()); - writeln(reply.message); - } - - @route(Command.Q_HEART) - void heart() - { - writeln("client heart " , getTid()); - writeln("recv heart"); - } -} - - - - - -int main() -{ - auto app = new Application(); - - auto server = app.createServer("127.0.0.1" , 3003); - auto client = app.createClient("127.0.0.1" , 3003 , "client"); - - client.setOpenHandler((Context context){ - auto hello = new HelloRequest(); - hello.name = "world"; - context.sendMessage(Command.Q_HEART); - context.sendMessage(Command.Q_HELO , hello); - context.sendMessage(Command.Q_HEART); - }); - - app.run(); - return 0; -} \ No newline at end of file diff --git a/examples/helloworld/command.d b/examples/helloworld/command.d deleted file mode 100644 index 88d5e1c..0000000 --- a/examples/helloworld/command.d +++ /dev/null @@ -1,11 +0,0 @@ -module helloworld.command; - - -enum Command -{ - Q_HELO = 1001, // for HelloRequest - R_HELO = 1002, // for HelloReply - Q_HEART = 1000, // for Heart -} - - diff --git a/examples/server/dub.json b/examples/server/dub.json new file mode 100644 index 0000000..e718f54 --- /dev/null +++ b/examples/server/dub.json @@ -0,0 +1,12 @@ +{ + "authors": [ + "linsen" + ], + "description": "A minimal D application.", + "license": "proprietary", + "name": "server", + "targetType": "executable", + "dependencies":{ + "hunt-imf" : {"path": "../../"} + } +} \ No newline at end of file diff --git a/examples/server/source/command/HttpGetRequest.d b/examples/server/source/command/HttpGetRequest.d new file mode 100644 index 0000000..628e6d7 --- /dev/null +++ b/examples/server/source/command/HttpGetRequest.d @@ -0,0 +1,26 @@ +module action.HttpGetRequest; + +import common.Commands; +import hunt.imf.Router; +import hunt.imf.Command; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import hunt.imf.ParserBase; +import hunt.util.Serialize; +import hunt.logging; +class HttpGetRequest : Command { + void execute (ConnectBase connection,MessageBuffer msg) + { + HttpContent content = unserialize!HttpContent(cast(byte[])msg.message); + content.reset(); + content.status = 200; + content.body = "hello world " ~ content.body; + + MessageBuffer anser = new MessageBuffer(-1,cast(ubyte[])serialize!HttpContent(content)); + connection.sendMsg(anser); + } +} + +shared static this () { + Router.instance().registerProcessHandler!HttpGetRequest(cast(int)hashOf("/test")); +} diff --git a/examples/server/source/command/SayHelloCommand.d b/examples/server/source/command/SayHelloCommand.d new file mode 100644 index 0000000..d5225bc --- /dev/null +++ b/examples/server/source/command/SayHelloCommand.d @@ -0,0 +1,30 @@ +module command.SayHelloCommand; + +import hunt.imf.Command; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import common.helloworld; +import hunt.net; +import common.Commands; +import hunt.imf.Router; +import google.protobuf; +import hunt.logging; +import std.array; + +class SayHelloCommand : Command { + + void execute (ConnectBase connection,MessageBuffer msg) + { + auto req = new HelloRequest(); + msg.message.fromProtobuf!HelloRequest(req); + + auto resp = new HelloReply(); + resp.message = "hello " ~ req.name; + MessageBuffer answer = new MessageBuffer(Commands.SayHelloResp,resp.toProtobuf.array); + connection.sendMsg(answer); + } +} + +shared static this () { + Router.instance().registerProcessHandler!SayHelloCommand(Commands.SayHelloReq); +} \ No newline at end of file diff --git a/examples/server/source/common/Commands.d b/examples/server/source/common/Commands.d new file mode 100644 index 0000000..5b3b825 --- /dev/null +++ b/examples/server/source/common/Commands.d @@ -0,0 +1,6 @@ +module common.Commands; + +enum Commands { + SayHelloReq = 49, + SayHelloResp = 51 +} \ No newline at end of file diff --git a/examples/server/source/common/helloworld.d b/examples/server/source/common/helloworld.d new file mode 100644 index 0000000..ff7c025 --- /dev/null +++ b/examples/server/source/common/helloworld.d @@ -0,0 +1,19 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: examples/protos/helloworld.proto + +module common.helloworld; + +import google.protobuf; + +enum protocVersion = 3005001; + +class HelloRequest +{ + @Proto(1) string name = protoDefaultValue!string; +} + +class HelloReply +{ + @Proto(1) string message = protoDefaultValue!string; +} + diff --git a/examples/server/source/server.d b/examples/server/source/server.d new file mode 100644 index 0000000..7b9554d --- /dev/null +++ b/examples/server/source/server.d @@ -0,0 +1,22 @@ +import std.stdio; +import hunt.net; +import hunt.logging; +import hunt.imf.protocol.protobuf.TcpConnectionEventHandler; +import hunt.imf.protocol.Protocol; +import hunt.imf.protocol.protobuf.ProtobufProtocol; +import hunt.imf.protocol.http.HttpProtocol; +import hunt.imf.GatewayApplication; +import hunt.imf.protocol.websocket.WsProtocol; +import core.thread; + +void main() +{ + GatewayApplication app = GatewayApplication.instance(); + ProtobufProtocol tcp = new ProtobufProtocol("0.0.0.0",12001); + HttpProtocol http = new HttpProtocol("0.0.0.0",18080); + WsProtocol ws = new WsProtocol("0.0.0.0",18181); + app.addServer(http); + app.addServer(tcp); + app.addServer(ws); + app.run(); +} diff --git a/source/hunt/imf/Command.d b/source/hunt/imf/Command.d new file mode 100644 index 0000000..e96cf02 --- /dev/null +++ b/source/hunt/imf/Command.d @@ -0,0 +1,10 @@ +module hunt.imf.Command; + +import hunt.net; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import hunt.imf.Router; + +interface Command { + void execute (ConnectBase connection,MessageBuffer message); +} \ No newline at end of file diff --git a/source/hunt/imf/ConnectBase.d b/source/hunt/imf/ConnectBase.d new file mode 100644 index 0000000..796f68f --- /dev/null +++ b/source/hunt/imf/ConnectBase.d @@ -0,0 +1,45 @@ +module hunt.imf.ConnectBase; +import std.bitmanip; +import hunt.net; +import hunt.imf.MessageBuffer; +import hunt.imf.Router; +import hunt.imf.ParserBase; +import hunt.util.Serialize; +import google.protobuf; +import std.stdint; +import hunt.imf.Command; +import hunt.logging; +import hunt.http.codec.websocket.frame; +import hunt.http.codec.websocket.stream.WebSocketConnection; + + +enum SESSION +{ + PROTOCOL = "PROTOCOL", + USER = "USER" +} + +class ConnectBase { + +public: + static void dispatchMessage(ConnectBase connection , MessageBuffer message ) + { + Command handler = Router.instance().getProcessHandler(message.messageId); + if (handler !is null) + { + handler.execute(connection,message); + } else { + logError("Unknown msgType %d",message.messageId ); + } + } + + abstract void sendMsg(MessageBuffer message) {} + + abstract string getProtocol() { return null;} + + abstract Connection getConnection() {return null;} + + abstract void close() {} + + abstract bool isConnected() {return false;} +} \ No newline at end of file diff --git a/source/hunt/imf/ConnectionEventBaseHandler.d b/source/hunt/imf/ConnectionEventBaseHandler.d new file mode 100644 index 0000000..e2ddbfd --- /dev/null +++ b/source/hunt/imf/ConnectionEventBaseHandler.d @@ -0,0 +1,40 @@ +module hunt.imf.ConnectionEventBaseHandler; + +import hunt.net; +import hunt.imf.ConnectBase; + +class ConnectionEventBaseHandler : ConnectionEventHandler +{ + alias ConnCallBack = void delegate( ConnectBase connection); + alias MsgCallBack = void delegate(Connection connection ,Object message); + + override + void connectionOpened(Connection connection) {} + + override + void connectionClosed(Connection connection) {} + + override + void messageReceived(Connection connection, Object message) {} + + override + void exceptionCaught(Connection connection, Throwable t) {} + + override + void failedOpeningConnection(int connectionId, Throwable t) { } + + override + void failedAcceptingConnection(int connectionId, Throwable t) { } + + void setOnConnection(ConnCallBack callback) + { + } + + void setOnClosed(ConnCallBack callback) + { + } + + void setOnMessage(MsgCallBack callback) + { + } +} \ No newline at end of file diff --git a/source/hunt/imf/ConnectionManager.d b/source/hunt/imf/ConnectionManager.d new file mode 100644 index 0000000..03ef650 --- /dev/null +++ b/source/hunt/imf/ConnectionManager.d @@ -0,0 +1,102 @@ +module hunt.imf.ConnectionManager; + +import hunt.collection.HashMap; +import hunt.imf.ConnectBase; +import hunt.imf.protocol.protobuf.ProtobufTcpConnection; +import hunt.imf.protocol.http.HttpConnection; +import hunt.imf.protocol.websocket.WsConnection; +import hunt.imf.protocol.protobuf.TcpConnectionEventHandler; +import hunt.imf.protocol.http.HttpConnectionEventHandler; +import hunt.imf.protocol.websocket.WsConnectionEventHandler; +import hunt.http.codec.websocket.stream.WebSocketConnection; +import hunt.net; +import hunt.logging; +import std.conv : to; + +class ConnectionManager(T) { + + alias CloseCallBack = void delegate(ConnectBase connection); + + private { + HashMap!(T,ConnectBase) _mapConns; + string _protocolName; + CloseCallBack _onClosed = null; + } + + this () + { + _mapConns = new HashMap!(T,ConnectBase); + } + + + void onConnection ( ConnectBase connection) + { + synchronized(this) + { + trace("----------------put--%s",connection.getProtocol()); + _mapConns.put(connection.getConnection().getId().to!T,connection); + } + } + + void onClosed(ConnectBase connection) + { + if (_onClosed !is null) + { + _onClosed(connection); + } + synchronized(this){ + trace("----------------del--%s",connection.getProtocol()); + _mapConns.remove(connection.getConnection().getId().to!T); + } + } + + ConnectBase getConnection(T connId) + { + synchronized(this) + { + return _mapConns.get(connId); + } + } + + void putConnection(T connId ,ConnectBase conn) + { + synchronized(this) + { + _mapConns.put(connId,conn); + } + } + + void removeConnection(T connId) + { + synchronized(this) + { + _mapConns.remove(connId); + } + } + + bool isExist(T connId) + { + HashMap!(T,ConnectBase) temp = null; + synchronized(this) + { + temp = _mapConns; + } + return temp.containsKey(connId); + } + + void setProtocolName(string name) + { + _protocolName = name; + } + + string getProtocolName() + { + return _protocolName; + } + + void setCloseHandler (CloseCallBack callback) + { + _onClosed = callback; + } + +} \ No newline at end of file diff --git a/source/hunt/imf/EvBuffer.d b/source/hunt/imf/EvBuffer.d new file mode 100644 index 0000000..36a2f55 --- /dev/null +++ b/source/hunt/imf/EvBuffer.d @@ -0,0 +1,76 @@ +module hunt.imf.EvBuffer; + +import std.array; +import std.stdio; + +class EvBuffer(T) { + this( ulong sz = 0){ + _buffer = new T [sz]; + _buf_sz = 0; + } + +public: + + void mergeBuffer ( ref T [] buf) + { + if (buf != null) + { + this._buffer ~= buf; + _buf_sz += buf.length; + } + } + + bool copyOutFromHead (ref T [] buf , ref const ulong len) + { + if (_buf_sz >= len && buf != null) + { + buf[0 .. len] = _buffer [0 .. len]; + return true; + } else + { + return false; + } + } + + bool drainBufferFromHead (ref const ulong len) + { + if (_buf_sz < len) + { + return false; + } else { + _buffer = _buffer[len .. $]; + _buf_sz -= len; + return true; + } + } + + bool removeBufferFromHead (ref T [] buf , ref const ulong len) + { + if (_buf_sz < len) + { + return false; + } else { + buf[0 .. len] = _buffer [0 .. len]; + _buffer = _buffer[len .. $]; + _buf_sz -= len; + return true; + } + } + + void reset(){ + _buffer = new T [0]; + _buf_sz = 0; + } + + ulong getBufferLength () { return this._buf_sz ;} + + T [] getBuffer(){return _buffer;} + + void print () { + writeln("%s",this._buffer); + } + +private: + T [] _buffer; + ulong _buf_sz; +} diff --git a/source/hunt/imf/GatewayApplication.d b/source/hunt/imf/GatewayApplication.d new file mode 100644 index 0000000..a38e410 --- /dev/null +++ b/source/hunt/imf/GatewayApplication.d @@ -0,0 +1,76 @@ +module hunt.imf.GatewayApplication; + +import hunt.net; +import hunt.imf.protocol.Protocol; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.ConnectionManager; +import std.typecons; + +class GatewayApplication +{ + private { + NetServer[string] _servers; + Protocol[string] _protocols; + ConnectionManager!int[string] _mapConnManager; + __gshared GatewayApplication _app = null; + } + + private + { + this () { + } + } + + static GatewayApplication instance() + { + if (_app is null) + _app = new GatewayApplication(); + return _app; + } + + NetServer[string] getServers(){ + return _servers; + } + + + public void addServer(Protocol protocol) + { + protocol.registerHandler(); + _protocols[protocol.getName()] = protocol; + } + + + void registerConnectionManager(string protocolName) + { + if (protocolName in _mapConnManager) + { + return; + }else + { + ConnectionManager!int manager = new ConnectionManager!int(); + _mapConnManager[protocolName] = manager; + } + } + + ConnectionManager!int getConnectionManager(string protocolName) + { + return _mapConnManager.get(protocolName,null); + } + + + void run() + { + foreach(protocol;_protocols) + { + NetServer server = NetUtil.createNetServer!(ThreadMode.Single)(); + server.setCodec(protocol.getCodec()); + server.setHandler(protocol.getHandler()); + if (protocol.getOptions() !is null) + { + server.setOptions(protocol.getOptions()); + } + server.listen(protocol.getHost() ,protocol.getPort()); + _servers[protocol.getName()] = server; + } + } +} diff --git a/source/hunt/imf/MessageBuffer.d b/source/hunt/imf/MessageBuffer.d new file mode 100644 index 0000000..d9a16cf --- /dev/null +++ b/source/hunt/imf/MessageBuffer.d @@ -0,0 +1,49 @@ +module hunt.imf.MessageBuffer; + +import hunt.logging; +import std.bitmanip; +import hunt.util.Serialize; +import std.stdint; + +class MessageBuffer +{ + this(){ + authId = 0; + messageId = -1; + message = new ubyte[0]; + } + + this(long type,ubyte[] msgBody) + { + messageId = type; + message = msgBody; + } + + ubyte[] encode() + { + //ubyte[4] len = nativeToBigEndian(cast(int)message.length); + //ubyte[] data = new ubyte[0]; + //data ~= cast(ubyte)messageId; + //data ~= len; + //data ~= message; + //return data; + ubyte[8] u1 = nativeToBigEndian(authId); + ubyte[8] u2 = nativeToBigEndian(messageId); + ubyte[4] u3 = nativeToBigEndian(cast(int32_t)message.length); + + return u1 ~ u2 ~ u3 ~ message; + } + + static MessageBuffer decode(ubyte[] buff) + { + if (buff.length >= 20) + { + return new MessageBuffer(bigEndianToNative!long(buff[8 .. 16]),buff[20..$]); + } else + return null; + } + + long authId; + long messageId; + ubyte[] message; +} \ No newline at end of file diff --git a/source/hunt/imf/ParserBase.d b/source/hunt/imf/ParserBase.d new file mode 100644 index 0000000..92b0a14 --- /dev/null +++ b/source/hunt/imf/ParserBase.d @@ -0,0 +1,168 @@ +module hunt.imf.ParserBase; + +import hunt.imf.EvBuffer; +import hunt.net.Connection; +import hunt.imf.MessageBuffer; +import hunt.util.Serialize; +import std.bitmanip; +import std.stdint; +import std.string; +import std.stdio; +import std.conv; +import std.typecons; +import std.uri; + +struct HttpContent { + size_t status = 0; + string path; + string method; + string[string] headField; + string[string] parameters; + string body; + + void reset() + { + status= 0; + path = ""; + } +} + +enum Field { + CONTENTLENGTH = "Content-Length" +} + +class ParserBase { + + protected enum string CONTEXT = "REVBUFFER"; + + private const ulong DATAHEADLEN = 20; + + protected const string HTTPHEADEOF = "\r\n\r\n"; + protected const string LINEFEEDS = "\r\n"; + + enum MAX_HTTP_REQUEST_BUFF = 4096; + + public void parserTcpStream (EvBuffer!ubyte src , ubyte [] incr , Connection connection) + { + src.mergeBuffer(incr); + ulong uBufLen = 0; + + while ( (uBufLen = src.getBufferLength()) >= DATAHEADLEN ) + { + auto head = new ubyte [DATAHEADLEN]; + if (!src.copyOutFromHead(head ,DATAHEADLEN)) { break;} + + ulong bodyLength = bigEndianToNative!int32_t(head[16 .. 20]); + + if (bodyLength > 2147483647 || bodyLength < 0) + { + src.reset(); + break; + } + + if (uBufLen >= bodyLength + DATAHEADLEN) + { + MessageBuffer msg = new MessageBuffer(); + msg.messageId = bigEndianToNative!long(head[8 .. 16]); + if (!src.drainBufferFromHead(DATAHEADLEN)) { break;} + msg.message = new ubyte [bodyLength]; + if (bodyLength) + { + if (!src.removeBufferFromHead(msg.message,bodyLength)) {break;} + } + if (connection !is null) + { + ConnectionEventHandler handler = connection.getHandler(); + if(handler !is null) { + handler.messageReceived(connection, msg); + } + } + } else + { + break; + } + } + } + + public void parserHttpStream (EvBuffer!ubyte src , ubyte [] incr , Connection connection) + { + src.mergeBuffer(incr); + if (src.getBufferLength() > MAX_HTTP_REQUEST_BUFF) + { + src.reset(); + return; + } + + ulong head_pos = 0 ; + string buffer = cast(string)src.getBuffer(); + while ((head_pos = indexOf(buffer,HTTPHEADEOF)) != -1) + { + HttpContent content; + string head = cast(string)buffer[0 .. head_pos]; + parserHttpHead(content,head); + auto content_length = content.headField.get(Field.CONTENTLENGTH,null); + if (content_length !is null && (head_pos + 4 + to!int(content_length)) > buffer.length) + { + break; + } + else + { + content.body = buffer[head_pos + 4 .. $]; + MessageBuffer msg = new MessageBuffer(); + msg.messageId = cast(int)hashOf!string(content.path); + msg.message = cast(ubyte[])serialize!HttpContent(content); + if (connection !is null) + { + ConnectionEventHandler handler = connection.getHandler(); + if(handler !is null) { + handler.messageReceived(connection, msg); + } + } + src.reset(); + break; + } + } + } + + private void parserHttpHead(ref HttpContent httpcontent , string headBuff) + { + auto fields = split(headBuff,"\r\n"); + foreach(field;fields) + { + if (field.count("HTTP/")) + { + auto child = split(field," "); + if (child[0].count("HTTP/")) + { + httpcontent.status = to!int(child[1]); + + } else + { + httpcontent.method= child[0]; + string url = decodeComponent(child[1]); + long flag = indexOf(url ,"?"); + httpcontent.path = url[0 .. flag == -1 ? $:flag]; + if (flag != -1) + { + string[] items = url[flag + 1 .. $].split("&"); + foreach( item ; items) + { + if(item != string.init) + { + auto v = item.split("="); + httpcontent.parameters[v[0]] = v[1]; + } + } + } + } + } else + { + long pos = 0; + if ( (pos = indexOf(field ,":")) != -1) + { + httpcontent.headField[strip(field[0 .. pos])] = strip(field[pos+1 ..$]); + } + } + } + } +} \ No newline at end of file diff --git a/source/hunt/imf/Router.d b/source/hunt/imf/Router.d new file mode 100644 index 0000000..62b2604 --- /dev/null +++ b/source/hunt/imf/Router.d @@ -0,0 +1,30 @@ +module hunt.imf.Router; + +import hunt.imf.Command; + +class Router +{ + private + { + __gshared Router _grouter = null; + Command[long] _actions; + } + + static Router instance() + { + if (_grouter is null) + _grouter = new Router(); + return _grouter; + } + + public void registerProcessHandler(M)(int messageId) + { + auto action = new M(); + _actions[messageId] = action; + } + + public Command getProcessHandler(long messageId) + { + return _actions.get(messageId, null); + } +} diff --git a/source/hunt/imf/clients/GatewayClient.d b/source/hunt/imf/clients/GatewayClient.d new file mode 100644 index 0000000..35286e7 --- /dev/null +++ b/source/hunt/imf/clients/GatewayClient.d @@ -0,0 +1,25 @@ +module hunt.imf.clients.GatewayClient; + +import hunt.imf.protocol.protobuf.ProtobufTcpConnection; +import hunt.imf.protocol.Protocol; +import hunt.net; +import std.typecons; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.protocol.protobuf.TcpConnectionEventHandler; +import core.thread; +import core.sync.condition; +import core.sync.mutex; +import hunt.imf.MessageBuffer; +import hunt.imf.ConnectBase; +import hunt.logging; + +interface GatewayClient { + + void onConnection (ConnectBase connection); + + //void sendMsg(int tid,ubyte[] msg) ; + + void connect() ; + + void onClosed (ConnectBase connection); +} \ No newline at end of file diff --git a/source/hunt/imf/clients/GatewayHttpClient.d b/source/hunt/imf/clients/GatewayHttpClient.d new file mode 100644 index 0000000..9a0a57b --- /dev/null +++ b/source/hunt/imf/clients/GatewayHttpClient.d @@ -0,0 +1,80 @@ +module hunt.imf.clients.GatewayHttpClient; + +import hunt.imf.clients.GatewayClient; +import hunt.imf.protocol.Protocol; +import hunt.imf.ConnectBase; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.protocol.http.HttpConnection; +import hunt.imf.MessageBuffer; +import hunt.imf.ParserBase; +import hunt.util.Serialize; +import hunt.net; +import core.thread; +import core.sync.condition; +import core.sync.mutex; +import hunt.logging; + +class GatewayHttpClient : GatewayClient { + + private { + Condition _condition; + ConnectBase _conn = null; + Protocol _protocol; + NetClient _netClient; + } + + this(Protocol protocol) { + _condition = new Condition(new Mutex()); + ConnectionEventBaseHandler handler = cast(ConnectionEventBaseHandler)protocol.getHandler(); + handler.setOnConnection(&this.onConnection); + handler.setOnClosed(&this.onClosed); + handler.setOnMessage(&this.onMessage); + _protocol = protocol; + } + + void onConnection (ConnectBase connection) + { + _condition.mutex().lock(); + _condition.notify(); + _condition.mutex().unlock(); + connection.getConnection().setAttribute("CLIENT"); + _conn = connection; + } + + void connect() + { + NetClient client = NetUtil.createNetClient(); + client.setCodec(_protocol.getCodec()); + client.setHandler(_protocol.getHandler()); + client.connect(_protocol.getHost(),_protocol.getPort()); + _condition.mutex().lock(); + _condition.wait(); + _condition.mutex().unlock(); + _netClient = client; + } + + void sendMsg(ref HttpContent content) + { + if (_conn !is null) + { + MessageBuffer ask = new MessageBuffer(0,cast(ubyte[])serialize!HttpContent(content)); + _conn.sendMsg(ask); + } + } + + + void onMessage(Connection conneciton, Object message) + { + MessageBuffer msg = cast(MessageBuffer)message; + HttpContent content = unserialize!HttpContent(cast(byte[])msg.message); + tracef("%s",content.body); + conneciton.close(); + } + + void onClosed (ConnectBase connection) + { + + } + +} + diff --git a/source/hunt/imf/clients/GatewayTcpClient.d b/source/hunt/imf/clients/GatewayTcpClient.d new file mode 100644 index 0000000..11873b0 --- /dev/null +++ b/source/hunt/imf/clients/GatewayTcpClient.d @@ -0,0 +1,68 @@ +module hunt.imf.clients.GatewayTcpClient; + +import hunt.imf.clients.GatewayClient; +import hunt.imf.protocol.Protocol; +import hunt.imf.ConnectBase; +import hunt.imf.protocol.protobuf.ProtobufTcpConnection; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.protocol.protobuf.TcpConnectionEventHandler; +import hunt.imf.MessageBuffer; +import google.protobuf; +import std.array; +import hunt.net; +import core.thread; +import core.sync.condition; +import core.sync.mutex; + +class GatewayTcpClient : GatewayClient{ + + private { + Condition _condition; + ConnectBase _conn = null; + Protocol _protocol; + NetClient _netClient; + } + + this(Protocol protocol) { + _condition = new Condition(new Mutex()); + ConnectionEventBaseHandler handler = cast(ConnectionEventBaseHandler)protocol.getHandler(); + handler.setOnConnection(&this.onConnection); + handler.setOnClosed(&this.onClosed); + _protocol = protocol; + } + + void onConnection (ConnectBase connection) + { + _condition.mutex().lock(); + _conn = connection; + _condition.notify(); + _condition.mutex().unlock(); + } + + void connect() + { + NetClient client = NetUtil.createNetClient(); + client.setCodec(_protocol.getCodec()); + client.setHandler(_protocol.getHandler()); + client.connect(_protocol.getHost(),_protocol.getPort()); + _condition.mutex().lock(); + _condition.wait(); + _condition.mutex().unlock(); + _netClient = client; + } + + void sendMsg(T)(int tid,T t) + { + if (_conn !is null) + { + MessageBuffer ask = new MessageBuffer(tid,t.toProtobuf.array); + _conn.sendMsg(ask); + } + } + + void onClosed (ConnectBase connection) + { + + } +} + diff --git a/source/hunt/imf/clients/GatewayWebSocketClient.d b/source/hunt/imf/clients/GatewayWebSocketClient.d new file mode 100644 index 0000000..4d1ea95 --- /dev/null +++ b/source/hunt/imf/clients/GatewayWebSocketClient.d @@ -0,0 +1,127 @@ +module hunt.imf.clients.GatewayWebSocketClient; + +import hunt.http.client.ClientHttpHandler; +import hunt.http.client.HttpClient; +import hunt.http.client.HttpClientConnection; +import hunt.http.client.HttpClientRequest; +import hunt.http.HttpOptions; +import hunt.http.HttpConnection; +import hunt.http.codec.http.stream.HttpOutputStream; +import hunt.http.codec.websocket.frame; +import hunt.http.codec.websocket.model.IncomingFrames; +import hunt.http.codec.websocket.stream.WebSocketConnection; +import hunt.http.codec.websocket.stream.WebSocketPolicy; +import hunt.concurrency.Promise; +import hunt.concurrency.Future; +import hunt.concurrency.FuturePromise; +import hunt.concurrency.CompletableFuture; +import hunt.http.client.HttpClientOptions; +import hunt.imf.clients.GatewayClient; +import hunt.imf.protocol.Protocol; +import google.protobuf; +import std.array; +import hunt.imf.ConnectBase; +import hunt.imf.protocol.websocket.WsConnection; +import hunt.net; +import hunt.logging; +import hunt.imf.MessageBuffer; + +class ClientHttpHandlerEx : AbstractClientHttpHandler { + import hunt.http.codec.http.model; + + override public bool messageComplete(HttpRequest request, + HttpResponse response, HttpOutputStream output, HttpConnection connection) { + tracef("upgrade websocket success: " ~ response.toString()); + return true; + } +} + +class IncomingFramesEx : IncomingFrames +{ + private + { + ConnectBase _conn; + } + + void setWsConnection(ConnectBase connection) + { + _conn = connection; + } + + override public void incomingError(Exception t) { + } + override public void incomingFrame(Frame frame) { + FrameType type = frame.getType(); + switch (type) { + case FrameType.TEXT: + { + break ; + } + case FrameType.BINARY: + { + BinaryFrame binFrame = cast(BinaryFrame) frame; + ConnectBase.dispatchMessage( _conn,MessageBuffer.decode( cast(ubyte[])binFrame.getPayload().getRemaining())); + break ; + } + default: + break ; + } + } +} + +class GatewayWebSocketClient : GatewayClient +{ + private + { + HttpClientConnection _connection; + Protocol _protocol; + HttpClientRequest _request; + FuturePromise!WebSocketConnection _promise; + IncomingFramesEx _incomingFramesEx; + ClientHttpHandlerEx _handlerEx; + ConnectBase _conn = null; + } + + this(Protocol protocol) + { + _request = new HttpClientRequest("GET", "/index"); + _promise = new FuturePromise!WebSocketConnection(); + _incomingFramesEx = new IncomingFramesEx(); + _handlerEx = new ClientHttpHandlerEx(); + _protocol = protocol; + } + + + void connect() + { + HttpClient client = new HttpClient(new HttpClientOptions()); + Future!(HttpClientConnection) conn = client.connect(_protocol.getHost(), _protocol.getPort()); + _connection = conn.get(); + _connection.upgradeWebSocket(_request, WebSocketPolicy.newClientPolicy(), + _promise, _handlerEx, _incomingFramesEx); + WebSocketConnection connection = _promise.get(); + _conn = new WsConnection(connection); + _incomingFramesEx.setWsConnection(_conn); + } + + void sendMsg(T)(int tid,T t) + { + if (_conn !is null) + { + MessageBuffer ask = new MessageBuffer(tid,t.toProtobuf.array); + _conn.sendMsg(ask); + } + } + + void onConnection (ConnectBase connection) + { + + } + + void onClosed (ConnectBase connection) + { + + } + +} + diff --git a/source/hunt/imf/core/application.d b/source/hunt/imf/core/application.d deleted file mode 100644 index 527a908..0000000 --- a/source/hunt/imf/core/application.d +++ /dev/null @@ -1,83 +0,0 @@ -module hunt.imf.core.application; - -import hunt.imf.core.dispatcher; -import hunt.imf.io.server; -import hunt.imf.io.client; -import hunt.imf.io.clientext; -import hunt.imf.io.context; - -import hunt.net.NetUtil; - -class Application -{ - - this() - { - _dispatcher = new Dispatcher(); - - } - - Server createServer(string host , int port , string namespace_="") - { - auto server = new Server(_dispatcher, namespace_); - _servers ~= server; - Addr addr = {host , port}; - _server_addrs ~= addr; - return server; - } - - Client createClient(string host , int port , string namespace_="") - { - auto client = new Client(_dispatcher ,namespace_ ); - _clients ~= client; - Addr addr = {host , port}; - _client_addrs ~= addr; - return client; - } - - /// supported reconnect. - Client createClientExt(string host , int port , string namespace_ = "") - { - auto client = new ClientExt(_dispatcher ,namespace_ ); - _clients ~= client; - Addr addr = {host , port}; - _client_addrs ~= addr; - return client; - } - - void run(long timeout = -1) - { - NetUtil.startEventLoop(timeout); - _dispatcher.start(); - for(size_t i = 0 ; i < _servers.length ; i++) - _servers[i].listen(_server_addrs[i].port , _server_addrs[i].host); - for(size_t i = 0 ; i < _clients.length ; i++) - _clients[i].connect(_client_addrs[i].port , _client_addrs[i].host); - } - - void stop() { - NetUtil.stopEventLoop(); - _dispatcher.stop(); - - for(size_t i = 0 ; i < _servers.length ; i++) - _servers[i].stop(); - for(size_t i = 0 ; i < _clients.length ; i++) - _clients[i].stop(); - } - - - private: - - Addr[] _server_addrs; - Addr[] _client_addrs; - Server[] _servers; - Client[] _clients; - Dispatcher _dispatcher; - - - struct Addr - { - string host; - int port; - } -} diff --git a/source/hunt/imf/core/dispatcher.d b/source/hunt/imf/core/dispatcher.d deleted file mode 100644 index 9f887cf..0000000 --- a/source/hunt/imf/core/dispatcher.d +++ /dev/null @@ -1,47 +0,0 @@ -module hunt.imf.core.dispatcher; - -import hunt.imf.io.context; -import hunt.imf.protocol.packet; -import hunt.imf.core.task; - -import std.parallelism:totalCPUs; - -class Dispatcher -{ - this(size_t num = totalCPUs) - { - for(size_t i = 0 ; i < num ; i++) - _taskpool ~= new Task(); - } - - void start() - { - foreach(t ; _taskpool) - t.start(); - - /// for dlang bug. - import core.thread; - import core.time; - Thread.sleep(dur!"nsecs"(1)); - } - - void stop() - { - foreach(t ; _taskpool) - { - t.stop(); - t.join(); - } - } - - void dispatch(Context context , Packet packet) - { - size_t index = context.toHash() % _taskpool.length; - packet.setAttachment(context); - _taskpool[index].push(packet); - } - private: - - Task[] _taskpool; -} - diff --git a/source/hunt/imf/core/routing.d b/source/hunt/imf/core/routing.d deleted file mode 100644 index fc4e867..0000000 --- a/source/hunt/imf/core/routing.d +++ /dev/null @@ -1,103 +0,0 @@ -module hunt.imf.core.routing; - -import std.traits; -import std.conv; -import std.stdint; - - -struct route -{ - int32_t cmd; -} - -struct namespace -{ - string name; -} - -alias VoidProcessFunc = void function(ubyte[]); -alias VoidProcessDele = void delegate(ubyte[]); - -struct RouterData -{ - VoidProcessFunc func; - string className; -} - -class Router -{ - __gshared static RouterData[int64_t][string] g_router; - - static void addRouter(string fullClassName , string ns , int64_t value, VoidProcessFunc func) - { - RouterData data; - data.func = func; - data.className = fullClassName; - g_router[ns][value] = data; - } - - static RouterData * findRouter( string ns , int64_t value) - { - auto updata = ns in g_router; - if(updata == null) - return null; - auto data = value in g_router[ns]; - return data; - } -} - -mixin template MakeRouter(string moduleName = __MODULE__) -{ - mixin("import google.protobuf;"); - mixin(__MakeRouter0!(typeof(this))); - mixin(__MakeRouter1!(typeof(this) , moduleName)); -} - - -string __MakeRouter0(T)() -{ - string str; - foreach (m; __traits(derivedMembers, T)) - { - foreach( u ; getUDAs!(__traits(getMember , T , m) , route)) - { - str ~= "void " ~ m ~ "_message(ubyte[] data){"; - alias classArr = Parameters!(__traits(getMember , T , m)); - static if (classArr.length == 0) - { - str ~= m ~ "();}"; - } - else - { - str ~= "auto proto = new " ~ classArr[0].stringof ~"();"; - str ~= "try{ data.fromProtobuf!"~classArr[0].stringof~"(proto); " ~m~ "(proto); }"; - str ~= "catch(Throwable e){ import hunt.logging;logError(e.msg);} }"; - } - } - } - return str; -} - - -string __MakeRouter1(T , string moduleName)() -{ - string ns = ""; - string str = "shared static this(){"; - - foreach(n ; getSymbolsByUDA!(T,namespace)) - { - if(n.stringof == T.stringof) - ns = getUDAs!(n , namespace)[0].name; - } - - foreach (m; __traits(derivedMembers, T)) - { - foreach( u ; getUDAs!(__traits(getMember , T , m) , route)) - { - str ~= "Router.addRouter(\"" ~ moduleName ~ "." ~ T.stringof ~ "\",\"" ~ ns ~"\" , " ~ to!string(u.cmd) ~ ",&" ~ T.stringof ~ "." ~ m ~ "_message);"; - } - } - str ~= "}"; - return str; -} - diff --git a/source/hunt/imf/core/task.d b/source/hunt/imf/core/task.d deleted file mode 100644 index 9ae8f13..0000000 --- a/source/hunt/imf/core/task.d +++ /dev/null @@ -1,98 +0,0 @@ -module hunt.imf.core.task; - - - -import hunt.imf.protocol.packet; -import hunt.imf.core.routing; -import hunt.imf.io.context; - -import std.stdio; -import std.conv; -import std.stdint; -import std.container : DList; - -import core.thread; -import core.sync.condition; -import core.sync.mutex; - -import hunt.net; - -class Task : Thread -{ - this() - { - _flag = true; - _condition = new Condition(new Mutex()); - super(&run); - } - - void push( Packet packet) - { - synchronized(this){ - _queue.insertBack(packet); - } - _condition.notify(); - } - - void stop() - { - _flag = false; - _condition.notify(); - } - -private: - Packet pop() - { - synchronized(this){ - if(_queue.empty()) - return null; - auto packet = _queue.front(); - _queue.removeFront(); - return packet; - } - } - - void execute(Packet packet) - { - auto context = cast(Context)packet.getAttachment(); - auto data = Router.findRouter(context.ns , packet.message_id); - if( data is null) - { - writeln("can't found router " ~ to!string(packet.message_id)); - return ; - } - - auto obj = Object.factory(data.className); - if( obj is null) - { - writeln("can't create " , data.className); - return; - } - - VoidProcessDele dele; - dele.ptr = cast(void*)obj; - dele.funcptr = data.func; - - setContext(context); - dele(packet.message_data); - } - - void run() - { - while(_flag) - { - _condition.mutex().lock(); - _condition.wait(); - _condition.mutex().unlock(); - Packet packet = null; - while((packet = pop()) !is null) - { - execute(packet); - } - } - } - - bool _flag; - Condition _condition; - DList!Packet _queue; -} \ No newline at end of file diff --git a/source/hunt/imf/io/client.d b/source/hunt/imf/io/client.d deleted file mode 100644 index 3c4aab1..0000000 --- a/source/hunt/imf/io/client.d +++ /dev/null @@ -1,80 +0,0 @@ -module hunt.imf.io.client; - -import hunt.collection.ByteBuffer; -import hunt.net; - -import hunt.imf.core.dispatcher; -import hunt.imf.protocol.parser; -import hunt.imf.protocol.packet; -import hunt.imf.io.context; -import hunt.logging; - -import std.socket; -import std.conv; - -class Client -{ - this(Dispatcher dispatcher , string ns = "") - { - _dispatcher = dispatcher; - _client = NetUtil.createNetClient(); - _namespace = ns; - } - - void connect(int port , string host = "127.0.0.1") - { - string strPort = to!string(port); - AddressInfo[] arr = getAddressInfo(host , strPort , AddressInfoFlags.CANONNAME); - if(arr.length == 0 && arr[0].family == AF_INET) - { - throw new Exception("can't parse " ~ host ~ " or ipv6"); - } - - host = arr[0].address.toAddrString; - - _client.connect(port , host ,0, (Result!NetSocket result){ - if(result.failed()) - { - logError(result.cause.msg); - return; - } - auto tcp = cast(AsynchronousTcpSession)result.result(); - auto context = new Context(_namespace , tcp); - tcp.attachObject(context); - if(_open !is null) - _open(context); - - tcp.closeHandler((){ - if(_close !is null) - _close(context); - }); - tcp.handler((ByteBuffer buffer) { - auto context = cast(Context)tcp.getAttachment(); - auto list = context.parser.consume(cast(byte[])buffer.getRemaining()); - foreach(p ; list) - _dispatcher.dispatch(context , p); - }); - }); - - } - - void stop() { - _client.stop(); - } - - void setOpenHandler(OpenHandler handler) - { - _open = handler; - } - - void setCloseHandler(CloseHandler handler){ - _close = handler; - } - -protected: - string _namespace; - Dispatcher _dispatcher; - NetClient _client; - OpenHandler _open; - CloseHandler _close; -} \ No newline at end of file diff --git a/source/hunt/imf/io/clientext.d b/source/hunt/imf/io/clientext.d deleted file mode 100644 index 1c8f92b..0000000 --- a/source/hunt/imf/io/clientext.d +++ /dev/null @@ -1,90 +0,0 @@ -module hunt.imf.io.clientext; - -import hunt.logging; - -import hunt.imf.io.client; -import hunt.imf.core.dispatcher; -import hunt.imf.protocol.parser; -import hunt.imf.protocol.packet; -import hunt.imf.io.context; - -import hunt.event.timer; -import hunt.util.Timer; -import hunt.net.NetUtil; - -import core.time; - -/// -class ClientExt : Client -{ - OpenHandler _openHandler; - CloseHandler _closeHandler; - Timer _timer; - string _host; - int _port; - - /// - this(Dispatcher dispatcher , string ns = "") - { - super(dispatcher , ns); - } - - /// - override void connect(int port , string host = "127.0.0.1") - { - _host = host; - _port = port; - - super.connect(port , host); - super.setOpenHandler(&openHandler); - super.setCloseHandler(&closeHandler); - } - - override void setOpenHandler(OpenHandler handler) - { - _openHandler = handler; - } - - override void setCloseHandler(CloseHandler handler){ - _closeHandler = handler; - } - - private void onTick(Object sender) - { - logWarning("reconnecting " , _host , " " , _port); - connect(_port , _host); - } - - private void openHandler(Context context) - { - if(_openHandler !is null) - { - _openHandler(context); - } - - //remove timer - if(_timer !is null) - { - _timer.stop(); - _timer = null; - } - } - - private void closeHandler(Context context) - { - if(_closeHandler !is null) - { - _closeHandler(context); - } - if(_timer is null) - { - _timer = new Timer(NetUtil.defaultEventLoopGroup().nextLoop(), 3.seconds); - _timer.onTick(&onTick); - _timer.start(); - } - - } - - - -} \ No newline at end of file diff --git a/source/hunt/imf/io/context.d b/source/hunt/imf/io/context.d deleted file mode 100644 index b3e6813..0000000 --- a/source/hunt/imf/io/context.d +++ /dev/null @@ -1,93 +0,0 @@ -module hunt.imf.io.context; - -import hunt.net; - - -import hunt.imf.protocol.packet; -import hunt.imf.protocol.parser; - -import google.protobuf; - -import std.array; -import std.stdio; -import std.stdint; - -import hunt.Functions; - -alias VoidHandler = SimpleEventHandler; - -class Context -{ - this(string ns , NetSocket sock) - { - _namespace = ns; - _sock = sock; - _parser = new Parser(); - } - - string ns() @property - { - return _namespace; - } - - NetSocket sock() @property - { - return _sock; - } - - Parser parser() @property - { - return _parser; - } - - Object getAttachment() - { - return object; - } - - void setAttachment(Object attachment) - { - object = attachment; - } - string _namespace; - NetSocket _sock; - Parser _parser; - - - Object object; -} - -alias OpenHandler = void delegate(Context context); -alias CloseHandler = void delegate(Context context); - - -static Context g_context; - -Context context() @property -{ - return g_context; -} - -void setContext(Context context) -{ - g_context = context; -} - -void sendMessage(M)(Context context,int64_t message_id , M m , VoidHandler finish = null) -{ - auto packet = new Packet(message_id , m.toProtobuf.array); - auto data = packet.data; - context.sock.write(packet.data); -} - -void sendMessage(Context context,int64_t message_id ,VoidHandler finish = null ) -{ - auto packet = new Packet(message_id); - auto data = packet.data; - context.sock.write(packet.data); -} - -void close(Context context) -{ - context.sock.close(); -} \ No newline at end of file diff --git a/source/hunt/imf/io/server.d b/source/hunt/imf/io/server.d deleted file mode 100644 index af5aab9..0000000 --- a/source/hunt/imf/io/server.d +++ /dev/null @@ -1,72 +0,0 @@ -module hunt.imf.io.server; - -import hunt.collection.ByteBuffer; -import hunt.net; - -import hunt.imf.core.dispatcher; -import hunt.imf.protocol.parser; -import hunt.imf.protocol.packet; -import hunt.imf.io.context; - - - -class Server -{ - this(Dispatcher dispatcher , string ns = "") - { - _namespace = ns; - _dispatcher = dispatcher; - _server = NetUtil.createNetServer!(ServerThreadMode.Single)(); - } - - void listen(int port , string host = "127.0.0.1") - { - alias Server = hunt.net.Server.Server; - _server.listen(host , port , (Result!Server result){ - if(result.failed()) - throw result.cause(); - - }); - _server.connectionHandler((NetSocket sock){ - auto tcp = cast(AsynchronousTcpSession)sock; - auto context = new Context(_namespace , sock); - tcp.attachObject(context); - if(_open !is null){ - _open(context); - } - sock.closeHandler((){ - if(_close !is null) - _close(context); - }); - sock.handler( - (ByteBuffer buffer) { - auto context = cast(Context)tcp.getAttachment(); - auto list = context.parser.consume(cast(byte[])buffer.getRemaining()); - foreach(p ; list) - _dispatcher.dispatch(context , p); - } - ); - }); - } - - void stop() { - _server.stop(); - } - - void setOpenHandler(OpenHandler handler) - { - _open = handler; - } - - void setCloseHandler(CloseHandler handler){ - _close = handler; - } - - -private: - string _namespace; - Dispatcher _dispatcher; - AbstractServer _server; - OpenHandler _open; - CloseHandler _close; -} \ No newline at end of file diff --git a/source/hunt/imf/package.d b/source/hunt/imf/package.d deleted file mode 100644 index a93ea32..0000000 --- a/source/hunt/imf/package.d +++ /dev/null @@ -1,16 +0,0 @@ -module hunt.imf; - -public import hunt.imf.core.application; -public import hunt.imf.core.routing; - -public import hunt.imf.io.context; -public import hunt.imf.io.client; -public import hunt.imf.io.server; -public import hunt.imf.io.clientext; - -public import hunt.imf.protocol.packet; - -public import hunt.imf.utils.room; -public import hunt.imf.utils.element; -public import hunt.imf.utils.singleton; - diff --git a/source/hunt/imf/protocol/Protocol.d b/source/hunt/imf/protocol/Protocol.d new file mode 100644 index 0000000..61232e2 --- /dev/null +++ b/source/hunt/imf/protocol/Protocol.d @@ -0,0 +1,20 @@ +module hunt.imf.protocol.Protocol; + +import hunt.net.codec.Codec; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.net.NetServerOptions; +import hunt.net; +interface Protocol +{ + string getName(); + ushort getPort(); + string getHost(); + ConnectionEventHandler getHandler(); + Codec getCodec(); + NetServerOptions getOptions(); + void registerHandler(); +} + + + + diff --git a/source/hunt/imf/protocol/http/HttpCodec.d b/source/hunt/imf/protocol/http/HttpCodec.d new file mode 100644 index 0000000..e3bf281 --- /dev/null +++ b/source/hunt/imf/protocol/http/HttpCodec.d @@ -0,0 +1,31 @@ +module hunt.imf.protocol.http.HttpCodec; + + +import hunt.net.codec.Codec; +import hunt.net.codec.Encoder; +import hunt.net.codec.Decoder; +import hunt.imf.protocol.http.HttpDecoder; +import hunt.imf.protocol.http.HttpEncoder; + +class HttpCodec : Codec{ + + private HttpEncoder _encoder; + private HttpDecoder _decoder; + + this() { + _encoder = new HttpEncoder(); + _decoder = new HttpDecoder(); + } + + Encoder getEncoder() + { + return _encoder; + } + + Decoder getDecoder() + { + return _decoder; + } + +} + diff --git a/source/hunt/imf/protocol/http/HttpConnection.d b/source/hunt/imf/protocol/http/HttpConnection.d new file mode 100644 index 0000000..368d1bb --- /dev/null +++ b/source/hunt/imf/protocol/http/HttpConnection.d @@ -0,0 +1,53 @@ +module hunt.imf.protocol.http.HttpConnection; + +import hunt.net; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import hunt.imf.EvBuffer; +import hunt.util.Serialize; + +class HttpConnection : ConnectBase { + + private { + Connection _conn = null; + } + this(Connection connection) { + _conn = connection; + } + + void onConnectionClosed() + { + _conn = null; + } + + override void sendMsg(MessageBuffer message) + { + if (_conn.isConnected()) + { + _conn.write(message); + } + } + + override Connection getConnection() + { + return _conn; + } + + override string getProtocol(){ + return null; + } + + override void close() + { + if (_conn !is null && _conn.getState() !is ConnectionState.Closed) + { + _conn.close(); + } + } + + override bool isConnected() + { + return _conn.isConnected(); + } +} + diff --git a/source/hunt/imf/protocol/http/HttpConnectionEventHandler.d b/source/hunt/imf/protocol/http/HttpConnectionEventHandler.d new file mode 100644 index 0000000..f3c372a --- /dev/null +++ b/source/hunt/imf/protocol/http/HttpConnectionEventHandler.d @@ -0,0 +1,91 @@ +module hunt.imf.protocol.http.HttpConnectionEventHandler; + +import hunt.logging; +import hunt.net; +import hunt.imf.MessageBuffer; +import hunt.imf.ConnectBase; +import hunt.imf.protocol.http.HttpConnection; +import hunt.imf.ConnectionEventBaseHandler; + +class HttpConnectionEventHandler : ConnectionEventBaseHandler { + + this() { + } + + override + void connectionOpened(Connection connection) + { + if (_onConnection !is null) + { + auto conn = new HttpConnection(connection); + _onConnection(conn); + } + } + + override + void connectionClosed(Connection connection) + { + if (_onClosed !is null ) + { + auto conn = new HttpConnection(connection); + _onClosed(conn); + } + } + + override + void messageReceived(Connection connection, Object message) + { + MessageBuffer msg = cast(MessageBuffer)message; + if (connection.getAttribute("CLIENT") !is null) + { + if (_onMessage !is null) + { + _onMessage(connection,message); + } + } + else + { + ConnectBase.dispatchMessage(new HttpConnection(connection),msg); + } + } + + override + void exceptionCaught(Connection connection, Throwable t) + { + + } + + override + void failedOpeningConnection(int connectionId, Throwable t) { } + + override + void failedAcceptingConnection(int connectionId, Throwable t) { } + + override + void setOnConnection(ConnCallBack callback) + { + _onConnection = callback; + } + + override + void setOnClosed(ConnCallBack callback) + { + _onClosed = callback; + } + + override + void setOnMessage(MsgCallBack callback) + { + _onMessage = callback; + } + + private + { + string _tag = null; + ConnCallBack _onConnection = null; + ConnCallBack _onClosed = null; + MsgCallBack _onMessage = null; + } + +} + diff --git a/source/hunt/imf/protocol/http/HttpDecoder.d b/source/hunt/imf/protocol/http/HttpDecoder.d new file mode 100644 index 0000000..ed65137 --- /dev/null +++ b/source/hunt/imf/protocol/http/HttpDecoder.d @@ -0,0 +1,35 @@ +module hunt.imf.protocol.http.HttpDecoder; + +import hunt.net.codec.Decoder; +import hunt.collection.ByteBuffer; +import hunt.collection.BufferUtils; +import hunt.net.Connection; +import hunt.imf.ParserBase; + +import hunt.imf.EvBuffer; + + +class HttpDecoder : ParserBase , Decoder { + + this() { + + } + + override void decode(ByteBuffer buf, Connection connection) + { + EvBuffer!ubyte revbuferr = getContext(connection); + parserHttpStream(revbuferr,cast(ubyte[])buf.getRemaining(),connection); + } + + private EvBuffer!ubyte getContext(Connection connection) { + EvBuffer!ubyte revbuferr = null; + revbuferr = cast(EvBuffer!ubyte) connection.getAttribute(CONTEXT); + + if (revbuferr is null) { + revbuferr = new EvBuffer!ubyte ; + connection.setAttribute(CONTEXT, revbuferr); + } + return revbuferr; + } +} + diff --git a/source/hunt/imf/protocol/http/HttpEncoder.d b/source/hunt/imf/protocol/http/HttpEncoder.d new file mode 100644 index 0000000..3ee0f9f --- /dev/null +++ b/source/hunt/imf/protocol/http/HttpEncoder.d @@ -0,0 +1,99 @@ +module hunt.imf.protocol.http.HttpEncoder; + +import hunt.net.codec.Encoder; +import hunt.net.Connection; +import hunt.imf.ParserBase; +import hunt.imf.MessageBuffer; +import hunt.collection.ByteBuffer; +import hunt.collection.BufferUtils; +import hunt.Exceptions; +import std.bitmanip; +import hunt.util.Serialize; +import std.conv; +import std.stdio; +import std.format; +import std.array; +import std.uri; + +class HttpEncoder : ParserBase , Encoder { + + this() { + } + + override void encode(Object message, Connection connection) + { + auto msg = cast(MessageBuffer)message; + HttpContent content= unserialize!HttpContent(cast(byte[])msg.message); + content.headField[Field.CONTENTLENGTH] = to!string(content.body.length); + auto appender = appender!string; + appender.reserve(1024); + if (content.path.length != 0) + { + appender.put(content.method); + appender.put(" "); + appender.put(generateUrl(content)); + appender.put(" HTTP/1.1"); + appender.put(LINEFEEDS); + appender.put(generateHeadField(content)); + } + else + { + appender.put("HTTP/1.1 "); + appender.put(to!string(content.status)); + appender.put(LINEFEEDS); + appender.put(generateHeadField(content)); + } + appender.put(content.body); + // writefln(appender[]); + connection.write(appender[]); + } + + void setBufferSize(int size) + { + + } + + string generateUrl(ref HttpContent content) + { + auto appender = appender!string; + appender.reserve(1024); + + appender.put(content.path); + if (content.parameters.length != 0) + { + appender.put("?"); + } + bool begin = true; + foreach (key ; content.parameters.keys) + { + if (begin) + { + begin = !begin; + } + else + { + appender.put("&"); + } + + appender.put(key); + appender.put("="); + appender.put(content.parameters[key]); + } + return encodeComponent(appender[]); + } + + string generateHeadField (ref HttpContent content) + { + auto appender = appender!string; + foreach (key ; content.headField.keys) + { + appender.put(key); + appender.put(": "); + appender.put(content.headField[key]); + appender.put(LINEFEEDS); + } + appender.put(content.headField.length == 0?HTTPHEADEOF:LINEFEEDS); + return appender[]; + } +} + diff --git a/source/hunt/imf/protocol/http/HttpProtocol.d b/source/hunt/imf/protocol/http/HttpProtocol.d new file mode 100644 index 0000000..19037dd --- /dev/null +++ b/source/hunt/imf/protocol/http/HttpProtocol.d @@ -0,0 +1,62 @@ +module hunt.imf.protocol.http.HttpProtocol; + +import hunt.imf.protocol.Protocol; +import hunt.net.codec.Codec; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.protocol.http.HttpConnectionEventHandler; +import hunt.imf.protocol.http.HttpCodec; +import hunt.imf.GatewayApplication; +import hunt.net.NetServerOptions; +import hunt.imf.ConnectionManager; +import hunt.net; + +class HttpProtocol : Protocol +{ + private { + string _host; + ushort _port; + enum string _name = typeof(this).stringof; + ConnectionEventBaseHandler _handler; + NetServerOptions _options = null; + Codec _codec; + } + + this(string host , ushort port) + { + _host = host; + _port = port; + _handler = new HttpConnectionEventHandler(); + _codec = new HttpCodec(); + } + + override void registerHandler() + { + GatewayApplication.instance().registerConnectionManager(_name); + ConnectionManager!int manager = GatewayApplication.instance().getConnectionManager(_name); + _handler.setOnConnection(&manager.onConnection); + } + + void setCodec(Codec codec) { + _codec = codec; + } + + void setConnectionEventHandler(ConnectionEventBaseHandler handler) + { + _handler = handler; + } + + override NetServerOptions getOptions() + { + return _options; + } + + override string getName() {return _name;} + + override ushort getPort() {return _port;} + + override ConnectionEventHandler getHandler() {return _handler;} + + override Codec getCodec() {return _codec;} + + override string getHost() {return _host;} +} diff --git a/source/hunt/imf/protocol/packet.d b/source/hunt/imf/protocol/packet.d deleted file mode 100644 index 180f934..0000000 --- a/source/hunt/imf/protocol/packet.d +++ /dev/null @@ -1,49 +0,0 @@ -module hunt.imf.protocol.packet; - -import std.stdint; -import std.bitmanip; - -class Packet -{ - enum HEADERLEN = 20; - long auth_key_id = 0; - long message_id; - int32_t message_data_length; - ubyte[] message_data; - Object object; - - this(long message_id , ubyte[] message_data) - { - this.message_id = message_id; - this.message_data_length = cast(int32_t)message_data.length; - this.message_data = message_data; - } - - this(long message_id) - { - this.message_id = message_id; - this.message_data_length = 0; - } - - ubyte[] data() @property - { - ubyte[8] u1 = nativeToBigEndian(auth_key_id); - ubyte[8] u2 = nativeToBigEndian(message_id); - ubyte[4] u3 = nativeToBigEndian(message_data_length); - - return u1 ~ u2 ~ u3 ~ message_data; - } - - Object getAttachment() - { - return object; - } - - void setAttachment(Object attachment) - { - object = attachment; - } - - - -} \ No newline at end of file diff --git a/source/hunt/imf/protocol/parser.d b/source/hunt/imf/protocol/parser.d deleted file mode 100644 index 9960718..0000000 --- a/source/hunt/imf/protocol/parser.d +++ /dev/null @@ -1,80 +0,0 @@ -module hunt.imf.protocol.parser; - -import hunt.imf.protocol.packet; - -import std.bitmanip; -import std.stdint; - -class Parser -{ - ubyte[Packet.HEADERLEN] headerbuffer; - int32_t headerlen = 0; - - int32_t message_data_len; - ubyte[] message_data; - - - - /// - Packet[] consume(byte[] buffer) - { - Packet[] result; - size_t length = buffer.length; - size_t index = 0; - size_t used; - while(index < length) - { - /// in header. - size_t left = length - index; - if(headerlen < Packet.HEADERLEN) - { - if(left >= Packet.HEADERLEN - headerlen) - { - used = Packet.HEADERLEN - headerlen; - for(size_t i = 0 ; i < used ; i++) - headerbuffer[headerlen + i] = buffer[index + i]; - index += used; - headerlen += used; - - message_data_len = bigEndianToNative!int32_t(headerbuffer[16 .. 20]); - message_data.length = 0; - - if(message_data_len == 0) - { - long message_id = bigEndianToNative!long(headerbuffer[8 .. 16]); - result ~= new Packet(message_id); - headerlen = 0; - } - - } - else - { - for(size_t i = 0 ; i < left ; i++) - headerbuffer[headerlen + i] = buffer[index + i]; - index += left; - headerlen += left; - } - } - else - { - - if( left >= message_data_len - message_data.length) - { - long message_id = bigEndianToNative!long(headerbuffer[8 .. 16]); - used = message_data_len - message_data.length; - message_data ~= buffer[index .. index + used]; - result ~= new Packet(message_id , message_data); - headerlen = 0; - index += used; - } - else - { - message_data ~= buffer[index .. index + left]; - index += left; - } - } - } - return result; - } - -} \ No newline at end of file diff --git a/source/hunt/imf/protocol/protobuf/ProtobufCodec.d b/source/hunt/imf/protocol/protobuf/ProtobufCodec.d new file mode 100644 index 0000000..2342e36 --- /dev/null +++ b/source/hunt/imf/protocol/protobuf/ProtobufCodec.d @@ -0,0 +1,29 @@ +module hunt.imf.protocol.protobuf.ProtobufCodec; + +import hunt.net.codec.Codec; +import hunt.net.codec.Encoder; +import hunt.net.codec.Decoder; +import hunt.imf.protocol.protobuf.ProtobufDecoder; +import hunt.imf.protocol.protobuf.ProtobufEncoder; + + +class ProtobufCodec : Codec +{ + private ProtobufEncoder _encoder = null; + private ProtobufDecoder _decoder = null; + + this() { + _encoder = new ProtobufEncoder(); + _decoder = new ProtobufDecoder(); + } + + Encoder getEncoder() + { + return _encoder; + } + + Decoder getDecoder() + { + return _decoder; + } +} \ No newline at end of file diff --git a/source/hunt/imf/protocol/protobuf/ProtobufDecoder.d b/source/hunt/imf/protocol/protobuf/ProtobufDecoder.d new file mode 100644 index 0000000..15c52d3 --- /dev/null +++ b/source/hunt/imf/protocol/protobuf/ProtobufDecoder.d @@ -0,0 +1,38 @@ +module hunt.imf.protocol.protobuf.ProtobufDecoder; + +import hunt.imf.ParserBase; +import hunt.net.codec.Decoder; +import hunt.net.Connection; +import hunt.net.Exceptions; + +import hunt.collection.ByteBuffer; +import hunt.collection.BufferUtils; +import hunt.Exceptions; +import hunt.logging.ConsoleLogger; +import hunt.String; +import hunt.imf.EvBuffer; + +import std.algorithm; +import std.conv; + + +class ProtobufDecoder : ParserBase , Decoder { + + override void decode(ByteBuffer buf, Connection connection) + { + EvBuffer!ubyte revbuferr = getContext(connection); + parserTcpStream(revbuferr,cast(ubyte[])buf.getRemaining(),connection); + } + + + private EvBuffer!ubyte getContext(Connection connection) { + EvBuffer!ubyte revbuferr = null; + revbuferr = cast(EvBuffer!ubyte) connection.getAttribute(CONTEXT); + + if (revbuferr is null) { + revbuferr = new EvBuffer!ubyte ; + connection.setAttribute(CONTEXT, revbuferr); + } + return revbuferr; + } +} \ No newline at end of file diff --git a/source/hunt/imf/protocol/protobuf/ProtobufEncoder.d b/source/hunt/imf/protocol/protobuf/ProtobufEncoder.d new file mode 100644 index 0000000..4612041 --- /dev/null +++ b/source/hunt/imf/protocol/protobuf/ProtobufEncoder.d @@ -0,0 +1,38 @@ +module hunt.imf.protocol.protobuf.ProtobufEncoder; + +import hunt.net.codec.Encoder; +import hunt.net.Connection; +import hunt.imf.ParserBase; +import hunt.imf.MessageBuffer; +import hunt.collection.ByteBuffer; +import hunt.collection.BufferUtils; +import hunt.Exceptions; +import std.bitmanip; +import std.conv; +import std.stdio; +import std.stdint; + +class ProtobufEncoder : ParserBase , Encoder { + + override void encode(Object message, Connection connection) + { + auto msg = cast(MessageBuffer)message; + ubyte[] msgBody = msg.message; + + if (msgBody.length > 2147483647 || msgBody.length < 0 ) + { + return; + } + + ubyte[8] u1 = nativeToBigEndian(msg.authId); + ubyte[8] u2 = nativeToBigEndian(msg.messageId); + ubyte[4] u3 = nativeToBigEndian(cast(int32_t)msgBody.length); + + connection.write(u1 ~ u2 ~ u3 ~ msgBody); + } + + void setBufferSize(int size) + { + + } +} \ No newline at end of file diff --git a/source/hunt/imf/protocol/protobuf/ProtobufProtocol.d b/source/hunt/imf/protocol/protobuf/ProtobufProtocol.d new file mode 100644 index 0000000..f1af57f --- /dev/null +++ b/source/hunt/imf/protocol/protobuf/ProtobufProtocol.d @@ -0,0 +1,77 @@ +module hunt.imf.protocol.protobuf.ProtobufProtocol; + +import hunt.imf.protocol.Protocol; +import hunt.net.codec.Codec; +import hunt.imf.ConnectionEventBaseHandler; + +import hunt.imf.protocol.protobuf.TcpConnectionEventHandler; +import hunt.imf.protocol.protobuf.ProtobufCodec; +import hunt.imf.GatewayApplication; +import hunt.net.NetServerOptions; +import hunt.imf.ConnectionManager; +import hunt.imf.ConnectBase; +import hunt.net; + +class ProtobufProtocol : Protocol { + + alias CloseCallBack = void delegate(ConnectBase connection); + + private { + string _host; + ushort _port; + enum string _name = typeof(this).stringof; + ConnectionEventBaseHandler _handler; + NetServerOptions _options = null; + Codec _codec; + } + + this(string host , ushort port) + { + _host = host; + _port = port; + _handler = new TcpConnectionEventHandler(_name); + _codec = new ProtobufCodec(); + } + + override void registerHandler() + { + GatewayApplication.instance().registerConnectionManager(_name); + ConnectionManager!int manager = GatewayApplication.instance().getConnectionManager(_name); + _handler.setOnConnection(&manager.onConnection); + _handler.setOnClosed(&manager.onClosed); + } + + void setDisConnectHandler (CloseCallBack handler) + { + GatewayApplication.instance().registerConnectionManager(_name); + ConnectionManager!int manager = GatewayApplication.instance().getConnectionManager(_name); + if (manager !is null ) + { + manager.setCloseHandler(handler); + } + } + + void setCodec(Codec codec) { + _codec = codec; + } + + void setConnectionEventHandler(ConnectionEventBaseHandler handler) + { + _handler = handler; + } + + override NetServerOptions getOptions() + { + return _options; + } + + override string getName() {return _name;} + + override ushort getPort() {return _port;} + + override ConnectionEventHandler getHandler() {return _handler;} + + override Codec getCodec() {return _codec;} + + override string getHost() {return _host;} +} diff --git a/source/hunt/imf/protocol/protobuf/ProtobufTcpConnection.d b/source/hunt/imf/protocol/protobuf/ProtobufTcpConnection.d new file mode 100644 index 0000000..9a41bf5 --- /dev/null +++ b/source/hunt/imf/protocol/protobuf/ProtobufTcpConnection.d @@ -0,0 +1,59 @@ +module hunt.imf.protocol.protobuf.ProtobufTcpConnection; + +import hunt.net; +import hunt.imf.ConnectBase; +import hunt.imf.EvBuffer; +import hunt.String; +import hunt.imf.MessageBuffer; +import std.stdio; +import google.protobuf; +import std.array; + +class ProtobufTcpConnection : ConnectBase { + +private { + Connection _conn = null; +} +public: + + this(Connection connection) + { + _conn = connection; + } + + void onConnectionClosed() + { + _conn = null; + } + + override void sendMsg(MessageBuffer message) + { + if (_conn.isConnected()) + { + _conn.write(message); + } + } + + override Connection getConnection() + { + return _conn; + } + + override string getProtocol() + { + return (cast(String)_conn.getAttribute(SESSION.PROTOCOL)).value; + } + + override void close() + { + if (_conn !is null && _conn.getState() !is ConnectionState.Closed) + { + _conn.close(); + } + } + + override bool isConnected() + { + return _conn.isConnected(); + } +} \ No newline at end of file diff --git a/source/hunt/imf/protocol/protobuf/TcpConnectionEventHandler.d b/source/hunt/imf/protocol/protobuf/TcpConnectionEventHandler.d new file mode 100644 index 0000000..b22d1d3 --- /dev/null +++ b/source/hunt/imf/protocol/protobuf/TcpConnectionEventHandler.d @@ -0,0 +1,88 @@ +module hunt.imf.protocol.protobuf.TcpConnectionEventHandler; + +import hunt.net; +import hunt.imf.protocol.protobuf.ProtobufTcpConnection; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import hunt.String; +import std.stdio; + +class TcpConnectionEventHandler : ConnectionEventBaseHandler +{ + //alias ConnCallBack = void delegate(ConnectBase connection); + //alias MsgCallBack = void delegate(Connection connection ,Object message); + + this(string attribute){ + _attribute = attribute; + } + + override + void connectionOpened(Connection connection) + { + if (_onConnection !is null) + { + connection.setAttribute(SESSION.PROTOCOL,new String(_attribute)); + ProtobufTcpConnection conn = new ProtobufTcpConnection(connection); + _onConnection(conn); + } + } + + override + void connectionClosed(Connection connection) + { + connection.setState(ConnectionState.Closed); + if (_onClosed !is null ) + { + ProtobufTcpConnection conn = new ProtobufTcpConnection(connection); + _onClosed(conn); + } + } + + override + void messageReceived(Connection connection, Object message) + { + MessageBuffer msg = cast(MessageBuffer)message; + ConnectBase.dispatchMessage(new ProtobufTcpConnection(connection),msg); + } + + override + void exceptionCaught(Connection connection, Throwable t) + { + + } + + override + void failedOpeningConnection(int connectionId, Throwable t) { } + + override + void failedAcceptingConnection(int connectionId, Throwable t) { } + + override + void setOnConnection(ConnCallBack callback) + { + _onConnection = callback; + } + + override + void setOnClosed(ConnCallBack callback) + { + _onClosed = callback; + } + + override + void setOnMessage(MsgCallBack callback) + { + _onMessage = callback; + } + +private +{ + string _attribute = null; + ConnCallBack _onConnection = null; + ConnCallBack _onClosed = null; + MsgCallBack _onMessage = null; +} + +} + diff --git a/source/hunt/imf/protocol/websocket/WsCodec.d b/source/hunt/imf/protocol/websocket/WsCodec.d new file mode 100644 index 0000000..18f5704 --- /dev/null +++ b/source/hunt/imf/protocol/websocket/WsCodec.d @@ -0,0 +1,35 @@ +module hunt.imf.protocol.websocket.WsCodec; + +import hunt.net.codec.Codec; +import hunt.http.codec.CommonDecoder; +import hunt.http.codec.CommonEncoder; +import hunt.net.codec.Encoder; +import hunt.net.codec.Decoder; +import hunt.http.server.Http1ServerDecoder; +import hunt.http.server.Http2ServerDecoder; +import hunt.http.codec.websocket.decode.WebSocketDecoder; + +class WsCodec : Codec { + + private CommonEncoder encoder = null; + private CommonDecoder decoder = null; + + this() { + encoder = new CommonEncoder(); + + Http1ServerDecoder httpServerDecoder = new Http1ServerDecoder( + new WebSocketDecoder(), + new Http2ServerDecoder()); + decoder = new CommonDecoder(httpServerDecoder); + } + + Encoder getEncoder() { + return encoder; + } + + Decoder getDecoder() { + return decoder; + } + +} + diff --git a/source/hunt/imf/protocol/websocket/WsConnection.d b/source/hunt/imf/protocol/websocket/WsConnection.d new file mode 100644 index 0000000..3cd84f3 --- /dev/null +++ b/source/hunt/imf/protocol/websocket/WsConnection.d @@ -0,0 +1,58 @@ +module hunt.imf.protocol.websocket.WsConnection; + +import hunt.http.codec.websocket.stream.WebSocketConnection; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import hunt.http.codec.websocket.frame; +import hunt.net; +import hunt.http.HttpConnection; +import hunt.String; +import std.stdio; + +class WsConnection : ConnectBase { + + private { + WebSocketConnection _conn = null; + } + + this(WebSocketConnection connection) { + _conn = connection; + } + + override void sendMsg(MessageBuffer message) + { + if (_conn.getTcpConnection().isConnected()) + { + _conn.sendData(cast(byte[])message.encode()); + } + } + + WebSocketConnection getConnection() + { + return _conn; + } + + override string getProtocol() + { + return (cast(String)_conn.getTcpConnection().getAttribute(SESSION.PROTOCOL)).value; + } + + override Connection getConnection() + { + return _conn.getTcpConnection(); + } + + override void close() + { + if (_conn !is null && _conn.getTcpConnection().getState !is ConnectionState.Closed) + { + _conn.getTcpConnection().close(); + } + } + + override bool isConnected() + { + return _conn.getTcpConnection().isConnected(); + } +} + diff --git a/source/hunt/imf/protocol/websocket/WsConnectionEventHandler.d b/source/hunt/imf/protocol/websocket/WsConnectionEventHandler.d new file mode 100644 index 0000000..fa00dff --- /dev/null +++ b/source/hunt/imf/protocol/websocket/WsConnectionEventHandler.d @@ -0,0 +1,100 @@ +module hunt.imf.protocol.websocket.WsConnectionEventHandler; + +import hunt.imf.ConnectionEventBaseHandler; +import hunt.imf.ConnectBase; +import hunt.imf.MessageBuffer; +import hunt.http.server.WebSocketHandler; +import hunt.http.codec.http.model; +import hunt.http.codec.http.stream; +import hunt.http.codec.websocket.frame; +import hunt.http.codec.websocket.model; +import hunt.http.codec.websocket.stream.WebSocketConnection; +import hunt.http.codec.websocket.stream.IOState; +import hunt.imf.protocol.websocket.WsConnection; +import hunt.Byte; +import hunt.util.Serialize; +import hunt.String; +import hunt.logging; + + +class WsConnectionEventHandler : WebSocketHandler { + + alias ConnCallBack = void delegate(ConnectBase connection); + alias CloseConnCallBack = void delegate(ConnectBase connection ); + alias MsgCallBack = void delegate(WebSocketConnection connection ,Frame message); + + this(string attribute) + { + _attribute = attribute; + } + + override + void onConnect(WebSocketConnection webSocketConnection) { + import hunt.net.Connection; + if (_onConnection !is null) + { + webSocketConnection.getTcpConnection.setAttribute(SESSION.PROTOCOL,new String(_attribute)); + WsConnection conn = new WsConnection(webSocketConnection); + _onConnection(conn); + } + webSocketConnection.onClose((HttpConnection conn){ + conn.getTcpConnection().setState(ConnectionState.Closed); + if (_onClosed !is null) + { + _onClosed(new WsConnection(webSocketConnection)); + } + }); + } + + override + void onError(Exception t, WebSocketConnection connection) { + + } + + override + void onFrame(Frame frame, WebSocketConnection conn) { + + FrameType type = frame.getType(); + switch (type) { + case FrameType.TEXT: + { + break; + } + case FrameType.BINARY: + { + BinaryFrame binFrame = cast(BinaryFrame) frame; + ConnectBase.dispatchMessage(new WsConnection(conn),MessageBuffer.decode(cast(ubyte[])binFrame.getPayload().getRemaining())); + break; + } + default: + break; + } + } + + + void setOnConnection(ConnCallBack callback) + { + _onConnection = callback; + } + + + void setOnClosed(CloseConnCallBack callback) + { + _onClosed = callback; + } + + + void setOnMessage(MsgCallBack callback) + { + _onMessage = callback; + } + + private + { + string _attribute = null; + ConnCallBack _onConnection = null; + CloseConnCallBack _onClosed = null; + MsgCallBack _onMessage = null; + } +} + diff --git a/source/hunt/imf/protocol/websocket/WsProtocol.d b/source/hunt/imf/protocol/websocket/WsProtocol.d new file mode 100644 index 0000000..7800a52 --- /dev/null +++ b/source/hunt/imf/protocol/websocket/WsProtocol.d @@ -0,0 +1,106 @@ +module hunt.imf.protocol.websocket.WsProtocol; + +import hunt.imf.protocol.Protocol; +import hunt.net.codec.Codec; +import hunt.imf.ConnectionEventBaseHandler; +import hunt.http.server.HttpServerOptions; +import hunt.imf.protocol.websocket.WsConnectionEventHandler; +import hunt.http.HttpOptions; +import hunt.http.server.ServerHttpHandler; +import hunt.http.codec.http.stream; +import hunt.http.server.Http2ServerRequestHandler; +import hunt.http.server.ServerSessionListener; +import hunt.http.codec.http.model.MetaData; +import hunt.net.NetServerOptions; +import hunt.http.server.WebSocketHandler; +import hunt.http.server.HttpServerHandler; +import hunt.net.Connection; +import hunt.imf.ConnectionManager; +import hunt.imf.protocol.websocket.WsConnectionEventHandler; +import hunt.imf.protocol.websocket.WsCodec; +import hunt.imf.GatewayApplication; +import hunt.logging; +import hunt.imf.ConnectBase; + +class WsProtocol : Protocol{ + + alias CloseCallBack = void delegate(ConnectBase connection); + + private { + string _host; + ushort _port; + enum string _name = typeof(this).stringof; + ConnectionEventHandler _handler = null; + NetServerOptions _options = null; + ServerHttpHandler _serverHandler = null; + ServerSessionListener _listener = null; + WsConnectionEventHandler _eventHandler = null; + HttpServerOptions _httpOptions = null; + Codec _codec; + } + + this(string host , ushort port) + { + _host = host; + _port = port; + _codec = new WsCodec(); + + HttpServerOptions config = new HttpServerOptions(); + _options = config.getTcpConfiguration(); + if(_options is null ) { + _options = new NetServerOptions(); + config.setTcpConfiguration(_options); + } + _httpOptions = config; + _serverHandler = new class ServerHttpHandlerAdapter { + override + bool messageComplete(HttpRequest request, HttpResponse response, + HttpOutputStream output, + HttpConnection connection) { + return true; + } + }; + _listener = new Http2ServerRequestHandler(_serverHandler); + _eventHandler = new WsConnectionEventHandler(_name); + } + + override void registerHandler() + { + GatewayApplication.instance().registerConnectionManager(_name); + ConnectionManager!int manager = GatewayApplication.instance().getConnectionManager(_name); + _eventHandler.setOnConnection(&manager.onConnection); + _eventHandler.setOnClosed(&manager.onClosed); + } + + void setDisConnectHandler (CloseCallBack handler) + { + GatewayApplication.instance().registerConnectionManager(_name); + ConnectionManager!int manager = GatewayApplication.instance().getConnectionManager(_name); + if (manager !is null ) + { + manager.setCloseHandler(handler); + } + } + + override NetServerOptions getOptions() + { + return _options; + } + + override string getName() {return _name;} + + override ushort getPort() {return _port;} + + override ConnectionEventHandler getHandler() + { + if (_handler is null) + { + _handler = new HttpServerHandler(_httpOptions, _listener,_serverHandler, _eventHandler); + } + return _handler; + } + + override Codec getCodec() {return _codec;} + + override string getHost() {return _host;} +} diff --git a/source/hunt/imf/utils/element.d b/source/hunt/imf/utils/element.d deleted file mode 100644 index a55b969..0000000 --- a/source/hunt/imf/utils/element.d +++ /dev/null @@ -1,58 +0,0 @@ -module hunt.imf.utils.element; -import hunt.imf.io.context; - -class Element -{ - this(Context context) - { - _context = context; - } - - Context context() @property - { - return _context; - } - - private: - Context _context; - -} - - - -unittest -{ - class E1 : Element - { - this(Context context) - { - super(context); - } - } - - class E2 : E1 - { - this(Context context) - { - super(context); - } - } - - import hunt.imf.utils.room; - import hunt.imf.utils.singleton; - auto room = new Room!(size_t , Element)(); - auto room1 = new Room!(size_t , E1)(); - auto room2 = new Room!(size_t , E2)(); - - import std.stdio; - void test(string[] arg...) - { - - writeln(arg.length); - } - - writeln(Singleton!(Room!(size_t , Element)).instance().length); - - test("test"); - test(["test"]); -} \ No newline at end of file diff --git a/source/hunt/imf/utils/room.d b/source/hunt/imf/utils/room.d deleted file mode 100644 index c041b93..0000000 --- a/source/hunt/imf/utils/room.d +++ /dev/null @@ -1,105 +0,0 @@ -module hunt.imf.utils.room; -import hunt.imf.utils.element; -import std.traits; -import hunt.imf.io.context; - -class Room(K = size_t , E = Element) -{ - bool exists(K key) - { - synchronized(this) - { - auto e = key in _hash; - if(e is null) - return false; - else - return true; - } - } - - bool add(K key , E entity) - { - synchronized(this) - { - if( key in _hash) - return false; - _hash[key] = entity; - return true; - } - } - - void findEx(K key , void delegate(E e) dele) - { - synchronized(this) - { - auto e = key in _hash; - if( e is null) - dele(null); - else - dele(*e); - } - } - - bool remove(K key) - { - synchronized(this) - { - return _hash.remove(key); - } - } - - static if ( is(E == Element) || is(BaseClassesTuple!E[$-2] == Element)) - { - void broadCast(C )(C c , K[] excepts...) const - { - synchronized(this) - { - import std.algorithm.searching; - foreach(k , ref v ; _hash) - { - if(find!(" a == b")(excepts , k).length == 0) - v.context.sendMessage( c); - } - } - } - } - - static if ( is(E == Element) || is(BaseClassesTuple!E[$-2] == Element)) - { - void broadCast(C , T)(C c , T t , K[] excepts...) - { - synchronized(this) - { - import std.algorithm.searching; - foreach(k , ref v ; _hash) - { - if(find!(" a == b")(excepts , k).length == 0) - v.context.sendMessage(c , t); - } - } - } - } - - void traverse(void delegate( K , const E) dele) - { - synchronized(this) - { - foreach( k , v ; _hash) - { - dele(k , v); - } - } - } - - size_t length() const - { - synchronized(this) - { - return _hash.length; - } - } - - private: - E[K] _hash; - -} \ No newline at end of file diff --git a/source/hunt/imf/utils/singleton.d b/source/hunt/imf/utils/singleton.d deleted file mode 100644 index 5c821eb..0000000 --- a/source/hunt/imf/utils/singleton.d +++ /dev/null @@ -1,23 +0,0 @@ -module hunt.imf.utils.singleton; - -import core.sync.mutex; - -class Singleton(T) -{ - static T instance() @property - { - __gshared T g_t; - if(g_t is null) - { - synchronized - { - if(g_t is null) - { - g_t = new T(); - } - } - } - return g_t; - } - -} \ No newline at end of file