Source code for vcstools.check_files

import subprocess
import os
import numpy as np
import re
import traceback
import logging
from vcstools.metadb_utils import getmeta, get_files
from vcstools.config import load_config_file

logger = logging.getLogger(__name__)

[docs]def check_download(obsID, directory=None, startsec=None, n_secs=None, data_type='raw'): """Checks that the number of files in directory is the same as that found on the archive and also checks that all files have the same size (253440000 for raw, 7864340480 for recombined tarballs by default). Parameters ---------- obsID : `int` The MWA Observation ID. directory : `str`, optional The directory in which to check the files. |br| Default: /astro/mwavcs/vcs/[obsID]/raw/. startsec : `int` The gps time of first file to check. n_sec : `int` The number of seconds from the startsec to check. data_type : `str` The type of data from ['raw', 'tar_ics', 'ics']. |br| Default: raw. Returns ------- error : `boolean` If `True` there are missing or incorrect files, `False` if all files are correct. """ comp_config = load_config_file() if not data_type in ['raw', 'tar_ics', 'ics']: logger.error("Wrong data type given to download check.") return True if not directory: directory = os.path.join(comp_config['base_data_dir'], str(obsID), "raw") if data_type == 'raw' else os.path.join(comp_config['base_data_dir'], str(obsID), "combined") base = "\n Checking file size and number of files for obsID {0} in {1} for ".format(obsID, directory) n_secs = n_secs if n_secs else 1 logger.info(base + "gps times {0} to {1}".format(startsec, startsec+n_secs-1) if startsec else base + "the whole time range.") # put files in try: files, suffix, required_size = get_files_and_sizes(obsID, data_type, mintime=startsec, maxtime=startsec + n_secs) except: return True if not startsec: n_files_expected = len(files) command = "ls -l %s/*%s | ((tee /dev/fd/5 | wc -l >/dev/fd/4) 5>&1 | " %(directory, suffix) + \ "awk '($5!=%s){print \"file \" $9 \" has size \" $5 \" (expected %s)\"}' >> %s/%s_all.txt) 4>&1;" %(required_size, required_size,directory, obsID) + \ "cat %s/%s_all.txt; rm -rf %s/%s_all.txt" %(directory, obsID, directory, obsID) output = subprocess.Popen([command], stdout=subprocess.PIPE, shell=True).stdout else: n_files_expected = 0 #remove stray metafits from list that causes int errors files = [ x for x in files if "metafits" not in x ] times = [int(time[11:21]) for time in files] for sec in range(startsec,startsec+n_secs): n_files_expected += times.count(sec) output = subprocess.Popen(["count=0;for sec in `seq -w %s %s `;do let count=${count}+`ls -l %s/*${sec}*%s | " %(startsec, startsec+n_secs-1, directory, suffix) + \ "((tee /dev/fd/5 | wc -l >/dev/fd/4) 5>&1 | awk '($5!=%s) " %(required_size) + \ "{print \"file \" $9 \" has size \" $5 \" (expected %s)\"}' >> %s/errors_%s.txt) 4>&1`;done;" %(required_size,directory,startsec) +\ "echo ${count}; cat %s/errors_%s.txt;rm -rf %s/errors_%s.txt" %(directory,startsec,directory,startsec)], stdout=subprocess.PIPE, shell=True).stdout output = output.readlines() files_in_dir = int(output[0].strip()) error = False # in case we're checking for downloaded tarballs also need to check ics-files. if data_type == 'tar_ics': logger.info("Now checking ICS files") error, n_ics = check_recombine_ics(directory=directory, startsec=startsec, n_secs=n_secs,#n_files_expected, obsID=obsID) n_files_expected *= 2 files_in_dir += n_ics if not files_in_dir == n_files_expected: logger.error("We have {0} files but expected {1}".format(files_in_dir, n_files_expected)) error = True for line in output[1:]: if b'file' in line: logger.error(line) error = True if not error: logger.info("We have all {0} {1} files as expected.".format(files_in_dir, data_type)) return error
[docs]def check_recombine(obsID, directory=None, required_size=327680000, required_size_ics=30720000, startsec=None, n_secs=None): """Checks that the number of files in the directory as that found on the archive and also checks that all files have the same size (327680000 by default) Parameters ---------- obsID : `int` The MWA Observation ID. directory : `str`, optional The directory in which to check the files. |br| Default: /astro/mwavcs/vcs/[obsID]/combined/. required_size : `int` The required size of the recombined files in bytes. |br| Default: 327680000. required_size_ics : `int` The required size of the ics files in bytes. |br| Default: 30720000. startsec : `int` The gps time of first file to check. n_sec : `int` The number of seconds from the startsec to check. Returns ------- error : `boolean` If `True` there are missing or incorrect files, `False` if all files are correct. """ comp_config = load_config_file() if not directory: directory = os.path.join(comp_config['base_data_dir'], str(obsID), "combined") base = "\n Checking file size and number of files for obsID {0} in {1} for ".format(obsID, directory) n_secs = n_secs if n_secs else 1 logger.info(base + "gps times {0} to {1}".format(startsec, startsec+n_secs-1) if startsec else base + "the whole time range.") required_size = required_size # we need to get the number of unique seconds from the file names files = np.array(get_files(obsID)) mask = np.array(['.dat' in file for file in files]) if not startsec: times = [time[11:21] for time in files[mask]] n_secs = len(set(times)) command = "ls -l %s/*ch*.dat | ((tee /dev/fd/5 | wc -l >/dev/fd/4) 5>&1 | " %(directory) + \ "awk '($5!=%s){print $9}' | tee >> %s/%s_all.txt | xargs rm -rf) 4>&1;" %(required_size, directory, obsID) + \ "cat %s/%s_all.txt; rm -rf %s/%s_all.txt" %(directory, obsID, directory, obsID) output = subprocess.Popen([command], stdout=subprocess.PIPE, shell=True).stdout else: output = subprocess.Popen(["count=0;for sec in `seq -w %s %s `;do let count=${count}+`ls -l %s/*${sec}*ch*.dat | " %(startsec, startsec+n_secs-1, directory) + \ "((tee /dev/fd/5 | wc -l >/dev/fd/4) 5>&1 | awk '($5!=%s) " %(required_size) + \ "{print $9}' | tee >> %s/errors_%s.txt | xargs rm -rf) 4>&1`;done;" %(directory,startsec) +\ "echo ${count}; cat %s/errors_%s.txt;rm -rf %s/errors_%s.txt" %(directory,startsec,directory,startsec)], stdout=subprocess.PIPE, shell=True).stdout output = output.readlines() files_in_dir = int(output[0].strip()) expected_files = n_secs * 25 error = False error, n_ics = check_recombine_ics(directory=directory, \ startsec=startsec, n_secs=n_secs, required_size=required_size_ics) files_in_dir += n_ics if not files_in_dir == expected_files: logger.error("We have {0} files but expected {1}".format(files_in_dir, expected_files)) error = True for line in output[1:]: if b'dat' in line: logger.warning("Deleted {0} due to wrong size.".format(line.strip())) error = True if not error: logger.info("We have all {0} files as expected.".format(files_in_dir)) return error
[docs]def check_recombine_ics(obsID=None, directory=None, startsec=None, n_secs=None, required_size=None): """Checks that the number of recombined ics files in the directory as that found on the archive. Parameters ---------- obsID : `int`, optional The MWA Observation ID. directory : `str`, optional The directory in which to check the files. |br| Default: /astro/mwavcs/vcs/[obsID]/combined/. startsec : `int` The gps time of first file to check. n_sec : `int` The number of seconds from the startsec to check. required_size : `int` The required size of the ics files in bytes. |br| Default: None. Returns ------- error : `boolean` If `True` there are missing or incorrect files, `False` if all files are correct. """ if not required_size: try: _, _, required_size = get_files_and_sizes(obsID, 'ics', mintime=startsec, maxtime=startsec + n_secs) except: traceback.print_exc() return True, 0 if not startsec: #output = subprocess.Popen(["ls -ltr %s/*ics.dat | awk '($5!=%s){print \"file \" $9 \" has size \" $5 \" (expected %s)\"}'" %(directory, required_size, required_size)], # stdout=subprocess.PIPE, shell=True).communicate()[0] command = "ls -l %s/*ics.dat | ((tee /dev/fd/5 | wc -l >/dev/fd/4) 5>&1 | " %(directory) + \ "awk '($5!=%s){print $9}' | tee >> %s/ics_all.txt | xargs rm -rf) 4>&1;" %(required_size, directory) + \ "cat %s/ics_all.txt; rm -rf %s/ics_all.txt" %(directory, directory) output = subprocess.Popen([command], stdout=subprocess.PIPE, shell=True).stdout else: n_secs = n_secs if n_secs else 1 output = subprocess.Popen(["count=0;for sec in `seq -w %s %s `;do let count=${count}+`ls -l %s/*${sec}*ics.dat | " %(startsec, startsec+n_secs-1, directory) + \ "((tee /dev/fd/5 | wc -l >/dev/fd/4) 5>&1 | awk '($5!=%s) " %(required_size) + \ "{print $9}' | tee >> %s/errors_%s.txt | xargs rm -rf) 4>&1`;done;" %(directory,startsec) +\ "echo ${count}; cat %s/errors_%s.txt;rm -rf %s/errors_%s.txt" %(directory,startsec,directory,startsec)], stdout=subprocess.PIPE, shell=True).stdout output = output.readlines() files_in_dir = int(output[0].strip()) error = False if not files_in_dir == n_secs: logger.error("We have {0} ics-files but expected {1}".format(files_in_dir, n_secs)) error = True for line in output[1:]: if b'dat' in line: error = True line = line.strip().decode() logger.error("Deleted {0} due to wrong size.".format(line)) dat_files = line.replace('_ics.dat','*.dat') rm_cmd = "rm -rf {0}".format(dat_files) logger.warning("Also running {0} to make sure ics files are rebuilt.".format(rm_cmd)) subprocess.Popen(rm_cmd, stdout=subprocess.PIPE, shell=True) if error == False: logger.info("We have all {0} ICS files as expected.".format(files_in_dir)) return error, files_in_dir
[docs]def get_files_and_sizes(obsID, mode, mintime=0, maxtime=2000000000): """Get files and sizes from the MWA metadata server and check that they're all the same size Parameters ---------- obsID : `int` The MWA observation ID. mode : `str` The typ of file from 'raw', 'tar_ics' and 'ics' mintime : `int` The minimum GPS time of observations to check (inclusive, >=). |br| Default: 0 maxtime : `int` The maximum GPS time of observations to check (exculsive, <). |br| Default: 2000000000 Returns ------- files_masked : `list` List of the files with the input mode/suffix. suffix : `str` The file suffix from ['.dat', '.tar', '_ics.dat'] depnding on the input mode. sizes[0] : `int` Size of files in bytes. """ if mode == 'raw': suffix = '[0-9].dat' # checks for a digit to make this distinct from ics elif mode == 'tar_ics': suffix = '.tar' elif mode == 'ics': suffix = '_ics.dat' else: logger.error("Wrong mode supplied. Options are raw, tar_ics, and ics") return logger.info("Retrieving file info from MWA database for all {0} files...".format(suffix)) files_meta = getmeta(service='data_files', params={'obs_id':obsID, 'nocache':1, 'mintime':mintime, 'maxtime':maxtime}) # 'nocache' is used above so we get don't use the cached metadata as that could # be out of data so we force it to get up to date values files = np.array(list(files_meta.keys())) files_masked = [] sizes = [] for f in files: #if suffix in f: if re.match(".*{}".format(suffix), f): logger.debug("f: {} suffix: {} size:{}".format(f, suffix, files_meta[f]['size'])) sizes.append(files_meta[f]['size']) files_masked.append(f) logger.info("...Done. Expect all on database to be {0} bytes in size...".format(sizes[0])) size_check = True for s in sizes: if not s == sizes[0]: size_check = False if size_check: logger.info("...yep they are. Now checking on disk.") return files_masked, suffix, sizes[0] else: logger.error("Not all files have the same size. Check your data!") logger.error("{0}".format(np.vstack((files,sizes)).T)) return