-
Notifications
You must be signed in to change notification settings - Fork 6
[wip] User cache support. #13
base: master
Are you sure you want to change the base?
Changes from 1 commit
944699c
dcb4b40
d5ef49a
b098afa
bb2eecc
6d02953
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,8 @@ package usercache | |
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/riking/marvin/slack" | ||
| "github.com/riking/marvin/slack/rtm" | ||
| ) | ||
|
|
@@ -12,6 +12,8 @@ const ( | |
| sqlMigrate1 = `CREATE TABLE module_user_cache ( | ||
| user_id varchar(15) PRIMARY KEY NOT NULL, | ||
| data text | ||
|
|
||
| UNIQUE(user_id) | ||
| )` | ||
|
|
||
| sqlGetAllEntries = `SELECT * FROM module_user_cache` | ||
|
|
@@ -21,11 +23,8 @@ const ( | |
|
|
||
| // $1 = slack.UserID | ||
| // $2 = data (json encoded) | ||
| sqlAddEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2)` | ||
|
|
||
| // $1 = data (json encoded) | ||
| // $2 = slack.UserID | ||
| sqlUpdateEntry = `UPDATE module_user_cache SET data = $1 WHERE user_id = $2` | ||
| sqlUpsertEntry = `INSERT INTO module_user_cache (user_id,data) VALUES ($1, $2) | ||
| ON CONFLICT (user_id) DO UPDATE SET data = EXCLUDED.data` | ||
| ) | ||
|
|
||
| func (mod *UserCacheModule) GetEntry(userid slack.UserID) (slack.User, error) { | ||
|
|
@@ -55,59 +54,59 @@ func (mod *UserCacheModule) LoadEntries() error { | |
| return err | ||
| } | ||
|
|
||
| rtmClient := mod.team.GetRTMClient().(*rtm.Client) | ||
|
|
||
| defer stmt.Close() | ||
| var arr = make([]*slack.User, 200) | ||
| for stmt.Next() { | ||
| var id string | ||
| var data string | ||
| var user slack.User | ||
| var user *slack.User = &slack.User{} | ||
|
|
||
| err = stmt.Scan(&id, &data) | ||
| if err != nil { | ||
| return errors.Wrap(err, "error in user cache: obtaining row info") | ||
| continue | ||
| return err | ||
| } | ||
| err = json.Unmarshal([]byte(data), &user) | ||
| err = json.Unmarshal([]byte(data), user) | ||
| if err != nil { | ||
| return errors.Wrap(err, "error in user cache: unmarshal user object") | ||
| return err | ||
|
||
| } | ||
| arr = append(arr, user) | ||
| if len(arr) >= 199 { | ||
| go rtmClient.ReplaceManyUserObjects(arr, false) | ||
| arr = make([]*slack.User, 200) | ||
|
||
| } | ||
| rtmClient := mod.team.GetRTMClient().(*rtm.Client) | ||
| rtmClient.ReplaceUserObject(&user) | ||
| } | ||
| if len(arr) >= 0 { | ||
| go rtmClient.ReplaceManyUserObjects(arr, false) | ||
| arr = nil | ||
| } | ||
|
|
||
| return stmt.Err() | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) UpdateEntry(userobject slack.User) error { | ||
| _, exists := mod.GetEntry(userobject.ID) | ||
|
|
||
| var entrydata []byte | ||
| entrydata, err := json.Marshal(&userobject) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var query = sqlAddEntry | ||
| if exists != nil { | ||
| query = sqlUpdateEntry | ||
| } | ||
| func (mod *UserCacheModule) UpdateEntry(userobject *slack.User) error { | ||
| var objarray = make([]*slack.User, 1) | ||
| objarray[0] = userobject | ||
| return mod.UpdateEntries(objarray) | ||
|
||
| } | ||
|
|
||
| stmt, err := mod.team.DB().Prepare(query) | ||
| func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { | ||
| stmt, err := mod.team.DB().Prepare(sqlUpsertEntry) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| defer stmt.Close() | ||
| row := stmt.QueryRow(userobject.ID, entrydata) | ||
| var id slack.UserID | ||
| err = row.Scan(&id) | ||
| return err | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) UpdateEntries(userobjects []*slack.User) error { | ||
| for _, obj := range userobjects { | ||
| if obj != nil { | ||
| err := mod.UpdateEntry(*obj) | ||
| if err != nil { | ||
| return err | ||
| entrydata, err := json.Marshal(obj) | ||
| if err == nil { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| _, err := stmt.Exec(obj.ID, entrydata) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,19 @@ | ||
| package usercache | ||
|
|
||
| import ( | ||
| "sync" | ||
|
|
||
| "fmt" | ||
|
|
||
| "github.com/riking/marvin" | ||
| "github.com/riking/marvin/slack" | ||
| ) | ||
|
|
||
| // interface duplicated in rtm package | ||
| type API interface { | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This API type can be dropped, or at least commented with |
||
| marvin.Module | ||
|
|
||
| GetEntry(userid slack.UserID) (slack.User, error) | ||
| UpdateEntry(userobject slack.User) error | ||
| LoadEntries() error | ||
| UpdateEntry(userobject *slack.User) error | ||
| UpdateEntries(userobjects []*slack.User) error | ||
| } | ||
|
|
||
|
|
@@ -28,15 +28,11 @@ const Identifier = "usercache" | |
|
|
||
| type UserCacheModule struct { | ||
| team marvin.Team | ||
|
|
||
| cacheLock sync.Mutex | ||
| cacheMap map[slack.UserID]slack.User | ||
| } | ||
|
|
||
| func NewUserCacheModule(t marvin.Team) marvin.Module { | ||
| mod := &UserCacheModule{ | ||
| team: t, | ||
| cacheMap: make(map[slack.UserID]slack.User), | ||
| team: t, | ||
| } | ||
| return mod | ||
| } | ||
|
|
@@ -47,16 +43,18 @@ func (mod *UserCacheModule) Identifier() marvin.ModuleID { | |
|
|
||
| func (mod *UserCacheModule) Load(t marvin.Team) { | ||
| t.DB().MustMigrate(Identifier, 1505192548, sqlMigrate1) | ||
| t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlAddEntry, sqlUpdateEntry) | ||
| t.DB().SyntaxCheck(sqlGetAllEntries, sqlGetEntry, sqlUpsertEntry) | ||
| } | ||
|
|
||
| func (mod *UserCacheModule) Enable(team marvin.Team) { | ||
| go func() { | ||
| fmt.Printf("Loading cache entries....\n") | ||
|
||
| err := mod.LoadEntries() | ||
| if err != nil { | ||
| fmt.Errorf("Error whilst updating entries: %s", err.Error()) | ||
| fmt.Printf("Error whilst updating entries: %s\n", err.Error()) | ||
| return | ||
| } | ||
| fmt.Printf("Loaded all entries from the cache.\n") | ||
| }() | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,54 +97,45 @@ func (c *Client) ReplaceUserObject(obj *slack.User) { | |
| moduleCacheApi := c.team.GetModule("usercache") | ||
| if moduleCacheApi != nil { | ||
| cacheApi = moduleCacheApi.(userCacheAPI) | ||
| cacheApi.UpdateEntry(obj) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
| obj.CacheTS = time.Now() | ||
| for i, v := range c.Users { | ||
| if v.ID == obj.ID { | ||
| c.Users[i] = obj | ||
|
|
||
| if cacheApi != nil { | ||
| cacheApi.UpdateEntry(*v) | ||
| } | ||
| return | ||
| } | ||
| } | ||
| c.Users = append(c.Users, obj) | ||
| } | ||
|
|
||
| func (c *Client) ReplaceManyUserObjects(objs []*slack.User) { | ||
| func (c *Client) ReplaceManyUserObjects(objs []*slack.User, updateCache bool) { | ||
| c.MetadataLock.Lock() | ||
| defer c.MetadataLock.Unlock() | ||
|
|
||
| var cacheApi userCacheAPI | ||
| moduleCacheApi := c.team.GetModule("usercache") | ||
| if moduleCacheApi != nil { | ||
| if moduleCacheApi != nil && updateCache { | ||
| cacheApi = moduleCacheApi.(userCacheAPI) | ||
| cacheApi.UpdateEntries(objs) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
|
|
||
| now := time.Now() | ||
| for ci, cv := range c.Users { | ||
| for ii, iv := range objs { | ||
| if iv != nil && cv.ID == iv.ID { | ||
| iv.CacheTS = now | ||
| c.Users[ci] = iv | ||
| objs[ii] = nil | ||
|
|
||
| if cacheApi != nil { | ||
| cacheApi.UpdateEntry(*iv) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| for _, iv := range objs { | ||
| if iv != nil { | ||
| iv.CacheTS = now | ||
| c.Users = append(c.Users, iv) | ||
|
|
||
| if cacheApi != nil { | ||
| cacheApi.UpdateEntry(*iv) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,8 +20,7 @@ type membershipRequest struct { | |
| type userCacheAPI interface { | ||
| marvin.Module | ||
|
|
||
| GetEntry(userid slack.UserID) (slack.User, error) | ||
| UpdateEntry(userobject slack.User) error | ||
| UpdateEntry(userobject *slack.User) error | ||
| UpdateEntries(userobjects []*slack.User) error | ||
| } | ||
|
|
||
|
|
@@ -174,16 +173,17 @@ func (c *Client) fillUsersList() { | |
| util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) | ||
| } | ||
|
|
||
| c.ReplaceManyUserObjects(response.Members, true) | ||
|
|
||
| for response.PageInfo.NextCursor != "" { | ||
| c.ReplaceManyUserObjects(response.Members) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to make this change because otherwise the ReplaceManyUserObjects would get called again for the same group of objects retrieved from the last successful query.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And with the new changes, it was also not retrieving all the users. |
||
| time.Sleep(2 * time.Second) | ||
|
|
||
| form.Set("cursor", response.PageInfo.NextCursor) | ||
| err := c.team.SlackAPIPostJSON("users.list", form, &response) | ||
| if err != nil { | ||
| util.LogError(errors.Wrapf(err, "[%s] Could not retrieve users list", c.Team.Domain)) | ||
| continue | ||
| } | ||
| c.ReplaceManyUserObjects(response.Members, true) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave it at nil, json.Unmarshal(&user) will allocate for you.