XRootD
Loading...
Searching...
No Matches
XrdPfcResourceMonitor.hh
Go to the documentation of this file.
1#ifndef __XRDPFC_RESOURCEMONITOR_HH__
2#define __XRDPFC_RESOURCEMONITOR_HH__
3
4#include "XrdPfcStats.hh"
5
7
8#include <string>
9#include <vector>
10#include <list>
11
12class XrdOss;
13
14namespace XrdPfc {
15
16struct DataFsState;
17struct DirState;
18struct DirStateElement;
19struct DataFsSnapshot;
20struct DirPurgeElement;
21struct DataFsPurgeshot;
22class FsTraversal;
23
24//==============================================================================
25// ResourceMonitor
26//==============================================================================
27
28// Encapsulates local variables used within the previous mega-function Purge().
29//
30// This will be used within the continuously/periodically run heart-beat / breath
31// function ... and then parts of it will be passed to invoked FS scan and purge
32// jobs (which will be controlled through this as well).
33
34// Andy: XRDADMINPATH Is the directory for administrative files (i.e. all.adminpath)
35// Also: XrdOucEnv::Export("XRDLOGDIR", logParms.logfn); (in XrdOucLogging::configLog)
36
38{
// Pair of vectors used as a write (producer) queue and a read (consumer)
// queue. Producers append to the write queue; the consumer calls swap_queues()
// to turn everything written so far into the read queue and then walks it.
// The class does no locking of its own.
//
// ID     - identifier stored with every entry.
// RECORD - payload stored with every entry.
template<typename ID, typename RECORD>
class Queue {
public:
   struct Entry {
      ID     id;
      RECORD record;
   };
   using queue_type     = std::vector<Entry>;
   using iterator       = typename queue_type::iterator;
   using const_iterator = typename queue_type::const_iterator;

   Queue() = default;

   // Sizes are reported as int; callers use them as positions (see write_record()).
   int  write_queue_size() const { return static_cast<int>(m_write_queue.size()); }
   bool read_queue_empty() const { return m_read_queue.empty(); }
   int  read_queue_size()  const { return static_cast<int>(m_read_queue.size()); }

   // Writer / producer access: append a new entry to the write queue.
   void push(ID id, RECORD stat) { m_write_queue.push_back({ id, stat }); }
   // Existing entry access for updating Stats; pos is an index into the write
   // queue and is not range-checked.
   RECORD& write_record(int pos) { return m_write_queue[pos].record; }

   // Reader / consumer access.
   // Drops the old read queue contents, exchanges the two vectors (the write
   // queue is empty afterwards) and returns the number of entries now readable.
   int swap_queues() { m_read_queue.clear(); m_write_queue.swap(m_read_queue); return read_queue_size(); }
   const queue_type& read_queue() const { return m_read_queue; }

   // Iteration over the read queue. The const overloads return const_iterator:
   // a const vector cannot hand out a mutable iterator, so returning 'iterator'
   // from a const member would fail to compile as soon as it is instantiated.
   iterator       begin()       { return m_read_queue.begin(); }
   iterator       end()         { return m_read_queue.end(); }
   const_iterator begin() const { return m_read_queue.begin(); }
   const_iterator end()   const { return m_read_queue.end(); }

   // Shrinkage of overgrown queues: empties the read queue and asks the vector
   // to release its capacity.
   void shrink_read_queue() { m_read_queue.clear(); m_read_queue.shrink_to_fit(); }

private:
   queue_type m_write_queue, m_read_queue;
};
72
73 struct AccessToken {
74 std::string m_filename;
75 unsigned int m_last_queue_swap_u1 = 0xffffffff;
76 int m_last_write_queue_pos = -1;
77 DirState *m_dir_state = nullptr;
78
79 void clear() {
80 m_filename.clear();
81 m_last_queue_swap_u1 = 0xffffffff;
82 m_last_write_queue_pos = -1;
83 m_dir_state = nullptr;
84 }
85 };
86 std::vector<AccessToken> m_access_tokens;
87 std::vector<int> m_access_tokens_free_slots;
88
// Payload of the file-open queue, filled in by register_file_open().
struct OpenRecord {
   time_t m_open_time;      // open timestamp supplied by the caller
   bool   m_existing_file;  // caller-supplied "existing file" flag
};
93
94 struct CloseRecord {
95 time_t m_close_time;
96 Stats m_full_stats;
97 };
98
// Payload of the purge queues: how much was removed and in how many files.
struct PurgeRecord {
   long long m_size_in_st_blocks;  // removed size, counted in st_blocks
   int       m_n_files;            // number of files the size refers to
};
103
104 Queue<int, OpenRecord> m_file_open_q;
105 Queue<int, Stats> m_file_update_stats_q;
106 Queue<int, CloseRecord> m_file_close_q;
107 Queue<DirState*, PurgeRecord> m_file_purge_q1;
108 Queue<std::string, PurgeRecord> m_file_purge_q2;
109 Queue<std::string, long long> m_file_purge_q3;
110 // DirPurge queue -- not needed? But we do need last-change timestamp in DirState.
111
112 long long m_current_usage_in_st_blocks = 0; // aggregate disk usage by files
113
114 XrdSysMutex m_queue_mutex; // mutex shared between queues
115 unsigned int m_queue_swap_u1 = 0u; // identifier of current swap cycle
116
117 DataFsState &m_fs_state;
118 XrdOss &m_oss;
119
120 // Requests for File opens during name-space scans. Such LFNs are processed
121 // with some priority
122 struct LfnCondRecord
123 {
124 const std::string &f_lfn;
125 XrdSysCondVar &f_cond;
126 bool &f_checked;
127 };
128
129 XrdSysMutex m_dir_scan_mutex;
130 std::list<LfnCondRecord> m_dir_scan_open_requests;
131 int m_dir_scan_check_counter = 0;
132 bool m_dir_scan_in_progress = true;
133
134 void process_inter_dir_scan_open_requests(FsTraversal &fst);
135 void cross_check_or_process_oob_lfn(const std::string &lfn, FsTraversal &fst);
136 long long get_file_usage_bytes_to_remove(const DataFsPurgeshot &ps, long long previous_file_usage, int logLeve);
137
138public:
139 ResourceMonitor(XrdOss& oss);
141
142 // --- Initial scan, building of DirState tree
143
144 void scan_dir_and_recurse(FsTraversal &fst);
146
147 // --- Event registration
148
149 int register_file_open(const std::string& filename, time_t open_timestamp, bool existing_file) {
150 // Simply return a token, we will resolve it in the actual processing of the queue.
151 XrdSysMutexHelper _lock(&m_queue_mutex);
152 int token_id;
153 if ( ! m_access_tokens_free_slots.empty()) {
154 token_id = m_access_tokens_free_slots.back();
155 m_access_tokens_free_slots.pop_back();
156 m_access_tokens[token_id].m_filename = filename;
157 m_access_tokens[token_id].m_last_queue_swap_u1 = m_queue_swap_u1 - 1;
158 } else {
159 token_id = (int) m_access_tokens.size();
160 m_access_tokens.push_back({filename, m_queue_swap_u1 - 1});
161 }
162
163 m_file_open_q.push(token_id, {open_timestamp, existing_file});
164 return token_id;
165 }
166
167 void register_file_update_stats(int token_id, const Stats& stats) {
168 XrdSysMutexHelper _lock(&m_queue_mutex);
169 AccessToken &at = token(token_id);
170 // Check if this is the first update within this queue swap cycle.
171 if (at.m_last_queue_swap_u1 != m_queue_swap_u1) {
172 m_file_update_stats_q.push(token_id, stats);
173 at.m_last_queue_swap_u1 = m_queue_swap_u1;
174 at.m_last_write_queue_pos = m_file_update_stats_q.write_queue_size() - 1;
175 } else {
176 Stats &existing_stats = m_file_update_stats_q.write_record(at.m_last_write_queue_pos);
177 existing_stats.AddUp(stats);
178 }
179 // Optionally, one could return "scaler" to moodify stat-reporting
180 // frequency in the file ... if it comes too often or too rarely.
181 // See also the logic for determining reporting interval (in N_bytes_read)
182 // in File::Open().
183 }
184
185 void register_file_close(int token_id, time_t close_timestamp, const Stats& full_stats) {
186 XrdSysMutexHelper _lock(&m_queue_mutex);
187 m_file_close_q.push(token_id, {close_timestamp, full_stats});
188 }
189
190 // deletions can come from purge and from direct requests (Cache::UnlinkFile), the latter
191 // also covering the emergency shutdown of a file.
192 void register_file_purge(DirState* target, long long size_in_st_blocks) {
193 XrdSysMutexHelper _lock(&m_queue_mutex);
194 m_file_purge_q1.push(target, {size_in_st_blocks, 1});
195 }
196 void register_multi_file_purge(DirState* target, long long size_in_st_blocks, int n_files) {
197 XrdSysMutexHelper _lock(&m_queue_mutex);
198 m_file_purge_q1.push(target, {size_in_st_blocks, n_files});
199 }
200 void register_multi_file_purge(const std::string& target, long long size_in_st_blocks, int n_files) {
201 XrdSysMutexHelper _lock(&m_queue_mutex);
202 m_file_purge_q2.push(target, {size_in_st_blocks, n_files});
203 }
204 void register_file_purge(const std::string& filename, long long size_in_st_blocks) {
205 XrdSysMutexHelper _lock(&m_queue_mutex);
206 m_file_purge_q3.push(filename, size_in_st_blocks);
207 }
208
209 // void register_dir_purge(DirState* target);
210 // target assumed to be empty at this point, triggered by a file_purge removing the last file in it.
211 // hmmh, this is actually tricky ... who will purge the dirs? we should know at export-to-vector time
212 // and can prune leaf directories. This might fail if a file has been created in there in the meantime, which is ok.
213 // However, is there a race condition between rmdir and creation of a new file in that dir? Ask Andy.
214
215 // --- Helpers for event processing and actions
216
217 AccessToken& token(int i) { return m_access_tokens[i]; }
218
219 // --- Actions
220
221 int process_queues();
222
223 void heart_beat();
224
225 // --- Helpers for export of DirState vector snapshot.
226
227 void fill_sshot_vec_children(const DirState &parent_ds,
228 int parent_idx,
229 std::vector<DirStateElement> &vec,
230 int max_depth);
231
232 void fill_pshot_vec_children(const DirState &parent_ds,
233 int parent_idx,
234 std::vector<DirPurgeElement> &vec,
235 int max_depth);
236
237 // Interface to other part of XCache -- note the CamelCase() notation.
238 void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond);
239
240 // main function, steers startup then enters heart_beat. does not die.
241 void init_before_main(); // called from startup thread / configuration processing
242 void main_thread_function(); // run in dedicated thread
243
245 // The following variables are set under the above lock, purge task signals to heart_beat.
248 bool m_purge_task_active {false}; // from the perspective of heart-beat, set only in heartbeat
249 bool m_purge_task_complete {false}; // from the perspective of the task, reset in heartbeat, set in task
250 // When m_purge_task_active == true, DirState entries are not removed from the tree to
251 // allow purge thread to report cleared files directly via DirState ptr.
252 // Note, DirState removal happens during stat propagation traversal.
253
254 // Purge helpers etc.
256 void perform_purge_check(bool purge_cold_files, int tl);
257
260};
261
262}
263
264#endif
#define stat(a, b)
Definition XrdPosix.hh:101
#define ID
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void perform_purge_check(bool purge_cold_files, int tl)
void fill_sshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirStateElement > &vec, int max_depth)
void perform_purge_task(DataFsPurgeshot &ps)
void register_file_purge(const std::string &filename, long long size_in_st_blocks)
void scan_dir_and_recurse(FsTraversal &fst)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void fill_pshot_vec_children(const DirState &parent_ds, int parent_idx, std::vector< DirPurgeElement > &vec, int max_depth)
void register_multi_file_purge(DirState *target, long long size_in_st_blocks, int n_files)
void register_file_update_stats(int token_id, const Stats &stats)
void register_multi_file_purge(const std::string &target, long long size_in_st_blocks, int n_files)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
void AddUp(const Stats &s)
XrdPosixStats Stats