Skip to content

Commit

Permalink
[1/2][backend] Eng 2196 m2 allow user to configure notifications (#935)
Browse files Browse the repository at this point in the history
* [2/2][UI] Eng 2196 m2 allow user to configure wf specific notifications (#936)

* Implement actual code to send email notifications (#943)

* Implement actual code to send slack notifications (#944)
  • Loading branch information
likawind authored Feb 4, 2023
1 parent 2eef267 commit 267dee9
Show file tree
Hide file tree
Showing 21 changed files with 392 additions and 65 deletions.
33 changes: 19 additions & 14 deletions src/golang/cmd/server/handler/edit_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
aq_context "github.com/aqueducthq/aqueduct/lib/context"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/engine"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/aqueducthq/aqueduct/lib/workflow"
"github.com/aqueducthq/aqueduct/lib/workflow/utils"
Expand Down Expand Up @@ -43,18 +44,20 @@ type EditWorkflowHandler struct {
}

type editWorkflowInput struct {
WorkflowName string `json:"name"`
WorkflowDescription string `json:"description"`
Schedule *col_workflow.Schedule `json:"schedule"`
RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"`
WorkflowName string `json:"name"`
WorkflowDescription string `json:"description"`
Schedule *col_workflow.Schedule `json:"schedule"`
RetentionPolicy *col_workflow.RetentionPolicy `json:"retention_policy"`
NotificationSettings *shared.NotificationSettings `json:"notification_settings"`
}

type editWorkflowArgs struct {
workflowId uuid.UUID
workflowName string
workflowDescription string
schedule *col_workflow.Schedule
retentionPolicy *col_workflow.RetentionPolicy
workflowId uuid.UUID
workflowName string
workflowDescription string
schedule *col_workflow.Schedule
retentionPolicy *col_workflow.RetentionPolicy
notificationSettings *shared.NotificationSettings
}

func (*EditWorkflowHandler) Name() string {
Expand Down Expand Up @@ -116,11 +119,12 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error)
}

return &editWorkflowArgs{
workflowId: workflowID,
workflowName: input.WorkflowName,
workflowDescription: input.WorkflowDescription,
schedule: input.Schedule,
retentionPolicy: input.RetentionPolicy,
workflowId: workflowID,
workflowName: input.WorkflowName,
workflowDescription: input.WorkflowDescription,
schedule: input.Schedule,
retentionPolicy: input.RetentionPolicy,
notificationSettings: input.NotificationSettings,
}, http.StatusOK, nil
}

Expand Down Expand Up @@ -173,6 +177,7 @@ func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interfa
args.workflowDescription,
args.schedule,
args.retentionPolicy,
args.notificationSettings,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
Expand Down
1 change: 1 addition & 0 deletions src/golang/cmd/server/handler/register_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (h *RegisterWorkflowHandler) Perform(ctx context.Context, interfaceArgs int
dbWorkflowDag.Metadata.Description,
&dbWorkflowDag.Metadata.Schedule,
&dbWorkflowDag.Metadata.RetentionPolicy,
&dbWorkflowDag.Metadata.NotificationSettings,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
Expand Down
5 changes: 5 additions & 0 deletions src/golang/lib/engine/aq_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ func (eng *aqEngine) EditWorkflow(
workflowDescription string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
) error {
changes := map[string]interface{}{}
if workflowName != "" {
Expand All @@ -673,6 +674,10 @@ func (eng *aqEngine) EditWorkflow(
changes[models.WorkflowRetentionPolicy] = retentionPolicy
}

if notificationSettings != nil {
changes[models.WorkflowNotificationSettings] = notificationSettings
}

if schedule.Trigger != "" {
cronjobName := shared_utils.AppendPrefix(workflowID.String())
err := eng.updateWorkflowSchedule(ctx, workflowID, cronjobName, schedule)
Expand Down
2 changes: 2 additions & 0 deletions src/golang/lib/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/database"
exec_env "github.com/aqueducthq/aqueduct/lib/execution_environment"
"github.com/aqueducthq/aqueduct/lib/models"
mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -50,6 +51,7 @@ type Engine interface {
workflowDescription string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
) error

// TODO ENG-1444: Used as a wrapper to trigger a workflow via executor binary.
Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/engine/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func sendNotifications(
}

msg := notificationMsg(wfDag.Name(), content.level, content.contextMsg)
workflowSettings := wfDag.NotificationSettings()
workflowSettings := wfDag.NotificationSettings().Settings
for _, notificationObj := range notifications {
if len(workflowSettings) > 0 {
// send based on settings
Expand Down
32 changes: 6 additions & 26 deletions src/golang/lib/models/shared/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,20 @@ import (
)

// `NotificationSettings` maps IntegrationID to NotificationLevel
type NotificationSettings map[uuid.UUID]NotificationLevel
// This has to be a struct since sql driver does not support map type.
type NotificationSettings struct {
Settings map[uuid.UUID]NotificationLevel `json:"settings"`
}

func (s *NotificationSettings) Value() (driver.Value, error) {
return utils.ValueJSONB(*s)
}

func (s *NotificationSettings) Scan(value interface{}) error {
return utils.ScanJSONB(value, s)
}

type NullNotificationSettings struct {
NotificationSettings
IsNull bool
}

func (n *NullNotificationSettings) Value() (driver.Value, error) {
if n.IsNull {
return nil, nil
}

return (&n.NotificationSettings).Value()
}

func (n *NullNotificationSettings) Scan(value interface{}) error {
if value == nil {
n.IsNull = true
s.Settings = nil
return nil
}

s := &NotificationSettings{}
if err := s.Scan(value); err != nil {
return err
}

n.NotificationSettings, n.IsNull = *s, false
return nil
return utils.ScanJSONB(value, s)
}
16 changes: 8 additions & 8 deletions src/golang/lib/models/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ const (

// A Workflow maps to the workflow table.
type Workflow struct {
ID uuid.UUID `db:"id" json:"id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Schedule workflow.Schedule `db:"schedule" json:"schedule"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"`
NotificationSettings shared.NullNotificationSettings `db:"notification_settings" json:"notification_settings"`
ID uuid.UUID `db:"id" json:"id"`
UserID uuid.UUID `db:"user_id" json:"user_id"`
Name string `db:"name" json:"name"`
Description string `db:"description" json:"description"`
Schedule workflow.Schedule `db:"schedule" json:"schedule"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
RetentionPolicy workflow.RetentionPolicy `db:"retention_policy" json:"retention_policy"`
NotificationSettings shared.NotificationSettings `db:"notification_settings" json:"notification_settings"`
}

// WorkflowCols returns a comma-separated string of all Workflow columns.
Expand Down
27 changes: 25 additions & 2 deletions src/golang/lib/notification/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package notification
import (
"context"
"crypto/tls"
"fmt"
"net/smtp"
"strings"

"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
Expand All @@ -27,9 +29,30 @@ func (e *EmailNotification) Level() shared.NotificationLevel {
return e.conf.Level
}

func fullMessage(subject string, from string, targets []string, body string) string {
fullMsg := fmt.Sprintf("From: %s\n", from)
fullMsg += fmt.Sprintf("To: %s\n", strings.Join(targets, ","))
fullMsg += fmt.Sprintf("Subject: %s\n\n", subject)
fullMsg += body
return fullMsg
}

func (e *EmailNotification) Send(ctx context.Context, msg string) error {
// TODO: Implement
return nil
auth := smtp.PlainAuth(
"", // identity
e.conf.User,
e.conf.Password,
e.conf.Host,
)

fullMsg := fullMessage("aqueduct notification", e.conf.User, e.conf.Targets, msg)
return smtp.SendMail(
e.conf.FullHost(),
auth,
e.conf.User,
e.conf.Targets,
[]byte(fullMsg),
)
}

func AuthenticateEmail(conf *shared.EmailConfig) error {
Expand Down
75 changes: 74 additions & 1 deletion src/golang/lib/notification/slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (

"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/dropbox/godropbox/errors"
"github.com/google/uuid"
"github.com/slack-go/slack"
)

const maxChannelLimit = 2000

type SlackNotification struct {
integration *models.Integration
conf *shared.SlackConfig
Expand All @@ -26,13 +29,83 @@ func (s *SlackNotification) Level() shared.NotificationLevel {
return s.conf.Level
}

// reference: https://stackoverflow.com/questions/50106263/slack-api-to-find-existing-channel
// We have to use list channel API together with a linear search.
func findChannels(client *slack.Client, names []string) ([]slack.Channel, error) {
channels, _ /* cursor */, err := client.GetConversations(&slack.GetConversationsParameters{
ExcludeArchived: true,
Limit: maxChannelLimit,
})
if err != nil {
return nil, err
}

namesSet := make(map[string]bool, len(names))
namesTaken := make(map[string]bool, len(names))
for _, name := range names {
namesSet[name] = true
}

// Slack channel names should be unique. We will still send notifications to all
// channels matching the given name.
results := make([]slack.Channel, 0, len(names))
for _, channel := range channels {
_, ok := namesSet[channel.Name]
if ok {
results = append(results, channel)
namesTaken[channel.Name] = true
}
}

if len(namesTaken) != len(namesSet) {
for name := range namesSet {
_, ok := namesTaken[name]
if !ok {
return nil, errors.Newf("Channel %s does not exist.", name)
}
}
}

return results, nil
}

func (s *SlackNotification) Send(ctx context.Context, msg string) error {
// TODO: Implement
client := slack.New(s.conf.Token)
channels, err := findChannels(client, s.conf.Channels)
if err != nil {
return err
}

for _, channel := range channels {
// reference: https://medium.com/@gausha/a-simple-slackbot-with-golang-c5a932d719c7
_, _, _, err = client.SendMessage(channel.ID, slack.MsgOptionBlocks(
slack.NewSectionBlock(
slack.NewTextBlockObject(
"plain_text",
msg,
false, /* emoji */
false, /* verbatim */
),
nil,
nil,
),
))

if err != nil {
return err
}
}

return nil
}

func AuthenticateSlack(conf *shared.SlackConfig) error {
client := slack.New(conf.Token)
_, err := client.AuthTest()
if err != nil {
return err
}

_, err = findChannels(client, conf.Channels)
return err
}
5 changes: 4 additions & 1 deletion src/golang/lib/repos/sqlite/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/collections/workflow"
"github.com/aqueducthq/aqueduct/lib/database"
"github.com/aqueducthq/aqueduct/lib/models"
mdl_shared "github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/models/views"
"github.com/aqueducthq/aqueduct/lib/repos"
"github.com/dropbox/godropbox/errors"
Expand Down Expand Up @@ -253,6 +254,7 @@ func (*workflowWriter) Create(
description string,
schedule *workflow.Schedule,
retentionPolicy *workflow.RetentionPolicy,
notificationSettings *mdl_shared.NotificationSettings,
DB database.Database,
) (*models.Workflow, error) {
cols := []string{
Expand All @@ -263,6 +265,7 @@ func (*workflowWriter) Create(
models.WorkflowSchedule,
models.WorkflowCreatedAt,
models.WorkflowRetentionPolicy,
models.WorkflowNotificationSettings,
}
query := DB.PrepareInsertWithReturnAllStmt(models.WorkflowTable, cols, models.WorkflowCols())

Expand All @@ -271,7 +274,7 @@ func (*workflowWriter) Create(
return nil, err
}

args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy}
args := []interface{}{ID, userID, name, description, schedule, time.Now(), retentionPolicy, notificationSettings}
return getWorkflow(ctx, DB, query, args...)
}

Expand Down
1 change: 1 addition & 0 deletions src/golang/lib/repos/tests/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ func (ts *TestSuite) seedWorkflowWithUser(count int, userIDs []uuid.UUID) []mode
description,
schedule,
retentionPolicy,
&shared.NotificationSettings{},
ts.DB,
)
require.Nil(ts.T(), err)
Expand Down
Loading

0 comments on commit 267dee9

Please sign in to comment.