-
Notifications
You must be signed in to change notification settings - Fork 6
[wip] User cache support. #13
base: master
Are you sure you want to change the base?
Changes from 3 commits
944699c
dcb4b40
d5ef49a
b098afa
bb2eecc
6d02953
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| package usercache | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/riking/marvin/slack" | ||
| "github.com/riking/marvin/slack/rtm" | ||
| ) | ||
|
|
||
| const ( | ||
| sqlMigrate1 = `CREATE TABLE module_user_cache ( | ||
| user_id varchar(15) PRIMARY KEY NOT NULL, | ||
| data text | ||
| )` | ||
|
|
||
| sqlGetAllEntries = `SELECT * FROM module_user_cache` | ||
|
|
||
| // $1 = slack.UserID | ||
| sqlGetEntry = `SELECT data FROM module_user_cache WHERE user_id = $1` | ||
|
|
||
| // $1 = slack.UserID | ||
| // $2 = data (json encoded) | ||
| sqlAddEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2)` | ||
|
|
||
| // $1 = data (json encoded) | ||
| // $2 = slack.UserID | ||
| sqlUpdateEntry = `UPDATE module_user_cache SET data = $1 WHERE user_id = $2` | ||
| ) | ||
|
|
||
| func (mod *UserCacheModule) GetEntry(userid slack.UserID) (slack.User, error) { | ||
| var entry slack.User | ||
|
|
||
| var data string | ||
| stmt, err := mod.team.DB().Prepare(sqlGetEntry) | ||
| if err != nil { | ||
| return entry, nil | ||
| } | ||
| defer stmt.Close() | ||
| row := stmt.QueryRow(userid) | ||
| err = row.Scan(&data) | ||
| if err != nil { | ||
| return entry, nil | ||
| } | ||
| err = json.Unmarshal([]byte(userid), &entry) | ||
| if err != nil { | ||
| return entry, nil | ||
| } | ||
| return entry, nil | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) LoadEntries() error { | ||
| stmt, err := mod.team.DB().Query(sqlGetAllEntries) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| defer stmt.Close() | ||
| for stmt.Next() { | ||
| var id string | ||
| var data string | ||
| var user slack.User | ||
|
||
|
|
||
| err = stmt.Scan(&id, &data) | ||
| if err != nil { | ||
| return errors.Wrap(err, "error in user cache: obtaining row info") | ||
| continue | ||
| } | ||
| err = json.Unmarshal([]byte(data), &user) | ||
| if err != nil { | ||
| return errors.Wrap(err, "error in user cache: unmarshal user object") | ||
| } | ||
| rtmClient := mod.team.GetRTMClient().(*rtm.Client) | ||
|
||
| rtmClient.ReplaceUserObject(&user) | ||
|
||
| } | ||
| return stmt.Err() | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) UpdateEntry(userobject slack.User) error { | ||
|
||
| _, exists := mod.GetEntry(userobject.ID) | ||
|
|
||
| var entrydata []byte | ||
| entrydata, err := json.Marshal(&userobject) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var query = sqlAddEntry | ||
| if exists != nil { | ||
| query = sqlUpdateEntry | ||
| } | ||
|
|
||
| stmt, err := mod.team.DB().Prepare(query) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| defer stmt.Close() | ||
| row := stmt.QueryRow(userobject.ID, entrydata) | ||
| var id slack.UserID | ||
| err = row.Scan(&id) | ||
| return err | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { | ||
|
||
| for _, obj := range userobjects { | ||
| if obj != nil { | ||
| err := mod.UpdateEntry(*obj) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| package usercache | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
|
||
| "fmt" | ||
|
|
||
| "github.com/riking/marvin" | ||
| "github.com/riking/marvin/slack" | ||
| ) | ||
|
|
||
| type API interface { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This API type can be dropped, or at least commented with |
||
| marvin.Module | ||
|
|
||
| GetEntry(userid slack.UserID) (slack.User, error) | ||
| UpdateEntry(userobject slack.User) error | ||
| UpdateEntries(userobjects []*slack.User) error | ||
| } | ||
|
|
||
| var _ API = &UserCacheModule{} | ||
|
|
||
| // --- | ||
| func init() { | ||
| marvin.RegisterModule(NewUserCacheModule) | ||
| } | ||
|
|
||
| const Identifier = "usercache" | ||
|
|
||
| type UserCacheModule struct { | ||
| team marvin.Team | ||
|
|
||
| cacheLock sync.Mutex | ||
| cacheMap map[slack.UserID]slack.User | ||
| } | ||
|
|
||
| func NewUserCacheModule(t marvin.Team) marvin.Module { | ||
| mod := &UserCacheModule{ | ||
| team: t, | ||
| cacheMap: make(map[slack.UserID]slack.User), | ||
| } | ||
| return mod | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Identifier() marvin.ModuleID { | ||
| return Identifier | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Load(t marvin.Team) { | ||
| t.DB().MustMigrate(Identifier, 1505192548, sqlMigrate1) | ||
| t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlAddEntry, sqlUpdateEntry) | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Enable(team marvin.Team) { | ||
| go func() { | ||
| err := mod.LoadEntries() | ||
| if err != nil { | ||
| fmt.Errorf("Error whilst updating entries: %s", err.Error()) | ||
| return | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Disable(t marvin.Team) { | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ import ( | |
| "time" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/riking/marvin" | ||
| "github.com/riking/marvin/slack" | ||
| "github.com/riking/marvin/util" | ||
| ) | ||
|
|
@@ -16,6 +17,14 @@ type membershipRequest struct { | |
| C chan interface{} | ||
| } | ||
|
|
||
| type userCacheAPI interface { | ||
| marvin.Module | ||
|
|
||
| GetEntry(userid slack.UserID) (slack.User, error) | ||
|
||
| UpdateEntry(userobject slack.User) error | ||
| UpdateEntries(userobjects []*slack.User) error | ||
| } | ||
|
|
||
| func (c *Client) membershipWorker() { | ||
| for req := range c.membershipCh { | ||
| req.C <- req.F(c.channelMembers) | ||
|
|
@@ -167,13 +176,13 @@ func (c *Client) fillUsersList() { | |
|
|
||
| for response.PageInfo.NextCursor != "" { | ||
| c.ReplaceManyUserObjects(response.Members) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to make this change because otherwise the ReplaceManyUserObjects would get called again for the same group of objects retrieved from the last successful query.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And with the new changes, it was also not retrieving all the users. |
||
| time.Sleep(2*time.Second) | ||
| time.Sleep(2 * time.Second) | ||
|
|
||
| form.Set("cursor", response.PageInfo.NextCursor) | ||
| err := c.team.SlackAPIPostJSON("users.list", form, &response) | ||
| if err != nil { | ||
| util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) | ||
| break | ||
| continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ON CONFLICT DO UPDATE SET data = EXCLUDED.data😃
Avoids the need for a transaction / checking first.