Skip to content

Commit 045a312

Browse files
authored
JAMES-3925 deal with uploaded blobs directly with blobstoreDAO instea… (#2707)
1 parent 22fe565 commit 045a312

File tree

13 files changed

+110
-75
lines changed

13 files changed

+110
-75
lines changed

server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java

+14-10
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
****************************************************************/
1919
package org.apache.james.jmap.cassandra.upload;
2020

21-
import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
2221
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
2322

2423
import java.io.InputStream;
@@ -29,7 +28,8 @@
2928

3029
import jakarta.inject.Inject;
3130

32-
import org.apache.james.blob.api.BlobStore;
31+
import org.apache.james.blob.api.BlobId;
32+
import org.apache.james.blob.api.BlobStoreDAO;
3333
import org.apache.james.blob.api.BucketName;
3434
import org.apache.james.core.Username;
3535
import org.apache.james.jmap.api.model.Upload;
@@ -48,32 +48,36 @@
4848
public class CassandraUploadRepository implements UploadRepository {
4949
public static final BucketName UPLOAD_BUCKET = BucketName.of("jmap-uploads");
5050
private final UploadDAO uploadDAO;
51-
private final BlobStore blobStore;
51+
private final BlobId.Factory blobIdFactory;
52+
private final BlobStoreDAO blobStoreDAO;
5253
private final Clock clock;
5354

5455
@Inject
55-
public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, Clock clock) {
56+
public CassandraUploadRepository(UploadDAO uploadDAO, BlobId.Factory blobIdFactory, BlobStoreDAO blobStoreDAO, Clock clock) {
5657
this.uploadDAO = uploadDAO;
57-
this.blobStore = blobStore;
58+
this.blobIdFactory = blobIdFactory;
59+
this.blobStoreDAO = blobStoreDAO;
5860
this.clock = clock;
5961
}
6062

6163
@Override
6264
public Mono<UploadMetaData> upload(InputStream data, ContentType contentType, Username user) {
6365
UploadId uploadId = generateId();
66+
BlobId blobId = blobIdFactory.of(uploadId.asString());
6467

6568
return Mono.fromCallable(() -> new CountingInputStream(data))
66-
.flatMap(countingInputStream -> Mono.from(blobStore.save(UPLOAD_BUCKET, countingInputStream, LOW_COST))
67-
.map(blobId -> new UploadDAO.UploadRepresentation(uploadId, blobId, contentType, countingInputStream.getCount(), user,
69+
.flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream))
70+
.thenReturn(countingInputStream))
71+
.map(countingInputStream -> new UploadDAO.UploadRepresentation(uploadId, blobId, contentType, countingInputStream.getCount(), user,
6872
clock.instant().truncatedTo(ChronoUnit.MILLIS)))
6973
.flatMap(upload -> uploadDAO.save(upload)
70-
.thenReturn(upload.toUploadMetaData())));
74+
.thenReturn(upload.toUploadMetaData()));
7175
}
7276

7377
@Override
7478
public Mono<Upload> retrieve(UploadId id, Username user) {
7579
return uploadDAO.retrieve(user, id)
76-
.flatMap(upload -> Mono.from(blobStore.readReactive(UPLOAD_BUCKET, upload.getBlobId(), LOW_COST))
80+
.flatMap(upload -> Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.getBlobId()))
7781
.map(inputStream -> Upload.from(upload.toUploadMetaData(), () -> inputStream)))
7882
.switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
7983
}
@@ -94,7 +98,7 @@ public Mono<Void> deleteByUploadDateBefore(Duration expireDuration) {
9498
Instant expirationTime = clock.instant().minus(expireDuration);
9599
return Flux.from(uploadDAO.all())
96100
.filter(upload -> upload.getUploadDate().isBefore(expirationTime))
97-
.flatMap(upload -> Mono.from(blobStore.delete(UPLOAD_BUCKET, upload.getBlobId()))
101+
.flatMap(upload -> Mono.from(blobStoreDAO.delete(UPLOAD_BUCKET, upload.getBlobId()))
98102
.then(uploadDAO.delete(upload.getUser(), upload.getId())), DEFAULT_CONCURRENCY)
99103
.then();
100104
}

server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,13 @@
2525
import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
2626
import org.apache.james.backends.cassandra.components.CassandraMutualizedQuotaDataDefinition;
2727
import org.apache.james.backends.cassandra.components.CassandraQuotaCurrentValueDao;
28-
import org.apache.james.blob.api.BucketName;
28+
import org.apache.james.blob.api.BlobId;
2929
import org.apache.james.blob.api.PlainBlobId;
3030
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
3131
import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculator;
3232
import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculatorContract;
3333
import org.apache.james.jmap.api.upload.UploadRepository;
3434
import org.apache.james.jmap.api.upload.UploadUsageRepository;
35-
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
3635
import org.junit.jupiter.api.BeforeEach;
3736
import org.junit.jupiter.api.extension.RegisterExtension;
3837

@@ -48,9 +47,9 @@ public class CassandraJMAPCurrentUploadUsageCalculatorTest implements JMAPCurren
4847
@BeforeEach
4948
private void setup() {
5049
Clock clock = Clock.systemUTC();
50+
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
5151
uploadRepository = new CassandraUploadRepository(new UploadDAO(cassandraCluster.getCassandraCluster().getConf(),
52-
new PlainBlobId.Factory()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.of("default"), new PlainBlobId.Factory()),
53-
clock);
52+
blobIdFactory), blobIdFactory, new MemoryBlobStoreDAO(), clock);
5453
uploadUsageRepository = new CassandraUploadUsageRepository(new CassandraQuotaCurrentValueDao(cassandraCluster.getCassandraCluster().getConf()));
5554
jmapCurrentUploadUsageCalculator = new JMAPCurrentUploadUsageCalculator(uploadRepository, uploadUsageRepository);
5655
}

