← 返回首页
HPCTrainingExamples/Python/hip-python at main · amd/HPCTrainingExamples · GitHub
Skip to content

Navigation Menu

Toggle navigation
Sign in
Appearance settings
Search or jump to...

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Include my email address so I can be contacted

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Resetting focus

Latest commit

 

History

History
 main
Top

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
View all files

README.md

HIP-Python

README.md from HPCTrainingExamples/Python/hip-python in the Training Examples repository

For these examples, get a GPU with salloc or srun.

salloc -N 1 --ntasks 16 --gpus=1 --time=01:00:00 or srun -N 1 --ntasks 16 --gpus=1 --time=01:00:00 --pty /bin/bash

Be sure and free up the GPU when you are done with the exercises.

The first test is to check that the hip-python environment is set up correctly.

module load rocm hip-python python -c 'from hip import hip, hiprtc' 2> /dev/null && echo 'Success' || echo 'Failure'

Note

These examples assume you are working on AAC6, where a hip-python module is already installed. On your own or other training systems such as AAC7, you can install it yourself with

python3 -m venv hip-python-venv source hip-python-venv/bin/activate export ROCM_VERSION=`cat $ROCM_PATH/.info/version` python3 -m pip install -i https://test.pypi.org/simple "hip-python==${ROCM_VERSION}.*" --force-reinstall --no-cache python3 -m pip install -i https://test.pypi.org/simple "hip-python-as-cuda==${ROCM_VERSION}.*" --force-reinstall --no-cache

Make sure to specify the correct version string that matches your local ROCm installation. This example installs hip-python for rocm/7.2.0.

HIP-Python has an extensive capability for retrieving device properties and attributes. We'll take a look at the two main functions -- higGetDeviceProperties and hipDeviceGetAttribute.

Obtaining Device Properties

We'll take a look at the higGetDeviceProperties function first. Copy the following code into a file named hipGetDeviceProperties_example.py or pull the example down with

git clone https://github.com/AMD/HPCTrainingExamples cd HPCTrainingExamples/Python/hip-python

The hipGetDeviceProperties_example.py file

from hip import hip def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) return result props = hip.hipDeviceProp_t() hip_check(hip.hipGetDeviceProperties(props,0)) for attrib in sorted(props.PROPERTIES()): print(f"props.{attrib}={getattr(props,attrib)}") print("ok")

Try it by loading the proper modules and running it with python3.

module load rocm hip-python python3 hipGetDeviceProperties_example.py

Some of the useful properties that can be obtained are:

props.managedMemory=1 props.name=b'AMD Instinct MI210' props.warpSize=64

Getting Device Attributes

The second function to get device information is hipDeviceGetAttribute. Copy the following into hipDeviceGetAttribute_example.py or use the file in the hip-python examples.

from hip import hip def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) return result device_num = 0 for attrib in ( hip.hipDeviceAttribute_t.hipDeviceAttributeMaxBlockDimX, hip.hipDeviceAttribute_t.hipDeviceAttributeMaxBlockDimY, hip.hipDeviceAttribute_t.hipDeviceAttributeMaxBlockDimZ, hip.hipDeviceAttribute_t.hipDeviceAttributeMaxGridDimX, hip.hipDeviceAttribute_t.hipDeviceAttributeMaxGridDimY, hip.hipDeviceAttribute_t.hipDeviceAttributeMaxGridDimZ, hip.hipDeviceAttribute_t.hipDeviceAttributeWarpSize, ): value = hip_check(hip.hipDeviceGetAttribute(attrib,device_num)) print(f"{attrib.name}: {value}") print("ok")

Run this file.

module load rocm hip-python python3 hipDeviceGetAttribute_example.py

Output

hipDeviceAttributeMaxBlockDimX: 1024 hipDeviceAttributeMaxBlockDimY: 1024 hipDeviceAttributeMaxBlockDimZ: 1024 hipDeviceAttributeMaxGridDimX: 2147483647 hipDeviceAttributeMaxGridDimY: 65536 hipDeviceAttributeMaxGridDimZ: 65536 hipDeviceAttributeWarpSize: 64 ok

Accessing HIP Streams using HIP-Python

In the HIP streams example, we'll see how to create streams from Python and pass array data to the stream routines from Python arrays.

The code in the file hipstreams_example.py.

