Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequencer class with lambda response message #2

Open
wants to merge 38 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b30be88
Sequencer class with lambda response message
DanielaMurin Aug 21, 2024
d0369a2
Add Sequencer to sendGetResponse
DanielaMurin Aug 21, 2024
2caeedc
Ring Buffer - Phase 1: The main thread push to the ring buffer and th…
shiraez Aug 21, 2024
98c6c89
Remove singleton & fix compilation
DanielaMurin Aug 22, 2024
78ec4a7
Fix merge issues
DanielaMurin Aug 22, 2024
65f94df
add debug info + enable thread
shiraez Aug 25, 2024
8523a2a
add delay
shiraez Aug 25, 2024
0ddc1ec
debug single ring buffer & enable sequencer
DanielaMurin Aug 25, 2024
cc73b2e
Add new debug file & move sequencer alloc to processEvent
DanielaMurin Aug 26, 2024
c1b916a
fix reset sequencer logic
DanielaMurin Aug 27, 2024
8d0feae
Take out lambda execute from within lock
DanielaMurin Aug 27, 2024
efc1bc9
add loger
shiraez Aug 27, 2024
da6d6b9
Merge branch 'syncd_multithread' of https://github.com/Marvell-switch…
shiraez Aug 27, 2024
5e2e15c
Working single ring buffer & sequencer
DanielaMurin Aug 27, 2024
fdf56c9
Merge branch 'syncd_multithread' of https://github.com/Marvell-switch…
shiraez Aug 27, 2024
d3fddf3
add logger to syncd + sequencer
shiraez Aug 27, 2024
9b01706
Return m_switches and m_mdioIpcServer logic
DanielaMurin Aug 27, 2024
7a14462
fix Logger
shiraez Aug 28, 2024
b4d0a06
Merge branch 'syncd_multithread' of https://github.com/Marvell-switch…
shiraez Aug 28, 2024
04eb362
add logger.h file
shiraez Aug 28, 2024
00b8ade
Update sequencer statistics/mux/logging
DanielaMurin Aug 29, 2024
94005f8
Merge conflict fix, remove old logger code
DanielaMurin Aug 29, 2024
3f121a3
enable flag to the logger
shiraez Aug 29, 2024
7a661ce
Return diag shell thread, improve lambda func readability
DanielaMurin Aug 29, 2024
8ea0eb6
Allocate sequence number inside functions
DanielaMurin Aug 29, 2024
9908e23
add multiRing buffer support & send response for all ops
aviramd Aug 29, 2024
328867e
fix find operation group by string
aviramd Aug 29, 2024
762ae3b
fix find ring buffer by op or objectType
aviramd Aug 30, 2024
a5d4802
fix sendApiResponse objectCount param on processBulkQuadEventInInitVi…
aviramd Aug 31, 2024
b10bf7d
removing unnecessary code
shiraez Sep 1, 2024
0225abc
Fix send responde sync for nom get actions
aviramd Sep 1, 2024
6cf0e13
create muliple threads, cleanup, change response seq order
aviramd Sep 1, 2024
a2d711e
Sequencer update & Cleanup
DanielaMurin Sep 1, 2024
4c72076
Sequencer update - ring buffer instead of map
DanielaMurin Sep 2, 2024
c5e511a
fix bug when redis reads empty key
aviramd Sep 3, 2024
db71e99
REVERT - Sequencer update - ring buffer instead of map
DanielaMurin Sep 3, 2024
d52b04e
protected RedisClient + RedisSelectableChannel
shiraez Sep 3, 2024
cad2dcb
remove LOCK from loop wrap it instead , add thread id prints, 3 threads
aviramd Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 64 additions & 20 deletions meta/RedisSelectableChannel.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
#include "RedisSelectableChannel.h"

#include "swss/logger.h"
#include "syncd/Logger.h"

using namespace sairedis;

#define MY_LOCK() \
if(m_protected) \
{ \
LogToModuleFile("1", "before MY_LOCK()"); \
m_mutex->lock(); \
LogToModuleFile("1", "after MY_LOCK()"); \
}

#define MY_UNLOCK() \
if(m_protected) \
{ \
LogToModuleFile("1", "before MY_UNLOCK()"); \
m_mutex->unlock(); \
LogToModuleFile("1", "after MY_UNLOCK()"); \
}

