Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequencer class with lambda response message #2

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b30be88
Sequencer class with lambda response message
DanielaMurin Aug 21, 2024
d0369a2
Add Sequencer to sendGetResponse
DanielaMurin Aug 21, 2024
2caeedc
Ring Buffer - Phase 1: The main thread push to the ring buffer and th…
shiraez Aug 21, 2024
98c6c89
Remove singleton & fix compilation
DanielaMurin Aug 22, 2024
78ec4a7
Fix merge issues
DanielaMurin Aug 22, 2024
65f94df
add debug info + enable thread
shiraez Aug 25, 2024
8523a2a
add delay
shiraez Aug 25, 2024
0ddc1ec
debug single ring buffer & enable sequencer
DanielaMurin Aug 25, 2024
cc73b2e
Add new debug file & move sequencer alloc to processEvent
DanielaMurin Aug 26, 2024
c1b916a
fix reset sequencer logic
DanielaMurin Aug 27, 2024
8d0feae
Take out lambda execute from within lock
DanielaMurin Aug 27, 2024
efc1bc9
add loger
shiraez Aug 27, 2024
da6d6b9
Merge branch 'syncd_multithread' of https://github.com/Marvell-switch…
shiraez Aug 27, 2024
5e2e15c
Working single ring buffer & sequencer
DanielaMurin Aug 27, 2024
fdf56c9
Merge branch 'syncd_multithread' of https://github.com/Marvell-switch…
shiraez Aug 27, 2024
d3fddf3
add logger to syncd + sequencer
shiraez Aug 27, 2024
9b01706
Return m_switches and m_mdioIpcServer logic
DanielaMurin Aug 27, 2024
7a14462
fix Logger
shiraez Aug 28, 2024
b4d0a06
Merge branch 'syncd_multithread' of https://github.com/Marvell-switch…
shiraez Aug 28, 2024
04eb362
add logger.h file
shiraez Aug 28, 2024
00b8ade
Update sequencer statistics/mux/logging
DanielaMurin Aug 29, 2024
94005f8
Merge conflict fix, remove old logger code
DanielaMurin Aug 29, 2024
3f121a3
enable flag to the logger
shiraez Aug 29, 2024
7a661ce
Return diag shell thread, improve lambda func readability
DanielaMurin Aug 29, 2024
8ea0eb6
Allocate sequence number inside functions
DanielaMurin Aug 29, 2024
9908e23
add multiRing buffer support & send response for all ops
aviramd Aug 29, 2024
328867e
fix find operation group by string
aviramd Aug 29, 2024
762ae3b
fix find ring buffer by op or objectType
aviramd Aug 30, 2024
a5d4802
fix sendApiResponse objectCount param on processBulkQuadEventInInitVi…
aviramd Aug 31, 2024
b10bf7d
removing unnecessary code
shiraez Sep 1, 2024
0225abc
Fix send responde sync for nom get actions
aviramd Sep 1, 2024
6cf0e13
create muliple threads, cleanup, change response seq order
aviramd Sep 1, 2024
a2d711e
Sequencer update & Cleanup
DanielaMurin Sep 1, 2024
4c72076
Sequencer update - ring buffer instead of map
DanielaMurin Sep 2, 2024
c5e511a
fix bug when redis reads empty key
aviramd Sep 3, 2024
db71e99
REVERT - Sequencer update - ring buffer instead of map
DanielaMurin Sep 3, 2024
d52b04e
protected RedisClient + RedisSelectableChannel
shiraez Sep 3, 2024
cad2dcb
remove LOCK from loop wrap it instead , add thread id prints, 3 threads
aviramd Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proxylib/Proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ void Proxy::updateAttributteNotificationPointers(
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("sai_metadata_update_attribute_notification_pointers from Proxy::updateAttributteNotificationPointers");
sai_metadata_update_attribute_notification_pointers(&m_sn, count, attr_list);
}

Expand Down
1 change: 1 addition & 0 deletions saiplayer/SaiPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ void SaiPlayer::update_notifications_pointers(
* need to override them after create, and after set.
*/

SWSS_LOG_NOTICE("sai_metadata_update_attribute_notification_pointers from SaiPlayer::update_notifications_pointers");
sai_metadata_update_attribute_notification_pointers(&m_switchNotifications, attr_count, attr_list);
}

Expand Down
106 changes: 106 additions & 0 deletions syncd/Logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#ifndef LOGGER_H
#define LOGGER_H

