Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/workflows/main-java.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Java Institutional CI

on:
push:
branches: [ "main", "feature/*" ]
pull_request:
branches: [ "main" ]

jobs:
build:
name: Build and Test (JDK 21)
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'temurin'
cache: maven

- name: Build with Maven
run: mvn -B package --file pom.xml

- name: Run Tests
run: mvn test

- name: Security Check (Snyk)
uses: snyk/actions/maven@master
continue-on-error: true
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
21 changes: 0 additions & 21 deletions .github/workflows/main.yml

This file was deleted.

10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
# Castle Trade OMS Java Core

[![Java 21](https://img.shields.io/badge/Java-21-orange.svg)](https://www.oracle.com/java/technologies/javase/jdk21-archive-downloads.html)
[![Spring Boot 3.2](https://img.shields.io/badge/Spring_Boot-3.2-brightgreen.svg)](https://spring.io/projects/spring-boot)
[![Engineering Quality](https://img.shields.io/badge/Quality-Institutional-gold.svg)](#)
[![Performance](https://img.shields.io/badge/Latency-Sub--ms-blue.svg)](#)

## Institutional Order Management System (OMS)

This repository contains the core Order Management System (OMS) for Castle Trade LLC, built using Java 21, Spring Boot 3.2+, and Reactive Programming (Project Reactor). The system is designed for high-throughput, low-latency institutional trade execution.

### Key Performance Improvements

- **Memory Optimization (Object Pooling)**: Integrated a custom `OrderPool` implementation to minimize GC pressure during high-throughput order bursts.
- **Reactive Lifecycle Management**: Zero-allocation approach for high-frequency order objects using pre-allocated memory buffers.

### Architectural Pillars

- **Java 21 Virtual Threads (Project Loom)**: Leveraged to handle massive concurrency without the overhead of carrier threads, ensuring the OMS remains responsive under peak load.
Expand Down
64 changes: 49 additions & 15 deletions docs/architecture.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,55 @@
# Institutional OMS Architecture
# 🏛️ Institutional OMS Architecture

## Conceptual Design
## 1. High-Level Design
The `trade-oms-java-core` follows a **Clean Hexagonal Architecture**, optimized for the **Java 21 Virtual Threads** and **Project Reactor** ecosystems.

The `trade-oms-java-core` is architected as a cloud-native microservice following the **Hexagonal Architecture (Ports and Adapters)** pattern. This ensures that the core trading logic remains independent of external technologies such as databases, messaging queues, or UI frameworks.
```mermaid
graph TD
subgraph "External Adapters"
REST["REST API (WebFlux)"]
WS["WebSocket Ingest"]
end

### Reactive Execution Pipeline
subgraph "Application Core"
direction TB
PortIn["SubmitOrderUseCase (Port)"]
Service["OrderService (Domain Service)"]
Pool["OrderPool (Memory Management)"]
VThreads["Virtual Threat Executor"]
end

1. **Inbound Port**: Orders enter via REST/WebSocket (WebFlux).
2. **Domain Layer**: The `Order` entity undergoes institutional validation.
3. **Execution Service**: The `OrderService` handles state transitions.
4. **Concurrency Model**:
- **Non-blocking I/O**: For all external network/DB calls.
- **Virtual Threads**: For processing-intensive or legacy-blocking components, allowing for linear scalability without thread-pool exhaustion.
subgraph "Domain Model"
Order["Order Entity"]
end

### Component Diagram (Conceptual)
- **API Adapter** (Spring WebFlux) -> **SubmitOrder Port** -> **OrderService** (Domain Logic) -> **Repository Port** (R2DBC Adapter).
subgraph "Infrastructure Adapters"
DB["PostgreSQL (R2DBC)"]
Audit["Audit Log (File/Cloud)"]
end

## Scalability and Resilience
- **Horizontally Scalable**: Stateless design allows for immediate scaling behind a load balancer.
- **Resilience**: Integrated circuit breakers and back-pressure handling via Project Reactor.
REST --> PortIn
WS --> PortIn
PortIn --> Service
Service --> Pool
Service --> VThreads
Service --> Order
VThreads --> DB
Service --> Audit
```

## 2. Low-Latency Memory Strategy
To maintain sub-millisecond execution times under heavy load, the system utilizes a **Zero-Allocation** strategy for hot-path objects.

- **Object Pooling**: Instead of relying on the JVM Garbage Collector for every order, we borrow `Order` instances from a pre-allocated `OrderPool`.
- **Reactive Lifecycle**: Objects are automatically returned to the pool using `.doFinally()` hooks in the Reactor stream.

## 3. Concurrency Matrix
| Component | Engine | Strategy |
| :--- | :--- | :--- |
| API Ingest | WebFlux | Event Loop (Non-blocking) |
| Order Validation | Project Reactor | Functional / Stateless |
| Execution Logic | Virtual Threads | Thread-per-request (Lightweight) |
| Database I/O | R2DBC | Asynchronous |

---
*Engineering standard: Castle Trade LLC Institutional Framework.*
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/com/castletrade/oms/core/domain/model/Order.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.castletrade.oms.core.domain.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;

Expand All @@ -10,6 +12,8 @@
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Order {
private String id;
private String symbol;
Expand All @@ -21,6 +25,21 @@ public class Order {
private OrderStatus status;
private LocalDateTime timestamp;

/**
* Resets the order state for reuse in an object pool.
*/
public void clear() {
this.id = null;
this.symbol = null;
this.assetClass = null;
this.side = null;
this.quantity = null;
this.price = null;
this.executionType = null;
this.status = null;
this.timestamp = null;
Comment on lines +31 to +40
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Considera restringir la visibilidad de clear() para limitar su mal uso fuera de los contextos de pool.

Como método público, clear() permite que cualquier consumidor reinicie un Order que puede seguir en uso, lo que puede causar errores sutiles (por ejemplo, entradas en colecciones que de repente pierden sus campos de identidad). Dado que existe para el uso con el pool, considera hacerlo con visibilidad de paquete (package‑private) o limitar su visibilidad de otra forma de modo que solo OrderPool pueda llamarlo.

Implementación sugerida:

    /**
     * Resets the order state for reuse in an object pool.
     * Package-private to prevent misuse outside pooling contexts.
     */
    void clear() {
        this.id = null;
        this.symbol = null;
        this.assetClass = null;
        this.side = null;
        this.quantity = null;
        this.price = null;
  1. Asegúrate de que cualquier código (por ejemplo, OrderPool) que invoque clear() resida en el mismo paquete com.castletrade.oms.core.domain.model o ajusta su paquete en consecuencia.
  2. Si hay tests que llaman a order.clear() desde un paquete diferente, mueve esos tests al mismo paquete o accede a través de la abstracción de pooling en lugar de llamar a clear() directamente.
Original comment in English

suggestion (bug_risk): Consider restricting clear() visibility to limit misuse outside pooling contexts.

As a public method, clear() lets any caller reset an Order that may still be in use, which can cause subtle bugs (e.g., entries in collections suddenly losing their identity fields). Since it exists for pooling, consider making it package‑private or otherwise limiting visibility so only OrderPool can call it.

Suggested implementation:

    /**
     * Resets the order state for reuse in an object pool.
     * Package-private to prevent misuse outside pooling contexts.
     */
    void clear() {
        this.id = null;
        this.symbol = null;
        this.assetClass = null;
        this.side = null;
        this.quantity = null;
        this.price = null;
  1. Ensure any code (e.g., OrderPool) that invokes clear() resides in the same package com.castletrade.oms.core.domain.model or adjust its package accordingly.
  2. If there are tests calling order.clear() from a different package, either move those tests into the same package or access it via the pooling abstraction instead of directly calling clear().

}

public enum AssetClass {
EQUITY, FX, CRYPTO, FIXED_INCOME
}
Expand Down
61 changes: 61 additions & 0 deletions src/main/java/com/castletrade/oms/core/domain/model/OrderPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.castletrade.oms.core.domain.model;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* High-performance object pool for Order objects.
* Designed to minimize GC pressure in high-frequency trading scenarios.
*/
@Slf4j
@Component
public class OrderPool {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Considera introducir una interfaz OrderProvider y una implementación alternativa sencilla para que el pooling quede oculto tras una abstracción y pueda sustituirse sin tocar el código consumidor.

Puedes mantener la optimización de pooling pero desacoplarla del dominio y facilitar su sustitución por una implementación más simple.

1. Introduce una interfaz OrderProvider

Define una abstracción mínima de la que dependan los consumidores en lugar de depender directamente de OrderPool:

package com.castletrade.oms.core.domain.model;

public interface OrderProvider {
    Order borrowOrder();
    void returnOrder(Order order);
}

Después, adapta OrderPool para que la implemente:

@Slf4j
@Component
public class OrderPool implements OrderProvider {
    // existing implementation unchanged
}

Todos los consumidores actuales deberían actualizarse para depender de OrderProvider en lugar de OrderPool:

// before
private final OrderPool orderPool;

// after
private final OrderProvider orderProvider;
// before
Order order = orderPool.borrowOrder();
orderPool.returnOrder(order);

// after
Order order = orderProvider.borrowOrder();
orderProvider.returnOrder(order);

2. Añade una implementación trivial sin pool

Puedes proporcionar una implementación sencilla, basada solo en asignaciones, para casos en los que el pooling no sea necesario (o para tests), sin tocar la lógica existente:

@Component
@Primary // if you want this as the default
public class SimpleOrderProvider implements OrderProvider {

    @Override
    public Order borrowOrder() {
        return new Order();
    }

    @Override
    public void returnOrder(Order order) {
        // nothing to do
    }
}

Después, OrderPool puede usarse solo donde realmente se necesite, o habilitarse mediante configuración/perfil:

@Component
@Profile("pooled-orders")
public class OrderPool implements OrderProvider {
    // current implementation
}

Esto mantiene intacto todo el comportamiento actual de OrderPool, pero:

  • Mueve la complejidad detrás de una interfaz estrecha.
  • Te permite mantener el modelo de dominio centrado en Order en lugar de en detalles de pooling.
  • Permite simplificar o incluso eliminar el pooling más adelante sin tocar los puntos de uso.
Original comment in English

issue (complexity): Consider introducing an OrderProvider interface and simple alternative implementation so that pooling is hidden behind an abstraction and can be swapped out without touching calling code.

You can keep the pooling optimization but decouple it from the domain and make it easier to swap out with a simpler implementation.

1. Introduce an OrderProvider interface

Define a minimal abstraction that callers depend on instead of OrderPool directly:

package com.castletrade.oms.core.domain.model;

public interface OrderProvider {
    Order borrowOrder();
    void returnOrder(Order order);
}

Then adapt OrderPool to implement this:

@Slf4j
@Component
public class OrderPool implements OrderProvider {
    // existing implementation unchanged
}

All current consumers should be updated to depend on OrderProvider instead of OrderPool:

// before
private final OrderPool orderPool;

// after
private final OrderProvider orderProvider;
// before
Order order = orderPool.borrowOrder();
orderPool.returnOrder(order);

// after
Order order = orderProvider.borrowOrder();
orderProvider.returnOrder(order);

2. Add a trivial, non-pooled implementation

You can provide a simple, allocation-only implementation for cases where pooling isn’t needed (or for tests), without touching existing logic:

@Component
@Primary // if you want this as the default
public class SimpleOrderProvider implements OrderProvider {

    @Override
    public Order borrowOrder() {
        return new Order();
    }

    @Override
    public void returnOrder(Order order) {
        // nothing to do
    }
}

Then OrderPool can be used only where really needed, or enabled via configuration/profile:

@Component
@Profile("pooled-orders")
public class OrderPool implements OrderProvider {
    // current implementation
}

This keeps all current OrderPool behavior intact but:

  • Moves complexity behind a narrow interface.
  • Lets you keep the domain model focused on Order rather than pooling details.
  • Allows you to simplify or even remove pooling later without touching call sites.

private final BlockingQueue<Order> pool;
private static final int DEFAULT_POOL_SIZE = 1000;

public OrderPool() {
this(DEFAULT_POOL_SIZE);
}

public OrderPool(int size) {
this.pool = new ArrayBlockingQueue<>(size);
for (int i = 0; i < size; i++) {
pool.add(new Order());
}
log.info("Initialized OrderPool with {} pre-allocated objects", size);
}

/**
* Borrows an order from the pool.
*/
public Order borrowOrder() {
Order order = pool.poll();
if (order == null) {
log.warn("OrderPool exhausted, creating new transient object. Consider increasing pool size.");
return new Order();
}
Comment on lines +31 to +39
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Usar WARN para el agotamiento del pool en una ruta caliente puede generar un exceso de ruido en los logs bajo carga.

En escenarios de alto rendimiento, esta condición puede producirse con frecuencia, así que WARN aquí puede tanto ralentizar la ruta caliente como inundar los logs. Considera bajar el nivel a INFO/DEBUG o añadir limitación de frecuencia (por ejemplo, registrar cada N ocurrencias o en un intervalo de tiempo) para mantener la observabilidad sin un E/S de logs excesivo.

Suggested change
/**
* Borrows an order from the pool.
*/
public Order borrowOrder() {
Order order = pool.poll();
if (order == null) {
log.warn("OrderPool exhausted, creating new transient object. Consider increasing pool size.");
return new Order();
}
/**
* Borrows an order from the pool.
*/
public Order borrowOrder() {
Order order = pool.poll();
if (order == null) {
log.debug("OrderPool exhausted, creating new transient object. Consider increasing pool size.");
return new Order();
}
Original comment in English

suggestion (performance): Using WARN for pool exhaustion in a hot path may generate excessive log noise under load.

In high-throughput scenarios, this condition may be hit frequently, so WARN here can both slow the hot path and flood logs. Consider lowering the level to INFO/DEBUG or adding throttling (e.g., log every Nth occurrence or on a time interval) to preserve observability without excessive log I/O.

Suggested change
/**
* Borrows an order from the pool.
*/
public Order borrowOrder() {
Order order = pool.poll();
if (order == null) {
log.warn("OrderPool exhausted, creating new transient object. Consider increasing pool size.");
return new Order();
}
/**
* Borrows an order from the pool.
*/
public Order borrowOrder() {
Order order = pool.poll();
if (order == null) {
log.debug("OrderPool exhausted, creating new transient object. Consider increasing pool size.");
return new Order();
}

return order;
}

/**
* Returns an order to the pool after resetting its state.
*/
public void returnOrder(Order order) {
if (order == null) return;
order.clear();
boolean accepted = pool.offer(order);
if (!accepted) {
log.debug("OrderPool full, discarding order object.");
Comment on lines +46 to +51
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Devolver la misma instancia de Order varias veces puede llevar a múltiples referencias al mismo objeto en el pool.

Dado que ArrayBlockingQueue permite duplicados, el mismo Order puede añadirse al pool varias veces si se llama a returnOrder dos veces sobre él (o sobre un objeto que nunca se pidió prestado). Eso puede llevar a que la misma instancia mutable se entregue a múltiples consumidores simultáneamente. Si esto es posible en tu caso de uso, añade una protección (por ejemplo, un flag inPool en Order o una pequeña estructura de seguimiento) para evitar devoluciones dobles u objetos externos que se devuelvan.

Original comment in English

issue (bug_risk): Returning the same Order instance multiple times can lead to multiple references to the same object in the pool.

Since ArrayBlockingQueue allows duplicates, the same Order can be added to the pool multiple times if returnOrder is called twice on it (or on an object never borrowed). That can lead to the same mutable instance being handed to multiple borrowers concurrently. If this is possible in your usage, add a guard (e.g., an inPool flag on Order or a small tracking structure) to prevent double returns or foreign objects being returned.

}
}

/**
* Returns the current number of available objects in the pool.
*/
public int getAvailableCount() {
return pool.size();
}
}
24 changes: 19 additions & 5 deletions src/main/java/com/castletrade/oms/core/service/OrderService.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.castletrade.oms.core.service;

import com.castletrade.oms.core.domain.model.Order;
import com.castletrade.oms.core.domain.model.OrderPool;
import com.castletrade.oms.core.port.in.SubmitOrderUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
Expand All @@ -12,23 +14,35 @@
import java.util.concurrent.Executors;

/**
* Implementation of the Order submission use case using Reactive Streams
* and Java 21 Virtual Threads for specialized processing.
* Implementation of the Order submission use case using Reactive Streams,
* Java 21 Virtual Threads, and an Object Pool for memory optimization.
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService implements SubmitOrderUseCase {

// Virtual Thread executor for high-concurrency blocking tasks (if any)
private final var virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
private final OrderPool orderPool;

// Virtual Thread executor for high-concurrency blocking tasks
private final java.util.concurrent.ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();

@Override
public Mono<Order> submitOrder(Order order) {
// In a real high-throughput scenario, the 'order' passed here
// would be a DTO, and we would borrow from the pool here.
log.debug("Entering order submission pipeline for symbol: {}", order.getSymbol());

return Mono.just(order)
.map(this::initializeOrder)
.flatMap(this::validateOrder)
.publishOn(Schedulers.fromExecutor(virtualThreadExecutor)) // Offloading to Virtual Threads
.publishOn(Schedulers.fromExecutor(virtualThreadExecutor))
.doOnNext(this::processExecution)
.doFinally(signalType -> {
// Demonstrating the return to pool after the reactive stream completes/errors
log.debug("Returning order resources to pool. Signal: {}", signalType);
orderPool.returnOrder(order);
})
.subscribeOn(Schedulers.boundedElastic());
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
spring:
application:
name: trade-oms-java-core
threads:
virtual:
enabled: true
r2dbc:
url: r2dbc:h2:mem:///testdb;DB_CLOSE_DELAY=-1
username: sa
password:
sql:
init:
mode: always
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.castletrade.oms.core.domain.model;

import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;

class OrderPoolTest {
Comment thread
sourcery-ai[bot] marked this conversation as resolved.

@Test
void testBorrowAndReturn() {
OrderPool pool = new OrderPool(10);
assertEquals(10, pool.getAvailableCount());

Order order = pool.borrowOrder();
assertNotNull(order);
assertEquals(9, pool.getAvailableCount());

order.setSymbol("BTC/USD");
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
pool.returnOrder(order);
Comment thread
sourcery-ai[bot] marked this conversation as resolved.

assertEquals(10, pool.getAvailableCount());

Order reborrowed = pool.borrowOrder();
assertNull(reborrowed.getSymbol(), "Order should be cleared when returned to pool");
}

@Test
void testPoolExhaustion() {
OrderPool pool = new OrderPool(1);
pool.borrowOrder();

Order exhausted = pool.borrowOrder();
assertNotNull(exhausted, "Should create fresh object when pool is empty");
assertEquals(0, pool.getAvailableCount());
}
}
Loading