diff --git a/Project.toml b/Project.toml index a6c168d..1d4347c 100644 --- a/Project.toml +++ b/Project.toml @@ -1,26 +1,24 @@ name = "AWSCRT" uuid = "df31ea59-17a4-4ebd-9d69-4f45266dc2c7" -version = "0.2.0" +version = "0.3.0" [deps] -AWSCRT_jll = "01db5350-6ea1-5d9a-9a47-8a31a394cb9c" -CEnum = "fa961155-64e5-5f13-b03f-caf6b980ea82" CountDownLatches = "621fb831-fdad-4fff-93ac-1af7b7ed19e3" -ForeignCallbacks = "809b5ff2-8730-47bb-8e19-67299d747c44" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" -LibAWSCRT = "df7458b6-5204-493f-a0e7-404b4eb72fac" +LibAwsCommon = "c6e421ba-b5f8-4792-a1c4-42948de3ed9d" +LibAwsIO = "a5388770-19df-4151-b103-3d71de896ddf" +LibAwsMqtt = "dbf63f58-971e-4a9b-b153-820e5f7f543b" [compat] -AWSCRT_jll = "=0.1.2" AWS = "1.78" Aqua = "0.8.4" -CEnum = "0.4" CountDownLatches = "2" Dates = "1" Documenter = "1" -ForeignCallbacks = "0.1.1" JSON = "0.21" -LibAWSCRT = "=0.1.0" +LibAwsCommon = "1" +LibAwsIO = "1" +LibAwsMqtt = "1" Random = "1" Test = "1" julia = "1.9" diff --git a/src/AWSCRT.jl b/src/AWSCRT.jl index ef78ef6..1c0dc58 100644 --- a/src/AWSCRT.jl +++ b/src/AWSCRT.jl @@ -3,14 +3,14 @@ Environment variables: - `AWS_CRT_MEMORY_TRACING`: Set to `0`, `1`, or `2` to enable memory tracing. Default is off. See `aws_mem_trace_level`. - `AWS_CRT_MEMORY_TRACING_FRAMES_PER_STACK`: Set the number of frames per stack for memory tracing. Default is the AWS library's default. - - `AWS_CRT_LOG_LEVEL`: Set to `0` through `6` to enable logging. Default is off. See [`aws_log_level`](https://octogonapus.github.io/LibAWSCRT.jl/dev/#LibAWSCRT.aws_log_level). + - `AWS_CRT_LOG_LEVEL`: Set to `0` through `6` to enable logging. Default is off. See [`aws_log_level`](https://juliaservices.github.io/LibAwsCommon.jl/dev/#LibAwsCommon.aws_log_level). - `AWS_CRT_LOG_PATH`: Set to the log file path. Must be set if `AWS_CRT_LOG_LEVEL` is set. Note: all the symbols in this package that begin with underscores are private and are not part of this package's published interface. Please don't use them. """ module AWSCRT -using LibAWSCRT, ForeignCallbacks, CountDownLatches, CEnum, JSON +using CountDownLatches, JSON, LibAwsCommon, LibAwsIO, LibAwsMqtt import Base: lock, unlock export lock, unlock @@ -21,7 +21,7 @@ const _C_ON_ANY_MESSAGE_IDS_LOCK = ReentrantLock() const _C_ON_ANY_MESSAGE_IDS = IdDict{Any,Any}() # set during __init__ -const _LIBPTR = Ref{Ptr{Cvoid}}(Ptr{Cvoid}(0)) +const _LIB_COMMON_PTR = Ref{Ptr{Cvoid}}(Ptr{Cvoid}(0)) const _AWSCRT_ALLOCATOR = Ref{Union{Ptr{aws_allocator},Nothing}}(nothing) # cfunctions set during __init__ @@ -104,7 +104,7 @@ export publish_current_state export wait_until_synced function __init__() - _LIBPTR[] = Libc.Libdl.dlopen(LibAWSCRT.libawscrt) + _LIB_COMMON_PTR[] = Libc.Libdl.dlopen(LibAwsCommon.libaws_c_common) _C_ON_CONNECTION_INTERRUPTED[] = @cfunction(_c_on_connection_interrupted, Cvoid, (Ptr{aws_mqtt_client_connection}, Cint, Ptr{Cvoid})) diff --git a/src/AWSIO.jl b/src/AWSIO.jl index ec0b9a3..c8864e7 100644 --- a/src/AWSIO.jl +++ b/src/AWSIO.jl @@ -303,23 +303,22 @@ Arguments: - `options (TLSContextOptions)`: Configuration options. """ function ClientTLSContext(options::TLSContextOptions) - tls_ctx_opt = Ref(aws_tls_ctx_options(ntuple(_ -> UInt8(0), 200))) - # tls_ctx_opt = Ref( - # aws_tls_ctx_options( - # C_NULL, - # options.min_tls_version, - # aws_tls_cipher_pref(0), - # aws_byte_buf(0, C_NULL, 0, C_NULL), - # C_NULL, - # C_NULL, - # aws_byte_buf(0, C_NULL, 0, C_NULL), - # aws_byte_buf(0, C_NULL, 0, C_NULL), - # 0, - # options.verify_peer, - # C_NULL, - # C_NULL, - # ), - # ) + tls_ctx_opt = Ref( + aws_tls_ctx_options( + C_NULL, + options.min_tls_version, + aws_tls_cipher_pref(0), + aws_byte_buf(0, C_NULL, 0, C_NULL), + C_NULL, + C_NULL, + aws_byte_buf(0, C_NULL, 0, C_NULL), + aws_byte_buf(0, C_NULL, 0, C_NULL), + 0, + options.verify_peer, + C_NULL, + C_NULL, + ), + ) GC.@preserve tls_ctx_opt begin # TODO pkcs11 # TODO pkcs12 diff --git a/src/AWSMQTT.jl b/src/AWSMQTT.jl index 08c35a2..7aa36b6 100644 --- a/src/AWSMQTT.jl +++ b/src/AWSMQTT.jl @@ -1138,8 +1138,8 @@ function resubscribe_existing_topics(connection::MQTTConnection) out_ch = Channel(1) ud = _OnResubcribeCompleteUD( connection.events, - Libc.Libdl.dlsym(_LIBPTR[], :aws_array_list_length), - Libc.Libdl.dlsym(_LIBPTR[], :aws_array_list_get_at), + Libc.Libdl.dlsym(_LIB_COMMON_PTR[], :aws_array_list_length), + Libc.Libdl.dlsym(_LIB_COMMON_PTR[], :aws_array_list_get_at), (msg) -> put!(out_ch, msg), ) udp = Base.pointer_from_objref(ud) diff --git a/test/debug.jl b/test/debug.jl deleted file mode 100644 index 42cdb67..0000000 --- a/test/debug.jl +++ /dev/null @@ -1,293 +0,0 @@ -using TestEnv -TestEnv.activate() - -using Test, AWSCRT, LibAWSCRT, JSON, CountDownLatches, Random, Documenter, Aqua, Dates -parallel = true - -include("util.jl") - -##################################################################################################### - -Base.errormonitor(Threads.@spawn begin - while true - GC.gc(true) - sleep(1) - end -end) - -include("shadow_framework_integ_test.jl") - -##################################################################################################### - -# topic1 = "test-topic-$(Random.randstring(6))" -# payload1 = Random.randstring(48) -# payload2 = Random.randstring(48) -# client_id1 = random_client_id() -# @show topic1 payload1 client_id1 - -# client = MQTTClient(new_tls_ctx()) -# connection = MQTTConnection(client) - -# any_msg = Channel(1) -# sub_msg = Channel(1) - -# # on_message( -# # connection, -# # (topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) -> begin -# # put!(any_msg, (; topic, payload, qos, retain)) -# # end, -# # ) - -# task = connect( -# connection, -# ENV["ENDPOINT"], -# 8883, -# client_id1; -# will = Will(topic1, AWS_MQTT_QOS_AT_LEAST_ONCE, "The client has gone offline!", false), -# on_connection_interrupted = (conn, error_code) -> begin -# @warn "connection interrupted" error_code -# end, -# on_connection_resumed = (conn, return_code, session_present) -> begin -# @info "connection resumed" return_code session_present -# end, -# ) -# @test fetch(task) == Dict(:session_present => false) - -# task, id = subscribe( -# connection, -# topic1, -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# (topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) -> begin -# @info "subscribe callback" topic payload dup qos retain -# end, -# ) -# d = fetch(task) - -# task, id = publish(connection, topic1, payload1, AWS_MQTT_QOS_AT_LEAST_ONCE) -# task, id = publish(connection, topic1, payload1, AWS_MQTT_QOS_AT_LEAST_ONCE) - -# sleep(12) - -# # Subscribe, publish a message, and test we get it on both callbacks -# task, id = subscribe( -# connection, -# topic1, -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# (topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) -> begin -# put!(sub_msg, (; topic, payload, qos, retain)) -# end, -# ) -# d = fetch(task) -# @test d[:packet_id] == id -# @test d[:topic] == topic1 -# @test d[:qos] == AWS_MQTT_QOS_AT_LEAST_ONCE - -# task, id = resubscribe_existing_topics(connection) -# d = fetch(task) -# @test d[:packet_id] == id -# @test d[:topics] == [(topic1, AWS_MQTT_QOS_AT_LEAST_ONCE)] - -# for _ = 1:10 -# payload = Random.randstring(48) -# @show payload -# task, id = publish(connection, topic1, payload, AWS_MQTT_QOS_AT_LEAST_ONCE) -# @test fetch(task) == Dict(:packet_id => id) - -# msg = take!(sub_msg) -# @show msg -# @test msg.topic == topic1 -# @test msg.payload == payload -# @test msg.qos == AWS_MQTT_QOS_AT_LEAST_ONCE -# @test !msg.retain - -# msg = take!(any_msg) -# @show msg -# @test msg.topic == topic1 -# @test msg.payload == payload -# @test msg.qos == AWS_MQTT_QOS_AT_LEAST_ONCE -# @test !msg.retain -# end - -# @show task, id = publish( -# connection, -# "\$aws/things/AWSCRT_Test1/shadow/name/shadow-abc123/update", -# json(Dict("state" => Dict("desired" => Dict("foo" => 2)))), -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# ) -# @show fetch(task) - -# shadow_name = Random.randstring(6) -# @show task, id = subscribe( -# connection, -# "\$aws/things/AWSCRT_Test1/shadow/name/shadow-$shadow_name/get/rejected", -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# (topic::String, payload::String, dup::Bool, qos::aws_mqtt_qos, retain::Bool) -> begin -# @info "get rejected callback" topic payload dup qos retain - -# task, id = publish( -# connection, -# "\$aws/things/AWSCRT_Test1/shadow/name/shadow-$shadow_name/update", -# json(Dict("state" => Dict("desired" => Dict("foo" => 2)))), -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# ) -# @info "get rejected callback" task id fetch(task) -# end, -# ) -# @show d = fetch(task) - -# @show task, id = publish( -# connection, -# "\$aws/things/AWSCRT_Test1/shadow/name/shadow-$shadow_name/get", -# json(Dict()), -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# ) -# @show fetch(task) - -# sleep(3) - -""" -summary - -with FCBs: -1. publish from julia thread -2. subscribe callback runs on CRT event loop thread, puts msg in FCBs queue, and returns -3. julia thread waiting on FCBs queue is woken, publishes another msg, and waits again (i.e. fetch()) -4. CRT event loop thread is not blocked - -without FCBs: -1. publish from julia thread -2. subscribe callback runs on CRT event loop thread, publishes another msg, and waits (i.e. fetch()) -3. CRT event loop thread is now deadlocked waiting for the subscribe callback to run again, but the event loop is blocked due to that wait - -solution: ??? -need to get off the CRT event loop thread somehow to unblock the event loop -am i just arriving back at FCBs? -Valentin said FCBs were not necessary as of v1.9, and strictly speaking they aren't because the code "works" but we have this deadlock now -I think it will be too much burden on the user to always have fast nonblocking callbacks and we need to keep the code running on the event loop 100% inside this package, therefore we need a solution like FCBs - -add comments to CRT callbacks that note they block the event loop -""" - -##################################################################################################### - -# struct OOBShadowClient -# shadow_client::ShadowClient -# msgs::Vector{Any} -# shadow_callback::Function - -# function OOBShadowClient(oob_connection, THING1_NAME, shadow_name) -# msgs = Any[] -# function shadow_callback( -# shadow_client::ShadowClient, -# topic::String, -# payload::String, -# dup::Bool, -# qos::aws_mqtt_qos, -# retain::Bool, -# ) -# push!(msgs, (; shadow_client, topic, payload, dup, qos, retain)) -# end -# return new(ShadowClient(oob_connection, THING1_NAME, shadow_name), msgs, shadow_callback) -# end -# end - -# function subscribe_for_single_shadow_msg(oobsc, topic, payload) -# tasks_and_ids = subscribe(oobsc.shadow_client, AWS_MQTT_QOS_AT_LEAST_ONCE, oobsc.shadow_callback) -# for (task, id) in tasks_and_ids -# fetch(task) -# end -# fetch(publish(oobsc.shadow_client, topic, payload, AWS_MQTT_QOS_AT_LEAST_ONCE)[1]) -# wait_for(() -> !isempty(oobsc.msgs)) -# fetch(unsubscribe(oobsc.shadow_client)[1]) -# end - -# function test_get_accepted_payload_equals_shadow_doc(payload, doc) -# @info "test_get_accepted_payload_equals_shadow_doc" payload doc -# p = JSON.parse(payload) -# for k in keys(doc) -# if k != "version" -# @test p["state"]["reported"][k] == doc[k] -# end -# end -# @test p["version"] == doc["version"] -# end - -# function maybe_get(d, keys...) -# return if length(keys) == 1 -# get(d, keys[1], nothing) -# else -# if haskey(d, keys[1]) -# maybe_get(d[keys[1]], keys[2:end]...) -# else -# nothing -# end -# end -# end - -# mutable struct ShadowDocMissingProperty -# foo::Int -# version::Int -# end - -# connection = new_mqtt_connection() -# shadow_name = random_shadow_name() -# doc = Dict("foo" => 1) - -# values_foo = [] -# foo_cb = x -> push!(values_foo, x) - -# values_pre_update = [] -# pre_update_cb = x -> push!(values_pre_update, x) - -# values_post_update = [] -# latch_post_update = Ref(CountDownLatch(1)) -# post_update_cb = x -> begin -# push!(values_post_update, x) -# count_down(latch_post_update[]) -# end - -# sf = ShadowFramework( -# connection, -# THING1_NAME, -# shadow_name, -# doc; -# shadow_document_pre_update_callback = pre_update_cb, -# shadow_document_post_update_callback = post_update_cb, -# shadow_document_property_callbacks = Dict{String,Function}("foo" => foo_cb), -# ) -# sc = shadow_client(sf) - -# msgs = [] -# function shadow_callback( -# shadow_client::ShadowClient, -# topic::String, -# payload::String, -# dup::Bool, -# qos::aws_mqtt_qos, -# retain::Bool, -# ) -# push!(msgs, (; shadow_client, topic, payload, dup, qos, retain)) -# end - -# oobc = new_mqtt_connection() -# oobsc = OOBShadowClient(oobc, THING1_NAME, shadow_name) - -# fetch(subscribe(sf)[1]) -# sleep(3) - -# @info "publishing first update" -# @show publish( -# oobsc.shadow_client, -# "/update", -# json(Dict("state" => Dict("desired" => Dict("foo" => 2)))), -# AWS_MQTT_QOS_AT_LEAST_ONCE, -# ) -# sleep(3) -# # wait_for(() -> !isempty(values_post_update)) -# # @test doc["foo"] == 2 - -# # @info "unsubscribing" -# # for (task, id) in unsubscribe(sf) -# # fetch(task) -# # end -# # @info "done unsubscribing" diff --git a/test/debug_preserve.jl b/test/debug_preserve.jl deleted file mode 100644 index bfd891c..0000000 --- a/test/debug_preserve.jl +++ /dev/null @@ -1,27 +0,0 @@ -mutable struct Obj - x::Float32 -end - -l = ReentrantLock() -refs = Base.IdDict() - -function foo() - x = Obj(1234567.0f32) - p = Base.pointer_from_objref(x) - lock(l) do - @time refs[x] = nothing - @time refs[p] = nothing - end - @show refs - return p -end - -function bar(p) - GC.gc(true) - @time haskey(refs, p) - x = Base.unsafe_pointer_to_objref(p)::Obj - @time haskey(refs, x) - @show x -end - -bar(foo()) diff --git a/test/debug_preserve_task.jl b/test/debug_preserve_task.jl deleted file mode 100644 index 7811337..0000000 --- a/test/debug_preserve_task.jl +++ /dev/null @@ -1,20 +0,0 @@ -mutable struct Obj - x::Float32 -end - -function foo() - x = Ref(Obj(1234567.0f32)) - p = Base.pointer_from_objref(x) - Threads.@spawn begin - GC.gc(true) - GC.@preserve x begin - GC.gc(true) - y = Base.unsafe_pointer_to_objref(p)::Ref{Obj} - @show y - end - end -end - -t = foo() -GC.gc(true) -wait(t) diff --git a/test/testheader.jl b/test/testheader.jl index 9209b2a..0b690f5 100644 --- a/test/testheader.jl +++ b/test/testheader.jl @@ -13,7 +13,7 @@ end ENV["JULIA_DEBUG"] = "AWSCRT" -using Test, AWSCRT, LibAWSCRT, JSON, CountDownLatches, Random, Documenter, Aqua, Dates, AWS +using Test, AWSCRT, JSON, CountDownLatches, Random, Documenter, Aqua, Dates, AWS, LibAwsMqtt @service DynamoDB include("util.jl") diff --git a/test_sysimage/src/Foo.jl b/test_sysimage/src/Foo.jl index 6a410ff..3f2935f 100644 --- a/test_sysimage/src/Foo.jl +++ b/test_sysimage/src/Foo.jl @@ -1,7 +1,6 @@ module Foo using AWSCRT -using AWSCRT.LibAWSCRT using Random using Test diff --git a/test_sysimage/sysimage/precompile_list.jl b/test_sysimage/sysimage/precompile_list.jl index 510361c..259a8b9 100644 --- a/test_sysimage/sysimage/precompile_list.jl +++ b/test_sysimage/sysimage/precompile_list.jl @@ -8,7 +8,7 @@ precompile(Tuple{typeof(Base.merge_types), Tuple{Vararg{Symbol}}, Type{var"#s78" precompile(Tuple{Core.Compiler.var"#273#274", Any, Any}) precompile(Tuple{typeof(Base.uv_asynccb), Ptr{Nothing}}) precompile(Tuple{typeof(AWSCRT.__init__)}) -precompile(Tuple{typeof(AWSCRT.on_connection_complete), Ptr{LibAWSCRT.aws_mqtt_client_connection}, Int32, Int32, UInt8, Ptr{Nothing}}) +precompile(Tuple{typeof(AWSCRT.on_connection_complete), Ptr{LibAwsCommon.aws_mqtt_client_connection}, Int32, Int32, UInt8, Ptr{Nothing}}) # including the below precompiles causes the segfault precompile(Tuple{typeof(Foo.start)})