#define MODULE_NAME "syncd"

#include <string>
#include <iostream>
#include <fstream>
#include <sys/stat.h>
#include <chrono>
#include <cstdio>
#include <mutex>
#include <sstream>
#include <iomanip>
#include "fmt/format.h"
#include <ctime>

#define MAX_LOG_SIZE (50 * 1024) /* 50 KB */
#define ENABLE_LOGGING 1

// Define a mutex for thread safety
static std::mutex logMutex;

// Improved logging function with thread safety
static void writeToLogFile(const std::string& funcName, const std::string& fileNum, const std::string& message) {
// Lock the mutex to ensure thread safety
std::lock_guard<std::mutex> lock(logMutex);

std::string filePath = "/" + std::string(MODULE_NAME) + "_debugLog_" + fileNum + ".txt";
std::string backupFilePath = filePath + ".history";
struct stat fileStat;

// Check if the log file exists and if its size exceeds the maximum limit
if (stat(filePath.c_str(), &fileStat) == 0) {
if (fileStat.st_size > MAX_LOG_SIZE) {
// Remove the old backup file
std::remove(backupFilePath.c_str());
// Rename the current log file to the backup file
if (std::rename(filePath.c_str(), backupFilePath.c_str()) != 0) {
std::cerr << "Error: Could not rename file " << filePath << " to " << backupFilePath << std::endl;
return;
}
}
}

// Open the log file in append mode
std::ofstream logFile(filePath, std::ios_base::app);
if (!logFile.is_open()) {
std::cerr << "Error: Could not open file " << filePath << std::endl;
return;
}

auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;

std::ostringstream oss;
oss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S")
<< '.' << std::setw(3) << std::setfill('0') << milliseconds.count();
std::string formatted_time = oss.str();

// Write the timestamp, function name, and message to the log file
logFile << formatted_time << " " << funcName << ": " << message << std::endl;


logFile.close();
}

template<typename... Args>
static void logFormattedMessage(const std::string& funcName, const std::string& fileNum, const std::string& format, Args... messageArgs) {
std::ostringstream oss;
oss << funcName << ": ";

std::string remainingFormat = format;

// Helper function to process a single argument
auto processArg = [&oss, &remainingFormat](const auto& arg) {
size_t pos = remainingFormat.find("{}");
if (pos != std::string::npos) {
oss << remainingFormat.substr(0, pos) << arg;
remainingFormat = remainingFormat.substr(pos + 2);
}
};

// Helper function to recursively process all arguments
auto processAllArgs = [&](auto&&... innerArgs) -> void {
(void)std::initializer_list<int>{(processArg(innerArgs), 0)...};
};

// Process all arguments
processAllArgs(messageArgs...);

// Add any remaining format string
oss << remainingFormat;

// Write the full message to the log file
writeToLogFile(funcName, fileNum, oss.str());
}
#if ENABLE_LOGGING
// Macro for easy logging with formatting
#define LogToModuleFile(fileNum, format, ...) logFormattedMessage(__func__, fileNum, format, ##__VA_ARGS__)
#else
#define LogToModuleFile(fileNum, format, ...)
#endif

#endif // LOGGER_H
3 changes: 2 additions & 1 deletion syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ libSyncd_a_SOURCES = \
WatchdogScope.cpp \
Workaround.cpp \
ZeroMQNotificationProducer.cpp \
syncd_main.cpp
syncd_main.cpp \
Sequencer.cpp

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is ring buffer not here ?


libSyncd_a_CPPFLAGS = $(CODE_COVERAGE_CPPFLAGS)
libSyncd_a_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON) $(CODE_COVERAGE_CXXFLAGS)
Expand Down
1 change: 1 addition & 0 deletions syncd/NotificationHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void NotificationHandler::updateNotificationsPointers(
* Also notice that we are using the same pointers for ALL switches.
*/

SWSS_LOG_NOTICE("sai_metadata_update_attribute_notification_pointers from NotificationHandler::updateNotificationsPointers");
sai_metadata_update_attribute_notification_pointers(&m_switchNotifications, attr_count, attr_list);
}

Expand Down
175 changes: 175 additions & 0 deletions syncd/Sequencer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#include "Sequencer.h"
#include "swss/logger.h"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under ifdef


using namespace syncd;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed ?


