-
Notifications
You must be signed in to change notification settings - Fork 6
[wip] User cache support. #13
base: master
Are you sure you want to change the base?
Changes from 4 commits
944699c
dcb4b40
d5ef49a
b098afa
bb2eecc
6d02953
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| package usercache | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
|
|
||
| "github.com/riking/marvin/slack" | ||
| "github.com/riking/marvin/slack/rtm" | ||
| ) | ||
|
|
||
| const ( | ||
| sqlMigrate1 = `CREATE TABLE module_user_cache ( | ||
| user_id varchar(15) PRIMARY KEY NOT NULL, | ||
| data text | ||
|
|
||
| UNIQUE(user_id) | ||
| )` | ||
|
|
||
| sqlGetAllEntries = `SELECT * FROM module_user_cache` | ||
|
|
||
| // $1 = slack.UserID | ||
| sqlGetEntry = `SELECT data FROM module_user_cache WHERE user_id = $1` | ||
|
|
||
| // $1 = slack.UserID | ||
| // $2 = data (json encoded) | ||
| sqlUpsertEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2) | ||
| ON CONFLICT (user_id) DO UPDATE SET data = EXCLUDED.data` | ||
| ) | ||
|
|
||
| func (mod *UserCacheModule) GetEntry(userid slack.UserID) (slack.User, error) { | ||
| var entry slack.User | ||
|
|
||
| var data string | ||
| stmt, err := mod.team.DB().Prepare(sqlGetEntry) | ||
| if err != nil { | ||
| return entry, nil | ||
| } | ||
| defer stmt.Close() | ||
| row := stmt.QueryRow(userid) | ||
| err = row.Scan(&data) | ||
| if err != nil { | ||
| return entry, nil | ||
| } | ||
| err = json.Unmarshal([]byte(userid), &entry) | ||
| if err != nil { | ||
| return entry, nil | ||
| } | ||
| return entry, nil | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) LoadEntries() error { | ||
| stmt, err := mod.team.DB().Query(sqlGetAllEntries) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| rtmClient := mod.team.GetRTMClient().(*rtm.Client) | ||
|
|
||
| defer stmt.Close() | ||
| var arr = make([]*slack.User, 200) | ||
| for stmt.Next() { | ||
| var id string | ||
| var data string | ||
| var user *slack.User = &slack.User{} | ||
|
|
||
| err = stmt.Scan(&id, &data) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| err = json.Unmarshal([]byte(data), user) | ||
| if err != nil { | ||
| return err | ||
|
||
| } | ||
| arr = append(arr, user) | ||
| if len(arr) >= 199 { | ||
| go rtmClient.ReplaceManyUserObjects(arr, false) | ||
| arr = make([]*slack.User, 200) | ||
|
||
| } | ||
| } | ||
| if len(arr) >= 0 { | ||
| go rtmClient.ReplaceManyUserObjects(arr, false) | ||
| arr = nil | ||
| } | ||
|
|
||
| return stmt.Err() | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) UpdateEntry(userobject *slack.User) error { | ||
| var objarray = make([]*slack.User, 1) | ||
| objarray[0] = userobject | ||
| return mod.UpdateEntries(objarray) | ||
|
||
| } | ||
|
|
||
| func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { | ||
| stmt, err := mod.team.DB().Prepare(sqlUpsertEntry) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| defer stmt.Close() | ||
|
|
||
| for _, obj := range userobjects { | ||
| if obj != nil { | ||
| entrydata, err := json.Marshal(obj) | ||
| if err == nil { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| _, err := stmt.Exec(obj.ID, entrydata) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| package usercache | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "github.com/riking/marvin" | ||
| "github.com/riking/marvin/slack" | ||
| ) | ||
|
|
||
| // interface duplicated in rtm package | ||
| type API interface { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This API type can be dropped, or at least commented with |
||
| marvin.Module | ||
|
|
||
| GetEntry(userid slack.UserID) (slack.User, error) | ||
| LoadEntries() error | ||
| UpdateEntry(userobject *slack.User) error | ||
| UpdateEntries(userobjects []*slack.User) error | ||
| } | ||
|
|
||
| var _ API = &UserCacheModule{} | ||
|
|
||
| // --- | ||
| func init() { | ||
| marvin.RegisterModule(NewUserCacheModule) | ||
| } | ||
|
|
||
| const Identifier = "usercache" | ||
|
|
||
| type UserCacheModule struct { | ||
| team marvin.Team | ||
| } | ||
|
|
||
| func NewUserCacheModule(t marvin.Team) marvin.Module { | ||
| mod := &UserCacheModule{ | ||
| team: t, | ||
| } | ||
| return mod | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Identifier() marvin.ModuleID { | ||
| return Identifier | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Load(t marvin.Team) { | ||
| t.DB().MustMigrate(Identifier, 1505192548, sqlMigrate1) | ||
| t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlUpsertEntry) | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Enable(team marvin.Team) { | ||
| go func() { | ||
| fmt.Printf("Loading cache entries....\n") | ||
|
||
| err := mod.LoadEntries() | ||
| if err != nil { | ||
| fmt.Printf("Error whilst updating entries: %s\n", err.Error()) | ||
| return | ||
| } | ||
| fmt.Printf("Loaded all entries from the cache.\n") | ||
| }() | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Disable(t marvin.Team) { | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,6 +93,13 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { | |
| c.MetadataLock.Lock() | ||
| defer c.MetadataLock.Unlock() | ||
|
|
||
| var cacheApi userCacheAPI | ||
| moduleCacheApi := c.team.GetModule("usercache") | ||
| if moduleCacheApi != nil { | ||
| cacheApi = moduleCacheApi.(userCacheAPI) | ||
| cacheApi.UpdateEntry(obj) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
| obj.CacheTS = time.Now() | ||
| for i, v := range c.Users { | ||
| if v.ID == obj.ID { | ||
|
|
@@ -103,10 +110,18 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { | |
| c.Users = append(c.Users, obj) | ||
| } | ||
|
|
||
| func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { | ||
| func (c *Client) ReplaceManyUserObjects(objs []*slack.User, updateCache bool) { | ||
| c.MetadataLock.Lock() | ||
| defer c.MetadataLock.Unlock() | ||
|
|
||
| var cacheApi userCacheAPI | ||
| moduleCacheApi := c.team.GetModule("usercache") | ||
| if moduleCacheApi != nil && updateCache { | ||
| cacheApi = moduleCacheApi.(userCacheAPI) | ||
| cacheApi.UpdateEntries(objs) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
|
|
||
| now := time.Now() | ||
| for ci, cv := range c.Users { | ||
| for ii, iv := range objs { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ import ( | |
| "time" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/riking/marvin" | ||
| "github.com/riking/marvin/slack" | ||
| "github.com/riking/marvin/util" | ||
| ) | ||
|
|
@@ -16,6 +17,13 @@ type membershipRequest struct { | |
| C chan interface{} | ||
| } | ||
|
|
||
| type userCacheAPI interface { | ||
| marvin.Module | ||
|
|
||
| UpdateEntry(userobject *slack.User) error | ||
| UpdateEntries(userobjects []*slack.User) error | ||
| } | ||
|
|
||
| func (c *Client) membershipWorker() { | ||
| for req := range c.membershipCh { | ||
| req.C <- req.F(c.channelMembers) | ||
|
|
@@ -165,16 +173,17 @@ func (c *Client) fillUsersList() { | |
| util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) | ||
| } | ||
|
|
||
| for response.PageInfo.NextCursor != "" { | ||
| c.ReplaceManyUserObjects(response.Members) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to make this change because otherwise the ReplaceManyUserObjects would get called again for the same group of objects retrieved from the last successful query. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And with the new changes, it was also not retrieving all the users. |
||
| time.Sleep(2*time.Second) | ||
| c.ReplaceManyUserObjects(response.Members, true) | ||
|
|
||
| for response.PageInfo.NextCursor != "" { | ||
| time.Sleep(2 * time.Second) | ||
| form.Set("cursor", response.PageInfo.NextCursor) | ||
| err := c.team.SlackAPIPostJSON("users.list", form, &response) | ||
| if err != nil { | ||
| util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) | ||
| break | ||
| continue | ||
| } | ||
| c.ReplaceManyUserObjects(response.Members, true) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave it at nil, json.Unmarshal(&user) will allocate for you.