Skip to content

Commit d63e34c

Browse files
author
Allen Short
committed
AMP eval server
1 parent 5e7312a commit d63e34c

File tree

3 files changed

+229
-0
lines changed

3 files changed

+229
-0
lines changed

devbot/amp.mt

+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
# Copyright (C) 2015 Google Inc. All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4+
# use this file except in compliance with the License. You may obtain a copy
5+
# of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
16+
import "lib/enum" =~ [=> makeEnum]
17+
import "lib/codec/utf8" =~ [=> UTF8 :DeepFrozen]
18+
import "lib/tubes" =~ [
19+
=> nullPump :DeepFrozen,
20+
=> makeMapPump :DeepFrozen,
21+
=> makeStatefulPump :DeepFrozen,
22+
=> makePumpTube :DeepFrozen,
23+
=> chain :DeepFrozen,
24+
]
25+
26+
exports (makeAMPServer, makeAMPClient)
27+
28+
# Either we await a key length, value length, or string.
29+
def [AMPState :DeepFrozen,
30+
KEY :DeepFrozen,
31+
VALUE :DeepFrozen,
32+
STRING :DeepFrozen
33+
] := makeEnum(["AMP key length", "AMP value length", "AMP string"])
34+
35+
def makeAMPPacketMachine() as DeepFrozen:
36+
var packetMap :Map := [].asMap()
37+
var pendingKey :Str := ""
38+
var results :List := []
39+
40+
return object AMPPacketMachine:
41+
to getStateGuard():
42+
return AMPState
43+
44+
to getInitialState():
45+
return [KEY, 2]
46+
47+
to advance(state :AMPState, data):
48+
switch (state):
49+
match ==KEY:
50+
# We have two bytes of data representing key length.
51+
# Except the first byte is always 0x00.
52+
def len := data[1]
53+
# If the length was zero, then it was the end-of-packet
54+
# marker. Go ahead and snip the packet.
55+
if (len == 0):
56+
results with= (packetMap)
57+
packetMap := [].asMap()
58+
return [KEY, 2]
59+
60+
# Otherwise, get the actual key string.
61+
return [STRING, len]
62+
match ==VALUE:
63+
# Same as the KEY case, but without EOP.
64+
def len := (data[0] << 8) | data[1]
65+
return [STRING, len]
66+
match ==STRING:
67+
# First, decode.
68+
def s := UTF8.decode(_makeBytes.fromInts(data), null)
69+
# Was this for a key or a value? We'll guess based on
70+
# whether there's a pending key.
71+
if (pendingKey == ""):
72+
# This was a key.
73+
pendingKey := s
74+
return [VALUE, 2]
75+
else:
76+
# This was a value.
77+
packetMap with= (pendingKey, s)
78+
pendingKey := ""
79+
return [KEY, 2]
80+
81+
to results():
82+
return results
83+
84+
85+
def packAMPPacket(packet) as DeepFrozen:
86+
var buf := []
87+
for via (UTF8.encode) key => via (UTF8.encode) value in packet:
88+
def keySize :(Int <= 0xff) := key.size()
89+
buf += [0x00, keySize]
90+
buf += _makeList.fromIterable(key)
91+
def valueSize :(Int <= 0xffff) := value.size()
92+
buf += [valueSize >> 8, valueSize & 0xff]
93+
buf += _makeList.fromIterable(value)
94+
buf += [0x00, 0x00]
95+
return _makeBytes.fromInts(buf)
96+
97+
98+
def makeAMP(drain) as DeepFrozen:
99+
var responder := null
100+
var buf := []
101+
var serial :Int := 0
102+
var pending := [].asMap()
103+
104+
return object AMP:
105+
to flowingFrom(upstream):
106+
null
107+
108+
to flowAborted(reason):
109+
null
110+
111+
to flowStopped(reason):
112+
null
113+
114+
to sendPacket(packet :Bytes):
115+
buf with= (packet)
116+
when (drain) ->
117+
if (drain != null):
118+
for item in buf:
119+
drain.receive(item)
120+
buf := []
121+
122+
to receive(item):
123+
# Either it's a new command, a successful reply, or a failure.
124+
switch (item):
125+
match [=> _command] | var arguments:
126+
# New command.
127+
if (responder == null):
128+
traceln(`AMP: No responder to handle command`)
129+
return
130+
131+
def _answer := if (arguments.contains("_ask")) {
132+
def [=> _ask] | args := arguments
133+
arguments := args
134+
_ask
135+
} else {null}
136+
def result := responder<-(_command, arguments)
137+
if (serial != null):
138+
when (result) ->
139+
def packet := result | [=> _answer]
140+
AMP.sendPacket(packAMPPacket(packet))
141+
catch _error_description:
142+
def packet := result | [=> _answer,
143+
=> _error_description]
144+
AMP.sendPacket(packAMPPacket(packet))
145+
match [=> _answer] | arguments:
146+
# Successful reply.
147+
def answer := _makeInt.fromBytes(_answer)
148+
if (pending.contains(answer)):
149+
pending[answer].resolve(arguments)
150+
pending without= (answer)
151+
match [=> _error] | arguments:
152+
# Error reply.
153+
def error := _makeInt(_error)
154+
if (pending.contains(error)):
155+
def [=> _error_description := "unknown error"] | _ := arguments
156+
pending[error].smash(_error_description)
157+
pending without= (error)
158+
match _:
159+
pass
160+
161+
to send(command :Str, var arguments :Map, expectReply :Bool):
162+
if (expectReply):
163+
arguments |= ["_command" => command, "_ask" => `$serial`]
164+
def [p, r] := Ref.promise()
165+
pending |= [serial => r]
166+
serial += 1
167+
AMP.sendPacket(packAMPPacket(arguments))
168+
return p
169+
else:
170+
AMP.sendPacket(packAMPPacket(arguments))
171+
172+
to setResponder(r):
173+
responder := r
174+
175+
176+
def makeAMPServer(endpoint) as DeepFrozen:
177+
return object AMPServerEndpoint:
178+
to listen(callback):
179+
def f(fount, drain):
180+
def amp := makeAMP(drain)
181+
chain([
182+
fount,
183+
makePumpTube(makeStatefulPump(makeAMPPacketMachine())),
184+
amp,
185+
])
186+
callback(amp)
187+
endpoint.listen(f)
188+
189+
190+
def makeAMPClient(endpoint) as DeepFrozen:
191+
return object AMPClientEndpoint:
192+
to connect():
193+
def [fount, drain] := endpoint.connect()
194+
def amp := makeAMP(drain)
195+
chain([
196+
fount,
197+
makePumpTube(makeStatefulPump(makeAMPPacketMachine())),
198+
amp,
199+
])
200+
return amp