// Helper function to execute all ready responses in order
// check how many consecutive responses were executed in sucession and log it
Sequencer::SequenceStatus Sequencer::executeReadyResponses() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not run under lock since block new ecxcute ... ?
but need to think if we lock each iteration - two treads can run in parallel ...


std::string logMsg;
SequenceStatus status = FAILURE;

logMsg = "multithreaded: Checking for ready responses in queue... \n";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def


while (true) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

condeider adding while not exit thread like in ring buffer

// Check if the next sequence number is in the map
auto seq_data = responses.find(next_seq_to_send);
if (seq_data == responses.end()) {
logMsg += "multithreaded: No next sequence found in queue \n";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def

status = SUCCESS;
break; // Exit loop if the next sequence is not in the map
}

// Execute the stored lambda
auto func = seq_data->second;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use std::move ?

if(func) {
func();
logMsg += "multithreaded: Executing lambda with seq: " + std::to_string(next_seq_to_send) + " \n";
status = NULL_PTR;
}
else {
logMsg += "multithreaded: response lambda is null \n";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def

num_of_null_functions++;
status = SUCCESS; //?????
}

// Increment the number of executed tasks in sequence
max_num_of_executed_tasks_in_sequence++;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this count the total exec in task which is also important
for mac in sequence need to zero out every time we have and compare to max , can also have the max in last session ...


// Safely erase the entry
responses.erase(seq_data);

// Increment the sequence number
++next_seq_to_send;

logMsg += "multithreaded: Next sequence found! Executed lambda with seq: " + std::to_string(next_seq_to_send) + " \n";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def


if (next_seq_to_send >= MAX_SEQUENCE_NUMBER) {
logMsg += "multithreaded: Resetting next sequence number to send needs to be reset to avoid overflow \n";
next_seq_to_send = 0;
}
}

LogToModuleFile("1", logMsg);
return status;
}

// Get sequence number
// if sequencer is full, reset the sequence number to avoid overflow
// wait for, throw full failure
// return if there is an issue or success (0-succss, 1-failure)
// add private param to see how many lambdas are stored,
// for each function, return a status code
// mux on everything
Sequencer::SequenceStatus Sequencer::allocateSequenceNumber(int *seq_num) {
std::unique_lock<std::mutex> lock(mtx);
std::string logMsg;
SequenceStatus status = FAILURE;

if(isFull()) {
logMsg = "multithreaded: Sequencer is full, cannot allocate sequence number" + std::to_string(current_seq);
LogToModuleFile("1", logMsg);
*seq_num = INVALID_SEQUENCE_NUMBER;
return BUFFER_OVERFLOW;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strange return status
is this legitimate case ?
what to do ?

  1. wait untill gte free sequence
  2. retunr full and have application handle
    what does ring buffre do ?

}

// update recieved param
*seq_num = current_seq;
// increment the sequence number
current_seq++;

logMsg = "multithreaded: allocate seq num: " + std::to_string(*seq_num) + ", ";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def


// reset number to avoid overflow
if (current_seq >= MAX_SEQUENCE_NUMBER) {
logMsg += "multithreaded: Resetting allocated sequence number to avoid overflow, ";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def

current_seq = 0;
}

status = SUCCESS;

LogToModuleFile("1", logMsg);

return status;
}

// Add/Execute sequence function
Sequencer::SequenceStatus Sequencer::executeFuncInSequence(int seq, std::function<void()> response_lambda) {

std::unique_lock<std::mutex> lock(mtx);
std::string logMsg;
SequenceStatus status = FAILURE;

if (seq == next_seq_to_send) {
// If the received sequence is the next one to send, execute it immediately
logMsg = "multithreaded: executing reseponse lambda, seq num: " + std::to_string(seq) + " \n";

// execute response lambda
if(response_lambda) {
response_lambda();
logMsg += "multithreaded: execute response lambda \n";
status = SUCCESS;
}
else {
logMsg += "multithreaded: response lambda is null \n";
num_of_null_functions++;
status = SUCCESS; //NULL_PTR; ???
}

// increment the number of executed tasks in sequence
max_num_of_executed_tasks_in_sequence++;

// Increment the next sequence to send
++next_seq_to_send;

// reset number to avoid overflow
if (next_seq_to_send >= MAX_SEQUENCE_NUMBER) {
logMsg += "multithreaded: Resetting next sequence number to send needs to be reset to avoid overflow \n";
next_seq_to_send = 0;
}

// Continue sending any subsequent responses that are ready
status = executeReadyResponses();
} else {
// If the sequence is not the next to send, store it in the map
responses[seq] = response_lambda;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use std::move ?

logMsg = "multithreaded: storing lambda with seq: " + std::to_string(seq) + ", next to send: " + std::to_string(next_seq_to_send) + " \n";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add under debug flag or under if def

status = SUCCESS;
num_of_out_of_sequence_functions++;
}

LogToModuleFile("1", logMsg);
return status;
}

