Skip to content

Commit

Permalink
Merge branch 'multithread-run'
Browse files Browse the repository at this point in the history
  • Loading branch information
mendelgusmao committed Mar 21, 2012
2 parents e8c8512 + b5ae66f commit f903355
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 104 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CGI FOR LCD SMARTIE 0.1-alpha
CGI FOR LCD SMARTIE 0.2-alpha
=============================

> CGI for LCD Smartie is designed to be a fast gateway between LCD Smartie ([http://lcdsmartie.sourceforge.net](http://lcdsmartie.sourceforge.net)) and language
Expand Down Expand Up @@ -66,7 +66,10 @@ CONFIGURING
> The default file extension to be considered by the plugin if not specified in the filename passed to $dll
> * add_and_run (boolean, 0-1)
> If the value is 1, the server will execute the command right after it is added to the queue. If not, the server will await the configured interval to run the command and will be returning an empty response until it happens.
> If the value is 1, the server will execute the command right after it is added to the queue. If not, the server will await the configured interval to run the command and will be returning an empty response until it happens.
> * max_threads (numeric)
> The maximum number of commands that can be run simultaneously
> The subsequent sections are named with the common file extension of the language. They have the following attributes:
Expand Down
1 change: 1 addition & 0 deletions scripts/cgi4lcd.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ refresh=1
port=65432
default_extension="php"
add_and_run=1
max_threads=4

[php]
language="PHP"
Expand Down
22 changes: 1 addition & 21 deletions src/Common/Command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using std::string;

Command::Command() {
is_running = false;
time(&last_execution);
time(&last_request);
}
Expand All @@ -12,24 +13,3 @@ string Command::line() {
return executable + " " + arguments;
}

void Command::run() {

char psBuffer[128];
FILE *iopipe;

iopipe = _popen(line().c_str(), "r");

if (iopipe == NULL) {
response = "[CGI4LCD] Error running...";
}
else {
response = "";

while(!feof(iopipe)) {
if(fgets(psBuffer, 128, iopipe) != NULL) {
response += string(psBuffer);
}
}
_pclose(iopipe);
}
}
2 changes: 1 addition & 1 deletion src/Common/Command.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class Command {
bool is_internal;
bool do_not_queue;
bool add_and_run;
bool is_running;

Command();
string line();
void run();
};

#endif // COMMAND_H
Expand Down
42 changes: 21 additions & 21 deletions src/Common/Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,48 @@ using boost::algorithm::split;
Command Protocol::parse(const string &data) {

vector<string> packet;
Command cmd;
Command command;

split(packet, data, boost::is_any_of(PROTOCOL_DELIMITER));

if (packet.size() != PROTOCOL_EXPECTED_SIZE) {
cmd.is_malformed = true;
command.is_malformed = true;
}
else {
if (packet[0] != PROTOCOL_HEADER) {
cmd.is_malformed = true;
command.is_malformed = true;
}
else if (packet[1] == "command" && packet[2] != "") {
cmd.is_internal = true;
cmd.executable = packet[1];
cmd.arguments = packet[2];
command.is_internal = true;
command.executable = packet[2];
command.arguments = packet[3];
}
else {
cmd.executable = packet[1];
cmd.arguments = packet[2];
cmd.interval = lexical_cast<unsigned int>(packet[3]);
cmd.timeout = lexical_cast<unsigned int>(packet[4]);
cmd.do_not_queue = packet[5] == "1";
cmd.add_and_run = packet[6] == "1";
cmd.is_malformed = false;
command.executable = packet[1];
command.arguments = packet[2];
command.interval = lexical_cast<unsigned int>(packet[3]);
command.timeout = lexical_cast<unsigned int>(packet[4]);
command.do_not_queue = packet[5] == "1";
command.add_and_run = packet[6] == "1";
command.is_malformed = false;
}
}

return cmd;
return command;

}

string Protocol::build(const Command &cmd) {
string Protocol::build(const Command &command) {

vector<string> packet;

packet.push_back(PROTOCOL_HEADER);
packet.push_back(cmd.executable);
packet.push_back(cmd.arguments);
packet.push_back(lexical_cast<string>(cmd.interval));
packet.push_back(lexical_cast<string>(cmd.timeout));
packet.push_back(cmd.do_not_queue ? "1" : "0");
packet.push_back(cmd.add_and_run ? "1" : "0");
packet.push_back(command.executable);
packet.push_back(command.arguments);
packet.push_back(lexical_cast<string>(command.interval));
packet.push_back(lexical_cast<string>(command.timeout));
packet.push_back(command.do_not_queue ? "1" : "0");
packet.push_back(command.add_and_run ? "1" : "0");
packet.push_back("");

return join(packet, PROTOCOL_DELIMITER);
Expand Down
2 changes: 1 addition & 1 deletion src/Common/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Protocol {
public:

static Command parse(const string &data);
static string build(const Command &cmd);
static string build(const Command &command);

};

Expand Down
16 changes: 8 additions & 8 deletions src/Plugin/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ string Client::request(const string &interpreter, const string &arguments, unsig

using boost::asio::ip::udp;

Command cmd;
cmd.executable = interpreter;
cmd.arguments = arguments;
cmd.interval = interval;
cmd.timeout = timeout;
cmd.do_not_queue = do_not_queue;
cmd.add_and_run = _add_and_run;
Command command;
command.executable = interpreter;
command.arguments = arguments;
command.interval = interval;
command.timeout = timeout;
command.do_not_queue = do_not_queue;
command.add_and_run = _add_and_run;

string buffer("");

Expand All @@ -111,7 +111,7 @@ string Client::request(const string &interpreter, const string &arguments, unsig
udp::socket socket(io_service);
socket.open(udp::v4());

string data = Protocol::build(cmd);
string data = Protocol::build(command);
const char* send_buf = data.c_str();
socket.send_to(boost::asio::buffer(send_buf, data.size()), receiver_endpoint);

Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ __stdcall function3(char *param1, char *param2)
extern "C" DLLEXPORT char *
__stdcall function20(char *param1, char *param2)
{
return "cgi.dll is part of CGI4LCD version 0.1 by MendelGusmao github.com/MendelGusmao";
return "cgi.dll is part of CGI4LCD version 0.2 by MendelGusmao github.com/MendelGusmao";
}

102 changes: 70 additions & 32 deletions src/Server/Queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,43 @@

using boost::lexical_cast;

Queue::Queue(boost::asio::io_service& io_service) :
_timer(io_service, boost::posix_time::seconds(1)) {
Queue::Queue(boost::asio::io_service& io_service, unsigned int max_threads) :
_timer(io_service, boost::posix_time::seconds(1)),
_max_threads(max_threads),
_running_threads(0) {

_timer.async_wait(boost::bind(&Queue::run, this));
_timer.async_wait(boost::bind(&Queue::process, this));

}

void Queue::add(Command &cmd) {
void Queue::add(Command &command) {

map<string, Command>::iterator it = _commands.find(cmd.line());
map<string, Command>::iterator it = _commands.find(command.line());
time_t now;
time(&now);

if (it == _commands.end()) {
cmd.response = "";
cmd.last_request = now;
command.response = "";
command.last_request = now;

_commands[cmd.line()] = cmd;
_commands[command.line()] = command;

if (cmd.add_and_run) {
_commands[cmd.line()].run();
if (command.add_and_run) {
run(command);
}
}
else {
_commands[cmd.line()].last_request = now;
_commands[command.line()].last_request = now;
}
}

void Queue::run() {
void Queue::process() {

map<string, Command>::iterator it;
Command cmd;
Command command;

_timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(1));
_timer.async_wait(boost::bind(&Queue::run, this));
_timer.async_wait(boost::bind(&Queue::process, this));

#ifdef DEBUG
int queue_size = _commands.size();
Expand All @@ -51,37 +53,73 @@ void Queue::run() {
#endif

for (it = _commands.begin(); it != _commands.end(); ++it) {
cmd = it->second;
command = it->second;

time_t now;
time(&now);

echo("Command '" << cmd.line() << "'");
echo("Cleanup Time: " << cmd.last_request << " + " << cmd.timeout);
echo("Next Execution: " << cmd.last_execution << " + " << cmd.interval);
echo("Cached Response: '" << cmd.response << "'");
echo("Command '" << command.line() << "'");
echo("Cleanup Time: " << command.last_request << " + " << command.timeout);
echo("Next Execution: " << command.last_execution << " + " << command.interval);
echo("Cached Response: '" << command.response << "'");

if (now >= cmd.last_request + cmd.timeout) {
echo("Erasing '" << cmd.line() << "'");
if (now >= command.last_request + command.timeout) {
echo("Erasing '" << command.line() << "'");

_commands.erase(it);
continue;
break;
}
else if (now >= cmd.last_execution + cmd.interval) {
echo("Running '" << cmd.line() << "'");

cmd.run();
time(&cmd.last_execution);

echo(cmd.response);

else if (_running_threads < _max_threads && command.is_running == false && now >= command.last_execution + command.interval) {
boost::thread runner(boost::bind(&Queue::run, this, command));
}

_commands[cmd.line()] = cmd;
_commands[command.line()] = command;
}

}

Command Queue::get(const string &line) {
return _commands[line];
}
}

void Queue::run(Command &command) {

char psBuffer[128];
FILE *iopipe;

echo("Running '" << command.line() << "'");

++_running_threads;

if (!command.do_not_queue) {
_commands[command.line()].is_running = true;
}

iopipe = _popen(command.line().c_str(), "r");

if (iopipe == NULL) {
command.response = "[CGI4LCD] Error running...";
}
else {
string response = "";

while(!feof(iopipe)) {
if(fgets(psBuffer, 128, iopipe) != NULL) {
response += string(psBuffer);
}
}

_pclose(iopipe);
command.response = response;
}

echo("Runner response: '" << command.response << "'");

if (!command.do_not_queue) {
_commands[command.line()].is_running = false;
_commands[command.line()].response = command.response;
time(&_commands[command.line()].last_execution);
}

--_running_threads;
}
9 changes: 6 additions & 3 deletions src/Server/Queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ class Queue {

public:

Queue(boost::asio::io_service& io_service);
void add(Command &cmd);
void run();
Queue(boost::asio::io_service& io_service, unsigned int max_threads);
void add(Command &command);
void process();
void run(Command &command);
Command get(const string& line);

private:

boost::asio::deadline_timer _timer;
map<string, Command> _commands;
unsigned int _max_threads;
unsigned int _running_threads;

};

Expand Down
Loading

0 comments on commit f903355

Please sign in to comment.