import ctypes import random import array from hip import hip def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) return result # inputs n = 100 x_h = array.array("i",[int(random.random()*10) for i in range(0,n)]) num_bytes = x_h.itemsize * len(x_h) x_d = hip_check(hip.hipMalloc(num_bytes)) stream = hip_check(hip.hipStreamCreate()) hip_check(hip.hipMemcpyAsync(x_d,x_h,num_bytes,hip.hipMemcpyKind.hipMemcpyHostToDevice,stream)) hip_check(hip.hipMemsetAsync(x_d,0,num_bytes,stream)) hip_check(hip.hipMemcpyAsync(x_h,x_d,num_bytes,hip.hipMemcpyKind.hipMemcpyDeviceToHost,stream)) hip_check(hip.hipStreamSynchronize(stream)) hip_check(hip.hipStreamDestroy(stream)) # deallocate device data hip_check(hip.hipFree(x_d)) for i,x in enumerate(x_h): if x != 0: raise ValueError(f"expected '0' for element {i}, is: '{x}'") print("ok")

Now run this example.

module load rocm hip-python python3 hipstreams_example.py

Calling hipBLAS from Python using HIP-Python

In the file hipblas_numpy_example.py, the hipBLAS library Saxpy routine is called. It operates on a numpy data array.

import ctypes import math import numpy as np from hip import hip from hip import hipblas def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err,hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) elif isinstance(err,hipblas.hipblasStatus_t) and err != hipblas.hipblasStatus_t.HIPBLAS_STATUS_SUCCESS: raise RuntimeError(str(err)) return result num_elements = 100 # input data on host alpha = ctypes.c_float(2) x_h = np.random.rand(num_elements).astype(dtype=np.float32) y_h = np.random.rand(num_elements).astype(dtype=np.float32) # expected result y_expected = alpha*x_h + y_h # device vectors num_bytes = num_elements * np.dtype(np.float32).itemsize x_d = hip_check(hip.hipMalloc(num_bytes)) y_d = hip_check(hip.hipMalloc(num_bytes)) # copy input data to device hip_check(hip.hipMemcpy(x_d,x_h,num_bytes,hip.hipMemcpyKind.hipMemcpyHostToDevice)) hip_check(hip.hipMemcpy(y_d,y_h,num_bytes,hip.hipMemcpyKind.hipMemcpyHostToDevice)) # call hipblasSaxpy + initialization & destruction of handle handle = hip_check(hipblas.hipblasCreate()) hip_check(hipblas.hipblasSaxpy(handle, num_elements, ctypes.addressof(alpha), x_d, 1, y_d, 1)) hip_check(hipblas.hipblasDestroy(handle)) # copy result (stored in y_d) back to host (store in y_h) hip_check(hip.hipMemcpy(y_h,y_d,num_bytes,hip.hipMemcpyKind.hipMemcpyDeviceToHost)) # compare to expected result if np.allclose(y_expected,y_h): print("ok") else: print("FAILED") #print(f"{y_h=}") #print(f"{y_expected=}") # clean up hip_check(hip.hipFree(x_d)) hip_check(hip.hipFree(y_d))

Using Unified Shared Memory for hipBLAS using HIP-Python

We can also take advantage of the single address space on the MI300A or the managed memory that moves the data from host to device and back for us on the other AMD Instinct GPUs. It simplifies the code because the memory does not have to be duplicated on the CPU and GPU. The code is in the file hipblas_numpy_USM_example.py.

import ctypes import math import numpy as np from hip import hip from hip import hipblas def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err,hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) elif isinstance(err,hipblas.hipblasStatus_t) and err != hipblas.hipblasStatus_t.HIPBLAS_STATUS_SUCCESS: raise RuntimeError(str(err)) return result num_elements = 100 # input data on host alpha = ctypes.c_float(2) x_h = np.random.rand(num_elements).astype(dtype=np.float32) y_h = np.random.rand(num_elements).astype(dtype=np.float32) # expected result y_expected = alpha*x_h + y_h # call hipblasSaxpy + initialization & destruction of handle handle = hip_check(hipblas.hipblasCreate()) hip_check(hipblas.hipblasSaxpy(handle, num_elements, ctypes.addressof(alpha), x_h, 1, y_h, 1)) hip_check(hipblas.hipblasDestroy(handle)) # compare to expected result if np.allclose(y_expected,y_h): print("ok") else: print("FAILED") #print(f"{y_h=}") #print(f"{y_expected=}")