RedisSelectableChannel::RedisSelectableChannel(
_In_ std::shared_ptr<swss::DBConnector> dbAsic,
_In_ const std::string& asicStateTable,
_In_ const std::string& getResponseTable,
_In_ const std::string& tempPrefix,
_In_ bool modifyRedis):
_In_ bool modifyRedis,
_In_ std::shared_ptr<std::mutex> t_mutex):
m_dbAsic(dbAsic),
m_tempPrefix(tempPrefix),
m_modifyRedis(modifyRedis)
m_modifyRedis(modifyRedis),
m_mutex(t_mutex)
{
SWSS_LOG_ENTER();

Expand All @@ -29,13 +48,23 @@ RedisSelectableChannel::RedisSelectableChannel(
m_getResponse = std::make_shared<swss::ProducerTable>(m_dbAsic.get(), getResponseTable);

SWSS_LOG_NOTICE("opened redis channel");
if(t_mutex != NULL)
{
m_protected = true;
}
else
{
m_protected = false;
}
}

bool RedisSelectableChannel::empty()
{
SWSS_LOG_ENTER();

return m_asicState->empty();
MY_LOCK();
bool ans = m_asicState->empty();
MY_UNLOCK();
return ans;
}

void RedisSelectableChannel::set(
Expand All @@ -44,16 +73,17 @@ void RedisSelectableChannel::set(
_In_ const std::string& op)
{
SWSS_LOG_ENTER();

MY_LOCK();
m_getResponse->set(key, values, op);
MY_UNLOCK();
}

void RedisSelectableChannel::pop(
_Out_ swss::KeyOpFieldsValuesTuple& kco,
_In_ bool initViewMode)
{
SWSS_LOG_ENTER();

MY_LOCK();
if (initViewMode)
{
m_asicState->pop(kco, m_tempPrefix);
Expand All @@ -62,55 +92,69 @@ void RedisSelectableChannel::pop(
{
m_asicState->pop(kco);
}
MY_UNLOCK();
}

// Selectable overrides

int RedisSelectableChannel::getFd()
{
SWSS_LOG_ENTER();

return m_asicState->getFd();
MY_LOCK();
int ans = m_asicState->getFd();
MY_UNLOCK();
return ans;
}

uint64_t RedisSelectableChannel::readData()
{
SWSS_LOG_ENTER();

return m_asicState->readData();
MY_LOCK();
uint64_t ans = m_asicState->readData();
MY_UNLOCK();
return ans;
}

bool RedisSelectableChannel::hasData()
{
SWSS_LOG_ENTER();

return m_asicState->hasData();
MY_LOCK();
bool ans = m_asicState->hasData();
MY_UNLOCK();
return ans;
}

bool RedisSelectableChannel::hasCachedData()
{
SWSS_LOG_ENTER();

return m_asicState->hasCachedData();
MY_LOCK();
bool ans = m_asicState->hasCachedData();
MY_UNLOCK();
return ans;
}

bool RedisSelectableChannel::initializedWithData()
{
SWSS_LOG_ENTER();

return m_asicState->initializedWithData();
MY_LOCK();
bool ans = m_asicState->initializedWithData();
MY_UNLOCK();
return ans;
}

void RedisSelectableChannel::updateAfterRead()
{
SWSS_LOG_ENTER();

return m_asicState->updateAfterRead();
MY_LOCK();
m_asicState->updateAfterRead();
MY_UNLOCK();
}

int RedisSelectableChannel::getPri() const
{
SWSS_LOG_ENTER();

return m_asicState->getPri();
MY_LOCK();
auto ans = m_asicState->getPri();
MY_UNLOCK();
return ans;
}
8 changes: 7 additions & 1 deletion meta/RedisSelectableChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "swss/consumertable.h"
#include "swss/producertable.h"
#include <mutex>

namespace sairedis
{
Expand All @@ -17,7 +18,8 @@ namespace sairedis
_In_ const std::string& asicStateTable,
_In_ const std::string& getResponseTable,
_In_ const std::string& tempPrefix,
_In_ bool modifyRedis);
_In_ bool modifyRedis,
_In_ std::shared_ptr<std::mutex> mutex = nullptr);

virtual ~RedisSelectableChannel() = default;

Expand Down Expand Up @@ -61,5 +63,9 @@ namespace sairedis
std::string m_tempPrefix;

bool m_modifyRedis;

bool m_protected;

std::shared_ptr<std::mutex> m_mutex;
};
}
108 changes: 108 additions & 0 deletions syncd/Logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#ifndef LOGGER_H
#define LOGGER_H

#define MODULE_NAME "syncd"

#include <string>
#include <iostream>
#include <fstream>
#include <sys/stat.h>
#include <chrono>
#include <cstdio>
#include <mutex>
#include <sstream>
#include <iomanip>
#include "fmt/format.h"
#include <ctime>
#include <thread>

#define MAX_LOG_SIZE (10 *50 * 1024) /* 50 KB */
#define ENABLE_LOGGING 1

// Define a mutex for thread safety
static std::mutex logMutex;

// Improved logging function with thread safety
static void writeToLogFile(const std::string& funcName, const std::string& fileNum, const std::string& message) {
// Lock the mutex to ensure thread safety
std::lock_guard<std::mutex> lock(logMutex);

std::string filePath = "/" + std::string(MODULE_NAME) + "_debugLog_" + fileNum + ".txt";
std::string backupFilePath = filePath + ".history";
struct stat fileStat;

// Check if the log file exists and if its size exceeds the maximum limit
if (stat(filePath.c_str(), &fileStat) == 0) {
if (fileStat.st_size > MAX_LOG_SIZE) {
// Remove the old backup file
std::remove(backupFilePath.c_str());
// Rename the current log file to the backup file
if (std::rename(filePath.c_str(), backupFilePath.c_str()) != 0) {
std::cerr << "Error: Could not rename file " << filePath << " to " << backupFilePath << std::endl;
return;
}
}
}

// Open the log file in append mode
std::ofstream logFile(filePath, std::ios_base::app);
if (!logFile.is_open()) {
std::cerr << "Error: Could not open file " << filePath << std::endl;
return;
}

auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;

std::ostringstream oss;
oss << std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S")
<< '.' << std::setw(3) << std::setfill('0') << milliseconds.count();
std::string formatted_time = oss.str();

std::thread::id this_id = std::this_thread::get_id();

// Write the timestamp, function name, and message to the log file
logFile << formatted_time << " V4 " << this_id << " " << funcName << ": " << message << std::endl;


logFile.close();
}

template<typename... Args>
static void logFormattedMessage(const std::string& funcName, const std::string& fileNum, const std::string& format, Args... messageArgs) {
std::ostringstream oss;

std::string remainingFormat = format;

// Helper function to process a single argument
auto processArg = [&oss, &remainingFormat](const auto& arg) {
size_t pos = remainingFormat.find("{}");
if (pos != std::string::npos) {
oss << remainingFormat.substr(0, pos) << arg;
remainingFormat = remainingFormat.substr(pos + 2);
}
};

// Helper function to recursively process all arguments
auto processAllArgs = [&](auto&&... innerArgs) -> void {
(void)std::initializer_list<int>{(processArg(innerArgs), 0)...};
};

// Process all arguments
processAllArgs(messageArgs...);

// Add any remaining format string
oss << remainingFormat;

// Write the full message to the log file
writeToLogFile(funcName, fileNum, oss.str());
}
#if ENABLE_LOGGING
// Macro for easy logging with formatting
#define LogToModuleFile(fileNum, format, ...) logFormattedMessage(__func__, fileNum, format, ##__VA_ARGS__)
#else
#define LogToModuleFile(fileNum, format, ...)
#endif

#endif // LOGGER_H
3 changes: 2 additions & 1 deletion syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ libSyncd_a_SOURCES = \
WatchdogScope.cpp \
Workaround.cpp \
ZeroMQNotificationProducer.cpp \
syncd_main.cpp
syncd_main.cpp \
Sequencer.cpp

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is ring buffer not here ?


libSyncd_a_CPPFLAGS = $(CODE_COVERAGE_CPPFLAGS)
libSyncd_a_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON) $(CODE_COVERAGE_CXXFLAGS)
Expand Down
Loading