server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import java.time.Clock;
2323

2424
import org.apache.james.backends.cassandra.CassandraClusterExtension;
25-
import org.apache.james.blob.api.BucketName;
25+
import org.apache.james.blob.api.BlobId;
26+
import org.apache.james.blob.api.BlobStoreDAO;
2627
import org.apache.james.blob.api.PlainBlobId;
2728
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
2829
import org.apache.james.jmap.api.model.UploadId;
2930
import org.apache.james.jmap.api.upload.UploadRepository;
3031
import org.apache.james.jmap.api.upload.UploadRepositoryContract;
31-
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
3232
import org.apache.james.utils.UpdatableTickingClock;
3333
import org.junit.jupiter.api.BeforeEach;
3434
import org.junit.jupiter.api.Disabled;
@@ -39,15 +39,17 @@
3939
class CassandraUploadRepositoryTest implements UploadRepositoryContract {
4040
@RegisterExtension
4141
static CassandraClusterExtension cassandra = new CassandraClusterExtension(UploadDataDefinition.MODULE);
42+
private BlobStoreDAO blobStoreDAO;
4243
private CassandraUploadRepository testee;
4344
private UpdatableTickingClock clock;
4445

4546
@BeforeEach
4647
void setUp() {
4748
clock = new UpdatableTickingClock(Clock.systemUTC().instant());
49+
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
50+
blobStoreDAO = new MemoryBlobStoreDAO();
4851
testee = new CassandraUploadRepository(new UploadDAO(cassandra.getCassandraCluster().getConf(),
49-
new PlainBlobId.Factory()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.of("default"), new PlainBlobId.Factory()),
50-
clock);
52+
blobIdFactory), blobIdFactory, blobStoreDAO, clock);
5153
}
5254

5355
@Override
@@ -77,4 +79,9 @@ public void deleteShouldReturnFalseWhenRowDoesNotExist() {
7779
public UpdatableTickingClock clock() {
7880
return clock;
7981
}
82+
83+
@Override
84+
public BlobStoreDAO blobStoreDAO() {
85+
return blobStoreDAO;
86+
}
8087
}

server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
2727
import org.apache.james.backends.cassandra.components.CassandraMutualizedQuotaDataDefinition;
2828
import org.apache.james.backends.cassandra.components.CassandraQuotaCurrentValueDao;
29-
import org.apache.james.blob.api.BucketName;
29+
import org.apache.james.blob.api.BlobId;
3030
import org.apache.james.blob.api.PlainBlobId;
3131
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
3232
import org.apache.james.jmap.api.upload.UploadRepository;
@@ -35,7 +35,6 @@
3535
import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl;
3636
import org.apache.james.jmap.api.upload.UploadUsageRepository;
3737
import org.apache.james.mailbox.cassandra.modules.CassandraMailboxQuotaDataDefinition;
38-
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
3938
import org.junit.jupiter.api.BeforeEach;
4039
import org.junit.jupiter.api.extension.RegisterExtension;
4140

