Skip to content

Commit

Permalink
Add the HttpContext that started the socket.io call in the handler
Browse files Browse the repository at this point in the history
  • Loading branch information
vbfox committed Jul 5, 2017
1 parent e209e8f commit 6bd1b4c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
6 changes: 5 additions & 1 deletion Release Notes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### New in 0.3

* Add the `HttpContext` that started the socket.io call in the handler

### New in 0.2

* Remove CORS Specific headers
Expand All @@ -6,4 +10,4 @@

### New in 0.1

Initial open source version
Initial open source version
6 changes: 3 additions & 3 deletions src/SocketIoSuave.Sample/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ open System.Threading
open Newtonsoft.Json
open Newtonsoft.Json.Linq

let private log = Targets.create Debug [| "SocketIoSuave" |]
let private log = Targets.create Verbose [| "SocketIoSuave" |]

type NewMessageEvent = {
username: string
Expand Down Expand Up @@ -51,7 +51,7 @@ let main argv =

let mutable userCount = 0

let rec handlePacket state (socket: ISocketIoSocket) = async {
let rec handlePacket state (socket: ISocketIoSocket) httpContext = async {
let logVerbose s = log.debug (eventX (sprintf "{socketId} %s" s) >> setField "socketId" (socket.Id))
let! p = socket.Receive()
match p with
Expand Down Expand Up @@ -88,7 +88,7 @@ let main argv =
logVerbose (sprintf "Unknown command: %s" cmd)
state

return! handlePacket newState socket
return! handlePacket newState socket httpContext
| None ->
let userName = defaultArg state.userName "???"
logVerbose (sprintf "[%A] LEAVE" userName)
Expand Down
21 changes: 12 additions & 9 deletions src/SocketIoSuave/EngineIo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ type private SocketEngineCommunication =
broadcast: (SocketId option) * (PacketContent seq) -> unit
}

type private EngineIoSocket(id: SocketId, pingTimeout: TimeSpan, comms: SocketEngineCommunication, handleSocket: IEngineIoSocket -> Async<unit>) as this =
/// A function that handle a single engine.io socket
type EngineHandler = IEngineIoSocket -> HttpContext-> Async<unit>

type private EngineIoSocket(id: SocketId, pingTimeout: TimeSpan, comms: SocketEngineCommunication, handleSocket: EngineHandler) as this =
let setSocketIdField = setField "socketId" (id.ToString())
let logVerbose s = log.verbose (eventX (sprintf "{socketId} %s" s) >> setSocketIdField)
let logError s = log.error (eventX (sprintf "{socketId} %s" s) >> setSocketIdField)
Expand Down Expand Up @@ -278,11 +281,11 @@ type private EngineIoSocket(id: SocketId, pingTimeout: TimeSpan, comms: SocketEn

member val Id = id
member __.Transport with get() = transport
member __.Start() =
member __.Start(startContext: HttpContext) =
setPingTimeout ()

// Start the async handler for this socket on the threadpool
task <- handleSocket (this) |> Async.StartAsTask
task <- handleSocket this startContext |> Async.StartAsTask

// When the handler finishes, close the socket
task.ContinueWith(fun task ->
Expand Down Expand Up @@ -363,7 +366,7 @@ let private mutateField<'t when 't: not struct> (targetField: 't byref) (mutatio
let afterExchange = System.Threading.Interlocked.CompareExchange(&targetField, newValue, before)
retry <- not (obj.ReferenceEquals(before, afterExchange))

type EngineIo(config, handleSocket: IEngineIoSocket -> Async<unit>) as this =
type EngineIo(config, handleSocket: EngineHandler) as this =
let mutable sessions: Map<SocketId, EngineIoSocket> = Map.empty
let idGenerator = Base64Id.create config.RandomNumberGenerator

Expand Down Expand Up @@ -396,15 +399,15 @@ type EngineIo(config, handleSocket: IEngineIoSocket -> Async<unit>) as this =
|> setUniqueHeader "Content-Type" "text/plain; charset=UTF-8"
|> setContentBytes (payload |> PayloadEncoder.encodeToString |> System.Text.Encoding.UTF8.GetBytes)

let handleGet' engineCtx: AsyncResult<SocketId*Payload, Error> =
let handleGet' engineCtx httpContext: AsyncResult<SocketId*Payload, Error> =
match engineCtx.SocketId with
| None ->
let socketIdString = idGenerator ()
let socketId = SocketId socketIdString

let socket = new EngineIoSocket(socketId, socketTimeout, socketCommunications, handleSocket)
mutateField &sessions (fun s -> s |> Map.add socket.Id socket)
socket.Start()
socket.Start(httpContext)

log.info (eventX "{socketId} Connected, creating session" >> Message.setFieldValue "socketId" socketId)

Expand Down Expand Up @@ -435,9 +438,9 @@ type EngineIo(config, handleSocket: IEngineIoSocket -> Async<unit>) as this =
| Unknown -> simpleResponse HttpCode.HTTP_500 "Unknown error"
| UnknownSessionId -> simpleResponse HttpCode.HTTP_404 "Unknown session ID"

let handleGet engineCtx =
let handleGet engineCtx httpContext =
asyncTrial {
let! x = handleGet' engineCtx
let! x = handleGet' engineCtx httpContext
let (socketId, payload) = x
return payloadToResponse socketId payload engineCtx
}
Expand Down Expand Up @@ -548,7 +551,7 @@ type EngineIo(config, handleSocket: IEngineIoSocket -> Async<unit>) as this =
| GET, Some(Polling) ->
let engineCtx = getContext ctx.request
async {
let! result = handleGet engineCtx |> Async.ofAsyncResult
let! result = handleGet engineCtx ctx |> Async.ofAsyncResult
return returnResponse ctx (resultToHttp result)
}
| GET, Some(Websocket) ->
Expand Down
18 changes: 10 additions & 8 deletions src/SocketIoSuave/SocketIo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ open SocketIoSuave.EngineIo.Engine
open System.Collections.Generic
open System.Threading.Tasks
open Newtonsoft.Json.Linq
open Newtonsoft.Json

let private initialPacket = { Packet.Type = PacketType.Connect; Namespace = "/"; EventId = None; Data = [] }

Expand All @@ -39,7 +40,7 @@ type SocketIoConfig =
PingInterval = TimeSpan.FromSeconds(3.)
PingTimeout = TimeSpan.FromSeconds(10.)
}
JsonSerializer = Newtonsoft.Json.JsonSerializer()
JsonSerializer = Newtonsoft.Json.JsonSerializer(NullValueHandling = NullValueHandling.Include, MissingMemberHandling = MissingMemberHandling.Error)
}

let private log = Log.create "SocketIoSuave.SocketIo"
Expand Down Expand Up @@ -81,7 +82,9 @@ let private mkEvent (eventName: string) args serializer =
Data = jsonEventName :: jsonArgs
}

type private SocketIoSocket(config : SocketIoConfig, engineSocket: IEngineIoSocket, handlePackets: ISocketIoSocket -> Async<unit>) as this =
type SocketHandler = ISocketIoSocket -> HttpContext -> Async<unit>

type private SocketIoSocket(config : SocketIoConfig, engineSocket: IEngineIoSocket, handlePackets: SocketHandler) as this =
let setSocketIdField = setField "socketId" (engineSocket.Id)

let closeLock = new obj()
Expand Down Expand Up @@ -161,9 +164,9 @@ type private SocketIoSocket(config : SocketIoConfig, engineSocket: IEngineIoSock
do
incomming.Error.Add(onMailBoxError)

member __.Handle() =
member __.Handle(httpContext: HttpContext) =
// Start the async handler for this socket on the threadpool
task <- handlePackets (this) |> Async.StartAsTask
task <- handlePackets this httpContext |> Async.StartAsTask

// When the (user provided) handler finishes, close the socket
task.ContinueWith(fun _ ->
Expand All @@ -188,7 +191,6 @@ type private SocketIoSocket(config : SocketIoConfig, engineSocket: IEngineIoSock
>> setField "error" ex)
this.Close()
}


member __.Close() =
lock closeLock (fun _ ->
Expand All @@ -206,10 +208,10 @@ type private SocketIoSocket(config : SocketIoConfig, engineSocket: IEngineIoSock
member __.Broadcast(eventName, args) = broadcast (mkEvent eventName args config.JsonSerializer)
member __.Close() = this.Close()

type SocketIo(config: SocketIoConfig, handlePackets: ISocketIoSocket -> Async<unit>) =
let handleSocket (engineSocket: IEngineIoSocket) =
type SocketIo(config: SocketIoConfig, handlePackets: SocketHandler) =
let handleSocket (engineSocket: IEngineIoSocket) httpContext =
let socket = new SocketIoSocket(config, engineSocket, handlePackets)
socket.Handle()
socket.Handle(httpContext)

let engine = new EngineIo(config.EngineConfig, handleSocket)

Expand Down

0 comments on commit 6bd1b4c

Please sign in to comment.