To run this unified shared memory example, we also need the environment variable HSA_XNACK set to one.

module load rocm hip-python export HSA_XNACK=1 python3 hipblas_numpy_USM_example.py

Calling hipFFT from Python using HIP-Python

The HIP FFT library can also be called from Python. We create a plan, perform the FFT, and then destroy the plan. This file is hipfft_numpy_example.py.

import numpy as np from hip import hip, hipfft def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) if isinstance(err, hipfft.hipfftResult) and err != hipfft.hipfftResult.HIPFFT_SUCCESS: raise RuntimeError(str(err)) return result # initial data N = 100 hx = np.zeros(N,dtype=np.cdouble) hx[:] = 1 - 1j # copy to device dx = hip_check(hip.hipMalloc(hx.size*hx.itemsize)) hip_check(hip.hipMemcpy(dx, hx, dx.size, hip.hipMemcpyKind.hipMemcpyHostToDevice)) # create plan plan = hip_check(hipfft.hipfftPlan1d(N, hipfft.hipfftType.HIPFFT_Z2Z, 1)) # execute plan hip_check(hipfft.hipfftExecZ2Z(plan, idata=dx, odata=dx, direction=hipfft.HIPFFT_FORWARD)) hip_check(hip.hipDeviceSynchronize()) # copy to host and free device data hip_check(hip.hipMemcpy(hx,dx,dx.size,hip.hipMemcpyKind.hipMemcpyDeviceToHost)) hip_check(hip.hipFree(dx)) if not np.isclose(hx[0].real,N) or not np.isclose(hx[0].imag,-N): raise RuntimeError("element 0 must be '{N}-j{N}'.") for i in range(1,N): if not np.isclose(abs(hx[i]),0): raise RuntimeError(f"element {i} must be '0'") hip_check(hipfft.hipfftDestroy(plan)) print("ok")

Run this examples with:

module load rocm hip-python python3 hipfft_numpy_example.py

Unified Shared Memory version of calling hipFFT HIP-Python

The code is much simplier if we take advantage of the unified shared memory or managed memory. We can just use the host versions of the data directly. The simpler code is in hipfft_numpy_USM_example.py

import numpy as np from hip import hip, hipfft def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) if isinstance(err, hipfft.hipfftResult) and err != hipfft.hipfftResult.HIPFFT_SUCCESS: raise RuntimeError(str(err)) return result # initial data N = 100 hx = np.zeros(N,dtype=np.cdouble) hx[:] = 1 - 1j # create plan plan = hip_check(hipfft.hipfftPlan1d(N, hipfft.hipfftType.HIPFFT_Z2Z, 1)) # execute plan hip_check(hipfft.hipfftExecZ2Z(plan, idata=hx, odata=hx, direction=hipfft.HIPFFT_FORWARD)) hip_check(hip.hipDeviceSynchronize()) if not np.isclose(hx[0].real,N) or not np.isclose(hx[0].imag,-N): raise RuntimeError("element 0 must be '{N}-j{N}'.") for i in range(1,N): if not np.isclose(abs(hx[i]),0): raise RuntimeError(f"element {i} must be '0'") hip_check(hipfft.hipfftDestroy(plan)) print("ok")

Run this with:

module load rocm hip-python export HSA_XNACK=1 python3 hipfft_numpy_USM_example.py

Calling RCCL from Python using HIP-Python

We can also call the RCCL communication library from Python using HIP-Python. An example of this is shown in rccl_example.py.