@@ -51,8 +50,9 @@ class CassandraUploadServiceTest implements UploadServiceContract {
5150
@BeforeEach
5251
void setUp(CassandraCluster cassandraCluster) {
5352
Clock clock = Clock.systemUTC();
54-
uploadRepository = new CassandraUploadRepository(new UploadDAO(cassandraCluster.getConf(), new PlainBlobId.Factory()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(),
55-
BucketName.of("default"), new PlainBlobId.Factory()), clock);
53+
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
54+
uploadRepository = new CassandraUploadRepository(new UploadDAO(cassandraCluster.getConf(),
55+
blobIdFactory), blobIdFactory, new MemoryBlobStoreDAO(), clock);
5656
uploadUsageRepository = new CassandraUploadUsageRepository(new CassandraQuotaCurrentValueDao(cassandraCluster.getConf()));
5757
testee = new UploadServiceDefaultImpl(uploadRepository, uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION());
5858
}

server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.apache.james.jmap.postgres.upload;
2121

2222
import static org.apache.james.backends.postgres.PostgresCommons.INSTANT_TO_LOCAL_DATE_TIME;
23-
import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
2423
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
2524

2625
import java.io.InputStream;
@@ -31,7 +30,8 @@
3130
import jakarta.inject.Inject;
3231
import jakarta.inject.Singleton;
3332

34-
import org.apache.james.blob.api.BlobStore;
33+
import org.apache.james.blob.api.BlobId;
34+
import org.apache.james.blob.api.BlobStoreDAO;
3535
import org.apache.james.blob.api.BucketName;
3636
import org.apache.james.core.Username;
3737
import org.apache.james.jmap.api.model.Upload;
@@ -49,17 +49,19 @@
4949

5050
public class PostgresUploadRepository implements UploadRepository {
5151
public static final BucketName UPLOAD_BUCKET = BucketName.of("jmap-uploads");
52-
private final BlobStore blobStore;
52+
private final BlobId.Factory blobIdFactory;
53+
private final BlobStoreDAO blobStoreDAO;
5354
private final Clock clock;
5455
private final PostgresUploadDAO.Factory uploadDAOFactory;
5556
private final PostgresUploadDAO byPassRLSUploadDAO;
5657

5758
@Inject
5859
@Singleton
59-
public PostgresUploadRepository(BlobStore blobStore, Clock clock,
60+
public PostgresUploadRepository(BlobId.Factory blobIdFactory, BlobStoreDAO blobStoreDAO, Clock clock,
6061
PostgresUploadDAO.Factory uploadDAOFactory,
6162
PostgresUploadDAO byPassRLSUploadDAO) {
62-
this.blobStore = blobStore;
63+
this.blobIdFactory = blobIdFactory;
64+
this.blobStoreDAO = blobStoreDAO;
6365
this.clock = clock;
6466
this.uploadDAOFactory = uploadDAOFactory;
6567
this.byPassRLSUploadDAO = byPassRLSUploadDAO;
@@ -68,17 +70,20 @@ public PostgresUploadRepository(BlobStore blobStore, Clock clock,
6870
@Override
6971
public Mono<UploadMetaData> upload(InputStream data, ContentType contentType, Username user) {
7072
UploadId uploadId = generateId();
73+
BlobId blobId = blobIdFactory.of(uploadId.asString());
7174
PostgresUploadDAO uploadDAO = uploadDAOFactory.create(user.getDomainPart());
75+
7276
return Mono.fromCallable(() -> new CountingInputStream(data))
73-
.flatMap(countingInputStream -> Mono.from(blobStore.save(UPLOAD_BUCKET, countingInputStream, LOW_COST))
74-
.map(blobId -> UploadMetaData.from(uploadId, contentType, countingInputStream.getCount(), blobId, clock.instant()))
75-
.flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, user)));
77+
.flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream))
78+
.thenReturn(countingInputStream))
79+
.map(countingInputStream -> UploadMetaData.from(uploadId, contentType, countingInputStream.getCount(), blobId, clock.instant()))
80+
.flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, user));
7681
}
7782

