Skip to content

Commit

Permalink
feat: streaming delete conntrack entries
Browse files Browse the repository at this point in the history
Signed-off-by: zwtop <[email protected]>
  • Loading branch information
zwtop committed Oct 10, 2022
1 parent 8715fe7 commit ca92ab9
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
11 changes: 4 additions & 7 deletions conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,10 @@ func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
// ConntrackDeleteFilter deletes entries on the specified table on the base of the filter using the netlink handle passed
// conntrack -D [table] parameters Delete conntrack or expectation
func (h *Handle) ConntrackDeleteFilter(table ConntrackTableType, family InetFamily, filter CustomConntrackFilter) (uint, error) {
res, err := h.dumpConntrackTable(table, family)
if err != nil {
return 0, err
}
req := h.newConntrackRequest(table, family, nl.IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP)

var matched uint
for _, dataRaw := range res {
err := req.ExecuteWithHandleFunc(unix.NETLINK_NETFILTER, 0, func(dataRaw []byte) {
flow := parseRawData(dataRaw)
if match := filter.MatchConntrackFlow(flow); match {
req2 := h.newConntrackRequest(table, family, nl.IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
Expand All @@ -105,9 +102,9 @@ func (h *Handle) ConntrackDeleteFilter(table ConntrackTableType, family InetFami
req2.Execute(unix.NETLINK_NETFILTER, 0)
matched++
}
}
})

return matched, nil
return matched, err
}

func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *nl.NetlinkRequest {
Expand Down
36 changes: 22 additions & 14 deletions nl/nl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,16 @@ func (req *NetlinkRequest) AddRawData(data []byte) {
// Returns a list of netlink messages in serialized format, optionally filtered
// by resType.
func (req *NetlinkRequest) Execute(sockType int, resType uint16) ([][]byte, error) {
var res [][]byte
var msgHandleFunc = func(msg []byte) { res = append(res, msg) }

err := req.ExecuteWithHandleFunc(sockType, resType, msgHandleFunc)
return res, err
}

// ExecuteWithHandleFunc execute the request against the given sockType.
// Processing messages by msgHandleFunc, optionally filtered by resType.
func (req *NetlinkRequest) ExecuteWithHandleFunc(sockType int, resType uint16, msgHandleFunc func(msg []byte)) error {
var (
s *NetlinkSocket
err error
Expand All @@ -495,18 +505,18 @@ func (req *NetlinkRequest) Execute(sockType int, resType uint16) ([][]byte, erro
if s == nil {
s, err = getNetlinkSocket(sockType)
if err != nil {
return nil, err
return err
}

if err := s.SetSendTimeout(&SocketTimeoutTv); err != nil {
return nil, err
return err
}
if err := s.SetReceiveTimeout(&SocketTimeoutTv); err != nil {
return nil, err
return err
}
if EnableErrorMessageReporting {
if err := s.SetExtAck(true); err != nil {
return nil, err
return err
}
}

Expand All @@ -517,31 +527,29 @@ func (req *NetlinkRequest) Execute(sockType int, resType uint16) ([][]byte, erro
}

if err := s.Send(req); err != nil {
return nil, err
return err
}

pid, err := s.GetPid()
if err != nil {
return nil, err
return err
}

var res [][]byte

done:
for {
msgs, from, err := s.Receive()
if err != nil {
return nil, err
return err
}
if from.Pid != PidKernel {
return nil, fmt.Errorf("Wrong sender portid %d, expected %d", from.Pid, PidKernel)
return fmt.Errorf("Wrong sender portid %d, expected %d", from.Pid, PidKernel)
}
for _, m := range msgs {
if m.Header.Seq != req.Seq {
if sharedSocket {
continue
}
return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
return fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
}
if m.Header.Pid != pid {
continue
Expand Down Expand Up @@ -578,18 +586,18 @@ done:
}
}

return nil, err
return err
}
if resType != 0 && m.Header.Type != resType {
continue
}
res = append(res, m.Data)
msgHandleFunc(m.Data)
if m.Header.Flags&unix.NLM_F_MULTI == 0 {
break done
}
}
}
return res, nil
return nil
}

// Create a new netlink request from proto and flags
Expand Down

0 comments on commit ca92ab9

Please sign in to comment.