-
Notifications
You must be signed in to change notification settings - Fork 246
Expand file tree
/
Copy pathon_transfer.cpp
More file actions
130 lines (110 loc) · 3.68 KB
/
on_transfer.cpp
File metadata and controls
130 lines (110 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
* Copyright (c) 2022 Lucian Radu Teodorescu
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* General context:
* - server application that processes images
* - execution contexts:
* - 1 dedicated thread for network I/O
* - N worker threads used for CPU-intensive work
* - M threads for auxiliary I/O
* - optional GPU context that may be used on some types of servers
*
* Specific problem description:
* - reading data from the socket before processing the request
* - reading of the data is done on the I/O context
* - no processing of the data needs to be done on the I/O context
*
* Example goals:
* - show how one can change the execution context
* - exemplify the use of `starts_on` and `continues_on` algorithms
*/
#include <array>
#include <cstring>
#include <iostream>
#include <mutex>
#include <string_view>
// Pull in the reference implementation of P2300:
#include <exec/async_scope.hpp>
#include <stdexec/execution.hpp>
// Use a thread pool
#include "exec/static_thread_pool.hpp"
namespace ex = stdexec;
struct sync_stream
{
private:
static std::mutex s_mtx_;
public:
std::ostream& sout_;
std::unique_lock<std::mutex> lock_{s_mtx_};
template <class T>
friend auto operator<<(sync_stream&& self, T const & value) -> sync_stream&&
{
self.sout_ << value;
return std::move(self);
}
friend auto
operator<<(sync_stream&& self, std::ostream& (*manip)(std::ostream&) ) -> sync_stream&&
{
self.sout_ << manip;
return std::move(self);
}
};
std::mutex sync_stream::s_mtx_{};
auto legacy_read_from_socket(int, char* buffer, size_t buffer_len) -> size_t
{
char const fake_data[] = "Hello, world!";
size_t sz = sizeof(fake_data) - 1;
size_t count = (std::min) (sz, buffer_len);
std::memcpy(buffer, fake_data, count);
return count;
}
void process_read_data(char const * read_data, size_t read_len)
{
sync_stream{.sout_ = std::cout} << "Processing '" << std::string_view{read_data, read_len}
<< "'\n";
}
auto main() -> int
{
// Create a thread pool and get a scheduler from it
exec::static_thread_pool work_pool{8};
ex::scheduler auto work_sched = work_pool.get_scheduler();
exec::static_thread_pool io_pool{1};
ex::scheduler auto io_sched = io_pool.get_scheduler();
std::array<std::byte, 16 * 1024> buffer;
exec::async_scope scope;
// Fake a couple of requests
for (int i = 0; i < 10; i++)
{
int sock = i;
auto buf = reinterpret_cast<char*>(&buffer[0]);
// A sender that just calls the legacy read function
auto snd_read = ex::just(sock, buf, buffer.size()) | ex::then(legacy_read_from_socket);
// The entire flow
auto snd =
// start by reading data on the I/O thread
ex::starts_on(io_sched, std::move(snd_read))
// do the processing on the worker threads pool
| ex::continues_on(work_sched)
// process the incoming data (on worker threads)
| ex::then([buf](size_t read_len) { process_read_data(buf, read_len); })
// done
;
// execute the whole flow asynchronously
scope.spawn(std::move(snd));
}
(void) stdexec::sync_wait(scope.on_empty());
return 0;
}