Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow timeout timing/v14 #12084

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions src/flow-hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,8 @@ static inline void MoveToWorkQueue(ThreadVars *tv, FlowLookupStruct *fls,
}
}

static inline bool FlowIsTimedOut(const Flow *f, const SCTime_t ts, const bool emerg)
static inline bool FlowIsTimedOut(
const FlowThreadId ftid, const Flow *f, const SCTime_t pktts, const bool emerg)
{
SCTime_t timesout_at;
if (emerg) {
Expand All @@ -840,13 +841,43 @@ static inline bool FlowIsTimedOut(const Flow *f, const SCTime_t ts, const bool e
} else {
timesout_at = SCTIME_ADD_SECS(f->lastts, f->timeout_policy);
}
/* do the timeout check */
if (SCTIME_CMP_LT(ts, timesout_at)) {
return false;
/* if time is live, we just use the pktts */
if (TimeModeIsLive()) {
if (SCTIME_CMP_LT(pktts, timesout_at)) {
return false;
}
} else {
if (ftid == f->thread_id[0] || f->thread_id[0] == 0) {
/* do the timeout check */
if (SCTIME_CMP_LT(pktts, timesout_at)) {
return false;
}
Comment on lines +845 to +854
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to do the same thing in both the blocks? Can we merge them? Then it becomes similar to flow manager..

} else {
SCTime_t checkts = TmThreadsGetThreadTime(f->thread_id[0]);
/* do the timeout check */
if (SCTIME_CMP_LT(checkts, timesout_at)) {
return false;
}
}
}
return true;
}

static inline uint16_t GetTvId(const ThreadVars *tv)
{
uint16_t tv_id;
#ifdef UNITTESTS
if (RunmodeIsUnittests()) {
tv_id = 0;
} else {
tv_id = (uint16_t)tv->id;
}
#else
tv_id = (uint16_t)tv->id;
#endif
return tv_id;
}

/** \brief Get Flow for packet
*
* Hash retrieval function for flows. Looks up the hash bucket containing the
Expand Down Expand Up @@ -898,6 +929,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow
return f;
}

const uint16_t tv_id = GetTvId(tv);
const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0;
const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0;
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
Expand All @@ -906,8 +938,8 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow
do {
Flow *next_f = NULL;
FLOWLOCK_WRLOCK(f);
const bool timedout =
(fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) && FlowIsTimedOut(f, p->ts, emerg));
const bool timedout = (fb_nextts <= (uint32_t)SCTIME_SECS(p->ts) &&
FlowIsTimedOut(tv_id, f, p->ts, emerg));
if (timedout) {
next_f = f->next;
MoveToWorkQueue(tv, fls, fb, f, prev_f);
Expand Down