Install Required Packages

In order to run the script you need a bunch of R packages all of which are are publicly available, either on CRAN (Comprehensive R Archive Network) or on our GitHub-Account.

The script described in the following requires the package kwb.monitoring and all its dependencies. All these packages are automatically installed by running the following code. Make sure to have none of the packages to be installed already loaded.

Load kwb.monitoring

We are now ready to start the actual script. First, you need to load the package kwb.monitoring and all the packages that it depends on:

library(kwb.monitoring)

Configuration

We need to start with a configuration section that defines how the script will behave, depending on the monitoring station of which data is being analysed. We are using three letter codes to identify a monitoring station. Let’s assume that we have two monitoring stations: ALT and NEU (taken from KWB project OGRE where they represent “Altbau” and “Neubau”). At both stations at least the water level and the flow are measured.

The code section printed in the following defines different types of settings:

  • eventSettings define for each monitoring station time intervals for which to apply different thresholds of water level and flow. In the example below two time windows are defined in each case but you are free to adapt the list for more or less time intervals. The event settings have the following effect: If for a timestamp belonging to the time interval between tBeg and tEnd the measured water level is above Hthreshold and the measured flow is above Qthreshold, then, and only then, this timestamp is considered to lie within a “hydraulic event”. The script will detect hydraulic events and will generate plots and statistics for each event.

  • regressionSettings define which H/Q-regression models to use if flow data are lacking. Different models can be applied to different time intervals (sublist models). The sublist usage defines in which time intervals missing values are replaced by values that are predicted by the regression models rather than doing a linear interpolation (standard behaviour).

  • intervalsToRemove defines time intervals of which data shall not be used for the analysis. These intervals are removed before showing the event overview plots so that these intervals will not be shown. Also, you will not be able to calculate any composite sampling for these intervals.

  • settings defines diverse main settings that control the behaviour of the script. For an explanation of these settings, see the comments in the script below or the help page of the function kwb.monitoring::configure.

Define Helper Functions

Create a Testing Environment

Configure the Script

# Main Configuration -----------------------------------------------------------
settings <- configure(
  
  # Where is the "root" directory to raw data?
  rawdir = rawdir,
  
  # which station?
  station = "ALT", # "NEU"
  
  # "left", "centre" or "right" 
  sampleEventMethod = "centre",
  
  # either "interpolate" or "predict"
  # note the regressionSettings -> predict is only used for the intervals set 
  # in "usage", otherwise interpolate will be used
  replaceMissingQMethod = "interpolate", 
  
  # which bottles are to be considered (read from the sampler file)?
  bottlesToConsider = NA, # NA means: all bottles
  #bottlesToConsider = 1:4,
  
  # which bottles are to be discarded (because they are not full)?
  bottlesToDiscard = NA,
  #bottlesToDiscard = 7,
  
  # sample volume (in mL) given to the bottle representing the time 
  # interval with highest flow volume
  Vbottle = 1600, 
  
  # maximum total volume for mixed sample, in mL
  Vmax = 5000,
  
  # What are the level thresholds (in m) that trigger the start of the sampler?
  Hthresholds = c(ALT=0.055, NEU=0.05),
  
  # What are the level thresholds (in l/s) that can define the start of an event?
  Qthresholds = c(ALT=3, NEU=2.5), 
  
  # What are the minimum volumes that are to be considered in the plots?
  Vthresholds = c(ALT=50, NEU=32), 
  
  # time step in hydraulic data
  #tstep.s = 120, still needed?  
  
  tstep.fill.s = 120,
  
  # separation of events (how long in seconds may Hthreshold be underrun 
  # within an event?)
  evtSepTime = 4*3600, # in seconds, here: 4 hours
  
  # separation of sampled events within one and the same sampler file.
  # If the difference between two sample times is greater than this time 
  # (in seconds) two sampled events are distinguished. Set to NA to prevent
  # splitting of sampled events.  
  sampleEventSeparationTime = 4*3600, # NA, # 3600
  
  # minimum duration (in minutes) of hydraulic events to be considered
  durationThreshold = 10, # min
  
  # separator to be used in output files (csv)
  outsep = ";",
  
  # decimal character to be used in output files (csv)
  outdec = ",",
  
  # time interval in seconds for rain data aggregation, e.g. 600 = 10 minutes  
  # NA = no aggregation of original rain data  
  rain.aggregation.interval = 600
)

# Set the path to a path dictionary file
dictionaryFile <- system.file("extdata/pathDictionary.txt", package = "kwb.monitoring")
  
