38
38
import java .util .Optional ;
39
39
import java .util .concurrent .CompletableFuture ;
40
40
import java .util .concurrent .ConcurrentHashMap ;
41
- import java .util .concurrent .ForkJoinPool ;
42
41
import java .util .concurrent .TimeUnit ;
43
42
import java .util .concurrent .atomic .AtomicInteger ;
44
43
import java .util .function .BiConsumer ;
@@ -84,7 +83,7 @@ private record GetSetMethodFunctions(Function<QuoteCb, BigDecimal> getter, BiCon
84
83
private boolean cpuConstraint ;
85
84
private final List <String > nonValueFieldNames = List .of ("_id" , "createdAt" , "class" );
86
85
private final List <PropertyDescriptor > propertyDescriptors ;
87
- private final Scheduler mongoScheduler = Schedulers .newBoundedElastic (10 , 10 , "mongoImport" , 10 );
86
+ private final Scheduler mongoScheduler = Schedulers .newBoundedElastic (6 , 10 , "mongoImport" , 10 );
88
87
@ Value ("${single.instance.deployment:false}" )
89
88
private boolean singleInstanceDeployment ;
90
89
@@ -176,17 +175,17 @@ private String createHourDayAvg() {
176
175
LocalDateTime start = LocalDateTime .now ();
177
176
LOG .info ("CpuConstraint property: " + this .cpuConstraint );
178
177
if (this .cpuConstraint ) {
179
- this .createCbHourlyAvg ( );
180
- this .createCbDailyAvg ( );
178
+ this .createCbIntervalAvg ( false );
179
+ this .createCbIntervalAvg ( true );
181
180
LOG .info (this .serviceUtils .createAvgLogStatement (start , "Prepared Coinbase Data Time:" ));
182
181
} else {
183
182
// This can only be used on machines without cpu constraints.
184
183
CompletableFuture <String > future7 = CompletableFuture .supplyAsync (() -> {
185
- this .createCbHourlyAvg ( );
184
+ this .createCbIntervalAvg ( false );
186
185
return "createCbHourlyAvg() Done." ;
187
186
}, CompletableFuture .delayedExecutor (10 , TimeUnit .SECONDS ));
188
187
CompletableFuture <String > future8 = CompletableFuture .supplyAsync (() -> {
189
- this .createCbDailyAvg ( );
188
+ this .createCbIntervalAvg ( true );
190
189
return "createCbDailyAvg() Done." ;
191
190
}, CompletableFuture .delayedExecutor (10 , TimeUnit .SECONDS ));
192
191
String combined = Stream .of (future7 , future8 ).map (CompletableFuture ::join ).collect (Collectors .joining (" " ));
@@ -195,25 +194,6 @@ private String createHourDayAvg() {
195
194
return "done." ;
196
195
}
197
196
198
- private void createCbHourlyAvg () {
199
- LOG .info ("createCbHourlyAvg()" );
200
- LocalDateTime startAll = LocalDateTime .now ();
201
- MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (CB_HOUR_COL , QuoteCb .class , true );
202
- Calendar now = Calendar .getInstance ();
203
- now .setTime (Date .from (LocalDate .now ().atStartOfDay ().atZone (ZoneId .systemDefault ()).toInstant ()));
204
- final var timeFrames = this .createTimeFrames (timeFrame , now );
205
- if (this .cpuConstraint ) {
206
- timeFrames .stream ().forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , false ));
207
- } else {
208
- try (ForkJoinPool customThreadPool = new ForkJoinPool (2 )) {
209
- customThreadPool .submit (() -> timeFrames .parallelStream ()
210
- .forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , false )));
211
- customThreadPool .shutdown ();
212
- }
213
- }
214
- LOG .info (this .serviceUtils .createAvgLogStatement (startAll , "Prepared Coinbase Hourly Data Time:" ));
215
- }
216
-
217
197
private void processTimeFrame (MyTimeFrame timeFrame1 , boolean isDay ) {
218
198
Date start = new Date ();
219
199
SimpleDateFormat sdf = new SimpleDateFormat ("dd.MM.yyyy" );
@@ -238,23 +218,15 @@ private void processTimeFrame(MyTimeFrame timeFrame1, boolean isDay) {
238
218
+ (new Date ().getTime () - start .getTime ()) + "ms" + " 0 < properties: " + nonZeroProperties .get ());
239
219
}
240
220
241
- private void createCbDailyAvg ( ) {
242
- LOG .info ("createCbDailyAvg()" );
221
+ private void createCbIntervalAvg ( boolean isDay ) {
222
+ LOG .info (isDay ? "createCbDailyAvg()" : "createCbHourlyAvg ()" );
243
223
LocalDateTime startAll = LocalDateTime .now ();
244
- final MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (CB_DAY_COL , QuoteCb .class , false );
224
+ final MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (isDay ? CB_DAY_COL : CB_HOUR_COL , QuoteCb .class , false );
245
225
final Calendar now = Calendar .getInstance ();
246
226
now .setTime (Date .from (LocalDate .now ().atStartOfDay ().atZone (ZoneId .systemDefault ()).toInstant ()));
247
- final var timeFrames = this .createTimeFrames (timeFrame , now );
248
- if (this .cpuConstraint ) {
249
- timeFrames .stream ().forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , true ));
250
- } else {
251
- try (ForkJoinPool customThreadPool = new ForkJoinPool (2 )) {
252
- customThreadPool .submit (() -> timeFrames .parallelStream ()
253
- .forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , true )));
254
- customThreadPool .shutdown ();
255
- }
256
- }
257
- LOG .info (this .serviceUtils .createAvgLogStatement (startAll , "Prepared Coinbase Daily Data Time:" ));
227
+ this .createTimeFrames (timeFrame , now ).stream ().forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , isDay ));
228
+ var logStmt = String .format ("Prepared Coinbase %s Data Time:" , isDay ? "Daily" : "Hourly" );
229
+ LOG .info (this .serviceUtils .createAvgLogStatement (startAll , logStmt ));
258
230
}
259
231
260
232
private List <MyTimeFrame > createTimeFrames (final MyTimeFrame timeFrame , final Calendar now ) {
0 commit comments