Sequencer::SequenceStatus Sequencer::showStatistics() {
std::unique_lock<std::mutex> lock(mtx);
std::string logMsg = "STATISTICS: \n";
logMsg = "multithreaded: max number of executed tasks in sequence: " + std::to_string(max_num_of_executed_tasks_in_sequence) + " \n";
logMsg += "multithreaded: number of null functions: " + std::to_string(num_of_null_functions) + " \n";
logMsg += "multithreaded: number of out of sequence functions: " + std::to_string(num_of_out_of_sequence_functions) + " \n";
logMsg += std::to_string(current_seq) + " out of " + std::to_string(max_seq_num) + "used";
LogToModuleFile("1", logMsg);
return SUCCESS;
}

Sequencer::SequenceStatus Sequencer::clearStatistics() {
std::unique_lock<std::mutex> lock(mtx);
max_num_of_executed_tasks_in_sequence = 0;
num_of_null_functions = 0;
num_of_out_of_sequence_functions = 0;
LogToModuleFile("1", "CLEANED STATISTICS \n");
return SUCCESS;
}

bool Sequencer::isFull() {
if(responses.size() < max_seq_num) {
LogToModuleFile("1", "is not full");
return false;
}
else {
LogToModuleFile("1", "is full");
return true;
}
}
71 changes: 71 additions & 0 deletions syncd/Sequencer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#ifndef SEQUENCER_H
#define SEQUENCER_H

#include <map>
#include <mutex>
#include <iostream>
#include <functional>
#include <chrono>
#include <limits>


#include <string>
#include <fstream>
#include <sys/stat.h>
#include <cstdio>
#include "Logger.h"

namespace syncd {

#define MAX_SEQUENCE_NUMBER 1024
#define INVALID_SEQUENCE_NUMBER std::numeric_limits<int>::min() // allow user to choose
using AnyTask = std::function<void()>;

class Sequencer {
public:
// Public constructor
Sequencer() : current_seq(0), next_seq_to_send(0), max_seq_num(MAX_SEQUENCE_NUMBER), max_num_of_executed_tasks_in_sequence(0), num_of_null_functions(0), num_of_out_of_sequence_functions(0) {}

// Public destructor
~Sequencer() {}

enum SequenceStatus {
FAILURE = -1,
SUCCESS = 0,
BUFFER_OVERFLOW = 1,
NULL_PTR = 2,
};

// Get sequence number
SequenceStatus allocateSequenceNumber(int *seq_num);

// Add/Execute sequence function
SequenceStatus executeFuncInSequence(int seq, std::function<void()> response_lambda);

private:
// Watchdog function to monitor inactivity
SequenceStatus performWatchdogCheck();

// Helper function to execute all ready responses in order
SequenceStatus executeReadyResponses();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be protected ?


SequenceStatus showStatistics();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it is private who can call it ??


SequenceStatus clearStatistics();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it is private who can call it ??


bool isFull();

int current_seq; // Tracks the latest sequence number assigned to a task
int next_seq_to_send; // The next sequence number that should be sent
long unsigned int max_seq_num; // The maximum sequence number
int max_num_of_executed_tasks_in_sequence; // The maximum number of executed tasks in sequence
int num_of_null_functions; // The number of null functions
int num_of_out_of_sequence_functions; // The number of out of sequence functions
std::mutex mtx; // Protects shared data
std::map<int, std::function<void()>> responses; // Stores responses by sequence number
};

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aviramd need to add show stats to dump ....
or print to log every x entries ...
also if detect "error" condition also get stats ...
need to check if stats sum up correctly e.g. totatl allocate = store + excutetd


#endif // SEQUENCER_H
1 change: 1 addition & 0 deletions syncd/SingleReiniter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include <unistd.h>
#include <inttypes.h>
//#include "Logger.h"

using namespace syncd;
using namespace saimeta;
Expand Down
Loading