# Read the path dictionary. The dictionary describes paths, file names and file
# name patterns in the form of a "grammar". By using the dictioanry keywords can
# be resolved to full paths.
dictionary <- pathDictionary(
  dictionaryFile = dictionaryFile,
  RAW_DIR = settings$rawdir,
  STATION = settings$station
)

# Assign the path dictionary to the main settings. 
settings$dictionary <- dictionary

# Make the event settings and regression settings part of the main settings
settings$event = eventSettings
settings$regression = regressionSettings

# Define the path to a .RData file containing rain data
rainDataFile <- file.path(settings$rawdir, "..", "RData", "data_rain.RData")

Provide Create a Testing Environment

# Create a test directory structure for raw data
flow_dir <- getOrCreatePath("FLOW_DIR", dictionary)

last_flow_dir <- kwb.utils::createDirectory(file.path(flow_dir, "20180601"))
#> The parent directory /home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB/flows needs to be created first.
#> The parent directory /home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB needs to be created first.
#> The parent directory /home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project needs to be created first.
#> The directory "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project" was created.
#> The directory "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB" was created.
#> The directory "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB/flows" was created.
#> The directory "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB/flows/20180601" was created.

file <- file.path(last_flow_dir, "flows.csv")

writeLines(con = file, c(
  "NIVUS",
  "CPU32",
  "Datum",
  "Fenster min",
  "Fenster max",
  paste0("skip_", 6:8),
  "Datum\tUhrzeit\tH\tv\tQ\tT",
  "01.06.2018\t00:00:00\t0.1\t0.1\t0.1\t0.1", 
  "01.06.2018\t00:01:00\t0.2\t0.1\t0.1\t0.1",
  "01.06.2018\t00:02:00\t0.3\t0.1\t0.1\t0.1", 
  "01.06.2018\t00:03:00\t0.2\t0.1\t0.1\t0.1",
  "01.06.2018\t00:04:00\t0.1\t0.1\t0.1\t0.1", 
  "01.06.2018\t00:05:00\t0.0\t0.1\t0.1\t0.1"
))

kwb.utils::createDirectory(getOrCreatePath("SAMPLE_DIR", dictionary))
#> The directory "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB/samples" was created.
#> [1] "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB/samples"

User defined functions

Before we start, we need to define some helper functions. The following functions are defined:

  • read_hydraulics reads the hydraulic data (water level and flow). It finds all the needed information on monitoring station and paths in the settings given as the only parameter.

  • getCurrentFlowSubDirectory is a helper function used by read_hydraulics. It finds the flow subfolder containing the most current data, identified by sorting subfolder names, in our case representing the date in “yyyymmdd” format, decreasingly.

  • readSamplerFile provides a data frame containing the information on when samples were taken into which bottle, considering the numbers of the bottles to be considered (you may e.g. want to consider only bottles with an even or odd number).

  • rainGaugesCorrelatedWithStation returns for a given monitoring station a vector of names of rain gauges that have been found to measure rain heights correlating with the discharge volume measured at the monitoring station.

  • processSampleEvent defines how to proceed during the analysis of one so called “sample event”. With sample event we mean the time limits that are logged by the auto sampler that is triggered by the measured water level.

# read_hydraulics --------------------------------------------------------------
read_hydraulics <- function(settings)
{
  flowDirectory <- getOrCreatePath("FLOW_DIR", dictionary = settings$dictionary)
  
  flowSubdirCurrent <- getCurrentFlowSubDirectory(flowDirectory)
  
  csv <- getOrCreatePath(
    "FLOW_CSV", 
    dictionary = settings$dictionary, 
    FLOW_SUBDIR_CURRENT = flowSubdirCurrent
  )
  
  hydraulics <- kwb.logger::readLogger_NIVUS_PCM4_2(kwb.utils::safePath(csv))
  
  renames <- list(
    myDateTime = "DateTime",
    Fuellstand_m = "H",
    Geschw_m_s = "v",
    Durchfluss_l_s = "Q",
    T_.C = "T"
  )
  
  columnNames <- as.character(renames)
  
  hydraulics <- kwb.utils::selectColumns(
    kwb.utils::renameColumns(hydraulics, renames), 
    columnNames
  )
  
  hydraulics$DateTime <- as_utc(hydraulics$DateTime)
  
  hydraulics
  ### data frame with columns "DateTime" (POSIXct, UTC), "H",
  ### "v", "Q", "T"
}

