Skip to content

Commit b1ebcdd

Browse files
authored
chore: add more logs and make redis cache as optional (#1422)
1 parent a09cc6a commit b1ebcdd

File tree

2 files changed

+57
-13
lines changed

2 files changed

+57
-13
lines changed

libs/access-control/src/casbin/enforcer_v2.rs

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,19 @@ impl AFEnforcerV2 {
136136
pending_operations: Arc<RwLock<HashSet<(String, String)>>>,
137137
generation_notify: Arc<Notify>,
138138
) {
139-
trace!("[access control v2]: Policy update processor started");
139+
info!("[access control v2]: Policy update processor started");
140140
let buffer_size = 5;
141141
let mut buf = Vec::with_capacity(buffer_size);
142142

143143
loop {
144+
trace!("[access control v2]: Waiting for policy commands...");
144145
let n = rx.recv_many(&mut buf, buffer_size).await;
145146
if n == 0 {
146-
trace!("[access control v2]: Channel closed, exiting processor");
147+
info!("[access control v2]: Channel closed, exiting processor");
147148
break;
148149
}
149150

151+
trace!("[access control v2]: Received {} policy commands", n);
150152
let mut enforcer = enforcer.write().await;
151153
let mut max_generation = 0u64;
152154
let mut processed_keys = Vec::new();
@@ -170,6 +172,7 @@ impl AFEnforcerV2 {
170172
Ok(())
171173
}
172174
.await;
175+
trace!("[access control v2]: AddPolicies result: {:?}", result);
173176
let _ = response.send(result);
174177
},
175178
PolicyCommand::RemovePolicies {
@@ -189,6 +192,7 @@ impl AFEnforcerV2 {
189192
Ok(())
190193
}
191194
.await;
195+
trace!("[access control v2]: RemovePolicies result: {:?}", result);
192196
let _ = response.send(result);
193197
},
194198
PolicyCommand::Shutdown => {
@@ -198,8 +202,14 @@ impl AFEnforcerV2 {
198202
}
199203
}
200204
drop(enforcer);
205+
trace!("[access control v2]: Finished processing {} commands", n);
201206
// Update consistency tracking
202207
if max_generation > 0 {
208+
trace!(
209+
"[access control v2]: Updating processed generation from {} to {}",
210+
processed_generation.load(Ordering::Relaxed),
211+
max_generation
212+
);
203213
processed_generation.store(max_generation, Ordering::Release);
204214
if !processed_keys.is_empty() {
205215
let mut pending = pending_operations.write().await;
@@ -209,6 +219,10 @@ impl AFEnforcerV2 {
209219
}
210220

211221
// Notify waiters
222+
trace!(
223+
"[access control v2]: Notifying waiters after processing generation {}",
224+
max_generation
225+
);
212226
generation_notify.notify_waiters();
213227
}
214228
}
@@ -224,7 +238,10 @@ impl AFEnforcerV2 {
224238

225239
// First try to send without blocking to detect if channel is full
226240
match self.policy_cmd_tx.try_send(cmd) {
227-
Ok(()) => Ok(()),
241+
Ok(()) => {
242+
trace!("[access control v2]: Command sent successfully");
243+
Ok(())
244+
},
228245
Err(mpsc::error::TrySendError::Full(cmd)) => {
229246
self
230247
.metrics_state
@@ -234,7 +251,10 @@ impl AFEnforcerV2 {
234251
warn!("[access control v2]: Policy channel is full, waiting to send...");
235252
let send_timeout = Duration::from_secs(5);
236253
match timeout(send_timeout, self.policy_cmd_tx.send(cmd)).await {
237-
Ok(Ok(())) => Ok(()),
254+
Ok(Ok(())) => {
255+
trace!("[access control v2]: Command sent successfully after waiting");
256+
Ok(())
257+
},
238258
Ok(Err(_)) => {
239259
self
240260
.metrics_state
@@ -310,8 +330,14 @@ impl AFEnforcerV2 {
310330
})
311331
.await?;
312332

313-
rx.await
314-
.map_err(|_| AppError::Internal(anyhow!("Policy update response dropped")))?
333+
let result = rx
334+
.await
335+
.map_err(|_| AppError::Internal(anyhow!("Policy update response dropped")))?;
336+
trace!(
337+
"[access control v2]: Received policy update response: {:?}",
338+
result
339+
);
340+
result
315341
}
316342

317343
/// Remove policies for a subject and object type.
@@ -362,8 +388,14 @@ impl AFEnforcerV2 {
362388
})
363389
.await?;
364390

365-
rx.await
366-
.map_err(|_| AppError::Internal(anyhow!("Policy update response dropped")))?
391+
let result = rx
392+
.await
393+
.map_err(|_| AppError::Internal(anyhow!("Policy update response dropped")))?;
394+
trace!(
395+
"[access control v2]: Received policy removal response: {:?}",
396+
result
397+
);
398+
result
367399
}
368400

369401
/// Enforces an access control policy with eventual consistency.
@@ -455,12 +487,13 @@ impl AFEnforcerV2 {
455487
let timeout_duration = timeout_duration.unwrap_or(Duration::from_secs(5));
456488
let wait_future = async {
457489
loop {
490+
let notified = self.generation_notify.notified();
458491
let processed = self.processed_generation.load(Ordering::Acquire);
459492
if processed >= target_generation {
460493
return Ok(());
461494
}
462-
// Wait for notification
463-
self.generation_notify.notified().await;
495+
496+
notified.await;
464497
}
465498
};
466499

src/application.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,13 +248,24 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
248248
let pg_listeners = Arc::new(PgListeners::new(&pg_pool).await?);
249249
// let collab_member_listener = pg_listeners.subscribe_collab_member_change();
250250

251+
let use_redis_ac_cache = get_env_var("APPFLOWY_ACCESS_CONTROL_REDIS_CACHE_ENABLED", "false")
252+
.parse::<bool>()
253+
.unwrap_or(false);
254+
251255
info!(
252-
"Setting up access controls, is_enable: {}",
253-
&config.access_control.is_enabled
256+
"Setting up access controls, is_enable: {}, use redis cache: {}",
257+
&config.access_control.is_enabled, use_redis_ac_cache,
254258
);
259+
260+
let redis_uri = if use_redis_ac_cache {
261+
Some(config.redis_uri.expose_secret().as_str())
262+
} else {
263+
None
264+
};
265+
255266
let access_control = AccessControl::new(
256267
pg_pool.clone(),
257-
Some(config.redis_uri.expose_secret()),
268+
redis_uri,
258269
metrics.access_control_metrics.clone(),
259270
)
260271
.await?;

0 commit comments

Comments
 (0)