Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
894 changes: 778 additions & 116 deletions api/doc/openapi.json

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions api/internal/features/deploy/tasks/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (t *TaskService) SetupCreateDeploymentQueue() {

TaskCreateDeployment = taskq.RegisterTask(&taskq.TaskOptions{
Name: TASK_CREATE_DEPLOYMENT,
RetryLimit: 0,
RetryLimit: 1,
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
fmt.Printf("[%s] start: correlation_id=%s\n", TASK_CREATE_DEPLOYMENT, data.CorrelationID)
err := t.BuildPack(ctx, data)
Expand All @@ -91,7 +91,7 @@ func (t *TaskService) SetupCreateDeploymentQueue() {

TaskUpdateDeployment = taskq.RegisterTask(&taskq.TaskOptions{
Name: TASK_UPDATE_DEPLOYMENT,
RetryLimit: 0,
RetryLimit: 1,
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
fmt.Println("Updating deployment")
err := t.HandleUpdateDeployment(ctx, data)
Expand All @@ -116,7 +116,7 @@ func (t *TaskService) SetupCreateDeploymentQueue() {

TaskReDeploy = taskq.RegisterTask(&taskq.TaskOptions{
Name: TASK_REDEPLOYMENT,
RetryLimit: 0,
RetryLimit: 1,
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
fmt.Println("Redeploying application")
err := t.HandleReDeploy(ctx, data)
Expand All @@ -141,7 +141,7 @@ func (t *TaskService) SetupCreateDeploymentQueue() {

TaskRollback = taskq.RegisterTask(&taskq.TaskOptions{
Name: TASK_ROLLBACK,
RetryLimit: 0,
RetryLimit: 1,
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
fmt.Println("Rolling back deployment")
err := t.HandleRollback(ctx, data)
Expand All @@ -166,7 +166,7 @@ func (t *TaskService) SetupCreateDeploymentQueue() {

TaskRestart = taskq.RegisterTask(&taskq.TaskOptions{
Name: TASK_RESTART,
RetryLimit: 0,
RetryLimit: 1,
Handler: func(ctx context.Context, data shared_types.TaskPayload) error {
fmt.Println("Restarting deployment")
err := t.HandleRestart(ctx, data)
Expand Down
121 changes: 117 additions & 4 deletions api/internal/queue/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package queue

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/go-redis/redis/v8"

Expand All @@ -10,28 +14,137 @@ import (
)

var (
redisClient *redis.Client
factory taskq.Factory
redisClient *redis.Client
factory taskq.Factory
onceConsumers sync.Once
consumersStarted bool
registeredQueues []string
queuesMutex sync.RWMutex
)

// Init initializes the queue factory with a shared Redis v8 client.
func Init(client *redis.Client) {
redisClient = client
factory = redisq.NewFactory()
registeredQueues = make([]string, 0)
}

// RegisterQueue registers a new queue with the shared redis client.
func RegisterQueue(opts *taskq.QueueOptions) taskq.Queue {
if opts.Redis == nil {
opts.Redis = redisClient
}
return factory.RegisterQueue(opts)
queue := factory.RegisterQueue(opts)

// Track registered queue names for cleanup
queuesMutex.Lock()
registeredQueues = append(registeredQueues, opts.Name)
queuesMutex.Unlock()

log.Printf("Registered queue: %s (MinNumWorker: %d, MaxNumWorker: %d)", opts.Name, opts.MinNumWorker, opts.MaxNumWorker)
return queue
}

// cleanupDeadConsumers removes dead consumers from Redis consumer groups.
// Dead consumers are those that haven't been active for more than ConsumerIdleTimeout.
// This helps prevent accumulation of dead consumer entries after restarts.
func cleanupDeadConsumers(ctx context.Context) error {
if redisClient == nil {
return nil
}

queuesMutex.RLock()
queueNames := make([]string, len(registeredQueues))
copy(queueNames, registeredQueues)
queuesMutex.RUnlock()

if len(queueNames) == 0 {
return nil
}

log.Println("Cleaning up dead consumers from Redis consumer groups...")
cleanedCount := 0

// taskq uses "taskq" as the default consumer group name
groupName := "taskq"

for _, queueName := range queueNames {
// Get stream key for this queue (taskq format: taskq:{queueName})
streamKey := fmt.Sprintf("taskq:{%s}", queueName)

// Get consumer information
cmd := redisClient.XInfoConsumers(ctx, streamKey, groupName)
if cmd.Err() != nil {
// Consumer group might not exist yet, skip
continue
}

consumers, err := cmd.Result()
if err != nil {
log.Printf("Warning: Failed to get consumers for queue %s: %v", queueName, err)
continue
}

// Check each consumer and remove if idle for too long
for _, consumer := range consumers {
idleTime := time.Duration(consumer.Idle) * time.Millisecond
// Consider consumers idle for more than 15 minutes as dead
// (longer than ConsumerIdleTimeout to account for processing time)
// Only remove if they have no pending messages
if idleTime > 15*time.Minute && consumer.Pending == 0 {
delCmd := redisClient.XGroupDelConsumer(ctx, streamKey, groupName, consumer.Name)
if delCmd.Err() == nil {
log.Printf("Removed dead consumer '%s' from queue '%s' (idle for %v)", consumer.Name, queueName, idleTime)
cleanedCount++
} else {
log.Printf("Warning: Failed to remove dead consumer '%s' from queue '%s': %v", consumer.Name, queueName, delCmd.Err())
}
} else if consumer.Pending > 0 {
log.Printf("Skipping consumer '%s' from queue '%s' (has %d pending messages, idle for %v)", consumer.Name, queueName, consumer.Pending, idleTime)
}
}
}

if cleanedCount > 0 {
log.Printf("Cleaned up %d dead consumer(s)", cleanedCount)
} else {
log.Println("No dead consumers found to clean up")
}

return nil
}

// StartConsumers starts consumers for all registered queues. This function is idempotent
// and will only start consumers once, even if called multiple times.
// It also cleans up dead consumers from previous restarts before starting new ones.
func StartConsumers(ctx context.Context) error {
return factory.StartConsumers(ctx)
var err error
onceConsumers.Do(func() {
// Clean up dead consumers before starting new ones
if cleanupErr := cleanupDeadConsumers(ctx); cleanupErr != nil {
log.Printf("Warning: Failed to cleanup dead consumers: %v", cleanupErr)
}

log.Println("Starting task queue consumers...")
err = factory.StartConsumers(ctx)
if err != nil {
log.Printf("Error starting consumers: %v", err)
} else {
consumersStarted = true
log.Println("Task queue consumers started successfully")
}
})
return err
}

// IsConsumersStarted returns whether consumers have been started
func IsConsumersStarted() bool {
return consumersStarted
}

// Close gracefully closes all consumers and cleans up resources
func Close() error {
log.Println("Closing task queue consumers...")
consumersStarted = false
return factory.Close()
}
1 change: 1 addition & 0 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func main() {
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
queue.Close()
schedulers.Main.Stop()
schedulers.HealthCheck.Stop()
os.Exit(0)
Expand Down
1 change: 1 addition & 0 deletions view/app/extensions/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export default function ExtensionsPage() {
<PageLayout maxWidth="full" padding="md" spacing="lg">
<MainPageHeader
label={t('extensions.title')}
description={t('extensions.description')}
actions={
<div className="flex items-center gap-12">
<SearchBar
Expand Down
1 change: 1 addition & 0 deletions view/lib/i18n/locales/en/extensions.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extensions": {
"title": "Extensions",
"description": "Discover and install extensions on your server",
"subtitle": "Skip the boilerplate, Kick off your server setup in seconds",
"exploreExtensions": "Explore Extensions Hub",
"exploreExtensionsTitle": "Explore extensions",
Expand Down
1 change: 1 addition & 0 deletions view/lib/i18n/locales/es/extensions.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extensions": {
"title": "Extensiones",
"description": "Descubre e instala extensiones para mejorar la configuración de tu servidor",
"subtitle": "Omite el boilerplate, inicia tu configuración de servidor en segundos",
"exploreExtensions": "Explorar Hub de Extensiones",
"exploreExtensionsTitle": "Explorar extensiones",
Expand Down
1 change: 1 addition & 0 deletions view/lib/i18n/locales/fr/extensions.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extensions": {
"title": "Extensions",
"description": "Découvrez et installez des extensions pour améliorer la configuration de votre serveur",
"subtitle": "Omettez le boilerplate, lancez votre configuration de serveur en quelques secondes",
"exploreExtensions": "Explorer le Hub d'Extensions",
"exploreExtensionsTitle": "Explorer les extensions",
Expand Down
1 change: 1 addition & 0 deletions view/lib/i18n/locales/kn/extensions.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extensions": {
"title": "ವಿಸ್ತರಣೆಗಳು",
"description": "ನಿಮ್ಮ ಸರ್ವರ್ ಸೆಟಪ್ ಅನ್ನು ವರ್ಧಿಸಲು ವಿಸ್ತರಣೆಗಳನ್ನು ಕಂಡುಹಿಡಿಯಿರಿ ಮತ್ತು ಸ್ಥಾಪಿಸಿ",
"subtitle": "ಶಕ್ತಿಶಾಲಿ ವಿಸ್ತರಣೆಗಳೊಂದಿಗೆ ನಿಕ್ಸೋಪಸ್ ಅನ್ನು ವಿಸ್ತರಿಸಿ",
"exploreExtensions": "ವಿಸ್ತರಣೆಗಳ ಹಬ್ ಅನ್ನು ಅನ್ವೇಷಿಸಿ",
"exploreExtensionsTitle": "ವಿಸ್ತರಣೆಗಳನ್ನು ಅನ್ವೇಷಿಸಿ",
Expand Down
1 change: 1 addition & 0 deletions view/lib/i18n/locales/ml/extensions.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extensions": {
"title": "എക്സ്റ്റെൻഷനുകൾ",
"description": "നിങ്ങളുടെ സർവർ സെറ്റപ്പ് മെച്ചപ്പെടുത്താൻ വിപുലീകരണങ്ങൾ കണ്ടെത്തി ഇൻസ്റ്റാൾ ചെയ്യുക",
"subtitle": "ബോയിലർപ്ലേറ്റ് ഒഴിവാക്കി, നിങ്ങളുടെ സർവർ സെറ്റപ്പ് സെക്കൻഡുകൾക്കുള്ളിൽ ആരംഭിക്കൂ",
"exploreExtensions": "എക്സ്റ്റെൻഷൻസ് ഹബ് അന്വേഷിക്കുക",
"exploreExtensionsTitle": "എക്സ്റ്റെൻഷനുകൾ അന്വേഷിക്കുക",
Expand Down
6 changes: 3 additions & 3 deletions view/packages/components/settings.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -666,12 +666,12 @@ export function SettingsSidebar({
key={cat.id}
onClick={() => onCategoryChange(cat.id)}
className={cn(
'w-full flex items-center gap-3 px-3 py-2 rounded-md text-sm transition-colors',
'w-full flex items-center gap-3 px-3 py-2 rounded-md text-sm transition-colors overflow-hidden text-left',
activeCategory === cat.id ? 'bg-muted font-medium' : 'hover:bg-muted/50'
)}
>
<Icon className="h-4 w-4" />
<span>{cat.label}</span>
<Icon className="h-4 w-4 flex-shrink-0" />
<span className="min-w-0 break-normal leading-normal text-left">{cat.label}</span>
</button>
);
};
Expand Down
18 changes: 17 additions & 1 deletion view/packages/hooks/shared/use-app-sidebar.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { useRouter } from 'next/navigation';
import { useRouter, usePathname } from 'next/navigation';
import { useAppSelector, useAppDispatch } from '@/redux/hooks';
import { useGetUserOrganizationsQuery } from '@/redux/services/users/userApi';
import { useNavigationState } from '@/packages/hooks/shared/use_navigation_state';
Expand Down Expand Up @@ -58,6 +58,7 @@ export function useAppSidebar() {
const dispatch = useAppDispatch();
const { canAccessResource } = useRBAC();
const router = useRouter();
const pathname = usePathname();
const [showLogoutDialog, setShowLogoutDialog] = useState(false);
const { closeSettings } = useSettingsModal();

Expand Down Expand Up @@ -262,6 +263,21 @@ Add any other context about the problem here.`;
}
}, [activeOrg?.id, refetch]);

// Sync activeNav with current pathname to prevent multiple active menu items
useEffect(() => {
if (pathname) {
// Find the matching navigation item URL for the current pathname
const matchingNavItem = data.navMain.find(
(item) => pathname === item.url || pathname.startsWith(item.url + '/')
);

// Only update activeNav if we found a match and it's different from current activeNav
if (matchingNavItem && matchingNavItem.url !== activeNav) {
setActiveNav(matchingNavItem.url);
}
}
}, [pathname, activeNav, setActiveNav]);

return {
user,
isLoading,
Expand Down
Loading