2424import org .apache .flink .runtime .memory .OpaqueMemoryResource ;
2525import org .apache .flink .runtime .operators .testutils .MockEnvironment ;
2626import org .apache .flink .runtime .taskexecutor .TaskManagerConfiguration ;
27+ import org .apache .flink .state .rocksdb .RocksDBMemoryControllerUtils .RocksDBMemoryFactory ;
2728
2829import org .junit .jupiter .api .BeforeAll ;
2930import org .junit .jupiter .api .Test ;
3031import org .junit .jupiter .api .io .TempDir ;
31- import org .junit .jupiter .params .ParameterizedTest ;
32- import org .junit .jupiter .params .provider .Arguments ;
33- import org .junit .jupiter .params .provider .MethodSource ;
3432import org .slf4j .Logger ;
3533import org .slf4j .LoggerFactory ;
3634
3735import java .io .File ;
3836import java .io .IOException ;
3937import java .nio .file .Path ;
4038import java .util .Map ;
41- import java .util .stream .Stream ;
4239
4340import static java .util .Collections .emptyMap ;
4441import static java .util .Collections .singletonMap ;
42+ import static org .apache .flink .contrib .streaming .state .RocksDBOptions .FIX_PER_SLOT_MEMORY_SIZE ;
43+ import static org .apache .flink .contrib .streaming .state .RocksDBOptions .FIX_PER_TM_MEMORY_SIZE ;
44+ import static org .apache .flink .contrib .streaming .state .RocksDBOptions .USE_MANAGED_MEMORY ;
45+ import static org .apache .flink .contrib .streaming .state .RocksDBOptions .WRITE_BUFFER_RATIO ;
4546import static org .apache .flink .runtime .taskexecutor .TaskExecutorResourceUtils .resourceSpecFromConfigForLocalExecution ;
4647import static org .apache .flink .state .rocksdb .EmbeddedRocksDBStateBackend .ensureRocksDBIsLoaded ;
48+ import static org .apache .flink .state .rocksdb .MemoryShareScope .SLOT ;
4749import static org .apache .flink .state .rocksdb .RocksDBMemoryControllerUtils .calculateWriteBufferManagerCapacity ;
48- import static org .apache .flink .state .rocksdb .RocksDBOptions .FIX_PER_SLOT_MEMORY_SIZE ;
49- import static org .apache .flink .state .rocksdb .RocksDBOptions .FIX_PER_TM_MEMORY_SIZE ;
50- import static org .apache .flink .state .rocksdb .RocksDBOptions .USE_MANAGED_MEMORY ;
51- import static org .apache .flink .state .rocksdb .RocksDBOptions .WRITE_BUFFER_RATIO ;
5250import static org .apache .flink .state .rocksdb .RocksDBSharedResourcesFactory .SLOT_SHARED_MANAGED ;
5351import static org .apache .flink .state .rocksdb .RocksDBSharedResourcesFactory .SLOT_SHARED_UNMANAGED ;
5452import static org .apache .flink .state .rocksdb .RocksDBSharedResourcesFactory .TM_SHARED_UNMANAGED ;
5553import static org .apache .flink .util .CollectionUtil .entry ;
5654import static org .apache .flink .util .CollectionUtil .map ;
5755import static org .junit .jupiter .api .Assertions .assertEquals ;
58- import static org .junit .jupiter .params . provider . Arguments . arguments ;
56+ import static org .junit .jupiter .api . Assertions . assertSame ;
5957
6058/** {@link RocksDBSharedResourcesFactory} test. */
59+ @ SuppressWarnings ({"rawtypes" , "DataFlowIssue" , "SameParameterValue" })
6160public class RocksDBSharedResourcesFactoryTest {
6261 private static final Logger LOG =
6362 LoggerFactory .getLogger (RocksDBSharedResourcesFactoryTest .class );
6463
64+ private static final MemorySize TM_SIZE = new MemorySize (20 );
65+ private static final MemorySize MANAGED_MEMORY_SIZE = new MemorySize (15 );
66+ public static final MemorySize PER_SLOT = new MemorySize (10 );
67+
6568 @ TempDir static Path tempDir ;
6669
6770 @ BeforeAll
6871 static void init () throws IOException {
6972 ensureRocksDBIsLoaded (tempDir .toAbsolutePath ().toString ());
7073 }
7174
72- private static Stream <Arguments > getSelectionStrategyParams () {
73- Map <Object , Object > defaults = emptyMap ();
74-
75- // format: job options, tm options, expected factory type
76- return Stream .of (
77- // default: per slot, managed
78- arguments (defaults , defaults , SLOT_SHARED_MANAGED ),
79- // no sharing (allocate per column family), unmanaged
80- arguments (singletonMap (USE_MANAGED_MEMORY , false ), defaults , null ),
81- // prioritize managed (set explicitly)
82- arguments (
83- singletonMap (USE_MANAGED_MEMORY , true ),
84- singletonMap (FIX_PER_TM_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
85- SLOT_SHARED_MANAGED ),
86- // prioritize managed (job default)
87- arguments (
88- defaults ,
89- singletonMap (FIX_PER_TM_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
90- SLOT_SHARED_MANAGED ),
91- // prioritize fixed-per-slot over fixed-per-tm
92- arguments (
93- singletonMap (FIX_PER_SLOT_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
94- singletonMap (FIX_PER_TM_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
95- SLOT_SHARED_UNMANAGED ),
96- // prioritize fixed-per-slot over managed
97- arguments (
98- map (
99- entry (FIX_PER_SLOT_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
100- entry (USE_MANAGED_MEMORY , true )),
101- singletonMap (FIX_PER_TM_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
102- SLOT_SHARED_UNMANAGED ),
103- // use fixed-per-tm - when not managed and not fixed-per-slot
104- arguments (
105- singletonMap (USE_MANAGED_MEMORY , false ),
106- singletonMap (FIX_PER_TM_MEMORY_SIZE , MemorySize .ofMebiBytes (1 )),
107- TM_SHARED_UNMANAGED ));
75+ @ Test
76+ public void testDefaults () throws Exception {
77+ testSelectionStrategy (
78+ MANAGED_MEMORY_SIZE ,
79+ emptyMap (),
80+ emptyMap (),
81+ SLOT ,
82+ MANAGED_MEMORY_SIZE ,
83+ true ,
84+ SLOT_SHARED_MANAGED );
85+ }
86+
87+ @ Test
88+ public void testDisableManaged () throws Exception {
89+ testSelectionStrategy (
90+ MANAGED_MEMORY_SIZE ,
91+ singletonMap (USE_MANAGED_MEMORY , false ),
92+ emptyMap (),
93+ null , // everything else below is ignored
94+ MemorySize .ZERO ,
95+ false ,
96+ null );
97+ }
98+
99+ @ Test
100+ public void testEnableManaged () throws Exception {
101+ testSelectionStrategy (
102+ MANAGED_MEMORY_SIZE ,
103+ singletonMap (USE_MANAGED_MEMORY , true ),
104+ singletonMap (FIX_PER_TM_MEMORY_SIZE , TM_SIZE ), // ignore
105+ SLOT ,
106+ MANAGED_MEMORY_SIZE ,
107+ true ,
108+ SLOT_SHARED_MANAGED );
109+ }
110+
111+ @ Test
112+ public void testEnableManagedByDefault () throws Exception {
113+ testSelectionStrategy (
114+ MANAGED_MEMORY_SIZE ,
115+ emptyMap (),
116+ singletonMap (FIX_PER_TM_MEMORY_SIZE , TM_SIZE ), // ignore
117+ SLOT ,
118+ MANAGED_MEMORY_SIZE ,
119+ true ,
120+ SLOT_SHARED_MANAGED );
121+ }
122+
123+ @ Test
124+ public void testPrioritizeFixedPerSlotOverTm () throws Exception {
125+ testSelectionStrategy (
126+ MANAGED_MEMORY_SIZE ,
127+ singletonMap (FIX_PER_SLOT_MEMORY_SIZE , PER_SLOT ),
128+ singletonMap (FIX_PER_TM_MEMORY_SIZE , TM_SIZE ),
129+ SLOT ,
130+ PER_SLOT ,
131+ false ,
132+ SLOT_SHARED_UNMANAGED );
133+ }
134+
135+ @ Test
136+ public void testPrioritizeFixedPerSlotOverManaged () throws Exception {
137+ MemorySize perSlot = PER_SLOT ;
138+ testSelectionStrategy (
139+ MANAGED_MEMORY_SIZE ,
140+ map (entry (FIX_PER_SLOT_MEMORY_SIZE , perSlot ), entry (USE_MANAGED_MEMORY , true )),
141+ singletonMap (FIX_PER_TM_MEMORY_SIZE , TM_SIZE ),
142+ SLOT ,
143+ perSlot ,
144+ false ,
145+ SLOT_SHARED_UNMANAGED );
146+ }
147+
148+ @ Test
149+ public void testFixedPerTm () throws Exception {
150+ testSelectionStrategy (
151+ MANAGED_MEMORY_SIZE ,
152+ singletonMap (USE_MANAGED_MEMORY , false ),
153+ singletonMap (FIX_PER_TM_MEMORY_SIZE , TM_SIZE ),
154+ MemoryShareScope .TM ,
155+ MemorySize .ZERO ,
156+ false ,
157+ TM_SHARED_UNMANAGED );
108158 }
109159
110- @ ParameterizedTest (name = "jobConfig: {0}, tmConfig: {1}" )
111- @ MethodSource ("getSelectionStrategyParams" )
112- @ SuppressWarnings ("rawtypes" )
113- public void testSelectionStrategy (
160+ private void testSelectionStrategy (
161+ MemorySize managedMemorySize ,
114162 Map <ConfigOption , Object > jobOptions ,
115163 Map <ConfigOption , Object > tmOptions ,
116- RocksDBSharedResourcesFactory expected )
117- throws IOException {
164+ MemoryShareScope expectedScope ,
165+ MemorySize expectedSize ,
166+ Boolean expectManaged ,
167+ RocksDBSharedResourcesFactory expectedFactory )
168+ throws Exception {
118169
119170 Configuration jobConfig = new Configuration ();
120171 jobOptions .forEach (jobConfig ::set );
121172
122173 Configuration tmConfig = new Configuration ();
123174 tmOptions .forEach (tmConfig ::set );
124175
125- assertEquals (
126- expected ,
127- RocksDBSharedResourcesFactory .from (
128- RocksDBMemoryConfiguration .fromConfiguration (jobConfig ), getEnv (tmConfig )));
176+ RocksDBMemoryConfiguration jobMemoryConfig =
177+ RocksDBMemoryConfiguration .fromConfiguration (jobConfig );
178+ RocksDBSharedResourcesFactory actualFactory =
179+ RocksDBSharedResourcesFactory .from (jobMemoryConfig , getEnv (tmConfig ));
180+ assertEquals (expectedFactory , actualFactory );
181+ if (expectedScope == null ) {
182+ return ;
183+ }
184+ assertEquals (expectManaged , actualFactory .isManaged ());
185+ assertEquals (expectedScope , actualFactory .getShareScope ());
186+ MockEnvironment env =
187+ MockEnvironment .builder ()
188+ .setManagedMemorySize (managedMemorySize .getBytes ())
189+ .build ();
190+ OpaqueMemoryResource <RocksDBSharedResources > resource1 =
191+ actualFactory .create (jobMemoryConfig , env , 1 , LOG , RocksDBMemoryFactory .DEFAULT );
192+ assertEquals (expectedSize .getBytes (), resource1 .getSize ());
193+ OpaqueMemoryResource <RocksDBSharedResources > resource2 =
194+ actualFactory .create (jobMemoryConfig , env , 1 , LOG , RocksDBMemoryFactory .DEFAULT );
195+ assertSame (resource1 .getResourceHandle (), resource2 .getResourceHandle ());
129196 }
130197
131198 @ Test
@@ -136,18 +203,19 @@ public void testTmSharedMemorySize() throws Exception {
136203 tmConfig .set (FIX_PER_TM_MEMORY_SIZE , new MemorySize (size ));
137204 tmConfig .set (WRITE_BUFFER_RATIO , writeBufferRatio );
138205
139- OpaqueMemoryResource <RocksDBSharedResources > resource =
206+ try ( OpaqueMemoryResource <RocksDBSharedResources > resource =
140207 TM_SHARED_UNMANAGED .create (
141208 RocksDBMemoryConfiguration .fromConfiguration (new Configuration ()),
142209 getEnv (tmConfig ),
143210 0 , // managed memory fraction must be ignored
144211 LOG ,
145- RocksDBMemoryControllerUtils . RocksDBMemoryFactory .DEFAULT );
212+ RocksDBMemoryFactory .DEFAULT )) {
146213
147- assertEquals (size , resource .getSize ());
148- assertEquals (
149- calculateWriteBufferManagerCapacity (size , writeBufferRatio ),
150- resource .getResourceHandle ().getWriteBufferManagerCapacity ());
214+ assertEquals (size , resource .getSize ());
215+ assertEquals (
216+ calculateWriteBufferManagerCapacity (size , writeBufferRatio ),
217+ resource .getResourceHandle ().getWriteBufferManagerCapacity ());
218+ }
151219 }
152220
153221 private static Environment getEnv (Configuration tmConfig ) throws IOException {
0 commit comments