# getCurrentFlowSubDirectory ---------------------------------------------------
getCurrentFlowSubDirectory <- function
(
  flowDirectory
)
{
  subDirectories <- dir(flowDirectory)
  
  patternSubdir <- "^\\d{8}$"
  unexpected <- grep(patternSubdir, subDirectories, value=TRUE, invert=TRUE)
  
  if (! kwb.utils::isNullOrEmpty(unexpected)) {
    stop(
      sprintf("There are unexpected files/folders in \"%s\": %s\n%s\n",
              flowDirectory, paste(kwb.utils::hsQuoteChr(unexpected)), 
              "Only folders of type 'YYYYMMDD' expected!")
    )
  }
  
  currentSubdir <- sort(setdiff(subDirectories, unexpected), decreasing=TRUE)[1]
  
  if (is.na(currentSubdir)) {
    warning("There is no subdirectory 'YYYYMMDD' in ", flowDirectory)
  }
  else {
    cat("Most recent flow sub-directory:", currentSubdir, "\n")
  }  
  
  return(currentSubdir)
}

# readSamplerFile --------------------------------------------------------------
readSamplerFile <- function 
(
  samplerFile, bottlesToConsider, siteCode = NA
)  
{
  cat(sprintf("Reading sample data from \"%s\"... ", basename(samplerFile)))
  
  if (containsNulString(samplerFile)) {
    cat("skipped!\n")
    warning(paste(
      sprintf(
        "File \"%s\" contains null string and is skipped!",
        basename(samplerFile)),
      "Remove first two bytes of the file!"))
    sampleDataExtended <- NULL
  }
  else {    
    sampleData <- kwb.logger::readLogger_SIGMA_SD900(samplerFile)
    cat("ok.\n")
    sampleSite <- attr(sampleData, "metadata")$SITE_ID
    
    if (!is.na(siteCode) && sampleSite != siteCode) {
      stop(sprintf("The SITE_ID indicated in \"%s\" is not \"%s\" as expected!",
                   sampleSite, siteCode))
    }
    
    sampleDataExtended <- cbind(
      samplerFile = basename(samplerFile), 
      sampleData, 
      stringsAsFactors = FALSE
    )
  }
  
  # filter for relevant bottles
  if (!is.null(bottlesToConsider) & 
        ! all(is.na(bottlesToConsider))) {
    cat("Filtering for bottles", commaCollapsed(bottlesToConsider), "... ")
    indices <- which(sampleDataExtended$bottle %in% bottlesToConsider)
    sampleDataExtended <- sampleDataExtended[indices, ]
    cat("ok.\n")
  }
  
  kwb.utils::renameColumns(sampleDataExtended, list(myDateTime = "sampleTime"))
}

# rainGaugesCorrelatedWithStation ----------------------------------------------
rainGaugesCorrelatedWithStation <- function 
### selects gauges after the result of the volume correlation test
(
  station = NULL
)
{
  x <- list(
    ALT = c("ReiI", "BlnXI", "BlnIX", "BlnX"),
    NEU = c("Wit", "ReiI", "BlnIX")    
  )
  
  if (!is.null(station)) {
    x <- x[[station]]
  } 
  
  x
  ### list (one list element per monitoring site) of character vectors 
  ### representing rain gauge names
}

