diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/AbstractPersistedAddressSetting.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/AbstractPersistedAddressSetting.java index 0aefe2a72fc..671db598afb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/AbstractPersistedAddressSetting.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/AbstractPersistedAddressSetting.java @@ -18,10 +18,9 @@ package org.apache.activemq.artemis.core.persistence.config; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -public abstract class AbstractPersistedAddressSetting implements EncodingSupport { +public abstract class AbstractPersistedAddressSetting extends PersistedConfiguration { protected long storeId; @@ -38,15 +37,6 @@ public AbstractPersistedAddressSetting(SimpleString addressMatch, AddressSetting this.setting = setting; } - public long getStoreId() { - return storeId; - } - - public AbstractPersistedAddressSetting setStoreId(long storeId) { - this.storeId = storeId; - return this; - } - public SimpleString getAddressMatch() { return addressMatch; } @@ -64,4 +54,9 @@ public AbstractPersistedAddressSetting setSetting(AddressSettings setting) { this.setting = setting; return this; } + + @Override + public String getName() { + return addressMatch.toString(); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSetting.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSetting.java index dcb06cbe150..dbb5c5fe6a2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSetting.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSetting.java @@ -18,6 +18,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; /** @@ -63,4 +64,8 @@ public int getEncodeSize() { return addressMatch.sizeof() + setting.getEncodeSize(); } + @Override + public byte getRecordType() { + return JournalRecordIds.ADDRESS_SETTING_RECORD; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSettingJSON.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSettingJSON.java index 99ce6597a3d..1a9e0fa2f98 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSettingJSON.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedAddressSettingJSON.java @@ -18,10 +18,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -public class PersistedAddressSettingJSON extends AbstractPersistedAddressSetting implements EncodingSupport { +public class PersistedAddressSettingJSON extends AbstractPersistedAddressSetting { SimpleString jsonSetting; @@ -68,4 +68,9 @@ public int getEncodeSize() { public String toString() { return "PersistedAddressSettingJSON{" + "jsonSetting=" + jsonSetting + ", storeId=" + storeId + ", addressMatch=" + addressMatch + ", setting=" + setting + '}'; } + + @Override + public byte getRecordType() { + return JournalRecordIds.ADDRESS_SETTING_RECORD_JSON; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java index d10edbb9e4c..438796ee445 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java @@ -18,11 +18,10 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.config.BridgeConfiguration; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; -public class PersistedBridgeConfiguration implements EncodingSupport { +public class PersistedBridgeConfiguration extends PersistedConfiguration { - private long storeId; private BridgeConfiguration bridgeConfiguration; @Override @@ -38,14 +37,6 @@ public PersistedBridgeConfiguration() { bridgeConfiguration = new BridgeConfiguration(); } - public void setStoreId(long id) { - this.storeId = id; - } - - public long getStoreId() { - return storeId; - } - @Override public int getEncodeSize() { return bridgeConfiguration.getEncodeSize(); @@ -61,10 +52,16 @@ public void decode(ActiveMQBuffer buffer) { bridgeConfiguration.decode(buffer); } + @Override public String getName() { return bridgeConfiguration.getParentName(); } + @Override + public byte getRecordType() { + return JournalRecordIds.BRIDGE_RECORD; + } + public BridgeConfiguration getBridgeConfiguration() { return bridgeConfiguration; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConfiguration.java new file mode 100644 index 00000000000..8795502f3f8 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConfiguration.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.persistence.config; + +import org.apache.activemq.artemis.core.journal.EncodingSupport; + +public abstract class PersistedConfiguration implements EncodingSupport { + + protected long storeId; + + public void setStoreId(long id) { + this.storeId = id; + } + + public long getStoreId() { + return storeId; + } + + /** + * Returns the name to be used as the key for the map used by the broker to track this configuration implementation. + * + * @return the name as a String + */ + public abstract String getName(); + + public abstract byte getRecordType(); +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConnector.java index 7cc12dab333..2c187276917 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedConnector.java @@ -17,12 +17,10 @@ package org.apache.activemq.artemis.core.persistence.config; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.utils.BufferHelper; -public class PersistedConnector implements EncodingSupport { - - private long storeId; +public class PersistedConnector extends PersistedConfiguration { private String url; @@ -36,14 +34,6 @@ public PersistedConnector(String name, String url) { this.url = url; } - public void setStoreId(long id) { - this.storeId = id; - } - - public long getStoreId() { - return storeId; - } - public void setUrl(String url) { this.url = url; } @@ -56,10 +46,16 @@ public void setName(String name) { this.name = name; } + @Override public String getName() { return name; } + @Override + public byte getRecordType() { + return JournalRecordIds.CONNECTOR_RECORD; + } + @Override public int getEncodeSize() { int size = 0; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedDivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedDivertConfiguration.java index 67860bcc0d2..4de2d8418d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedDivertConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedDivertConfiguration.java @@ -18,11 +18,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.config.DivertConfiguration; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; -public class PersistedDivertConfiguration implements EncodingSupport { - - private long storeId; +public class PersistedDivertConfiguration extends PersistedConfiguration { private DivertConfiguration divertConfiguration; @@ -39,14 +37,6 @@ public PersistedDivertConfiguration() { divertConfiguration = new DivertConfiguration(); } - public void setStoreId(long id) { - this.storeId = id; - } - - public long getStoreId() { - return storeId; - } - @Override public int getEncodeSize() { return divertConfiguration.getEncodeSize(); @@ -62,10 +52,16 @@ public void decode(ActiveMQBuffer buffer) { divertConfiguration.decode(buffer); } + @Override public String getName() { return divertConfiguration.getName(); } + @Override + public byte getRecordType() { + return JournalRecordIds.DIVERT_RECORD; + } + public DivertConfiguration getDivertConfiguration() { return divertConfiguration; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java index 4c9965811f1..5edbcba7366 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedKeyValuePair.java @@ -17,12 +17,10 @@ package org.apache.activemq.artemis.core.persistence.config; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.utils.BufferHelper; -public class PersistedKeyValuePair implements EncodingSupport { - - private long storeId; +public class PersistedKeyValuePair extends PersistedConfiguration { private String mapId; @@ -39,14 +37,6 @@ public PersistedKeyValuePair(String mapId, String key, String value) { this.value = value; } - public void setStoreId(long id) { - this.storeId = id; - } - - public long getStoreId() { - return storeId; - } - public String getMapId() { return mapId; } @@ -59,6 +49,16 @@ public String getValue() { return value; } + @Override + public String getName() { + return mapId; + } + + @Override + public byte getRecordType() { + return JournalRecordIds.KEY_VALUE_PAIR_RECORD; + } + @Override public int getEncodeSize() { int size = 0; @@ -85,12 +85,9 @@ public void decode(ActiveMQBuffer buffer) { @Override public String toString() { return "PersistedKeyValuePair [storeId=" + storeId + - ", mapId=" + - mapId + - ", key=" + - key + - ", value=" + - value + + ", mapId=" + mapId + + ", key=" + key + + ", value=" + value + "]"; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRole.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRole.java index 6dda88b00ed..84a8d791c44 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRole.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedRole.java @@ -21,13 +21,11 @@ import java.util.Objects; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.utils.BufferHelper; import org.apache.activemq.artemis.utils.DataConstants; -public class PersistedRole implements EncodingSupport { - - private long storeId; +public class PersistedRole extends PersistedConfiguration { private String username; @@ -41,14 +39,6 @@ public PersistedRole(String username, List roles) { this.roles = Objects.requireNonNull(roles); } - public void setStoreId(long id) { - this.storeId = id; - } - - public long getStoreId() { - return storeId; - } - public String getUsername() { return username; } @@ -105,4 +95,14 @@ public String toString() { return result.toString(); } + + @Override + public byte getRecordType() { + return JournalRecordIds.ROLE_RECORD; + } + + @Override + public String getName() { + return username; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedSecuritySetting.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedSecuritySetting.java index 2a679db0f3e..9792873d2d6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedSecuritySetting.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedSecuritySetting.java @@ -21,7 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.json.JsonObject; import static org.apache.activemq.artemis.core.security.Role.BROWSE_PERMISSION; @@ -39,9 +39,7 @@ import static org.apache.activemq.artemis.utils.DataConstants.SIZE_INT; import static org.apache.activemq.artemis.utils.DataConstants.SIZE_NULL; -public class PersistedSecuritySetting implements EncodingSupport { - - private long storeId; +public class PersistedSecuritySetting extends PersistedConfiguration { private SimpleString addressMatch; @@ -81,9 +79,7 @@ public PersistedSecuritySetting(final String addressMatch, final String createNonDurableQueueRoles, final String deleteNonDurableQueueRoles, final String manageRoles, - final String browseRoles, - final String createAddressRoles, - final String deleteAddressRoles, + final String browseRoles, final String createAddressRoles, final String deleteAddressRoles, final String viewRoles, final String editRoles) { super(); @@ -118,13 +114,14 @@ public PersistedSecuritySetting(final String addressMatch, JsonObject o) { JsonUtil.arrayToString(o, EDIT_PERMISSION)); } - - public long getStoreId() { - return storeId; + @Override + public String getName() { + return addressMatch.toString(); } - public void setStoreId(final long id) { - storeId = id; + @Override + public byte getRecordType() { + return JournalRecordIds.SECURITY_SETTING_RECORD; } public SimpleString getAddressMatch() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedUser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedUser.java index b3dee5d5231..afe189bf19f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedUser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedUser.java @@ -17,12 +17,10 @@ package org.apache.activemq.artemis.core.persistence.config; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.utils.BufferHelper; -public class PersistedUser implements EncodingSupport { - - private long storeId; +public class PersistedUser extends PersistedConfiguration { private String username; @@ -36,12 +34,14 @@ public PersistedUser(String username, String password) { this.password = password; } - public void setStoreId(long id) { - this.storeId = id; + @Override + public byte getRecordType() { + return JournalRecordIds.USER_RECORD; } - public long getStoreId() { - return storeId; + @Override + public String getName() { + return username; } public String getUsername() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index e6df630c942..fbb3cad2372 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -73,6 +74,7 @@ import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON; import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration; +import org.apache.activemq.artemis.core.persistence.config.PersistedConfiguration; import org.apache.activemq.artemis.core.persistence.config.PersistedConnector; import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration; import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair; @@ -224,9 +226,9 @@ public Configuration getConfig() { } // Persisted core configuration - protected final Map mapPersistedSecuritySettings = new ConcurrentHashMap<>(); + protected final Map mapPersistedSecuritySettings = new ConcurrentHashMap<>(); - protected final Map mapPersistedAddressSettings = new ConcurrentHashMap<>(); + protected final Map mapPersistedAddressSettings = new ConcurrentHashMap<>(); protected final Map mapPersistedDivertConfigurations = new ConcurrentHashMap<>(); @@ -516,6 +518,20 @@ public void deleteMessage(final long messageID) throws Exception { } } + private void deleteRecordAsync(long journalId) throws Exception { + deleteRecord(journalId, false); + } + + private void deleteRecordSync(long journalId) throws Exception { + deleteRecord(journalId, true); + } + + private void deleteRecord(long journalId, boolean sync) throws Exception { + try (ArtemisCloseable lock = closeableReadLock()) { + bindingsJournal.tryAppendDeleteRecord(journalId, this::recordNotFoundCallback, sync); + } + } + private void messageUpdateCallback(long id, boolean found) { if (!found) { ActiveMQServerLogger.LOGGER.cannotFindMessageOnJournal(id, new Exception("trace")); @@ -783,15 +799,10 @@ public void updateDeliveryCount(final MessageReference ref) throws Exception { messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, true, this::messageUpdateCallback, getContext(syncNonTransactional)); } } + @Override public void storeAddressSetting(PersistedAddressSettingJSON addressSetting) throws Exception { - deleteAddressSetting(addressSetting.getAddressMatch()); - try (ArtemisCloseable lock = closeableReadLock()) { - long id = idGenerator.generateID(); - addressSetting.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.ADDRESS_SETTING_RECORD_JSON, addressSetting, true); - mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting); - } + storeConfiguration(addressSetting, mapPersistedAddressSettings); } @Override @@ -801,44 +812,29 @@ public List recoverAddressSettings() throws Exc @Override public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) { - return mapPersistedAddressSettings.get(address); + return mapPersistedAddressSettings.get(address.toString()); } @Override - public List recoverSecuritySettings() throws Exception { - return new ArrayList<>(mapPersistedSecuritySettings.values()); + public void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception { + storeConfiguration(persistedRoles, mapPersistedSecuritySettings); } @Override - public void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception { - - deleteSecuritySetting(persistedRoles.getAddressMatch()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedRoles.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.SECURITY_SETTING_RECORD, persistedRoles, true); - mapPersistedSecuritySettings.put(persistedRoles.getAddressMatch(), persistedRoles); - } + public List recoverSecuritySettings() throws Exception { + return new ArrayList<>(mapPersistedSecuritySettings.values()); } @Override public void storeDivertConfiguration(PersistedDivertConfiguration persistedDivertConfiguration) throws Exception { - deleteDivertConfiguration(persistedDivertConfiguration.getName()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedDivertConfiguration.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.DIVERT_RECORD, persistedDivertConfiguration, true); - mapPersistedDivertConfigurations.put(persistedDivertConfiguration.getName(), persistedDivertConfiguration); - } + storeConfiguration(persistedDivertConfiguration, mapPersistedDivertConfigurations); } @Override public void deleteDivertConfiguration(String divertName) throws Exception { - PersistedDivertConfiguration oldDivert = mapPersistedDivertConfigurations.remove(divertName); - if (oldDivert != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldDivert.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedDivertConfiguration old = mapPersistedDivertConfigurations.remove(divertName); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @@ -849,22 +845,14 @@ public List recoverDivertConfigurations() { @Override public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { - deleteBridgeConfiguration(persistedBridgeConfiguration.getName()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedBridgeConfiguration.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.BRIDGE_RECORD, persistedBridgeConfiguration, true); - mapPersistedBridgeConfigurations.put(persistedBridgeConfiguration.getName(), persistedBridgeConfiguration); - } + storeConfiguration(persistedBridgeConfiguration, mapPersistedBridgeConfigurations); } @Override public void deleteBridgeConfiguration(String bridgeName) throws Exception { - PersistedBridgeConfiguration oldBridge = mapPersistedBridgeConfigurations.remove(bridgeName); - if (oldBridge != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldBridge.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedBridgeConfiguration old = mapPersistedBridgeConfigurations.remove(bridgeName); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @@ -875,22 +863,14 @@ public List recoverBridgeConfigurations() { @Override public void storeConnector(PersistedConnector persistedConnector) throws Exception { - deleteConnector(persistedConnector.getName()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedConnector.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.CONNECTOR_RECORD, persistedConnector, true); - mapPersistedConnectors.put(persistedConnector.getName(), persistedConnector); - } + storeConfiguration(persistedConnector, mapPersistedConnectors); } @Override public void deleteConnector(String connectorName) throws Exception { - PersistedConnector oldConnector = mapPersistedConnectors.remove(connectorName); - if (oldConnector != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldConnector.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedConnector old = mapPersistedConnectors.remove(connectorName); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @@ -901,22 +881,14 @@ public List recoverConnectors() { @Override public void storeUser(PersistedUser persistedUser) throws Exception { - deleteUser(persistedUser.getUsername()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedUser.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.USER_RECORD, persistedUser, true); - mapPersistedUsers.put(persistedUser.getUsername(), persistedUser); - } + storeConfiguration(persistedUser, mapPersistedUsers); } @Override public void deleteUser(String username) throws Exception { - PersistedUser oldUser = mapPersistedUsers.remove(username); - if (oldUser != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldUser.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedUser old = mapPersistedUsers.remove(username); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @@ -927,22 +899,14 @@ public Map getPersistedUsers() { @Override public void storeRole(PersistedRole persistedRole) throws Exception { - deleteRole(persistedRole.getUsername()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedRole.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.ROLE_RECORD, persistedRole, true); - mapPersistedRoles.put(persistedRole.getUsername(), persistedRole); - } + storeConfiguration(persistedRole, mapPersistedRoles); } @Override public void deleteRole(String username) throws Exception { - PersistedRole oldRole = mapPersistedRoles.remove(username); - if (oldRole != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldRole.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedRole old = mapPersistedRoles.remove(username); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @@ -953,24 +917,16 @@ public Map getPersistedRoles() { @Override public void storeKeyValuePair(PersistedKeyValuePair persistedKeyValuePair) throws Exception { - deleteKeyValuePair(persistedKeyValuePair.getMapId(), persistedKeyValuePair.getKey()); - try (ArtemisCloseable lock = closeableReadLock()) { - final long id = idGenerator.generateID(); - persistedKeyValuePair.setStoreId(id); - bindingsJournal.appendAddRecord(id, JournalRecordIds.KEY_VALUE_PAIR_RECORD, persistedKeyValuePair, true); - insertPersistedKeyValuePair(persistedKeyValuePair); - } + storeConfiguration(persistedKeyValuePair, () -> insertPersistedKeyValuePair(persistedKeyValuePair)); } @Override public void deleteKeyValuePair(String mapId, String key) throws Exception { Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(mapId); if (persistedKeyValuePairs != null) { - PersistedKeyValuePair oldMapStringEntry = persistedKeyValuePairs.remove(key); - if (oldMapStringEntry != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldMapStringEntry.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedKeyValuePair old = persistedKeyValuePairs.remove(key); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } } @@ -981,6 +937,26 @@ public Map getPersistedKeyValuePairs(String mapId return persistedKeyValuePairs != null ? new HashMap<>(persistedKeyValuePairs) : new HashMap<>(); } + private void storeConfiguration(T persistedConfiguration, Map map) throws Exception { + storeConfiguration(persistedConfiguration, () -> map.put(persistedConfiguration.getName(), persistedConfiguration)); + } + + private void storeConfiguration(T persistedConfiguration, Supplier s) throws Exception { + try (ArtemisCloseable lock = closeableReadLock()) { + final long recordID = idGenerator.generateID(); + persistedConfiguration.setStoreId(recordID); + T old = s.get(); + if (old != null) { + final long txID = idGenerator.generateID(); + bindingsJournal.appendDeleteRecordTransactional(txID, old.getStoreId()); + bindingsJournal.appendAddRecordTransactional(txID, recordID, persistedConfiguration.getRecordType(), persistedConfiguration); + commitBindings(txID); + } else { + bindingsJournal.appendAddRecord(recordID, persistedConfiguration.getRecordType(), persistedConfiguration, true); + } + } + } + @Override public void storeID(final long journalID, final long id) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { @@ -989,29 +965,23 @@ public void storeID(final long journalID, final long id) throws Exception { } @Override - public void deleteID(long journalD) throws Exception { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(journalD, this::recordNotFoundCallback, false); - } + public void deleteID(long journalID) throws Exception { + deleteRecordAsync(journalID); } @Override public void deleteAddressSetting(SimpleString addressMatch) throws Exception { - AbstractPersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); - if (oldSetting != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldSetting.getStoreId(), this::recordNotFoundCallback, false); - } + AbstractPersistedAddressSetting old = mapPersistedAddressSettings.remove(addressMatch.toString()); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @Override public void deleteSecuritySetting(SimpleString addressMatch) throws Exception { - PersistedSecuritySetting oldRoles = mapPersistedSecuritySettings.remove(addressMatch); - if (oldRoles != null) { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(oldRoles.getStoreId(), this::recordNotFoundCallback, false); - } + PersistedSecuritySetting old = mapPersistedSecuritySettings.remove(addressMatch.toString()); + if (old != null) { + deleteRecordAsync(old.getStoreId()); } } @@ -1548,9 +1518,7 @@ public long storeQueueStatus(long queueID, AddressQueueStatus status) throws Exc @Override public void deleteQueueStatus(long recordID) throws Exception { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true); - } + deleteRecordSync(recordID); } @Override @@ -1566,9 +1534,7 @@ public long storeAddressStatus(long addressID, AddressQueueStatus status) throws @Override public void deleteAddressStatus(long recordID) throws Exception { - try (ArtemisCloseable lock = closeableReadLock()) { - bindingsJournal.tryAppendDeleteRecord(recordID, this::recordNotFoundCallback, true); - } + deleteRecordSync(recordID); } @Override @@ -1686,14 +1652,14 @@ public JournalLoadInformation loadBindingJournal(final List qu GroupingEncoding encoding = newGroupEncoding(id, buffer); groupingInfos.add(encoding); } else if (rec == JournalRecordIds.ADDRESS_SETTING_RECORD) { - PersistedAddressSetting setting = newAddressEncoding(id, buffer); - mapPersistedAddressSettings.put(setting.getAddressMatch(), setting); + PersistedAddressSetting setting = newPersistedConfigurationEncoding(PersistedAddressSetting.class, id, buffer); + mapPersistedAddressSettings.put(setting.getName(), setting); } else if (rec == JournalRecordIds.ADDRESS_SETTING_RECORD_JSON) { - PersistedAddressSettingJSON setting = newAddressJSONEncoding(id, buffer); - mapPersistedAddressSettings.put(setting.getAddressMatch(), setting); + PersistedAddressSettingJSON setting = newPersistedConfigurationEncoding(PersistedAddressSettingJSON.class, id, buffer); + mapPersistedAddressSettings.put(setting.getName(), setting); } else if (rec == JournalRecordIds.SECURITY_SETTING_RECORD) { - PersistedSecuritySetting roles = newSecurityRecord(id, buffer); - mapPersistedSecuritySettings.put(roles.getAddressMatch(), roles); + PersistedSecuritySetting roles = newPersistedConfigurationEncoding(PersistedSecuritySetting.class, id, buffer); + mapPersistedSecuritySettings.put(roles.getName(), roles); } else if (rec == JournalRecordIds.QUEUE_STATUS_RECORD) { QueueStatusEncoding statusEncoding = newQueueStatusEncoding(id, buffer); PersistentQueueBindingEncoding queueBindingEncoding = mapBindings.get(statusEncoding.queueID); @@ -1715,22 +1681,21 @@ public JournalLoadInformation loadBindingJournal(final List qu this.deleteAddressStatus(statusEncoding.getId()); } } else if (rec == JournalRecordIds.DIVERT_RECORD) { - PersistedDivertConfiguration divertConfiguration = newDivertEncoding(id, buffer); + PersistedDivertConfiguration divertConfiguration = newPersistedConfigurationEncoding(PersistedDivertConfiguration.class, id, buffer); mapPersistedDivertConfigurations.put(divertConfiguration.getName(), divertConfiguration); } else if (rec == JournalRecordIds.BRIDGE_RECORD) { - PersistedBridgeConfiguration bridgeConfiguration = newBridgeEncoding(id, buffer); + PersistedBridgeConfiguration bridgeConfiguration = newPersistedConfigurationEncoding(PersistedBridgeConfiguration.class, id, buffer); mapPersistedBridgeConfigurations.put(bridgeConfiguration.getName(), bridgeConfiguration); } else if (rec == JournalRecordIds.USER_RECORD) { - PersistedUser user = newUserEncoding(id, buffer); + PersistedUser user = newPersistedConfigurationEncoding(PersistedUser.class, id, buffer); mapPersistedUsers.put(user.getUsername(), user); } else if (rec == JournalRecordIds.ROLE_RECORD) { - PersistedRole role = newRoleEncoding(id, buffer); + PersistedRole role = newPersistedConfigurationEncoding(PersistedRole.class, id, buffer); mapPersistedRoles.put(role.getUsername(), role); } else if (rec == JournalRecordIds.KEY_VALUE_PAIR_RECORD) { - PersistedKeyValuePair keyValuePair = newKeyValuePairEncoding(id, buffer); - insertPersistedKeyValuePair(keyValuePair); + insertPersistedKeyValuePair(newPersistedConfigurationEncoding(PersistedKeyValuePair.class, id, buffer)); } else if (rec == JournalRecordIds.CONNECTOR_RECORD) { - PersistedConnector connector = newConnectorEncoding(id, buffer); + PersistedConnector connector = newPersistedConfigurationEncoding(PersistedConnector.class, id, buffer); mapPersistedConnectors.put(connector.getName(), connector); } else { // unlikely to happen @@ -1755,7 +1720,7 @@ public JournalLoadInformation loadBindingJournal(final List qu return bindingsInfo; } - private void insertPersistedKeyValuePair(final PersistedKeyValuePair keyValuePair) { + private PersistedKeyValuePair insertPersistedKeyValuePair(final PersistedKeyValuePair keyValuePair) { Map persistedKeyValuePairs = mapPersistedKeyValuePairs.get(keyValuePair.getMapId()); if (persistedKeyValuePairs == null) { ConcurrentMap newMap = new ConcurrentHashMap<>(); @@ -1764,7 +1729,7 @@ private void insertPersistedKeyValuePair(final PersistedKeyValuePair keyValuePai persistedKeyValuePairs = Objects.requireNonNullElse(existingMap, newMap); } - persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair); + return persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair); } protected abstract void beforeStart() throws Exception; @@ -2153,27 +2118,6 @@ public void pageSyncDone() { } } - protected static PersistedSecuritySetting newSecurityRecord(long id, ActiveMQBuffer buffer) { - PersistedSecuritySetting roles = new PersistedSecuritySetting(); - roles.decode(buffer); - roles.setStoreId(id); - return roles; - } - - static PersistedAddressSetting newAddressEncoding(long id, ActiveMQBuffer buffer) { - PersistedAddressSetting setting = new PersistedAddressSetting(); - setting.decode(buffer); - setting.setStoreId(id); - return setting; - } - - static PersistedAddressSettingJSON newAddressJSONEncoding(long id, ActiveMQBuffer buffer) { - PersistedAddressSettingJSON setting = new PersistedAddressSettingJSON(); - setting.decode(buffer); - setting.setStoreId(id); - return setting; - } - static AddressStatusEncoding newAddressStatusEncoding(long id, ActiveMQBuffer buffer) { AddressStatusEncoding addressStatus = new AddressStatusEncoding(); addressStatus.decode(buffer); @@ -2181,46 +2125,15 @@ static AddressStatusEncoding newAddressStatusEncoding(long id, ActiveMQBuffer bu return addressStatus; } - static PersistedDivertConfiguration newDivertEncoding(long id, ActiveMQBuffer buffer) { - PersistedDivertConfiguration persistedDivertConfiguration = new PersistedDivertConfiguration(); - persistedDivertConfiguration.decode(buffer); - persistedDivertConfiguration.setStoreId(id); - return persistedDivertConfiguration; - } - - static PersistedBridgeConfiguration newBridgeEncoding(long id, ActiveMQBuffer buffer) { - PersistedBridgeConfiguration persistedBridgeConfiguration = new PersistedBridgeConfiguration(); - persistedBridgeConfiguration.decode(buffer); - persistedBridgeConfiguration.setStoreId(id); - return persistedBridgeConfiguration; - } - - static PersistedConnector newConnectorEncoding(long id, ActiveMQBuffer buffer) { - PersistedConnector persistedBridgeConfiguration = new PersistedConnector(); - persistedBridgeConfiguration.decode(buffer); - persistedBridgeConfiguration.setStoreId(id); - return persistedBridgeConfiguration; - } - - static PersistedUser newUserEncoding(long id, ActiveMQBuffer buffer) { - PersistedUser persistedUser = new PersistedUser(); - persistedUser.decode(buffer); - persistedUser.setStoreId(id); - return persistedUser; - } - - static PersistedRole newRoleEncoding(long id, ActiveMQBuffer buffer) { - PersistedRole persistedRole = new PersistedRole(); - persistedRole.decode(buffer); - persistedRole.setStoreId(id); - return persistedRole; - } - - static PersistedKeyValuePair newKeyValuePairEncoding(long id, ActiveMQBuffer buffer) { - PersistedKeyValuePair persistedKeyValuePair = new PersistedKeyValuePair(); - persistedKeyValuePair.decode(buffer); - persistedKeyValuePair.setStoreId(id); - return persistedKeyValuePair; + static T newPersistedConfigurationEncoding(Class clazz, long id, ActiveMQBuffer buffer) { + try { + T persistedConfiguration = clazz.getDeclaredConstructor().newInstance(); + persistedConfiguration.decode(buffer); + persistedConfiguration.setStoreId(id); + return persistedConfiguration; + } catch (Exception e) { + throw new RuntimeException("Error creating instance of " + clazz.getSimpleName(), e); + } } static GroupingEncoding newGroupEncoding(long id, ActiveMQBuffer buffer) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 576212fab4f..6b68993252e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -44,8 +44,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback; import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl; import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; +import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; +import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON; import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration; import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration; +import org.apache.activemq.artemis.core.persistence.config.PersistedRole; +import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting; +import org.apache.activemq.artemis.core.persistence.config.PersistedUser; import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.AckRetry; import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; @@ -596,15 +601,10 @@ public static Object newObjectEncoding(RecordInfo info, JournalStorageManager st switch (rec) { case DIVERT_RECORD: - PersistedDivertConfiguration persistedDivertConfiguration = new PersistedDivertConfiguration(); - persistedDivertConfiguration.decode(buffer); - return persistedDivertConfiguration; + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedDivertConfiguration.class, id, buffer); - case BRIDGE_RECORD: { - PersistedBridgeConfiguration persistedBridgeConfiguration = new PersistedBridgeConfiguration(); - persistedBridgeConfiguration.decode(buffer); - return persistedBridgeConfiguration; - } + case BRIDGE_RECORD: + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedBridgeConfiguration.class, id, buffer); case ADD_LARGE_MESSAGE_PENDING: { PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding(); @@ -738,13 +738,13 @@ public static Object newObjectEncoding(RecordInfo info, JournalStorageManager st return AbstractJournalStorageManager.newGroupEncoding(id, buffer); case ADDRESS_SETTING_RECORD: - return AbstractJournalStorageManager.newAddressEncoding(id, buffer); + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedAddressSetting.class, id, buffer); case ADDRESS_SETTING_RECORD_JSON: - return AbstractJournalStorageManager.newAddressJSONEncoding(id, buffer); + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedAddressSettingJSON.class, id, buffer); case SECURITY_SETTING_RECORD: - return AbstractJournalStorageManager.newSecurityRecord(id, buffer); + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedSecuritySetting.class, id, buffer); case ADDRESS_BINDING_RECORD: return AbstractJournalStorageManager.newAddressBindingEncoding(id, buffer); @@ -753,10 +753,10 @@ public static Object newObjectEncoding(RecordInfo info, JournalStorageManager st return AbstractJournalStorageManager.newAddressStatusEncoding(id, buffer); case USER_RECORD: - return AbstractJournalStorageManager.newUserEncoding(id, buffer); + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedUser.class, id, buffer); case ROLE_RECORD: - return AbstractJournalStorageManager.newRoleEncoding(id, buffer); + return AbstractJournalStorageManager.newPersistedConfigurationEncoding(PersistedRole.class, id, buffer); case ACK_RETRY: return AckRetry.getPersister().decode(buffer, null, null);