39 template<
typename ID,
typename RECORD>
46 using queue_type = std::vector<Entry>;
47 using iterator =
typename queue_type::iterator;
51 int write_queue_size()
const {
return m_write_queue.size(); }
52 bool read_queue_empty()
const {
return m_read_queue.empty(); }
53 int read_queue_size()
const {
return m_read_queue.size(); }
56 void push(
ID id, RECORD
stat) { m_write_queue.push_back({ id,
stat }); }
58 RECORD& write_record(
int pos) {
return m_write_queue[pos].record; }
61 int swap_queues() { m_read_queue.clear(); m_write_queue.swap(m_read_queue);
return read_queue_size(); }
62 const queue_type& read_queue()
const {
return m_read_queue; }
63 iterator begin()
const {
return m_read_queue.begin(); }
64 iterator end()
const {
return m_read_queue.end(); }
67 void shrink_read_queue() { m_read_queue.clear(); m_read_queue.shrink_to_fit(); }
70 queue_type m_write_queue, m_read_queue;
74 std::string m_filename;
75 unsigned int m_last_queue_swap_u1 = 0xffffffff;
76 int m_last_write_queue_pos = -1;
77 DirState *m_dir_state =
nullptr;
81 m_last_queue_swap_u1 = 0xffffffff;
82 m_last_write_queue_pos = -1;
83 m_dir_state =
nullptr;
86 std::vector<AccessToken> m_access_tokens;
87 std::vector<int> m_access_tokens_free_slots;
100 long long m_size_in_st_blocks;
104 Queue<int, OpenRecord> m_file_open_q;
105 Queue<int, Stats> m_file_update_stats_q;
106 Queue<int, CloseRecord> m_file_close_q;
107 Queue<DirState*, PurgeRecord> m_file_purge_q1;
108 Queue<std::string, PurgeRecord> m_file_purge_q2;
109 Queue<std::string, long long> m_file_purge_q3;
112 long long m_current_usage_in_st_blocks = 0;
114 XrdSysMutex m_queue_mutex;
115 unsigned int m_queue_swap_u1 = 0u;
117 DataFsState &m_fs_state;
124 const std::string &f_lfn;
125 XrdSysCondVar &f_cond;
129 XrdSysMutex m_dir_scan_mutex;
130 std::list<LfnCondRecord> m_dir_scan_open_requests;
131 int m_dir_scan_check_counter = 0;
132 bool m_dir_scan_in_progress =
true;
134 void process_inter_dir_scan_open_requests(FsTraversal &fst);
135 void cross_check_or_process_oob_lfn(
const std::string &lfn, FsTraversal &fst);
136 long long get_file_usage_bytes_to_remove(
const DataFsPurgeshot &ps,
long long previous_file_usage,
int logLeve);
153 if ( ! m_access_tokens_free_slots.empty()) {
154 token_id = m_access_tokens_free_slots.back();
155 m_access_tokens_free_slots.pop_back();
156 m_access_tokens[token_id].m_filename = filename;
157 m_access_tokens[token_id].m_last_queue_swap_u1 = m_queue_swap_u1 - 1;
159 token_id = (int) m_access_tokens.size();
160 m_access_tokens.push_back({filename, m_queue_swap_u1 - 1});
163 m_file_open_q.push(token_id, {open_timestamp, existing_file});
169 AccessToken &at =
token(token_id);
171 if (at.m_last_queue_swap_u1 != m_queue_swap_u1) {
172 m_file_update_stats_q.push(token_id, stats);
173 at.m_last_queue_swap_u1 = m_queue_swap_u1;
174 at.m_last_write_queue_pos = m_file_update_stats_q.write_queue_size() - 1;
176 Stats &existing_stats = m_file_update_stats_q.write_record(at.m_last_write_queue_pos);
177 existing_stats.
AddUp(stats);
187 m_file_close_q.push(token_id, {close_timestamp, full_stats});
194 m_file_purge_q1.push(target, {size_in_st_blocks, 1});
198 m_file_purge_q1.push(target, {size_in_st_blocks, n_files});
202 m_file_purge_q2.push(target, {size_in_st_blocks, n_files});
206 m_file_purge_q3.push(filename, size_in_st_blocks);
217 AccessToken&
token(
int i) {
return m_access_tokens[i]; }
229 std::vector<DirStateElement> &vec,
234 std::vector<DirPurgeElement> &vec,