diff --git a/bpf/accesslog/syscalls/connect_conntrack.c b/bpf/accesslog/syscalls/connect_conntrack.c index d7180d50..365c8b74 100644 --- a/bpf/accesslog/syscalls/connect_conntrack.c +++ b/bpf/accesslog/syscalls/connect_conntrack.c @@ -23,7 +23,7 @@ static __always_inline void nf_conntrack_read_in6_addr(__u64 *addr_h, __u64 *add bpf_probe_read(addr_l, sizeof(*addr_l), &in6->s6_addr32[2]); } -static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple_t *t, const struct nf_conntrack_tuple *ct) { +static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(struct connect_args_t *connect_args, conntrack_tuple_t *t, const struct nf_conntrack_tuple *ct) { __builtin_memset(t, 0, sizeof(conntrack_tuple_t)); switch (ct->dst.protonum) { @@ -60,6 +60,25 @@ static __always_inline int nf_conntrack_tuple_to_conntrack_tuple(conntrack_tuple return 0; } } + + struct sock *sock = connect_args->sock; + struct socket *tmps = _(sock->sk_socket); + if (tmps != NULL) { + struct sock* s; + BPF_CORE_READ_INTO(&s, tmps, sk); + short unsigned int skc_family; + BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family); + if (skc_family == AF_INET) { + __u16 local_port; + BPF_CORE_READ_INTO(&local_port, s, __sk_common.skc_num); + __u32 local_addr_v4; + BPF_CORE_READ_INTO(&local_addr_v4, s, __sk_common.skc_rcv_saddr); + // make sure connntrack with the same socket address + if (local_addr_v4 != t->daddr_l || local_port != t->dport) { + return 0; + } + } + } return 1; } @@ -73,6 +92,11 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct return 0; } + // already contains the remote address + if (&(connect_args->remote) != NULL) { + return 0; + } + __u32 status; if (bpf_probe_read(&status, sizeof(status), &(ct->status)) != 0) { return 0; // Invalid ct pointer @@ -93,7 +117,7 @@ static __always_inline int nf_conn_aware(struct pt_regs* ctx, struct nf_conn *ct struct nf_conntrack_tuple reply = tuplehash[IP_CT_DIR_REPLY].tuple; conntrack_tuple_t reply_conn = {}; - if (!nf_conntrack_tuple_to_conntrack_tuple(&reply_conn, &reply)) { + if (!nf_conntrack_tuple_to_conntrack_tuple(connect_args, &reply_conn, &reply)) { return 0; } diff --git a/pkg/accesslog/collector/connect.go b/pkg/accesslog/collector/connect.go index 958103aa..f315967d 100644 --- a/pkg/accesslog/collector/connect.go +++ b/pkg/accesslog/collector/connect.go @@ -116,7 +116,7 @@ func (c *ConnectCollector) buildSocketFromConnectEvent(event *events.SocketConne pair, err := ip.ParseSocket(event.PID, event.SocketFD) if err != nil { - connectLogger.Warnf("cannot found the socket, pid: %d, socket FD: %d", event.PID, event.SocketFD) + connectLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d", event.PID, event.SocketFD) return nil } connectLogger.Debugf("found the connection from the socket, connection ID: %d, randomID: %d", diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 9c38826e..2a850ce1 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -19,11 +19,14 @@ package protocols import ( "context" + "errors" "fmt" "os" "sync" "time" + "github.com/cilium/ebpf" + "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/accesslog/events" "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" @@ -212,7 +215,14 @@ func (p *PartitionContext) processEvents() { p.processConnectionEvents(info) // if the connection already closed and not contains any buffer data, then delete the connection - if info.closed && info.dataBuffer.DataLength() == 0 { + bufLen := info.dataBuffer.DataLength() + if bufLen > 0 { + return + } + if !info.closed { + p.checkTheConnectionIsAlreadyClose(info) + } + if info.closed { if info.closeCallback != nil { info.closeCallback() } @@ -225,6 +235,26 @@ func (p *PartitionContext) processEvents() { } } +func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnection) { + if time.Since(con.lastCheckCloseTime) <= time.Second*30 { + return + } + con.lastCheckCloseTime = time.Now() + var activateConn common.ActiveConnection + if err := p.context.BPF.ActiveConnectionMap.Lookup(con.connectionID, &activateConn); err != nil { + if errors.Is(err, ebpf.ErrKeyNotExist) { + con.closed = true + return + } + log.Warnf("cannot found the active connection: %d-%d, err: %v", con.connectionID, con.randomID, err) + return + } else if activateConn.RandomID != 0 && activateConn.RandomID != con.randomID { + log.Debugf("detect the connection: %d-%d is already closed, so remove from the activate connection", + con.connectionID, con.randomID) + con.closed = true + } +} + func (p *PartitionContext) processExpireEvents() { // the expiry must be mutual exclusion with events processor p.analyzeLocker.Lock() @@ -267,6 +297,7 @@ type PartitionConnection struct { closed bool closeCallback common.ConnectionProcessFinishCallback skipAllDataAnalyze bool + lastCheckCloseTime time.Time } func (p *PartitionConnection) appendDetail(ctx *common.AccessLogContext, detail *events.SocketDetailEvent) { diff --git a/pkg/accesslog/common/connection.go b/pkg/accesslog/common/connection.go index bb64ff83..cf5c5ee9 100644 --- a/pkg/accesslog/common/connection.go +++ b/pkg/accesslog/common/connection.go @@ -574,7 +574,10 @@ func (c *ConnectionManager) OnBuildConnectionLogFinished() { func (c *ConnectionManager) SkipAllDataAnalyze(conID, ranID uint64) { var activateConn ActiveConnection if err := c.activeConnectionMap.Lookup(conID, &activateConn); err != nil { - log.Warnf("cannot found the active connection: %d-%d", conID, ranID) + if errors.Is(err, ebpf.ErrKeyNotExist) { + return + } + log.Warnf("cannot found the active connection: %d-%d, err: %v", conID, ranID, err) return } if activateConn.RandomID != ranID {