Skip to content

Commit

Permalink
make Ocpp1XJsonServer use OcppJsonServer
Browse files Browse the repository at this point in the history
  • Loading branch information
Reinier Lamers committed Oct 13, 2018
1 parent b988bf3 commit d088de7
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import java.time.ZonedDateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

import messages.v1x._
import api._
import OcppJsonServer.OutgoingEndpoint

object ExampleServerTestApp extends App {

val server = new OcppJsonServer(2345, Version.V15) {
val server = new Ocpp1XJsonServer(2345, Version.V15) {

override def handleConnection(chargePointIdentity: String, conn: OutgoingEndpoint): CentralSystemRequestHandler = {
override def handleConnection(chargePointIdentity: String, conn: Ocpp1XJsonServer.OutgoingEndpoint): CentralSystemRequestHandler = {

conn.onClose.foreach(_ => println(s"Disconnected client $chargePointIdentity"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import scala.concurrent.duration._
import scala.util.Success
import org.specs2.mutable.Specification
import messages.v1x._
import OcppJsonServer.OutgoingEndpoint
import server.Ocpp20JsonServer
import messages.v20._

class ClientServerIntegrationSpec extends Specification {
Expand All @@ -29,8 +27,8 @@ class ClientServerIntegrationSpec extends Specification {
val testResponse = HeartbeatRes(ZonedDateTime.of(2017, 7, 7, 12, 30, 6, 0, ZoneId.of("UTC")))
val serverStarted = Promise[Unit]()

val server = new OcppJsonServer(testPort, Version.V16) {
def handleConnection(cpSerial: String, remote: OutgoingEndpoint): CentralSystemRequestHandler = {
val server = new Ocpp1XJsonServer(testPort, Version.V16) {
def handleConnection(cpSerial: String, remote: Ocpp1XJsonServer.OutgoingEndpoint): CentralSystemRequestHandler = {
(req: CentralSystemReq) =>
req match {
case HeartbeatReq =>
Expand Down Expand Up @@ -73,9 +71,9 @@ class ClientServerIntegrationSpec extends Specification {

val clientResponsePromise = Promise[GetLocalListVersionRes]()

val server = new OcppJsonServer(testPort, Version.V16) {
val server = new Ocpp1XJsonServer(testPort, Version.V16) {

def handleConnection(cpSerial: String, remote: OutgoingEndpoint): CentralSystemRequestHandler = {
def handleConnection(cpSerial: String, remote: Ocpp1XJsonServer.OutgoingEndpoint): CentralSystemRequestHandler = {
(req: CentralSystemReq) =>
req match {
case HeartbeatReq =>
Expand Down Expand Up @@ -143,16 +141,21 @@ class ClientServerIntegrationSpec extends Specification {
val serverStarted = Promise[Unit]()

val server: Ocpp20JsonServer = new Ocpp20JsonServer(testPort) {
def handleConnection(cpSerial: String, remote: server.OutgoingEndpoint): CsmsRequestHandler = new CsmsRequestHandler {
def apply[REQ <: CsmsRequest, RES <: CsmsResponse](req: REQ)(implicit reqRes: CsmsReqRes[REQ, RES], ec: ExecutionContext): Future[RES] = {
def handleConnection(cpSerial: String, remote: Ocpp20JsonServer.OutgoingEndpoint): CsmsRequestHandler =

new CsmsRequestHandler {

def apply[REQ <: CsmsRequest, RES <: CsmsResponse](req: REQ)
(implicit reqRes: CsmsReqRes[REQ, RES], ec: ExecutionContext): Future[RES] = {

req match {
case BootNotificationRequest(cs, r) =>
Future.successful(testResponse.asInstanceOf[RES])
case _ =>
Future.failed(OcppException(PayloadErrorCode.InternalError, s"Unexpected request in test server: $req"))
}
}
}
}

override def onStart(): Unit = {
serverStarted.complete(Success(()))
Expand All @@ -169,12 +172,16 @@ class ClientServerIntegrationSpec extends Specification {
testSerial,
new URI(s"http://127.0.0.1:$testPort/")
) {
new CsRequestHandler {
def apply[REQ <: CsRequest, RES <: CsResponse](req: REQ)(implicit reqRes: CsReqRes[REQ, RES], ec: ExecutionContext): Future[RES] = {
Future.failed(OcppException(PayloadErrorCode.InternalError, "No incoming charge point request expected in this test"))

new CsRequestHandler {

def apply[REQ <: CsRequest, RES <: CsResponse](req: REQ)
(implicit reqRes: CsReqRes[REQ, RES], ec: ExecutionContext): Future[RES] = {

Future.failed(OcppException(PayloadErrorCode.InternalError, "No incoming charge point request expected in this test"))
}
}
}
}

Await.result(client.send(testRequest), 1.second) mustEqual testResponse
} finally server.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ trait Ocpp1XConnectionComponent[

this: SrpcComponent =>

implicit val executionContext: ExecutionContext
protected implicit val executionContext: ExecutionContext

trait Ocpp1XConnection extends BaseOcppConnection {
/** The version of the OCPP protocol used on this connection */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.thenewmotion.ocpp
package json

import scala.language.higherKinds
import messages._
import messages.v1x._
import messages.v20._
import messages.v20.{Request => _, Response => _, _}

package object api {

Expand All @@ -19,7 +21,30 @@ package object api {
RequestHandler[CsmsRequest, CsmsResponse, CsmsReqRes]

val OcppJsonClient: client.OcppJsonClient.type = client.OcppJsonClient

type OcppJsonServer = server.Ocpp1XJsonServer
val OcppJsonServer: server.Ocpp1XJsonServer.type = server.Ocpp1XJsonServer
type OcppJsonClient[
F <: VersionFamily,
OQ <: Request,
IS <: Response,
ORR[_ <: OQ, _ <: IS] <: ReqRes[_, _],
IQ <: Request,
OS <: Response,
IRR[_ <: IQ, _ <: OS] <: ReqRes[_, _]
] = client.OcppJsonClient[F, OQ, IS, ORR, IQ, OS, IRR]
type Ocpp1XJsonClient = client.Ocpp1XJsonClient
type Ocpp20JsonClient = client.Ocpp20JsonClient

val OcppJsonServer: server.OcppJsonServer.type = server.OcppJsonServer
type OcppJsonServer[
F <: VersionFamily,
OQ <: Request,
IS <: Response,
ORR[_ <: OQ, _ <: IS] <: ReqRes[_, _],
IQ <: Request,
OS <: Response,
IRR[_ <: IQ, _ <: OS] <: ReqRes[_, _]
] = server.OcppJsonServer[F, OQ, IS, ORR, IQ, OS, IRR]
val Ocpp1XJsonServer: server.Ocpp1XJsonServer.type = server.Ocpp1XJsonServer
type Ocpp1XJsonServer = server.Ocpp1XJsonServer
val Ocpp20JsonServer: server.Ocpp20JsonServer.type = server.Ocpp20JsonServer
type Ocpp20JsonServer = server.Ocpp20JsonServer
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,125 +2,40 @@ package com.thenewmotion.ocpp
package json.api
package server

import java.net.InetSocketAddress
import java.util.Collections

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext
import org.java_websocket.WebSocket
import org.java_websocket.drafts.{Draft, Draft_6455}
import org.java_websocket.handshake.ClientHandshake
import org.java_websocket.server.WebSocketServer
import messages.v1x._
import Ocpp1XJsonServer._
import org.java_websocket.protocols.{IProtocol, Protocol}

/**
* A simple server implementation to show how this library can be used in servers.
*
* @param listenPort The port to listen on
* @param requestedOcppVersion The OCPP version to serve (either 1.5 or 1.6; negotiation is not supported)
*/
abstract class Ocpp1XJsonServer(listenPort: Int, requestedOcppVersion: Version)
extends WebSocketServer(
new InetSocketAddress(listenPort),
Collections.singletonList[Draft](new Draft_6455(
Collections.emptyList(),
Collections.singletonList[IProtocol](new Protocol(protosForVersions(requestedOcppVersion)))
))
){

private type OcppCake = CentralSystemOcpp1XConnectionComponent with DefaultSrpcComponent with SimpleServerWebSocketComponent

private val ocppConnections: mutable.Map[WebSocket, OcppCake] = mutable.HashMap[WebSocket, OcppCake]()

/**
* This method should be overridden by the user of this class to define the behavior of the Central System. It will
* be called once for each connection to this server that is established.
*
* @param clientChargePointIdentity The charge point identity of the client
* @param remote An OutgoingEndpoint to send messages to the Charge Point or close the connection
*
* @return The handler for incoming requests from the Charge Point
*/
def handleConnection(clientChargePointIdentity: String, remote: OutgoingEndpoint): CentralSystemRequestHandler

override def onStart(): Unit = {}

override def onOpen(conn: WebSocket, hndshk: ClientHandshake): Unit = {

val uri = hndshk.getResourceDescriptor
uri.split("/").lastOption match {

case None =>
conn.close(1003, "No ChargePointIdentity in path")

case Some(chargePointIdentity) =>
onOpenWithCPIdentity(conn, chargePointIdentity)
}
}

private def onOpenWithCPIdentity(conn : WebSocket, chargePointIdentity: String): Unit = {
val ocppConnection = new CentralSystemOcpp1XConnectionComponent with DefaultSrpcComponent with SimpleServerWebSocketComponent {
override val ocppConnection: Ocpp1XConnection = defaultCentralSystemOcppConnection

override val srpcConnection: DefaultSrpcConnection = new DefaultSrpcConnection()

override val webSocketConnection: SimpleServerWebSocketConnection = new SimpleServerWebSocketConnection {
val webSocket: WebSocket = conn
}

private val outgoingEndpoint = new OutgoingEndpoint {
def send[REQ <: ChargePointReq, RES <: ChargePointRes](req: REQ)(implicit reqRes: ChargePointReqRes[REQ, RES]): Future[RES] =
ocppConnection.sendRequest(req)

def close(): Future[Unit] = srpcConnection.close()

val onClose: Future[Unit] = srpcConnection.onClose
}

private val requestHandler = handleConnection(chargePointIdentity, outgoingEndpoint)

def onRequest[REQ <: CentralSystemReq, RES <: CentralSystemRes](req: REQ)(implicit reqRes: CentralSystemReqRes[REQ, RES]) =
requestHandler.apply(req)

implicit val executionContext: ExecutionContext = concurrent.ExecutionContext.Implicits.global
abstract class Ocpp1XJsonServer(listenPort: Int, requestedOcppVersion: Version1X)(implicit ec: ExecutionContext)
extends OcppJsonServer[
VersionFamily.V1X.type,
ChargePointReq,
ChargePointRes,
ChargePointReqRes,
CentralSystemReq,
CentralSystemRes,
CentralSystemReqRes
](listenPort, requestedOcppVersion) {
protected def newConnectionCake(
connection: WebSocket,
chargePointIdentity: String
): BaseConnectionCake =
new BaseConnectionCake(connection, chargePointIdentity)
with CentralSystemOcpp1XConnectionComponent {

def ocppConnection: Ocpp1XConnection = defaultCentralSystemOcppConnection

def ocppVersion: Version = requestedOcppVersion
}

ocppConnections.put(conn, ocppConnection)
()
}

override def onClose(
conn: WebSocket,
code: Int,
reason: IdTag,
remote: Boolean
): Unit = {
ocppConnections.remove(conn) foreach { c =>
c.feedIncomingDisconnect()
}
}

override def onMessage(conn: WebSocket, message: String): Unit =
ocppConnections.get(conn) foreach { c =>
c.feedIncomingMessage(message)
}

override def onError(conn: WebSocket, ex: Exception): Unit =
ocppConnections.get(conn) foreach { c =>
c.feedIncomingError(ex)
}
}

object Ocpp1XJsonServer {
type OutgoingEndpoint = OutgoingOcppEndpoint[ChargePointReq, ChargePointRes, ChargePointReqRes]

private val protosForVersions = Map[Version, String](
Version.V15 -> "ocpp1.5",
Version.V16 -> "ocpp1.6"
)
}

Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,7 @@ abstract class Ocpp20JsonServer(listenPort: Int)(implicit ec: ExecutionContext)
): BaseConnectionCake = new BaseConnectionCake(connection, chargePointIdentity) with CsmsOcpp20ConnectionComponent
}

object Ocpp20JsonServer {
type OutgoingEndpoint = OutgoingOcppEndpoint[CsRequest, CsResponse, CsReqRes]
}

0 comments on commit d088de7

Please sign in to comment.