-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INTERNAL: make lop piped operations process synchronously #795
base: develop
Are you sure you want to change the base?
Conversation
9eb5496
to
076d02b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
์ผ๋ถ ๋ฆฌ๋ทฐ
|| rv.getOperationStatus().getResponse() == CollectionResponse.CANCELED) { | ||
// countdown if this is last op | ||
latch.countDown(); | ||
} else if (!rv.getOperationStatus().isSuccess()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rv.getOperationStatus().isSuccess()
์กฐ๊ฑด์ผ๋ก next ์ฐ์ฐ ์ํ ์ฌ๋ถ๋ฅผ ํ๋จํ๊ธฐ ์ด๋ ต์ต๋๋ค.
์ด ๋ถ๋ถ์ ๋ํด offline ๋
ผ์ํด์ผ ํ ๊ฒ ๊ฐ์ต๋๋ค.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
๊ด๋ จ ์ฌํญ์ ์ข ๋ ์์ธํ๊ฒ ์ ์ผ๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
- PIPE_ERROR ์๋ต์ด ์จ ๊ฒฝ์ฐ
- ํ์ฌ์ piped ์ฐ์ฐ์์ PIPE_ERROR๋ก ์ธํด ์์ ์ํ๋์ง item ์ฐ์ฐ์ด ์์ ์ ์์ต๋๋ค.
- ์ด ๊ฒฝ์ฐ, ํ์ฌ failed Result์ ๊ฒฐ๊ณผ๋ฅผ ๋ด์ง ์๊ณ ์์ผ๋ฏ๋ก, ์ด๋ฅผ ์ถ๊ฐํด์ผ ํฉ๋๋ค.
- PIPE_END ์๋ต์ด ์จ ๊ฒฝ์ฐ
- ๊ฐ๋ณ ์ฐ์ฐ์ ๋ํด NOT_FOUND, ELEMENT_EXISTS, OVERFLOWED, OUT_OF_RANGE, TYPE_MISMATCH, BKEY_MISMATCH ์ค๋ฅ๊ฐ ๋ฐ์ํ ์ ์์ต๋๋ค.
- B+Tree collection์ ๋ํด BKEY_MISMATCH ์ค๋ฅ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ, ์ฃผ์ด์ง element๋ง insert ์คํจํ๊ณ ๋๋จธ์ง ์ฑ๊ณตํ ์ ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ, next ์ฐ์ฐ์ ์ํํ๋ ๊ฒ์ด ๋ง์ง ์๋ ์ง ? ELEMENT_EXISTS ์ค๋ฅ๋ ๋น์ทํด ๋ณด์ ๋๋ค.
- ๊ทธ ์ธ์ ์ค๋ฅ์ธ ๊ฒฝ์ฐ๋ ์บ์ ์๋ฒ์์ ์ํ์ ์ค์งํ๊ณ , PIPE_ERROR ๋ฆฌํดํด์ผ ํ์ง ์๋ ์๊ฐํฉ๋๋ค.
- ๊ฒฐ๊ตญ, ์บ์ ์๋ฒ ๋์ ๋ถ๋ถ๋ ํจ๊ป ๊ณ ๋ ค๋์ด์ผ ํ์ง ์๋ ์๊ฐํฉ๋๋ค.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
END ์๋ต์ด ์๊ณ btree์ผ ๋ BKEY_MISMATCH, map, set์ผ ๋ ELEMENT_EXISTS๋ง ๋ฐ์ํ๋ค๋ฉด ๋ค์ Operation ๋ณด๋ด๋ ๊ฒ์๋ ๋์ํฉ๋๋ค.
PIPE_ERROR๊ฐ CLIENT_ERROR, SERVER_ERROR ๋ก ์ธํด ๋ฐ์ํ๋ค๋ฉด ์์ธ๊ฐ ๋ฐ์ํด failed result๋ฅผ ์กฐํํ ์ ์์ต๋๋ค. PIPE_ERROR๊ฐ command/memory overflow ๋ก ์ธํด ๋ฐ์ํ๋ค๋ฉด ์๋ฌ๋ฅผ ๋ฌด์ํ๊ณ ๋ชจ๋ ์๋ต์ ๊ธฐ์ค์ผ๋ก ์ ์ฒด ์ฑ๊ณต/์คํจ ์ฌ๋ถ๋ง ํ๋ณํฉ๋๋ค. ๋ฐ๋ผ์ "PIPE_ERROR" ๋ผ๋ ์๋ต์ด ์๋์ง future์์๋ ํ์ธํ ์ ์์ผ๋ฉฐ, failed result์ ๊ฒฐ๊ณผ๋ฅผ ๋ฃ์ ํ์๋ ์์ต๋๋ค.
Lines 139 to 148 in f60ed48
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) { | |
/* ENABLE_MIGRATION if */ | |
if (needRedirect()) { | |
transitionState(OperationState.REDIRECT); | |
return; | |
} | |
/* ENABLE_MIGRATION end */ | |
cb.receivedStatus((successAll) ? END : FAILED_END); | |
transitionState(OperationState.COMPLETE); | |
} else if (line.startsWith("RESPONSE ")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
๊ธฐ์กด pipe ์ฒ๋ฆฌ์๋ ์ผ๋ถ ๋ฌธ์ ๊ฐ ์๋ ๊ฒ ๊ฐ์ต๋๋ค.
- CLIENT_ERROR, SERVER_ERROR ๋ฑ์ ์ค๋ฅ๊ฐ ์๋ ๊ฒฝ์ฐ, java client๋ PIPE_ERROR ์๋ต๊น์ง ์ฝ์ง ์๋๋ค.
๊ทธ๋ฌ๋ฉด, ๋ค์ ์ฐ์ฐ์์ ์๋ต์ ์ฝ์ ์์ PIPE_ERROR ์๋ต์ ๋ณด๊ฒ ๋๋ ๋ฌธ์ ๊ฐ ์์ต๋๋ค.
๋๋ต ํ์ธํ ์ฌํญ์ด๋ผ, ์ด ๋ถ๋ถ์ ๊ดํ ์ฝ๋๋ ์ง์ ํ์ธํด ๋ณด๊ธฐ ๋ฐ๋๋๋ค. - ์๋ 2๊ฐ์ง PIPE_ERROR์ ๋ํด "END" ์๋ต์ ๋ฐ์ ๊ฒฝ์ฐ์ ๋์ผํ๊ฒ ์ฒ๋ฆฌํฉ๋๋ค.
์ฆ, ์ฒ๋ฆฌํ์ง ๋ชปํ ์ฐ์ฐ์ด ์กด์ฌํ๋๋ผ๋ ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋ ๊ฒ์ผ๋ก ํ๋จํ๊ณ ์์ต๋๋ค.- PIPE_ERROR command overflow
- PIPE_ERROR memory overflow
์ด๋ฌํ pipe ์ฒ๋ฆฌ ์ค๋ฅ๋ ๋ณธ PR๊ณผ๋ ๋ณ๋๋ก ์ฒ๋ฆฌํด์ผ ํ์ฃ ?
๊ฒํ ๋ฐ๋๋๋ค.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CLIENT_ERROR, SERVER_ERROR ๋ฑ์ ์ค๋ฅ๊ฐ ์๋ ๊ฒฝ์ฐ, ๋ฐ๋ก OperationException์ด throw ๋ฉ๋๋ค.
์ด ์์ธ๋ฅผ MemcachedConnection์์ catchํ์ฌ ์ฐ๊ฒฐ์ ๋๊ธฐ ๋๋ฌธ์ ๋ค์ ์ฐ์ฐ์ด ์๋ต์ ์ฝ์ ์ ์์ ๊ฒ์ผ๋ก ๋ณด์
๋๋ค.
arcus-java-client/src/main/java/net/spy/memcached/MemcachedConnection.java
Lines 948 to 952 in 45720de
} catch (OperationException e) { | |
qa.setupForAuth("operation exception"); // noop if !shouldAuth | |
getLogger().warn("Reconnection due to exception " + | |
"handling a memcached exception on %s.", qa, e); | |
lostConnection(qa, ReconnDelay.IMMEDIATE, "operation exception"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PIPE_ERROR command/memory overflow ์ค๋ฅ ์, ์ฒ๋ฆฌ๋์ง ๋ชปํ ์ฐ์ฐ์ ๋ํด
๋ณธ PR ์ฒ๋ผ CANCELED CollectionOperationStatus ์ค์ ํ๋ ๊ฒ์ด ์ข๊ฒ ์ต๋๋ค.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PIPE_ERROR command/memory overflow ์ค๋ฅ ์, ์ฒ๋ฆฌ๋์ง ๋ชปํ ์ฐ์ฐ์ ๋ํด
๋ณธ PR ์ฒ๋ผ CANCELED CollectionOperationStatus ์ค์ ํ๋ ๊ฒ์ด ์ข๊ฒ ์ต๋๋ค.
์ ์ฌํญ์ ๊ตฌํํ๊ธฐ ์ํด
์๋ ๋ก์ง์์ cb.receivedStatus()
ํธ์ถ ์์ ์ธ์ ๊ฐ์ด ์์ ๋์ด์ผ ํ ๊ฒ ๊ฐ์ต๋๋ค.
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
์๋์ ๊ฐ์ด ํธ์ถํด์ผ ํ ๊ฒ ๊ฐ์ต๋๋ค.
cb.receivedStatus((index == count && successAll) ? END : FAILED_END);
๋ํ, ์๋ ๊ฒฝ์ฐ์ CENCEL ์ํ๋ฅผ ์ถ๊ฐํ๋ ๋ก์ง๋ ์์ ๋์ด์ผ ํฉ๋๋ค.
if (!rv.getOperationStatus().isSuccess()) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
latch.countDown();
}
์์ ๊ฐ์ด ๋ณ๊ฒฝ๋๋ฉด,
complete() ๋ก์ง์์์ ๊ฒ์ฌ ์์๋ ๋ณ๊ฒฝ๋์ด์ผ ํฉ๋๋ค.
์๋ ์กฐ๊ฑด์ด ๊ฐ์ฅ ๋จผ์ ๊ฒ์ฌ๋์ด์ผ ํ ๊ฒ์
๋๋ค.
if (!rv.getOperationStatus().isSuccess())
90a7ee5
to
df233d9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
๋ฆฌ๋ทฐ ์๋ฃ
600f2b2
to
d232a01
Compare
@jhpark816 ๋ฆฌ๋ทฐ ๋ฐ์ํ์ต๋๋ค. |
@uhm0311 ๋ฆฌ๋ทฐ ๋ฐ๋๋๋ค. |
์ปจํ๋ฆญํธ ํ์ธํด์ฃผ์ธ์. |
237a9b6
to
f4aabe9
Compare
src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
๋ฆฌ๋ทฐ ์๋ฃ
latch.countDown(); | ||
} else if (!rv.getOperationStatus().isSuccess()) { | ||
// If this operation failed, remaining subsequent operation | ||
// should not be added and should be marked as cancelled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
complete() ๋ก์ง์ ์๋ฒฝํ๊ฒ ๋ง๋ค๋ ค๋ฉด,
PIPE_ERROR์ด๋๋ผ๋ ๋ชจ๋ ์๋ต์ ์ฝ์ด๋ด๋ ๊ธฐ๋ฅ์ด ๋จผ์ ๊ตฌํ๋์ด์ผ ํ ๊ฒ ๊ฐ์ต๋๋ค.
์ด๋ฅผ ์ด์๋ก ์ฌ๋ฆฌ๊ณ ์์
์ ์์ํ๋ฉด ์ข๊ฒ ์ต๋๋ค.
|| rv.getOperationStatus().getResponse() == CollectionResponse.CANCELED) { | ||
// countdown if this is last op | ||
latch.countDown(); | ||
} else if (!rv.getOperationStatus().isSuccess()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PIPE_ERROR command/memory overflow ์ค๋ฅ ์, ์ฒ๋ฆฌ๋์ง ๋ชปํ ์ฐ์ฐ์ ๋ํด
๋ณธ PR ์ฒ๋ผ CANCELED CollectionOperationStatus ์ค์ ํ๋ ๊ฒ์ด ์ข๊ฒ ์ต๋๋ค.
์ ์ฌํญ์ ๊ตฌํํ๊ธฐ ์ํด
์๋ ๋ก์ง์์ cb.receivedStatus()
ํธ์ถ ์์ ์ธ์ ๊ฐ์ด ์์ ๋์ด์ผ ํ ๊ฒ ๊ฐ์ต๋๋ค.
if (line.startsWith("END") || line.startsWith("PIPE_ERROR ")) {
/* ENABLE_MIGRATION if */
if (needRedirect()) {
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
cb.receivedStatus((successAll) ? END : FAILED_END);
transitionState(OperationState.COMPLETE);
} else if (line.startsWith("RESPONSE ")) {
์๋์ ๊ฐ์ด ํธ์ถํด์ผ ํ ๊ฒ ๊ฐ์ต๋๋ค.
cb.receivedStatus((index == count && successAll) ? END : FAILED_END);
๋ํ, ์๋ ๊ฒฝ์ฐ์ CENCEL ์ํ๋ฅผ ์ถ๊ฐํ๋ ๋ก์ง๋ ์์ ๋์ด์ผ ํฉ๋๋ค.
if (!rv.getOperationStatus().isSuccess()) {
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
new CollectionOperationStatus(false, "CANCELED", CollectionResponse.CANCELED));
latch.countDown();
}
์์ ๊ฐ์ด ๋ณ๊ฒฝ๋๋ฉด,
complete() ๋ก์ง์์์ ๊ฒ์ฌ ์์๋ ๋ณ๊ฒฝ๋์ด์ผ ํฉ๋๋ค.
์๋ ์กฐ๊ฑด์ด ๊ฐ์ฅ ๋จผ์ ๊ฒ์ฌ๋์ด์ผ ํ ๊ฒ์
๋๋ค.
if (!rv.getOperationStatus().isSuccess())
src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
db4f08a
to
e5edceb
Compare
src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java
Show resolved
Hide resolved
9319ba0
to
8c9fee1
Compare
๐ Related Issue
โจ๏ธ What I did