Skip to content

Commit ceeab36

Browse files
committedDec 8, 2013

30 files changed

+4454
-4454
lines changed
 

‎gradlew.bat

+90-90
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,90 @@
1-
@if "%DEBUG%" == "" @echo off
2-
@rem ##########################################################################
3-
@rem
4-
@rem Gradle startup script for Windows
5-
@rem
6-
@rem ##########################################################################
7-
8-
@rem Set local scope for the variables with windows NT shell
9-
if "%OS%"=="Windows_NT" setlocal
10-
11-
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
12-
set DEFAULT_JVM_OPTS=
13-
14-
set DIRNAME=%~dp0
15-
if "%DIRNAME%" == "" set DIRNAME=.
16-
set APP_BASE_NAME=%~n0
17-
set APP_HOME=%DIRNAME%
18-
19-
@rem Find java.exe
20-
if defined JAVA_HOME goto findJavaFromJavaHome
21-
22-
set JAVA_EXE=java.exe
23-
%JAVA_EXE% -version >NUL 2>&1
24-
if "%ERRORLEVEL%" == "0" goto init
25-
26-
echo.
27-
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
28-
echo.
29-
echo Please set the JAVA_HOME variable in your environment to match the
30-
echo location of your Java installation.
31-
32-
goto fail
33-
34-
:findJavaFromJavaHome
35-
set JAVA_HOME=%JAVA_HOME:"=%
36-
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
37-
38-
if exist "%JAVA_EXE%" goto init
39-
40-
echo.
41-
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
42-
echo.
43-
echo Please set the JAVA_HOME variable in your environment to match the
44-
echo location of your Java installation.
45-
46-
goto fail
47-
48-
:init
49-
@rem Get command-line arguments, handling Windowz variants
50-
51-
if not "%OS%" == "Windows_NT" goto win9xME_args
52-
if "%@eval[2+2]" == "4" goto 4NT_args
53-
54-
:win9xME_args
55-
@rem Slurp the command line arguments.
56-
set CMD_LINE_ARGS=
57-
set _SKIP=2
58-
59-
:win9xME_args_slurp
60-
if "x%~1" == "x" goto execute
61-
62-
set CMD_LINE_ARGS=%*
63-
goto execute
64-
65-
:4NT_args
66-
@rem Get arguments from the 4NT Shell from JP Software
67-
set CMD_LINE_ARGS=%$
68-
69-
:execute
70-
@rem Setup the command line
71-
72-
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
73-
74-
@rem Execute Gradle
75-
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
76-
77-
:end
78-
@rem End local scope for the variables with windows NT shell
79-
if "%ERRORLEVEL%"=="0" goto mainEnd
80-
81-
:fail
82-
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
83-
rem the _cmd.exe /c_ return code!
84-
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
85-
exit /b 1
86-
87-
:mainEnd
88-
if "%OS%"=="Windows_NT" endlocal
89-
90-
:omega
1+
@if "%DEBUG%" == "" @echo off
2+
@rem ##########################################################################
3+
@rem
4+
@rem Gradle startup script for Windows
5+
@rem
6+
@rem ##########################################################################
7+
8+
@rem Set local scope for the variables with windows NT shell
9+
if "%OS%"=="Windows_NT" setlocal
10+
11+
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
12+
set DEFAULT_JVM_OPTS=
13+
14+
set DIRNAME=%~dp0
15+
if "%DIRNAME%" == "" set DIRNAME=.
16+
set APP_BASE_NAME=%~n0
17+
set APP_HOME=%DIRNAME%
18+
19+
@rem Find java.exe
20+
if defined JAVA_HOME goto findJavaFromJavaHome
21+
22+
set JAVA_EXE=java.exe
23+
%JAVA_EXE% -version >NUL 2>&1
24+
if "%ERRORLEVEL%" == "0" goto init
25+
26+
echo.
27+
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
28+
echo.
29+
echo Please set the JAVA_HOME variable in your environment to match the
30+
echo location of your Java installation.
31+
32+
goto fail
33+
34+
:findJavaFromJavaHome
35+
set JAVA_HOME=%JAVA_HOME:"=%
36+
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
37+
38+
if exist "%JAVA_EXE%" goto init
39+
40+
echo.
41+
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
42+
echo.
43+
echo Please set the JAVA_HOME variable in your environment to match the
44+
echo location of your Java installation.
45+
46+
goto fail
47+
48+
:init
49+
@rem Get command-line arguments, handling Windowz variants
50+
51+
if not "%OS%" == "Windows_NT" goto win9xME_args
52+
if "%@eval[2+2]" == "4" goto 4NT_args
53+
54+
:win9xME_args
55+
@rem Slurp the command line arguments.
56+
set CMD_LINE_ARGS=
57+
set _SKIP=2
58+
59+
:win9xME_args_slurp
60+
if "x%~1" == "x" goto execute
61+
62+
set CMD_LINE_ARGS=%*
63+
goto execute
64+
65+
:4NT_args
66+
@rem Get arguments from the 4NT Shell from JP Software
67+
set CMD_LINE_ARGS=%$
68+
69+
:execute
70+
@rem Setup the command line
71+
72+
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
73+
74+
@rem Execute Gradle
75+
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
76+
77+
:end
78+
@rem End local scope for the variables with windows NT shell
79+
if "%ERRORLEVEL%"=="0" goto mainEnd
80+
81+
:fail
82+
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
83+
rem the _cmd.exe /c_ return code!
84+
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
85+
exit /b 1
86+
87+
:mainEnd
88+
if "%OS%"=="Windows_NT" endlocal
89+
90+
:omega
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,49 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Notification;
19-
import rx.util.functions.Action0;
20-
import rx.util.functions.Action1;
21-
22-
/**
23-
* Represents an active plan.
24-
*/
25-
public class ActivePlan1<T1> extends ActivePlan0 {
26-
private final Action1<T1> onNext;
27-
private final Action0 onCompleted;
28-
private final JoinObserver1<T1> first;
29-
public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
30-
this.onNext = onNext;
31-
this.onCompleted = onCompleted;
32-
this.first = first;
33-
addJoinObserver(first);
34-
}
35-
36-
@Override
37-
public void match() {
38-
if (!first.queue().isEmpty()) {
39-
Notification<T1> n1 = first.queue().peek();
40-
if (n1.isOnCompleted()) {
41-
onCompleted.call();
42-
} else {
43-
dequeue();
44-
onNext.call(n1.getValue());
45-
}
46-
}
47-
}
48-
49-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action1;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan1<T1> extends ActivePlan0 {
26+
private final Action1<T1> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
30+
this.onNext = onNext;
31+
this.onCompleted = onCompleted;
32+
this.first = first;
33+
addJoinObserver(first);
34+
}
35+
36+
@Override
37+
public void match() {
38+
if (!first.queue().isEmpty()) {
39+
Notification<T1> n1 = first.queue().peek();
40+
if (n1.isOnCompleted()) {
41+
onCompleted.call();
42+
} else {
43+
dequeue();
44+
onNext.call(n1.getValue());
45+
}
46+
}
47+
}
48+
49+
}
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,54 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Notification;
19-
import rx.util.functions.Action0;
20-
import rx.util.functions.Action2;
21-
22-
/**
23-
* Represents an active plan.
24-
*/
25-
public class ActivePlan2<T1, T2> extends ActivePlan0 {
26-
private final Action2<T1, T2> onNext;
27-
private final Action0 onCompleted;
28-
private final JoinObserver1<T1> first;
29-
private final JoinObserver1<T2> second;
30-
public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
31-
this.onNext = onNext;
32-
this.onCompleted = onCompleted;
33-
this.first = first;
34-
this.second = second;
35-
addJoinObserver(first);
36-
addJoinObserver(second);
37-
}
38-
39-
@Override
40-
public void match() {
41-
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
42-
Notification<T1> n1 = first.queue().peek();
43-
Notification<T2> n2 = second.queue().peek();
44-
45-
if (n1.isOnCompleted() || n2.isOnCompleted()) {
46-
onCompleted.call();
47-
} else {
48-
dequeue();
49-
onNext.call(n1.getValue(), n2.getValue());
50-
}
51-
}
52-
}
53-
54-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action2;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan2<T1, T2> extends ActivePlan0 {
26+
private final Action2<T1, T2> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
private final JoinObserver1<T2> second;
30+
public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
31+
this.onNext = onNext;
32+
this.onCompleted = onCompleted;
33+
this.first = first;
34+
this.second = second;
35+
addJoinObserver(first);
36+
addJoinObserver(second);
37+
}
38+
39+
@Override
40+
public void match() {
41+
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
42+
Notification<T1> n1 = first.queue().peek();
43+
Notification<T2> n2 = second.queue().peek();
44+
45+
if (n1.isOnCompleted() || n2.isOnCompleted()) {
46+
onCompleted.call();
47+
} else {
48+
dequeue();
49+
onNext.call(n1.getValue(), n2.getValue());
50+
}
51+
}
52+
}
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,64 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Notification;
19-
import rx.util.functions.Action0;
20-
import rx.util.functions.Action3;
21-
22-
/**
23-
* Represents an active plan.
24-
*/
25-
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
26-
private final Action3<T1, T2, T3> onNext;
27-
private final Action0 onCompleted;
28-
private final JoinObserver1<T1> first;
29-
private final JoinObserver1<T2> second;
30-
private final JoinObserver1<T3> third;
31-
public ActivePlan3(JoinObserver1<T1> first,
32-
JoinObserver1<T2> second,
33-
JoinObserver1<T3> third,
34-
Action3<T1, T2, T3> onNext,
35-
Action0 onCompleted) {
36-
this.onNext = onNext;
37-
this.onCompleted = onCompleted;
38-
this.first = first;
39-
this.second = second;
40-
this.third = third;
41-
addJoinObserver(first);
42-
addJoinObserver(second);
43-
addJoinObserver(third);
44-
}
45-
46-
@Override
47-
public void match() {
48-
if (!first.queue().isEmpty()
49-
&& !second.queue().isEmpty()
50-
&& !third.queue().isEmpty()) {
51-
Notification<T1> n1 = first.queue().peek();
52-
Notification<T2> n2 = second.queue().peek();
53-
Notification<T3> n3 = third.queue().peek();
54-
55-
if (n1.isOnCompleted() || n2.isOnCompleted() || n3.isOnCompleted()) {
56-
onCompleted.call();
57-
} else {
58-
dequeue();
59-
onNext.call(n1.getValue(), n2.getValue(), n3.getValue());
60-
}
61-
}
62-
}
63-
64-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.util.functions.Action0;
20+
import rx.util.functions.Action3;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
26+
private final Action3<T1, T2, T3> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> first;
29+
private final JoinObserver1<T2> second;
30+
private final JoinObserver1<T3> third;
31+
public ActivePlan3(JoinObserver1<T1> first,
32+
JoinObserver1<T2> second,
33+
JoinObserver1<T3> third,
34+
Action3<T1, T2, T3> onNext,
35+
Action0 onCompleted) {
36+
this.onNext = onNext;
37+
this.onCompleted = onCompleted;
38+
this.first = first;
39+
this.second = second;
40+
this.third = third;
41+
addJoinObserver(first);
42+
addJoinObserver(second);
43+
addJoinObserver(third);
44+
}
45+
46+
@Override
47+
public void match() {
48+
if (!first.queue().isEmpty()
49+
&& !second.queue().isEmpty()
50+
&& !third.queue().isEmpty()) {
51+
Notification<T1> n1 = first.queue().peek();
52+
Notification<T2> n2 = second.queue().peek();
53+
Notification<T3> n3 = third.queue().peek();
54+
55+
if (n1.isOnCompleted() || n2.isOnCompleted() || n3.isOnCompleted()) {
56+
onCompleted.call();
57+
} else {
58+
dequeue();
59+
onNext.call(n1.getValue(), n2.getValue(), n3.getValue());
60+
}
61+
}
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Subscription;
19-
20-
/**
21-
* Base interface to manage joined observations.
22-
*/
23-
public interface JoinObserver extends Subscription {
24-
void subscribe(Object gate);
25-
void dequeue();
26-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Subscription;
19+
20+
/**
21+
* Base interface to manage joined observations.
22+
*/
23+
public interface JoinObserver extends Subscription {
24+
void subscribe(Object gate);
25+
void dequeue();
26+
}
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,25 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package rx.joins;
18-
19-
/**
20-
* Base interface for join patterns.
21-
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.joins.pattern(v=vs.103).aspx'>MSDN: Pattern</a>
22-
*/
23-
public interface Pattern {
24-
25-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.joins;
18+
19+
/**
20+
* Base interface for join patterns.
21+
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.joins.pattern(v=vs.103).aspx'>MSDN: Pattern</a>
22+
*/
23+
public interface Pattern {
24+
25+
}
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,45 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Observable;
19-
import rx.util.functions.Func1;
20-
21-
/**
22-
* Represents a join pattern over one observable sequence.
23-
*/
24-
public class Pattern1<T1> implements Pattern {
25-
private final Observable<T1> first;
26-
public Pattern1(Observable<T1> first) {
27-
this.first = first;
28-
}
29-
public Observable<T1> first() {
30-
return first;
31-
}
32-
/**
33-
* Matches when all observable sequences have an available
34-
* element and projects the elements by invoking the selector function.
35-
* @param selector the function that will be invoked for elements in the source sequences.
36-
* @return
37-
* @throws NullPointerException if selector is null
38-
*/
39-
public <R> Plan0<R> then(Func1<T1, R> selector) {
40-
if (selector == null) {
41-
throw new NullPointerException();
42-
}
43-
return new Plan1<T1, R>(this, selector);
44-
}
45-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Observable;
19+
import rx.util.functions.Func1;
20+
21+
/**
22+
* Represents a join pattern over one observable sequence.
23+
*/
24+
public class Pattern1<T1> implements Pattern {
25+
private final Observable<T1> first;
26+
public Pattern1(Observable<T1> first) {
27+
this.first = first;
28+
}
29+
public Observable<T1> first() {
30+
return first;
31+
}
32+
/**
33+
* Matches when all observable sequences have an available
34+
* element and projects the elements by invoking the selector function.
35+
* @param selector the function that will be invoked for elements in the source sequences.
36+
* @return
37+
* @throws NullPointerException if selector is null
38+
*/
39+
public <R> Plan0<R> then(Func1<T1, R> selector) {
40+
if (selector == null) {
41+
throw new NullPointerException();
42+
}
43+
return new Plan1<T1, R>(this, selector);
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,54 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Observable;
19-
import rx.util.functions.Func2;
20-
21-
/**
22-
* Represents a join pattern over observable sequences.
23-
*/
24-
public class Pattern2<T1, T2> implements Pattern {
25-
private final Observable<T1> first;
26-
private final Observable<T2> second;
27-
public Pattern2(Observable<T1> first, Observable<T2> second) {
28-
this.first = first;
29-
this.second = second;
30-
}
31-
public Observable<T1> first() {
32-
return first;
33-
}
34-
public Observable<T2> second() {
35-
return second;
36-
}
37-
/**
38-
* Creates a pattern that matches when all three observable sequences have an available element.
39-
* @param other Observable sequence to match with the two previous sequences.
40-
* @return Pattern object that matches when all observable sequences have an available element.
41-
*/
42-
public <T3> Pattern3<T1, T2, T3> and(Observable<T3> other) {
43-
if (other == null) {
44-
throw new NullPointerException();
45-
}
46-
return new Pattern3<T1, T2, T3>(first, second, other);
47-
}
48-
public <R> Plan0<R> then(Func2<T1, T2, R> selector) {
49-
if (selector == null) {
50-
throw new NullPointerException();
51-
}
52-
return new Plan2<T1, T2, R>(this, selector);
53-
}
54-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Observable;
19+
import rx.util.functions.Func2;
20+
21+
/**
22+
* Represents a join pattern over observable sequences.
23+
*/
24+
public class Pattern2<T1, T2> implements Pattern {
25+
private final Observable<T1> first;
26+
private final Observable<T2> second;
27+
public Pattern2(Observable<T1> first, Observable<T2> second) {
28+
this.first = first;
29+
this.second = second;
30+
}
31+
public Observable<T1> first() {
32+
return first;
33+
}
34+
public Observable<T2> second() {
35+
return second;
36+
}
37+
/**
38+
* Creates a pattern that matches when all three observable sequences have an available element.
39+
* @param other Observable sequence to match with the two previous sequences.
40+
* @return Pattern object that matches when all observable sequences have an available element.
41+
*/
42+
public <T3> Pattern3<T1, T2, T3> and(Observable<T3> other) {
43+
if (other == null) {
44+
throw new NullPointerException();
45+
}
46+
return new Pattern3<T1, T2, T3>(first, second, other);
47+
}
48+
public <R> Plan0<R> then(Func2<T1, T2, R> selector) {
49+
if (selector == null) {
50+
throw new NullPointerException();
51+
}
52+
return new Plan2<T1, T2, R>(this, selector);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,55 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import rx.Observable;
19-
import rx.util.functions.Func3;
20-
21-
/**
22-
* Represents a join pattern over observable sequences.
23-
*/
24-
public class Pattern3<T1, T2, T3> implements Pattern {
25-
private final Observable<T1> first;
26-
private final Observable<T2> second;
27-
private final Observable<T3> third;
28-
public Pattern3(Observable<T1> first, Observable<T2> second,
29-
Observable<T3> third) {
30-
this.first = first;
31-
this.second = second;
32-
this.third = third;
33-
}
34-
public Observable<T1> first() {
35-
return first;
36-
}
37-
public Observable<T2> second() {
38-
return second;
39-
}
40-
public Observable<T3> third() {
41-
return third;
42-
}
43-
// public <T4> Pattern4<T1, T2, T3, T4> and(Observable<T4> other) {
44-
// if (other == null) {
45-
// throw new NullPointerException();
46-
// }
47-
// return new Pattern4<T1, T2, T3, T4>(first, second, third, other);
48-
// }
49-
public <R> Plan0<R> then(Func3<T1, T2, T3, R> selector) {
50-
if (selector == null) {
51-
throw new NullPointerException();
52-
}
53-
return new Plan3<T1, T2, T3, R>(this, selector);
54-
}
55-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Observable;
19+
import rx.util.functions.Func3;
20+
21+
/**
22+
* Represents a join pattern over observable sequences.
23+
*/
24+
public class Pattern3<T1, T2, T3> implements Pattern {
25+
private final Observable<T1> first;
26+
private final Observable<T2> second;
27+
private final Observable<T3> third;
28+
public Pattern3(Observable<T1> first, Observable<T2> second,
29+
Observable<T3> third) {
30+
this.first = first;
31+
this.second = second;
32+
this.third = third;
33+
}
34+
public Observable<T1> first() {
35+
return first;
36+
}
37+
public Observable<T2> second() {
38+
return second;
39+
}
40+
public Observable<T3> third() {
41+
return third;
42+
}
43+
// public <T4> Pattern4<T1, T2, T3, T4> and(Observable<T4> other) {
44+
// if (other == null) {
45+
// throw new NullPointerException();
46+
// }
47+
// return new Pattern4<T1, T2, T3, T4>(first, second, third, other);
48+
// }
49+
public <R> Plan0<R> then(Func3<T1, T2, T3, R> selector) {
50+
if (selector == null) {
51+
throw new NullPointerException();
52+
}
53+
return new Plan3<T1, T2, T3, R>(this, selector);
54+
}
55+
}
+79-79
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,79 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import java.util.Map;
19-
import java.util.concurrent.atomic.AtomicReference;
20-
import rx.Observer;
21-
import static rx.joins.Plan0.createObserver;
22-
import rx.util.functions.Action0;
23-
import rx.util.functions.Action1;
24-
import rx.util.functions.Action2;
25-
import rx.util.functions.Actions;
26-
import rx.util.functions.Func2;
27-
28-
/**
29-
* Represents an execution plan for join patterns.
30-
*/
31-
public class Plan2<T1, T2, R> extends Plan0<R> {
32-
protected Pattern2<T1, T2> expression;
33-
protected Func2<T1, T2, R> selector;
34-
public Plan2(Pattern2<T1, T2> expression, Func2<T1, T2, R> selector) {
35-
this.expression = expression;
36-
this.selector = selector;
37-
}
38-
39-
@Override
40-
public ActivePlan0 activate(Map<Object, JoinObserver> externalSubscriptions,
41-
final Observer<R> observer, final Action1<ActivePlan0> deactivate) {
42-
Action1<Throwable> onError = Actions.onErrorFrom(observer);
43-
44-
final JoinObserver1<T1> firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError);
45-
final JoinObserver1<T2> secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError);
46-
47-
final AtomicReference<ActivePlan2<T1, T2>> self = new AtomicReference<ActivePlan2<T1, T2>>();
48-
49-
ActivePlan2<T1, T2> activePlan = new ActivePlan2<T1, T2>(firstJoinObserver, secondJoinObserver, new Action2<T1, T2>() {
50-
@Override
51-
public void call(T1 t1, T2 t2) {
52-
R result;
53-
try {
54-
result = selector.call(t1, t2);
55-
} catch (Throwable t) {
56-
observer.onError(t);
57-
return;
58-
}
59-
observer.onNext(result);
60-
}
61-
},
62-
new Action0() {
63-
@Override
64-
public void call() {
65-
firstJoinObserver.removeActivePlan(self.get());
66-
secondJoinObserver.removeActivePlan(self.get());
67-
deactivate.call(self.get());
68-
}
69-
});
70-
71-
self.set(activePlan);
72-
73-
firstJoinObserver.addActivePlan(activePlan);
74-
secondJoinObserver.addActivePlan(activePlan);
75-
76-
return activePlan;
77-
}
78-
79-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import java.util.Map;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
import rx.Observer;
21+
import static rx.joins.Plan0.createObserver;
22+
import rx.util.functions.Action0;
23+
import rx.util.functions.Action1;
24+
import rx.util.functions.Action2;
25+
import rx.util.functions.Actions;
26+
import rx.util.functions.Func2;
27+
28+
/**
29+
* Represents an execution plan for join patterns.
30+
*/
31+
public class Plan2<T1, T2, R> extends Plan0<R> {
32+
protected Pattern2<T1, T2> expression;
33+
protected Func2<T1, T2, R> selector;
34+
public Plan2(Pattern2<T1, T2> expression, Func2<T1, T2, R> selector) {
35+
this.expression = expression;
36+
this.selector = selector;
37+
}
38+
39+
@Override
40+
public ActivePlan0 activate(Map<Object, JoinObserver> externalSubscriptions,
41+
final Observer<R> observer, final Action1<ActivePlan0> deactivate) {
42+
Action1<Throwable> onError = Actions.onErrorFrom(observer);
43+
44+
final JoinObserver1<T1> firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError);
45+
final JoinObserver1<T2> secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError);
46+
47+
final AtomicReference<ActivePlan2<T1, T2>> self = new AtomicReference<ActivePlan2<T1, T2>>();
48+
49+
ActivePlan2<T1, T2> activePlan = new ActivePlan2<T1, T2>(firstJoinObserver, secondJoinObserver, new Action2<T1, T2>() {
50+
@Override
51+
public void call(T1 t1, T2 t2) {
52+
R result;
53+
try {
54+
result = selector.call(t1, t2);
55+
} catch (Throwable t) {
56+
observer.onError(t);
57+
return;
58+
}
59+
observer.onNext(result);
60+
}
61+
},
62+
new Action0() {
63+
@Override
64+
public void call() {
65+
firstJoinObserver.removeActivePlan(self.get());
66+
secondJoinObserver.removeActivePlan(self.get());
67+
deactivate.call(self.get());
68+
}
69+
});
70+
71+
self.set(activePlan);
72+
73+
firstJoinObserver.addActivePlan(activePlan);
74+
secondJoinObserver.addActivePlan(activePlan);
75+
76+
return activePlan;
77+
}
78+
79+
}
+83-83
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,83 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.joins;
17-
18-
import java.util.Map;
19-
import java.util.concurrent.atomic.AtomicReference;
20-
import rx.Observer;
21-
import static rx.joins.Plan0.createObserver;
22-
import rx.util.functions.Action0;
23-
import rx.util.functions.Action1;
24-
import rx.util.functions.Action3;
25-
import rx.util.functions.Actions;
26-
import rx.util.functions.Func3;
27-
28-
/**
29-
* Represents an execution plan for join patterns.
30-
*/
31-
public class Plan3<T1, T2, T3, R> extends Plan0<R> {
32-
protected Pattern3<T1, T2, T3> expression;
33-
protected Func3<T1, T2, T3, R> selector;
34-
public Plan3(Pattern3<T1, T2, T3> expression, Func3<T1, T2, T3, R> selector) {
35-
this.expression = expression;
36-
this.selector = selector;
37-
}
38-
39-
@Override
40-
public ActivePlan0 activate(Map<Object, JoinObserver> externalSubscriptions,
41-
final Observer<R> observer, final Action1<ActivePlan0> deactivate) {
42-
Action1<Throwable> onError = Actions.onErrorFrom(observer);
43-
44-
final JoinObserver1<T1> firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError);
45-
final JoinObserver1<T2> secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError);
46-
final JoinObserver1<T3> thirdJoinObserver = createObserver(externalSubscriptions, expression.third(), onError);
47-
48-
final AtomicReference<ActivePlan3<T1, T2, T3>> self = new AtomicReference<ActivePlan3<T1, T2, T3>>();
49-
50-
ActivePlan3<T1, T2, T3> activePlan = new ActivePlan3<T1, T2, T3>(firstJoinObserver, secondJoinObserver,
51-
thirdJoinObserver, new Action3<T1, T2, T3>() {
52-
@Override
53-
public void call(T1 t1, T2 t2, T3 t3) {
54-
R result;
55-
try {
56-
result = selector.call(t1, t2, t3);
57-
} catch (Throwable t) {
58-
observer.onError(t);
59-
return;
60-
}
61-
observer.onNext(result);
62-
}
63-
},
64-
new Action0() {
65-
@Override
66-
public void call() {
67-
firstJoinObserver.removeActivePlan(self.get());
68-
secondJoinObserver.removeActivePlan(self.get());
69-
thirdJoinObserver.removeActivePlan(self.get());
70-
deactivate.call(self.get());
71-
}
72-
});
73-
74-
self.set(activePlan);
75-
76-
firstJoinObserver.addActivePlan(activePlan);
77-
secondJoinObserver.addActivePlan(activePlan);
78-
thirdJoinObserver.addActivePlan(activePlan);
79-
80-
return activePlan;
81-
}
82-
83-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import java.util.Map;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
import rx.Observer;
21+
import static rx.joins.Plan0.createObserver;
22+
import rx.util.functions.Action0;
23+
import rx.util.functions.Action1;
24+
import rx.util.functions.Action3;
25+
import rx.util.functions.Actions;
26+
import rx.util.functions.Func3;
27+
28+
/**
29+
* Represents an execution plan for join patterns.
30+
*/
31+
public class Plan3<T1, T2, T3, R> extends Plan0<R> {
32+
protected Pattern3<T1, T2, T3> expression;
33+
protected Func3<T1, T2, T3, R> selector;
34+
public Plan3(Pattern3<T1, T2, T3> expression, Func3<T1, T2, T3, R> selector) {
35+
this.expression = expression;
36+
this.selector = selector;
37+
}
38+
39+
@Override
40+
public ActivePlan0 activate(Map<Object, JoinObserver> externalSubscriptions,
41+
final Observer<R> observer, final Action1<ActivePlan0> deactivate) {
42+
Action1<Throwable> onError = Actions.onErrorFrom(observer);
43+
44+
final JoinObserver1<T1> firstJoinObserver = createObserver(externalSubscriptions, expression.first(), onError);
45+
final JoinObserver1<T2> secondJoinObserver = createObserver(externalSubscriptions, expression.second(), onError);
46+
final JoinObserver1<T3> thirdJoinObserver = createObserver(externalSubscriptions, expression.third(), onError);
47+
48+
final AtomicReference<ActivePlan3<T1, T2, T3>> self = new AtomicReference<ActivePlan3<T1, T2, T3>>();
49+
50+
ActivePlan3<T1, T2, T3> activePlan = new ActivePlan3<T1, T2, T3>(firstJoinObserver, secondJoinObserver,
51+
thirdJoinObserver, new Action3<T1, T2, T3>() {
52+
@Override
53+
public void call(T1 t1, T2 t2, T3 t3) {
54+
R result;
55+
try {
56+
result = selector.call(t1, t2, t3);
57+
} catch (Throwable t) {
58+
observer.onError(t);
59+
return;
60+
}
61+
observer.onNext(result);
62+
}
63+
},
64+
new Action0() {
65+
@Override
66+
public void call() {
67+
firstJoinObserver.removeActivePlan(self.get());
68+
secondJoinObserver.removeActivePlan(self.get());
69+
thirdJoinObserver.removeActivePlan(self.get());
70+
deactivate.call(self.get());
71+
}
72+
});
73+
74+
self.set(activePlan);
75+
76+
firstJoinObserver.addActivePlan(activePlan);
77+
secondJoinObserver.addActivePlan(activePlan);
78+
thirdJoinObserver.addActivePlan(activePlan);
79+
80+
return activePlan;
81+
}
82+
83+
}
Original file line numberDiff line numberDiff line change
@@ -1,131 +1,131 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5-
* use this file except in compliance with the License. You may obtain a copy of
6-
* the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12-
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13-
* License for the specific language governing permissions and limitations under
14-
* the License.
15-
*/
16-
package rx.operators;
17-
18-
import java.util.ArrayList;
19-
import java.util.Arrays;
20-
import java.util.HashMap;
21-
import java.util.List;
22-
import java.util.Map;
23-
import rx.Observable;
24-
import rx.Observable.OnSubscribeFunc;
25-
import rx.Observer;
26-
import rx.Subscription;
27-
import rx.joins.ActivePlan0;
28-
import rx.joins.JoinObserver;
29-
import rx.joins.Pattern1;
30-
import rx.joins.Pattern2;
31-
import rx.joins.Plan0;
32-
import rx.subjects.PublishSubject;
33-
import rx.subscriptions.CompositeSubscription;
34-
import rx.util.functions.Action1;
35-
import rx.util.functions.Func1;
36-
import rx.util.functions.Func2;
37-
38-
/**
39-
* Join patterns: And, Then, When.
40-
*/
41-
public class OperationJoinPatterns {
42-
/**
43-
* Creates a pattern that matches when both observable sequences have an available element.
44-
*/
45-
public static <T1, T2> Pattern2<T1, T2> and(/* this */Observable<T1> left, Observable<T2> right) {
46-
if (left == null) {
47-
throw new NullPointerException("left");
48-
}
49-
if (right == null) {
50-
throw new NullPointerException("right");
51-
}
52-
return new Pattern2<T1, T2>(left, right);
53-
}
54-
/**
55-
* Matches when the observable sequence has an available element and projects the element by invoking the selector function.
56-
*/
57-
public static <T1, R> Plan0<R> then(/* this */Observable<T1> source, Func1<T1, R> selector) {
58-
if (source == null) {
59-
throw new NullPointerException("source");
60-
}
61-
if (selector == null) {
62-
throw new NullPointerException("selector");
63-
}
64-
return new Pattern1<T1>(source).then(selector);
65-
}
66-
/**
67-
* Joins together the results from several patterns.
68-
*/
69-
public static <R> OnSubscribeFunc<R> when(Plan0<R>... plans) {
70-
if (plans == null) {
71-
throw new NullPointerException("plans");
72-
}
73-
return when(Arrays.asList(plans));
74-
}
75-
/**
76-
* Joins together the results from several patterns.
77-
*/
78-
public static <R> OnSubscribeFunc<R> when(final Iterable<? extends Plan0<R>> plans) {
79-
if (plans == null) {
80-
throw new NullPointerException("plans");
81-
}
82-
return new OnSubscribeFunc<R>() {
83-
@Override
84-
public Subscription onSubscribe(final Observer<? super R> t1) {
85-
final Map<Object, JoinObserver> externalSubscriptions = new HashMap<Object, JoinObserver>();
86-
final Object gate = new Object();
87-
final List<ActivePlan0> activePlans = new ArrayList<ActivePlan0>();
88-
89-
final Observer<R> out = new Observer<R>() {
90-
@Override
91-
public void onNext(R args) {
92-
t1.onNext(args);
93-
}
94-
@Override
95-
public void onError(Throwable e) {
96-
for (JoinObserver po : externalSubscriptions.values()) {
97-
po.unsubscribe();
98-
}
99-
t1.onError(e);
100-
}
101-
@Override
102-
public void onCompleted() {
103-
t1.onCompleted();
104-
}
105-
};
106-
107-
try {
108-
for (Plan0<R> plan : plans) {
109-
activePlans.add(plan.activate(externalSubscriptions, out, new Action1<ActivePlan0>() {
110-
@Override
111-
public void call(ActivePlan0 activePlan) {
112-
activePlans.remove(activePlan);
113-
if (activePlans.isEmpty()) {
114-
out.onCompleted();
115-
}
116-
}
117-
}));
118-
}
119-
} catch (Throwable t) {
120-
return Observable.<R>error(t).subscribe(t1);
121-
}
122-
CompositeSubscription group = new CompositeSubscription();
123-
for (JoinObserver jo : externalSubscriptions.values()) {
124-
jo.subscribe(gate);
125-
group.add(jo);
126-
}
127-
return group;
128-
}
129-
};
130-
}
131-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
import rx.Observable;
24+
import rx.Observable.OnSubscribeFunc;
25+
import rx.Observer;
26+
import rx.Subscription;
27+
import rx.joins.ActivePlan0;
28+
import rx.joins.JoinObserver;
29+
import rx.joins.Pattern1;
30+
import rx.joins.Pattern2;
31+
import rx.joins.Plan0;
32+
import rx.subjects.PublishSubject;
33+
import rx.subscriptions.CompositeSubscription;
34+
import rx.util.functions.Action1;
35+
import rx.util.functions.Func1;
36+
import rx.util.functions.Func2;
37+
38+
/**
39+
* Join patterns: And, Then, When.
40+
*/
41+
public class OperationJoinPatterns {
42+
/**
43+
* Creates a pattern that matches when both observable sequences have an available element.
44+
*/
45+
public static <T1, T2> Pattern2<T1, T2> and(/* this */Observable<T1> left, Observable<T2> right) {
46+
if (left == null) {
47+
throw new NullPointerException("left");
48+
}
49+
if (right == null) {
50+
throw new NullPointerException("right");
51+
}
52+
return new Pattern2<T1, T2>(left, right);
53+
}
54+
/**
55+
* Matches when the observable sequence has an available element and projects the element by invoking the selector function.
56+
*/
57+
public static <T1, R> Plan0<R> then(/* this */Observable<T1> source, Func1<T1, R> selector) {
58+
if (source == null) {
59+
throw new NullPointerException("source");
60+
}
61+
if (selector == null) {
62+
throw new NullPointerException("selector");
63+
}
64+
return new Pattern1<T1>(source).then(selector);
65+
}
66+
/**
67+
* Joins together the results from several patterns.
68+
*/
69+
public static <R> OnSubscribeFunc<R> when(Plan0<R>... plans) {
70+
if (plans == null) {
71+
throw new NullPointerException("plans");
72+
}
73+
return when(Arrays.asList(plans));
74+
}
75+
/**
76+
* Joins together the results from several patterns.
77+
*/
78+
public static <R> OnSubscribeFunc<R> when(final Iterable<? extends Plan0<R>> plans) {
79+
if (plans == null) {
80+
throw new NullPointerException("plans");
81+
}
82+
return new OnSubscribeFunc<R>() {
83+
@Override
84+
public Subscription onSubscribe(final Observer<? super R> t1) {
85+
final Map<Object, JoinObserver> externalSubscriptions = new HashMap<Object, JoinObserver>();
86+
final Object gate = new Object();
87+
final List<ActivePlan0> activePlans = new ArrayList<ActivePlan0>();
88+
89+
final Observer<R> out = new Observer<R>() {
90+
@Override
91+
public void onNext(R args) {
92+
t1.onNext(args);
93+
}
94+
@Override
95+
public void onError(Throwable e) {
96+
for (JoinObserver po : externalSubscriptions.values()) {
97+
po.unsubscribe();
98+
}
99+
t1.onError(e);
100+
}
101+
@Override
102+
public void onCompleted() {
103+
t1.onCompleted();
104+
}
105+
};
106+
107+
try {
108+
for (Plan0<R> plan : plans) {
109+
activePlans.add(plan.activate(externalSubscriptions, out, new Action1<ActivePlan0>() {
110+
@Override
111+
public void call(ActivePlan0 activePlan) {
112+
activePlans.remove(activePlan);
113+
if (activePlans.isEmpty()) {
114+
out.onCompleted();
115+
}
116+
}
117+
}));
118+
}
119+
} catch (Throwable t) {
120+
return Observable.<R>error(t).subscribe(t1);
121+
}
122+
CompositeSubscription group = new CompositeSubscription();
123+
for (JoinObserver jo : externalSubscriptions.values()) {
124+
jo.subscribe(gate);
125+
group.add(jo);
126+
}
127+
return group;
128+
}
129+
};
130+
}
131+
}
Original file line numberDiff line numberDiff line change
@@ -1,159 +1,159 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package rx.operators;
18-
19-
import java.util.HashMap;
20-
import java.util.Map;
21-
import rx.Observable;
22-
import rx.Observable.OnSubscribeFunc;
23-
import rx.Observer;
24-
import rx.Subscription;
25-
import rx.subscriptions.Subscriptions;
26-
import rx.util.functions.Func0;
27-
import rx.util.functions.Func1;
28-
import rx.util.functions.Functions;
29-
30-
/**
31-
* Maps the elements of the source observable into a java.util.Map instance and
32-
* emits that once the source observable completes.
33-
*
34-
* @see <a href='https://github.com/Netflix/RxJava/issues/96'>Issue #96</a>
35-
*/
36-
public class OperationToMap {
37-
/**
38-
* ToMap with key selector, identity value selector and default HashMap factory.
39-
*/
40-
public static <T, K> OnSubscribeFunc<Map<K, T>> toMap(Observable<T> source,
41-
Func1<? super T, ? extends K> keySelector) {
42-
return new ToMap<T, K, T>(source, keySelector,
43-
Functions.<T>identity(), new DefaultToMapFactory<K, T>());
44-
}
45-
46-
/**
47-
* ToMap with key selector, value selector and default HashMap factory.
48-
*/
49-
public static <T, K, V> OnSubscribeFunc<Map<K, V>> toMap(Observable<T> source,
50-
Func1<? super T, ? extends K> keySelector,
51-
Func1<? super T, ? extends V> valueSelector) {
52-
return new ToMap<T, K, V>(source, keySelector,
53-
valueSelector, new DefaultToMapFactory<K, V>());
54-
}
55-
56-
/**
57-
* ToMap with key selector, value selector and custom Map factory.
58-
*/
59-
public static <T, K, V> OnSubscribeFunc<Map<K, V>> toMap(Observable<T> source,
60-
Func1<? super T, ? extends K> keySelector,
61-
Func1<? super T, ? extends V> valueSelector,
62-
Func0<? extends Map<K, V>> mapFactory) {
63-
return new ToMap<T, K, V>(source, keySelector,
64-
valueSelector, mapFactory);
65-
}
66-
67-
/** The default map factory. */
68-
public static class DefaultToMapFactory<K, V> implements Func0<Map<K, V>> {
69-
@Override
70-
public Map<K, V> call() {
71-
return new HashMap<K, V>();
72-
}
73-
}
74-
/**
75-
* Maps the elements of the source observable into a java.util.Map instance
76-
* returned by the mapFactory function by using the keySelector and
77-
* valueSelector.
78-
* @param <T> the source's value type
79-
* @param <K> the key type
80-
* @param <V> the value type
81-
*/
82-
public static class ToMap<T, K, V> implements OnSubscribeFunc<Map<K, V>> {
83-
/** The source. */
84-
private final Observable<T> source;
85-
/** Key extractor. */
86-
private final Func1<? super T, ? extends K> keySelector;
87-
/** Value extractor. */
88-
private final Func1<? super T, ? extends V> valueSelector;
89-
/** Map factory. */
90-
private final Func0<? extends Map<K, V>> mapFactory;
91-
public ToMap(
92-
Observable<T> source,
93-
Func1<? super T, ? extends K> keySelector,
94-
Func1<? super T, ? extends V> valueSelector,
95-
Func0<? extends Map<K, V>> mapFactory
96-
) {
97-
this.source = source;
98-
this.keySelector = keySelector;
99-
this.valueSelector = valueSelector;
100-
this.mapFactory = mapFactory;
101-
102-
}
103-
@Override
104-
public Subscription onSubscribe(Observer<? super Map<K, V>> t1) {
105-
Map<K, V> map;
106-
try {
107-
map = mapFactory.call();
108-
} catch (Throwable t) {
109-
t1.onError(t);
110-
return Subscriptions.empty();
111-
}
112-
return source.subscribe(new ToMapObserver<K, V, T>(
113-
t1, keySelector, valueSelector, map));
114-
}
115-
/**
116-
* Observer that collects the source values of T into
117-
* a map.
118-
*/
119-
public static class ToMapObserver<K, V, T> implements Observer<T> {
120-
/** The map. */
121-
Map<K, V> map;
122-
/** Key extractor. */
123-
private final Func1<? super T, ? extends K> keySelector;
124-
/** Value extractor. */
125-
private final Func1<? super T, ? extends V> valueSelector;
126-
/** The observer who is receiving the completed map. */
127-
private final Observer<? super Map<K, V>> t1;
128-
129-
public ToMapObserver(
130-
Observer<? super Map<K, V>> t1,
131-
Func1<? super T, ? extends K> keySelector,
132-
Func1<? super T, ? extends V> valueSelector,
133-
Map<K, V> map) {
134-
this.map = map;
135-
this.t1 = t1;
136-
this.keySelector = keySelector;
137-
this.valueSelector = valueSelector;
138-
}
139-
@Override
140-
public void onNext(T args) {
141-
K key = keySelector.call(args);
142-
V value = valueSelector.call(args);
143-
map.put(key, value);
144-
}
145-
@Override
146-
public void onError(Throwable e) {
147-
map = null;
148-
t1.onError(e);
149-
}
150-
@Override
151-
public void onCompleted() {
152-
Map<K, V> map0 = map;
153-
map = null;
154-
t1.onNext(map0);
155-
t1.onCompleted();
156-
}
157-
}
158-
}
159-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
import rx.Observable;
22+
import rx.Observable.OnSubscribeFunc;
23+
import rx.Observer;
24+
import rx.Subscription;
25+
import rx.subscriptions.Subscriptions;
26+
import rx.util.functions.Func0;
27+
import rx.util.functions.Func1;
28+
import rx.util.functions.Functions;
29+
30+
/**
31+
* Maps the elements of the source observable into a java.util.Map instance and
32+
* emits that once the source observable completes.
33+
*
34+
* @see <a href='https://github.com/Netflix/RxJava/issues/96'>Issue #96</a>
35+
*/
36+
public class OperationToMap {
37+
/**
38+
* ToMap with key selector, identity value selector and default HashMap factory.
39+
*/
40+
public static <T, K> OnSubscribeFunc<Map<K, T>> toMap(Observable<T> source,
41+
Func1<? super T, ? extends K> keySelector) {
42+
return new ToMap<T, K, T>(source, keySelector,
43+
Functions.<T>identity(), new DefaultToMapFactory<K, T>());
44+
}
45+
46+
/**
47+
* ToMap with key selector, value selector and default HashMap factory.
48+
*/
49+
public static <T, K, V> OnSubscribeFunc<Map<K, V>> toMap(Observable<T> source,
50+
Func1<? super T, ? extends K> keySelector,
51+
Func1<? super T, ? extends V> valueSelector) {
52+
return new ToMap<T, K, V>(source, keySelector,
53+
valueSelector, new DefaultToMapFactory<K, V>());
54+
}
55+
56+
/**
57+
* ToMap with key selector, value selector and custom Map factory.
58+
*/
59+
public static <T, K, V> OnSubscribeFunc<Map<K, V>> toMap(Observable<T> source,
60+
Func1<? super T, ? extends K> keySelector,
61+
Func1<? super T, ? extends V> valueSelector,
62+
Func0<? extends Map<K, V>> mapFactory) {
63+
return new ToMap<T, K, V>(source, keySelector,
64+
valueSelector, mapFactory);
65+
}
66+
67+
/** The default map factory. */
68+
public static class DefaultToMapFactory<K, V> implements Func0<Map<K, V>> {
69+
@Override
70+
public Map<K, V> call() {
71+
return new HashMap<K, V>();
72+
}
73+
}
74+
/**
75+
* Maps the elements of the source observable into a java.util.Map instance
76+
* returned by the mapFactory function by using the keySelector and
77+
* valueSelector.
78+
* @param <T> the source's value type
79+
* @param <K> the key type
80+
* @param <V> the value type
81+
*/
82+
public static class ToMap<T, K, V> implements OnSubscribeFunc<Map<K, V>> {
83+
/** The source. */
84+
private final Observable<T> source;
85+
/** Key extractor. */
86+
private final Func1<? super T, ? extends K> keySelector;
87+
/** Value extractor. */
88+
private final Func1<? super T, ? extends V> valueSelector;
89+
/** Map factory. */
90+
private final Func0<? extends Map<K, V>> mapFactory;
91+
public ToMap(
92+
Observable<T> source,
93+
Func1<? super T, ? extends K> keySelector,
94+
Func1<? super T, ? extends V> valueSelector,
95+
Func0<? extends Map<K, V>> mapFactory
96+
) {
97+
this.source = source;
98+
this.keySelector = keySelector;
99+
this.valueSelector = valueSelector;
100+
this.mapFactory = mapFactory;
101+
102+
}
103+
@Override
104+
public Subscription onSubscribe(Observer<? super Map<K, V>> t1) {
105+
Map<K, V> map;
106+
try {
107+
map = mapFactory.call();
108+
} catch (Throwable t) {
109+
t1.onError(t);
110+
return Subscriptions.empty();
111+
}
112+
return source.subscribe(new ToMapObserver<K, V, T>(
113+
t1, keySelector, valueSelector, map));
114+
}
115+
/**
116+
* Observer that collects the source values of T into
117+
* a map.
118+
*/
119+
public static class ToMapObserver<K, V, T> implements Observer<T> {
120+
/** The map. */
121+
Map<K, V> map;
122+
/** Key extractor. */
123+
private final Func1<? super T, ? extends K> keySelector;
124+
/** Value extractor. */
125+
private final Func1<? super T, ? extends V> valueSelector;
126+
/** The observer who is receiving the completed map. */
127+
private final Observer<? super Map<K, V>> t1;
128+
129+
public ToMapObserver(
130+
Observer<? super Map<K, V>> t1,
131+
Func1<? super T, ? extends K> keySelector,
132+
Func1<? super T, ? extends V> valueSelector,
133+
Map<K, V> map) {
134+
this.map = map;
135+
this.t1 = t1;
136+
this.keySelector = keySelector;
137+
this.valueSelector = valueSelector;
138+
}
139+
@Override
140+
public void onNext(T args) {
141+
K key = keySelector.call(args);
142+
V value = valueSelector.call(args);
143+
map.put(key, value);
144+
}
145+
@Override
146+
public void onError(Throwable e) {
147+
map = null;
148+
t1.onError(e);
149+
}
150+
@Override
151+
public void onCompleted() {
152+
Map<K, V> map0 = map;
153+
map = null;
154+
t1.onNext(map0);
155+
t1.onCompleted();
156+
}
157+
}
158+
}
159+
}

‎rxjava-core/src/main/java/rx/operators/OperationToMultimap.java

+206-206
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,62 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package rx.subscriptions;
17-
18-
import static rx.subscriptions.Subscriptions.empty;
19-
20-
import java.util.concurrent.atomic.AtomicReference;
21-
22-
import rx.Subscription;
23-
24-
/**
25-
* Represents a subscription whose underlying subscription can be swapped for another subscription
26-
* which causes the previous underlying subscription to be unsubscribed.
27-
*
28-
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
29-
*/
30-
public class SerialSubscription implements Subscription {
31-
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
32-
33-
private static final Subscription UNSUBSCRIBED = new Subscription() {
34-
@Override
35-
public void unsubscribe() {
36-
}
37-
};
38-
39-
@Override
40-
public void unsubscribe() {
41-
setSubscription(UNSUBSCRIBED);
42-
}
43-
44-
public void setSubscription(final Subscription subscription) {
45-
do {
46-
final Subscription current = reference.get();
47-
if (current == UNSUBSCRIBED) {
48-
subscription.unsubscribe();
49-
break;
50-
}
51-
if (reference.compareAndSet(current, subscription)) {
52-
current.unsubscribe();
53-
break;
54-
}
55-
} while (true);
56-
}
57-
58-
public Subscription getSubscription() {
59-
final Subscription subscription = reference.get();
60-
return subscription == UNSUBSCRIBED ? null : subscription;
61-
}
62-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import static rx.subscriptions.Subscriptions.empty;
19+
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Subscription;
23+
24+
/**
25+
* Represents a subscription whose underlying subscription can be swapped for another subscription
26+
* which causes the previous underlying subscription to be unsubscribed.
27+
*
28+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
29+
*/
30+
public class SerialSubscription implements Subscription {
31+
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
32+
33+
private static final Subscription UNSUBSCRIBED = new Subscription() {
34+
@Override
35+
public void unsubscribe() {
36+
}
37+
};
38+
39+
@Override
40+
public void unsubscribe() {
41+
setSubscription(UNSUBSCRIBED);
42+
}
43+
44+
public void setSubscription(final Subscription subscription) {
45+
do {
46+
final Subscription current = reference.get();
47+
if (current == UNSUBSCRIBED) {
48+
subscription.unsubscribe();
49+
break;
50+
}
51+
if (reference.compareAndSet(current, subscription)) {
52+
current.unsubscribe();
53+
break;
54+
}
55+
} while (true);
56+
}
57+
58+
public Subscription getSubscription() {
59+
final Subscription subscription = reference.get();
60+
return subscription == UNSUBSCRIBED ? null : subscription;
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,81 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package rx.subscriptions;
18-
19-
import java.util.concurrent.atomic.AtomicReference;
20-
import rx.Subscription;
21-
22-
/**
23-
* A subscription that allows only a single resource to be assigned.
24-
* <p>
25-
* If this subscription is live, no other subscription may be set() and
26-
* yields an {@link IllegalStateException}.
27-
* <p>
28-
* If the unsubscribe has been called, setting a new subscription will
29-
* unsubscribe it immediately.
30-
*/
31-
public final class SingleAssignmentSubscription implements Subscription {
32-
/** Holds the current resource. */
33-
private final AtomicReference<Subscription> current = new AtomicReference<Subscription>();
34-
/** Sentinel for the unsubscribed state. */
35-
private static final Subscription SENTINEL = new Subscription() {
36-
@Override
37-
public void unsubscribe() {
38-
}
39-
};
40-
/**
41-
* Returns the current subscription or null if not yet set.
42-
*/
43-
public Subscription get() {
44-
Subscription s = current.get();
45-
if (s == SENTINEL) {
46-
return Subscriptions.empty();
47-
}
48-
return s;
49-
}
50-
/**
51-
* Sets a new subscription if not already set.
52-
* @param s the new subscription
53-
* @throws IllegalStateException if this subscription is live and contains
54-
* another subscription.
55-
*/
56-
public void set(Subscription s) {
57-
if (current.compareAndSet(null, s)) {
58-
return;
59-
}
60-
if (current.get() != SENTINEL) {
61-
throw new IllegalStateException("Subscription already set");
62-
}
63-
if (s != null) {
64-
s.unsubscribe();
65-
}
66-
}
67-
@Override
68-
public void unsubscribe() {
69-
Subscription old = current.getAndSet(SENTINEL);
70-
if (old != null) {
71-
old.unsubscribe();
72-
}
73-
}
74-
/**
75-
* Test if this subscription is already unsubscribed.
76-
*/
77-
public boolean isUnsubscribed() {
78-
return current.get() == SENTINEL;
79-
}
80-
81-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.subscriptions;
18+
19+
import java.util.concurrent.atomic.AtomicReference;
20+
import rx.Subscription;
21+
22+
/**
23+
* A subscription that allows only a single resource to be assigned.
24+
* <p>
25+
* If this subscription is live, no other subscription may be set() and
26+
* yields an {@link IllegalStateException}.
27+
* <p>
28+
* If the unsubscribe has been called, setting a new subscription will
29+
* unsubscribe it immediately.
30+
*/
31+
public final class SingleAssignmentSubscription implements Subscription {
32+
/** Holds the current resource. */
33+
private final AtomicReference<Subscription> current = new AtomicReference<Subscription>();
34+
/** Sentinel for the unsubscribed state. */
35+
private static final Subscription SENTINEL = new Subscription() {
36+
@Override
37+
public void unsubscribe() {
38+
}
39+
};
40+
/**
41+
* Returns the current subscription or null if not yet set.
42+
*/
43+
public Subscription get() {
44+
Subscription s = current.get();
45+
if (s == SENTINEL) {
46+
return Subscriptions.empty();
47+
}
48+
return s;
49+
}
50+
/**
51+
* Sets a new subscription if not already set.
52+
* @param s the new subscription
53+
* @throws IllegalStateException if this subscription is live and contains
54+
* another subscription.
55+
*/
56+
public void set(Subscription s) {
57+
if (current.compareAndSet(null, s)) {
58+
return;
59+
}
60+
if (current.get() != SENTINEL) {
61+
throw new IllegalStateException("Subscription already set");
62+
}
63+
if (s != null) {
64+
s.unsubscribe();
65+
}
66+
}
67+
@Override
68+
public void unsubscribe() {
69+
Subscription old = current.getAndSet(SENTINEL);
70+
if (old != null) {
71+
old.unsubscribe();
72+
}
73+
}
74+
/**
75+
* Test if this subscription is already unsubscribed.
76+
*/
77+
public boolean isUnsubscribed() {
78+
return current.get() == SENTINEL;
79+
}
80+
81+
}
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package rx.util.functions;
18-
19-
/**
20-
* A four-argument action.
21-
*/
22-
public interface Action4<T1, T2, T3, T4> extends Action {
23-
void call(T1 t1, T2 t2, T3 t3, T4 t4);
24-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.util.functions;
18+
19+
/**
20+
* A four-argument action.
21+
*/
22+
public interface Action4<T1, T2, T3, T4> extends Action {
23+
void call(T1 t1, T2 t2, T3 t3, T4 t4);
24+
}
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package rx.util.functions;
18-
19-
/**
20-
* A five-argument action.
21-
*/
22-
public interface Action5<T1, T2, T3, T4, T5> extends Action {
23-
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
24-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.util.functions;
18+
19+
/**
20+
* A five-argument action.
21+
*/
22+
public interface Action5<T1, T2, T3, T4, T5> extends Action {
23+
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5);
24+
}
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
17-
package rx.util.functions;
18-
19-
/**
20-
* A six-argument action.
21-
*/
22-
public interface Action6<T1, T2, T3, T4, T5, T6> extends Action {
23-
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6);
24-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.util.functions;
18+
19+
/**
20+
* A six-argument action.
21+
*/
22+
public interface Action6<T1, T2, T3, T4, T5, T6> extends Action {
23+
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6);
24+
}
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package rx.util.functions;
17-
18-
/**
19-
* A seven-argument action.
20-
*/
21-
public interface Action7<T1, T2, T3, T4, T5, T6, T7> extends Action {
22-
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7);
23-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.functions;
17+
18+
/**
19+
* A seven-argument action.
20+
*/
21+
public interface Action7<T1, T2, T3, T4, T5, T6, T7> extends Action {
22+
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7);
23+
}
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package rx.util.functions;
17-
18-
/**
19-
* An eight-argument action.
20-
*/
21-
public interface Action8<T1, T2, T3, T4, T5, T6, T7, T8> extends Action {
22-
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8);
23-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.functions;
17+
18+
/**
19+
* An eight-argument action.
20+
*/
21+
public interface Action8<T1, T2, T3, T4, T5, T6, T7, T8> extends Action {
22+
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8);
23+
}
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package rx.util.functions;
17-
18-
/**
19-
* A nine-argument action.
20-
*/
21-
public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
22-
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
23-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.functions;
17+
18+
/**
19+
* A nine-argument action.
20+
*/
21+
public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
22+
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
23+
}
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package rx.util.functions;
17-
18-
/**
19-
* A vector-argument action.
20-
*/
21-
public interface ActionN extends Action {
22-
void call(Object... args);
23-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.functions;
17+
18+
/**
19+
* A vector-argument action.
20+
*/
21+
public interface ActionN extends Action {
22+
void call(Object... args);
23+
}

‎rxjava-core/src/main/java/rx/util/functions/Actions.java

+359-359
Large diffs are not rendered by default.

‎rxjava-core/src/main/java/rx/util/functions/Async.java

+957-957
Large diffs are not rendered by default.

‎rxjava-core/src/test/java/rx/operators/OperationJoinsTest.java

+382-382
Large diffs are not rendered by default.

‎rxjava-core/src/test/java/rx/operators/OperationToMapTest.java

+215-215
Large diffs are not rendered by default.

‎rxjava-core/src/test/java/rx/operators/OperationToMultimapTest.java

+250-250
Large diffs are not rendered by default.

‎rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java

+211-211
Large diffs are not rendered by default.

‎rxjava-core/src/test/java/rx/util/functions/AsyncTest.java

+653-653
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.