diff --git a/std/net/curl.d b/std/net/curl.d index 1c0b53ac914..51f24e83456 100644 --- a/std/net/curl.d +++ b/std/net/curl.d @@ -156,36 +156,30 @@ Distributed under the Boost Software License, Version 1.0. */ module std.net.curl; -import core.thread; -import etc.c.curl; -import std.concurrency; -import std.encoding; -import std.exception; -import std.meta; -import std.range.primitives; -import std.socket : InternetAddress; -import std.traits; -import std.typecons; - -import std.internal.cstring; - public import etc.c.curl : CurlOption; +import core.time : dur; +import etc.c.curl : CURLcode; +import std.concurrency : Tid; +import std.range.primitives; +import std.encoding : EncodingScheme; +import std.traits : isSomeChar; +import std.typecons : Flag, Yes, No, Tuple; version(unittest) { + import std.socket : Socket; + // Run unit test with the PHOBOS_TEST_ALLOW_NET=1 set in order to // allow net traffic - import std.range; - import std.stdio; - - import std.socket : Address, INADDR_LOOPBACK, Socket, TcpSocket; - private struct TestServer { + import std.socket : Socket, TcpSocket; + string addr() { return _addr; } void handle(void function(Socket s) dg) { + import std.concurrency : send; tid.send(dg); } @@ -195,6 +189,9 @@ version(unittest) static void loop(shared TcpSocket listener) { + import std.concurrency : OwnerTerminated, receiveOnly; + import std.stdio : stderr; + try while (true) { void function(Socket) handler = void; @@ -215,6 +212,9 @@ version(unittest) private TestServer startServer() { + import std.concurrency : spawn; + import std.socket : INADDR_LOOPBACK, InternetAddress, TcpSocket; + auto sock = new TcpSocket; sock.bind(new InternetAddress(INADDR_LOOPBACK, InternetAddress.PORT_ANY)); sock.listen(1); @@ -225,6 +225,7 @@ version(unittest) private ref TestServer testServer() { + import std.concurrency : initOnce; __gshared TestServer server; return initOnce!server(startServer()); } @@ -313,7 +314,7 @@ version(unittest) version(StdDdoc) import std.stdio; // Default data timeout for Protocols -private enum _defaultDataTimeout = dur!"minutes"(2); +enum _defaultDataTimeout = dur!"minutes"(2); /** Macros: @@ -758,6 +759,7 @@ if (isCurlConn!Conn) { import std.algorithm.searching : findSplitAfter; import std.conv : text; + import std.exception : enforce; auto trimmed = url.findSplitAfter("ftp://")[1]; auto t = trimmed.findSplitAfter("/"); @@ -980,6 +982,8 @@ private auto _basicHTTP(T)(const(char)[] url, const(void)[] sendData, HTTP clien { import std.algorithm.comparison : min; import std.format : format; + import std.exception : enforce; + import etc.c.curl : CurlSeek, CurlSeekPos; immutable doSend = sendData !is null && (client.method == HTTP.Method.post || @@ -1056,6 +1060,7 @@ private auto _basicHTTP(T)(const(char)[] url, const(void)[] sendData, HTTP clien @system unittest { import std.algorithm.searching : canFind; + import std.exception : collectException; testServer.handle((s) { auto req = s.recvReq; @@ -1176,6 +1181,7 @@ private auto _decodeContent(T)(ubyte[] content, string encoding) } else { + import std.exception : enforce; import std.format : format; // Optimally just return the utf8 encoded content @@ -1281,6 +1287,7 @@ if (isCurlConn!Conn && isSomeChar!Char && isSomeChar!Terminator) @property @safe Char[] front() { + import std.exception : enforce; enforce!CurlException(currentValid, "Cannot call front() on empty range"); return current; } @@ -1288,6 +1295,7 @@ if (isCurlConn!Conn && isSomeChar!Char && isSomeChar!Terminator) void popFront() { import std.algorithm.searching : findSplitAfter, findSplit; + import std.exception : enforce; enforce!CurlException(currentValid, "Cannot call popFront() on empty range"); if (lines.empty) @@ -1437,6 +1445,8 @@ private T[] _getForRange(T,Conn)(const(char)[] url, Conn conn) */ private mixin template WorkerThreadProtocol(Unit, alias units) { + import core.time : Duration; + @property bool empty() { tryEnsureUnits(); @@ -1455,7 +1465,9 @@ private mixin template WorkerThreadProtocol(Unit, alias units) void popFront() { + import std.concurrency : send; import std.format : format; + tryEnsureUnits(); assert(state == State.gotUnits, format("Expected %s but got $s", @@ -1471,7 +1483,9 @@ private mixin template WorkerThreadProtocol(Unit, alias units) */ bool wait(Duration d) { + import core.time : dur; import std.datetime.stopwatch : StopWatch; + import std.concurrency : receiveTimeout; if (state == State.gotUnits) return true; @@ -1522,6 +1536,7 @@ private mixin template WorkerThreadProtocol(Unit, alias units) void tryEnsureUnits() { + import std.concurrency : receive; while (true) { final switch (state) @@ -1566,6 +1581,8 @@ private static struct AsyncLineInputRange(Char) private this(Tid tid, size_t transmitBuffers, size_t bufferSize) { + import std.concurrency : send; + workerTid = tid; state = State.needUnits; @@ -1656,6 +1673,7 @@ if (isCurlConn!Conn && isSomeChar!Char && isSomeChar!Terminator) } else { + import std.concurrency; // 50 is just an arbitrary number for now setMaxMailboxSize(thisTid, 50, OnCrowding.block); auto tid = spawn(&_spawnAsync!(Conn, Char, Terminator)); @@ -1711,6 +1729,8 @@ auto byLineAsync(Conn = AutoProtocol, Terminator = char, Char = char) // Range that reads one chunk at a time asynchronously. private static struct AsyncChunkInputRange { + import std.concurrency : Tid, send; + private ubyte[] chunk; mixin WorkerThreadProtocol!(ubyte, chunk); @@ -1806,6 +1826,7 @@ if (isCurlConn!(Conn)) } else { + import std.concurrency; // 50 is just an arbitrary number for now setMaxMailboxSize(thisTid, 50, OnCrowding.block); auto tid = spawn(&_spawnAsync!(Conn, ubyte)); @@ -1861,6 +1882,9 @@ if (isCurlConn!(Conn)) private void _asyncDuplicateConnection(Conn, PostData) (const(char)[] url, Conn conn, PostData postData, Tid tid) { + import std.concurrency : send; + import std.exception : enforce; + // no move semantic available in std.concurrency ie. must use casting. auto connDup = conn.dup(); connDup.url = url; @@ -1909,6 +1933,9 @@ private void _asyncDuplicateConnection(Conn, PostData) */ private mixin template Protocol() { + import etc.c.curl : CurlReadFunc; + import core.time : Duration; + import std.socket : InternetAddress; /// Value to return from $(D onSend)/$(D onReceive) delegates in order to /// pause a request @@ -2263,6 +2290,7 @@ decodeString(Char = char)(const(ubyte)[] data, EncodingScheme scheme, size_t maxChars = size_t.max) { + import std.encoding : INVALID_SEQUENCE; Char[] res; immutable startLen = data.length; size_t charsDecoded = 0; @@ -2303,6 +2331,8 @@ private bool decodeLineInto(Terminator, Char = char)(ref const(ubyte)[] basesrc, Terminator terminator) { import std.algorithm.searching : endsWith; + import std.encoding : INVALID_SEQUENCE; + import std.exception : enforce; // if there is anything in the basesrc then try to decode that // first. @@ -2399,6 +2429,8 @@ struct HTTP mixin Protocol; import std.datetime.systime : SysTime; + import std.typecons : RefCounted; + import etc.c.curl; /// Authentication method equal to $(REF CurlAuth, etc,c,curl) alias AuthMethod = CurlAuth; @@ -2495,6 +2527,7 @@ struct HTTP } private RefCounted!Impl p; + import etc.c.curl : CurlTimeCond; /** Time condition enumeration as an alias of $(REF CurlTimeCond, etc,c,curl) @@ -2867,6 +2900,7 @@ struct HTTP void addRequestHeader(const(char)[] name, const(char)[] value) { import std.format : format; + import std.internal.cstring : tempCString; import std.uni : icmp; if (icmp(name, "User-Agent") == 0) @@ -3270,6 +3304,7 @@ struct HTTP @system unittest // charset/Charset/CHARSET/... { import std.meta : AliasSeq; + import etc.c.curl; foreach (c; AliasSeq!("charset", "Charset", "CHARSET", "CharSet", "charSet", "ChArSeT", "cHaRsEt")) @@ -3316,6 +3351,9 @@ struct FTP mixin Protocol; + import std.typecons : RefCounted; + import etc.c.curl; + private struct Impl { ~this() @@ -3601,6 +3639,7 @@ struct FTP */ void addCommand(const(char)[] command) { + import std.internal.cstring : tempCString; p.commands = Curl.curl.slist_append(p.commands, command.tempCString().buffPtr); p.curl.set(CurlOption.postquote, p.commands); @@ -3714,6 +3753,8 @@ struct FTP struct SMTP { mixin Protocol; + import std.typecons : RefCounted; + import etc.c.curl; private struct Impl { @@ -3799,6 +3840,7 @@ struct SMTP @property void url(const(char)[] url) { import std.algorithm.searching : startsWith; + import std.exception : enforce; import std.uni : toLower; auto lowered = url.toLower(); @@ -4102,12 +4144,12 @@ class HTTPStatusException : CurlException /// Equal to $(REF CURLcode, etc,c,curl) alias CurlCode = CURLcode; -import std.typecons : Flag, Yes, No; /// Flag to specify whether or not an exception is thrown on error. alias ThrowOnError = Flag!"throwOnError"; private struct CurlAPI { + import etc.c.curl; static struct API { extern(C): @@ -4138,6 +4180,8 @@ private struct CurlAPI static void* loadAPI() { + import std.exception : enforce; + version (Posix) { import core.sys.posix.dlfcn : dlsym, dlopen, dlclose, RTLD_LAZY; @@ -4238,6 +4282,8 @@ private struct CurlAPI */ struct Curl { + import etc.c.curl; + alias OutData = void[]; alias InData = ubyte[]; private bool _stopped; @@ -4264,6 +4310,7 @@ struct Curl */ void initialize() { + import std.exception : enforce; enforce!CurlException(!handle, "Curl instance already initialized"); handle = curl.easy_init(); enforce!CurlException(handle, "Curl instance couldn't be initialized"); @@ -4287,6 +4334,7 @@ struct Curl */ Curl dup() { + import std.meta : AliasSeq; Curl copy; copy.handle = curl.easy_duphandle(handle); copy._stopped = false; @@ -4337,6 +4385,7 @@ struct Curl private void _check(CurlCode code) { + import std.exception : enforce; enforce!CurlTimeoutException(code != CurlError.operation_timedout, errorString(code)); @@ -4356,6 +4405,7 @@ struct Curl private void throwOnStopped(string message = null) { + import std.exception : enforce; auto def = "Curl instance called after being cleaned up"; enforce!CurlException(!stopped, message == null ? def : message); @@ -4392,6 +4442,7 @@ struct Curl */ void set(CurlOption option, const(char)[] value) { + import std.internal.cstring : tempCString; throwOnStopped(); _check(curl.easy_setopt(this.handle, option, value.tempCString().buffPtr)); } @@ -4824,6 +4875,7 @@ private struct Pool(Data) @safe Data pop() { + import std.exception : enforce; enforce!Exception(root != null, "pop() called on empty pool"); auto d = root.data; auto n = root.next; @@ -4841,6 +4893,8 @@ private static size_t _receiveAsyncChunks(ubyte[] data, ref ubyte[] outdata, ref ubyte[] buffer, Tid fromTid, ref bool aborted) { + import std.concurrency : receive, send, thisTid; + immutable datalen = data.length; // Copy data to fill active buffer @@ -4888,6 +4942,7 @@ private static size_t _receiveAsyncChunks(ubyte[] data, ref ubyte[] outdata, private static void _finalizeAsyncChunks(ubyte[] outdata, ref ubyte[] buffer, Tid fromTid) { + import std.concurrency : send, thisTid; if (!outdata.empty) { // Resize the last buffer @@ -4906,7 +4961,10 @@ private static size_t _receiveAsyncLines(Terminator, Unit) ref Pool!(Unit[]) freeBuffers, ref Unit[] buffer, Tid fromTid, ref bool aborted) { + import std.concurrency : prioritySend, receive, send, thisTid; + import std.exception : enforce; import std.format : format; + import std.traits : isArray; immutable datalen = data.length; @@ -4996,6 +5054,7 @@ private static size_t _receiveAsyncLines(Terminator, Unit) private static void _finalizeAsyncLines(Unit)(bool bufferValid, Unit[] buffer, Tid fromTid) { + import std.concurrency : send, thisTid; if (bufferValid && buffer.length != 0) fromTid.send(thisTid, curlMessage(cast(immutable(Unit)[])buffer[0..$])); } @@ -5007,6 +5066,8 @@ void _finalizeAsyncLines(Unit)(bool bufferValid, Unit[] buffer, Tid fromTid) // output (e.g. AsyncHTTPLineOutputRange). private static void _spawnAsync(Conn, Unit, Terminator = void)() { + import std.concurrency : prioritySend, receiveOnly, send, thisTid; + import etc.c.curl : CURL, CurlError; Tid fromTid = receiveOnly!Tid(); // Get buffer to read into