Create initial version of notebooks #862

Merged
6 changes: 6 additions & 0 deletions e2e_samples/fabric_dataops_sample/README.md
@@ -80,12 +80,18 @@ Here is a list of resources that are deployed:
![Fabric API Permissions](./images/admin-portal-allow-apis.png)

- Grant the service principal or managed identity the `Contributor` and `User Access Administrator` privileged roles on the Azure resource group. For the `User Access Administrator` role, you need to add a delegation condition during role assignment. A condition is an additional check that provides more fine-grained access control. See the [documentation](https://learn.microsoft.com/azure/role-based-access-control/delegate-role-assignments-portal?tabs=template) for more details. During the deployment, the `Storage Blob Data Contributor` and `Key Vault Secrets Officer` roles are granted to a newly created service principal (Fabric workspace identity) and an existing Entra security group (Fabric workspace admins). Here is a valid sample condition for the `User Access Administrator` role assignment:

![Role Assignment Condition](./images/role-assignment-condition.png)

- Microsoft Graph API permissions:
- For a service principal, grant the Graph API application permission `Group.Read.All` to read the security group properties.

![Graph API Permissions](./images/graph-api-permission.png)

- For a managed identity, assign the elevated [Directory Readers](https://learn.microsoft.com/en-us/entra/identity/role-based-access-control/permissions-reference#directory-readers) role to read the security group properties. To do this, go to `Azure Active Directory > Roles and administrators > Directory Readers > Add assignment`, and add the managed identity. A minimal sketch of the Graph call that requires this read access is shown after this list.

![Managed Identity Permissions](./images/managed-identity-permission.png)

- Configure Fabric capacity administrators.
- If you want to use an **existing** Fabric capacity, ensure that both your user account and the principal (service principal or managed identity) are [added as Capacity Administrators](https://learn.microsoft.com/fabric/admin/capacity-settings?tabs=fabric-capacity#add-and-remove-admins) to that capacity.
- If you are creating a **new** Fabric capacity, provide the list of users and principals (service principal or managed identity) to be added as capacity admins in the `FABRIC_CAPACITY_ADMINS` environment variable. For users, use the `userPrincipalName`; for principals (sp/mi), use the Object ID. Do not add spaces after the commas.
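
For context, here is a minimal, hypothetical sketch of the Microsoft Graph call that the `Group.Read.All` permission (service principal) or `Directory Readers` role (managed identity) enables. The group object ID placeholder and the use of `DefaultAzureCredential` with `requests` are assumptions for illustration, not part of the sample:

```python
# Hypothetical sketch: verify the principal can read the security group's properties
# via Microsoft Graph (requires Group.Read.All or the Directory Readers role).
import requests
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()  # picks up the service principal or managed identity
token = credential.get_token("https://graph.microsoft.com/.default").token

group_id = "<fabric-workspace-admins-group-object-id>"  # placeholder
response = requests.get(
    f"https://graph.microsoft.com/v1.0/groups/{group_id}",
    headers={"Authorization": f"Bearer {token}"},
)
response.raise_for_status()
print(response.json()["displayName"])
```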
49 changes: 49 additions & 0 deletions e2e_samples/fabric_dataops_sample/config/application.cfg.template
@@ -0,0 +1,49 @@
# ##############################################################
# See - https://docs.python.org/3/library/configparser.html
# Tips:
# - ${section:option} can be used for interpolation of values from another section in the same file.
# - When creating stage-specific resources like ADLS, whose names are globally unique,
#   make sure they can be created, i.e., those names are available.
# - Env-specific sections like [dev], [test], etc. are also possible,
#   but they could cause maintenance issues and may not be ideal.
#   Instead, define common sections here and derive env-specific values
#   based on the input "env/stage" value in the code.
#
# ##############################################################

# Important:
# - DO NOT use quotes for wrapping strings
# - Note that workspace_name etc. are not defined at each section level
#   but rather in the DEFAULT section. This way, individual sections
#   like `setup`, `standardize`, or `transform` can override
#   the values from the `DEFAULT` section if needed.

[DEFAULT]
service_name = parking_sensors_fabric_e2e
service_version = 0.1
workspace_name = <workspace-name>
workspace_id = <workspace-id>
lakehouse_name = <lakehouse-name>
lakehouse_id = <lakehouse-id>

[keyvault]
name = <keyvault-name>
uri = https://${keyvault:name}.vault.azure.net/

[setup]
# --------- Globals for nb-setup notebook
process_name = setup
# ddl file path is relative to the attached lakehouse's Files/ location
adls_shortcut_name = <adls-shortcut-name>
ddl_file = ${setup:adls_shortcut_name}/config/lakehouse_ddls.yaml

[standardize]
# --------- Globals for nb-standardize notebook
process_name = standardize

[transform]
# --------- Globals for nb-transform notebook
process_name = transform

[otel]
appinsights_connection_name = <appinsights-connection-string>
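
For reference, a minimal sketch of how a notebook might read this template with `configparser`; the file name `application.cfg` and its local path are assumptions:

```python
# Minimal sketch: read the rendered config; ExtendedInterpolation resolves ${section:option}.
from configparser import ConfigParser, ExtendedInterpolation

config = ConfigParser(interpolation=ExtendedInterpolation())
config.read("application.cfg")  # assumed path to the rendered template

workspace_name = config.get("DEFAULT", "workspace_name")
keyvault_uri = config.get("keyvault", "uri")   # resolves ${keyvault:name}
ddl_file = config.get("setup", "ddl_file")     # resolves ${setup:adls_shortcut_name}
print(workspace_name, keyvault_uri, ddl_file)
```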
@@ -1,7 +1,6 @@
dependencies:
- pip:
- jupyter-black==0.3.4
- black==24.4.2
- pytest==8.2.2
- jupyter-black==0.4.0
- pytest==8.3.3
- ipytest==0.14.2
- opentelemetry-exporter-otlp==1.25.0
- azure-monitor-opentelemetry-exporter==1.0.0b32
@@ -0,0 +1,133 @@
# This example uses the advanced configuration with the Azure Monitor exporters:
# ref: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/monitor/azure-monitor-opentelemetry-exporter#microsoft-opentelemetry-exporter-for-azure-monitor

# A simpler configuration is also possible, as shown below, in which case the tracer, logger, and meter are
# pre-configured to send data to Azure Monitor.
# ref: https://learn.microsoft.com/en-us/azure/azure-monitor/app/opentelemetry-enable?tabs=python
# ```
# from azure.monitor.opentelemetry import configure_azure_monitor

# # Configure the Distro to authenticate with Azure Monitor - without managed identity
# configure_azure_monitor(
# connection_string="your-connection-string"
# )

# # using a managed identity credential.
# configure_azure_monitor(
# connection_string="your-connection-string",
# credential=ManagedIdentityCredential(),
# )
# ```
# - https://learn.microsoft.com/en-us/python/api/overview/azure/monitor-opentelemetry-exporter-readme?view=azure-python-preview
# - https://learn.microsoft.com/en-us/python/api/overview/azure/monitor-opentelemetry-exporter-readme?view=azure-python-preview#examples

import logging

# from azure.monitor.opentelemetry import configure_azure_monitor  # Not used: we use the advanced
# configuration shown below with the AzureMonitor*Exporter classes.
from azure.monitor.opentelemetry.exporter import (
AzureMonitorLogExporter,
AzureMonitorMetricExporter,
AzureMonitorTraceExporter,
)

# metrics
# traces
from opentelemetry import metrics, trace

# logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

logging.basicConfig(level=logging.INFO)


class OpenTelemetryAppInsightsExporter:
def __init__(self, conn_string: str) -> None:
"""
Initializes the OpenTelemetryExporter class.

Args:
conn_string (str): Azure AppInsights connection string.
"""
self.conn_string = conn_string

return None

def get_otel_tracer(self, trace_resource_attributes: dict, tracer_name: str = __name__) -> object:
"""
Creates and returns an OpenTelemetry tracer object.

Args:
trace_resource_attributes (dict): The OpenTelemetry resource attributes in dictionary format
tracer_name (str): The name of the tracer. Default is __name__.
Returns:
tracer: OpenTelemetry tracer object
"""
resource = Resource(attributes=trace_resource_attributes)
tracer_provider = TracerProvider(resource=resource)
# Exporter to send data to AppInsights
trace_exporter = AzureMonitorTraceExporter(connection_string=self.conn_string)
span_processor = BatchSpanProcessor(trace_exporter)
tracer_provider.add_span_processor(span_processor)
tracer = trace.get_tracer(tracer_name, tracer_provider=tracer_provider)

Review comment: Hi @sreedhar-guda, it's a tiny thing, but let me ask: is there a reason for not setting the tracer provider globally, e.g. with trace.set_tracer_provider(tracer_provider)? I'm asking because this code produces the following error (not a blocker) during exporting:

ERROR:azure.monitor.opentelemetry.exporter.export.trace._exporter:Failed to derive Resource from Tracer Provider: 'ProxyTracerProvider' object has no attribute 'resource'

(Reference: the azure-monitor-opentelemetry examples on GitHub appear to set the tracer provider globally.)

Reply (Contributor): @maniSbindra, do you know the above details?

return tracer

def get_otel_logger(
self, log_resource_attributes: dict, logger_name: str = __name__, add_console_handler: bool = True
) -> object:
"""
Creates and returns an OpenTelemetry logger object.

Args:
log_resource_attributes (dict): The OpenTelemetry resource attributes in dictionary format
logger_name (str): The name of the logger. Default is __name__.
add_console_handler (bool): Whether to add a console handler to the logger. Default is True.
Returns:
logger: OpenTelemetry logger object
"""
resource = Resource(attributes=log_resource_attributes)
log_exporter = AzureMonitorLogExporter(connection_string=self.conn_string)

logger_provider = LoggerProvider(resource=resource)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider)
logging.getLogger().addHandler(handler) # Attach OTLP handler to root logger

logger = logging.getLogger(logger_name) # get namespaced logger

# Create a console handler - optional
if add_console_handler:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

return logger

def get_otel_metrics(
self, metric_resource_attributes: dict, metric_name: str = __name__, metric_version: str = "0"
) -> object:
"""
Creates and returns an OpenTelemetry metrics object.

Args:
metric_resource_attributes (dict): The OpenTelemetry resource attributes in dictionary format
metric_name (str): The name of the metric. Default is __name__.
metric_version (str): The version of the metric. Default is "0".
Returns:
meter: OpenTelemetry meter object
"""
resource = Resource(attributes=metric_resource_attributes)
metrics_exporter = AzureMonitorMetricExporter(connection_string=self.conn_string)
metrics_reader = PeriodicExportingMetricReader(metrics_exporter)
metrics_provider = MeterProvider(resource=resource, metric_readers=[metrics_reader])
metrics.set_meter_provider(metrics_provider)
meter = metrics.get_meter_provider().get_meter(name=metric_name, version=metric_version)

return meter
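
A hypothetical usage sketch of the class above; the connection string and attribute values are placeholders (in the sample they would come from the application config and Key Vault):

```python
# Hypothetical usage of OpenTelemetryAppInsightsExporter defined above.
conn_string = "InstrumentationKey=<key>;IngestionEndpoint=<endpoint>"  # placeholder
exporter = OpenTelemetryAppInsightsExporter(conn_string=conn_string)

attributes = {"service.name": "parking_sensors_fabric_e2e", "service.version": "0.1"}
tracer = exporter.get_otel_tracer(attributes, tracer_name="nb-setup")
logger = exporter.get_otel_logger(attributes, logger_name="nb-setup")
meter = exporter.get_otel_metrics(attributes, metric_name="nb-setup")

with tracer.start_as_current_span("setup"):
    logger.info("Setup started")
    meter.create_counter("rows_processed").add(42)
```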
157 changes: 157 additions & 0 deletions e2e_samples/fabric_dataops_sample/config/lakehouse_ddls.yaml
@@ -0,0 +1,157 @@
######################################################################################
# About this file: Config file to hold all DDLs for the lakehouse
# NOTE:
# - Use {workspace_name} and {lakehouse_name} to indicate workspace and lakehouse names.
#   These values will be replaced at runtime.
# - Ideally, a fully qualified name to query across workspaces would be composed of:
#   {workspace_name}.{lakehouse_name}.{schema_name}.{table_name}
#   However, as there is a known issue (as of Nov 2024) with workspace names having special characters
#   in them, we are using {lakehouse_name}.{schema_name}.{table_name} for now. This way we can
#   still support schemas as long as we are querying within the same (current) workspace.
#
# Field Descriptions
# ------------------
# type: grouping indicator for tables
# table_name: table name with schema
# data_file: Path relative to Files/ in {workspace_name}.{lakehouse_name}.
#   If specified, this data is loaded into the table mentioned in table_name
#   in overwrite mode. Ex: if the path is "Files/data/dim_time.csv" then this should be
#   set as "data/dim_time.csv".
# create_sql: SQL statement to create the table. Only one statement is allowed.
#   Runs when data_file is not specified.
#   When both data_file and create_sql are specified, data_file takes precedence.
######################################################################################
version: 0.1
description: "DDLs for Parking sensor Fabric e2e sample"
table_types:
- type: fact
tables:
- table_name: dw.fact_parking
description: "Fact table for parking data"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.dw.fact_parking (
dim_date_id STRING,
dim_time_id STRING,
dim_parking_bay_id STRING,
dim_location_id STRING,
dim_st_marker_id STRING,
status STRING,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- type: dimension
tables:
- table_name: dw.dim_st_marker
description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_st_marker"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.dw.dim_st_marker (
dim_st_marker_id STRING,
st_marker_id STRING,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- table_name: dw.dim_location
description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_location"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.dw.dim_location (
dim_location_id STRING,
lat FLOAT,
lon FLOAT,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- table_name: dw.dim_parking_bay
description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_parking_bay"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.dw.dim_parking_bay (
dim_parking_bay_id STRING,
bay_id INT,
marker_id STRING,
meter_id STRING,
rd_seg_dsc STRING,
rd_seg_id STRING,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- type: reference
tables:
- table_name: dw.dim_date
description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_date"
data_file: sc-adls-main/reference/dim_date.csv
- table_name: dw.dim_time
description: "Dimension table for {workspace_name}.{lakehouse_name}.dw.dim_time"
data_file: sc-adls-main/reference/dim_time.csv
- type: interim
tables:
- table_name: interim.parking_bay
description: "Dimension table for {workspace_name}.{lakehouse_name}.interim.parking_bay"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.interim.parking_bay (
bay_id INT,
last_edit TIMESTAMP,
marker_id STRING,
meter_id STRING,
rd_seg_dsc STRING,
rd_seg_id STRING,
the_geom STRUCT<coordinates: ARRAY<ARRAY<ARRAY<ARRAY<DOUBLE>>>>, type: STRING>,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- table_name: interim.sensor
description: "Dimension table for {workspace_name}.{lakehouse_name}.interim.sensor"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.interim.sensor (
bay_id INT,
st_marker_id STRING,
lat FLOAT,
lon FLOAT,
location STRUCT<coordinates: ARRAY<DOUBLE>, type: STRING>,
status STRING,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- type: error
tables:
- table_name: malformed.parking_bay
description: "Dimension table for {workspace_name}.{lakehouse_name}.malformed.parking_bay"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.malformed.parking_bay (
bay_id INT,
last_edit TIMESTAMP,
marker_id STRING,
meter_id STRING,
rd_seg_dsc STRING,
rd_seg_id STRING,
the_geom STRUCT<coordinates: ARRAY<ARRAY<ARRAY<ARRAY<DOUBLE>>>>, type: STRING>,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
- table_name: malformed.sensor
description: "Dimension table for {workspace_name}.{lakehouse_name}.malformed.sensor"
data_file:
create_sql: |
CREATE TABLE {lakehouse_name}.malformed.sensor (
bay_id INT,
st_marker_id STRING,
lat FLOAT,
lon FLOAT,
location STRUCT<coordinates: ARRAY<DOUBLE>, type: STRING>,
status STRING,
load_id STRING,
loaded_on TIMESTAMP
)
USING DELTA
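
A minimal sketch of how a setup notebook might consume this file. Assumptions: `spark` is the SparkSession provided by the Fabric notebook runtime, the YAML is reachable under the attached lakehouse's Files/ via the ADLS shortcut, and the mount path and placeholder names below are illustrative:

```python
# Minimal sketch (not part of the PR): apply the DDLs defined in lakehouse_ddls.yaml.
import yaml

lakehouse_name = "<lakehouse-name>"
workspace_name = "<workspace-name>"
ddl_path = "/lakehouse/default/Files/sc-adls-main/config/lakehouse_ddls.yaml"  # assumed mount path

with open(ddl_path) as f:
    ddl_config = yaml.safe_load(f)

for table_type in ddl_config["table_types"]:
    for table in table_type["tables"]:
        if table.get("data_file"):
            # data_file takes precedence: load the CSV and overwrite the target table
            df = spark.read.csv(f"Files/{table['data_file']}", header=True)
            df.write.mode("overwrite").saveAsTable(f"{lakehouse_name}.{table['table_name']}")
        elif table.get("create_sql"):
            # Only one statement is allowed; placeholders are replaced at runtime
            spark.sql(
                table["create_sql"].format(
                    lakehouse_name=lakehouse_name, workspace_name=workspace_name
                )
            )
```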