import numpy as np from hip import hip, rccl def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) if isinstance(err, rccl.ncclResult_t) and err != rccl.ncclResult_t.ncclSuccess: raise RuntimeError(str(err)) return result # init the communicators num_gpus = hip_check(hip.hipGetDeviceCount()) comms = np.empty(num_gpus,dtype="uint64") # size of pointer type, such as ncclComm devlist = np.array(range(0,num_gpus),dtype="int32") hip_check(rccl.ncclCommInitAll(comms, num_gpus, devlist)) # init data on the devices N = 4 ones = np.ones(N,dtype="int32") zeros = np.zeros(ones.size,dtype="int32") dxlist = [] for dev in devlist: hip_check(hip.hipSetDevice(dev)) dx = hip_check(hip.hipMalloc(ones.size*ones.itemsize)) # items are bytes dxlist.append(dx) hx = ones if dev == 0 else zeros hip_check(hip.hipMemcpy(dx,hx,dx.size,hip.hipMemcpyKind.hipMemcpyHostToDevice)) # perform a broadcast hip_check(rccl.ncclGroupStart()) for dev in devlist: hip_check(hip.hipSetDevice(dev)) hip_check(rccl.ncclBcast(dxlist[dev], N, rccl.ncclDataType_t.ncclInt32, 0, int(comms[dev]), None)) # conversion to Python int is required to not let the numpy datatype to be interpreted as single-element Py_buffer hip_check(rccl.ncclGroupEnd()) # download and check the output; confirm all entries are one hx = np.empty(N,dtype="int32") for dev in devlist: dx=dxlist[dev] hx[:] = 0 hip_check(hip.hipMemcpy(hx,dx,dx.size,hip.hipMemcpyKind.hipMemcpyDeviceToHost)) for i,item in enumerate(hx): if item != 1: raise RuntimeError(f"failed for element {i}") # clean up for dx in dxlist: hip_check(hip.hipFree(dx)) for comm in comms: hip_check(rccl.ncclCommDestroy(int(comm))) # conversion to Python int is required to not let the numpy datatype to be interpreted as single-element Py_buffer print("ok")

Running this example:

module load rocm hip-python python3 rccl_example.py

Unified Shared Memory with RCCL using HIP-Python

We can also use the host data directly by relying on the unified shared memory or the managed memory on the AMD Instinct GPUs. The code for this is shown in rccl_USM_example.py

import numpy as np from hip import hip, rccl def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) if isinstance(err, rccl.ncclResult_t) and err != rccl.ncclResult_t.ncclSuccess: raise RuntimeError(str(err)) return result # init the communicators num_gpus = hip_check(hip.hipGetDeviceCount()) comms = np.empty(num_gpus,dtype="uint64") # size of pointer type, such as ncclComm devlist = np.array(range(0,num_gpus),dtype="int32") hip_check(rccl.ncclCommInitAll(comms, num_gpus, devlist)) # init data on the devices N = 4 ones = np.ones(N,dtype="int32") zeros = np.zeros(ones.size,dtype="int32") dxlist = [] for dev in devlist: hip_check(hip.hipSetDevice(dev)) hx = ones if dev == 0 else zeros dxlist.append(hx) # perform a broadcast hip_check(rccl.ncclGroupStart()) for dev in devlist: hip_check(hip.hipSetDevice(dev)) hip_check(rccl.ncclBcast(dxlist[dev], N, rccl.ncclDataType_t.ncclInt32, 0, int(comms[dev]), None)) # conversion to Python int is required to not let the numpy datatype to be interpreted as single-element Py_buffer hip_check(rccl.ncclGroupEnd()) # download and check the output; confirm all entries are one hx = np.empty(N,dtype="int32") for dev in devlist: hx=dxlist[dev] for i,item in enumerate(hx): if item != 1: raise RuntimeError(f"failed for element {i}") # clean up for comm in comms: hip_check(rccl.ncclCommDestroy(int(comm))) # conversion to Python int is required to not let the numpy datatype to be interpreted as single-element Py_buffer print("ok")

Running this version requires setting HSA_XNACK to one as in the previous unified shared memory examples.

module load rocm hip-python export HSA_XNACK=1 python3 rccl_USM_example.py

Cython Basics

Cython compiles Python-like code to C for significant performance gains on CPU-bound operations. This section demonstrates basic Cython usage with a simple array sum example.

The example is in the cython_basic/ directory.

Simple Array Sum Example

The file array_sum.pyx contains the function that we like to pre-precompile. It is written in Cython syntax, which adds a few additional features to standard Python such as cdef to define types.

def array_sum(double[:, ::1] A): """Compute the sum of all elements in a 2D array.""" cdef int m = A.shape[0] cdef int n = A.shape[1] cdef int i, j cdef double result = 0 for i in range(m): for j in range(n): result += A[i, j] return result

The setup.py file defines how to build the extension:

from setuptools import Extension, setup from Cython.Build import cythonize setup( ext_modules=cythonize( [Extension("array_sum", ["array_sum.pyx"])], compiler_directives={"language_level": 3}, ) )

