Skip to content

Commit 097776b

Browse files
committed
Add partition awareness to shard activity tracking
1 parent 0080557 commit 097776b

4 files changed

Lines changed: 68 additions & 26 deletions

File tree

README.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ This command helps identify which shards are receiving the most write activity b
314314

315315
**Enhanced output features:**
316316
- **Checkpoint visibility**: Shows actual `local_checkpoint` values (CP Start → CP End → Delta)
317+
- **Partition awareness**: Tracks each partition of a partitioned table separately (one entry per distinct `partition_ident` value)
317318
- **Activity trends**: 🔥 HOT (≥100/s), 📈 HIGH (≥50/s), 📊 MED (≥10/s), 📉 LOW (<10/s)
318319
- **Smart insights**: Identifies concentration patterns and load distribution (non-watch mode)
319320
- **Flexible filtering**: Exclude system tables, set minimum rates, hide replicas
@@ -328,16 +329,15 @@ This approach captures shards that become active during the observation period,
328329
329330
Total checkpoint activity: 190,314 changes, Average rate: 2,109.0/sec
330331
331-
Rank | Schema.Table | Shard | Node | Type | CP Start | CP End | Checkpoint Δ | Rate/sec | Trend
332-
---------------------------------------------------------------------------------------------------------------
333-
1 | gc.scheduled_jobs_log | 0 | data-hot-8 | P | 2,847,291 | 2,961,035 | 113,744 | 3,791.5 | 🔥 HOT
334-
2 | TURVO.events | 0 | data-hot-0 | P | 1,234,567 | 1,280,404 | 45,837 | 1,527.9 | 🔥 HOT
335-
3 | doc.user_actions | 1 | data-hot-2 | P | 987,654 | 1,018,387 | 30,733 | 1,024.4 | 🔥 HOT
332+
Rank | Schema.Table | Shard | Partition | Node | Type | Checkpoint Δ | Rate/sec | Trend
333+
-----------------------------------------------------------------------------------------------------------
334+
1 | gc.scheduled_jobs_log | 0 | - | data-hot-8 | P | 113,744 | 3,791.5 | 🔥 HOT
335+
2 | TURVO.events | 0 | 04732dpl6osj8d | data-hot-0 | P | 45,837 | 1,527.9 | 🔥 HOT
336+
3 | doc.user_actions | 1 | 04732dpk70rj6d | data-hot-2 | P | 30,733 | 1,024.4 | 🔥 HOT
336337
337338
Legend:
338-
• CP Start: local_checkpoint at start of observation
339-
• CP End: local_checkpoint at end of observation
340-
• Checkpoint Δ: CP End - CP Start (write operations during period)
339+
• Checkpoint Δ: Write operations during observation period
340+
• Partition: partition_ident (truncated if >14 chars, '-' if none)
341341
342342
Insights:
343343
• 3 HOT shards (≥100 changes/sec) - consider load balancing
@@ -352,11 +352,11 @@ Insights:
352352
353353
Total checkpoint activity: 190,314 changes, Average rate: 2,109.0/sec
354354
355-
Rank | Schema.Table | Shard | Node | Type | CP Start | CP End | Checkpoint Δ | Rate/sec | Trend
356-
---------------------------------------------------------------------------------------------------------------
357-
1 | gc.scheduled_jobs_log | 0 | data-hot-8 | P | 2,847,291 | 2,961,035 | 113,744 | 3,791.5 | 🔥 HOT
358-
2 | TURVO.events | 0 | data-hot-0 | P | 1,234,567 | 1,280,404 | 45,837 | 1,527.9 | 🔥 HOT
359-
3 | doc.user_actions | 1 | data-hot-2 | P | 987,654 | 1,018,387 | 30,733 | 1,024.4 | 🔥 HOT
355+
Rank | Schema.Table | Shard | Partition | Node | Type | Checkpoint Δ | Rate/sec | Trend
356+
-----------------------------------------------------------------------------------------------------------
357+
1 | gc.scheduled_jobs_log | 0 | - | data-hot-8 | P | 113,744 | 3,791.5 | 🔥 HOT
358+
2 | TURVO.events | 0 | 04732dpl6osj8d | data-hot-0 | P | 45,837 | 1,527.9 | 🔥 HOT
359+
3 | doc.user_actions | 1 | 04732dpk70rj6d | data-hot-2 | P | 30,733 | 1,024.4 | 🔥 HOT
360360
361361
━━━ Next update in 30s ━━━
362362
```

src/xmover/analyzer.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,7 +1082,7 @@ def format_activity_display(self, activities: List['ActiveShardActivity'],
10821082
output.append("")
10831083

10841084
# Create table headers
1085-
headers = ["Rank", "Schema.Table", "Shard", "Node", "Type", "CP Start", "CP End", "Checkpoint Δ", "Rate/sec", "Trend"]
1085+
headers = ["Rank", "Schema.Table", "Shard", "Partition", "Node", "Type", "Checkpoint Δ", "Rate/sec", "Trend"]
10861086

10871087
# Calculate column widths
10881088
col_widths = [len(h) for h in headers]
@@ -1094,10 +1094,9 @@ def format_activity_display(self, activities: List['ActiveShardActivity'],
10941094
rank = str(i)
10951095
table_id = activity.table_identifier
10961096
shard_id = str(activity.shard_id)
1097+
partition = activity.partition_ident[:14] + "..." if len(activity.partition_ident) > 14 else activity.partition_ident or "-"
10971098
node = activity.node_name
10981099
shard_type = "P" if activity.is_primary else "R"
1099-
cp_start = f"{activity.snapshot1.local_checkpoint:,}"
1100-
cp_end = f"{activity.snapshot2.local_checkpoint:,}"
11011100
checkpoint_delta = f"{activity.local_checkpoint_delta:,}"
11021101
rate = f"{activity.activity_rate:.1f}" if activity.activity_rate >= 0.1 else "<0.1"
11031102

@@ -1111,7 +1110,7 @@ def format_activity_display(self, activities: List['ActiveShardActivity'],
11111110
else:
11121111
trend = "📉 LOW"
11131112

1114-
row = [rank, table_id, shard_id, node, shard_type, cp_start, cp_end, checkpoint_delta, rate, trend]
1113+
row = [rank, table_id, shard_id, partition, node, shard_type, checkpoint_delta, rate, trend]
11151114
rows.append(row)
11161115

11171116
# Update column widths
@@ -1132,10 +1131,9 @@ def format_activity_display(self, activities: List['ActiveShardActivity'],
11321131
if not watch_mode:
11331132
output.append("")
11341133
output.append("Legend:")
1135-
output.append(" • CP Start: local_checkpoint at start of observation")
1136-
output.append(" • CP End: local_checkpoint at end of observation")
1137-
output.append(" • Checkpoint Δ: CP End - CP Start (write operations during period)")
1134+
output.append(" • Checkpoint Δ: Write operations during observation period")
11381135
output.append(" • Rate/sec: Checkpoint changes per second")
1136+
output.append(" • Partition: partition_ident (truncated if >14 chars, '-' if none)")
11391137
output.append(" • Type: P=Primary, R=Replica")
11401138
output.append(" • Trend: 🔥 HOT (≥100/s), 📈 HIGH (≥50/s), 📊 MED (≥10/s), 📉 LOW (<10/s)")
11411139

src/xmover/database.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,10 @@ def translog_uncommitted_mb(self) -> float:
133133

134134
@property
135135
def shard_identifier(self) -> str:
136-
"""Unique identifier for this shard"""
136+
"""Unique identifier for this shard including partition"""
137137
shard_type = "P" if self.is_primary else "R"
138-
return f"{self.schema_name}.{self.table_name}:{self.shard_id}:{self.node_name}:{shard_type}"
138+
partition = f":{self.partition_ident}" if self.partition_ident else ""
139+
return f"{self.schema_name}.{self.table_name}:{self.shard_id}:{self.node_name}:{shard_type}{partition}"
139140

140141

141142
@dataclass

tests/test_active_shard_monitor.py

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,7 @@ def test_format_activity_display_with_activities(self):
266266
assert "P" in display # primary indicator
267267
assert "Legend:" in display
268268
assert "Trend:" in display # new trend column explanation
269-
assert "CP Start:" in display # checkpoint start column
270-
assert "CP End:" in display # checkpoint end column
269+
assert "Partition:" in display # new partition column explanation
271270

272271
def test_format_activity_display_empty(self):
273272
"""Test formatting activity display with no data"""
@@ -382,6 +381,50 @@ def test_primary_replica_separation(self):
382381
# This test prevents the bug where we mixed primary CP End with replica CP Start
383382
# which created fake deltas like 129434 - 15876 = 113558
384383

384+
def test_partition_separation(self):
385+
"""Test that partitions within the same table/shard are tracked separately"""
386+
# Create snapshots with same table/shard but different partitions
387+
snapshot1 = [
388+
# Partition 1
389+
self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 32684, 100.0),
390+
# Partition 2 (same table/shard/node/type but different partition)
391+
self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 54289, 100.0),
392+
]
393+
394+
# Modify partition_ident for the snapshots to simulate different partitions
395+
snapshot1[0].partition_ident = "04732dpl6osj8d1g60o30c1g"
396+
snapshot1[1].partition_ident = "04732dpl6os3adpm60o30c1g"
397+
398+
snapshot2 = [
399+
# Partition 1 progresses
400+
self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 32800, 130.0), # +116 delta
401+
# Partition 2 progresses
402+
self.create_test_snapshot("TURVO", "appointmentFormFieldData_events", 0, "data-hot-8", True, 54400, 130.0), # +111 delta
403+
]
404+
405+
# Set partition_ident for second snapshot
406+
snapshot2[0].partition_ident = "04732dpl6osj8d1g60o30c1g"
407+
snapshot2[1].partition_ident = "04732dpl6os3adpm60o30c1g"
408+
409+
activities = self.monitor.compare_snapshots(snapshot1, snapshot2, min_activity_threshold=1)
410+
411+
# Should have 2 separate activities (partitions tracked separately)
412+
assert len(activities) == 2
413+
414+
# Verify deltas are calculated correctly for each partition
415+
partition1_activity = next(a for a in activities if "04732dpl6osj8d1g60o30c1g" in a.snapshot1.shard_identifier)
416+
partition2_activity = next(a for a in activities if "04732dpl6os3adpm60o30c1g" in a.snapshot1.shard_identifier)
417+
418+
assert partition1_activity.local_checkpoint_delta == 116 # 32800 - 32684
419+
assert partition2_activity.local_checkpoint_delta == 111 # 54400 - 54289
420+
421+
# Verify they have different shard identifiers due to partition
422+
assert partition1_activity.snapshot1.shard_identifier != partition2_activity.snapshot1.shard_identifier
423+
assert ":04732dpl6osj8d1g60o30c1g" in partition1_activity.snapshot1.shard_identifier
424+
assert ":04732dpl6os3adpm60o30c1g" in partition2_activity.snapshot1.shard_identifier
425+
426+
# This test prevents mixing partitions, which would create fake activity measurements
427+
385428
def test_format_activity_display_watch_mode(self):
386429
"""Test that watch mode excludes legend and insights"""
387430
snapshot1 = self.create_test_snapshot("schema1", "table1", 1, "node1", True, 1000, 100.0)
@@ -404,13 +447,13 @@ def test_format_activity_display_watch_mode(self):
404447
normal_display = self.monitor.format_activity_display([activity], show_count=10, watch_mode=False)
405448
assert "Legend:" in normal_display
406449
assert "Insights:" in normal_display
407-
assert "CP Start:" in normal_display
450+
assert "Checkpoint Δ:" in normal_display
408451

409452
# Test watch mode (should exclude legend and insights)
410453
watch_display = self.monitor.format_activity_display([activity], show_count=10, watch_mode=True)
411454
assert "Legend:" not in watch_display
412455
assert "Insights:" not in watch_display
413-
assert "CP Start:" not in watch_display
456+
assert "Checkpoint Δ" in watch_display # Core data should still be present
414457

415458
# But should still contain the core data
416459
assert "Most Active Shards" in watch_display

0 commit comments

Comments
 (0)