# processSampleEvent -----------------------------------------------------------
processSampleEvent <- function
(
  hydraulicData, 
  settings, 
  events, 
  eventsAndStat, 
  sampleEventIndex = -1, 
  to.pdf = TRUE,
  verbose = FALSE
) 
{  
  stopifnot(length(sampleEventIndex) == 1)
  
  # Filter for sampler event by index
  fileName <- getByPositiveOrNegativeIndex(
    elements = sort(unique(events$samplerEvents$samplerFile)), 
    index = sampleEventIndex
  ) 
  
  sampleInformation <- filterSampleEventsForFilename(
    events = events, 
    fileName = fileName
  )  
  
  samplerEvent <- sampleInformation$samplerEvents
  
  if (verbose) {
    printSampleInformation(sampleInformation)
  }
  
  saveSampleInformation(sampleInformation, settings, sampleFile=samplerEvent$samplerFile)
  
  # find "merged" event(s) containing the "sampler event"
  mergedEventNumber <- indicesOfEventsContainingEvent(eventsAndStat$merged, samplerEvent)
  
  stopifnot(length(mergedEventNumber) == 1)
  
  mergedEventAndStat <- eventsAndStat$merged[mergedEventNumber, ]
  
  # select subset of data within the sampler event
  hydraulicSubset <- hsGetEvent(hydraulicData, events = mergedEventAndStat, 1)
  
  if (kwb.utils::isNullOrEmpty(hydraulicSubset)) {
    warning("There is no hydraulic data for this event:\n", 
            formatEvent(mergedEventAndStat))
    return()
  }
  
  # generate a column containing the bottle number for each timestep  
  hydraulicSubset$bottle <- hsEventNumber(
    hydraulicSubset$DateTime, 
    events = sampleInformation$bottleEvents,
    eventNumbers=sampleInformation$bottleEvents$bottle
  )
  
  # generate a column containing the cumulative volume
  hydraulicSubset$Vcum_m3 <- cumsum(hydraulicSubset$Q)*settings$tstep.fill.s/1000
  
  # write interpolated data to files for "quality assurance"
  eventName <- sampleLogFileToSampleName(samplerEvent$samplerFile)
  
  writeCsvToPathFromDictionary(
    dataFrame = hydraulicSubset, 
    key = "SAMPLED_EVENT_CSV_HYDRAULICS", 
    SAMPLED_EVENT_NAME = eventName, 
    settings = settings, 
    open.directory = FALSE
  )
  
  # calculate sum of flows per bottle  
  volumeCompositeSample <- calculateVolumeCompositeSample(  
    hydraulicSubset, settings
  )
  
  writeCsvToPathFromDictionary(
    dataFrame = addSumRow(volumeCompositeSample), 
    key = "SAMPLED_EVENT_CSV_COMPOSITE", 
    SAMPLED_EVENT_NAME = eventName,
    settings = settings, 
    open.directory = FALSE
  )
  
  plot_sampled_event(
    hydraulicData = hydraulicData,
    settings = settings, 
    sampleInformation = sampleInformation, 
    mergedEventAndStat = mergedEventAndStat,
    volumeCompositeSample = volumeCompositeSample,
    to.pdf = to.pdf
  )  
}

Script I: Data Visualisation and Composite Sample Calculation

After having done all the configuration above you may run the main script that is printed in the following. The following steps of which some are optional, are performed:

  • loading raw data (water level, flow) from csv files into hydraulicData.raw,

  • writing raw data (no meta data, exacly one header line) to a CSV file (optional),

  • generating a plot (by default in a PDF file that will be opened) showing the complete timeseries of hydraulic data and one plot per day,

  • providing “valid” data in hydraulicData.all by filling gaps in the raw data,

  • writing validated data to a CSV file (optional),

  • removing data from user-defined time intervals containing “invalid” data,

  • building different kinds of events (hydraulic = defined by H and Q, sampling = defined by the sampling times, merged = overlay of hydraulic events and sampling events),

  • generating a plot showing the event limits of hydraulic, sampling and merged events,

  • writing event data to files (optional),

  • adding hydraulic statistics (such as Qmax, discharge volume) as new columns to the tables of events, resulting in eventsAndStat,

  • writing event statistics to CSV files (optional),

  • loading rain data from an .RData file (optional),

  • filtering for events with a minimum discharge volume,

  • generating a plot (by default in a PDF file that will be opened) showing an overview of hydraulic events and one page showing level, discharge, rain (optional) and the event statistics per event,

  • finally calculating and visualising the shares of volumes that need to be taken from each bottle in order to create a volume-proportional composite sample, i.e. a composite sample in which concentrations are weighted with the discharge volumes that have been calculated for the time intervals that are represented by the different bottles.

And here comes the script:

# Read last available hydraulic data (required if station changed)
# "-10d" = last 10 days
hydraulicData.raw <- kwb.base::selectTimeInterval(
  read_hydraulics(settings), width = "-30d"
)
#> Most recent flow sub-directory: 20180601 
#> 
#> *** selectTimeInterval: t1=2018-05-02 00:05:00, t2=2018-06-01 00:05:00, width=-30d

writeCsvToPathFromDictionary(
  hydraulicData.raw, key = "HYDRAULIC_DATA_RAW", settings, 
  open.directory = FALSE
)
#> Writing to '/home/travis/R/Library/kwb.monitoring/extdata/rawdata/flows_raw.csv'... ok.

# Show overview plots (complete and daily), optional
showOverview(
  hydraulicData.raw, settings, Qmax = 20, Hmax = 35, to.pdf = TRUE, 
  save.pdf = TRUE
)