The key routine setup() is the entry point for the compilation. The cythonize utility takes care of translating the *.pyx file to C code, while the other commands provide details on how the translated files should be compiled, i.e. compiler flags, linked libraries, the list of source files to compile (in this case only array_sum.pyx), and the name of compiled module (in this case array_sum).

Once the extension has been compiled, it can be used in other Python files like a regular Python module:

import array_sum # Importing our compiled extension result = array_sum.array_sum(A) # Calling the precompiled function

Build and Run

First, install cython in your environment with

python3 -m venv cython_venv source cython_venv/bin/activate python3 -m pip install cython numpy

Then, build the extension and run the demo

python3 setup.py build_ext --inplace python3 cython_basic.py

Finally, clean up with

deactivate rm -rf cython_venv build array_sum*.so array_sum.c

The output should look like something like this (speedup varies by system):

Matrix size: 1000x1000 Pure Python: 113.0 ms (result: 500591.090701) Cython: 0.8 ms (result: 500591.090701) Speedup: 138.4x Correctness verified! ok

You can see the significant speedup we achieved by pre-compiling the array_sum function with Cython.

Cython with HIP-Python

This example demonstrates how Cython can be used together with HIP to accelerate the data preparation on the host before we launch a kernel. This pattern can be applied if the host side orchestration and preparation work becomes the bottleneck in Python code with GPU acceleration.

The example is in the cython_hip_example/ directory.

Source Files

First, we again prepare a Cython module with the code we want to speedup in matrix_prep.pyx In this case, it includes scaling a matrix by a scalar and copying the data to the GPU and back by calling the HIP API via hip-python. The code adds decorators which allow Cython to produce more optimized code.

# cython: language_level=3 cimport cython import numpy as np from hip import hip def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) return result @cython.boundscheck(False) @cython.wraparound(False) def scale_only_cython(double[:, ::1] A, double scale): """Cython-optimized matrix scaling (CPU only).""" cdef int m = A.shape[0], n = A.shape[1] cdef int i, j for i in range(m): for j in range(n): A[i, j] *= scale @cython.boundscheck(False) @cython.wraparound(False) def prepare_and_transfer(double[:, ::1] A, double scale): """ Cython-optimized: scale matrix on CPU, then transfer to GPU. The nested loop is what Cython accelerates vs pure Python. """ cdef int m = A.shape[0], n = A.shape[1] cdef int i, j # CPU work: scale the matrix (Cython makes this fast) for i in range(m): for j in range(n): A[i, j] *= scale # GPU work: transfer to device num_bytes = A.shape[0] * A.shape[1] * sizeof(double) d_ptr = hip_check(hip.hipMalloc(num_bytes)) hip_check(hip.hipMemcpy(d_ptr, A, num_bytes, hip.hipMemcpyKind.hipMemcpyHostToDevice)) return d_ptr, num_bytes def transfer_back_and_free(d_ptr, double[:, ::1] A, size_t num_bytes): """Copy results from GPU and free device memory.""" hip_check(hip.hipMemcpy(A, d_ptr, num_bytes, hip.hipMemcpyKind.hipMemcpyDeviceToHost)) hip_check(hip.hipFree(d_ptr))

Next, we need a setup.py script to compile the Cython module. In this case, the script gets more complex, since it has to be compiled and linked against the base ROCm installation similar to what we would expect for "normal" C-code.

import os import numpy as np from setuptools import Extension, setup from Cython.Build import cythonize ROCM_PATH = os.environ.get("ROCM_PATH", "/opt/rocm") setup( ext_modules=cythonize([ Extension( "matrix_prep", sources=["matrix_prep.pyx"], include_dirs=[np.get_include()], library_dirs=[os.path.join(ROCM_PATH, "lib")], libraries=["amdhip64"], extra_compile_args=["-D__HIP_PLATFORM_AMD__"], ) ], compiler_directives={"language_level": 3}) )

Build and Run (requires GPU)

First, make sure that your environment is setup correctly. Now, we also need a ROCm installation:

python3 -m venv venv_cython source venv_cython/bin/activate module load rocm hip-python python3 -m pip install cython numpy

Then, we can build the Cython module and run it:

python3 setup.py build_ext --inplace python3 demo.py

You will see some output like this (performance will vary depending on your system):

1. CPU Computation Only (1000x1000 matrix scaling): Pure Python: 156.2 ms Cython: 0.7 ms Speedup: 239.0x 2. Full Pipeline (Cython prep + HIP transfer): Cython + HIP transfer: 210.1 ms 3. Correctness Check: Results match - verified! ok

