Migrate from S3 #1157

Merged (2 commits, Mar 5, 2025)
291 changes: 172 additions & 119 deletions pages/advanced-algorithms/available-algorithms/migrate.mdx

import { Steps } from 'nextra/components'

# migrate

The `migrate` module provides an efficient way to transfer data from various source systems,
such as relational databases and CSV files stored in AWS S3, into Memgraph. It lets you
retrieve data from those systems and transform the tabular results into graph structures.

With Cypher, you can shape the migrated data dynamically, making it easy to create nodes,
establish relationships, and enrich your graph. Below are examples showing how to retrieve,
filter, and convert relational data into a graph format.

<Cards>
<Cards.Card
| **Implementation** | Python |
| **Parallelism** | sequential |

---

## Procedures

### `mysql()`

With the `migrate.mysql()` procedure, you can access MySQL and migrate your data to Memgraph.
The result table is converted into a stream, and the returned rows can be used to create graph structures.

{<h4 className="custom-header"> Input: </h4>}

- `table_or_sql: str` ➡ Table name or an SQL query.
- `config: mgp.Map` ➡ Connection parameters (as in `mysql.connector.connect`).
- `config_path` ➡ Path to a JSON file containing configuration parameters.
- `params: mgp.Nullable[mgp.Any] (default=None)` ➡ Query parameters (if applicable).
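
Conceptually, each of these procedures runs the query through a DB-API driver and streams every fetched tuple back as a map keyed by column name. A minimal Python sketch of that conversion, with the cursor output simulated (in the real module, a driver such as `mysql.connector` supplies the column metadata and the tuples):

```python
# Sketch of how a tabular result set becomes a stream of row maps.
# The "cursor" data below is simulated; in the module itself a DB-API
# driver such as mysql.connector provides `description` and the tuples.

def rows_as_maps(description, fetched):
    """Yield each fetched tuple as a dict keyed by column name."""
    columns = [col[0] for col in description]
    for values in fetched:
        yield dict(zip(columns, values))

# Simulated cursor output for: SELECT id, name, age FROM users
description = [("id",), ("name",), ("age",)]
fetched = [(1, "Alice", 34), (2, "Bob", 28)]

for row in rows_as_maps(description, fetched):
    print(row)
```

Each emitted map corresponds to one `row` value you consume with `YIELD row` in Cypher.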

{<h4 className="custom-header"> Output: </h4>}

- `row: mgp.Map` ➡ The result table as a stream of rows.

{<h4 className="custom-header"> Usage: </h4>}

#### Retrieve and inspect data
```cypher
CALL migrate.mysql('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
```

#### Filter specific data
```cypher
CALL migrate.mysql('SELECT * FROM users', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
WITH row
WHERE row.age >= 30
RETURN row;
```

#### Create nodes from migrated data
```cypher
CALL migrate.mysql('SELECT id, name, age FROM users', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});
```

#### Create relationships between users
```cypher
CALL migrate.mysql('SELECT user1_id, user2_id FROM friendships', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
```

---

### `oracle_db()`

With the `migrate.oracle_db()` procedure, you can access Oracle DB and migrate your data to Memgraph.

{<h4 className="custom-header"> Input: </h4>}

- `table_or_sql: str` ➡ Table name or an SQL query.
- `config: mgp.Map` ➡ Connection parameters (as in `oracledb.connect`).
- `config_path` ➡ Path to a JSON file containing configuration parameters.
- `params: mgp.Nullable[mgp.Any] (default=None)` ➡ Query parameters (if applicable).

{<h4 className="custom-header"> Output: </h4>}

- `row: mgp.Map` ➡ The result table as a stream of rows.

{<h4 className="custom-header"> Usage: </h4>}

#### Retrieve and inspect data
```cypher
CALL migrate.oracle_db('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
```

#### Merge nodes to avoid duplicates
```cypher
CALL migrate.oracle_db('SELECT id, name FROM companies', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'business_db'} )
YIELD row
MERGE (c:Company {id: row.id})
SET c.name = row.name;
```

---

### `postgresql()`

With the `migrate.postgresql()` procedure, you can access PostgreSQL and migrate your data to Memgraph.

{<h4 className="custom-header"> Input: </h4>}

- `table_or_sql: str` ➡ Table name or an SQL query.
- `config: mgp.Map` ➡ Connection parameters (as in `psycopg2.connect`).
- `config_path` ➡ Path to a JSON file containing configuration parameters.
- `params: mgp.Nullable[mgp.Any] (default=None)` ➡ Query parameters (if applicable).

{<h4 className="custom-header"> Output: </h4>}

- `row: mgp.Map` ➡ The result table as a stream of rows.

{<h4 className="custom-header"> Usage: </h4>}

#### Retrieve and inspect data
```cypher
CALL migrate.postgresql('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
```

#### Create nodes for products
```cypher
CALL migrate.postgresql('SELECT product_id, name, price FROM products', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'retail_db'} )
YIELD row
CREATE (p:Product {id: row.product_id, name: row.name, price: row.price});
```

#### Establish relationships between orders and customers
```cypher
CALL migrate.postgresql('SELECT order_id, customer_id FROM orders', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'retail_db'} )
YIELD row
MATCH (o:Order {id: row.order_id}), (c:Customer {id: row.customer_id})
CREATE (c)-[:PLACED]->(o);
```

---

### `sql_server()`

With the `migrate.sql_server()` procedure, you can access SQL Server and migrate your data to Memgraph.

{<h4 className="custom-header"> Input: </h4>}

- `table_or_sql: str` ➡ Table name or an SQL query.
- `config: mgp.Map` ➡ Connection parameters (as in `pyodbc.connect`).
- `config_path` ➡ Path to a JSON file containing configuration parameters.
- `params: mgp.Nullable[mgp.Any] (default=None)` ➡ Query parameters (if applicable).

{<h4 className="custom-header"> Output: </h4>}

- `row: mgp.Map` ➡ The result table as a stream of rows.

{<h4 className="custom-header"> Usage: </h4>}

#### Retrieve and inspect data
```cypher
CALL migrate.sql_server('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
```

#### Convert SQL table rows into graph nodes
```cypher
CALL migrate.sql_server('SELECT id, name, role FROM employees', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'company_db'} )
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, role: row.role});
```

---

### `s3()`

With the `migrate.s3()` procedure, you can **access a CSV file in AWS S3**, stream the data into Memgraph,
and transform it into a **graph representation** using Cypher. The migration uses the Python `boto3` client.
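
Conceptually, the procedure downloads the object and emits one map per CSV record. A minimal sketch of that step, with the S3 download stubbed out as a plain string (in the module itself, `boto3` retrieves the object body):

```python
import csv
import io

def csv_rows(csv_text):
    """Yield each CSV record as a dict keyed by header name,
    mirroring the `row` maps the procedure streams out."""
    reader = csv.DictReader(io.StringIO(csv_text))
    for record in reader:
        yield dict(record)

# Stub for the decoded object body a real S3 download would return.
body = "id,name,position\n1,Alice,Engineer\n2,Bob,Analyst\n"

for row in csv_rows(body):
    print(row)
```

Note that a CSV parser reads every value as a string, so if values reach Cypher untyped, cast them (e.g. with `toInteger()`) before numeric comparisons.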

{<h4 className="custom-header"> Input: </h4>}

- `file_path: str` ➡ S3 file path in the format `'s3://bucket-name/path/to/file.csv'`.
- `config: mgp.Map` ➡ AWS connection parameters. All of them are optional.
- `aws_access_key_id` - if not provided, environment variable `AWS_ACCESS_KEY_ID` will be used
- `aws_secret_access_key` - if not provided, environment variable `AWS_SECRET_ACCESS_KEY` will be used
- `region_name` - if not provided, environment variable `AWS_REGION` will be used
- `aws_session_token` - if not provided, environment variable `AWS_SESSION_TOKEN` will be used
- `config_path: str` (optional) ➡ Path to a JSON file containing AWS credentials.
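
The credential fallback described above (an explicit `config` value wins, otherwise the matching environment variable is used) can be sketched as follows; `resolve_aws_config` is a hypothetical helper for illustration, not part of the module's API:

```python
import os

# Mapping of optional config keys to their environment-variable
# fallbacks, as listed above.
ENV_FALLBACKS = {
    "aws_access_key_id": "AWS_ACCESS_KEY_ID",
    "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY",
    "region_name": "AWS_REGION",
    "aws_session_token": "AWS_SESSION_TOKEN",
}

def resolve_aws_config(config):
    """Hypothetical helper: explicit config value first, then env var."""
    resolved = {}
    for key, env_var in ENV_FALLBACKS.items():
        value = config.get(key) or os.environ.get(env_var)
        if value is not None:
            resolved[key] = value
    return resolved

os.environ["AWS_REGION"] = "us-east-1"
print(resolve_aws_config({"aws_access_key_id": "explicit-key"}))
```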

{<h4 className="custom-header"> Output: </h4>}

- `row: mgp.Map` ➡ Each row from the CSV file as a structured dictionary.

{<h4 className="custom-header"> Usage: </h4>}

#### Retrieve and inspect CSV data from S3
```cypher
CALL migrate.s3('s3://my-bucket/data.csv', {aws_access_key_id: 'your-key',
aws_secret_access_key: 'your-secret',
region_name: 'us-east-1'} )
YIELD row
RETURN row
LIMIT 100;
```

#### Filter specific rows from the CSV
```cypher
CALL migrate.s3('s3://my-bucket/customers.csv', {aws_access_key_id: 'your-key',
aws_secret_access_key: 'your-secret',
region_name: 'us-west-2'} )
YIELD row
WITH row
WHERE toInteger(row.age) >= 30
RETURN row;
```

#### Create nodes dynamically from CSV data
```cypher
CALL migrate.s3('s3://my-bucket/employees.csv', {aws_access_key_id: 'your-key',
aws_secret_access_key: 'your-secret',
region_name: 'eu-central-1'} )
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, position: row.position});
```