Skip to content

Commit

Permalink
perf: 记录会话生命周期日志 (#1297)
Browse files Browse the repository at this point in the history
* perf: 记录会话生命周期日志

* perf: 增加录像存储为 null 记录

* perf: 增加会话生命周期记录

* perf: 记录 sftp 和 ssh 命令会话连接记录

---------

Co-authored-by: Eric <[email protected]>
  • Loading branch information
fit2bot and LeeEirc authored Feb 6, 2024
1 parent 37f4996 commit 940e6a3
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 7 deletions.
18 changes: 17 additions & 1 deletion pkg/handler/server_ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ func IsScpCommand(rawStr string) bool {
return false
}

func (s *Server) recordSessionLifecycle(sid string, event model.LifecycleEvent, reason string) {
logObj := model.SessionLifecycleLog{Reason: reason}
if err2 := s.jmsService.RecordSessionLifecycleLog(sid, event, logObj); err2 != nil {
logger.Errorf("Record session %s lifecycle %s failed: %s", sid, event, err2)
}
}

func (s *Server) proxyAssetCommand(sess ssh.Session, sshClient *srvconn.SSHClient,
tokeInfo *model.ConnectToken) {
rawStr := sess.RawCommand()
Expand Down Expand Up @@ -371,6 +378,7 @@ func (s *Server) proxyAssetCommand(sess ssh.Session, sshClient *srvconn.SSHClien
}
ctx, cancel := context.WithCancel(sess.Context())
defer cancel()

traceSession := session.NewSession(&respSession, func(task *model.TerminalTask) error {
switch task.Name {
case model.TaskKillSession:
Expand All @@ -393,6 +401,7 @@ func (s *Server) proxyAssetCommand(sess ssh.Session, sshClient *srvconn.SSHClien
logger.Errorf("Get SSH session failed: %s", err)
return
}
s.recordSessionLifecycle(respSession.ID, model.AssetConnectSuccess, "")
defer goSess.Close()
defer sshClient.ReleaseSession(goSess)
go func() {
Expand Down Expand Up @@ -455,6 +464,8 @@ func (s *Server) proxyAssetCommand(sess ssh.Session, sshClient *srvconn.SSHClien
logger.Errorf("User %s Run command %s failed: %s",
tokeInfo.User.String(), rawStr, err)
}
reason := string(model.ReasonErrConnectDisconnect)
s.recordSessionLifecycle(respSession.ID, model.AssetConnectFinished, reason)
}

func (s *Server) proxyVscodeShell(sess ssh.Session, vsReq *vscodeReq, sshClient *srvconn.SSHClient,
Expand Down Expand Up @@ -490,7 +501,7 @@ func (s *Server) proxyVscodeShell(sess ssh.Session, vsReq *vscodeReq, sshClient
logger.Errorf("Get SSH session failed: %s", err)
return err
}

s.recordSessionLifecycle(respSession.ID, model.AssetConnectSuccess, "")
defer goSess.Close()
defer sshClient.ReleaseSession(goSess)
stdOut, err := goSess.StdoutPipe()
Expand All @@ -506,6 +517,7 @@ func (s *Server) proxyVscodeShell(sess ssh.Session, vsReq *vscodeReq, sshClient
err = goSess.Shell()
if err != nil {
logger.Errorf("Get SSH session shell failed: %s", err)
s.recordSessionLifecycle(respSession.ID, model.AssetConnectFinished, err.Error())
return err
}
logger.Infof("User %s start vscode request to %s", vsReq.user, sshClient)
Expand All @@ -525,11 +537,15 @@ func (s *Server) proxyVscodeShell(sess ssh.Session, vsReq *vscodeReq, sshClient
case <-ctx.Done():
logger.Infof("SSH conn[%s] User %s end vscode request %s as session done",
vsReq.reqId, vsReq.user, sshClient)
reason := string(model.ReasonErrConnectDisconnect)
s.recordSessionLifecycle(respSession.ID, model.AssetConnectFinished, reason)
return nil
case now := <-ticker.C:
if vsReq.expireInfo.IsExpired(now) {
logger.Infof("SSH conn[%s] User %s end vscode request %s as permission has expired",
vsReq.reqId, vsReq.user, sshClient)
reason := string(model.ReasonErrPermissionExpired)
s.recordSessionLifecycle(respSession.ID, model.AssetConnectFinished, reason)
return nil
}
logger.Debugf("SSH conn[%s] user %s vscode request still alive", vsReq.reqId, vsReq.user)
Expand Down
6 changes: 6 additions & 0 deletions pkg/httpd/tty.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,8 @@ func (h *tty) JoinRoom(c *Client, roomID string) {
Body: nil,
Meta: meta,
})
logObj := model.SessionLifecycleLog{User: h.ws.user.String()}
h.ws.RecordLifecycleLog(roomID, model.UserJoinSession, logObj)
for {
buf := make([]byte, 1024)
nr, err := c.Read(buf)
Expand All @@ -451,6 +453,7 @@ func (h *tty) JoinRoom(c *Client, roomID string) {
Body: nil,
Meta: meta,
})
h.ws.RecordLifecycleLog(roomID, model.UserLeaveSession, logObj)
logger.Infof("Conn[%s] user read end", c.ID())
if err := h.ws.apiClient.FinishShareRoom(h.shareInfo.Record.ID); err != nil {
logger.Infof("Conn[%s] finish share room err: %s", c.ID(), err)
Expand All @@ -463,6 +466,8 @@ func (h *tty) Monitor(c *Client, roomID string) {
conn := exchange.WrapperUserCon(c)
room.Subscribe(conn)
defer room.UnSubscribe(conn)
logObj := model.SessionLifecycleLog{User: h.ws.user.String()}
h.ws.RecordLifecycleLog(roomID, model.AdminJoinMonitor, logObj)
for {
buf := make([]byte, 1024)
_, err := c.Read(buf)
Expand All @@ -473,5 +478,6 @@ func (h *tty) Monitor(c *Client, roomID string) {
logger.Debugf("Conn[%s] user monitor", c.ID())
}
logger.Infof("Conn[%s] user read end", c.ID())
h.ws.RecordLifecycleLog(roomID, model.AdminExitMonitor, logObj)
}
}
7 changes: 7 additions & 0 deletions pkg/httpd/userwebsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,10 @@ var (
ErrDisableShare = errors.New("disable share")
ErrPermissionDenied = errors.New("permission denied")
)

func (userCon *UserWebsocket) RecordLifecycleLog(sid string, event model.LifecycleEvent,
logObj model.SessionLifecycleLog) {
if err := userCon.apiClient.RecordSessionLifecycleLog(sid, event, logObj); err != nil {
logger.Errorf("Record session lifecycle log err: %s", err)
}
}
38 changes: 38 additions & 0 deletions pkg/jms-sdk-go/model/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,41 @@ const (
SessionReplayErrUploadFailed ReplayError = "replay_upload_failed"
SessionReplayErrUnsupported ReplayError = "replay_unsupported"
)

type LifecycleEvent string

const (
AssetConnectSuccess LifecycleEvent = "asset_connect_success"
AssetConnectFinished LifecycleEvent = "asset_connect_finished"
CreateShareLink LifecycleEvent = "create_share_link"
UserJoinSession LifecycleEvent = "user_join_session"
UserLeaveSession LifecycleEvent = "user_leave_session"
AdminJoinMonitor LifecycleEvent = "admin_join_monitor"
AdminExitMonitor LifecycleEvent = "admin_exit_monitor"
ReplayConvertStart LifecycleEvent = "replay_convert_start"
ReplayConvertSuccess LifecycleEvent = "replay_convert_success"
ReplayConvertFailure LifecycleEvent = "replay_convert_failure"
ReplayUploadStart LifecycleEvent = "replay_upload_start"
ReplayUploadSuccess LifecycleEvent = "replay_upload_success"
ReplayUploadFailure LifecycleEvent = "replay_upload_failure"
)

type SessionLifecycleLog struct {
Reason string `json:"reason"`
User string `json:"user"`
}

var EmptyLifecycleLog = SessionLifecycleLog{}

type SessionLifecycleReasonErr string

const (
ReasonErrConnectFailed SessionLifecycleReasonErr = "connect_failed"
ReasonErrConnectDisconnect SessionLifecycleReasonErr = "connect_disconnect"
ReasonErrUserClose SessionLifecycleReasonErr = "user_close"
ReasonErrIdleDisconnect SessionLifecycleReasonErr = "idle_disconnect"
ReasonErrAdminTerminate SessionLifecycleReasonErr = "admin_terminate"
ReasonErrMaxSessionTimeout SessionLifecycleReasonErr = "max_session_timeout"
ReasonErrPermissionExpired SessionLifecycleReasonErr = "permission_expired"
ReasonErrNullStorage SessionLifecycleReasonErr = "null_storage"
)
24 changes: 24 additions & 0 deletions pkg/jms-sdk-go/service/jms_session_record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package service

import (
"fmt"

"github.com/jumpserver/koko/pkg/jms-sdk-go/model"
)

func (s *JMService) RecordSessionLifecycleLog(sid string, event model.LifecycleEvent, logObj model.SessionLifecycleLog) (err error) {
data := map[string]interface{}{
"event": event,
}
if logObj.Reason != "" {
data["reason"] = logObj.Reason
}
if logObj.User != "" {
data["user"] = logObj.User
}

reqURL := fmt.Sprintf(SessionLifecycleLogURL, sid)
var resp map[string]interface{}
_, err = s.authClient.Post(reqURL, data, &resp)
return
}
2 changes: 2 additions & 0 deletions pkg/jms-sdk-go/service/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
FTPLogListURL = "/api/v1/audits/ftp-logs/" // 上传 ftp日志
FTPLogUpdateURL = "/api/v1/audits/ftp-logs/%s/"
FTPLogFileURL = "/api/v1/audits/ftp-logs/%s/upload/"

SessionLifecycleLogURL = "/api/v1/terminal/sessions/%s/lifecycle_log/"
)

// 授权相关API
Expand Down
19 changes: 15 additions & 4 deletions pkg/koko/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func uploadRemainReplay(jmsService *service.JMService) {
return nil
})

recordLifecycleLog := func(id string, event model.LifecycleEvent, reason string) {
logObj := model.SessionLifecycleLog{Reason: reason}
if err1 := jmsService.RecordSessionLifecycleLog(id, event, logObj); err1 != nil {
logger.Errorf("Update session %s activity log failed: %s", id, err1)
}
}

for absPath, remainReplay := range allRemainFiles {
absGzPath := absPath
if !remainReplay.IsGzip {
Expand All @@ -64,18 +71,22 @@ func uploadRemainReplay(jmsService *service.JMService) {
}
_ = os.Remove(absPath)
}
Target, _ := filepath.Rel(replayDir, absGzPath)
target, _ := filepath.Rel(replayDir, absGzPath)

recordLifecycleLog(remainReplay.Id, model.ReplayUploadStart, "")
logger.Infof("Upload replay file: %s, type: %s", absGzPath, replayStorage.TypeName())
if err2 := replayStorage.Upload(absGzPath, Target); err2 != nil {
if err2 := replayStorage.Upload(absGzPath, target); err2 != nil {
logger.Errorf("Upload remain replay file %s failed: %s", absGzPath, err2)
reason := model.SessionReplayErrUploadFailed
if err3 := jmsService.SessionReplayFailed(remainReplay.Id, reason); err3 != nil {
logger.Errorf("Update session %s status %s failed: %s", remainReplay.Id, reason, err3)
}
recordLifecycleLog(remainReplay.Id, model.ReplayUploadFailure, err2.Error())
continue
}
if err := jmsService.FinishReply(remainReplay.Id); err != nil {
logger.Errorf("Notify session %s upload failed: %s", remainReplay.Id, err)
recordLifecycleLog(remainReplay.Id, model.ReplayUploadSuccess, "")
if err1 := jmsService.FinishReply(remainReplay.Id); err1 != nil {
logger.Errorf("Notify session %s upload failed: %s", remainReplay.Id, err1)
continue
}
_ = os.Remove(absGzPath)
Expand Down
12 changes: 12 additions & 0 deletions pkg/proxy/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (r *ReplyRecorder) Record(p []byte) {

func (r *ReplyRecorder) End() {
if r.isNullStorage() {
r.recordLifecycleLog(model.ReplayUploadFailure, string(model.ReasonErrNullStorage))
return
}
_ = r.file.Close()
Expand Down Expand Up @@ -230,8 +231,10 @@ func (r *ReplyRecorder) uploadReplay() {
func (r *ReplyRecorder) UploadGzipFile(maxRetry int) {
if r.isNullStorage() {
_ = os.Remove(r.absGzipFilePath)
r.recordLifecycleLog(model.ReplayUploadFailure, string(model.ReasonErrNullStorage))
return
}
r.recordLifecycleLog(model.ReplayUploadStart, "")
for i := 0; i <= maxRetry; i++ {
logger.Infof("Upload replay file: %s, type: %s", r.absGzipFilePath, r.storage.TypeName())
err := r.storage.Upload(r.absGzipFilePath, r.Target)
Expand All @@ -240,8 +243,10 @@ func (r *ReplyRecorder) UploadGzipFile(maxRetry int) {
if err = r.jmsService.FinishReply(r.SessionID); err != nil {
logger.Errorf("Session[%s] finish replay err: %s", r.SessionID, err)
}
r.recordLifecycleLog(model.ReplayUploadSuccess, "")
break
}
r.recordLifecycleLog(model.ReplayUploadFailure, err.Error())
logger.Errorf("Upload replay file err: %s", err)
// 如果还是失败,上传 server 再传一次
if i == maxRetry {
Expand All @@ -260,6 +265,13 @@ func (r *ReplyRecorder) UploadGzipFile(maxRetry int) {
}
}

func (r *ReplyRecorder) recordLifecycleLog(event model.LifecycleEvent, reason string) {
eventLog := model.SessionLifecycleLog{Reason: reason}
if err := r.jmsService.RecordSessionLifecycleLog(r.SessionID, event, eventLog); err != nil {
logger.Errorf("Update session %s activity log %s failed: %s", r.SessionID, event, err)
}
}

type ReplyInfo struct {
Width int
Height int
Expand Down
9 changes: 9 additions & 0 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,18 @@ func (s *Server) Proxy() {
if err2 := s.ConnectedFailedCallback(err); err2 != nil {
logger.Errorf("Conn[%s] update session err: %s", s.UserConn.ID(), err2)
}
errLog := model.SessionLifecycleLog{Reason: err.Error()}
if err1 := s.jmsService.RecordSessionLifecycleLog(s.sessionInfo.ID, model.AssetConnectFinished,
errLog); err1 != nil {
logger.Errorf("Conn[%s] record session activity log err: %s", s.UserConn.ID(), err1)
}
return
}
defer srvCon.Close()
if err1 := s.jmsService.RecordSessionLifecycleLog(s.sessionInfo.ID, model.AssetConnectSuccess,
model.EmptyLifecycleLog); err1 != nil {
logger.Errorf("Conn[%s] record session activity log err: %s", s.UserConn.ID(), err1)
}

logger.Infof("Conn[%s] create session %s success", s.UserConn.ID(), s.ID)
if err2 := s.ConnectedSuccessCallback(); err2 != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/proxy/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
msg = utils.WrapperWarn(msg)
replayRecorder.Record([]byte(msg))
room.Broadcast(&exchange.RoomMessage{Event: exchange.DataEvent, Body: []byte("\n\r" + msg)})
s.recordSessionFinished(model.ReasonErrMaxSessionTimeout)
return
}

Expand All @@ -302,6 +303,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
msg = utils.WrapperWarn(msg)
replayRecorder.Record([]byte(msg))
room.Broadcast(&exchange.RoomMessage{Event: exchange.DataEvent, Body: []byte("\n\r" + msg)})
s.recordSessionFinished(model.ReasonErrIdleDisconnect)
return
}
if s.p.CheckPermissionExpired(now) {
Expand All @@ -310,6 +312,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
msg = utils.WrapperWarn(msg)
replayRecorder.Record([]byte(msg))
room.Broadcast(&exchange.RoomMessage{Event: exchange.DataEvent, Body: []byte("\n\r" + msg)})
s.recordSessionFinished(model.ReasonErrPermissionExpired)
return
}
continue
Expand All @@ -321,6 +324,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
replayRecorder.Record([]byte(msg))
logger.Infof("Session[%s]: %s", s.ID, msg)
room.Broadcast(&exchange.RoomMessage{Event: exchange.DataEvent, Body: []byte("\n\r" + msg)})
s.recordSessionFinished(model.ReasonErrAdminTerminate)
return
// 监控窗口大小变化
case win, ok := <-winCh:
Expand All @@ -339,6 +343,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
// 经过parse处理的server数据,发给user
case p, ok := <-srvOutChan:
if !ok {
s.recordSessionFinished(model.ReasonErrConnectDisconnect)
return
}
if parser.NeedRecord() {
Expand All @@ -352,6 +357,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
// 经过parse处理的user数据,发给server
case p, ok := <-userOutChan:
if !ok {
s.recordSessionFinished(model.ReasonErrUserClose)
return
}
if _, err1 := srvConn.Write(p); err1 != nil {
Expand All @@ -367,9 +373,11 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
continue
case <-userConn.Context().Done():
logger.Infof("Session[%s]: user conn context done", s.ID)
s.recordSessionFinished(model.ReasonErrUserClose)
return nil
case <-exitSignal:
logger.Debugf("Session[%s] end by exit signal", s.ID)
s.recordSessionFinished(model.ReasonErrConnectDisconnect)
return
case notifyMsg := <-s.notifyMsgChan:
logger.Infof("Session[%s] notify event: %s", s.ID, notifyMsg.Event)
Expand All @@ -379,3 +387,10 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
lastActiveTime = time.Now()
}
}

func (s *SwitchSession) recordSessionFinished(reason model.SessionLifecycleReasonErr) {
logObj := model.SessionLifecycleLog{Reason: string(reason)}
if err := s.p.jmsService.RecordSessionLifecycleLog(s.ID, model.AssetConnectFinished, logObj); err != nil {
logger.Errorf("Session[%s] record session asset_connect_finished failed: %s", s.ID, err)
}
}
Loading

0 comments on commit 940e6a3

Please sign in to comment.