Merge pull request #862 from Azure-Samples/feat/sguda-setup-notebook-updates

Create initial version of notebooks
promisinganuj authored Dec 10, 2024
2 parents 3358ede + 9e5ad7d commit 01543b0
Showing 31 changed files with 3,430 additions and 518 deletions.
6 changes: 6 additions & 0 deletions e2e_samples/fabric_dataops_sample/README.md
@@ -81,12 +81,18 @@ Here is a list of resources that are deployed:
![Fabric API Permissions](./images/admin-portal-allow-apis.png)

- Grant the service principal or managed identity the `Contributor` and `User Access Administrator` privileged roles on the Azure resource group. For the `User Access Administrator` role, you need to add a delegation condition during role assignment. A condition is an additional check that provides more fine-grained access control; see the [documentation](https://learn.microsoft.com/azure/role-based-access-control/delegate-role-assignments-portal?tabs=template) for details. During the deployment, the `Storage Blob Data Contributor` and `Key Vault Secrets Officer` roles are granted to a newly created service principal (the Fabric workspace identity) and an existing Entra security group (the Fabric workspace admins). Here is a valid sample condition for the `User Access Administrator` role assignment:

![Role Assignment Condition](./images/role-assignment-condition.png)

- Microsoft Graph API permissions:
  - For a service principal, grant the Graph API application permission `Group.Read.All` to read the security group properties.

    ![Graph API Permissions](./images/graph-api-permission.png)

  - For a managed identity, assign the elevated [Directory Readers](https://learn.microsoft.com/en-us/entra/identity/role-based-access-control/permissions-reference#directory-readers) role to read the security group properties. To do this, go to `Azure Active Directory > Roles and administrators > Directory Readers > Add assignment` and add the managed identity.

    ![Managed Identity Permissions](./images/managed-identity-permission.png)

- Configure Fabric capacity administrators.
  - If you want to use an **existing** Fabric capacity, ensure that both your user account and the principal (service principal or managed identity) are [added as Capacity Administrators](https://learn.microsoft.com/fabric/admin/capacity-settings?tabs=fabric-capacity#add-and-remove-admins) to that capacity.
  - If you are creating a **new** Fabric capacity, provide the list of users and principals (service principal or managed identity) to be added as capacity admins via the `FABRIC_CAPACITY_ADMINS` environment variable. For users, specify the 'userPrincipalName'; for principals (SP/MI), specify the 'Object ID'. Don't add spaces after the commas (see the format sketch below).
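For illustration, here is a minimal sketch of the expected `FABRIC_CAPACITY_ADMINS` format (the values are hypothetical) and a quick Python check that no spaces follow the commas:

```python
import os

# Hypothetical value: a comma-separated list mixing a userPrincipalName (for a user)
# and an object ID (for a service principal or managed identity), with no spaces.
os.environ["FABRIC_CAPACITY_ADMINS"] = "alice@contoso.com,00000000-0000-0000-0000-000000000000"

admins = os.environ["FABRIC_CAPACITY_ADMINS"].split(",")
assert all(entry == entry.strip() for entry in admins), "No spaces allowed around commas"
print(admins)  # ['alice@contoso.com', '00000000-0000-0000-0000-000000000000']
```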
49 changes: 49 additions & 0 deletions e2e_samples/fabric_dataops_sample/config/application.cfg.template
@@ -0,0 +1,49 @@
# ##############################################################
# See: https://docs.python.org/3/library/configparser.html
# Tips:
# - ${section:option} can be used to interpolate values from another section in the same file.
# - When creating stage-specific resources such as ADLS, whose names are globally unique,
#   make sure those names are available before use.
# - Env-specific sections such as [dev] and [test] are possible, but they can cause
#   maintenance issues and are not ideal. Instead, define common sections here and derive
#   env-specific values in code based on the input "env/stage" value.
# ##############################################################

# Important:
# - DO NOT use quotes to wrap strings.
# - Note that options such as workspace_name are defined in the DEFAULT section rather than
#   in each individual section. Individual sections such as `setup`, `standardize`, or
#   `transform` can still override values from the DEFAULT section if needed.

[DEFAULT]
service_name = parking_sensors_fabric_e2e
service_version = 0.1
workspace_name = <workspace-name>
workspace_id = <workspace-id>
lakehouse_name = <lakehouse-name>
lakehouse_id = <lakehouse-id>

[keyvault]
name = <keyvault-name>
uri = https://${keyvault:name}.vault.azure.net/

[setup]
# --------- Globals for nb-setup notebook
process_name = setup
# ddl file path is relative to the attached lakehouse's Files/ location
adls_shortcut_name = <adls-shortcut-name>
ddl_file = ${setup:adls_shortcut_name}/config/lakehouse_ddls.yaml

[standardize]
# --------- Globals for nb-standardize notebook
process_name = standardize

[transform]
# --------- Globals for nb-transform notebook
process_name = transform

[otel]
appinsights_connection_name = appinsights-connection-string
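For reference, here is a minimal sketch (not part of the sample) of reading the rendered config with Python's `configparser`. `ExtendedInterpolation` is what resolves `${section:option}` references such as `${keyvault:name}`; the `application.cfg` path is an assumption.

```python
from configparser import ConfigParser, ExtendedInterpolation

# ExtendedInterpolation enables ${section:option} references like ${keyvault:name}.
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read("application.cfg")  # hypothetical path to the rendered template

keyvault_uri = config["keyvault"]["uri"]       # https://<keyvault-name>.vault.azure.net/
ddl_file = config["setup"]["ddl_file"]         # <adls-shortcut-name>/config/lakehouse_ddls.yaml
workspace = config["setup"]["workspace_name"]  # falls back to the DEFAULT section

print(keyvault_uri, ddl_file, workspace)
```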
@@ -1,7 +1,6 @@
dependencies:
- pip:
- jupyter-black==0.3.4
- black==24.4.2
- pytest==8.2.2
- jupyter-black==0.4.0
- pytest==8.3.3
- ipytest==0.14.2
- opentelemetry-exporter-otlp==1.25.0
- azure-monitor-opentelemetry-exporter==1.0.0b32
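As a hint at how the notebook testing dependencies above are typically used, here is a minimal `ipytest` sketch (illustrative only, not taken from the sample notebooks); `ipytest.autoconfig()` and `ipytest.run()` are the library's standard entry points for running pytest-style tests inside a notebook cell:

```python
import ipytest

ipytest.autoconfig()  # configure pytest to run inside the notebook


def add(a: int, b: int) -> int:
    return a + b


def test_add() -> None:
    assert add(2, 3) == 5


ipytest.run("-q")  # run the tests defined in this notebook
```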
@@ -0,0 +1,133 @@
# This example uses the advanced configuration based on the Azure Monitor exporters.
# ref: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry-exporter#microsoft-opentelemetry-exporter-for-azure-monitor

# A simpler configuration is also possible, as shown below, in which case the tracer, logger,
# and meter are pre-configured to send data to Azure Monitor.
# ref: https://learn.microsoft.com/en-us/azure/azure-monitor/app/opentelemetry-enable?tabs=python
# ```
# from azure.identity import ManagedIdentityCredential
# from azure.monitor.opentelemetry import configure_azure_monitor
#
# # Configure the distro to authenticate with Azure Monitor - without a managed identity.
# configure_azure_monitor(
#     connection_string="your-connection-string"
# )
#
# # Or, using a managed identity credential.
# configure_azure_monitor(
#     connection_string="your-connection-string",
#     credential=ManagedIdentityCredential(),
# )
# ```
# - https://learn.microsoft.com/en-us/python/api/overview/azure/monitor-opentelemetry-exporter-readme?view=azure-python-preview
# - https://learn.microsoft.com/en-us/python/api/overview/azure/monitor-opentelemetry-exporter-readme?view=azure-python-preview#examples

import logging

# from azure.monitor.opentelemetry import configure_azure_monitor  # Not used here; we use the
# advanced configuration below based on the AzureMonitor*Exporter classes.
from azure.monitor.opentelemetry.exporter import (
    AzureMonitorLogExporter,
    AzureMonitorMetricExporter,
    AzureMonitorTraceExporter,
)

# metrics and traces
from opentelemetry import metrics, trace

# logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

logging.basicConfig(level=logging.INFO)


class OpenTelemetryAppInsightsExporter:
    def __init__(self, conn_string: str) -> None:
        """
        Initializes the OpenTelemetryAppInsightsExporter class.

        Args:
            conn_string (str): Azure AppInsights connection string.
        """
        self.conn_string = conn_string

    def get_otel_tracer(self, trace_resource_attributes: dict, tracer_name: str = __name__) -> object:
        """
        Creates and returns an OpenTelemetry tracer object.

        Args:
            trace_resource_attributes (dict): The OpenTelemetry resource attributes in dictionary format.
            tracer_name (str): The name of the tracer. Default is __name__.

        Returns:
            tracer: OpenTelemetry tracer object
        """
        resource = Resource(attributes=trace_resource_attributes)
        tracer_provider = TracerProvider(resource=resource)
        # Exporter to send data to AppInsights
        trace_exporter = AzureMonitorTraceExporter(connection_string=self.conn_string)
        span_processor = BatchSpanProcessor(trace_exporter)
        tracer_provider.add_span_processor(span_processor)
        tracer = trace.get_tracer(tracer_name, tracer_provider=tracer_provider)

        return tracer

    def get_otel_logger(
        self, log_resource_attributes: dict, logger_name: str = __name__, add_console_handler: bool = True
    ) -> object:
        """
        Creates and returns an OpenTelemetry logger object.

        Args:
            log_resource_attributes (dict): The OpenTelemetry resource attributes in dictionary format.
            logger_name (str): The name of the logger. Default is __name__.
            add_console_handler (bool): Whether to add a console handler to the logger. Default is True.

        Returns:
            logger: OpenTelemetry logger object
        """
        resource = Resource(attributes=log_resource_attributes)
        log_exporter = AzureMonitorLogExporter(connection_string=self.conn_string)

        logger_provider = LoggerProvider(resource=resource)
        logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
        handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)
        logging.getLogger().addHandler(handler)  # Attach the OTel handler to the root logger

        logger = logging.getLogger(logger_name)  # Get a namespaced logger

        # Create a console handler - optional
        if add_console_handler:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            logger.addHandler(console_handler)

        return logger

    def get_otel_metrics(
        self, metric_resource_attributes: dict, metric_name: str = __name__, metric_version: str = "0"
    ) -> object:
        """
        Creates and returns an OpenTelemetry meter object.

        Args:
            metric_resource_attributes (dict): The OpenTelemetry resource attributes in dictionary format.
            metric_name (str): The name of the meter. Default is __name__.
            metric_version (str): The version of the meter. Default is "0".

        Returns:
            meter: OpenTelemetry meter object
        """
        resource = Resource(attributes=metric_resource_attributes)
        metrics_exporter = AzureMonitorMetricExporter(connection_string=self.conn_string)
        metrics_reader = PeriodicExportingMetricReader(metrics_exporter)
        metrics_provider = MeterProvider(resource=resource, metric_readers=[metrics_reader])
        metrics.set_meter_provider(metrics_provider)
        meter = metrics.get_meter_provider().get_meter(name=metric_name, version=metric_version)

        return meter
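A minimal usage sketch for the class above (the connection string and resource attributes are placeholders; in the sample, the connection string would typically come from the Key Vault secret named by `appinsights_connection_name` in the `[otel]` config section):

```python
# Hypothetical usage of OpenTelemetryAppInsightsExporter defined above.
conn_string = "InstrumentationKey=00000000-0000-0000-0000-000000000000"  # placeholder
attributes = {"service.name": "parking_sensors_fabric_e2e", "service.version": "0.1"}

exporter = OpenTelemetryAppInsightsExporter(conn_string=conn_string)

tracer = exporter.get_otel_tracer(attributes, tracer_name="nb-setup")
logger = exporter.get_otel_logger(attributes, logger_name="nb-setup")
meter = exporter.get_otel_metrics(attributes, metric_name="nb-setup")

with tracer.start_as_current_span("setup-step"):
    logger.info("Starting setup step")
    meter.create_counter("setup_runs").add(1)
```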
157 changes: 157 additions & 0 deletions e2e_samples/fabric_dataops_sample/config/lakehouse_ddls.yaml
@@ -0,0 +1,157 @@
######################################################################################
# About this file: Config file holding all DDLs for the lakehouse.
# NOTE:
# - Use {workspace_name} and {lakehouse_name} to indicate the workspace and lakehouse names.
#   These values are replaced at runtime.
# - Ideally, a fully qualified name to query across workspaces would be composed of:
#   {workspace_name}.{lakehouse_name}.{schema_name}.{table_name}
#   However, as there is a known issue (as of Nov 2024) with workspace names containing special
#   characters, we are using {lakehouse_name}.{schema_name}.{table_name} for now. This way we can
#   still support schemas as long as we are querying within the same (currently working) workspace.
#
# Field Descriptions
# ------------------
# type:       grouping indicator for tables
# table_name: table name with schema
# data_file:  path relative to Files/ in {workspace_name}.{lakehouse_name}.
#             If specified, this data is loaded into the table named in table_name
#             in overwrite mode. Ex: if the path is "Files/data/dim_time.csv", set this
#             to "data/dim_time.csv".
# create_sql: SQL statement to create the table. Only one statement is allowed, and it
#             runs when data_file is not specified.
#             When both data_file and create_sql are specified, data_file takes precedence.
######################################################################################
version: 0.1
description: "DDLs for Parking sensor Fabric e2e sample"
table_types:
  - type: fact
    tables:
      - table_name: dw.fact_parking
        description: "Fact table for parking data"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.dw.fact_parking (
            dim_date_id STRING,
            dim_time_id STRING,
            dim_parking_bay_id STRING,
            dim_location_id STRING,
            dim_st_marker_id STRING,
            status STRING,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
  - type: dimension
    tables:
      - table_name: dw.dim_st_marker
        description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_st_marker"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.dw.dim_st_marker (
            dim_st_marker_id STRING,
            st_marker_id STRING,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
      - table_name: dw.dim_location
        description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_location"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.dw.dim_location (
            dim_location_id STRING,
            lat FLOAT,
            lon FLOAT,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
      - table_name: dw.dim_parking_bay
        description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_parking_bay"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.dw.dim_parking_bay (
            dim_parking_bay_id STRING,
            bay_id INT,
            marker_id STRING,
            meter_id STRING,
            rd_seg_dsc STRING,
            rd_seg_id STRING,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
  - type: reference
    tables:
      - table_name: dw.dim_date
        description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_date"
        data_file: sc-adls-main/reference/dim_date.csv
      - table_name: dw.dim_time
        description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_time"
        data_file: sc-adls-main/reference/dim_time.csv
  - type: interim
    tables:
      - table_name: interim.parking_bay
        description: "Interim table for {workspace_name}.{lakehouse_name}.interim.parking_bay"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.interim.parking_bay (
            bay_id INT,
            last_edit TIMESTAMP,
            marker_id STRING,
            meter_id STRING,
            rd_seg_dsc STRING,
            rd_seg_id STRING,
            the_geom STRUCT<coordinates: ARRAY<ARRAY<ARRAY<ARRAY<DOUBLE>>>>, type: STRING>,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
      - table_name: interim.sensor
        description: "Interim table for {workspace_name}.{lakehouse_name}.interim.sensor"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.interim.sensor (
            bay_id INT,
            st_marker_id STRING,
            lat FLOAT,
            lon FLOAT,
            location STRUCT<coordinates: ARRAY<DOUBLE>, type: STRING>,
            status STRING,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
  - type: error
    tables:
      - table_name: malformed.parking_bay
        description: "Error (malformed records) table for {workspace_name}.{lakehouse_name}.malformed.parking_bay"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.malformed.parking_bay (
            bay_id INT,
            last_edit TIMESTAMP,
            marker_id STRING,
            meter_id STRING,
            rd_seg_dsc STRING,
            rd_seg_id STRING,
            the_geom STRUCT<coordinates: ARRAY<ARRAY<ARRAY<ARRAY<DOUBLE>>>>, type: STRING>,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
      - table_name: malformed.sensor
        description: "Error (malformed records) table for {workspace_name}.{lakehouse_name}.malformed.sensor"
        data_file:
        create_sql: |
          CREATE TABLE {lakehouse_name}.malformed.sensor (
            bay_id INT,
            st_marker_id STRING,
            lat FLOAT,
            lon FLOAT,
            location STRUCT<coordinates: ARRAY<DOUBLE>, type: STRING>,
            status STRING,
            load_id STRING,
            loaded_on TIMESTAMP
          )
          USING DELTA
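For orientation, here is a minimal sketch of how a setup notebook might consume `lakehouse_ddls.yaml`, honoring the data_file-over-create_sql precedence described in the file header. It assumes PyYAML and a Spark session are available and that the `{lakehouse_name}` placeholder is substituted with `str.format`; it is not the sample's actual notebook code.

```python
import yaml  # PyYAML
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # in a Fabric notebook, `spark` already exists
lakehouse_name = "my_lakehouse"  # placeholder, resolved at runtime in the sample

with open("lakehouse_ddls.yaml") as f:
    ddl_config = yaml.safe_load(f)

for table_type in ddl_config["table_types"]:
    for table in table_type["tables"]:
        if table.get("data_file"):
            # data_file takes precedence: load the CSV relative to Files/ and overwrite the table.
            df = spark.read.csv(f"Files/{table['data_file']}", header=True)
            df.write.mode("overwrite").saveAsTable(f"{lakehouse_name}.{table['table_name']}")
        elif table.get("create_sql"):
            spark.sql(table["create_sql"].format(lakehouse_name=lakehouse_name))
```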