Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.graylog2.shared.utilities.StringUtils.f;

Expand Down Expand Up @@ -61,11 +62,10 @@ public RefreshingLockService(LockService lockService,
* @throws AlreadyLockedException when the resource couldn't be locked
*/
public void acquireAndKeepLock(String resource, int maxConcurrency) throws AlreadyLockedException {
Optional<Lock> optionalLock = lockService.lock(resource, maxConcurrency);
if (optionalLock.isEmpty()) {
throw new AlreadyLockedException(f("Could not acquire lock for resource <%s> with max concurrency <%d>", resource, maxConcurrency));
}
scheduleLock(optionalLock.get());
assertNoLockYet();
final var newLock = lockService.lock(resource, maxConcurrency)
.orElseThrow(() -> new AlreadyLockedException(f("Could not acquire lock for resource <%s> with max concurrency <%d>", resource, maxConcurrency)));
scheduleLock(newLock);
}

/**
Expand All @@ -76,12 +76,15 @@ public void acquireAndKeepLock(String resource, int maxConcurrency) throws Alrea
* @throws AlreadyLockedException when the resource couldn't be locked
*/
public void acquireAndKeepLock(String resource, String lockContext) throws AlreadyLockedException {
assertNoLockYet();
checkArgument(!isNullOrEmpty(lockContext), "lockContext cannot be blank");
Optional<Lock> optionalLock = lockService.lock(resource, lockContext);
if (optionalLock.isEmpty()) {
throw new AlreadyLockedException(f("Could not acquire lock for resource <%s> and lock context <%s>", resource, lockContext));
}
scheduleLock(optionalLock.get());
final var newLock = lockService.lock(resource, lockContext)
.orElseThrow(() -> new AlreadyLockedException(f("Could not acquire lock for resource <%s> and lock context <%s>", resource, lockContext)));
scheduleLock(newLock);
}

private void assertNoLockYet() {
checkState(lock == null, "Unable to acquire new lock, already holding lock that would get lost: " + lock);
}

private void scheduleLock(Lock newLock) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.cluster.lock;

import org.graylog2.shared.SuppressForbidden;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class RefreshingLockServiceTest {

@Mock
private LockService lockService;

private ScheduledExecutorService scheduler;

private RefreshingLockService refreshingLockService;

private final Lock firstLock = Lock.builder()
.resource("first-resource")
.lockedBy("node-123")
.createdAt(ZonedDateTime.now(ZoneOffset.UTC))
.updatedAt(ZonedDateTime.now(ZoneOffset.UTC))
.build();

private final Lock secondLock = Lock.builder()
.resource("second-resource")
.lockedBy("node-123")
.createdAt(ZonedDateTime.now(ZoneOffset.UTC))
.updatedAt(ZonedDateTime.now(ZoneOffset.UTC))
.build();

@BeforeEach
@SuppressForbidden("Using Executors.newSingleThreadScheduledExecutor() is okay in tests")
void setUp() {
scheduler = Executors.newSingleThreadScheduledExecutor();
refreshingLockService = new RefreshingLockService(
lockService,
scheduler,
Duration.ofMinutes(5)
);
}

@AfterEach
void tearDown() {
if (scheduler != null) {
scheduler.shutdownNow();
}
}

@Test
void throwsIllegalStateExceptionWhenAcquiringLockWhileAlreadyHoldingOne() throws AlreadyLockedException {
// Mock the lock service to return locks
when(lockService.lock(eq("first-resource"), anyString()))
.thenReturn(Optional.of(firstLock));

// Acquire first lock successfully
refreshingLockService.acquireAndKeepLock("first-resource", "context-1");

// Attempt to acquire second lock while still holding the first
assertThatThrownBy(() -> refreshingLockService.acquireAndKeepLock("second-resource", "context-2"))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Unable to acquire new lock, already holding lock that would get lost");
}

@Test
void throwsIllegalStateExceptionWhenAcquiringLockWithMaxConcurrencyWhileAlreadyHoldingOne() throws AlreadyLockedException {
// Mock the lock service to return locks
when(lockService.lock(eq("first-resource"), eq(1)))
.thenReturn(Optional.of(firstLock));

// Acquire first lock successfully
refreshingLockService.acquireAndKeepLock("first-resource", 1);

// Attempt to acquire second lock while still holding the first
assertThatThrownBy(() -> refreshingLockService.acquireAndKeepLock("second-resource", 1))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Unable to acquire new lock, already holding lock that would get lost");
}
}
Loading