Again, we see significant speedup if we precompile the host code!

Don't forget to clean up afterwards:

deactivate rm -rf venv_cython build matrix_prep*.so matrix_prep.c

Compiling and Launching Kernels

We can also create our own C programs and compile them with the hiprtc module for a Just-In-Time (JIT) compile capability. This example shows a C routine called print_tid() that is encoded as a string. The string is then converted into program source and compiled. We use the ability to query the device parameters to get the GPU architecture to compile for.

from hip import hip, hiprtc def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) elif ( isinstance(err, hiprtc.hiprtcResult) and err != hiprtc.hiprtcResult.HIPRTC_SUCCESS ): raise RuntimeError(str(err)) return result source = b"""\ extern "C" __global__ void print_tid() { printf("tid: %d\\n", (int) threadIdx.x); } """ prog = hip_check(hiprtc.hiprtcCreateProgram(source, b"print_tid", 0, [], [])) props = hip.hipDeviceProp_t() hip_check(hip.hipGetDeviceProperties(props,0)) arch = props.gcnArchName print(f"Compiling kernel for {arch}") cflags = [b"--offload-arch="+arch] err, = hiprtc.hiprtcCompileProgram(prog, len(cflags), cflags) if err != hiprtc.hiprtcResult.HIPRTC_SUCCESS: log_size = hip_check(hiprtc.hiprtcGetProgramLogSize(prog)) log = bytearray(log_size) hip_check(hiprtc.hiprtcGetProgramLog(prog, log)) raise RuntimeError(log.decode()) code_size = hip_check(hiprtc.hiprtcGetCodeSize(prog)) code = bytearray(code_size) hip_check(hiprtc.hiprtcGetCode(prog, code)) module = hip_check(hip.hipModuleLoadData(code)) kernel = hip_check(hip.hipModuleGetFunction(module, b"print_tid")) # hip_check( hip.hipModuleLaunchKernel( kernel, *(1, 1, 1), # grid *(32, 1, 1), # block sharedMemBytes=0, stream=None, kernelParams=None, extra=None, ) ) hip_check(hip.hipDeviceSynchronize()) hip_check(hip.hipModuleUnload(module)) hip_check(hiprtc.hiprtcDestroyProgram(prog.createRef())) print("ok")

To run the example of creating a kernel and launching it:

module load rocm hip-python python3 create_launch_C_kernel.py

Kernels with arguments

It is a little more complicated to launch a kernel with arguments. The program is scale_vector() and it has six arguments. We add an "extra" field with the six arguments as part of the launch kernel call. This example is in kernel_with_arguments.py.