# Prepare hydraulic data (fill gaps, interpolate/predict)
hydraulicData.all <- validateAndFillHydraulicData(
  hydraulicData = hydraulicData.raw, 
  settings = settings
)
#> There are 2 timestamps (out of a total of 5 ) that are not multiples of the timestep ( 120 seconds ):
#> [1] "2018-06-01 00:01:00 UTC" "2018-06-01 00:03:00 UTC"
#> replaceMissingQMethod = interpolate -> The interpolated values are used when Q.raw is NA.

# Write data to file (optional)
writeCsvToPathFromDictionary(
  hydraulicData.all, key = "HYDRAULIC_DATA_VAL", settings, 
  open.directory = FALSE
)
#> Writing to '/home/travis/R/Library/kwb.monitoring/extdata/rawdata/flows_val.csv'... ok.

hydraulicData <- removeIntervals(
  hydraulicData.all, 
  intervals = intervalsToRemove[[settings$station]]
)

# Build different kinds of events (hydraulic, sample, merged), required
events <- getAllTypesOfEvents(
  hydraulicData, settings, FUN.readSamplerFile = readSamplerFile
)
#> Warning in getHydraulicEvents(hydraulicData, settings): *** The event conditions are never met.
#>   Range of Q values: [0.100000, 0.100000]
#>   Range of H values: [0.100000, 0.300000]
#>   Q-threshold: 3.000000
#>   H-threshold: 0.055000
#> Warning: There are no hydraulic events according to the defined criteria
#> (or hydraulicData was NULL)!
#> Warning: There are no sample files (matching "sample_\\d.csv") in:
#>   "/home/travis/R/Library/kwb.monitoring/extdata/rawdata/test-project/KWB/samples"
#> Warning: There are no sampling events!
#> Warning: No hydraulicEvents given to getMergedEvents(). Returning
#> samplerEvents.

# Plot overview of event limits (optional)
# plotEventOverview(events[c("hydraulic", "samplerEvents", "merged")], settings)
# 
# # Write events to files (optional)
# writeCsvToPathFromDictionary(events$samplingEvents, "SAMPLING_EVENTS", settings, 
#                              open.directory = FALSE)
# writeCsvToPathFromDictionary(events$bottleEvents, "BOTTLE_EVENTS", settings, 
#                              open.directory = FALSE)
# writeCsvToPathFromDictionary(events$samplerEvents, "SAMPLER_EVENTS", settings, 
#                              open.directory = FALSE)
# 
# # Add statistics to hydraulic events
# eventsAndStat <- addStatisticsToEvents(events, hydraulicData.all)
# 
# # Write event including statistics to file (optional)
# writeCsvToPathFromDictionary(
#   eventsAndStat$hydraulic, "HYDRAULIC_EVENTS", settings, 
#   open.directory = FALSE
# )
# 
# # Load rain data if available
# if (file.exists(rainDataFile)) {
#   load(rainDataFile)
# } else {
#   rainData <- NULL #(if you dont want to show rain data)    
# }
# 
# seriesNames <- rainGaugesCorrelatedWithStation(station = settings$station)
# 
# hydraulicEvents <- eventsAndStat$hydraulic
# 
# Vt <- settings$Vthresholds[settings$station]
# 
# hydraulicEvents <- kwb.utils::renameColumns(
#   hydraulicEvents, list(event = "eventNumber")
# )
# 
# # Select events with a minimum volume
# selected <- !is.na(hydraulicEvents$V.m3) & hydraulicEvents$V.m3 > Vt
# events.to.plot <- hydraulicEvents[selected,]
# 
# # plot hydraulic events with rain
# plot_hydraulic_events(
#   hydraulicData = hydraulicData, 
#   settings = settings, 
#   eventsAndStat = events.to.plot,
#   to.pdf = TRUE, 
#   rainData = rainData, 
#   gauges = seriesNames[1:3]
# )  
# 
# # Analyse one sampler event given by its number ("sampleEventIndex")
# # according to the order in the sample log files. By giving a negative index
# # you may choose events from the end (-1 means "last", -2 one before last,
# # etc.)
# processSampleEvent(
#   hydraulicData, settings, events, eventsAndStat, sampleEventIndex = -2, 
#   to.pdf = FALSE
# )

Script II: Finding and storing H/Q-regression models

The following script helps you to generate a regression model between water level and discharge values. Raw data are loaded into hydraulicData.raw. Data of time intervals that have been assessed as “invalid” (defined in intervalsToRemove, see above) are removed. The resulting timeseries of H and Q are displayed as well as a plot showing H over Q. You are provided a simple user interface to further manipulate the selection of values to be used to calculate a regression model.