XRootD
Loading...
Searching...
No Matches
XrdThrottleManager.hh
Go to the documentation of this file.
1
2/*
3 * XrdThrottleManager
4 *
5 * This class provides an implementation of a throttle manager.
6 * The throttle manager purposely pauses if the bandwidth, IOPS
7 * rate, or number of outstanding IO requests is sustained above
8 * a certain level.
9 *
10 * The XrdThrottleManager is user-aware and provides fairshare.
11 *
12 * This works by having a separate thread periodically refilling
13 * each user's shares.
14 *
15 * Note that we do not actually keep close track of users, but rather
16 * put them into a hash. This way, we can pretend there's a constant
17 * number of users and use a lock-free algorithm.
18 */
19
20#ifndef __XrdThrottleManager_hh_
21#define __XrdThrottleManager_hh_
22
// Branch-prediction hints. On GNU-compatible compilers the condition is
// normalized to 0/1 with `!!` and passed to __builtin_expect together with
// the expected outcome; on other compilers the macros expand to the bare
// argument and carry no hint.
23#ifdef __GNUC__
24#define likely(x) __builtin_expect(!!(x), 1)
25#define unlikely(x) __builtin_expect(!!(x), 0)
26#else
// NOTE(review): the fallback expands to an unparenthesized `x`, so an
// expression such as `!likely(a == b)` would group differently here than in
// the GNU branch. `(x)` or `(!!(x))` would make both branches agree.
27#define likely(x) x
28#define unlikely(x) x
29#endif
30
31#include <string>
32#include <vector>
33#include <ctime>
34#include <mutex>
35#include <unordered_map>
36#include <memory>
37
39
40class XrdSysError;
41class XrdOucTrace;
44
46{
47
48friend class XrdThrottleTimer;
49
50public:
51
52void Init();
53
54bool OpenFile(const std::string &entity, std::string &open_error_message);
55bool CloseFile(const std::string &entity);
56
57void Apply(int reqsize, int reqops, int uid);
58
// Returns true when a rate limit is configured, i.e. when either the
// operations-per-second or the bytes-per-second setting is positive.
// The concurrency limit is not consulted here.
59bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
60
// Stores the throttle configuration in the corresponding members:
//   reqbyterate     -> m_bytes_per_second
//   reqoprate       -> m_ops_per_second
//   concurrency     -> m_concurrency_limit
//   interval_length -> m_interval_length_seconds
// The values are stored exactly as given; this inline setter performs no
// validation and takes no lock.
61void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
62 {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
63 m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
64
// Records the load-shed settings: copies `hostname` into m_loadshed_host and
// stores `port` and `frequency` in m_loadshed_port / m_loadshed_frequency.
// `hostname` is taken by non-const reference but is only read in this body.
// NOTE(review): the unit and meaning of `frequency` are not visible in this
// header - confirm against the implementation file.
65void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
66 {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
67
// Stores the open-file limit in m_max_open (the member defaults to 0).
// NOTE(review): presumably 0 means "no limit" - confirm in the implementation.
68void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
69
// Stores the connection limit in m_max_conns (the member defaults to 0).
// NOTE(review): presumably 0 means "no limit" - confirm in the implementation.
70void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
71
// Stores the monitoring stream pointer in m_gstream (nullptr by default).
// The pointer is kept as given; the class's empty destructor does not free it.
72void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
73
74//int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
75
76static
77int GetUid(const char *username);
78
80
81void PrepLoadShed(const char *opaque, std::string &lsOpaque);
82
83bool CheckLoadShed(const std::string &opaque);
84
85void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
86
88
// Empty destructor: no explicit cleanup is performed; members are destroyed
// by their own destructors.
// NOTE(review): the trailing comment refers to a "buffmanager", but no such
// member is declared in this class - the remark looks stale.
89 ~XrdThrottleManager() {} // The buffmanager is never deleted
90
91protected:
92
93void StopIOTimer(struct timespec);
94
95private:
96
97void Recompute();
98
99void RecomputeInternal();
100
101static
102void * RecomputeBootstrap(void *pp);
103
104int WaitForShares();
105
106void GetShares(int &shares, int &request);
107
108void StealShares(int uid, int &reqsize, int &reqops);
109
110XrdOucTrace * m_trace;
111XrdSysError * m_log;
112
113XrdSysCondVar m_compute_var;
114
115// Controls for the various rates.
116float m_interval_length_seconds;
117float m_bytes_per_second;
118float m_ops_per_second;
119int m_concurrency_limit;
120
121// Maintain the shares
122static const
123int m_max_users;
124std::vector<int> m_primary_bytes_shares;
125std::vector<int> m_secondary_bytes_shares;
126std::vector<int> m_primary_ops_shares;
127std::vector<int> m_secondary_ops_shares;
128int m_last_round_allocation;
129
130// Active IO counter
131int m_io_active;
132struct timespec m_io_wait;
133unsigned m_io_total{0};
134// Stable IO counters - the m_compute_var lock must be held when reading or writing them.
135int m_stable_io_active;
// NOTE(review): this counter is a signed int, whereas m_io_total is unsigned
// and the remark below speaks of an unsigned integer; a signed 32-bit counter
// would overflow after roughly 0.7 years at the same rate.
136int m_stable_io_total{0}; // At 100Hz of IO operations a 32-bit unsigned integer wraps after ~1.4 years (2^32 / 100 s, about 497 days).
137struct timespec m_stable_io_wait;
138
139// Load shed details
140std::string m_loadshed_host;
141unsigned m_loadshed_port;
142unsigned m_loadshed_frequency;
143int m_loadshed_limit_hit;
144
145// Maximum number of open files
146unsigned long m_max_open{0};
147unsigned long m_max_conns{0};
148std::unordered_map<std::string, unsigned long> m_file_counters;
149std::unordered_map<std::string, unsigned long> m_conn_counters;
150std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
151std::mutex m_file_mutex;
152
153// Monitoring handle, if configured
154XrdXrootdGStream* m_gstream{nullptr};
155
156static const char *TraceID;
157
158};
159
161{
162
164
165public:
166
168{
// Body of the timer-stop routine (presumably StopTimer(), which the block
// after this one calls; its declaration line is missing from this listing).
// It computes the time elapsed since m_timer was recorded, hands it to the
// manager via StopIOTimer(), and then marks the timer as stopped.
169 struct timespec end_timer = {0, 0};
// Only these platforms perform the clock read; elsewhere retval is forced to
// -1 so the subtraction below is skipped.
170#if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
171 int retval = clock_gettime(clock_id, &end_timer);
172#else
173 int retval = -1;
174#endif
175 if (likely(retval == 0))
176 {
// elapsed = now - start, computed field by field.
177 end_timer.tv_sec -= m_timer.tv_sec;
178 end_timer.tv_nsec -= m_timer.tv_nsec;
// Borrow one second when the nanosecond difference went negative.
179 if (end_timer.tv_nsec < 0)
180 {
181 end_timer.tv_sec--;
182 end_timer.tv_nsec += 1000000000;
183 }
184 }
// tv_nsec == -1 is the "already stopped" marker written at the end of this
// routine; in that case nothing is reported. Note that when the clock read
// did not succeed, end_timer is still reported without the start time having
// been subtracted (it was initialised to {0, 0}).
185 if (m_timer.tv_nsec != -1)
186 {
187 m_manager.StopIOTimer(end_timer);
188 }
// Write the stopped sentinel {0, -1} so a repeated call does not report again.
189 m_timer.tv_sec = 0;
190 m_timer.tv_nsec = -1;
191}
192
194{
// Stops the timer unless it already holds the stopped sentinel
// {tv_sec == 0, tv_nsec == -1}, so the elapsed time is reported at most once.
// NOTE(review): the declaration line is missing from this listing; this is
// presumably the destructor of XrdThrottleTimer - confirm against the header.
195 if (!((m_timer.tv_sec == 0) && (m_timer.tv_nsec == -1)))
196 {
197 StopTimer();
198 }
199}
200
201protected:
202
204 m_manager(manager)
205{
// Constructor body (the declaration line is missing from this listing):
// binds the owning manager and records the start time in m_timer using
// clock_id. On platforms outside the list below the clock is not read and
// retval is forced to -1.
206#if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
207 int retval = clock_gettime(clock_id, &m_timer);
208#else
209 int retval = -1;
210#endif
// On failure the start time is zeroed. This is {0, 0}, not the {0, -1}
// "stopped" sentinel tested elsewhere in this class, so the timer still counts
// as running and will report to the manager when it is stopped.
// NOTE(review): if a later clock read succeeds, the subtraction against
// {0, 0} would report the raw clock value as the elapsed time - confirm
// whether that is intended.
211 if (unlikely(retval == -1))
212 {
213 m_timer.tv_sec = 0;
214 m_timer.tv_nsec = 0;
215 }
216}
217
218private:
219XrdThrottleManager &m_manager;
220struct timespec m_timer;
221
222static clockid_t clock_id;
223};
224
225#endif
#define likely(x)
#define unlikely(x)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void Apply(int reqsize, int reqops, int uid)
void StopIOTimer(struct timespec)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
XrdThrottleTimer StartIOTimer()
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
static int GetUid(const char *username)
XrdThrottleTimer(XrdThrottleManager &manager)
friend class XrdThrottleManager