import ctypes import array import random import math from hip import hip, hiprtc def hip_check(call_result): err = call_result[0] result = call_result[1:] if len(result) == 1: result = result[0] if isinstance(err, hip.hipError_t) and err != hip.hipError_t.hipSuccess: raise RuntimeError(str(err)) elif ( isinstance(err, hiprtc.hiprtcResult) and err != hiprtc.hiprtcResult.HIPRTC_SUCCESS ): raise RuntimeError(str(err)) return result source = b"""\ extern "C" __global__ void scale_vector(float factor, int n, short unused1, int unused2, float unused3, float *x) { int tid = threadIdx.x + blockIdx.x * blockDim.x; if ( tid == 0 ) { printf("tid: %d, factor: %f, x*: %lu, n: %lu, unused1: %d, unused2: %d, unused3: %f\\n",tid,factor,x,n,(int) unused1,unused2,unused3); } if (tid < n) { x[tid] *= factor; } } """ prog = hip_check(hiprtc.hiprtcCreateProgram(source, b"scale_vector", 0, [], [])) props = hip.hipDeviceProp_t() hip_check(hip.hipGetDeviceProperties(props,0)) arch = props.gcnArchName print(f"Compiling kernel for {arch}") cflags = [b"--offload-arch="+arch] err, = hiprtc.hiprtcCompileProgram(prog, len(cflags), cflags) if err != hiprtc.hiprtcResult.HIPRTC_SUCCESS: log_size = hip_check(hiprtc.hiprtcGetProgramLogSize(prog)) log = bytearray(log_size) hip_check(hiprtc.hiprtcGetProgramLog(prog, log)) raise RuntimeError(log.decode()) code_size = hip_check(hiprtc.hiprtcGetCodeSize(prog)) code = bytearray(code_size) hip_check(hiprtc.hiprtcGetCode(prog, code)) module = hip_check(hip.hipModuleLoadData(code)) kernel = hip_check(hip.hipModuleGetFunction(module, b"scale_vector")) # kernel launch ## inputs n = 100 x_h = array.array("f",[random.random() for i in range(0,n)]) num_bytes = x_h.itemsize * len(x_h) x_d = hip_check(hip.hipMalloc(num_bytes)) print(f"{hex(int(x_d))=}") ## upload host data hip_check(hip.hipMemcpy(x_d,x_h,num_bytes,hip.hipMemcpyKind.hipMemcpyHostToDevice)) factor = 1.23 ## expected result x_expected = [a*factor for a in x_h] block = hip.dim3(x=32) grid = hip.dim3(math.ceil(n/block.x)) ## launch hip_check( hip.hipModuleLaunchKernel( kernel, *grid, *block, sharedMemBytes=0, stream=None, kernelParams=None, extra=( ctypes.c_float(factor), # 4 bytes ctypes.c_int(n), # 8 bytes ctypes.c_short(5), # unused1, 10 bytes ctypes.c_int(2), # unused2, 16 bytes (+2 padding bytes) ctypes.c_float(5.6), # unused3 20 bytes x_d, # 32 bytes (+4 padding bytes) ) ) ) # copy result back hip_check(hip.hipMemcpy(x_h,x_d,num_bytes,hip.hipMemcpyKind.hipMemcpyDeviceToHost)) for i,x_h_i in enumerate(x_h): if not math.isclose(x_h_i,x_expected[i],rel_tol=1e-6): raise RuntimeError(f"values do not match, {x_h[i]=} vs. {x_expected[i]=}, {i=}") hip_check(hip.hipFree(x_d)) hip_check(hip.hipModuleUnload(module)) hip_check(hiprtc.hiprtcDestroyProgram(prog.createRef())) print("ok")

Run this example with:

module load rocm hip-python python3 kernel_with_arguments.py

Numba-HIP

Numba-HIP allows to write GPU kernels for AMD GPUs and Just-in-Time (JIT) compilation using native Python code.

Installation

Numba-HIP is already installed on AAC6 as part of the hip-python module and can be loaded with

module load rocm hip-python

On other systems such as your own, you can install HIP-Python and Numba-HIP with

python3 -m venv hip-python-build source hip-python-build/bin/activate python3 -m pip install -i https://test.pypi.org/simple hip-python~=6.4.1 python3 -m pip config set global.extra-index-url https://test.pypi.org/simple python3 -m pip install "numba-hip[rocm-6-4-1] @ git+https://github.com/ROCm/numba-hip.git"

Replace the module load commands in the following by sourcing this virtual environment.

Note

Make sure to install the correct version that matches the ROCm installed on your system. For this, replace 6.4.1 and rocm-6-4-1 accordingly (e.g., 7.0.0 and rocm-7-0-0).

Kernel Definition

The kernel uses the @hip.jit decorator and follows the standard GPU programming model:

@hip.jit def f(a, b, c): # like threadIdx.x + (blockIdx.x * blockDim.x) tid = hip.grid(1) size = len(c) if tid < size: c[tid] = a[tid] + b[tid]

Memory Management with to_device

Numba-HIP provides the to_device API to transfer NumPy arrays (such as {a,b,c}_host) to GPU memory:

# Transfer to GPU memory via Numba-HIP API a_dev = hip.to_device(a_host) b_dev = hip.to_device(b_host) c_dev = hip.to_device(c_host)

After kernel execution, copy results back with copy_to_host():

c_host = c_dev.copy_to_host()

Running the Example

module load rocm hip-python python3 numba-hip.py

CUDA-Posing Mode

An alternative approach for porting existing CUDA code is to have numba-hip pose as CUDA. This allows using @cuda.jit syntax on AMD GPUs:

from numba import hip hip.pose_as_cuda() from numba import cuda @cuda.jit def f(a, b, c): # like threadIdx.x + (blockIdx.x * blockDim.x) tid = cuda.grid(1) size = len(c) if tid < size: c[tid] = a[tid] + b[tid]

Running this example:

module load rocm hip-python python3 numba-hip-cuda-posing.py

Footer

© 2026 GitHub, Inc.