2424
2525package org .sourcelab .kafka .webview .ui .controller .stream ;
2626
27+ import com .salesforce .kafka .test .junit4 .SharedKafkaTestResource ;
28+ import org .junit .Assert ;
29+ import org .junit .ClassRule ;
2730import org .junit .Test ;
2831import org .junit .runner .RunWith ;
2932import org .sourcelab .kafka .webview .ui .controller .AbstractMvcTest ;
33+ import org .sourcelab .kafka .webview .ui .controller .api .requests .ConsumeRequest ;
34+ import org .sourcelab .kafka .webview .ui .manager .socket .WebSocketConsumersManager ;
35+ import org .sourcelab .kafka .webview .ui .model .Cluster ;
3036import org .sourcelab .kafka .webview .ui .model .View ;
37+ import org .sourcelab .kafka .webview .ui .repository .ViewRepository ;
38+ import org .sourcelab .kafka .webview .ui .tools .ClusterTestTools ;
3139import org .sourcelab .kafka .webview .ui .tools .ViewTestTools ;
3240import org .springframework .beans .factory .annotation .Autowired ;
3341import org .springframework .boot .test .autoconfigure .web .servlet .AutoConfigureMockMvc ;
3442import org .springframework .boot .test .context .SpringBootTest ;
43+ import org .springframework .messaging .simp .SimpMessageHeaderAccessor ;
44+ import org .springframework .security .core .Authentication ;
3545import org .springframework .test .context .junit4 .SpringRunner ;
3646import org .springframework .transaction .annotation .Transactional ;
3747
48+ import java .util .ArrayList ;
49+
3850import static org .hamcrest .Matchers .containsString ;
51+ import static org .junit .Assert .assertEquals ;
52+ import static org .mockito .Mockito .mock ;
53+ import static org .mockito .Mockito .when ;
3954import static org .springframework .security .test .web .servlet .request .SecurityMockMvcRequestPostProcessors .user ;
4055import static org .springframework .test .web .servlet .request .MockMvcRequestBuilders .get ;
4156import static org .springframework .test .web .servlet .result .MockMvcResultMatchers .content ;
4560@ SpringBootTest
4661@ AutoConfigureMockMvc
4762public class StreamControllerTest extends AbstractMvcTest {
63+ @ ClassRule
64+ public static SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource ();
4865
4966 @ Autowired
5067 private ViewTestTools viewTestTools ;
5168
69+ @ Autowired
70+ private ViewRepository viewRepository ;
71+
72+ @ Autowired
73+ private ClusterTestTools clusterTestTools ;
74+
75+ @ Autowired
76+ private StreamController streamController ;
77+
78+ @ Autowired
79+ private WebSocketConsumersManager webSocketConsumersManager ;
80+
5281 /**
5382 * Ensure authentication is required.
5483 */
@@ -68,6 +97,7 @@ public void testUrlsRequireAuthentication() throws Exception {
6897 @ Test
6998 @ Transactional
7099 public void smokeTestStream () throws Exception {
100+
71101 final View view = viewTestTools .createView ("TestView" );
72102
73103 // Hit the stream page for specified view.
@@ -82,4 +112,68 @@ public void smokeTestStream() throws Exception {
82112 .andExpect (content ().string (containsString ("Switch to View" )))
83113 .andExpect (content ().string (containsString ("href=\" /view/" + view .getId () + "\" " )));
84114 }
115+
116+ /**
117+ * Test starting a new websocket consumer.
118+ * Verifies bug uncovered in ISSUE-212 https://github.com/SourceLabOrg/kafka-webview/issues/212
119+ */
120+ @ Test
121+ public void shouldReceiveAMessageFromTheServer () throws Exception {
122+ final String expectedSessionId = "MYSESSIONID" ;
123+ final String topicName = "TestTopic" + System .currentTimeMillis ();
124+
125+ // Sanity test, no active consumers
126+ Assert .assertEquals ("Should have no active consumers" , 0 , webSocketConsumersManager .countActiveConsumers ());
127+
128+ // Create a topic
129+ sharedKafkaTestResource
130+ .getKafkaTestUtils ()
131+ .createTopic (topicName , 2 , (short ) 1 );
132+
133+ // Create cluster
134+ final Cluster cluster = clusterTestTools
135+ .createCluster ("TestCluster" , sharedKafkaTestResource .getKafkaConnectString ());
136+
137+ // Create view
138+ final View view = viewTestTools
139+ .createViewWithCluster ("TestView" , cluster );
140+
141+ // Sanity test, no enforced partitions
142+ assertEquals ("Partitions Property should be empty string" , "" , view .getPartitions ());
143+
144+ final ConsumeRequest consumeRequest = new ConsumeRequest ();
145+ consumeRequest .setAction ("head" );
146+ consumeRequest .setPartitions ("0,1" );
147+ consumeRequest .setFilters (new ArrayList <>());
148+ consumeRequest .setResultsPerPartition (10 );
149+
150+ final SimpMessageHeaderAccessor mockHeaderAccessor = mock (SimpMessageHeaderAccessor .class );
151+ final Authentication mockPrincipal = mock (Authentication .class );
152+ when (mockHeaderAccessor .getUser ())
153+ .thenReturn (mockPrincipal );
154+ when (mockPrincipal .getPrincipal ())
155+ .thenReturn (nonAdminUserDetails );
156+ when (mockHeaderAccessor .getSessionId ())
157+ .thenReturn (expectedSessionId );
158+
159+ try {
160+ final String result = streamController .newConsumer (
161+ view .getId (),
162+ consumeRequest ,
163+ mockHeaderAccessor
164+ );
165+ assertEquals ("Should return success response" , "{success: true}" , result );
166+
167+ // Verify consumer stood up
168+ assertEquals ("Should have one active consumer" , 1 , webSocketConsumersManager .countActiveConsumers ());
169+
170+ // Lets refresh the view entity and verify the partitions field is still empty.
171+ // Validates ISSUE-212
172+ final View updatedView = viewRepository .findById (view .getId ()).get ();
173+ assertEquals ("Partitions Property should be empty string" , "" , updatedView .getPartitions ());
174+ } finally {
175+ // Cleanup, disconnect websocket consumers
176+ webSocketConsumersManager .removeConsumersForSessionId (expectedSessionId );
177+ }
178+ }
85179}
0 commit comments