Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Note: This tap currently does not support incremental state.
| files | False | None | An array of csv file stream settings. |
| csv_files_definition| False | None | A path to the JSON file holding an array of file settings. |
| add_metadata_columns| False | False | When True, add the metadata columns (`_sdc_source_file`, `_sdc_source_file_mtime`, `_sdc_source_lineno`) to output. |
| add_metadata_dict| False | False | When True, add a `metadata` object column (with `source` and `time_extracted` fields) to output. |

A full list of supported settings and capabilities is available by running: `tap-csv --about`

Expand Down
20 changes: 18 additions & 2 deletions tap_csv/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import csv
import os
from datetime import datetime, timezone
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Dict

from singer_sdk import typing as th
from singer_sdk.streams import Stream

SDC_SOURCE_FILE_COLUMN = "_sdc_source_file"
SDC_SOURCE_LINENO_COLUMN = "_sdc_source_lineno"
SDC_SOURCE_FILE_MTIME_COLUMN = "_sdc_source_file_mtime"

METADATA_COLUMN = "metadata"

class CSVStream(Stream):
"""Stream class for CSV streams."""
Expand Down Expand Up @@ -48,6 +48,10 @@ def get_records(self, context: Optional[dict]) -> Iterable[dict]:
if self.config.get("add_metadata_columns", False):
row = [file_path, file_last_modified, file_lineno] + row

if self.config.get("add_metadata_dict", False):
metadata_dict={"source": file_path, "time_extracted": datetime.utcnow()}
row = [metadata_dict] + row

yield dict(zip(self.header, row))

def _get_recursive_file_paths(self, file_path: str) -> list:
Expand Down Expand Up @@ -152,7 +156,19 @@ def schema(self) -> dict:
th.Property(SDC_SOURCE_FILE_MTIME_COLUMN, th.DateTimeType)
)
properties.append(th.Property(SDC_SOURCE_LINENO_COLUMN, th.IntegerType))

# If enabled, add file's metadata to output
if self.config.get("add_metadata_dict", False):
header = [
METADATA_COLUMN,
] + header

t = th.ObjectType(
th.Property("source", th.StringType),
th.Property("time_extracted", th.StringType),
additional_properties=False,
)
properties.append(th.Property(METADATA_COLUMN, t))
# Cache header for future use
self.header = header

Expand Down
9 changes: 9 additions & 0 deletions tap_csv/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class TapCSV(Tap):
"`_sdc_source_file_mtime`, `_sdc_source_lineno`) to output."
),
),
th.Property(
"add_metadata_dict",
th.BooleanType,
required=False,
default=False,
description=(
"When True, adds basic metadata as dict"
),
),
).to_dict()

@classproperty
Expand Down