<!DOCTYPE html>
<html>
<head>
<title>Exactly-Once Streaming from Kafka</title>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<style type="text/css">
@import url(http://fonts.googleapis.com/css?family=Yanone+Kaffeesatz);
@import url(http://fonts.googleapis.com/css?family=Droid+Serif:400,700,400italic);
@import url(http://fonts.googleapis.com/css?family=Ubuntu+Mono:400,700,400italic);
body { font-family: 'Droid Serif'; }
h1, h2, h3 {
font-family: 'Yanone Kaffeesatz';
font-weight: normal;
}
img {
width: 100%;
height: auto;
}
img#kixer-logo {
width: 130px;
height: 54px;
}
ul, ol {
margin: 6px 0 6px 0;
}
li {
margin: 0 0 12px 0;
}
.remark-code, .remark-inline-code { font-family: 'Ubuntu Mono'; }
</style>
</head>
<body>
<textarea id="source">
class: center, middle
# Exactly-Once Streaming from Kafka
<img src="slides/kixer-logo.png" id="kixer-logo" />
https://github.com/koeninger/kafka-exactly-once
---
## Kafka is a ~~message queue~~ circular buffer
* Split into topic/partition
* Fixed size, based on disk space or time
* Oldest messages deleted to maintain size
* Messages are otherwise immutable
* Indexed only by offset
* Client tracks read offset, not server
### Delivery semantics are _your responsibility_
From Kafka, through a transformation, to results in your data store
---
## At-most-once
1. Save offsets
2. !! Possible failure !!
3. Save results
### On failure, restart at the last saved offset; messages are lost
---
## At-least-once
1. Save results
2. !! Possible failure !!
3. Save offsets
### On failure, messages are repeated
No magic configuration option can do better than this
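A minimal sketch of that ordering, with hypothetical `saveResults` / `saveOffsets` helpers standing in for your data store writes:
```scala
// Hypothetical helpers standing in for writes to your data store
def saveResults(results: Seq[String]): Unit = ???
def saveOffsets(offsets: Map[Int, Long]): Unit = ???

def processBatch(results: Seq[String], offsets: Map[Int, Long]): Unit = {
  saveResults(results)   // 1. results become durable first
  // !! a crash here means this batch is replayed on restart !!
  saveOffsets(offsets)   // 2. offsets advance only after results are safe
}
```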
---
## Idempotent exactly-once
1. Save results with a natural unique key
2. !! Possible failure !!
3. Save offsets
### On failure, messages are repeated, but we don't care
Immutable messages and a pure transformation yield the same results on replay
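A minimal sketch of an idempotent save, assuming a hypothetical `results` table whose primary key is the natural unique key (Postgres-style upsert, in the same scalikejdbc style used later in these slides):
```scala
import scalikejdbc._

// Replaying the same message is harmless: the conflicting insert is a no-op
def saveIdempotent(naturalKey: String, value: String): Unit =
  DB.autoCommit { implicit session =>
    sql"""
      insert into results (natural_key, value) values ($naturalKey, $value)
      on conflict (natural_key) do nothing
    """.update.apply()
  }
```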
---
## Idempotent pros / cons
Pro:
* Simple
* Works well for shape-preserving transformations (map)
Con:
* May be hard to identify natural unique key
* Especially hard for aggregate transformations (fold)
* Won't work for destructive updates
Note:
* Results and offsets may be in different data stores
---
## Transactional exactly-once
1. Begin transaction
2. Save results
3. Save offsets
4. Ensure offsets are ok (increasing without gaps)
5. Commit transaction
### On failure, rollback, results and offsets remain in sync
---
## Transactional pros / cons
Pro:
* Works easily for any transformation
* Destructive updates ok
Con:
* More complex
* Requires a transactional data store
Note:
* Results and offsets must be in same data store
---
(diagram: receiver-based stream)
---
## Receiver-based stream pros / cons
Pro:
* WAL design could work with non-Kafka data sources
Con:
* Long running receivers make parallelism awkward and costly
* Duplication of write operations
* Dependent on HDFS
* Must use idempotence for exactly-once
* No access to offsets, can't use transactional approach
---
(diagram: direct stream)
---
## Direct stream pros / cons
Pro:
* Spark partitions map 1:1 to Kafka topic/partitions: easy, cheap parallelism
* No duplicate writes
* No dependency on HDFS
* Access to offsets, can use idempotent or transactional
Con:
* Specific to Kafka
* Need adequate Kafka retention (OffsetOutOfRange is _your fault_)
---
## Don't care about semantics?

### How about server cost?
---
## Basic direct stream API
```scala
val stream: InputDStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streamingContext,
    Map(
      "metadata.broker.list" -> "localhost:9092,anotherhost:9092",
      "auto.offset.reset" -> "largest"
    ),
    Set("sometopic", "anothertopic")
  )
```
---
## Basic direct stream API delivery semantics
auto.offset.reset -> largest:
* Starts at latest offset, thus losing data
* Not at-most-once (need to set maxFailures as well)
auto.offset.reset -> smallest:
* Starts at earliest offset
* At-least-once, but replays the whole log
### If you want finer-grained control, you must store offsets somewhere
---
## Where to store offsets
Spark Checkpoint:
* Easy
* No need to access offsets; they're applied automatically on restart
* Must use idempotent, not transactional
* Checkpoints may not be recoverable (e.g. after a code change)
Your own data store:
* Complex
* Need to access offsets, save them, and provide them on (re)start
* Idempotent or transactional
* Offsets are just as recoverable as your results
---
## Spark checkpoint
Same as any other Spark checkpoint:
```scala
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)             // new context
  val stream = KafkaUtils.createDirectStream(...) // setup DStream
  ...
  ssc.checkpoint(checkpointDirectory)             // set checkpoint directory
  ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(
  checkpointDirectory,
  functionToCreateContext _)
```
Keep in mind you still need idempotent storage of results.
Other than that, you're done.
---
## Providing offsets on (re)start
```scala
// begin from the offsets committed to the database
val fromOffsets = DB.readOnly { implicit session =>
  sql"select topic, part, off from txn_offsets".
    map { resultSet =>
      TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
    }.list.apply().toMap
}
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => mmd.message.length
val stream: InputDStream[Int] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Int](
    streamingContext,
    Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092"),
    fromOffsets,
    messageHandler
  )
```
This is the more advanced API. Instead of a set of topics, it takes:
* a map of TopicAndPartition -> the offset to start from
* a function to extract the desired value from each message and metadata.
---
## Accessing offsets, per message
```scala
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) =>
    (mmd.topic, mmd.partition, mmd.offset, mmd.key, mmd.message)
```
Your message handler has full access to all of the metadata.
This may not be the most efficient way, though.
---
## Accessing offsets, per batch
```scala
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get an array of OffsetRange
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.someTransformationUsingSparkMethods
  ...
  // Your save method. Note that this runs on the driver
  mySaveBatch(offsetRanges, results)
}
```
Each OffsetRange in the array has the following fields:
* topic: Kafka topic name
* partition: Kafka partition id
* fromOffset: inclusive starting offset
* untilOffset: exclusive ending offset
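For illustration, a hypothetical `mySaveBatch` might store the results and then advance each range's stored offset to `untilOffset` (a sketch against the `txn_offsets` table from the earlier slide, not a canonical implementation):
```scala
import org.apache.spark.streaming.kafka.OffsetRange
import scalikejdbc._

// Driver-side save sketch: results first, then advance the stored offsets
def mySaveBatch(offsetRanges: Array[OffsetRange], results: Array[String]): Unit =
  DB.localTx { implicit session =>
    results.foreach { r =>
      sql"insert into results (value) values ($r)".update.apply()
    }
    offsetRanges.foreach { osr =>
      sql"""
        update txn_offsets set off = ${osr.untilOffset}
        where topic = ${osr.topic} and part = ${osr.partition}
      """.update.apply()
    }
  }
```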
---
## Accessing offsets, per partition
This is safe because each Spark partition is 1:1 with a Kafka topic/partition
```scala
stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get an array of OffsetRange
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // index to get the correct offset range for the rdd partition we're working on
    val offsetRange: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    val perPartitionResult = iter.someTransformationUsingScalaMethods
    // Your save method. Note this runs on the executors.
    mySavePartition(offsetRange, perPartitionResult)
  }
}
```
This is **not safe**, because the shuffle breaks that 1:1 correspondence
```scala
rdd.reduceByKey(...).foreachPartition { ...
```
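If you do need a shuffle, one safe alternative is to capture the ranges on the driver before transforming, then fall back to the per-batch save (a sketch reusing the hypothetical `mySaveBatch` from the earlier slide):
```scala
stream.foreachRDD { rdd =>
  // Capture before the shuffle, while the RDD is still 1:1 with Kafka
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val counts = rdd.map(msg => (msg, 1L)).reduceByKey(_ + _)
  // Save once per batch on the driver, not per shuffled partition
  mySaveBatch(offsetRanges, counts.collect().map(_.toString))
}
```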
---
## Storing offsets transactionally
Make sure the start of this offset range matches the end of the last saved offset range:
```scala
// localTx is transactional; `osr` is the OffsetRange being committed
DB.localTx { implicit session =>
  // store results
  ...
  // store offsets
  val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
    where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
  """.update.apply()
  if (offsetRows != 1) {
    // rollback and/or throw exception
  }
}
```
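Note that the `off = ${osr.fromOffset}` clause only matches an existing row, so `txn_offsets` needs one row per topic/partition, seeded at the first offset you intend to process (a hypothetical one-time setup):
```scala
DB.autoCommit { implicit session =>
  // Seed starting offsets, e.g. partition 0 of sometopic from offset 0
  sql"insert into txn_offsets (topic, part, off) values ('sometopic', 0, 0)".update.apply()
}
```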
---
class: center, middle
# Questions?
<img src="slides/kixer-logo.png" id="kixer-logo" />
https://github.com/koeninger/kafka-exactly-once
</textarea>
<script src="slides/remark-latest.min.js">
</script>
<script>
var slideshow = remark.create();
</script>
</body>
</html>