-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathesq.erl
111 lines (87 loc) · 2.58 KB
/
esq.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
%%
%% Copyright (c) 2012, Dmitry Kolesnikov
%% All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @description
%% erlang simple queue
%% Public interface of the queue library.
-module(esq).
%% Project-local header; its contents are not visible from this file.
-include("esq.hrl").
%% Turns off the automatic import of the BIF element/2 inside this module.
%% NOTE(review): no local element/2 function is visible in this file;
%% presumably this accompanies the element() type declared below - confirm.
-compile({no_auto_import,[element/2]}).
%% application life-cycle
-export([start/0]).
%% queue interface
-export([
new/1
,new/2
,free/1
,enq/2
,deq/1
,deq/2
,ack/2
,head/1
]).
%%
%% data types
%%
%% payload() - any Erlang term (`_` is the wildcard type)
-type payload() :: _.
%% element() - map with the optional keys `receipt` (of remote type uid:l())
%%             and `payload`; item type used in the specs of deq/1,2 and head/1
-type element() :: #{receipt => uid:l(), payload => payload()}.
%% queue()   - a queue is referenced through a process identifier
-type queue() :: pid().
%%
%% start application
%%
%% Starts the `esq` application together with all applications it depends on.
%% The result of application:ensure_all_started/1 is returned unchanged:
%%   {ok, Started}   - list of applications started by this call
%%   {error, Reason} - one of the applications could not be started
-spec start() -> {ok, [atom()]} | {error, term()}.
start() ->
    application:ensure_all_started(esq).
%%
%% create new queue
%%
%% new(Path) is a shortcut for new(Path, []).
%% new(Path, Opts) hands both arguments over to esq_queue:start_link/2 and
%% returns its result.
%%
%% Options
%%   {ttl, integer()} - message time-to-live in milliseconds,
%%      expired messages are evicted from the queue
%%   {ttf, integer()} - message time-to-flight in milliseconds,
%%      the time allowed to deliver the message acknowledgment before the
%%      message reappears to client(s) again
%%   {tts, integer()} - message time-to-sync in milliseconds,
%%      time to update the overflow queue, any overflow message remains
%%      invisible for read until the spool segment is synced
%%   {capacity, integer()} - size of the head
-spec new(list()) -> {ok, queue()}.
-spec new(list(), list()) -> {ok, queue()}.
new(Path) ->
    new(Path, []).

new(Path, Opts) ->
    esq_queue:start_link(Path, Opts).
%%
%% close the queue and release all of its resources
%%
%% Sends the `free` request to the queue process via pipe:call/2
%% (no explicit timeout is passed here, unlike the other requests).
-spec free(queue()) -> ok.
free(Queue) ->
    Request = free,
    pipe:call(Queue, Request).
%%
%% enqueue a message, exits if the file operation fails
%%
%% The payload E is wrapped into an `{enq, E}` request and sent to the queue
%% process; the call waits without a time limit (`infinity`).
-spec enq(payload(), queue()) -> ok.
enq(E, Queue) ->
    Request = {enq, E},
    pipe:call(Queue, Request, infinity).
%%
%% dequeue message(s), exits if the file operation fails
%%
%% deq(Queue)    - requests a single message, same as deq(1, Queue)
%% deq(N, Queue) - sends a `{deq, N}` request to the queue process
%% Both calls wait without a time limit (`infinity`).
-spec deq(queue()) -> [element()].
deq(Queue) ->
    pipe:call(Queue, {deq, 1}, infinity).

-spec deq(integer(), queue()) -> [element()].
deq(N, Queue) ->
    Request = {deq, N},
    pipe:call(Queue, Request, infinity).
%%
%% acknowledge a message
%%
%% Sends an `{ack, Uid}` request to the queue process and waits for the reply
%% without a time limit (`infinity`).
%% NOTE(review): Uid is presumably the `receipt` of a dequeued element - confirm.
-spec ack(uid:l(), queue()) -> ok.
ack(Uid, Queue) ->
    Request = {ack, Uid},
    pipe:call(Queue, Request, infinity).
%%
%% request the head of the queue
%%
%% Sends the `head` request to the queue process and waits for the reply
%% without a time limit (`infinity`). According to the spec the reply is either
%% an element or `undefined`.
%% NOTE(review): presumably the element is not removed from the queue and
%% `undefined` denotes an empty queue - confirm against esq_queue.
-spec head(queue()) -> element() | undefined.
head(Queue) ->
    Request = head,
    pipe:call(Queue, Request, infinity).