7883
@Override
7984
public Mono<Upload> retrieve(UploadId id, Username user) {
8085
return uploadDAOFactory.create(user.getDomainPart()).get(id, user)
81-
.flatMap(upload -> Mono.from(blobStore.readReactive(UPLOAD_BUCKET, upload.blobId(), LOW_COST))
86+
.flatMap(upload -> Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.blobId()))
8287
.map(inputStream -> Upload.from(upload, () -> inputStream)))
8388
.switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
8489
}
@@ -101,7 +106,7 @@ public Mono<Void> deleteByUploadDateBefore(Duration expireDuration) {
101106
.flatMap(uploadPair -> {
102107
Username username = uploadPair.getRight();
103108
UploadMetaData upload = uploadPair.getLeft();
104-
return Mono.from(blobStore.delete(UPLOAD_BUCKET, upload.blobId()))
109+
return Mono.from(blobStoreDAO.delete(UPLOAD_BUCKET, upload.blobId()))
105110
.then(byPassRLSUploadDAO.delete(upload.uploadId(), username));
106111
}, DEFAULT_CONCURRENCY)
107112
.then();

server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@
2424
import org.apache.james.backends.postgres.PostgresDataDefinition;
2525
import org.apache.james.backends.postgres.PostgresExtension;
2626
import org.apache.james.blob.api.BlobId;
27-
import org.apache.james.blob.api.BlobStore;
28-
import org.apache.james.blob.api.BucketName;
27+
import org.apache.james.blob.api.BlobStoreDAO;
2928
import org.apache.james.blob.api.PlainBlobId;
3029
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
3130
import org.apache.james.jmap.api.upload.UploadRepository;
3231
import org.apache.james.jmap.api.upload.UploadRepositoryContract;
33-
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
3432
import org.apache.james.utils.UpdatableTickingClock;
3533
import org.junit.jupiter.api.BeforeEach;
3634
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -40,17 +38,18 @@ class PostgresUploadRepositoryTest implements UploadRepositoryContract {
4038
@RegisterExtension
4139
static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(
4240
PostgresDataDefinition.aggregateModules(PostgresUploadDataDefinition.MODULE));
41+
private BlobStoreDAO blobStoreDAO;
4342
private UploadRepository testee;
4443
private UpdatableTickingClock clock;
4544

4645
@BeforeEach
4746
void setUp() {
4847
clock = new UpdatableTickingClock(Clock.systemUTC().instant());
4948
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
50-
BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory);
5149
PostgresUploadDAO uploadDAO = new PostgresUploadDAO(postgresExtension.getDefaultPostgresExecutor(), blobIdFactory);
5250
PostgresUploadDAO.Factory uploadFactory = new PostgresUploadDAO.Factory(blobIdFactory, postgresExtension.getExecutorFactory());
53-
testee = new PostgresUploadRepository(blobStore, clock, uploadFactory, uploadDAO);
51+
blobStoreDAO = new MemoryBlobStoreDAO();
52+
testee = new PostgresUploadRepository(blobIdFactory, blobStoreDAO, clock, uploadFactory, uploadDAO);
5453
}
5554

5655
@Override
@@ -62,4 +61,9 @@ public UploadRepository testee() {
6261
public UpdatableTickingClock clock() {
6362
return clock;
6463
}
64+
65+
@Override
66+
public BlobStoreDAO blobStoreDAO() {
67+
return blobStoreDAO;
68+
}
6569
}

server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,13 @@
2626
import org.apache.james.backends.postgres.quota.PostgresQuotaCurrentValueDAO;
2727
import org.apache.james.backends.postgres.quota.PostgresQuotaDataDefinition;
2828
import org.apache.james.blob.api.BlobId;
29-
import org.apache.james.blob.api.BlobStore;
30-
import org.apache.james.blob.api.BucketName;
3129
import org.apache.james.blob.api.PlainBlobId;
3230
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
3331
import org.apache.james.jmap.api.upload.UploadRepository;
3432
import org.apache.james.jmap.api.upload.UploadService;
3533
import org.apache.james.jmap.api.upload.UploadServiceContract;
3634
import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl;
3735
import org.apache.james.jmap.api.upload.UploadUsageRepository;
38-
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
3936
import org.junit.jupiter.api.BeforeEach;
4037
import org.junit.jupiter.api.extension.RegisterExtension;
4138

@@ -52,10 +49,9 @@ public class PostgresUploadServiceTest implements UploadServiceContract {
5249
@BeforeEach
5350
void setUp() {
5451
BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
55-
BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory);
5652
PostgresUploadDAO uploadDAO = new PostgresUploadDAO(postgresExtension.getDefaultPostgresExecutor(), blobIdFactory);
5753
PostgresUploadDAO.Factory uploadFactory = new PostgresUploadDAO.Factory(blobIdFactory, postgresExtension.getExecutorFactory());
58-
uploadRepository = new PostgresUploadRepository(blobStore, Clock.systemUTC(), uploadFactory, uploadDAO);
54+
uploadRepository = new PostgresUploadRepository(blobIdFactory, new MemoryBlobStoreDAO(), Clock.systemUTC(), uploadFactory, uploadDAO);
5955
uploadUsageRepository = new PostgresUploadUsageRepository(new PostgresQuotaCurrentValueDAO(postgresExtension.getDefaultPostgresExecutor()));
6056
testee = new UploadServiceDefaultImpl(uploadRepository, uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION());
6157
}

0 commit comments

Comments
 (0)