GoFlow stop fail bug #775

Open
jxplus opened this issue Aug 2, 2024 · 1 comment


jxplus commented Aug 2, 2024

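**Describe the bug**
The goroutine launched in `GoflowInput.Start()` never stops, because `flow.FlowRoutine` also runs an infinite loop, so the `n.ctx.Done()` check that follows the call is never reached.
See the code below.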

1. `operator/builtin/input/goflow/goflow.go`, `Start()`, line 109

```go
func (n *GoflowInput) Start() error {
	n.ctx, n.cancel = context.WithCancel(context.Background())

	go func() {
		...
		for {
			n.Infof("Starting Goflow on %s:%d in %s mode", n.address, n.port, n.mode)
			switch n.mode {
			case modeSflow:
				flow := &utils.StateSFlow{Transport: n, Logger: n}
				goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
			case modeNetflowV5:
				flow := &utils.StateNFLegacy{Transport: n, Logger: n}
				goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
			case modeNetflowIPFIX:
				flow := &utils.StateNetFlow{Transport: n, Logger: n}
				goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
			}

			// Never reached while FlowRoutine is running:
			// it blocks in its own infinite loop.
			select {
			case <-n.ctx.Done():
				return
			default:
			}
			...
		}
	}()

	return nil
}
```

2. `C:/Users/Administrator/go/pkg/mod/github.com/observiq/goflow/[email protected]/utils/utils.go:256`

```go
func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
	ecb := DefaultErrorCallback{
		Logger: logger,
	}
	...
	// This is also an infinite loop: it has no exit condition.
	for {
		size, pktAddr, _ := udpconn.ReadFromUDP(payload)
		payloadCut := make([]byte, size)
		copy(payloadCut, payload[0:size])

		baseMessage := BaseMessage{
			Src:     pktAddr.IP,
			Port:    pktAddr.Port,
			Payload: payloadCut,
		}
		processor.ProcessMessage(baseMessage)

		MetricTrafficBytes.With(
			prometheus.Labels{
				"remote_ip":   pktAddr.IP.String(),
				"remote_port": strconv.Itoa(pktAddr.Port),
				"local_ip":    localIP,
				"local_port":  strconv.Itoa(addrUDP.Port),
				"type":        name,
			}).
			Add(float64(size))
		MetricTrafficPackets.With(
			prometheus.Labels{
				"remote_ip":   pktAddr.IP.String(),
				"remote_port": strconv.Itoa(pktAddr.Port),
				"local_ip":    localIP,
				"local_port":  strconv.Itoa(addrUDP.Port),
				"type":        name,
			}).
			Inc()
		MetricPacketSizeSum.With(
			prometheus.Labels{
				"remote_ip":   pktAddr.IP.String(),
				"remote_port": strconv.Itoa(pktAddr.Port),
				"local_ip":    localIP,
				"local_port":  strconv.Itoa(addrUDP.Port),
				"type":        name,
			}).
			Observe(float64(size))
	}
}
```

jsirianni (Member) commented

Hi @jxplus, I believe you are correct. Was this causing a problem with your Stanza deployment?
