Skip to content
This repository has been archived by the owner on Dec 20, 2022. It is now read-only.

fix policy cache missing #26

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 47 additions & 67 deletions policy/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ func (p *policyd) Start(ctx context.Context) <-chan error {
defer close(fch)
defer close(ech)

p.rolePolicies.StartExpired(ctx, p.policyExpiredDuration).
EnableExpiredHook().
SetExpiredHook(func(ctx context.Context, key string) {
//key = <domain>:role.<role>
p.fetchAndCachePolicy(ctx, strings.Split(key, ":role.")[0])
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. ExpiredHook is called after delete, checking policy between time gap with cause error
  2. retry on ExpiredHook fail?
  3. fetchAndCachePolicy() error is not print inside the ExpiredHook

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refer to: #28

p.etagCache.StartExpired(ctx, p.etagFlushDur)
ticker := time.NewTicker(p.refreshDuration)
for {
Expand Down Expand Up @@ -141,9 +147,7 @@ func (p *policyd) Start(ctx context.Context) <-chan error {
// Update updates and cache policy data
func (p *policyd) Update(ctx context.Context) error {
glg.Info("Updating policy")
defer glg.Info("Updated policy")
eg := errgroup.Group{}
rp := gache.New()

for _, domain := range p.athenzDomains {
select {
Expand All @@ -158,7 +162,7 @@ func (p *policyd) Update(ctx context.Context) error {
glg.Info("Update policy interrupted")
return ctx.Err()
default:
return p.fetchAndCachePolicy(ctx, rp, dom)
return p.fetchAndCachePolicy(ctx, dom)
}
})
}
Expand All @@ -168,93 +172,69 @@ func (p *policyd) Update(ctx context.Context) error {
return err
}

rp.StartExpired(ctx, p.policyExpiredDuration).
EnableExpiredHook().
SetExpiredHook(func(ctx context.Context, key string) {
//key = <domain>:role.<role>
p.fetchAndCachePolicy(ctx, p.rolePolicies, strings.Split(key, ":role.")[0])
})

p.rolePolicies, rp = rp, p.rolePolicies
rp.Stop()
rp.Clear()

glg.Info("Policy updated")
return nil
}

// CheckPolicy checks the specified request has privilege to access the resources or not.
// If return is nil then the request is allowed, otherwise the request is rejected.
func (p *policyd) CheckPolicy(ctx context.Context, domain string, roles []string, action, resource string) error {
ech := make(chan error, 1)
cctx, cancel := context.WithCancel(ctx)
defer cancel()
ch := make(chan error, 1)

go func() {
defer close(ech)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to close the channel explicitly

wg := new(sync.WaitGroup)
for _, role := range roles {
dr := fmt.Sprintf("%s:role.%s", domain, role)
wg.Add(1)
go func(ch chan<- error) {
go func(r string) {
defer wg.Done()
select {
case <-cctx.Done():
ch <- cctx.Err()
return
default:
asss, ok := p.rolePolicies.Get(dr)
if !ok {
return
}

for _, ass := range asss.([]*Assertion) {
glg.Debugf("Checking policy domain: %s, role: %v, action: %s, resource: %s, assertion: %v", domain, roles, action, resource, ass)
select {
case <-cctx.Done():
ch <- cctx.Err()
return
default:
if roleAsss, ok := p.rolePolicies.Get(domain); ok {
if asss, ok := roleAsss.(map[string][]*Assertion)[r]; ok {
for _, ass := range asss {
glg.Debugf("Checking policy domain: %s, role: %v, action: %s, resource: %s, assertion: %v", domain, roles, action, resource, ass)
if strings.EqualFold(ass.ResourceDomain, domain) && ass.Reg.MatchString(strings.ToLower(action+"-"+resource)) {
ch <- ass.Effect
Copy link
Contributor

@WindzCUHK WindzCUHK Jul 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will have deadlock here, when multiple deny policy match.

  1. ech size = 1
  2. read from ech only 1 times
  3. when, write more than 2, the go func cannot return, wg.Wait() will block forever

I think the old logic may still have the same problem,
but the old logic handles ctx cancel in a better way,
better to keep the old logic first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refer to: #30

return
}
}
}
}
}(ech)
}(role)
}
wg.Wait()
ech <- errors.Wrap(ErrNoMatch, "no match")
ch <- errors.Wrap(ErrNoMatch, "no match")
}()

err := <-ech

glg.Debugf("check policy domain: %s, role: %v, action: %s, resource: %s, result: %v", domain, roles, action, resource, err)
return err
select {
case <-ctx.Done():
return ctx.Err()
case err := <-ch:
glg.Debugf("check policy domain: %s, role: %v, action: %s, resource: %s, result: %v", domain, roles, action, resource, err)
return err
}
}

func (p *policyd) GetPolicyCache(ctx context.Context) map[string]interface{} {
return p.rolePolicies.ToRawMap(ctx)
}

func (p *policyd) fetchAndCachePolicy(ctx context.Context, g gache.Gache, dom string) error {
func (p *policyd) fetchAndCachePolicy(ctx context.Context, dom string) error {
spd, upd, err := p.fetchPolicy(ctx, dom)
glg.DebugFunc(func() string {
rawpol, _ := json.Marshal(spd)
return fmt.Sprintf("fetched policy domain: %s, updated: %v, body: %s", dom, upd, (string)(rawpol))
})

if err != nil {
glg.Debugf("fetch policy failed, err: %v", err)
return errors.Wrap(err, "error fetch policy")
}
glg.Debugf("fetch policy success, updated: %v", upd)

if upd {
glg.DebugFunc(func() string {
rawpol, _ := json.Marshal(spd)
return fmt.Sprintf("fetched policy data:\tdomain\t%s\tbody\t%s", dom, (string)(rawpol))
})

if err = simplifyAndCachePolicy(ctx, g, spd); err != nil {
rp, err := simplifyAndCachePolicy(ctx, dom, spd)
if err != nil {
glg.Debugf("simplify and cache error: %v", err)
return errors.Wrap(err, "error simplify and cache")
}
p.rolePolicies.SetWithExpire(dom, rp, time.Duration(spd.DomainSignedPolicyData.SignedPolicyData.Expires.UnixNano()))
}

return nil
Expand Down Expand Up @@ -330,7 +310,7 @@ func (p *policyd) fetchPolicy(ctx context.Context, domain string) (*SignedPolicy
return sp, true, nil
}

func simplifyAndCachePolicy(ctx context.Context, rp gache.Gache, sp *SignedPolicy) error {
func simplifyAndCachePolicy(ctx context.Context, dom string, sp *SignedPolicy) (map[string][]*Assertion, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func simplifyAndCachePolicy(ctx context.Context, dom string, sp *SignedPolicy) (map[string][]*Assertion, error) {
func simplifyPolicy(ctx context.Context, dom string, sp *SignedPolicy) (map[string][]*Assertion, error) {

eg := errgroup.Group{}
assm := new(sync.Map) // assertion map

Expand All @@ -343,6 +323,12 @@ func simplifyAndCachePolicy(ctx context.Context, rp gache.Gache, sp *SignedPolic
case <-ctx.Done():
return ctx.Err()
default:
if strings.SplitN(ass.Role, ":role.", 2)[0] != dom {
glg.Errorf("role is not matches with the domain, role: %v, domain: %v", ass.Role, dom)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
glg.Errorf("role is not matches with the domain, role: %v, domain: %v", ass.Role, dom)
glg.Debugf("role is not matches with the domain, role: %v, domain: %v", ass.Role, dom)

continue
}

// remove duplication of `role+action+reource' assertion
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// remove duplication of `role+action+reource' assertion
// remove duplication of `role+action+resource' assertion

km := fmt.Sprintf("%s,%s,%s", ass.Role, ass.Action, ass.Resource)
if _, ok := assm.Load(km); !ok {
assm.Store(km, ass)
Expand All @@ -358,37 +344,31 @@ func simplifyAndCachePolicy(ctx context.Context, rp gache.Gache, sp *SignedPolic
return nil
})
}

if err := eg.Wait(); err != nil {
return errors.Wrap(err, "error simplify and cache policy")
return nil, errors.Wrap(err, "error simplify and cache policy")
}

rp := make(map[string][]*Assertion)
// cache
var retErr error
assm.Range(func(k interface{}, val interface{}) bool {
ass := val.(*util.Assertion)
a, err := NewAssertion(ass.Action, ass.Resource, ass.Effect)
if err != nil {
glg.Debugf("error adding assertion to the cache, err: %v", err)
glg.Errorf("error adding assertion to the cache, assertion: %+v, err: %v", ass, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
glg.Errorf("error adding assertion to the cache, assertion: %+v, err: %v", ass, err)
glg.Errorf("error creating policy.Assertion, assertion: %+v, err: %v", ass, err)

retErr = err
return false
}

var asss []*Assertion
r := ass.Role
if r, ok := rp.Get(r); ok {
asss = append(r.([]*Assertion), a)
} else {
asss = []*Assertion{a}
}
rp.SetWithExpire(ass.Role, asss, time.Duration(sp.DomainSignedPolicyData.SignedPolicyData.Expires.UnixNano()))
role := strings.SplitN(ass.Role, ":role.", 2)[1]
rp[role] = append(rp[role], a)

glg.Debugf("added assertion to the cache: %+v", ass)
glg.Debugf("added assertion to the cache, domain: %v, role: %v, assertion: %+v", dom, role, ass)
return true
})
if retErr != nil {
return retErr
return nil, errors.Wrap(retErr, "error process assertion")
}

return nil
return rp, nil
}