diff --git a/packages/rsocket-core/src/RSocketError.js b/packages/rsocket-core/src/RSocketError.js
new file mode 100644
index 00000000..dd5632b0
--- /dev/null
+++ b/packages/rsocket-core/src/RSocketError.js
@@ -0,0 +1,26 @@
+/** Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * @flow
+ */
+
+'use strict';
+
+export default class RSocketError extends Error {
+  +errorCode: number;
+  constructor(errorCode: number, message: string) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+}
diff --git a/packages/rsocket-core/src/RSocketMachine.js b/packages/rsocket-core/src/RSocketMachine.js
index 02bbb3cc..a1dcb524 100644
--- a/packages/rsocket-core/src/RSocketMachine.js
+++ b/packages/rsocket-core/src/RSocketMachine.js
@@ -58,6 +58,7 @@ import {
   ResponderLeaseHandler,
   Disposable,
 } from './RSocketLease';
+import RSocketError from './RSocketError';
 
 type Role = 'CLIENT' | 'SERVER';
 
@@ -443,7 +444,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
                     this._sendStreamComplete(streamId);
                   },
                   onError: error => {
-                    this._sendStreamError(streamId, error.message);
+                    this._sendStreamError(streamId, error);
                   },
                   //Subscriber methods
                   onNext: payload => {
@@ -677,7 +678,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
     if (this._isRequest(frame.type)) {
       const leaseError = this._useLeaseOrError(this._responderLeaseHandler);
       if (leaseError) {
-        this._sendStreamError(streamId, leaseError);
+        this._sendStreamError(streamId, new Error(leaseError));
         return;
       }
     }
@@ -758,7 +759,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
       onComplete: payload => {
         this._sendStreamPayload(streamId, payload, true);
       },
-      onError: error => this._sendStreamError(streamId, error.message),
+      onError: error => this._sendStreamError(streamId, error),
       onSubscribe: cancel => {
         const subscription = {
           cancel,
@@ -773,7 +774,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
     const payload = this._deserializePayload(frame);
     this._requestHandler.requestStream(payload).subscribe({
       onComplete: () => this._sendStreamComplete(streamId),
-      onError: error => this._sendStreamError(streamId, error.message),
+      onError: error => this._sendStreamError(streamId, error),
       onNext: payload => this._sendStreamPayload(streamId, payload),
       onSubscribe: subscription => {
         this._subscriptions.set(streamId, subscription);
@@ -835,7 +836,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
 
     this._requestHandler.requestChannel(framesToPayloads).subscribe({
       onComplete: () => this._sendStreamComplete(streamId),
-      onError: error => this._sendStreamError(streamId, error.message),
+      onError: error => this._sendStreamError(streamId, error),
       onNext: payload => this._sendStreamPayload(streamId, payload),
       onSubscribe: subscription => {
         this._subscriptions.set(streamId, subscription);
@@ -864,16 +865,19 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
     });
   }
 
-  _sendStreamError(streamId: number, errorMessage: string): void {
+  _sendStreamError(streamId: number, err: Error): void {
     this._subscriptions.delete(streamId);
     this._connection.sendOne({
-      code: ERROR_CODES.APPLICATION_ERROR,
+      code:
+        err instanceof RSocketError
+          ? err.errorCode
+          : ERROR_CODES.APPLICATION_ERROR,
       flags: 0,
-      message: errorMessage,
+      message: err.message,
       streamId,
       type: FRAME_TYPES.ERROR,
     });
-    const error = new Error(`terminated from the requester: ${errorMessage}`);
+    const error = new Error(`terminated from the requester: ${err.message}`);
     this._handleStreamError(streamId, error);
   }
 
diff --git a/packages/rsocket-core/src/__tests__/RSocketServer-test.js b/packages/rsocket-core/src/__tests__/RSocketServer-test.js
index 0b0bd843..4ebaab9f 100644
--- a/packages/rsocket-core/src/__tests__/RSocketServer-test.js
+++ b/packages/rsocket-core/src/__tests__/RSocketServer-test.js
@@ -31,6 +31,7 @@ import {genMockConnection} from 'MockDuplexConnection';
 import {genMockSubscriber} from 'MockFlowableSubscriber';
 import {genMockPublisher} from 'MockFlowableSubscription';
 import {Single, Flowable} from 'rsocket-flowable';
+import RSocketError from '../RSocketError';
 
 jest.useFakeTimers();
 
@@ -226,6 +227,56 @@ describe('RSocketServer', () => {
       expect(console.error).toHaveBeenCalled();
     });
 
+    it('sends custom error code if request handler throws RSocketError', () => {
+      console.error = jest.fn();
+      const transport = genMockTransportServer();
+      const server = new RSocketServer({
+        getRequestHandler: () => {
+          return {
+            requestResponse: () => {
+              throw new RSocketError(1234, 'Custom Error');
+            },
+          };
+        },
+        transport,
+      });
+      server.start();
+      transport.mock.connect();
+      connection.receive.mock.publisher.onNext({
+        type: FRAME_TYPES.SETUP,
+        data: undefined,
+        dataMimeType: '<dataMimeType>',
+        flags: 0,
+        keepAlive: 42,
+        lifetime: 2017,
+        metadata: undefined,
+        metadataMimeType: '<metadataMimeType>',
+        resumeToken: null,
+        streamId: 0,
+        majorVersion: 1,
+        minorVersion: 0,
+      });
+      jest.runOnlyPendingTimers();
+      connection.receive.mock.publisher.onNext({
+        type: FRAME_TYPES.REQUEST_RESPONSE,
+        data: undefined,
+        dataMimeType: '<dataMimeType>',
+        flags: 0,
+        metadata: undefined,
+        metadataMimeType: '<metadataMimeType>',
+        streamId: 1,
+      });
+      expect(connection.sendOne.mock.calls.length).toBe(1);
+      expect(connection.sendOne.mock.frame).toEqual({
+        code: 1234,
+        flags: 0,
+        message: 'Custom Error',
+        streamId: 1,
+        type: FRAME_TYPES.ERROR,
+      });
+      expect(console.error).toHaveBeenCalled();
+    });
+
     it('call subscription.cancel() for all active subscriptions', () => {
       let cancelled = false;
       const transport = genMockTransportServer();
diff --git a/packages/rsocket-core/src/index.js b/packages/rsocket-core/src/index.js
index 0b31e5ef..dbeeb825 100644
--- a/packages/rsocket-core/src/index.js
+++ b/packages/rsocket-core/src/index.js
@@ -39,6 +39,10 @@ import RSocketServer from './RSocketServer';
 
 export {RSocketServer};
 
+import RSocketError from './RSocketError';
+
+export {RSocketError};
+
 import RSocketResumableTransport from './RSocketResumableTransport';
 
 export {RSocketResumableTransport};