devbot/repl-server.mt

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import "devbot/amp" =~ [=> makeAMPServer :DeepFrozen]
2+
import "lib/entropy/entropy" =~ [=> makeEntropy :DeepFrozen]
3+
import "lib/entropy/pcg" =~ [=> makePCG :DeepFrozen]
4+
exports (main)
5+
6+
def main(argv, => makeTCP4ServerEndpoint, => currentRuntime) :Int as DeepFrozen:
7+
def [_, seed] := currentRuntime.getCrypt().makeSecureEntropy().getEntropy()
8+
def e := makeEntropy(makePCG(seed, 0))
9+
def sessions := [].asMap().diverge()
10+
def listener(command, args, => FAIL):
11+
if (command == "Eval" && args =~ [=> var session := null, => sourceText]):
12+
if (session == null):
13+
session := e.nextInt(2 ** 32)
14+
sessions[session] := safeScope
15+
def [val, newScope] := eval.evalToPair(sourceText, sessions[session])
16+
sessions[session] := newScope
17+
return ["result" => M.toQuote(val), "session" => M.toString(session)]
18+
else:
19+
FAIL(`$command[${", ".join(args.getKeys())}] not recognized`)
20+
def amp := makeAMPServer(makeTCP4ServerEndpoint(4960))
21+
amp.listen(fn a {a.setResponder(listener)})
22+
return 0
23+

mt.json

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"name": "devbot",
3+
"paths": ["devbot"],
4+
"entrypoint": "devbot/repl-server",
5+
"dependencies": {}
6+
}

0 commit comments

Comments
 (0)